diff options
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 90 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 3 |
2 files changed, 58 insertions, 35 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6261e564f91..820683d2e8c 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1185,7 +1185,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx) priv = this->private; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { if ((priv->gen == gen) && (priv->idx == idx) && (priv->sock != -1)) { @@ -1194,7 +1195,8 @@ socket_event_poll_err (rpc_transport_t *this, int gen, int idx) socket_closed = _gf_true; } } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); if (socket_closed) { pthread_mutex_lock (&priv->notify.lock); @@ -1224,7 +1226,7 @@ socket_event_poll_out (rpc_transport_t *this) priv = this->private; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->out_lock); { if (priv->connected == 1) { ret = __socket_ioq_churn (this); @@ -1237,7 +1239,7 @@ socket_event_poll_out (rpc_transport_t *this) } } } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); if (ret == 0) ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); @@ -1934,13 +1936,13 @@ __socket_read_reply (rpc_transport_t *this) * and priv->lock, since we are doing an upcall here. */ frag->state = SP_STATE_NOTIFYING_XID; - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->in_lock); { ret = rpc_transport_notify (this, RPC_TRANSPORT_MAP_XID_REQUEST, in->request_info); } - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); /* Transition back to externally visible state. */ frag->state = SP_STATE_READ_MSGTYPE; @@ -2275,11 +2277,11 @@ socket_proto_state_machine (rpc_transport_t *this, priv = this->private; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); { ret = __socket_proto_state_machine (this, pollin); } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->in_lock); out: return ret; @@ -2350,7 +2352,8 @@ socket_connect_finish (rpc_transport_t *this) priv = this->private; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { if (priv->connected != 0) goto unlock; @@ -2400,7 +2403,8 @@ socket_connect_finish (rpc_transport_t *this) } } unlock: - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); if (notify_rpc) { rpc_transport_notify (this, event, this); @@ -2432,12 +2436,14 @@ socket_event_handler (int fd, int idx, int gen, void *data, priv = this->private; ctx = this->ctx; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { priv->idx = idx; priv->gen = gen; } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); if (priv->connected != 1) { if (priv->connect_failed) { @@ -2556,9 +2562,9 @@ socket_poller (void *ctx) gen = priv->ot_gen; for (;;) { - pthread_mutex_lock(&priv->lock); + pthread_mutex_lock(&priv->out_lock); to_write = !list_empty(&priv->ioq); - pthread_mutex_unlock(&priv->lock); + pthread_mutex_unlock(&priv->out_lock); pfd[0].fd = priv->pipe[0]; pfd[0].events = POLL_MASK_ERROR; pfd[0].revents = 0; @@ -2652,7 +2658,8 @@ socket_poller (void *ctx) err: /* All (and only) I/O errors should come here. */ - pthread_mutex_lock(&priv->lock); + pthread_mutex_lock(&priv->in_lock); + pthread_mutex_lock(&priv->out_lock); { gf_log (this->name, GF_LOG_TRACE, "disconnecting socket"); __socket_teardown_connection (this); @@ -2666,7 +2673,8 @@ err: priv->ot_state = OT_IDLE; } - pthread_mutex_unlock(&priv->lock); + pthread_mutex_unlock(&priv->out_lock); + pthread_mutex_unlock(&priv->in_lock); rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); @@ -3014,11 +3022,13 @@ socket_disconnect (rpc_transport_t *this, gf_boolean_t wait) pthread_mutex_unlock (&priv->cond_lock); } - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { ret = __socket_disconnect (this); } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); out: return ret; @@ -3118,7 +3128,8 @@ socket_connect (rpc_transport_t *this, int port) goto err; } - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { priv->own_thread_done = _gf_false; if (priv->sock != -1) { @@ -3395,7 +3406,8 @@ handler: unlock: sock = priv->sock; } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); err: /* if sock != -1, then cleanup is done from the event handler */ @@ -3445,11 +3457,13 @@ socket_listen (rpc_transport_t *this) myinfo = &this->myinfo; ctx = this->ctx; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { sock = priv->sock; } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); if (sock != -1) { gf_log_callingfn (this->name, GF_LOG_DEBUG, @@ -3463,7 +3477,8 @@ socket_listen (rpc_transport_t *this) return ret; } - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { if (priv->sock != -1) { gf_log (this->name, GF_LOG_DEBUG, @@ -3567,7 +3582,8 @@ socket_listen (rpc_transport_t *this) } } unlock: - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); out: return ret; @@ -3591,7 +3607,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) priv = this->private; ctx = this->ctx; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->out_lock); { if (priv->connected != 1) { if (!priv->submit_log && !priv->connect_finish_log) { @@ -3641,7 +3657,7 @@ socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) } } unlock: - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); out: return ret; @@ -3665,7 +3681,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) priv = this->private; ctx = this->ctx; - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->out_lock); { if (priv->connected != 1) { if (!priv->submit_log && !priv->connect_finish_log) { @@ -3715,7 +3731,7 @@ socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) } } unlock: - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); out: return ret; @@ -3814,7 +3830,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) will never read() any more data until throttling is turned off. */ - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { /* Throttling is useless on a disconnected transport. In fact, @@ -3828,7 +3845,8 @@ socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) priv->idx, (int) !onoff, -1); } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); return 0; } @@ -4108,7 +4126,8 @@ socket_init (rpc_transport_t *this) } memset(priv,0,sizeof(*priv)); - pthread_mutex_init (&priv->lock, NULL); + pthread_mutex_init (&priv->in_lock, NULL); + pthread_mutex_init (&priv->out_lock, NULL); pthread_mutex_init (&priv->cond_lock, NULL); pthread_cond_init (&priv->cond, NULL); @@ -4530,17 +4549,20 @@ fini (rpc_transport_t *this) priv = this->private; if (priv) { if (priv->sock != -1) { - pthread_mutex_lock (&priv->lock); + pthread_mutex_lock (&priv->in_lock); + pthread_mutex_lock (&priv->out_lock); { __socket_ioq_flush (this); __socket_reset (this); } - pthread_mutex_unlock (&priv->lock); + pthread_mutex_unlock (&priv->out_lock); + pthread_mutex_unlock (&priv->in_lock); } gf_log (this->name, GF_LOG_TRACE, "transport %p destroyed", this); - pthread_mutex_destroy (&priv->lock); + pthread_mutex_destroy (&priv->in_lock); + pthread_mutex_destroy (&priv->out_lock); pthread_mutex_destroy (&priv->cond_lock); pthread_cond_destroy (&priv->cond); if (priv->ssl_private_key) { diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index e299a3d7bd5..59110b5043a 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -220,7 +220,8 @@ typedef struct { }; }; struct gf_sock_incoming incoming; - pthread_mutex_t lock; + pthread_mutex_t in_lock; + pthread_mutex_t out_lock; pthread_mutex_t cond_lock; pthread_cond_t cond; int windowsize; |