author     Anand V. Avati <avati@amp.gluster.com>    2009-05-07 01:24:41 +0530
committer  Anand V. Avati <avati@amp.gluster.com>    2009-05-07 01:24:41 +0530
commit     f9f5519b66a25651eb03de577f68d481abdd4c40 (patch)
tree       b87c19fa62033da1dbc8a11ce6dbd5a9cb3b3ad8 /libglusterfs/src/transport.c
parent     12eb832e255a1abb90434ab3e0d1e1632ae7ce03 (diff)
transport shortcut b/w client and server
This patch makes the server pass back the transport pointer of the client. If the UUID matches, the client 'shortcuts' its local transport to the remote transport (the pointer received from the server). The shortcut simulates a socket queue: instead of serialized messages going over the network and getting queued in the TCP socket queue, messages are queued in a transport-specific queue and picked up by a polling thread.
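The diff below implements this with a per-transport 'handover' queue (a message list guarded by a mutex and condition variable) drained by a dedicated thread. The following rough, self-contained sketch shows the same producer/consumer pattern; shortcut_msg, shortcut_queue, shortcut_submit and shortcut_peerproc are hypothetical names, not the structures used in the patch.

/*
 * Sketch only: sender appends serialized messages to a peer-specific queue,
 * a dedicated thread on the receiving transport wakes up and delivers them.
 * Roughly mirrors transport_submit()/transport_peerproc() in the diff below.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct shortcut_msg {
        char                *hdr;       /* serialized header, owned by msg */
        size_t               hdrlen;
        struct shortcut_msg *next;
};

struct shortcut_queue {
        pthread_mutex_t      mutex;
        pthread_cond_t       cond;
        struct shortcut_msg *head;
        struct shortcut_msg *tail;
};

/* sender side: roughly what transport_submit() does when peer_trans is set */
static void
shortcut_submit (struct shortcut_queue *q, const char *buf)
{
        struct shortcut_msg *msg = calloc (1, sizeof (*msg));

        msg->hdr    = strdup (buf);
        msg->hdrlen = strlen (buf);

        pthread_mutex_lock (&q->mutex);
        if (q->tail)
                q->tail->next = msg;
        else
                q->head = msg;
        q->tail = msg;
        pthread_cond_broadcast (&q->cond);
        pthread_mutex_unlock (&q->mutex);
}

/* receiver side: roughly what transport_peerproc() does on the peer transport */
static void *
shortcut_peerproc (void *data)
{
        struct shortcut_queue *q   = data;
        struct shortcut_msg   *msg = NULL;

        for (;;) {
                pthread_mutex_lock (&q->mutex);
                while (!q->head)
                        pthread_cond_wait (&q->cond, &q->mutex);
                msg     = q->head;
                q->head = msg->next;
                if (!q->head)
                        q->tail = NULL;
                pthread_mutex_unlock (&q->mutex);

                /* stand-in for xl->notify (xl, GF_EVENT_POLLIN, trans) */
                printf ("delivered %zu-byte header: %s\n", msg->hdrlen, msg->hdr);

                free (msg->hdr);
                free (msg);
        }
        return NULL;
}

int
main (void)
{
        struct shortcut_queue q = {
                .mutex = PTHREAD_MUTEX_INITIALIZER,
                .cond  = PTHREAD_COND_INITIALIZER,
        };
        pthread_t tid;

        pthread_create (&tid, NULL, shortcut_peerproc, &q);
        shortcut_submit (&q, "hello");
        shortcut_submit (&q, "world");
        sleep (1);        /* give the peer thread time to drain the queue */
        return 0;
}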
Diffstat (limited to 'libglusterfs/src/transport.c')
-rw-r--r--  libglusterfs/src/transport.c  101
1 file changed, 99 insertions(+), 2 deletions(-)
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c
index 2c2894d90..244aa960b 100644
--- a/libglusterfs/src/transport.c
+++ b/libglusterfs/src/transport.c
@@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,
                  struct iovec *vector, int count,
                  struct iobref *iobref)
{
-        int32_t ret = -1;
+        int32_t               ret = -1;
+        transport_t          *peer_trans = NULL;
+        struct iobuf         *iobuf = NULL;
+        struct transport_msg *msg = NULL;
+
+        if (this->peer_trans) {
+                peer_trans = this->peer_trans;
+
+                msg = CALLOC (1, sizeof (*msg));
+                if (!msg) {
+                        return -ENOMEM;
+                }
+
+                msg->hdr = buf;
+                msg->hdrlen = len;
+
+                if (vector) {
+                        iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+                        if (!iobuf) {
+                                FREE (msg->hdr);
+                                FREE (msg);
+                                return -ENOMEM;
+                        }
+
+                        iov_unload (iobuf->ptr, vector, count);
+                        msg->iobuf = iobuf;
+                }
+
+                pthread_mutex_lock (&peer_trans->handover.mutex);
+                {
+                        list_add_tail (&msg->list, &peer_trans->handover.msgs);
+                        pthread_cond_broadcast (&peer_trans->handover.cond);
+                }
+                pthread_mutex_unlock (&peer_trans->handover.mutex);
+
+                return 0;
+        }

        GF_VALIDATE_OR_GOTO("transport", this, fail);
        GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
@@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
        int32_t ret = -1;

        GF_VALIDATE_OR_GOTO("transport", this, fail);
-
+
+        if (this->peer_trans) {
+                *hdr_p = this->handover.msg->hdr;
+                *hdrlen_p = this->handover.msg->hdrlen;
+                *iobuf_p = this->handover.msg->iobuf;
+
+                return 0;
+        }
+
        ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
fail:
        return ret;
@@ -338,3 +382,56 @@ fail:
        return ret;
}
+
+void *
+transport_peerproc (void *trans_data)
+{
+        transport_t          *trans = NULL;
+        struct transport_msg *msg = NULL;
+
+        trans = trans_data;
+
+        while (1) {
+                pthread_mutex_lock (&trans->handover.mutex);
+                {
+                        while (list_empty (&trans->handover.msgs))
+                                pthread_cond_wait (&trans->handover.cond,
+                                                   &trans->handover.mutex);
+
+                        msg = list_entry (trans->handover.msgs.next,
+                                          struct transport_msg, list);
+
+                        list_del_init (&msg->list);
+                }
+                pthread_mutex_unlock (&trans->handover.mutex);
+
+                trans->handover.msg = msg;
+
+                trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans);
+
+                FREE (msg);
+        }
+}
+
+
+int
+transport_setpeer (transport_t *trans, transport_t *peer_trans)
+{
+        trans->peer_trans = peer_trans;
+
+        INIT_LIST_HEAD (&trans->handover.msgs);
+        pthread_cond_init (&trans->handover.cond, NULL);
+        pthread_mutex_init (&trans->handover.mutex, NULL);
+        pthread_create (&trans->handover.thread, NULL,
+                        transport_peerproc, trans);
+
+        peer_trans->peer_trans = trans;
+
+        INIT_LIST_HEAD (&peer_trans->handover.msgs);
+        pthread_cond_init (&peer_trans->handover.cond, NULL);
+        pthread_mutex_init (&peer_trans->handover.mutex, NULL);
+        pthread_create (&peer_trans->handover.thread, NULL,
+                        transport_peerproc, peer_trans);
+
+        return 0;
+}