diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 99 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 9 |
2 files changed, 69 insertions, 39 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 97c0dc57d41..d33146ab527 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this) priv = this->private; + gf_log (this->name, GF_LOG_TRACE, + "disconnecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + if (priv->sock != -1) { ret = __socket_shutdown(this); if (priv->own_thread) { - /* - * Without this, reconnect (= disconnect + connect) - * won't work except by accident. - */ - close(priv->sock); - priv->sock = -1; /* - * Closing the socket forces an error that will wake - * up the polling thread. Wait for it to notice and - * respond. + * Without this, reconnect (= disconnect + connect) + * won't work except by accident. */ - if (priv->ot_state == OT_ALIVE) { - priv->ot_state = OT_DYING; - pthread_cond_wait(&priv->ot_event,&priv->lock); - } - } + close(priv->sock); + priv->sock = -1; + gf_log (this->name, GF_LOG_TRACE, + "OT_PLEASE_DIE on %p", this); + priv->ot_state = OT_PLEASE_DIE; + } else if (priv->use_ssl) { ssl_teardown_connection(priv); } @@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this) buf = rpc_procnum_addr (iobuf_ptr (in->iobuf)); procnum = ntoh32 (*((uint32_t *)buf)); - if (this->listener) { + if (priv->is_server) { /* this check is needed as rpcsvc and rpc-clnt * actor structures are not same */ vector_sizer = @@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this) { int ret = -1; rpc_transport_pollin_t *pollin = NULL; + socket_private_t *priv = this->private; ret = socket_proto_state_machine (this, &pollin); if (pollin != NULL) { + priv->ot_state = OT_CALLBACK; ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, pollin); + if (priv->ot_state == OT_CALLBACK) { + priv->ot_state = OT_RUNNING; + } rpc_transport_pollin_destroy (pollin); } @@ -2252,6 +2255,9 @@ socket_poller (void *ctx) struct pollfd pfd[2] = {{0,},}; gf_boolean_t to_write = _gf_false; int ret = 0; + uint32_t gen = 0; + + priv->ot_state = OT_RUNNING; if (priv->use_ssl) { if (ssl_setup_connection(this,priv->connected) < 0) { @@ -2287,6 +2293,7 @@ socket_poller (void *ctx) "asynchronous rpc_transport_notify failed"); } + gen = priv->ot_gen; for (;;) { pthread_mutex_lock(&priv->lock); to_write = !list_empty(&priv->ioq); @@ -2323,6 +2330,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (input request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else if (pfd[1].revents & POLL_MASK_OUTPUT) { ret = socket_event_poll_out(this); @@ -2333,6 +2347,13 @@ socket_poller (void *ctx) else if (errno == ENOTCONN) { ret = 0; } + if (priv->ot_state == OT_PLEASE_DIE) { + gf_log (this->name, GF_LOG_TRACE, + "OT_IDLE on %p (output request)", + this); + priv->ot_state = OT_IDLE; + break; + } } else { /* @@ -2353,21 +2374,17 @@ socket_poller (void *ctx) "error in polling loop"); break; } + if (priv->ot_gen != gen) { + gf_log (this->name, GF_LOG_TRACE, + "generation mismatch, my %u != %u", + gen, priv->ot_gen); + return NULL; + } } err: /* All (and only) I/O errors should come here. */ pthread_mutex_lock(&priv->lock); - if (priv->ot_state == OT_ALIVE) { - /* - * We have to do this if we're here because of an error we - * detected ourselves, but need to avoid a recursive call - * if our death is the result of an external disconnect. - */ - __socket_shutdown(this); - close(priv->sock); - priv->sock = -1; - } if (priv->ssl_ssl) { /* * We're always responsible for this part, but only actually @@ -2376,16 +2393,14 @@ err: */ ssl_teardown_connection(priv); } + __socket_shutdown(this); + close(priv->sock); + priv->sock = -1; priv->ot_state = OT_IDLE; - /* - * We expect there to be only one waiter, but if there do happen to - * be multiple it's probably better to unblock them than to let them - * hang. If there are none, this is a harmless no-op. - */ - pthread_cond_broadcast(&priv->ot_event); pthread_mutex_unlock(&priv->lock); - rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this); - rpc_transport_unref (this); + rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, + this); + rpc_transport_unref (this); return NULL; } @@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this) { socket_private_t *priv = this->private; - if (priv->ot_state == OT_ALIVE) { + switch (priv->ot_state) { + case OT_IDLE: + case OT_PLEASE_DIE: + break; + default: gf_log (this->name, GF_LOG_WARNING, "refusing to start redundant poller"); return; } - priv->ot_state = OT_ALIVE; + priv->ot_gen += 7; + priv->ot_state = OT_SPAWNING; + gf_log (this->name, GF_LOG_TRACE, + "spawning %p with gen %u", this, priv->ot_gen); if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) { gf_log (this->name, GF_LOG_ERROR, @@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data, * connection. */ new_priv->connected = 1; + new_priv->is_server = _gf_true; rpc_transport_ref (new_trans); if (new_priv->own_thread) { @@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port) goto err; } + gf_log (this->name, GF_LOG_TRACE, + "connecting %p, state=%u gen=%u sock=%d", this, + priv->ot_state, priv->ot_gen, priv->sock); + ret = socket_client_get_remote_sockaddr (this, &sock_union.sa, &sockaddr_len, &sa_family); if (ret == -1) { @@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port) * initializing a client connection. */ priv->connected = 0; + priv->is_server = _gf_false; rpc_transport_ref (this); if (priv->own_thread) { @@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this) if (priv->own_thread) { priv->ot_state = OT_IDLE; - pthread_cond_init (&priv->ot_event, NULL); } out: diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index bb342d99869..e0b412fcce1 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -186,8 +186,10 @@ struct gf_sock_incoming { typedef enum { OT_IDLE, /* Uninitialized or termination complete. */ - OT_ALIVE, /* Past pthread_create, no error/disconnect. */ - OT_DYING, /* Disconnect in progress. */ + OT_SPAWNING, /* Past pthread_create but not in thread yet. */ + OT_RUNNING, /* Poller thread running normally. */ + OT_CALLBACK, /* Poller thread in the middle of a callback. */ + OT_PLEASE_DIE, /* Poller termination requested. */ } ot_state_t; typedef struct { @@ -229,7 +231,8 @@ typedef struct { int pipe[2]; gf_boolean_t own_thread; ot_state_t ot_state; - pthread_cond_t ot_event; + uint32_t ot_gen; + gf_boolean_t is_server; } socket_private_t; |