diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 359 |
1 files changed, 195 insertions, 164 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 5f9fd03860d..d4a40b574ca 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2657,107 +2657,106 @@ socket_server_event_handler (int fd, int idx, void *data, priv = this->private; ctx = this->ctx; - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - - if (poll_in) { - new_sock = accept (priv->sock, SA (&new_sockaddr), - &addrlen); - - if (new_sock == -1) { - gf_log (this->name, GF_LOG_WARNING, - "accept on %d failed (%s)", - priv->sock, strerror (errno)); - goto unlock; - } - - if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { - ret = __socket_nodelay (new_sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "setsockopt() failed for " - "NODELAY (%s)", - strerror (errno)); - } - } + /* NOTE: + * We have done away with the critical section in this function. since + * there's little that it helps with. There's no other code that + * attempts to unref the listener socket/transport from any other + * thread context while we are using it here. + */ + priv->idx = idx; - if (priv->keepalive && - new_sockaddr.ss_family != AF_UNIX) { - ret = __socket_keepalive (new_sock, - new_sockaddr.ss_family, - priv->keepaliveintvl, - priv->keepaliveidle, - priv->timeout); - if (ret == -1) - gf_log (this->name, GF_LOG_WARNING, - "Failed to set keep-alive: %s", - strerror (errno)); - } + if (poll_in) { + new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen); - new_trans = GF_CALLOC (1, sizeof (*new_trans), - gf_common_mt_rpc_trans_t); - if (!new_trans) { - sys_close (new_sock); - goto unlock; - } + if (new_sock == -1) { + gf_log (this->name, GF_LOG_WARNING, + "accept on %d failed (%s)", + priv->sock, strerror (errno)); + goto out; + } - ret = pthread_mutex_init(&new_trans->lock, NULL); + if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { + ret = __socket_nodelay (new_sock); if (ret == -1) { gf_log (this->name, GF_LOG_WARNING, - "pthread_mutex_init() failed: %s", + "setsockopt() failed for " + "NODELAY (%s)", strerror (errno)); - sys_close (new_sock); - GF_FREE (new_trans); - goto unlock; } - INIT_LIST_HEAD (&new_trans->list); + } - new_trans->name = gf_strdup (this->name); + if (priv->keepalive && + new_sockaddr.ss_family != AF_UNIX) { + ret = __socket_keepalive (new_sock, + new_sockaddr.ss_family, + priv->keepaliveintvl, + priv->keepaliveidle, + priv->timeout); + if (ret == -1) + gf_log (this->name, GF_LOG_WARNING, + "Failed to set keep-alive: %s", + strerror (errno)); + } - memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, - addrlen); - new_trans->peerinfo.sockaddr_len = addrlen; + new_trans = GF_CALLOC (1, sizeof (*new_trans), + gf_common_mt_rpc_trans_t); + if (!new_trans) { + sys_close (new_sock); + goto out; + } - new_trans->myinfo.sockaddr_len = - sizeof (new_trans->myinfo.sockaddr); + ret = pthread_mutex_init(&new_trans->lock, NULL); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "pthread_mutex_init() failed: %s", + strerror (errno)); + sys_close (new_sock); + GF_FREE (new_trans); + goto out; + } + INIT_LIST_HEAD (&new_trans->list); - ret = getsockname (new_sock, - SA (&new_trans->myinfo.sockaddr), - &new_trans->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "getsockname on %d failed (%s)", - new_sock, strerror (errno)); - sys_close (new_sock); - GF_FREE (new_trans->name); - GF_FREE (new_trans); - goto unlock; - } + new_trans->name = gf_strdup (this->name); - get_transport_identifiers (new_trans); - ret = socket_init(new_trans); - if (ret != 0) { - sys_close (new_sock); - GF_FREE (new_trans->name); - GF_FREE (new_trans); - goto unlock; - } - new_trans->ops = this->ops; - new_trans->init = this->init; - new_trans->fini = this->fini; - new_trans->ctx = ctx; - new_trans->xl = this->xl; - new_trans->mydata = this->mydata; - new_trans->notify = this->notify; - new_trans->listener = this; - new_priv = new_trans->private; - - if (new_sockaddr.ss_family == AF_UNIX) { - new_priv->use_ssl = _gf_false; - } - else { - switch (priv->srvr_ssl) { + memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen); + new_trans->peerinfo.sockaddr_len = addrlen; + + new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr); + + ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr), + &new_trans->myinfo.sockaddr_len); + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "getsockname on %d failed (%s)", + new_sock, strerror (errno)); + sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); + goto out; + } + + get_transport_identifiers (new_trans); + ret = socket_init(new_trans); + if (ret != 0) { + sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); + goto out; + } + new_trans->ops = this->ops; + new_trans->init = this->init; + new_trans->fini = this->fini; + new_trans->ctx = ctx; + new_trans->xl = this->xl; + new_trans->mydata = this->mydata; + new_trans->notify = this->notify; + new_trans->listener = this; + new_priv = new_trans->private; + + if (new_sockaddr.ss_family == AF_UNIX) { + new_priv->use_ssl = _gf_false; + } else { + switch (priv->srvr_ssl) { case MGMT_SSL_ALWAYS: /* Glusterd with secure_mgmt. */ new_priv->use_ssl = _gf_true; @@ -2768,95 +2767,127 @@ socket_server_event_handler (int fd, int idx, void *data, break; default: new_priv->use_ssl = _gf_false; - } } + } - new_priv->sock = new_sock; - new_priv->own_thread = priv->own_thread; - - new_priv->ssl_ctx = priv->ssl_ctx; - if (new_priv->use_ssl && !new_priv->own_thread) { - cname = ssl_setup_connection(new_trans,1); - if (!cname) { - gf_log(this->name,GF_LOG_ERROR, - "server setup failed"); - sys_close (new_sock); - GF_FREE (new_trans->name); - GF_FREE (new_trans); - goto unlock; - } - this->ssl_name = cname; - } + new_priv->sock = new_sock; + new_priv->own_thread = priv->own_thread; - if (!priv->bio && !priv->own_thread) { - ret = __socket_nonblock (new_sock); + new_priv->ssl_ctx = priv->ssl_ctx; + if (new_priv->use_ssl && !new_priv->own_thread) { + cname = ssl_setup_connection(new_trans, 1); + if (!cname) { + gf_log(this->name, GF_LOG_ERROR, + "server setup failed"); + sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); + goto out; + } + this->ssl_name = cname; + } - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "NBIO on %d failed (%s)", - new_sock, strerror (errno)); + if (!priv->bio && !priv->own_thread) { + ret = __socket_nonblock (new_sock); - sys_close (new_sock); - GF_FREE (new_trans->name); - GF_FREE (new_trans); - goto unlock; - } + if (ret == -1) { + gf_log (this->name, GF_LOG_WARNING, + "NBIO on %d failed (%s)", + new_sock, strerror (errno)); + + sys_close (new_sock); + GF_FREE (new_trans->name); + GF_FREE (new_trans); + goto out; } + } - pthread_mutex_lock (&new_priv->lock); - { - /* - * In the own_thread case, this is used to - * indicate that we're initializing a server - * connection. - */ - new_priv->connected = 1; - new_priv->is_server = _gf_true; - rpc_transport_ref (new_trans); - - if (new_priv->own_thread) { - if (pipe(new_priv->pipe) < 0) { - gf_log(this->name, GF_LOG_ERROR, - "could not create pipe"); - } - ret = socket_spawn(new_trans); - if (ret) { - gf_log(this->name, GF_LOG_ERROR, - "could not spawn thread"); - sys_close (new_priv->pipe[0]); - sys_close (new_priv->pipe[1]); - } - } else { - new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, - 1, 0); - if (new_priv->idx == -1) { - ret = -1; - gf_log(this->name, GF_LOG_ERROR, - "failed to register the socket with event"); - } - } + /* + * In the own_thread case, this is used to + * indicate that we're initializing a server + * connection. + */ + new_priv->connected = 1; + new_priv->is_server = _gf_true; + rpc_transport_ref (new_trans); + if (new_priv->own_thread) { + if (pipe(new_priv->pipe) < 0) { + gf_log(this->name, GF_LOG_ERROR, + "could not create pipe"); } - pthread_mutex_unlock (&new_priv->lock); - if (ret == -1) { - sys_close (new_sock); - rpc_transport_unref (new_trans); - goto unlock; + ret = socket_spawn(new_trans); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "could not spawn thread"); + sys_close (new_priv->pipe[0]); + sys_close (new_priv->pipe[1]); } + } else { + /* Take a ref on the new_trans to avoid + * getting deleted when event_register() + * causes socket_event_handler() to race + * ahead of this path to eventually find + * a disconnect and unref the transport + */ + rpc_transport_ref (new_trans); - if (!priv->own_thread) { - ret = rpc_transport_notify (this, - RPC_TRANSPORT_ACCEPT, new_trans); + /* Send a notification to RPCSVC layer + * to save the new_trans in its service + * list before we register the new_sock + * with epoll to begin receiving notifications + * for data handling. + */ + ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans); + + if (ret != -1) { + new_priv->idx = + event_register (ctx->event_pool, + new_sock, + socket_event_handler, + new_trans, + 1, 0); + if (new_priv->idx == -1) { + ret = -1; + gf_log(this->name, GF_LOG_ERROR, + "failed to register the socket " + "with event"); + + /* event_register() could have failed for some + * reason, implying that the new_sock cannot be + * added to the epoll set. If we wont get any + * more notifications for new_sock from epoll, + * then we better remove the corresponding + * new_trans object from the RPCSVC service list. + * Since we've notified RPC service of new_trans + * before we attempted event_register(), we better + * unlink the new_trans from the RPCSVC service list + * to cleanup the stateby sending out a DISCONNECT + * notification. + */ + rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, new_trans); + } } + + /* this rpc_transport_unref() is for managing race between + * 1. socket_server_event_handler and + * 2. socket_event_handler + * trying to add and remove new_trans from the rpcsvc + * service list + * now that we are done with the notifications, lets + * reduce the reference + */ + rpc_transport_unref (new_trans); } - } -unlock: - pthread_mutex_unlock (&priv->lock); + if (ret == -1) { + sys_close (new_sock); + /* this unref is to actually cause the destruction of + * the new_trans since we've failed at everything so far + */ + rpc_transport_unref (new_trans); + } + } out: if (cname && (cname != this->ssl_name)) { GF_FREE(cname); |