diff options
author | Jeff Darcy <jdarcy@redhat.com> | 2013-06-04 15:20:45 -0400 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2013-06-04 15:37:54 -0700 |
commit | 5c1710ed60ccb151ccd7a2890b24bb99518d36da (patch) | |
tree | e267cd0208f99d5b3b6420c9e8eff3e029d862ca /rpc | |
parent | dbfe779f3049e6fbc2394bdacdb57165d51dc3f3 (diff) |
transport/socket: fix connect/disconnect races
We might receive a connect request while a disconnect is still in
progress, requiring more states and (the return of) poller generation
numbers to avoid redundant pollers. We might also get either kind of
request from within our own rpc_transport_notify upcall, so we have to
avoid locking and use the PLEASE_DIE state instead.
Change-Id: Icbaacf96c516b607a79ff62c90b74d42b241780f
BUG: 970194
Signed-off-by: Jeff Darcy <jdarcy@redhat.com>
Reviewed-on: http://review.gluster.org/5137
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 99 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 9 |
2 files changed, 69 insertions, 39 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 97c0dc57d41..d33146ab527 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this) priv = this->private; + gf_log (this->name, GF_LOG_TRACE, + "disconnecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + if (priv->sock != -1) { ret = __socket_shutdown(this); if (priv->own_thread) { - /* - * Without this, reconnect (= disconnect + connect) - * won't work except by accident. - */ - close(priv->sock); - priv->sock = -1; /* - * Closing the socket forces an error that will wake - * up the polling thread. Wait for it to notice and - * respond. + * Without this, reconnect (= disconnect + connect) + * won't work except by accident. */ - if (priv->ot_state == OT_ALIVE) { - priv->ot_state = OT_DYING; - pthread_cond_wait(&priv->ot_event,&priv->lock); - } - } + close(priv->sock); + priv->sock = -1; + gf_log (this->name, GF_LOG_TRACE, + "OT_PLEASE_DIE on %p", this); + priv->ot_state = OT_PLEASE_DIE; + } else if (priv->use_ssl) { ssl_teardown_connection(priv); } @@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this) buf = rpc_procnum_addr (iobuf_ptr (in->iobuf)); procnum = ntoh32 (*((uint32_t *)buf)); - if (this->listener) { + if (priv->is_server) { /* this check is needed as rpcsvc and rpc-clnt * actor structures are not same */ vector_sizer = @@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; + socket_private_t *priv = this->private; ret = socket_proto_state_machine (this, &pollin); if (pollin != NULL) { + priv->ot_state = OT_CALLBACK; ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { + priv->ot_state = OT_RUNNING; + } rpc_transport_pollin_destroy (pollin); } @@ -2252,6 +2255,9 @@ socket_poller (void *ctx) struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; + uint32_t gen = 0; + + priv->ot_state = OT_RUNNING; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2287,6 +2293,7 @@ socket_poller (void *ctx) "asynchronous rpc_transport_notify failed"); } + gen = priv->ot_gen; for (;;) { pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); @@ -2323,6 +2330,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (input request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else if (pfd[1].revents & POLL_MASK_OUTPUT) { ret = socket_event_poll_out(this); @@ -2333,6 +2347,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (output request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else { /* @@ -2353,21 +2374,17 @@ socket_poller (void *ctx) "error in polling loop"); break; } + if (priv->ot_gen != gen) { + gf_log (this->name, GF_LOG_TRACE, + "generation mismatch, my %u != %u", + gen, priv->ot_gen); + return NULL; + } } err: /* All (and only) I/O errors should come here. */ pthread_mutex_lock(&priv->lock); - if (priv->ot_state == OT_ALIVE) { - /* - * We have to do this if we're here because of an error we - * detected ourselves, but need to avoid a recursive call - * if our death is the result of an external disconnect. - */ - __socket_shutdown(this); - close(priv->sock); - priv->sock = -1; - } if (priv->ssl_ssl) { /* * We're always responsible for this part, but only actually @@ -2376,16 +2393,14 @@ err: */ ssl_teardown_connection(priv); } + __socket_shutdown(this); + close(priv->sock); + priv->sock = -1; priv->ot_state = OT_IDLE; - /* - * We expect there to be only one waiter, but if there do happen to - * be multiple it's probably better to unblock them than to let them - * hang. If there are none, this is a harmless no-op. - */ - pthread_cond_broadcast(&priv->ot_event); pthread_mutex_unlock(&priv->lock); - rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); - rpc_transport_unref (this); + rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, + this); + rpc_transport_unref (this); return NULL; } @@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this) { socket_private_t *priv = this->private; - if (priv->ot_state == OT_ALIVE) { + switch (priv->ot_state) { + case OT_IDLE: + case OT_PLEASE_DIE: + break; + default: gf_log (this->name, GF_LOG_WARNING, "refusing to start redundant poller"); return; } - priv->ot_state = OT_ALIVE; + priv->ot_gen += 7; + priv->ot_state = OT_SPAWNING; + gf_log (this->name, GF_LOG_TRACE, + "spawning %p with gen %u", this, priv->ot_gen); if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) { gf_log (this->name, GF_LOG_ERROR, @@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data, * connection. */ new_priv->connected = 1; + new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); if (new_priv->own_thread) { @@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port) goto err; } + gf_log (this->name, GF_LOG_TRACE, + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); if (ret == -1) { @@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port) * initializing a client connection. */ priv->connected = 0; + priv->is_server = _gf_false; rpc_transport_ref (this); if (priv->own_thread) { @@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this) if (priv->own_thread) { priv->ot_state = OT_IDLE; - pthread_cond_init (&priv->ot_event, NULL); } out: diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index bb342d99869..e0b412fcce1 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -186,8 +186,10 @@ struct gf_sock_incoming { typedef enum { OT_IDLE, /* Uninitialized or termination complete. */ - OT_ALIVE, /* Past pthread_create, no error/disconnect. */ - OT_DYING, /* Disconnect in progress. */ + OT_SPAWNING, /* Past pthread_create but not in thread yet. */ + OT_RUNNING, /* Poller thread running normally. */ + OT_CALLBACK, /* Poller thread in the middle of a callback. */ + OT_PLEASE_DIE, /* Poller termination requested. */ } ot_state_t; typedef struct { @@ -229,7 +231,8 @@ typedef struct { int pipe[2]; gf_boolean_t own_thread; ot_state_t ot_state; - pthread_cond_t ot_event; + uint32_t ot_gen; + gf_boolean_t is_server; } socket_private_t; |