diff options
| -rw-r--r-- | libglusterfs/src/transport.c | 101 | ||||
| -rw-r--r-- | libglusterfs/src/transport.h | 19 | ||||
| -rw-r--r-- | xlators/protocol/client/src/client-protocol.c | 10 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server-protocol.c | 3 | 
4 files changed, 131 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d9031..244aa960b78 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,  		  struct iovec *vector, int count,                    struct iobref *iobref)  { -	int32_t ret = -1; +	int32_t               ret = -1; +        transport_t          *peer_trans = NULL; +        struct iobuf         *iobuf = NULL; +        struct transport_msg *msg = NULL; + +        if (this->peer_trans) { +                peer_trans = this->peer_trans; + +                msg = CALLOC (1, sizeof (*msg)); +                if (!msg) { +                        return -ENOMEM; +                } + +                msg->hdr = buf; +                msg->hdrlen = len; + +                if (vector) { +                        iobuf = iobuf_get (this->xl->ctx->iobuf_pool); +                        if (!iobuf) { +                                FREE (msg->hdr); +                                FREE (msg); +                                return -ENOMEM; +                        } + +                        iov_unload (iobuf->ptr, vector, count); +                        msg->iobuf = iobuf; +                } + +                pthread_mutex_lock (&peer_trans->handover.mutex); +                { +                        list_add_tail (&msg->list, &peer_trans->handover.msgs); +                        pthread_cond_broadcast (&peer_trans->handover.cond); +                } +                pthread_mutex_unlock (&peer_trans->handover.mutex); + +                return 0; +        }  	GF_VALIDATE_OR_GOTO("transport", this, fail);  	GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,  	int32_t ret = -1;  	GF_VALIDATE_OR_GOTO("transport", this, fail); -   + +        if (this->peer_trans) { +                *hdr_p = this->handover.msg->hdr; +                *hdrlen_p = this->handover.msg->hdrlen; +                *iobuf_p = this->handover.msg->iobuf; + +                return 0; +        } +  	ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);  fail:  	return ret; @@ -338,3 +382,56 @@ fail:  	return ret;  } + +void * +transport_peerproc (void *trans_data) +{ +        transport_t          *trans = NULL; +        struct transport_msg *msg = NULL; + +        trans = trans_data; + +        while (1) { +                pthread_mutex_lock (&trans->handover.mutex); +                { +                        while (list_empty (&trans->handover.msgs)) +                                pthread_cond_wait (&trans->handover.cond, +                                                   &trans->handover.mutex); + +                        msg = list_entry (trans->handover.msgs.next, +                                          struct transport_msg, list); + +                        list_del_init (&msg->list); +                } +                pthread_mutex_unlock (&trans->handover.mutex); + +                trans->handover.msg = msg; + +                trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + +                FREE (msg); +        } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ +        trans->peer_trans = peer_trans; + +        INIT_LIST_HEAD (&trans->handover.msgs); +        pthread_cond_init (&trans->handover.cond, NULL); +        pthread_mutex_init (&trans->handover.mutex, NULL); +        pthread_create (&trans->handover.thread, NULL, +                        transport_peerproc, trans); + +        peer_trans->peer_trans = trans; + +        INIT_LIST_HEAD (&peer_trans->handover.msgs); +        pthread_cond_init (&peer_trans->handover.cond, NULL); +        pthread_mutex_init (&peer_trans->handover.mutex, NULL); +        pthread_create (&peer_trans->handover.thread, NULL, +                        transport_peerproc, peer_trans); + +        return 0; +} diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h index a7cd5ec8f41..2d85c76cca1 100644 --- a/libglusterfs/src/transport.h +++ b/libglusterfs/src/transport.h @@ -40,6 +40,13 @@ typedef struct peer_info {  	char identifier[UNIX_PATH_MAX];  }peer_info_t; +struct transport_msg { +        struct list_head  list; +        char             *hdr; +        int               hdrlen; +        struct iobuf     *iobuf; +}; +  struct transport {  	struct transport_ops  *ops;  	void                  *private; @@ -55,6 +62,16 @@ struct transport {  	/*  int                  (*notify) (transport_t *this, int event, void *data); */  	peer_info_t     peerinfo;  	peer_info_t     myinfo; + +        transport_t    *peer_trans; +        struct { +                pthread_mutex_t       mutex; +                pthread_cond_t        cond; +                pthread_t             thread; +                struct list_head      msgs; +                struct transport_msg *msg; +        } handover; +                  };  struct transport_ops { @@ -84,4 +101,6 @@ transport_t *transport_load  (dict_t *options, xlator_t *xl);  transport_t *transport_ref   (transport_t *trans);  int32_t      transport_unref (transport_t *trans); +int transport_setpeer (transport_t *trans, transport_t *trans_peer); +  #endif /* __TRANSPORT_H__ */ diff --git a/xlators/protocol/client/src/client-protocol.c b/xlators/protocol/client/src/client-protocol.c index 7a2326afdd9..a0a0f06fd8d 100644 --- a/xlators/protocol/client/src/client-protocol.c +++ b/xlators/protocol/client/src/client-protocol.c @@ -6254,6 +6254,8 @@ client_setvolume_cbk (call_frame_t *frame,  	int32_t                 op_ret   = -1;  	int32_t                 op_errno = EINVAL;  	int32_t                 dict_len = 0; +        transport_t            *peer_trans = NULL; +        uint64_t                peer_trans_int = 0;  	trans = frame->local; frame->local = NULL; @@ -6321,14 +6323,22 @@ client_setvolume_cbk (call_frame_t *frame,  		ctx = get_global_ctx_ptr ();  		if (process_uuid && !strcmp (ctx->process_uuid,process_uuid)) { +                        ret = dict_get_uint64 (reply, "transport-ptr", +                                               &peer_trans_int); + +                        peer_trans = (void *) (long) (peer_trans_int);  			gf_log (this->name, GF_LOG_WARNING,   				"attaching to the local volume '%s'",  				remote_subvol); +                        transport_setpeer (trans, peer_trans); +  			/* TODO: */ +                        /*  			conf->child = xlator_search_by_name (this,   							     remote_subvol); +                        */  		}                  gf_log (trans->xl->name, GF_LOG_NORMAL, diff --git a/xlators/protocol/server/src/server-protocol.c b/xlators/protocol/server/src/server-protocol.c index 92cdda2bf84..7fc379efb77 100644 --- a/xlators/protocol/server/src/server-protocol.c +++ b/xlators/protocol/server/src/server-protocol.c @@ -7085,6 +7085,9 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl,  	ret = dict_set_str (reply, "process-uuid",   			    xl->ctx->process_uuid); +	ret = dict_set_uint64 (reply, "transport-ptr", +                               ((uint64_t) (long) trans)); +  fail:  	dict_len = dict_serialized_length (reply);  	if (dict_len < 0) {  | 
