diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 96 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 |
2 files changed, 83 insertions, 19 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 8ba2692cdc6..e14152c5822 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1172,11 +1172,11 @@ out: } -static int -socket_event_poll_err (rpc_transport_t *this) +static gf_boolean_t +socket_event_poll_err (rpc_transport_t *this, int gen, int idx) { - socket_private_t *priv = NULL; - int ret = -1; + socket_private_t *priv = NULL; + gf_boolean_t socket_closed = _gf_false; GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); @@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this) pthread_mutex_lock (&priv->lock); { - __socket_ioq_flush (this); - __socket_reset (this); + if ((priv->gen == gen) && (priv->idx == idx) + && (priv->sock != -1)) { + __socket_ioq_flush (this); + __socket_reset (this); + socket_closed = _gf_true; + } } pthread_mutex_unlock (&priv->lock); - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + if (socket_closed) { + pthread_mutex_lock (&priv->notify.lock); + { + while (priv->notify.in_progress) + pthread_cond_wait (&priv->notify.cond, + &priv->notify.lock); + } + pthread_mutex_unlock (&priv->notify.lock); + + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); + } out: - return ret; + return socket_closed; } @@ -2271,22 +2285,50 @@ out: static int -socket_event_poll_in (rpc_transport_t *this) +socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; socket_private_t *priv = this->private; + glusterfs_ctx_t *ctx = NULL; + + ctx = this->ctx; ret = socket_proto_state_machine (this, &pollin); + if (pollin) { + pthread_mutex_lock (&priv->notify.lock); + { + priv->notify.in_progress++; + } + pthread_mutex_unlock (&priv->notify.lock); + } + + + if (notify_handled && (ret != -1)) + event_handled (ctx->event_pool, priv->sock, priv->idx, + priv->gen); + if (pollin) { priv->ot_state = OT_CALLBACK; + ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { priv->ot_state = OT_RUNNING; } + rpc_transport_pollin_destroy (pollin); + + pthread_mutex_lock (&priv->notify.lock); + { + --priv->notify.in_progress; + + if (!priv->notify.in_progress) + pthread_cond_signal (&priv->notify.cond); + } + pthread_mutex_unlock (&priv->notify.lock); } return ret; @@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait); /* reads rpc_requests during pollin */ static int -socket_event_handler (int fd, int idx, void *data, +socket_event_handler (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err) { - rpc_transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = -1; + rpc_transport_t *this = NULL; + socket_private_t *priv = NULL; + int ret = -1; + glusterfs_ctx_t *ctx = NULL; + gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false; this = data; + GF_VALIDATE_OR_GOTO ("socket", this, out); GF_VALIDATE_OR_GOTO ("socket", this->private, out); GF_VALIDATE_OR_GOTO ("socket", this->xl, out); THIS = this->xl; priv = this->private; + ctx = this->ctx; pthread_mutex_lock (&priv->lock); { priv->idx = idx; + priv->gen = gen; } pthread_mutex_unlock (&priv->lock); @@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data, } if (!ret && poll_in) { - ret = socket_event_poll_in (this); + ret = socket_event_poll_in (this, !poll_err); + notify_handled = _gf_true; } if ((ret < 0) || poll_err) { /* Logging has happened already in earlier cases */ gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG), "EPOLLERR - disconnecting now"); - socket_event_poll_err (this); - rpc_transport_unref (this); - } + + socket_closed = socket_event_poll_err (this, gen, idx); + + if (socket_closed) + rpc_transport_unref (this); + + } else if (!notify_handled) { + event_handled (ctx->event_pool, fd, idx, gen); + } out: return ret; @@ -2533,7 +2587,7 @@ socket_poller (void *ctx) } if (pfd[1].revents & POLL_MASK_INPUT) { - ret = socket_event_poll_in(this); + ret = socket_event_poll_in(this, 0); if (ret >= 0) { /* Suppress errors while making progress. */ pfd[1].revents &= ~POLL_MASK_ERROR; @@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this) } static int -socket_server_event_handler (int fd, int idx, void *data, +socket_server_event_handler (int fd, int idx, int gen, void *data, int poll_in, int poll_out, int poll_err) { rpc_transport_t *this = NULL; @@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data, } } out: + event_handled (ctx->event_pool, fd, idx, gen); + if (cname && (cname != this->ssl_name)) { GF_FREE(cname); } @@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this) priv->bio = 0; priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; INIT_LIST_HEAD (&priv->ioq); + pthread_mutex_init (&priv->notify.lock, NULL); + pthread_cond_init (&priv->notify.cond, NULL); /* All the below section needs 'this->options' to be present */ if (!this->options) diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 6c8875f7fb7..e299a3d7bd5 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -203,6 +203,7 @@ typedef enum { typedef struct { int32_t sock; int32_t idx; + int32_t gen; /* -1 = not connected. 0 = in progress. 1 = connected */ char connected; /* 1 = connect failed for reasons other than EINPROGRESS/ENOENT @@ -254,6 +255,11 @@ typedef struct { int log_ctr; GF_REF_DECL; /* refcount to keep track of socket_poller threads */ + struct { + pthread_mutex_t lock; + pthread_cond_t cond; + uint64_t in_progress; + } notify; } socket_private_t; |