summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs/src/transport.c')
-rw-r--r--libglusterfs/src/transport.c101
1 files changed, 99 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c
index 2c2894d90..244aa960b 100644
--- a/libglusterfs/src/transport.c
+++ b/libglusterfs/src/transport.c
@@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,
struct iovec *vector, int count,
struct iobref *iobref)
{
- int32_t ret = -1;
+ int32_t ret = -1;
+ transport_t *peer_trans = NULL;
+ struct iobuf *iobuf = NULL;
+ struct transport_msg *msg = NULL;
+
+ if (this->peer_trans) {
+ peer_trans = this->peer_trans;
+
+ msg = CALLOC (1, sizeof (*msg));
+ if (!msg) {
+ return -ENOMEM;
+ }
+
+ msg->hdr = buf;
+ msg->hdrlen = len;
+
+ if (vector) {
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ FREE (msg->hdr);
+ FREE (msg);
+ return -ENOMEM;
+ }
+
+ iov_unload (iobuf->ptr, vector, count);
+ msg->iobuf = iobuf;
+ }
+
+ pthread_mutex_lock (&peer_trans->handover.mutex);
+ {
+ list_add_tail (&msg->list, &peer_trans->handover.msgs);
+ pthread_cond_broadcast (&peer_trans->handover.cond);
+ }
+ pthread_mutex_unlock (&peer_trans->handover.mutex);
+
+ return 0;
+ }
GF_VALIDATE_OR_GOTO("transport", this, fail);
GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
@@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("transport", this, fail);
-
+
+ if (this->peer_trans) {
+ *hdr_p = this->handover.msg->hdr;
+ *hdrlen_p = this->handover.msg->hdrlen;
+ *iobuf_p = this->handover.msg->iobuf;
+
+ return 0;
+ }
+
ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
fail:
return ret;
@@ -338,3 +382,56 @@ fail:
return ret;
}
+
+void *
+transport_peerproc (void *trans_data)
+{
+ transport_t *trans = NULL;
+ struct transport_msg *msg = NULL;
+
+ trans = trans_data;
+
+ while (1) {
+ pthread_mutex_lock (&trans->handover.mutex);
+ {
+ while (list_empty (&trans->handover.msgs))
+ pthread_cond_wait (&trans->handover.cond,
+ &trans->handover.mutex);
+
+ msg = list_entry (trans->handover.msgs.next,
+ struct transport_msg, list);
+
+ list_del_init (&msg->list);
+ }
+ pthread_mutex_unlock (&trans->handover.mutex);
+
+ trans->handover.msg = msg;
+
+ trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans);
+
+ FREE (msg);
+ }
+}
+
+
+int
+transport_setpeer (transport_t *trans, transport_t *peer_trans)
+{
+ trans->peer_trans = peer_trans;
+
+ INIT_LIST_HEAD (&trans->handover.msgs);
+ pthread_cond_init (&trans->handover.cond, NULL);
+ pthread_mutex_init (&trans->handover.mutex, NULL);
+ pthread_create (&trans->handover.thread, NULL,
+ transport_peerproc, trans);
+
+ peer_trans->peer_trans = trans;
+
+ INIT_LIST_HEAD (&peer_trans->handover.msgs);
+ pthread_cond_init (&peer_trans->handover.cond, NULL);
+ pthread_mutex_init (&peer_trans->handover.mutex, NULL);
+ pthread_create (&peer_trans->handover.thread, NULL,
+ transport_peerproc, peer_trans);
+
+ return 0;
+}