diff options
Diffstat (limited to 'libglusterfs/src')
| -rw-r--r-- | libglusterfs/src/transport.c | 101 | ||||
| -rw-r--r-- | libglusterfs/src/transport.h | 19 | 
2 files changed, 118 insertions, 2 deletions
diff --git a/libglusterfs/src/transport.c b/libglusterfs/src/transport.c index 2c2894d9031..244aa960b78 100644 --- a/libglusterfs/src/transport.c +++ b/libglusterfs/src/transport.c @@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,  		  struct iovec *vector, int count,                    struct iobref *iobref)  { -	int32_t ret = -1; +	int32_t               ret = -1; +        transport_t          *peer_trans = NULL; +        struct iobuf         *iobuf = NULL; +        struct transport_msg *msg = NULL; + +        if (this->peer_trans) { +                peer_trans = this->peer_trans; + +                msg = CALLOC (1, sizeof (*msg)); +                if (!msg) { +                        return -ENOMEM; +                } + +                msg->hdr = buf; +                msg->hdrlen = len; + +                if (vector) { +                        iobuf = iobuf_get (this->xl->ctx->iobuf_pool); +                        if (!iobuf) { +                                FREE (msg->hdr); +                                FREE (msg); +                                return -ENOMEM; +                        } + +                        iov_unload (iobuf->ptr, vector, count); +                        msg->iobuf = iobuf; +                } + +                pthread_mutex_lock (&peer_trans->handover.mutex); +                { +                        list_add_tail (&msg->list, &peer_trans->handover.msgs); +                        pthread_cond_broadcast (&peer_trans->handover.cond); +                } +                pthread_mutex_unlock (&peer_trans->handover.mutex); + +                return 0; +        }  	GF_VALIDATE_OR_GOTO("transport", this, fail);  	GF_VALIDATE_OR_GOTO("transport", this->ops, fail); @@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,  	int32_t ret = -1;  	GF_VALIDATE_OR_GOTO("transport", this, fail); -   + +        if (this->peer_trans) { +                *hdr_p = this->handover.msg->hdr; +                *hdrlen_p = this->handover.msg->hdrlen; +                *iobuf_p = this->handover.msg->iobuf; + +                return 0; +        } +  	ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);  fail:  	return ret; @@ -338,3 +382,56 @@ fail:  	return ret;  } + +void * +transport_peerproc (void *trans_data) +{ +        transport_t          *trans = NULL; +        struct transport_msg *msg = NULL; + +        trans = trans_data; + +        while (1) { +                pthread_mutex_lock (&trans->handover.mutex); +                { +                        while (list_empty (&trans->handover.msgs)) +                                pthread_cond_wait (&trans->handover.cond, +                                                   &trans->handover.mutex); + +                        msg = list_entry (trans->handover.msgs.next, +                                          struct transport_msg, list); + +                        list_del_init (&msg->list); +                } +                pthread_mutex_unlock (&trans->handover.mutex); + +                trans->handover.msg = msg; + +                trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans); + +                FREE (msg); +        } +} + + +int +transport_setpeer (transport_t *trans, transport_t *peer_trans) +{ +        trans->peer_trans = peer_trans; + +        INIT_LIST_HEAD (&trans->handover.msgs); +        pthread_cond_init (&trans->handover.cond, NULL); +        pthread_mutex_init (&trans->handover.mutex, NULL); +        pthread_create (&trans->handover.thread, NULL, +                        transport_peerproc, trans); + +        peer_trans->peer_trans = trans; + +        INIT_LIST_HEAD (&peer_trans->handover.msgs); +        pthread_cond_init (&peer_trans->handover.cond, NULL); +        pthread_mutex_init (&peer_trans->handover.mutex, NULL); +        pthread_create (&peer_trans->handover.thread, NULL, +                        transport_peerproc, peer_trans); + +        return 0; +} diff --git a/libglusterfs/src/transport.h b/libglusterfs/src/transport.h index a7cd5ec8f41..2d85c76cca1 100644 --- a/libglusterfs/src/transport.h +++ b/libglusterfs/src/transport.h @@ -40,6 +40,13 @@ typedef struct peer_info {  	char identifier[UNIX_PATH_MAX];  }peer_info_t; +struct transport_msg { +        struct list_head  list; +        char             *hdr; +        int               hdrlen; +        struct iobuf     *iobuf; +}; +  struct transport {  	struct transport_ops  *ops;  	void                  *private; @@ -55,6 +62,16 @@ struct transport {  	/*  int                  (*notify) (transport_t *this, int event, void *data); */  	peer_info_t     peerinfo;  	peer_info_t     myinfo; + +        transport_t    *peer_trans; +        struct { +                pthread_mutex_t       mutex; +                pthread_cond_t        cond; +                pthread_t             thread; +                struct list_head      msgs; +                struct transport_msg *msg; +        } handover; +                  };  struct transport_ops { @@ -84,4 +101,6 @@ transport_t *transport_load  (dict_t *options, xlator_t *xl);  transport_t *transport_ref   (transport_t *trans);  int32_t      transport_unref (transport_t *trans); +int transport_setpeer (transport_t *trans, transport_t *trans_peer); +  #endif /* __TRANSPORT_H__ */  | 
