summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/transport.c101
-rw-r--r--libglusterfs/src/transport.h19
-rw-r--r--xlators/protocol/client/src/client-protocol.c10
-rw-r--r--xlators/protocol/server/src/server-protocol.c3
4 files changed, 131 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c
index 2c2894d90..244aa960b 100644
--- a/libglusterfs/src/transport.c
+++ b/libglusterfs/src/transport.c
@@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,
struct iovec *vector, int count,
struct iobref *iobref)
{
- int32_t ret = -1;
+ int32_t ret = -1;
+ transport_t *peer_trans = NULL;
+ struct iobuf *iobuf = NULL;
+ struct transport_msg *msg = NULL;
+
+ if (this->peer_trans) {
+ peer_trans = this->peer_trans;
+
+ msg = CALLOC (1, sizeof (*msg));
+ if (!msg) {
+ return -ENOMEM;
+ }
+
+ msg->hdr = buf;
+ msg->hdrlen = len;
+
+ if (vector) {
+ iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
+ if (!iobuf) {
+ FREE (msg->hdr);
+ FREE (msg);
+ return -ENOMEM;
+ }
+
+ iov_unload (iobuf->ptr, vector, count);
+ msg->iobuf = iobuf;
+ }
+
+ pthread_mutex_lock (&peer_trans->handover.mutex);
+ {
+ list_add_tail (&msg->list, &peer_trans->handover.msgs);
+ pthread_cond_broadcast (&peer_trans->handover.cond);
+ }
+ pthread_mutex_unlock (&peer_trans->handover.mutex);
+
+ return 0;
+ }
GF_VALIDATE_OR_GOTO("transport", this, fail);
GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
@@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("transport", this, fail);
-
+
+ if (this->peer_trans) {
+ *hdr_p = this->handover.msg->hdr;
+ *hdrlen_p = this->handover.msg->hdrlen;
+ *iobuf_p = this->handover.msg->iobuf;
+
+ return 0;
+ }
+
ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
fail:
return ret;
@@ -338,3 +382,56 @@ fail:
return ret;
}
+
+void *
+transport_peerproc (void *trans_data)
+{
+ transport_t *trans = NULL;
+ struct transport_msg *msg = NULL;
+
+ trans = trans_data;
+
+ while (1) {
+ pthread_mutex_lock (&trans->handover.mutex);
+ {
+ while (list_empty (&trans->handover.msgs))
+ pthread_cond_wait (&trans->handover.cond,
+ &trans->handover.mutex);
+
+ msg = list_entry (trans->handover.msgs.next,
+ struct transport_msg, list);
+
+ list_del_init (&msg->list);
+ }
+ pthread_mutex_unlock (&trans->handover.mutex);
+
+ trans->handover.msg = msg;
+
+ trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans);
+
+ FREE (msg);
+ }
+}
+
+
+int
+transport_setpeer (transport_t *trans, transport_t *peer_trans)
+{
+ trans->peer_trans = peer_trans;
+
+ INIT_LIST_HEAD (&trans->handover.msgs);
+ pthread_cond_init (&trans->handover.cond, NULL);
+ pthread_mutex_init (&trans->handover.mutex, NULL);
+ pthread_create (&trans->handover.thread, NULL,
+ transport_peerproc, trans);
+
+ peer_trans->peer_trans = trans;
+
+ INIT_LIST_HEAD (&peer_trans->handover.msgs);
+ pthread_cond_init (&peer_trans->handover.cond, NULL);
+ pthread_mutex_init (&peer_trans->handover.mutex, NULL);
+ pthread_create (&peer_trans->handover.thread, NULL,
+ transport_peerproc, peer_trans);
+
+ return 0;
+}
diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h
index a7cd5ec8f..2d85c76cc 100644
--- a/libglusterfs/src/transport.h
+++ b/libglusterfs/src/transport.h
@@ -40,6 +40,13 @@ typedef struct peer_info {
char identifier[UNIX_PATH_MAX];
}peer_info_t;
+struct transport_msg {
+ struct list_head list;
+ char *hdr;
+ int hdrlen;
+ struct iobuf *iobuf;
+};
+
struct transport {
struct transport_ops *ops;
void *private;
@@ -55,6 +62,16 @@ struct transport {
/* int (*notify) (transport_t *this, int event, void *data); */
peer_info_t peerinfo;
peer_info_t myinfo;
+
+ transport_t *peer_trans;
+ struct {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ pthread_t thread;
+ struct list_head msgs;
+ struct transport_msg *msg;
+ } handover;
+
};
struct transport_ops {
@@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl);
transport_t *transport_ref (transport_t *trans);
int32_t transport_unref (transport_t *trans);
+int transport_setpeer (transport_t *trans, transport_t *trans_peer);
+
#endif /* __TRANSPORT_H__ */
diff --git a/xlators/protocol/client/src/client-protocol.c b/xlators/protocol/client/src/client-protocol.c
index 7a2326afd..a0a0f06fd 100644
--- a/xlators/protocol/client/src/client-protocol.c
+++ b/xlators/protocol/client/src/client-protocol.c
@@ -6254,6 +6254,8 @@ client_setvolume_cbk (call_frame_t *frame,
int32_t op_ret = -1;
int32_t op_errno = EINVAL;
int32_t dict_len = 0;
+ transport_t *peer_trans = NULL;
+ uint64_t peer_trans_int = 0;
trans = frame->local; frame->local = NULL;
@@ -6321,14 +6323,22 @@ client_setvolume_cbk (call_frame_t *frame,
ctx = get_global_ctx_ptr ();
if (process_uuid && !strcmp (ctx->process_uuid,process_uuid)) {
+ ret = dict_get_uint64 (reply, "transport-ptr",
+ &peer_trans_int);
+
+ peer_trans = (void *) (long) (peer_trans_int);
gf_log (this->name, GF_LOG_WARNING,
"attaching to the local volume '%s'",
remote_subvol);
+ transport_setpeer (trans, peer_trans);
+
/* TODO: */
+ /*
conf->child = xlator_search_by_name (this,
remote_subvol);
+ */
}
gf_log (trans->xl->name, GF_LOG_NORMAL,
diff --git a/xlators/protocol/server/src/server-protocol.c b/xlators/protocol/server/src/server-protocol.c
index 92cdda2bf..7fc379efb 100644
--- a/xlators/protocol/server/src/server-protocol.c
+++ b/xlators/protocol/server/src/server-protocol.c
@@ -7085,6 +7085,9 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl,
ret = dict_set_str (reply, "process-uuid",
xl->ctx->process_uuid);
+ ret = dict_set_uint64 (reply, "transport-ptr",
+ ((uint64_t) (long) trans));
+
fail:
dict_len = dict_serialized_length (reply);
if (dict_len < 0) {