diff options
author | Anand V. Avati <avati@amp.gluster.com> | 2009-05-07 01:24:41 +0530 |
---|---|---|
committer | Anand V. Avati <avati@amp.gluster.com> | 2009-05-07 01:24:41 +0530 |
commit | f9f5519b66a25651eb03de577f68d481abdd4c40 (patch) | |
tree | b87c19fa62033da1dbc8a11ce6dbd5a9cb3b3ad8 /libglusterfs/src | |
parent | 12eb832e255a1abb90434ab3e0d1e1632ae7ce03 (diff) |
transport shortcut b/w client and server
This patch makes the server pass back the transport pointer of the client. If the UUID matches, the client makes the local transport 'shortcut' with the remote transport (pointer received from server)
The shortcut simulates a socket queue. Instead of serialized messages going over the network and getting queued in the tcp socket queue, the messages get queued in a transport specific queue picked by a polling thread.
Diffstat (limited to 'libglusterfs/src')
-rw-r--r-- | libglusterfs/src/transport.c | 101 | ||||
-rw-r--r-- | libglusterfs/src/transport.h | 19 |
2 files changed, 118 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d9031..244aa960b78 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len, struct iovec *vector, int count, struct iobref *iobref) { - int32_t ret = -1; + int32_t ret = -1; + transport_t *peer_trans = NULL; + struct iobuf *iobuf = NULL; + struct transport_msg *msg = NULL; + + if (this->peer_trans) { + peer_trans = this->peer_trans; + + msg = CALLOC (1, sizeof (*msg)); + if (!msg) { + return -ENOMEM; + } + + msg->hdr = buf; + msg->hdrlen = len; + + if (vector) { + iobuf = iobuf_get (this->xl->ctx->iobuf_pool); + if (!iobuf) { + FREE (msg->hdr); + FREE (msg); + return -ENOMEM; + } + + iov_unload (iobuf->ptr, vector, count); + msg->iobuf = iobuf; + } + + pthread_mutex_lock (&peer_trans->handover.mutex); + { + list_add_tail (&msg->list, &peer_trans->handover.msgs); + pthread_cond_broadcast (&peer_trans->handover.cond); + } + pthread_mutex_unlock (&peer_trans->handover.mutex); + + return 0; + } GF_VALIDATE_OR_GOTO("transport", this, fail); GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, int32_t ret = -1; GF_VALIDATE_OR_GOTO("transport", this, fail); - + + if (this->peer_trans) { + *hdr_p = this->handover.msg->hdr; + *hdrlen_p = this->handover.msg->hdrlen; + *iobuf_p = this->handover.msg->iobuf; + + return 0; + } + ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p); fail: return ret; @@ -338,3 +382,56 @@ fail: return ret; } + +void * +transport_peerproc (void *trans_data) +{ + transport_t *trans = NULL; + struct transport_msg *msg = NULL; + + trans = trans_data; + + while (1) { + pthread_mutex_lock (&trans->handover.mutex); + { + while (list_empty (&trans->handover.msgs)) + pthread_cond_wait (&trans->handover.cond, + &trans->handover.mutex); + + msg = list_entry (trans->handover.msgs.next, + struct transport_msg, list); + + list_del_init (&msg->list); + } + pthread_mutex_unlock (&trans->handover.mutex); + + trans->handover.msg = msg; + + trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + + FREE (msg); + } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ + trans->peer_trans = peer_trans; + + INIT_LIST_HEAD (&trans->handover.msgs); + pthread_cond_init (&trans->handover.cond, NULL); + pthread_mutex_init (&trans->handover.mutex, NULL); + pthread_create (&trans->handover.thread, NULL, + transport_peerproc, trans); + + peer_trans->peer_trans = trans; + + INIT_LIST_HEAD (&peer_trans->handover.msgs); + pthread_cond_init (&peer_trans->handover.cond, NULL); + pthread_mutex_init (&peer_trans->handover.mutex, NULL); + pthread_create (&peer_trans->handover.thread, NULL, + transport_peerproc, peer_trans); + + return 0; +} diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h index a7cd5ec8f41..2d85c76cca1 100644 --- a/libglusterfs/src/transport.h +++ b/libglusterfs/src/transport.h @@ -40,6 +40,13 @@ typedef struct peer_info { char identifier[UNIX_PATH_MAX]; }peer_info_t; +struct transport_msg { + struct list_head list; + char *hdr; + int hdrlen; + struct iobuf *iobuf; +}; + struct transport { struct transport_ops *ops; void *private; @@ -55,6 +62,16 @@ struct transport { /* int (*notify) (transport_t *this, int event, void *data); */ peer_info_t peerinfo; peer_info_t myinfo; + + transport_t *peer_trans; + struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; + struct list_head msgs; + struct transport_msg *msg; + } handover; + }; struct transport_ops { @@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl); transport_t *transport_ref (transport_t *trans); int32_t transport_unref (transport_t *trans); +int transport_setpeer (transport_t *trans, transport_t *trans_peer); + #endif /* __TRANSPORT_H__ */ |