diff options
| author | Rajesh Joseph <rjoseph@redhat.com> | 2016-12-13 15:28:42 +0530 | 
|---|---|---|
| committer | Raghavendra G <rgowdapp@redhat.com> | 2016-12-21 20:49:19 -0800 | 
| commit | af6769675acbbfd780fa2ece8587502d6d579372 (patch) | |
| tree | e464f44c08163b9ba84e3d91d1e9d71efd5c04dc | |
| parent | 8b42e1b5688f8600086ecc0e33ac4abf5e7c2772 (diff) | |
socket: socket disconnect should wait for poller thread exit
When SSL is enabled, or if the "transport.socket.own-thread" option is set,
socket_poller runs in a separate thread. Currently, during a
disconnect or in the PARENT_DOWN scenario, we don't wait for this thread
to terminate. PARENT_DOWN disconnects the socket layer and
cleans up the resources used by socket_poller, which the thread may still be using.
Therefore, before disconnecting, we should wait for the poller thread to exit.
Change-Id: I71f984b47d260ffd979102f180a99a0bed29f0d6
BUG: 1404181
Signed-off-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-on: http://review.gluster.org/16141
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kaushal M <kaushal@redhat.com>
Reviewed-by: Raghavendra Talur <rtalur@redhat.com>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt-ping.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 4 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 5 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 4 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-transport/rdma/src/rdma.c | 19 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 98 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.h | 6 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-handshake.c | 7 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 3 | ||||
| -rw-r--r-- | xlators/protocol/client/src/client-handshake.c | 4 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server.c | 3 | 
12 files changed, 116 insertions, 41 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c index 3eb7e90cb01..e042121ad47 100644 --- a/rpc/rpc-lib/src/rpc-clnt-ping.c +++ b/rpc/rpc-lib/src/rpc-clnt-ping.c @@ -159,7 +159,7 @@ rpc_clnt_ping_timer_expired (void *rpc_ptr)                          trans->peerinfo.identifier,                          conn->ping_timeout); -                rpc_transport_disconnect (conn->trans); +                rpc_transport_disconnect (conn->trans, _gf_false);          }  out: diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index b868f56bdb3..d39b5236b91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1854,7 +1854,7 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          pthread_mutex_unlock (&conn->lock);          if (trans) { -                rpc_transport_disconnect (trans); +                rpc_transport_disconnect (trans, _gf_true);          }          if (unref) @@ -1913,7 +1913,7 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          pthread_mutex_unlock (&conn->lock);          if (trans) { -                rpc_transport_disconnect (trans); +                rpc_transport_disconnect (trans, _gf_true);          }          if (unref)                  rpc_clnt_unref (rpc); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 6ee5e15ede4..33e94450d9c 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -435,13 +435,14 @@ fail:  int32_t -rpc_transport_disconnect (rpc_transport_t *this) +rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait)  {  	int32_t ret = -1;  	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); -	ret = this->ops->disconnect (this); +        ret = this->ops->disconnect (this, wait); +  fail:  	return ret;  } diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 4e7a8c46fae..717c40af13a 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -224,7 
+224,7 @@ struct rpc_transport_ops {                                     rpc_transport_reply_t *reply);          int32_t (*connect)        (rpc_transport_t *this, int port);          int32_t (*listen)         (rpc_transport_t *this); -        int32_t (*disconnect)     (rpc_transport_t *this); +        int32_t (*disconnect)     (rpc_transport_t *this, gf_boolean_t wait);          int32_t (*get_peername)   (rpc_transport_t *this, char *hostname,                                     int hostlen);          int32_t (*get_peeraddr)   (rpc_transport_t *this, char *peeraddr, @@ -248,7 +248,7 @@ int32_t  rpc_transport_connect (rpc_transport_t *this, int port);  int32_t -rpc_transport_disconnect (rpc_transport_t *this); +rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait);  int32_t  rpc_transport_destroy (rpc_transport_t *this); diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index c792909cb87..52b57205f6d 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1654,7 +1654,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)          ret = 0;  out:          if (!listener && trans) { -                rpc_transport_disconnect (trans); +                rpc_transport_disconnect (trans, _gf_true);          }          return ret; diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c index 551ac072079..d2f04bd6d0c 100644 --- a/rpc/rpc-transport/rdma/src/rdma.c +++ b/rpc/rpc-transport/rdma/src/rdma.c @@ -51,7 +51,7 @@ static int32_t  gf_rdma_teardown (rpc_transport_t *this);  static int32_t -gf_rdma_disconnect (rpc_transport_t *this); +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait);  static void  gf_rdma_cm_handle_disconnect (rpc_transport_t *this); @@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event)          }          if (ret < 0) { -                gf_rdma_disconnect (this); +                gf_rdma_disconnect (this, _gf_false);          
}          return ret; @@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)                          RDMA_MSG_WRITE_PEER_FAILED,                          "sending request to peer (%s) failed",                          this->peerinfo.identifier); -                rpc_transport_disconnect (this); +                rpc_transport_disconnect (this, _gf_false);          }  out: @@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)                          RDMA_MSG_WRITE_PEER_FAILED,                          "sending request to peer (%s) failed",                          this->peerinfo.identifier); -                rpc_transport_disconnect (this); +                rpc_transport_disconnect (this, _gf_false);          }  out: @@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)  out:          if (ret == -1) { -                rpc_transport_disconnect (peer->trans); +                rpc_transport_disconnect (peer->trans, _gf_false);          }          return; @@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data)                                  if (peer) {                                          ibv_ack_cq_events (event_cq, num_wr);                                          rpc_transport_unref (peer->trans); -                                        rpc_transport_disconnect (peer->trans); +                                        rpc_transport_disconnect (peer->trans, +                                                                  _gf_false);                                  }                                  if (post) { @@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc)          }          if (peer) { -                rpc_transport_disconnect (peer->trans); +                rpc_transport_disconnect (peer->trans, _gf_false);          }          return; @@ -4343,7 +4344,7 @@ 
gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer,          ret = gf_rdma_pollin_notify (peer, post);          if ((ret == -1) && (peer != NULL)) { -                rpc_transport_disconnect (peer->trans); +                rpc_transport_disconnect (peer->trans, _gf_false);          }  out: @@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this)  static int32_t -gf_rdma_disconnect (rpc_transport_t *this) +gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait)  {          gf_rdma_private_t *priv = NULL;          int32_t            ret  = 0; diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 36388548937..d05dc4189aa 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2344,7 +2344,7 @@ out:          return ret;  } -static int socket_disconnect (rpc_transport_t *this); +static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);  /* reads rpc_requests during pollin */  static int @@ -2375,7 +2375,7 @@ socket_event_handler (int fd, int idx, void *data,                          EINPROGRESS or ENOENT, so nothing more to do, fail                          reading/writing anything even if poll_in or poll_out                          is set */ -                        ret = socket_disconnect (this); +                        ret = socket_disconnect (this, _gf_false);                          /* Force ret to be -1, as we are officially done with                          this socket */ @@ -2424,6 +2424,13 @@ socket_poller (void *ctx)           * conditionally           */          THIS = this->xl; +        GF_REF_GET (priv); + +        if (priv->ot_state == OT_PLEASE_DIE) { +                gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting " +                        "because socket state is OT_PLEASE_DIE"); +                goto err; +        }          priv->ot_state = OT_RUNNING; @@ -2494,6 +2501,13 @@ socket_poller (void *ctx)  			break;  		
} +                if (priv->ot_state == OT_PLEASE_DIE) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "OT_PLEASE_DIE on %p (exiting socket_poller)", +                                this); +                        break; +                } +  		if (pfd[1].revents & POLL_MASK_INPUT) {  			ret = socket_event_poll_in(this);  			if (ret >= 0) { @@ -2507,7 +2521,6 @@ socket_poller (void *ctx)                                  gf_log (this->name, GF_LOG_TRACE,                                          "OT_IDLE on %p (input request)",                                          this); -                                priv->ot_state = OT_IDLE;                                  break;                          }  		} @@ -2524,7 +2537,6 @@ socket_poller (void *ctx)                                  gf_log (this->name, GF_LOG_TRACE,                                          "OT_IDLE on %p (output request)",                                          this); -                                priv->ot_state = OT_IDLE;                                  break;                          }  		} @@ -2561,22 +2573,24 @@ socket_poller (void *ctx)  err:  	/* All (and only) I/O errors should come here. 
*/          pthread_mutex_lock(&priv->lock); +        { +                __socket_teardown_connection (this); +                sys_close (priv->sock); +                priv->sock = -1; -        __socket_teardown_connection (this); -        sys_close (priv->sock); -        priv->sock = -1; - -        sys_close (priv->pipe[0]); -        sys_close (priv->pipe[1]); -        priv->pipe[0] = -1; -        priv->pipe[1] = -1; - -        priv->ot_state = OT_IDLE; +                sys_close (priv->pipe[0]); +                sys_close (priv->pipe[1]); +                priv->pipe[0] = -1; +                priv->pipe[1] = -1; +                priv->ot_state = OT_IDLE; +        }          pthread_mutex_unlock(&priv->lock);          rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); +        GF_REF_PUT (priv); +          rpc_transport_unref (this);  	return NULL; @@ -2848,16 +2862,39 @@ out:  static int -socket_disconnect (rpc_transport_t *this) +socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)  { -        socket_private_t *priv = NULL; -        int               ret = -1; +        socket_private_t *priv   = NULL; +        int               ret    = -1; +        char              a_byte = 'r';          GF_VALIDATE_OR_GOTO ("socket", this, out);          GF_VALIDATE_OR_GOTO ("socket", this->private, out);          priv = this->private; +        if (wait && priv->own_thread) { +                pthread_mutex_lock (&priv->cond_lock); +                { +                        GF_REF_PUT (priv); +                        /* Change the state to OT_PLEASE_DIE so that +                         * socket_poller can exit. 
*/ +                        priv->ot_state = OT_PLEASE_DIE; +                        /* Write something into the pipe so that poller +                         * thread can wake up.*/ +                        if (sys_write (priv->pipe[1], &a_byte, 1) < 1) { +                                gf_log (this->name, GF_LOG_WARNING, +                                                "write error on pipe"); +                        } + +                        /* Wait for socket_poller to exit */ +                        if (!priv->own_thread_done) +                                pthread_cond_wait (&priv->cond, +                                                   &priv->cond_lock); +                } +                pthread_mutex_unlock (&priv->cond_lock); +        } +          pthread_mutex_lock (&priv->lock);          {                  ret = __socket_disconnect (this); @@ -2937,6 +2974,7 @@ socket_connect (rpc_transport_t *this, int port)          pthread_mutex_lock (&priv->lock);          { +                priv->own_thread_done = _gf_false;                  if (priv->sock != -1) {                          gf_log_callingfn (this->name, GF_LOG_TRACE,                                            "connect () called on transport " @@ -3805,6 +3843,26 @@ init_openssl_mt (void)          SSL_load_error_strings();  } +void +socket_poller_mayday (void *data) +{ +        socket_private_t *priv = (socket_private_t *)data; + +        if (priv == NULL) +                return; + +        pthread_mutex_lock (&priv->cond_lock); +        { +                /* Signal waiting threads before exiting from socket_poller */ +                if (!priv->own_thread_done) { +                        gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED"); +                        pthread_cond_signal (&priv->cond); +                        priv->own_thread_done = _gf_true; +                } +        } +        pthread_mutex_unlock (&priv->cond_lock); +} +  static int  socket_init (rpc_transport_t 
*this)  { @@ -3835,6 +3893,10 @@ socket_init (rpc_transport_t *this)          memset(priv,0,sizeof(*priv));          pthread_mutex_init (&priv->lock, NULL); +        pthread_mutex_init (&priv->cond_lock, NULL); +        pthread_cond_init (&priv->cond, NULL); + +        GF_REF_INIT (priv, socket_poller_mayday);          priv->sock = -1;          priv->idx = -1; @@ -4265,6 +4327,8 @@ fini (rpc_transport_t *this)                          "transport %p destroyed", this);                  pthread_mutex_destroy (&priv->lock); +                pthread_mutex_destroy (&priv->cond_lock); +                pthread_cond_destroy (&priv->cond);  		if (priv->ssl_private_key) {  			GF_FREE(priv->ssl_private_key);  		} diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h index 7c7005b59e7..8528bdeba8d 100644 --- a/rpc/rpc-transport/socket/src/socket.h +++ b/rpc/rpc-transport/socket/src/socket.h @@ -27,6 +27,7 @@  #include "dict.h"  #include "mem-pool.h"  #include "globals.h" +#include "refcount.h"  #ifndef MAX_IOVEC  #define MAX_IOVEC 16 @@ -215,6 +216,8 @@ typedef struct {          };          struct gf_sock_incoming incoming;          pthread_mutex_t        lock; +        pthread_mutex_t        cond_lock; +        pthread_cond_t         cond;          int                    windowsize;          char                   lowlat;          char                   nodelay; @@ -239,10 +242,13 @@ typedef struct {  	pthread_t              thread;  	int                    pipe[2];  	gf_boolean_t           own_thread; +        gf_boolean_t           own_thread_done;          ot_state_t             ot_state;          uint32_t               ot_gen;          gf_boolean_t           is_server;          int                    log_ctr; +        GF_REF_DECL;           /* refcount to keep track of socket_poller +                                  threads */  } socket_private_t; diff --git a/xlators/mgmt/glusterd/src/glusterd-handshake.c 
b/xlators/mgmt/glusterd/src/glusterd-handshake.c index dc772ca6f51..550ee2c10b3 100644 --- a/xlators/mgmt/glusterd/src/glusterd-handshake.c +++ b/xlators/mgmt/glusterd/src/glusterd-handshake.c @@ -1824,7 +1824,7 @@ __glusterd_mgmt_hndsk_version_ack_cbk (struct rpc_req *req, struct iovec *iov,  out:          if (ret != 0 && peerinfo) -                rpc_transport_disconnect (peerinfo->rpc->conn.trans); +                rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);          rcu_read_unlock (); @@ -1949,7 +1949,8 @@ out:                  frame->local = NULL;                  STACK_DESTROY (frame->root);                  if (peerinfo) -                        rpc_transport_disconnect (peerinfo->rpc->conn.trans); +                        rpc_transport_disconnect (peerinfo->rpc->conn.trans, +                                                  _gf_false);          }          rcu_read_unlock (); @@ -2218,7 +2219,7 @@ __glusterd_peer_dump_version_cbk (struct rpc_req *req, struct iovec *iov,  out:          if (ret != 0 && peerinfo) -                rpc_transport_disconnect (peerinfo->rpc->conn.trans); +                rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);          rcu_read_unlock (); diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index fedfb746c50..a1c9132feda 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -4059,7 +4059,8 @@ gd_check_and_update_rebalance_info (glusterd_volinfo_t *old_volinfo,          //Disconnect from rebalance process          if (glusterd_defrag_rpc_get (old->defrag)) { -                rpc_transport_disconnect (old->defrag->rpc->conn.trans); +                rpc_transport_disconnect (old->defrag->rpc->conn.trans, +                                          _gf_false);                  glusterd_defrag_rpc_put (old->defrag);          } diff --git a/xlators/protocol/client/src/client-handshake.c 
b/xlators/protocol/client/src/client-handshake.c index c28fa5dd7cd..adccd8d4e51 100644 --- a/xlators/protocol/client/src/client-handshake.c +++ b/xlators/protocol/client/src/client-handshake.c @@ -1558,7 +1558,7 @@ out:          if (conf) {                  /* Need this to connect the same transport on different port */                  /* ie, glusterd to glusterfsd */ -                rpc_transport_disconnect (conf->rpc->conn.trans); +                rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);          }          return ret; @@ -1677,7 +1677,7 @@ out:          STACK_DESTROY (frame->root);          if (ret != 0) -                rpc_transport_disconnect (conf->rpc->conn.trans); +                rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);          return ret;  } diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index a99c1470276..af3adb36ef2 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -897,7 +897,8 @@ reconfigure (xlator_t *this, dict_t *options)                                                  "unauthorized client, hence "                                                  "terminating the connection %s",                                                  xprt->peerinfo.identifier); -                                        rpc_transport_disconnect(xprt); +                                        rpc_transport_disconnect(xprt, +                                                                 _gf_false);                                  }                          }                  }  | 
