diff options
| author | Raghavendra G <raghavendra@gluster.com> | 2010-08-17 05:35:42 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-18 00:37:03 -0700 | 
| commit | 4e01a54eaa6da1bd6817d62dcc51a75e22699e2b (patch) | |
| tree | 7832f4070729ef43f0ee1560bf3c737b2e123d49 /rpc/rpc-lib | |
| parent | d8a8a66523e06abc0f44e1cdfe528cbf28d881a9 (diff) | |
rpc - cleanup and changes related to rdma
- remove rpc_conn_state structure.
   - add a member to point struct rpc_req in rpc_transport_req structure.
     This is needed for rdma to store rdma specific per request data.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc/rpc-lib')
| -rw-r--r-- | rpc/rpc-lib/src/auth-glusterfs.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-null.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-unix.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 5 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 4 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 18 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-auth.c | 2 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 4 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 768 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 92 | 
10 files changed, 404 insertions, 495 deletions
diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c index 4748a318e74..459cad87791 100644 --- a/rpc/rpc-lib/src/auth-glusterfs.c +++ b/rpc/rpc-lib/src/auth-glusterfs.c @@ -180,7 +180,7 @@ err:  }  rpcsvc_auth_ops_t auth_glusterfs_ops = { -        .conn_init              = NULL, +        .transport_init         = NULL,          .request_init           = auth_glusterfs_request_init,          .authenticate           = auth_glusterfs_authenticate  }; diff --git a/rpc/rpc-lib/src/auth-null.c b/rpc/rpc-lib/src/auth-null.c index bfdabaa840c..20dd7e77c8b 100644 --- a/rpc/rpc-lib/src/auth-null.c +++ b/rpc/rpc-lib/src/auth-null.c @@ -50,7 +50,7 @@ int auth_null_authenticate (rpcsvc_request_t *req, void *priv)  }  rpcsvc_auth_ops_t auth_null_ops = { -        .conn_init              = NULL, +        .transport_init              = NULL,          .request_init           = auth_null_request_init,          .authenticate           = auth_null_authenticate  }; diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c index 4e99c1a5b6f..30b395bd4fa 100644 --- a/rpc/rpc-lib/src/auth-unix.c +++ b/rpc/rpc-lib/src/auth-unix.c @@ -70,7 +70,7 @@ err:  }  rpcsvc_auth_ops_t auth_unix_ops = { -        .conn_init              = NULL, +        .transport_init              = NULL,          .request_init           = auth_unix_request_init,          .authenticate           = auth_unix_authenticate  }; diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index da25d33c1ed..bac8b0246a8 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -418,7 +418,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)          if (ret == -1) {                  gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved " -                        "frame corresponding to xid (%d)", info->xid); +                        "frame corresponding to xid (%d) for msg arrived on " +                        "transport %s", +                        info->xid, clnt->conn.trans->name);                  goto out;          } @@ -1268,6 +1270,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  req.rsp.rsp_payload = rsp_payload;                  req.rsp.rsp_payload_count = rsp_payload_count;                  req.rsp.rsp_iobref = rsp_iobref; +                req.rpc_req = rpcreq;                  ret = rpc_transport_submit_request (rpc->conn.trans,                                                      &req); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 200ff383052..e3bc519fc97 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -587,7 +587,7 @@ validate_volume_options (char *name, dict_t *options, volume_option_t *opt)  int32_t  rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, -                          struct sockaddr *sa, size_t salen) +                          struct sockaddr_storage *sa, size_t salen)  {          if (!this)                  return -1; @@ -614,7 +614,7 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen)  int32_t  rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, -                            struct sockaddr *sa, size_t salen) +                            struct sockaddr_storage *sa, size_t salen)  {          if (!this)                  return -1; diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 7ef3abb7320..a6a3441dcdd 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -125,8 +125,9 @@ struct rpc_transport_rsp {  typedef struct rpc_transport_rsp rpc_transport_rsp_t;  struct rpc_transport_req { -        rpc_transport_msg_t msg; -        rpc_transport_rsp_t rsp; +        rpc_transport_msg_t  msg; +        rpc_transport_rsp_t  rsp; +        struct rpc_req      *rpc_req;  };  typedef struct rpc_transport_req rpc_transport_req_t; @@ -168,6 +169,11 @@ typedef int (*rpc_transport_notify_t) (rpc_transport_t *, void *mydata,                                         rpc_transport_event_t, void *data, ...);  struct rpc_transport {  	struct rpc_transport_ops  *ops; +        rpc_transport_t           *listener; /* listener transport to which +                                              * request for creation of this +                                              * transport came from. valid only +                                              * on server process. +                                              */  	void                      *private;          void                      *xl_private;          void                      *xl;       /* Used for THIS */ @@ -202,12 +208,12 @@ struct rpc_transport_ops {          int32_t (*get_peername)   (rpc_transport_t *this, char *hostname,                                     int hostlen);          int32_t (*get_peeraddr)   (rpc_transport_t *this, char *peeraddr, -                                   int addrlen, struct sockaddr *sa, +                                   int addrlen, struct sockaddr_storage *sa,                                     socklen_t sasize);          int32_t (*get_myname)     (rpc_transport_t *this, char *hostname,                                     int hostlen);          int32_t (*get_myaddr)     (rpc_transport_t *this, char *peeraddr, -                                   int addrlen, struct sockaddr *sa, +                                   int addrlen, struct sockaddr_storage *sa,                                     socklen_t sasize);  }; @@ -253,14 +259,14 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen);  int32_t  rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, -                            struct sockaddr *sa, size_t salen); +                            struct sockaddr_storage *sa, size_t salen);  int32_t  rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen);  int32_t  rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, -                          struct sockaddr *sa, size_t salen); +                          struct sockaddr_storage *sa, size_t salen);  rpc_transport_pollin_t *  rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c index d14b91f3a01..5cfa255ba95 100644 --- a/rpc/rpc-lib/src/rpcsvc-auth.c +++ b/rpc/rpc-lib/src/rpcsvc-auth.c @@ -210,7 +210,7 @@ __rpcsvc_auth_get_handler (rpcsvc_request_t *req)          if (!req)                  return NULL; -        svc = rpcsvc_request_service (req); +        svc = req->svc;          if (!svc) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "!svc");                  goto err; diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index aef791cfa0d..7e72bc3ae44 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -62,8 +62,6 @@ typedef struct rpcsvc_state {          glusterfs_ctx_t         *ctx; -        void                    *listener; -          /* list of connections which will listen for incoming connections */          struct list_head         listeners; @@ -76,7 +74,7 @@ typedef struct rpcsvc_state {          void                    *mydata; /* This is xlator */          rpcsvc_notify_t          notifyfn; - +        struct mem_pool         *rxpool;  } rpcsvc_t; diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 57e9b12aaa6..82a19bbd19d 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -48,16 +48,17 @@  struct rpcsvc_program gluster_dump_prog; - -#define rpcsvc_alloc_request(con, request)                              \ +#define rpcsvc_alloc_request(svc, request)                              \          do {                                                            \ -                request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \ +                request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \                  memset (request, 0, sizeof (rpcsvc_request_t));         \          } while (0) +rpcsvc_listener_t * +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);  int -rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr) +rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr)  {          int                     ret = -1;          char                    *addrtok = NULL; @@ -103,7 +104,7 @@ err:  int -rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr)  {          int     ret = RPCSVC_AUTH_DONTCARE;          char    *srchstr = NULL; @@ -126,7 +127,7 @@ rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr)          } else                  srchstr = globalrule; -        ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); +        ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);          if (volname)                  GF_FREE (srchstr); @@ -139,7 +140,7 @@ out:  }  int -rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr)  {          int     ret = RPCSVC_AUTH_DONTCARE;          char    *srchstr = NULL; @@ -158,7 +159,7 @@ rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr)          } else                  srchstr = generalrule; -        ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); +        ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);          if (volname)                  GF_FREE (srchstr); @@ -301,18 +302,18 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec)  int -rpcsvc_conn_peer_check_name (dict_t *options, char *volname, -                             rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_name (dict_t *options, char *volname, +                                  rpc_transport_t *trans)  {          int     ret = RPCSVC_AUTH_REJECT;          int     aret = RPCSVC_AUTH_REJECT;          int     rjret = RPCSVC_AUTH_REJECT;          char    clstr[RPCSVC_PEER_STRLEN]; -        if (!conn) +        if (!trans)                  return ret; -        ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN); +        ret = rpc_transport_get_peername (trans, clstr, RPCSVC_PEER_STRLEN);          if (ret != 0) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "                          "%s", gai_strerror (ret)); @@ -320,8 +321,8 @@ rpcsvc_conn_peer_check_name (dict_t *options, char *volname,                  goto err;          } -        aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); -        rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); +        aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); +        rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);          ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); @@ -331,17 +332,19 @@ err:  int -rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_addr (dict_t *options, char *volname, +                                  rpc_transport_t *trans)  {          int     ret = RPCSVC_AUTH_REJECT;          int     aret = RPCSVC_AUTH_DONTCARE;          int     rjret = RPCSVC_AUTH_REJECT;          char    clstr[RPCSVC_PEER_STRLEN]; -        if (!conn) +        if (!trans)                  return ret; -        ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0); +        ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, NULL, +                                         0);          if (ret != 0) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "                          "%s", gai_strerror (ret)); @@ -349,8 +352,8 @@ rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn)                  goto err;          } -        aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); -        rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); +        aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); +        rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);          ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);  err: @@ -359,8 +362,8 @@ err:  int -rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, -                                   rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_specific (dict_t *options, char *volname, +                                        rpc_transport_t *trans)  {          int             namechk = RPCSVC_AUTH_REJECT;          int             addrchk = RPCSVC_AUTH_REJECT; @@ -368,7 +371,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,          char            *namestr = NULL;          int             ret = 0; -        if ((!options) || (!volname) || (!conn)) +        if ((!options) || (!volname) || (!trans))                  return RPCSVC_AUTH_REJECT;          /* Enabled by default */ @@ -389,8 +392,9 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,           * specific which will over-ride the network address rules.           */          if (namelookup) -                namechk = rpcsvc_conn_peer_check_name (options, volname, conn); -        addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn); +                namechk = rpcsvc_transport_peer_check_name (options, volname, +                                                            trans); +        addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans);          if (namelookup)                  ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -402,7 +406,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,  int -rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans)  {          int             addrchk = RPCSVC_AUTH_REJECT;          int             namechk = RPCSVC_AUTH_REJECT; @@ -410,7 +414,7 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)          char            *namestr = NULL;          int             ret = 0; -        if ((!options) || (!conn)) +        if ((!options) || (!trans))                  return RPCSVC_AUTH_REJECT;          /* Enabled by default */ @@ -431,8 +435,10 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)           * specific which will over-ride the network address rules.           */          if (namelookup) -                namechk = rpcsvc_conn_peer_check_name (options, NULL, conn); -        addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn); +                namechk = rpcsvc_transport_peer_check_name (options, NULL,  +                                                            trans); + +        addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans);          if (namelookup)                  ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -443,17 +449,18 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)  }  int -rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check (dict_t *options, char *volname, +                             rpc_transport_t *trans)  {          int     general_chk = RPCSVC_AUTH_REJECT;          int     specific_chk = RPCSVC_AUTH_REJECT; -        if ((!options) || (!volname) || (!conn)) +        if ((!options) || (!volname) || (!trans))                  return RPCSVC_AUTH_REJECT; -        general_chk = rpcsvc_conn_check_volume_general (options, conn); -        specific_chk = rpcsvc_conn_check_volume_specific (options, volname, -                                                          conn); +        general_chk = rpcsvc_transport_check_volume_general (options, trans); +        specific_chk = rpcsvc_transport_check_volume_specific (options, volname, +                                                               trans);          return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk);  } @@ -494,68 +501,10 @@ out:  } - -/* Initialize the core of a connection */ -rpcsvc_conn_t * -rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans) -{ -        rpcsvc_conn_t  *conn = NULL; -        int             ret = -1; -        unsigned int    poolcount = 0; - -        conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t); -        if (!conn) { -                gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); -                return NULL; -        } - -        conn->trans = trans; -        conn->svc   = svc; -        poolcount   = RPCSVC_POOLCOUNT_MULT * svc->memfactor; - -        gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); -        conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); -        /* TODO: leak */ -        if (!conn->rxpool) { -                gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); -                goto free_conn; -        } - -        /* Cannot consider a connection connected unless the user of this -         * connection decides it is ready to use. It is possible that we have -         * to free this connection soon after. That free will not happpen -         * unless the state is disconnected. -         */ -        conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; -        pthread_mutex_init (&conn->connlock, NULL); -        conn->connref = 0; - -        ret = 0; - -free_conn: -        if (ret == -1) { -                GF_FREE (conn); -                conn = NULL; -        } - -        return conn; -} -  int  rpcsvc_notify (rpc_transport_t *trans, void *mydata,                 rpc_transport_event_t event, void *data, ...); -void -rpcsvc_conn_state_init (rpcsvc_conn_t *conn) -{ -        if (!conn) -                return; - -        ++conn->connref; -        conn->connstate = RPCSVC_CONNSTATE_CONNECTED; -} - -  rpcsvc_notify_wrapper_t *  rpcsvc_notify_wrapper_alloc (void)  { @@ -582,11 +531,7 @@ rpcsvc_listener_destroy (rpcsvc_listener_t *listener)                  goto out;          } -        if (!listener->conn) { -                goto listener_free; -        } - -        svc = listener->conn->svc; +        svc = listener->svc;          if (!svc) {                  goto listener_free;          } @@ -604,162 +549,11 @@ out:  } -void -rpcsvc_conn_destroy (rpcsvc_conn_t *conn) -{ -        rpcsvc_listener_t       *listener = NULL; - -        if (!conn || !conn->rxpool || !conn->listener) -                goto out; - -        if (conn->trans) -                rpc_transport_destroy (conn->trans); - -        mem_pool_destroy (conn->rxpool); - -        listener = conn->listener; -        if (listener->conn == conn) { -                rpcsvc_listener_destroy (listener); -        } - -        /* Need to destory record state, txlists etc. */ -        GF_FREE (conn); -out: -        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed"); -} - - -rpcsvc_conn_t * -rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans) -{ -        int            ret  = -1; -        rpcsvc_conn_t *conn = NULL; - -        conn = rpcsvc_conn_alloc (svc, trans); -        if (!conn) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection"); -                goto out; -        } - -        ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn); -        if (ret == -1) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); -                rpcsvc_conn_destroy (conn); -                conn = NULL; -                goto out; -        } - -        rpcsvc_conn_state_init (conn); - -out: -        return conn; -} - - -int -__rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ -        --conn->connref; -        return conn->connref; -} - - -void -__rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ -        if (!conn) -                return; - -        if (rpcsvc_conn_check_active (conn)) { -                conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; -        } - -        if (conn->trans) { -                rpc_transport_disconnect (conn->trans); -                conn->trans = NULL; -        } -} - - -void -rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ -        int ref = 0; - -        if (!conn) -                return; - -        pthread_mutex_lock (&conn->connlock); -        { -                __rpcsvc_conn_deinit (conn); -                ref = __rpcsvc_conn_unref (conn); -        } -        pthread_mutex_unlock (&conn->connlock); - -        if (ref == 0) -                rpcsvc_conn_destroy (conn); - -        return; -} - - -void -rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ -        int ref = 0; -        if (!conn) -                return; - -        pthread_mutex_lock (&conn->connlock); -        { -                ref = __rpcsvc_conn_unref (conn); -        } -        pthread_mutex_unlock (&conn->connlock); - -        if (ref == 0) { -                rpcsvc_conn_destroy (conn); -        } -} - -  int -rpcsvc_conn_active (rpcsvc_conn_t *conn) +rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, +                            rpc_transport_t *trans)  { -        int     status = 0; - -        if (!conn) -                return 0; - -        pthread_mutex_lock (&conn->connlock); -        { -                status = rpcsvc_conn_check_active (conn); -        } -        pthread_mutex_unlock (&conn->connlock); - -        return status; -} - - -void -rpcsvc_conn_ref (rpcsvc_conn_t *conn) -{ -        if (!conn) -                return; - -        pthread_mutex_lock (&conn->connlock); -        { -                ++conn->connref; -        } -        pthread_mutex_unlock (&conn->connlock); - -        return; -} - - -int -rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) -{ -        struct sockaddr_in      sa; +        struct sockaddr_storage sa  = {0, };          int                     ret = RPCSVC_AUTH_REJECT;          socklen_t               sasize = sizeof (sa);          char                    *srchstr = NULL; @@ -769,11 +563,11 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)          uint16_t                port = 0;          gf_boolean_t            insecure = _gf_false; -        if ((!svc) || (!volname) || (!conn)) +        if ((!svc) || (!volname) || (!trans))                  return ret; -        ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa, -                                    sasize); +        ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sa, +                                         sasize);          if (ret != 0) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",                          gai_strerror (ret)); @@ -781,7 +575,12 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)                  goto err;          } -        port = ntohs (sa.sin_port); +        if (sa.ss_family == AF_INET) { +                port = ((struct sockaddr_in *)&sa)->sin_port; +        } else { +                port = ((struct sockaddr_in6 *)&sa)->sin6_port; +        } +          gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);          /* If the port is already a privileged one, dont bother with checking           * options. @@ -851,7 +650,7 @@ err:   * of the pointers below are NULL.   */  rpcsvc_actor_t * -rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_program_actor (rpcsvc_request_t *req)  {          rpcsvc_program_t        *program = NULL;          int                     err      = SYSTEM_ERR; @@ -859,10 +658,10 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)          rpcsvc_t                *svc     = NULL;          char                    found    = 0; -        if ((!conn) || (!req)) +        if (!req)                  goto err; -        svc = conn->svc; +        svc = req->svc;          pthread_mutex_lock (&svc->rpclock);          {                  list_for_each_entry (program, &svc->programs, program) { @@ -938,9 +737,9 @@ rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,                  goto out;          } -        list_for_each_entry (wrapper, &listener->list, list) { +        list_for_each_entry (wrapper, &listener->svc->notify, list) {                  if (wrapper->notify) { -                        wrapper->notify (listener->conn->svc, +                        wrapper->notify (listener->svc,                                           wrapper->data,                                           event, data);                  } @@ -951,37 +750,29 @@ out:  } -int -rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans) +inline int +rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans, +               rpc_transport_t *new_trans)  {          rpcsvc_listener_t *listener = NULL; -        rpcsvc_conn_t     *conn     = NULL; -        char               clstr[RPCSVC_PEER_STRLEN]; - -        listener = listen_conn->listener; -        conn = rpcsvc_conn_init (listen_conn->svc, new_trans); -        if (!conn) { -                rpc_transport_disconnect (new_trans); -                memset (clstr, 0, RPCSVC_PEER_STRLEN); -                rpc_transport_get_peername (new_trans, clstr, -                                            RPCSVC_PEER_STRLEN); -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for " -                        "new transport (%s) failed", clstr); +        int32_t            ret      = -1; + +        listener = rpcsvc_get_listener (svc, -1, listen_trans); +        if (listener == NULL) {                  goto out;          } -        conn->listener = listener; - -        //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn); +        rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans); +        ret = 0;  out: -        return 0; +        return ret;  }  void -rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_request_destroy (rpcsvc_request_t *req)  { -        if (!conn || !req) { +        if (!req) {                  goto out;          } @@ -989,18 +780,21 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)                  iobref_unref (req->iobref);          } -        mem_put (conn->rxpool, req); +        mem_put (req->svc->rxpool, req); + +        rpc_transport_unref (req->trans);  out:          return;  }  rpcsvc_request_t * -rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, +rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, +                     struct rpc_msg *callmsg,                       struct iovec progmsg, rpc_transport_pollin_t *msg,                       rpcsvc_request_t *req)  { -        if ((!conn) || (!callmsg)|| (!req) || (!msg)) +        if ((!trans) || (!callmsg)|| (!req) || (!msg))                  return NULL;          /* We start a RPC request as always denied. */ @@ -1009,7 +803,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,          req->prognum = rpc_call_program (callmsg);          req->progver = rpc_call_progver (callmsg);          req->procnum = rpc_call_progproc (callmsg); -        req->conn = conn; +        req->trans = rpc_transport_ref (trans);          req->count = msg->count;          req->msg[0] = progmsg;          req->iobref = iobref_ref (msg->iobref); @@ -1017,6 +811,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,                  req->msg[1] = msg->vector[1];          } +        req->svc = svc;          req->trans_private = msg->private;          INIT_LIST_HEAD (&req->txlist); @@ -1038,7 +833,8 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,  rpcsvc_request_t * -rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, +                       rpc_transport_pollin_t *msg)  {          char                    *msgbuf = NULL;          struct rpc_msg          rpcmsg; @@ -1047,7 +843,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)          size_t                  msglen  = 0;          int                     ret     = -1; -        if (!conn) +        if (!svc || !trans)                  return NULL;          /* We need to allocate the request before actually calling @@ -1056,7 +852,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)           * This avoids a need to keep a temp buffer into which the auth data           * would've been copied otherwise.           */ -        rpcsvc_alloc_request (conn, req); +        rpcsvc_alloc_request (svc, req);          if (!req) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request");                  goto err; @@ -1075,7 +871,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)          }          ret = -1; -        rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req); +        rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req);          gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld,"                  " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), @@ -1120,30 +916,33 @@ err:  int -rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, +                        rpc_transport_pollin_t *msg)  {          rpcsvc_actor_t          *actor = NULL;          rpcsvc_request_t        *req = NULL;          int                     ret = -1; -        if (!conn) +        if (!trans || !svc)                  return -1; -        req = rpcsvc_request_create (conn, msg); +        req = rpcsvc_request_create (svc, trans, msg);          if (!req)                  goto err;          if (!rpcsvc_request_accepted (req))                  goto err_reply; -        actor = rpcsvc_program_actor (conn, req); +        actor = rpcsvc_program_actor (req);          if (!actor)                  goto err_reply;          if (actor && (req->rpc_err == SUCCESS)) { +                /* Before going to xlator code, set the THIS properly */ +                THIS = svc->mydata; +                  if (req->count == 2) {                          if (actor->vector_actor) { -                                rpcsvc_conn_ref (conn);                                  ret = actor->vector_actor (req, &req->msg[1], 1,                                                             req->iobref);                          } else { @@ -1153,16 +952,14 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)                                  ret = RPCSVC_ACTOR_ERROR;                          }                  } else if (actor->actor) { -                        rpcsvc_conn_ref (req->conn); -                        /* Before going to xlator code, set the THIS properly */ -                        THIS = conn->svc->mydata;                          ret = actor->actor (req);                  }          }  err_reply: -        if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) +        if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) {                  ret = rpcsvc_error_reply (req); +        }          if (ret)                  gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply"); @@ -1170,39 +967,87 @@ err_reply:          /* No need to propagate error beyond this function since the reply           * has now been queued. */          ret = 0; +  err:          return ret;  }  int +rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans) +{ +        rpcsvc_event_t           event; +        rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper; +        int32_t                  ret      = -1, i = 0, wrapper_count = 0; +        rpcsvc_listener_t       *listener = NULL; +  +        event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD +                : RPCSVC_EVENT_DISCONNECT; +  +        pthread_mutex_lock (&svc->rpclock); +        { +                wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper), +                                      gf_common_mt_rpcsvc_wrapper_t); +                if (!wrappers) { +                        goto unlock; +                } +  +                list_for_each_entry (wrapper, &svc->notify, list) { +                        if (wrapper->notify) { +                                wrappers[i++] = *wrapper; +                        } +                } +  +                wrapper_count = i; +        } +unlock: +        pthread_mutex_unlock (&svc->rpclock); +  +        if (wrappers) { +                for (i = 0; i < wrapper_count; i++) { +                        wrappers[i].notify (svc, wrappers[i].data, +                                            event, trans); +                } +  +                GF_FREE (wrappers); +        } +  +        if (event == RPCSVC_EVENT_LISTENER_DEAD) { +                listener = rpcsvc_get_listener (svc, -1, trans->listener); +                rpcsvc_listener_destroy (listener); +        } +  +        return ret; +} +  + +int  rpcsvc_notify (rpc_transport_t *trans, void *mydata,                 rpc_transport_event_t event, void *data, ...)  { -        rpcsvc_conn_t          *conn      = NULL;          int                     ret       = -1;          rpc_transport_pollin_t *msg       = NULL;          rpc_transport_t        *new_trans = NULL; +        rpcsvc_t               *svc       = NULL; -        conn = mydata; -        if (conn == NULL) { +        svc = mydata; +        if (svc == NULL) {                  goto out;          }          switch (event) {          case RPC_TRANSPORT_ACCEPT:                  new_trans = data; -                ret = rpcsvc_accept (conn, new_trans); +                ret = rpcsvc_accept (svc, trans, new_trans);                  break;          case RPC_TRANSPORT_DISCONNECT: -                rpcsvc_conn_deinit (conn); -                ret = 0; +                ret = rpcsvc_handle_disconnect (svc, trans);                  break;          case RPC_TRANSPORT_MSG_RECEIVED:                  msg = data; -                ret = rpcsvc_handle_rpc_call (conn, msg); +                ret = rpcsvc_handle_rpc_call (svc, trans, msg);                  break;          case RPC_TRANSPORT_MSG_SENT: @@ -1274,16 +1119,16 @@ err:  } -int -rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, -                    int hdrcount, struct iovec *proghdr, int proghdrcount, -                    struct iovec *progpayload, int progpayloadcount, -                    struct iobref *iobref, void *priv) +inline int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, +                         int hdrcount, struct iovec *proghdr, int proghdrcount, +                         struct iovec *progpayload, int progpayloadcount, +                         struct iobref *iobref, void *priv)  {          int                   ret   = -1;          rpc_transport_reply_t reply = {{0, }}; -        if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) { +        if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) {                  goto out;          } @@ -1296,15 +1141,7 @@ rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec,          reply.msg.iobref = iobref;          reply.private = priv; -        /* Now that we have both the RPC and Program buffers in xdr format -         * lets hand it to the transmission layer. -         */ -        if (!rpcsvc_conn_check_active (conn)) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive"); -                goto out; -        } - -        ret = rpc_transport_submit_reply (conn->trans, &reply); +        ret = rpc_transport_submit_reply (trans, &reply);  out:          return ret; @@ -1319,6 +1156,7 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)                  return -1;          prog = rpcsvc_request_program (req); +          rpc_fill_empty_reply (reply, req->xid);          if (req->rpc_status == MSG_DENIED) @@ -1352,17 +1190,12 @@ rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload,          char                    *record = NULL;          struct iovec            recordhdr = {0, };          size_t                  pagesize = 0; -        rpcsvc_conn_t           *conn = NULL;          rpcsvc_t                *svc = NULL; -        if ((!req) || (!req->conn) || (!recbuf)) +        if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))                  return NULL; -        /* First, try to get a pointer into the buffer which the RPC -         * layer can use. -         */ -        conn = req->conn; -        svc = rpcsvc_conn_rpcsvc (conn); +        svc = req->svc;          replyiob = iobuf_get (svc->ctx->iobuf_pool);          pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool);          if (!replyiob) { @@ -1423,17 +1256,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,                         int hdrcount, struct iovec *payload, int payloadcount,                         struct iobref *iobref)  { -        int                     ret         = -1, i = 0; -        struct iobuf            *replyiob   = NULL; -        struct iovec            recordhdr   = {0, }; -        rpcsvc_conn_t           *conn       = NULL; -        size_t                   msglen     = 0; -        char                     new_iobref = 0; - -        if ((!req) || (!req->conn)) +        int                     ret        = -1, i = 0; +        struct iobuf           *replyiob   = NULL; +        struct iovec            recordhdr  = {0, }; +        rpc_transport_t        *trans      = NULL; +        size_t                  msglen     = 0; +        char                    new_iobref = 0; + +        if ((!req) || (!req->trans))                  return -1; -        conn = req->conn; +        trans = req->trans;          for (i = 0; i < hdrcount; i++) {                  msglen += proghdr[i].iov_len; @@ -1465,11 +1298,9 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,          iobref_add (iobref, replyiob); -        ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount, -                                  payload, payloadcount, iobref, -                                  req->trans_private); - -        rpcsvc_request_destroy (conn, req); +        ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, +                                       payload, payloadcount, iobref, +                                       req->trans_private);          if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message"); @@ -1484,20 +1315,7 @@ disconnect_exit:                  iobref_unref (iobref);          } -        /* Note that a unref is called everytime a reply is sent. This is in -         * response to the ref that is performed on the conn when a request is -         * handed to the RPC program. -         * -         * The catch, however, is that if the reply is an rpc error, we must -         * not unref. This is because the ref only contains -         * references for the actors to which the request was handed plus one -         * reference maintained by the RPC layer. By unrefing for a case where -         * no actor was called, we will be losing the ref held for the RPC -         * layer. -         */ -        if ((rpcsvc_request_accepted (req)) && -            (rpcsvc_request_accepted_success (req))) -                rpcsvc_conn_unref (conn); +        rpcsvc_request_destroy (req);          return ret;  } @@ -1519,18 +1337,17 @@ rpcsvc_error_reply (rpcsvc_request_t *req)  /* Register the program with the local portmapper service. */ -int -rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn) +inline int +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)  {          int                ret   = 0; -        struct sockaddr_in sa    = {0, }; -        if (!newprog || !conn->trans) { +        if (!newprog) {                  goto out;          }          if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, -                        sa.sin_port))) { +                        port))) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"                          " portmap");                  goto out; @@ -1559,10 +1376,11 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)  rpcsvc_listener_t * -rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)  { -        rpcsvc_listener_t  *listener = NULL; -        char                found    = 0; +        rpcsvc_listener_t  *listener      = NULL; +        char                found         = 0; +        uint32_t            listener_port = 0;           if (!svc) {                  goto out; @@ -1571,13 +1389,42 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port)          pthread_mutex_lock (&svc->rpclock);          {                  list_for_each_entry (listener, &svc->listeners, list) { -                        if (((struct sockaddr_in *)&listener->sa)->sin_port -                            == port) { +                        if (trans != NULL) { +                                if (listener->trans == trans) { +                                        found = 1; +                                        break; +                                } + +                                continue; +                        } +                                 +                        switch (listener->trans->myinfo.sockaddr.ss_family) { +                        case AF_INET: +                                listener_port +                                        = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; +                                break; + +                        case AF_INET6: +                                listener_port +                                        = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;  +                                break; + +                        default: +                                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                                        "invalid address family (%d)", +                                        listener->trans->myinfo.sockaddr.ss_family); +                                goto unlock; +                        } + +                        listener_port = ntohs (listener_port); + +                        if (listener_port == port) {                                  found = 1;                                  break;                          }                  }          } +unlock:          pthread_mutex_unlock (&svc->rpclock);          if (!found) { @@ -1600,7 +1447,7 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,                         int hdrcount, struct iovec *payload, int payloadcount,                         struct iobref *iobref)  { -        if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base)) +        if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base))                  return -1;          return rpcsvc_submit_generic (req, proghdr, hdrcount, payload, @@ -1639,34 +1486,35 @@ err:  } -int -rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen) +inline int +rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen)  { -        if (!conn || !conn->trans) +        if (!trans) {                  return -1; +        } -        return rpc_transport_get_peername (conn->trans, hostname, hostlen); +        return rpc_transport_get_peername (trans, hostname, hostlen);  } -int -rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, -                      struct sockaddr *sa, socklen_t sasize) +inline int +rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, +                           struct sockaddr_storage *sa, socklen_t sasize)  { -        if (!conn || !conn->trans) +        if (!trans) {                  return -1; +        } -        return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa, +        return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa,                                            sasize);  } -rpcsvc_conn_t * -rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) +rpc_transport_t * +rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)  { -        int            ret   = -1; +        int                ret   = -1;          rpc_transport_t   *trans = NULL; -        rpcsvc_conn_t *conn  = NULL;          trans = rpc_transport_load (svc->ctx, options, name);          if (!trans) { @@ -1675,18 +1523,16 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)                  goto out;          } -        ret = rpc_transport_listen (trans); +        ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);          if (ret == -1) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                        "listening on transport failed"); +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");                  goto out;          } -        conn = rpcsvc_conn_init (svc, trans); -        if (!conn) { -                ret = -1; +        ret = rpc_transport_listen (trans); +        if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                        "initializing connection for transport failed"); +                        "listening on transport failed");                  goto out;          } @@ -1694,13 +1540,14 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)  out:          if ((ret == -1) && (trans)) {                  rpc_transport_disconnect (trans); +                trans = NULL;          } -        return conn; +        return trans;  }  rpcsvc_listener_t * -rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) +rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans)  {          rpcsvc_listener_t *listener = NULL; @@ -1711,7 +1558,8 @@ rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn)                  goto out;          } -        listener->conn = conn; +        listener->trans = trans; +        listener->svc = svc;          INIT_LIST_HEAD (&listener->list); @@ -1728,27 +1576,26 @@ out:  rpcsvc_listener_t *  rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)  { -        rpcsvc_conn_t     *conn     = NULL; +        rpc_transport_t   *trans    = NULL;          rpcsvc_listener_t *listener = NULL;          if (!svc || !options) {                  goto out;          } -        conn = rpcsvc_conn_create (svc, options, name); -        if (!conn) { +        trans = rpcsvc_transport_create (svc, options, name); +        if (!trans) {                  goto out;          } -        listener = rpcsvc_listener_alloc (svc, conn); +        listener = rpcsvc_listener_alloc (svc, trans);          if (listener == NULL) {                  goto out;          } -        conn->listener = listener;  out: -        if (!listener && conn) { -                rpcsvc_conn_deinit (conn); +        if (!listener && trans) { +                rpc_transport_disconnect (trans);          }          return listener; @@ -1809,18 +1656,20 @@ out:  } -  int  rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)  { -        rpcsvc_program_t        *newprog       = NULL; -        int                      ret           = -1; -        rpcsvc_listener_t       *listener      = NULL; +        rpcsvc_program_t        *newprog          = NULL; +        int                      ret              = -1; +        rpcsvc_listener_t       *listener         = NULL; +        data_t                  *listen_port_data = NULL; +        uint16_t                 listen_port      = 0;          if (!svc)                  return -1; -        newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t); +        newprog = GF_CALLOC (1, sizeof(*newprog), +                             gf_common_mt_rpcsvc_program_t);          if (!newprog)                  return -1; @@ -1829,9 +1678,76 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)          memcpy (newprog, &program, sizeof (program)); -        listener = svc->listener; +        listen_port = RPCSVC_DEFAULT_LISTEN_PORT; +        if (program.progport != 0) { +                listen_port = program.progport; +        } else if (program.options != NULL) { +                /* +                 * FIXME: use a method which does not hard-code each transport's +                 * option keys. +                 */ +                listen_port_data = dict_get (program.options, "listen-port"); +                if (listen_port_data != NULL) { +                        listen_port = data_to_uint16 (listen_port_data); +                } else if ((listen_port_data +                            = dict_get (program.options, +                                        "transport.socket.listen-port")) +                           != NULL) { +                        listen_port = data_to_uint16 (listen_port_data); +                } else if ((listen_port_data +                            = dict_get (program.options, +                                        "transport.rdma.listen-port")) +                           != NULL) { +                        listen_port = data_to_uint16 (listen_port_data); +                } +        } + +        listener = rpcsvc_get_listener (svc, listen_port, NULL); +        if ((listener == NULL) || (listener->trans == NULL)) { +                if ((listener != NULL) && (listener->trans == NULL)) { +                        gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                                "empty listener without transport found, " +                                "destroying it"); +                        rpcsvc_listener_destroy (listener); +                } + +                if (program.progport != 0) { +                        ret = dict_set (program.options, +                                        "transport.socket.listen-port", +                                        data_from_uint16 (listen_port)); +                        if (ret == -1) { +                                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                                        "setting listening port (%d) specified " +                                        "by program (%s) in options dictionary " +                                        "failed", listen_port, +                                        program.progname); +                                goto free_prog; +                        } -        ret = rpcsvc_program_register_portmap (newprog, listener->conn); +                        ret = dict_set (program.options, +                                        "transport.rdma.listen-port", +                                        data_from_uint16 (listen_port)); +                        if (ret == -1) { +                                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                                        "setting listening port (%d) specified " +                                        "by program (%s) in options dictionary " +                                        "failed", listen_port, +                                        program.progname); +                                goto free_prog; +                        } +                } + +                listener = rpcsvc_create_listener (svc, program.options, +                                                   program.progname); +                if (listener == NULL) { +                        gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                                "creation of listener for program (%s) failed", +                                program.progname); +                        goto free_prog; +                } +        } + +        ret = rpcsvc_program_register_portmap (newprog, program.progport);          if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of"                          " program failed"); @@ -1860,6 +1776,7 @@ free_prog:          return ret;  } +  static void  free_prog_details (gf_dump_rsp *rsp)  { @@ -1882,10 +1799,10 @@ build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp)          gf_prog_detail   *prog    = NULL;          gf_prog_detail   *prev    = NULL; -        if (!req || !req->conn || !req->conn->svc) +        if (!req || !req->trans || !req->svc)                  goto out; -        list_for_each_entry (program, &req->conn->svc->programs, program) { +        list_for_each_entry (program, &req->svc->programs, program) {                  prog = GF_CALLOC (1, sizeof (*prog), 0);                  if (!prog)                          goto out; @@ -1958,9 +1875,11 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)  rpcsvc_t *  rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)  { -        rpcsvc_t          *svc      = NULL; -        int                ret      = -1; -        rpcsvc_listener_t *listener = NULL; +        rpcsvc_t          *svc              = NULL; +        int                ret              = -1, poolcount = 0; +        rpcsvc_listener_t *listener         = NULL; +        uint32_t           listen_port      = 0; +        data_t            *listen_port_data = NULL;          if ((!ctx) || (!options))                  return NULL; @@ -1981,6 +1900,16 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)                  goto free_svc;          } +        poolcount   = RPCSVC_POOLCOUNT_MULT * svc->memfactor; + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); +        svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); +        /* TODO: leak */ +        if (!svc->rxpool) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); +                goto free_svc; +        } +          ret = rpcsvc_auth_init (svc, options);          if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init " @@ -1993,8 +1922,27 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)          svc->ctx = ctx;          gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); +        listen_port = RPCSVC_DEFAULT_LISTEN_PORT; + +        /* +         * FIXME: use a method which does not hard-code each transport's +         * option keys. +         */ +        listen_port_data = dict_get (options, "listen-port"); +        if (listen_port_data != NULL) { +                listen_port = data_to_uint16 (listen_port_data); +        } else if ((listen_port_data +                    = dict_get (options, "transport.socket.listen-port")) +                   != NULL) { +                listen_port = data_to_uint16 (listen_port_data); +        } else if ((listen_port_data +                    = dict_get (options, "transport.rdma.listen-port")) +                   != NULL) { +                listen_port = data_to_uint16 (listen_port_data); +        } +          /* One listen port per RPC */ -        listener = rpcsvc_get_listener (svc, 0); +        listener = rpcsvc_get_listener (svc, 0, NULL);          if (!listener) {                  /* FIXME: listener is given the name of first program that                   * creates it. This is not always correct. For eg., multiple @@ -2009,13 +1957,13 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)                  }          } -        if (!listener->conn) { -                gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection " +        if (!listener->trans) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport "                          "found");                  goto free_svc;          } -        svc->listener = listener; +        gluster_dump_prog.options = options;          ret = rpcsvc_program_register (svc, gluster_dump_prog);          if (ret) { diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 0ce837a281f..25381af7798 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -49,7 +49,7 @@  #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB))  #define RPCSVC_FRAGHDR_SIZE  4       /* 4-byte RPC fragment header size */ - +#define RPCSVC_DEFAULT_LISTEN_PORT      6996  #define RPCSVC_DEFAULT_MEMFACTOR        15  #define RPCSVC_EVENTPOOL_SIZE_MULT      1024  #define RPCSVC_POOLCOUNT_MULT           35 @@ -120,69 +120,23 @@ struct rpcsvc_notify_wrapper {  };  typedef struct rpcsvc_notify_wrapper rpcsvc_notify_wrapper_t; -#define RPCSVC_CONNSTATE_CONNECTED      1 -#define RPCSVC_CONNSTATE_DISCONNECTED   2 - -#define rpcsvc_conn_check_active(conn) ((conn)->connstate==RPCSVC_CONNSTATE_CONNECTED)  typedef struct rpcsvc_request rpcsvc_request_t; -typedef struct rpc_conn_state rpcsvc_conn_t;  typedef struct { -        rpcsvc_conn_t   *conn; -        struct sockaddr  sa; -        struct list_head list; +        rpc_transport_t         *trans; +        rpcsvc_t                *svc; +        /* FIXME: remove address from this structure. Instead use get_myaddr +         * interface implemented by individual transports. +         */ +        struct sockaddr_storage  sa; +        struct list_head         list;  } rpcsvc_listener_t;  struct rpcsvc_config {          int    max_block_size;  }; -/* Contains the state for each connection that is used for transmitting and - * receiving RPC messages. - * - * Anything that can be accessed by a RPC program must be synced through - * connlock. - */ -struct rpc_conn_state { - -        /* Transport or connection state */ -        rpc_transport_t             *trans; - -        rpcsvc_t                *svc; -        /* RPC Records and Fragments assembly state. -         * All incoming data is staged here before being -         * called a full RPC message. -         */ -        /* rpcsvc_record_state_t   rstate; */ - -         /* It is possible that a client disconnects while -         * the higher layer RPC service is busy in a call. -         * In this case, we cannot just free the conn -         * structure, since the higher layer service could -         * still have a reference to it. -         * The refcount avoids freeing until all references -         * have been given up, although the connection is clos()ed at the first -         * call to unref. -         */ -        int                     connref; -        pthread_mutex_t         connlock; -        int                     connstate; - -        /* Memory pool for rpcsvc_request_t */ -        struct mem_pool         *rxpool; - -        /* The request which hasnt yet been handed to the RPC program because -         * this request is being treated as a vector request and so needs some -         * more data to be got from the network. -         */ -        /* rpcsvc_request_t        *vectoredreq; */ -        rpcsvc_listener_t        *listener; -}; - -#define RPCSVC_CONNSTATE_CONNECTED      1 -#define RPCSVC_CONNSTATE_DISCONNECTED   2 -  #define RPCSVC_MAX_AUTH_BYTES   400  typedef struct rpcsvc_auth_data {          int             flavour; @@ -198,7 +152,9 @@ typedef struct rpcsvc_auth_data {   * */  struct rpcsvc_request {          /* connection over which this request came. */ -        rpcsvc_conn_t         *conn; +        rpc_transport_t       *trans; + +        rpcsvc_t              *svc;          rpcsvc_program_t      *prog; @@ -289,13 +245,10 @@ struct rpcsvc_request {  #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))  #define rpcsvc_request_program_private(req) (((rpcsvc_program_t *)((req)->program))->private) -#define rpcsvc_request_conn(req)        (req)->conn  #define rpcsvc_request_accepted(req)    ((req)->rpc_status == MSG_ACCEPTED)  #define rpcsvc_request_accepted_success(req) ((req)->rpc_err == SUCCESS)  #define rpcsvc_request_uid(req)         ((req)->uid)  #define rpcsvc_request_gid(req)         ((req)->gid) -#define rpcsvc_conn_rpcsvc(conn)        ((conn)->svc) -#define rpcsvc_request_service(req)     (rpcsvc_conn_rpcsvc(rpcsvc_request_conn(req)))  #define rpcsvc_request_prog_minauth(req) (rpcsvc_request_program(req)->min_auth)  #define rpcsvc_request_cred_flavour(req) (rpcsvc_auth_flavour(req->cred))  #define rpcsvc_request_verf_flavour(req) (rpcsvc_auth_flavour(req->verf)) @@ -443,8 +396,7 @@ extern rpcsvc_listener_t *  rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name);  extern int -rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, -                                 rpcsvc_conn_t *conn); +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port);  /* Inits the global RPC service data structures.   * Called in main. @@ -480,17 +432,19 @@ rpcsvc_error_reply (rpcsvc_request_t *req);  #define RPCSVC_AUTH_DONTCARE    3  extern int -rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen); +rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen); -extern int -rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, -                      struct sockaddr *returnsa, socklen_t sasize); +extern inline int +rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, +                           struct sockaddr_storage *returnsa, socklen_t sasize);  extern int -rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn); +rpcsvc_transport_peer_check (dict_t *options, char *volname, +                             rpc_transport_t *trans);  extern int -rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn); +rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, +                                 rpc_transport_t *trans);  #define rpcsvc_request_seterr(req, err)                 (req)->rpc_err = err  #define rpcsvc_request_set_autherr(req, err)            (req)->auth_err = err @@ -501,7 +455,7 @@ extern int rpcsvc_request_attach_vector (rpcsvc_request_t *req,                                           struct iobref *ioref, int finalvector); -typedef int (*auth_init_conn) (rpcsvc_conn_t *conn, void *priv); +typedef int (*auth_init_trans) (rpc_transport_t *trans, void *priv);  typedef int (*auth_init_request) (rpcsvc_request_t *req, void *priv);  typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv); @@ -510,7 +464,7 @@ typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv);   * each connection will end up using a different authentication scheme.   */  typedef struct rpcsvc_auth_ops { -        auth_init_conn                conn_init; +        auth_init_trans               transport_init;          auth_init_request             request_init;          auth_request_authenticate     authenticate;  } rpcsvc_auth_ops_t; @@ -546,7 +500,7 @@ extern int  rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options);  extern int -rpcsvc_auth_conn_init (rpcsvc_conn_t *xprt); +rpcsvc_auth_transport_init (rpc_transport_t *xprt);  extern int  rpcsvc_authenticate (rpcsvc_request_t *req);  | 
