diff options
Diffstat (limited to 'libglusterfs/src/transport.c')
-rw-r--r-- | libglusterfs/src/transport.c | 101 |
1 files changed, 99 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d90..244aa960b 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len, struct iovec *vector, int count, struct iobref *iobref) { - int32_t ret = -1; + int32_t ret = -1; + transport_t *peer_trans = NULL; + struct iobuf *iobuf = NULL; + struct transport_msg *msg = NULL; + + if (this->peer_trans) { + peer_trans = this->peer_trans; + + msg = CALLOC (1, sizeof (*msg)); + if (!msg) { + return -ENOMEM; + } + + msg->hdr = buf; + msg->hdrlen = len; + + if (vector) { + iobuf = iobuf_get (this->xl->ctx->iobuf_pool); + if (!iobuf) { + FREE (msg->hdr); + FREE (msg); + return -ENOMEM; + } + + iov_unload (iobuf->ptr, vector, count); + msg->iobuf = iobuf; + } + + pthread_mutex_lock (&peer_trans->handover.mutex); + { + list_add_tail (&msg->list, &peer_trans->handover.msgs); + pthread_cond_broadcast (&peer_trans->handover.cond); + } + pthread_mutex_unlock (&peer_trans->handover.mutex); + + return 0; + } GF_VALIDATE_OR_GOTO("transport", this, fail); GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, int32_t ret = -1; GF_VALIDATE_OR_GOTO("transport", this, fail); - + + if (this->peer_trans) { + *hdr_p = this->handover.msg->hdr; + *hdrlen_p = this->handover.msg->hdrlen; + *iobuf_p = this->handover.msg->iobuf; + + return 0; + } + ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p); fail: return ret; @@ -338,3 +382,56 @@ fail: return ret; } + +void * +transport_peerproc (void *trans_data) +{ + transport_t *trans = NULL; + struct transport_msg *msg = NULL; + + trans = trans_data; + + while (1) { + pthread_mutex_lock (&trans->handover.mutex); + { + while (list_empty (&trans->handover.msgs)) + pthread_cond_wait (&trans->handover.cond, + &trans->handover.mutex); + + msg = list_entry (trans->handover.msgs.next, + struct transport_msg, list); + + list_del_init (&msg->list); + } + pthread_mutex_unlock (&trans->handover.mutex); + + trans->handover.msg = msg; + + trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + + FREE (msg); + } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ + trans->peer_trans = peer_trans; + + INIT_LIST_HEAD (&trans->handover.msgs); + pthread_cond_init (&trans->handover.cond, NULL); + pthread_mutex_init (&trans->handover.mutex, NULL); + pthread_create (&trans->handover.thread, NULL, + transport_peerproc, trans); + + peer_trans->peer_trans = trans; + + INIT_LIST_HEAD (&peer_trans->handover.msgs); + pthread_cond_init (&peer_trans->handover.cond, NULL); + pthread_mutex_init (&peer_trans->handover.mutex, NULL); + pthread_create (&peer_trans->handover.thread, NULL, + transport_peerproc, peer_trans); + + return 0; +} |