summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/rpc-transport/socket/src/socket.c99
-rw-r--r--rpc/rpc-transport/socket/src/socket.h9
2 files changed, 69 insertions, 39 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 97c0dc57..d33146ab 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -629,25 +629,23 @@ __socket_disconnect (rpc_transport_t *this)
priv = this->private;
+ gf_log (this->name, GF_LOG_TRACE,
+ "disconnecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
if (priv->sock != -1) {
ret = __socket_shutdown(this);
if (priv->own_thread) {
- /*
- * Without this, reconnect (= disconnect + connect)
- * won't work except by accident.
- */
- close(priv->sock);
- priv->sock = -1;
/*
- * Closing the socket forces an error that will wake
- * up the polling thread. Wait for it to notice and
- * respond.
+ * Without this, reconnect (= disconnect + connect)
+ * won't work except by accident.
*/
- if (priv->ot_state == OT_ALIVE) {
- priv->ot_state = OT_DYING;
- pthread_cond_wait(&priv->ot_event,&priv->lock);
- }
- }
+ close(priv->sock);
+ priv->sock = -1;
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_PLEASE_DIE on %p", this);
+ priv->ot_state = OT_PLEASE_DIE;
+ }
else if (priv->use_ssl) {
ssl_teardown_connection(priv);
}
@@ -1415,7 +1413,7 @@ __socket_read_request (rpc_transport_t *this)
buf = rpc_procnum_addr (iobuf_ptr (in->iobuf));
procnum = ntoh32 (*((uint32_t *)buf));
- if (this->listener) {
+ if (priv->is_server) {
/* this check is needed as rpcsvc and rpc-clnt
* actor structures are not same */
vector_sizer =
@@ -2112,12 +2110,17 @@ socket_event_poll_in (rpc_transport_t *this)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
+ socket_private_t *priv = this->private;
ret = socket_proto_state_machine (this, &pollin);
if (pollin != NULL) {
+ priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
+ if (priv->ot_state == OT_CALLBACK) {
+ priv->ot_state = OT_RUNNING;
+ }
rpc_transport_pollin_destroy (pollin);
}
@@ -2252,6 +2255,9 @@ socket_poller (void *ctx)
struct pollfd pfd[2] = {{0,},};
gf_boolean_t to_write = _gf_false;
int ret = 0;
+ uint32_t gen = 0;
+
+ priv->ot_state = OT_RUNNING;
if (priv->use_ssl) {
if (ssl_setup_connection(this,priv->connected) < 0) {
@@ -2287,6 +2293,7 @@ socket_poller (void *ctx)
"asynchronous rpc_transport_notify failed");
}
+ gen = priv->ot_gen;
for (;;) {
pthread_mutex_lock(&priv->lock);
to_write = !list_empty(&priv->ioq);
@@ -2323,6 +2330,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (input request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else if (pfd[1].revents & POLL_MASK_OUTPUT) {
ret = socket_event_poll_out(this);
@@ -2333,6 +2347,13 @@ socket_poller (void *ctx)
else if (errno == ENOTCONN) {
ret = 0;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "OT_IDLE on %p (output request)",
+ this);
+ priv->ot_state = OT_IDLE;
+ break;
+ }
}
else {
/*
@@ -2353,21 +2374,17 @@ socket_poller (void *ctx)
"error in polling loop");
break;
}
+ if (priv->ot_gen != gen) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "generation mismatch, my %u != %u",
+ gen, priv->ot_gen);
+ return NULL;
+ }
}
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
- if (priv->ot_state == OT_ALIVE) {
- /*
- * We have to do this if we're here because of an error we
- * detected ourselves, but need to avoid a recursive call
- * if our death is the result of an external disconnect.
- */
- __socket_shutdown(this);
- close(priv->sock);
- priv->sock = -1;
- }
if (priv->ssl_ssl) {
/*
* We're always responsible for this part, but only actually
@@ -2376,16 +2393,14 @@ err:
*/
ssl_teardown_connection(priv);
}
+ __socket_shutdown(this);
+ close(priv->sock);
+ priv->sock = -1;
priv->ot_state = OT_IDLE;
- /*
- * We expect there to be only one waiter, but if there do happen to
- * be multiple it's probably better to unblock them than to let them
- * hang. If there are none, this is a harmless no-op.
- */
- pthread_cond_broadcast(&priv->ot_event);
pthread_mutex_unlock(&priv->lock);
- rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT, this);
- rpc_transport_unref (this);
+ rpc_transport_notify (this->listener, RPC_TRANSPORT_DISCONNECT,
+ this);
+ rpc_transport_unref (this);
return NULL;
}
@@ -2395,13 +2410,20 @@ socket_spawn (rpc_transport_t *this)
{
socket_private_t *priv = this->private;
- if (priv->ot_state == OT_ALIVE) {
+ switch (priv->ot_state) {
+ case OT_IDLE:
+ case OT_PLEASE_DIE:
+ break;
+ default:
gf_log (this->name, GF_LOG_WARNING,
"refusing to start redundant poller");
return;
}
- priv->ot_state = OT_ALIVE;
+ priv->ot_gen += 7;
+ priv->ot_state = OT_SPAWNING;
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning %p with gen %u", this, priv->ot_gen);
if (pthread_create(&priv->thread,NULL,socket_poller,this) != 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -2554,6 +2576,7 @@ socket_server_event_handler (int fd, int idx, void *data,
* connection.
*/
new_priv->connected = 1;
+ new_priv->is_server = _gf_true;
rpc_transport_ref (new_trans);
if (new_priv->own_thread) {
@@ -2657,6 +2680,10 @@ socket_connect (rpc_transport_t *this, int port)
goto err;
}
+ gf_log (this->name, GF_LOG_TRACE,
+ "connecting %p, state=%u gen=%u sock=%d", this,
+ priv->ot_state, priv->ot_gen, priv->sock);
+
ret = socket_client_get_remote_sockaddr (this, &sock_union.sa,
&sockaddr_len, &sa_family);
if (ret == -1) {
@@ -2822,6 +2849,7 @@ socket_connect (rpc_transport_t *this, int port)
* initializing a client connection.
*/
priv->connected = 0;
+ priv->is_server = _gf_false;
rpc_transport_ref (this);
if (priv->own_thread) {
@@ -3545,7 +3573,6 @@ socket_init (rpc_transport_t *this)
if (priv->own_thread) {
priv->ot_state = OT_IDLE;
- pthread_cond_init (&priv->ot_event, NULL);
}
out:
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index bb342d99..e0b412fc 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -186,8 +186,10 @@ struct gf_sock_incoming {
typedef enum {
OT_IDLE, /* Uninitialized or termination complete. */
- OT_ALIVE, /* Past pthread_create, no error/disconnect. */
- OT_DYING, /* Disconnect in progress. */
+ OT_SPAWNING, /* Past pthread_create but not in thread yet. */
+ OT_RUNNING, /* Poller thread running normally. */
+ OT_CALLBACK, /* Poller thread in the middle of a callback. */
+ OT_PLEASE_DIE, /* Poller termination requested. */
} ot_state_t;
typedef struct {
@@ -229,7 +231,8 @@ typedef struct {
int pipe[2];
gf_boolean_t own_thread;
ot_state_t ot_state;
- pthread_cond_t ot_event;
+ uint32_t ot_gen;
+ gf_boolean_t is_server;
} socket_private_t;