diff options
Diffstat (limited to 'rpc/rpc-transport/socket/src/socket.c')
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 361 | 
1 files changed, 196 insertions, 165 deletions
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index a4791437e6d..8ba2692cdc6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2680,108 +2680,107 @@ socket_server_event_handler (int fd, int idx, void *data,          priv = this->private;          ctx  = this->ctx; -        pthread_mutex_lock (&priv->lock); -        { -                priv->idx = idx; - -                if (poll_in) { -                        new_sock = accept (priv->sock, SA (&new_sockaddr), -                                           &addrlen); - -                        if (new_sock == -1) { -                                gf_log (this->name, GF_LOG_WARNING, -                                        "accept on %d failed (%s)", -                                        priv->sock, strerror (errno)); -                                goto unlock; -                        } - -                        if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { -                                ret = __socket_nodelay (new_sock); -                                if (ret == -1) { -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "setsockopt() failed for " -                                                "NODELAY (%s)", -                                                strerror (errno)); -                                } -                        } +        /* NOTE: +         * We have done away with the critical section in this function. since +         * there's little that it helps with. There's no other code that +         * attempts to unref the listener socket/transport from any other +         * thread context while we are using it here. +         */ +        priv->idx = idx; -                        if (priv->keepalive && -                            new_sockaddr.ss_family != AF_UNIX) { -                                ret = __socket_keepalive (new_sock, -                                                          new_sockaddr.ss_family, -                                                          priv->keepaliveintvl, -                                                          priv->keepaliveidle, -                                                          priv->keepalivecnt, -                                                          priv->timeout); -                                if (ret == -1) -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "Failed to set keep-alive: %s", -                                                strerror (errno)); -                        } +        if (poll_in) { +                new_sock = accept (priv->sock, SA (&new_sockaddr), &addrlen); -                        new_trans = GF_CALLOC (1, sizeof (*new_trans), -                                               gf_common_mt_rpc_trans_t); -                        if (!new_trans) { -                                sys_close (new_sock); -                                goto unlock; -                        } +                if (new_sock == -1) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "accept on %d failed (%s)", +                                priv->sock, strerror (errno)); +                        goto out; +                } -                        ret = pthread_mutex_init(&new_trans->lock, NULL); +                if (priv->nodelay && (new_sockaddr.ss_family != AF_UNIX)) { +                        ret = __socket_nodelay (new_sock);                          if (ret == -1) {                                  gf_log (this->name, GF_LOG_WARNING, -                                        "pthread_mutex_init() failed: %s", +                                        "setsockopt() failed for " +                                        "NODELAY (%s)",                                          strerror (errno)); -                                sys_close (new_sock); -                                GF_FREE (new_trans); -                                goto unlock;                          } -                        INIT_LIST_HEAD (&new_trans->list); +                } -                        new_trans->name = gf_strdup (this->name); +                if (priv->keepalive && +                                new_sockaddr.ss_family != AF_UNIX) { +                        ret = __socket_keepalive (new_sock, +                                                  new_sockaddr.ss_family, +                                                  priv->keepaliveintvl, +                                                  priv->keepaliveidle, +                                                  priv->keepalivecnt, +                                                  priv->timeout); +                        if (ret == -1) +                                gf_log (this->name, GF_LOG_WARNING, +                                        "Failed to set keep-alive: %s", +                                        strerror (errno)); +                } -                        memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, -                                addrlen); -                        new_trans->peerinfo.sockaddr_len = addrlen; +                new_trans = GF_CALLOC (1, sizeof (*new_trans), +                                gf_common_mt_rpc_trans_t); +                if (!new_trans) { +                        sys_close (new_sock); +                        goto out; +                } -                        new_trans->myinfo.sockaddr_len = -                                sizeof (new_trans->myinfo.sockaddr); +                ret = pthread_mutex_init(&new_trans->lock, NULL); +                if (ret == -1) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "pthread_mutex_init() failed: %s", +                                strerror (errno)); +                        sys_close (new_sock); +                        GF_FREE (new_trans); +                        goto out; +                } +                INIT_LIST_HEAD (&new_trans->list); -                        ret = getsockname (new_sock, -                                           SA (&new_trans->myinfo.sockaddr), -                                           &new_trans->myinfo.sockaddr_len); -                        if (ret == -1) { -                                gf_log (this->name, GF_LOG_WARNING, -                                        "getsockname on %d failed (%s)", -                                        new_sock, strerror (errno)); -                                sys_close (new_sock); -                                GF_FREE (new_trans->name); -                                GF_FREE (new_trans); -                                goto unlock; -                        } +                new_trans->name = gf_strdup (this->name); -                        get_transport_identifiers (new_trans); -                        ret = socket_init(new_trans); -                        if (ret != 0) { -                                sys_close (new_sock); -                                GF_FREE (new_trans->name); -                                GF_FREE (new_trans); -                                goto unlock; -                        } -                        new_trans->ops = this->ops; -                        new_trans->init = this->init; -                        new_trans->fini = this->fini; -                        new_trans->ctx  = ctx; -                        new_trans->xl   = this->xl; -                        new_trans->mydata = this->mydata; -                        new_trans->notify = this->notify; -                        new_trans->listener = this; -                        new_priv = new_trans->private; - -                        if (new_sockaddr.ss_family == AF_UNIX) { -                                new_priv->use_ssl = _gf_false; -                        } -                        else { -                                switch (priv->srvr_ssl) { +                memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, addrlen); +                new_trans->peerinfo.sockaddr_len = addrlen; + +                new_trans->myinfo.sockaddr_len = sizeof (new_trans->myinfo.sockaddr); + +                ret = getsockname (new_sock, SA (&new_trans->myinfo.sockaddr), +                                   &new_trans->myinfo.sockaddr_len); +                if (ret == -1) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "getsockname on %d failed (%s)", +                                new_sock, strerror (errno)); +                        sys_close (new_sock); +                        GF_FREE (new_trans->name); +                        GF_FREE (new_trans); +                        goto out; +                } + +                get_transport_identifiers (new_trans); +                ret = socket_init(new_trans); +                if (ret != 0) { +                        sys_close (new_sock); +                        GF_FREE (new_trans->name); +                        GF_FREE (new_trans); +                        goto out; +                } +                new_trans->ops = this->ops; +                new_trans->init = this->init; +                new_trans->fini = this->fini; +                new_trans->ctx  = ctx; +                new_trans->xl   = this->xl; +                new_trans->mydata = this->mydata; +                new_trans->notify = this->notify; +                new_trans->listener = this; +                new_priv = new_trans->private; + +                if (new_sockaddr.ss_family == AF_UNIX) { +                        new_priv->use_ssl = _gf_false; +                } else { +                        switch (priv->srvr_ssl) {                                  case MGMT_SSL_ALWAYS:                                          /* Glusterd with secure_mgmt. */                                          new_priv->use_ssl = _gf_true; @@ -2792,95 +2791,127 @@ socket_server_event_handler (int fd, int idx, void *data,                                          break;                                  default:                                          new_priv->use_ssl = _gf_false; -                                }                          } +                } -			new_priv->sock = new_sock; -			new_priv->own_thread = priv->own_thread; - -                        new_priv->ssl_ctx = priv->ssl_ctx; -			if (new_priv->use_ssl && !new_priv->own_thread) { -				cname = ssl_setup_connection(new_trans,1); -                                if (!cname) { -					gf_log(this->name,GF_LOG_ERROR, -					       "server setup failed"); -					sys_close (new_sock); -                                        GF_FREE (new_trans->name); -                                        GF_FREE (new_trans); -					goto unlock; -				} -                                this->ssl_name = cname; -			} +                new_priv->sock = new_sock; +                new_priv->own_thread = priv->own_thread; + +                new_priv->ssl_ctx = priv->ssl_ctx; +                if (new_priv->use_ssl && !new_priv->own_thread) { +                        cname = ssl_setup_connection(new_trans, 1); +                        if (!cname) { +                                gf_log(this->name, GF_LOG_ERROR, +                                       "server setup failed"); +                                sys_close (new_sock); +                                GF_FREE (new_trans->name); +                                GF_FREE (new_trans); +                                goto out; +                        } +                        this->ssl_name = cname; +                } -                        if (!priv->bio && !priv->own_thread) { -                                ret = __socket_nonblock (new_sock); +                if (!priv->bio && !priv->own_thread) { +                        ret = __socket_nonblock (new_sock); -                                if (ret == -1) { -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "NBIO on %d failed (%s)", -                                                new_sock, strerror (errno)); +                        if (ret == -1) { +                                gf_log (this->name, GF_LOG_WARNING, +                                        "NBIO on %d failed (%s)", +                                        new_sock, strerror (errno)); -                                        sys_close (new_sock); -                                        GF_FREE (new_trans->name); -                                        GF_FREE (new_trans); -                                        goto unlock; -                                } +                                sys_close (new_sock); +                                GF_FREE (new_trans->name); +                                GF_FREE (new_trans); +                                goto out;                          } +                } -                        pthread_mutex_lock (&new_priv->lock); -                        { -                                /* -                                 * In the own_thread case, this is used to -                                 * indicate that we're initializing a server -                                 * connection. -                                 */ -                                new_priv->connected = 1; -                                new_priv->is_server = _gf_true; -                                rpc_transport_ref (new_trans); - -                                if (new_priv->own_thread) { -                                        if (pipe(new_priv->pipe) < 0) { -                                                gf_log(this->name, GF_LOG_ERROR, -                                                       "could not create pipe"); -                                        } -                                        ret = socket_spawn(new_trans); -                                        if (ret) { -                                                gf_log(this->name, GF_LOG_ERROR, -                                                       "could not spawn thread"); -                                                sys_close (new_priv->pipe[0]); -                                                sys_close (new_priv->pipe[1]); -                                        } -                                }  else { -                                        new_priv->idx = -                                                event_register (ctx->event_pool, -                                                                new_sock, -                                                           socket_event_handler, -                                                                new_trans, -                                                                1, 0); -                                        if (new_priv->idx == -1) { -                                                ret = -1; -                                                gf_log(this->name, GF_LOG_ERROR, -                                                       "failed to register the socket with event"); -                                        } -                                } +                /* +                 * In the own_thread case, this is used to +                 * indicate that we're initializing a server +                 * connection. +                 */ +                new_priv->connected = 1; +                new_priv->is_server = _gf_true; +                rpc_transport_ref (new_trans); +                if (new_priv->own_thread) { +                        if (pipe(new_priv->pipe) < 0) { +                                gf_log(this->name, GF_LOG_ERROR, +                                       "could not create pipe");                          } -                        pthread_mutex_unlock (&new_priv->lock); -                        if (ret == -1) { -                                sys_close (new_sock); -                                rpc_transport_unref (new_trans); -                                goto unlock; +                        ret = socket_spawn(new_trans); +                        if (ret) { +                                gf_log(this->name, GF_LOG_ERROR, +                                       "could not spawn thread"); +                                sys_close (new_priv->pipe[0]); +                                sys_close (new_priv->pipe[1]);                          } +                }  else { +                        /* Take a ref on the new_trans to avoid +                         * getting deleted when event_register() +                         * causes socket_event_handler() to race +                         * ahead of this path to eventually find +                         * a disconnect and unref the transport +                         */ +                        rpc_transport_ref (new_trans); -                        if (!priv->own_thread) { -                                ret = rpc_transport_notify (this, -                                        RPC_TRANSPORT_ACCEPT, new_trans); +                        /* Send a notification to RPCSVC layer +                         * to save the new_trans in its service +                         * list before we register the new_sock +                         * with epoll to begin receiving notifications +                         * for data handling. +                         */ +                        ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans); + +                        if (ret != -1) { +                                new_priv->idx = +                                        event_register (ctx->event_pool, +                                                        new_sock, +                                                        socket_event_handler, +                                                        new_trans, +                                                        1, 0); +                                if (new_priv->idx == -1) { +                                        ret = -1; +                                        gf_log(this->name, GF_LOG_ERROR, +                                               "failed to register the socket " +                                               "with event"); + +                                        /* event_register() could have failed for some +                                         * reason, implying that the new_sock cannot be +                                         * added to the epoll set. If we wont get any +                                         * more notifications for new_sock from epoll, +                                         * then we better remove the corresponding +                                         * new_trans object from the RPCSVC service list. +                                         * Since we've notified RPC service of new_trans +                                         * before we attempted event_register(), we better +                                         * unlink the new_trans from the RPCSVC service list +                                         * to cleanup the stateby sending out a DISCONNECT +                                         * notification. +                                         */ +                                        rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, new_trans); +                                }                          } + +                        /* this rpc_transport_unref() is for managing race between +                         * 1. socket_server_event_handler and +                         * 2. socket_event_handler +                         * trying to add and remove new_trans from the rpcsvc +                         * service list +                         * now that we are done with the notifications, lets +                         * reduce the reference +                         */ +                        rpc_transport_unref (new_trans);                  } -        } -unlock: -        pthread_mutex_unlock (&priv->lock); +                if (ret == -1) { +                        sys_close (new_sock); +                        /* this unref is to actually cause the destruction of +                         * the new_trans since we've failed at everything so far +                         */ +                        rpc_transport_unref (new_trans); +                } +        }  out:          if (cname && (cname != this->ssl_name)) {                  GF_FREE(cname);  | 
