diff options
-rw-r--r-- | libglusterfs/src/transport.c | 101 | ||||
-rw-r--r-- | libglusterfs/src/transport.h | 19 | ||||
-rw-r--r-- | xlators/protocol/client/src/client-protocol.c | 10 | ||||
-rw-r--r-- | xlators/protocol/server/src/server-protocol.c | 3 |
4 files changed, 131 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d90..244aa960b 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len, struct iovec *vector, int count, struct iobref *iobref) { - int32_t ret = -1; + int32_t ret = -1; + transport_t *peer_trans = NULL; + struct iobuf *iobuf = NULL; + struct transport_msg *msg = NULL; + + if (this->peer_trans) { + peer_trans = this->peer_trans; + + msg = CALLOC (1, sizeof (*msg)); + if (!msg) { + return -ENOMEM; + } + + msg->hdr = buf; + msg->hdrlen = len; + + if (vector) { + iobuf = iobuf_get (this->xl->ctx->iobuf_pool); + if (!iobuf) { + FREE (msg->hdr); + FREE (msg); + return -ENOMEM; + } + + iov_unload (iobuf->ptr, vector, count); + msg->iobuf = iobuf; + } + + pthread_mutex_lock (&peer_trans->handover.mutex); + { + list_add_tail (&msg->list, &peer_trans->handover.msgs); + pthread_cond_broadcast (&peer_trans->handover.cond); + } + pthread_mutex_unlock (&peer_trans->handover.mutex); + + return 0; + } GF_VALIDATE_OR_GOTO("transport", this, fail); GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p, int32_t ret = -1; GF_VALIDATE_OR_GOTO("transport", this, fail); - + + if (this->peer_trans) { + *hdr_p = this->handover.msg->hdr; + *hdrlen_p = this->handover.msg->hdrlen; + *iobuf_p = this->handover.msg->iobuf; + + return 0; + } + ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p); fail: return ret; @@ -338,3 +382,56 @@ fail: return ret; } + +void * +transport_peerproc (void *trans_data) +{ + transport_t *trans = NULL; + struct transport_msg *msg = NULL; + + trans = trans_data; + + while (1) { + pthread_mutex_lock (&trans->handover.mutex); + { + while (list_empty (&trans->handover.msgs)) + pthread_cond_wait (&trans->handover.cond, + &trans->handover.mutex); + + msg = list_entry (trans->handover.msgs.next, + struct transport_msg, list); + + list_del_init (&msg->list); + } + pthread_mutex_unlock (&trans->handover.mutex); + + trans->handover.msg = msg; + + trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + + FREE (msg); + } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ + trans->peer_trans = peer_trans; + + INIT_LIST_HEAD (&trans->handover.msgs); + pthread_cond_init (&trans->handover.cond, NULL); + pthread_mutex_init (&trans->handover.mutex, NULL); + pthread_create (&trans->handover.thread, NULL, + transport_peerproc, trans); + + peer_trans->peer_trans = trans; + + INIT_LIST_HEAD (&peer_trans->handover.msgs); + pthread_cond_init (&peer_trans->handover.cond, NULL); + pthread_mutex_init (&peer_trans->handover.mutex, NULL); + pthread_create (&peer_trans->handover.thread, NULL, + transport_peerproc, peer_trans); + + return 0; +} diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h index a7cd5ec8f..2d85c76cc 100644 --- a/libglusterfs/src/transport.h +++ b/libglusterfs/src/transport.h @@ -40,6 +40,13 @@ typedef struct peer_info { char identifier[UNIX_PATH_MAX]; }peer_info_t; +struct transport_msg { + struct list_head list; + char *hdr; + int hdrlen; + struct iobuf *iobuf; +}; + struct transport { struct transport_ops *ops; void *private; @@ -55,6 +62,16 @@ struct transport { /* int (*notify) (transport_t *this, int event, void *data); */ peer_info_t peerinfo; peer_info_t myinfo; + + transport_t *peer_trans; + struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; + struct list_head msgs; + struct transport_msg *msg; + } handover; + }; struct transport_ops { @@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl); transport_t *transport_ref (transport_t *trans); int32_t transport_unref (transport_t *trans); +int transport_setpeer (transport_t *trans, transport_t *trans_peer); + #endif /* __TRANSPORT_H__ */ diff --git a/xlators/protocol/client/src/client-protocol.c b/xlators/protocol/client/src/client-protocol.c index 7a2326afd..a0a0f06fd 100644 --- a/xlators/protocol/client/src/client-protocol.c +++ b/xlators/protocol/client/src/client-protocol.c @@ -6254,6 +6254,8 @@ client_setvolume_cbk (call_frame_t *frame, int32_t op_ret = -1; int32_t op_errno = EINVAL; int32_t dict_len = 0; + transport_t *peer_trans = NULL; + uint64_t peer_trans_int = 0; trans = frame->local; frame->local = NULL; @@ -6321,14 +6323,22 @@ client_setvolume_cbk (call_frame_t *frame, ctx = get_global_ctx_ptr (); if (process_uuid && !strcmp (ctx->process_uuid,process_uuid)) { + ret = dict_get_uint64 (reply, "transport-ptr", + &peer_trans_int); + + peer_trans = (void *) (long) (peer_trans_int); gf_log (this->name, GF_LOG_WARNING, "attaching to the local volume '%s'", remote_subvol); + transport_setpeer (trans, peer_trans); + /* TODO: */ + /* conf->child = xlator_search_by_name (this, remote_subvol); + */ } gf_log (trans->xl->name, GF_LOG_NORMAL, diff --git a/xlators/protocol/server/src/server-protocol.c b/xlators/protocol/server/src/server-protocol.c index 92cdda2bf..7fc379efb 100644 --- a/xlators/protocol/server/src/server-protocol.c +++ b/xlators/protocol/server/src/server-protocol.c @@ -7085,6 +7085,9 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl, ret = dict_set_str (reply, "process-uuid", xl->ctx->process_uuid); + ret = dict_set_uint64 (reply, "transport-ptr", + ((uint64_t) (long) trans)); + fail: dict_len = dict_serialized_length (reply); if (dict_len < 0) { |