diff options
author | Raghavendra G <raghavendra@gluster.com> | 2010-08-17 05:35:42 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-18 00:37:03 -0700 |
commit | 4e01a54eaa6da1bd6817d62dcc51a75e22699e2b (patch) | |
tree | 7832f4070729ef43f0ee1560bf3c737b2e123d49 /rpc | |
parent | d8a8a66523e06abc0f44e1cdfe528cbf28d881a9 (diff) |
rpc - cleanup and changes related to rdma
- remove rpc_conn_state structure.
- add a member to point struct rpc_req in rpc_transport_req structure.
This is needed for rdma to store rdma specific per request data.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-lib/src/auth-glusterfs.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/auth-null.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/auth-unix.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 5 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 18 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc-auth.c | 2 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 768 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 92 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 9 |
11 files changed, 409 insertions, 499 deletions
diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c index 4748a318e74..459cad87791 100644 --- a/rpc/rpc-lib/src/auth-glusterfs.c +++ b/rpc/rpc-lib/src/auth-glusterfs.c @@ -180,7 +180,7 @@ err: } rpcsvc_auth_ops_t auth_glusterfs_ops = { - .conn_init = NULL, + .transport_init = NULL, .request_init = auth_glusterfs_request_init, .authenticate = auth_glusterfs_authenticate }; diff --git a/rpc/rpc-lib/src/auth-null.c b/rpc/rpc-lib/src/auth-null.c index bfdabaa840c..20dd7e77c8b 100644 --- a/rpc/rpc-lib/src/auth-null.c +++ b/rpc/rpc-lib/src/auth-null.c @@ -50,7 +50,7 @@ int auth_null_authenticate (rpcsvc_request_t *req, void *priv) } rpcsvc_auth_ops_t auth_null_ops = { - .conn_init = NULL, + .transport_init = NULL, .request_init = auth_null_request_init, .authenticate = auth_null_authenticate }; diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c index 4e99c1a5b6f..30b395bd4fa 100644 --- a/rpc/rpc-lib/src/auth-unix.c +++ b/rpc/rpc-lib/src/auth-unix.c @@ -70,7 +70,7 @@ err: } rpcsvc_auth_ops_t auth_unix_ops = { - .conn_init = NULL, + .transport_init = NULL, .request_init = auth_unix_request_init, .authenticate = auth_unix_authenticate }; diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index da25d33c1ed..bac8b0246a8 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -418,7 +418,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) if (ret == -1) { gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved " - "frame corresponding to xid (%d)", info->xid); + "frame corresponding to xid (%d) for msg arrived on " + "transport %s", + info->xid, clnt->conn.trans->name); goto out; } @@ -1268,6 +1270,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, req.rsp.rsp_payload = rsp_payload; req.rsp.rsp_payload_count = rsp_payload_count; req.rsp.rsp_iobref = rsp_iobref; + req.rpc_req = rpcreq; ret = rpc_transport_submit_request (rpc->conn.trans, &req); diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 200ff383052..e3bc519fc97 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -587,7 +587,7 @@ validate_volume_options (char *name, dict_t *options, volume_option_t *opt) int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, size_t salen) + struct sockaddr_storage *sa, size_t salen) { if (!this) return -1; @@ -614,7 +614,7 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen) int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, size_t salen) + struct sockaddr_storage *sa, size_t salen) { if (!this) return -1; diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 7ef3abb7320..a6a3441dcdd 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -125,8 +125,9 @@ struct rpc_transport_rsp { typedef struct rpc_transport_rsp rpc_transport_rsp_t; struct rpc_transport_req { - rpc_transport_msg_t msg; - rpc_transport_rsp_t rsp; + rpc_transport_msg_t msg; + rpc_transport_rsp_t rsp; + struct rpc_req *rpc_req; }; typedef struct rpc_transport_req rpc_transport_req_t; @@ -168,6 +169,11 @@ typedef int (*rpc_transport_notify_t) (rpc_transport_t *, void *mydata, rpc_transport_event_t, void *data, ...); struct rpc_transport { struct rpc_transport_ops *ops; + rpc_transport_t *listener; /* listener transport to which + * request for creation of this + * transport came from. valid only + * on server process. + */ void *private; void *xl_private; void *xl; /* Used for THIS */ @@ -202,12 +208,12 @@ struct rpc_transport_ops { int32_t (*get_peername) (rpc_transport_t *this, char *hostname, int hostlen); int32_t (*get_peeraddr) (rpc_transport_t *this, char *peeraddr, - int addrlen, struct sockaddr *sa, + int addrlen, struct sockaddr_storage *sa, socklen_t sasize); int32_t (*get_myname) (rpc_transport_t *this, char *hostname, int hostlen); int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr, - int addrlen, struct sockaddr *sa, + int addrlen, struct sockaddr_storage *sa, socklen_t sasize); }; @@ -253,14 +259,14 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen); int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, size_t salen); + struct sockaddr_storage *sa, size_t salen); int32_t rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen); int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, size_t salen); + struct sockaddr_storage *sa, size_t salen); rpc_transport_pollin_t * rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c index d14b91f3a01..5cfa255ba95 100644 --- a/rpc/rpc-lib/src/rpcsvc-auth.c +++ b/rpc/rpc-lib/src/rpcsvc-auth.c @@ -210,7 +210,7 @@ __rpcsvc_auth_get_handler (rpcsvc_request_t *req) if (!req) return NULL; - svc = rpcsvc_request_service (req); + svc = req->svc; if (!svc) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "!svc"); goto err; diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index aef791cfa0d..7e72bc3ae44 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -62,8 +62,6 @@ typedef struct rpcsvc_state { glusterfs_ctx_t *ctx; - void *listener; - /* list of connections which will listen for incoming connections */ struct list_head listeners; @@ -76,7 +74,7 @@ typedef struct rpcsvc_state { void *mydata; /* This is xlator */ rpcsvc_notify_t notifyfn; - + struct mem_pool *rxpool; } rpcsvc_t; diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 57e9b12aaa6..82a19bbd19d 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -48,16 +48,17 @@ struct rpcsvc_program gluster_dump_prog; - -#define rpcsvc_alloc_request(con, request) \ +#define rpcsvc_alloc_request(svc, request) \ do { \ - request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \ + request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \ memset (request, 0, sizeof (rpcsvc_request_t)); \ } while (0) +rpcsvc_listener_t * +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans); int -rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr) +rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr) { int ret = -1; char *addrtok = NULL; @@ -103,7 +104,7 @@ err: int -rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; @@ -126,7 +127,7 @@ rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) } else srchstr = globalrule; - ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); if (volname) GF_FREE (srchstr); @@ -139,7 +140,7 @@ out: } int -rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; @@ -158,7 +159,7 @@ rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) } else srchstr = generalrule; - ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); if (volname) GF_FREE (srchstr); @@ -301,18 +302,18 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec) int -rpcsvc_conn_peer_check_name (dict_t *options, char *volname, - rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_name (dict_t *options, char *volname, + rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; int aret = RPCSVC_AUTH_REJECT; int rjret = RPCSVC_AUTH_REJECT; char clstr[RPCSVC_PEER_STRLEN]; - if (!conn) + if (!trans) return ret; - ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN); + ret = rpc_transport_get_peername (trans, clstr, RPCSVC_PEER_STRLEN); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " "%s", gai_strerror (ret)); @@ -320,8 +321,8 @@ rpcsvc_conn_peer_check_name (dict_t *options, char *volname, goto err; } - aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); + rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); @@ -331,17 +332,19 @@ err: int -rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_addr (dict_t *options, char *volname, + rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; int aret = RPCSVC_AUTH_DONTCARE; int rjret = RPCSVC_AUTH_REJECT; char clstr[RPCSVC_PEER_STRLEN]; - if (!conn) + if (!trans) return ret; - ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0); + ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, NULL, + 0); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " "%s", gai_strerror (ret)); @@ -349,8 +352,8 @@ rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) goto err; } - aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); + rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); err: @@ -359,8 +362,8 @@ err: int -rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, - rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_specific (dict_t *options, char *volname, + rpc_transport_t *trans) { int namechk = RPCSVC_AUTH_REJECT; int addrchk = RPCSVC_AUTH_REJECT; @@ -368,7 +371,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, char *namestr = NULL; int ret = 0; - if ((!options) || (!volname) || (!conn)) + if ((!options) || (!volname) || (!trans)) return RPCSVC_AUTH_REJECT; /* Enabled by default */ @@ -389,8 +392,9 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, * specific which will over-ride the network address rules. */ if (namelookup) - namechk = rpcsvc_conn_peer_check_name (options, volname, conn); - addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn); + namechk = rpcsvc_transport_peer_check_name (options, volname, + trans); + addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans); if (namelookup) ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -402,7 +406,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, int -rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans) { int addrchk = RPCSVC_AUTH_REJECT; int namechk = RPCSVC_AUTH_REJECT; @@ -410,7 +414,7 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) char *namestr = NULL; int ret = 0; - if ((!options) || (!conn)) + if ((!options) || (!trans)) return RPCSVC_AUTH_REJECT; /* Enabled by default */ @@ -431,8 +435,10 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) * specific which will over-ride the network address rules. */ if (namelookup) - namechk = rpcsvc_conn_peer_check_name (options, NULL, conn); - addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn); + namechk = rpcsvc_transport_peer_check_name (options, NULL, + trans); + + addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans); if (namelookup) ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -443,17 +449,18 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) } int -rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check (dict_t *options, char *volname, + rpc_transport_t *trans) { int general_chk = RPCSVC_AUTH_REJECT; int specific_chk = RPCSVC_AUTH_REJECT; - if ((!options) || (!volname) || (!conn)) + if ((!options) || (!volname) || (!trans)) return RPCSVC_AUTH_REJECT; - general_chk = rpcsvc_conn_check_volume_general (options, conn); - specific_chk = rpcsvc_conn_check_volume_specific (options, volname, - conn); + general_chk = rpcsvc_transport_check_volume_general (options, trans); + specific_chk = rpcsvc_transport_check_volume_specific (options, volname, + trans); return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk); } @@ -494,68 +501,10 @@ out: } - -/* Initialize the core of a connection */ -rpcsvc_conn_t * -rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans) -{ - rpcsvc_conn_t *conn = NULL; - int ret = -1; - unsigned int poolcount = 0; - - conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t); - if (!conn) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); - return NULL; - } - - conn->trans = trans; - conn->svc = svc; - poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor; - - gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); - conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); - /* TODO: leak */ - if (!conn->rxpool) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); - goto free_conn; - } - - /* Cannot consider a connection connected unless the user of this - * connection decides it is ready to use. It is possible that we have - * to free this connection soon after. That free will not happpen - * unless the state is disconnected. - */ - conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; - pthread_mutex_init (&conn->connlock, NULL); - conn->connref = 0; - - ret = 0; - -free_conn: - if (ret == -1) { - GF_FREE (conn); - conn = NULL; - } - - return conn; -} - int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...); -void -rpcsvc_conn_state_init (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - ++conn->connref; - conn->connstate = RPCSVC_CONNSTATE_CONNECTED; -} - - rpcsvc_notify_wrapper_t * rpcsvc_notify_wrapper_alloc (void) { @@ -582,11 +531,7 @@ rpcsvc_listener_destroy (rpcsvc_listener_t *listener) goto out; } - if (!listener->conn) { - goto listener_free; - } - - svc = listener->conn->svc; + svc = listener->svc; if (!svc) { goto listener_free; } @@ -604,162 +549,11 @@ out: } -void -rpcsvc_conn_destroy (rpcsvc_conn_t *conn) -{ - rpcsvc_listener_t *listener = NULL; - - if (!conn || !conn->rxpool || !conn->listener) - goto out; - - if (conn->trans) - rpc_transport_destroy (conn->trans); - - mem_pool_destroy (conn->rxpool); - - listener = conn->listener; - if (listener->conn == conn) { - rpcsvc_listener_destroy (listener); - } - - /* Need to destory record state, txlists etc. */ - GF_FREE (conn); -out: - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed"); -} - - -rpcsvc_conn_t * -rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans) -{ - int ret = -1; - rpcsvc_conn_t *conn = NULL; - - conn = rpcsvc_conn_alloc (svc, trans); - if (!conn) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection"); - goto out; - } - - ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); - rpcsvc_conn_destroy (conn); - conn = NULL; - goto out; - } - - rpcsvc_conn_state_init (conn); - -out: - return conn; -} - - -int -__rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ - --conn->connref; - return conn->connref; -} - - -void -__rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - if (rpcsvc_conn_check_active (conn)) { - conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; - } - - if (conn->trans) { - rpc_transport_disconnect (conn->trans); - conn->trans = NULL; - } -} - - -void -rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ - int ref = 0; - - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - __rpcsvc_conn_deinit (conn); - ref = __rpcsvc_conn_unref (conn); - } - pthread_mutex_unlock (&conn->connlock); - - if (ref == 0) - rpcsvc_conn_destroy (conn); - - return; -} - - -void -rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ - int ref = 0; - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - ref = __rpcsvc_conn_unref (conn); - } - pthread_mutex_unlock (&conn->connlock); - - if (ref == 0) { - rpcsvc_conn_destroy (conn); - } -} - - int -rpcsvc_conn_active (rpcsvc_conn_t *conn) +rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, + rpc_transport_t *trans) { - int status = 0; - - if (!conn) - return 0; - - pthread_mutex_lock (&conn->connlock); - { - status = rpcsvc_conn_check_active (conn); - } - pthread_mutex_unlock (&conn->connlock); - - return status; -} - - -void -rpcsvc_conn_ref (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - ++conn->connref; - } - pthread_mutex_unlock (&conn->connlock); - - return; -} - - -int -rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) -{ - struct sockaddr_in sa; + struct sockaddr_storage sa = {0, }; int ret = RPCSVC_AUTH_REJECT; socklen_t sasize = sizeof (sa); char *srchstr = NULL; @@ -769,11 +563,11 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) uint16_t port = 0; gf_boolean_t insecure = _gf_false; - if ((!svc) || (!volname) || (!conn)) + if ((!svc) || (!volname) || (!trans)) return ret; - ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa, - sasize); + ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sa, + sasize); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s", gai_strerror (ret)); @@ -781,7 +575,12 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) goto err; } - port = ntohs (sa.sin_port); + if (sa.ss_family == AF_INET) { + port = ((struct sockaddr_in *)&sa)->sin_port; + } else { + port = ((struct sockaddr_in6 *)&sa)->sin6_port; + } + gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port); /* If the port is already a privileged one, dont bother with checking * options. @@ -851,7 +650,7 @@ err: * of the pointers below are NULL. */ rpcsvc_actor_t * -rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_program_actor (rpcsvc_request_t *req) { rpcsvc_program_t *program = NULL; int err = SYSTEM_ERR; @@ -859,10 +658,10 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) rpcsvc_t *svc = NULL; char found = 0; - if ((!conn) || (!req)) + if (!req) goto err; - svc = conn->svc; + svc = req->svc; pthread_mutex_lock (&svc->rpclock); { list_for_each_entry (program, &svc->programs, program) { @@ -938,9 +737,9 @@ rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event, goto out; } - list_for_each_entry (wrapper, &listener->list, list) { + list_for_each_entry (wrapper, &listener->svc->notify, list) { if (wrapper->notify) { - wrapper->notify (listener->conn->svc, + wrapper->notify (listener->svc, wrapper->data, event, data); } @@ -951,37 +750,29 @@ out: } -int -rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans) +inline int +rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans, + rpc_transport_t *new_trans) { rpcsvc_listener_t *listener = NULL; - rpcsvc_conn_t *conn = NULL; - char clstr[RPCSVC_PEER_STRLEN]; - - listener = listen_conn->listener; - conn = rpcsvc_conn_init (listen_conn->svc, new_trans); - if (!conn) { - rpc_transport_disconnect (new_trans); - memset (clstr, 0, RPCSVC_PEER_STRLEN); - rpc_transport_get_peername (new_trans, clstr, - RPCSVC_PEER_STRLEN); - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for " - "new transport (%s) failed", clstr); + int32_t ret = -1; + + listener = rpcsvc_get_listener (svc, -1, listen_trans); + if (listener == NULL) { goto out; } - conn->listener = listener; - - //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn); + rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans); + ret = 0; out: - return 0; + return ret; } void -rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_request_destroy (rpcsvc_request_t *req) { - if (!conn || !req) { + if (!req) { goto out; } @@ -989,18 +780,21 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) iobref_unref (req->iobref); } - mem_put (conn->rxpool, req); + mem_put (req->svc->rxpool, req); + + rpc_transport_unref (req->trans); out: return; } rpcsvc_request_t * -rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, +rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, + struct rpc_msg *callmsg, struct iovec progmsg, rpc_transport_pollin_t *msg, rpcsvc_request_t *req) { - if ((!conn) || (!callmsg)|| (!req) || (!msg)) + if ((!trans) || (!callmsg)|| (!req) || (!msg)) return NULL; /* We start a RPC request as always denied. */ @@ -1009,7 +803,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, req->prognum = rpc_call_program (callmsg); req->progver = rpc_call_progver (callmsg); req->procnum = rpc_call_progproc (callmsg); - req->conn = conn; + req->trans = rpc_transport_ref (trans); req->count = msg->count; req->msg[0] = progmsg; req->iobref = iobref_ref (msg->iobref); @@ -1017,6 +811,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, req->msg[1] = msg->vector[1]; } + req->svc = svc; req->trans_private = msg->private; INIT_LIST_HEAD (&req->txlist); @@ -1038,7 +833,8 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, rpcsvc_request_t * -rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, + rpc_transport_pollin_t *msg) { char *msgbuf = NULL; struct rpc_msg rpcmsg; @@ -1047,7 +843,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) size_t msglen = 0; int ret = -1; - if (!conn) + if (!svc || !trans) return NULL; /* We need to allocate the request before actually calling @@ -1056,7 +852,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) * This avoids a need to keep a temp buffer into which the auth data * would've been copied otherwise. */ - rpcsvc_alloc_request (conn, req); + rpcsvc_alloc_request (svc, req); if (!req) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request"); goto err; @@ -1075,7 +871,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) } ret = -1; - rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req); + rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req); gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld," " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), @@ -1120,30 +916,33 @@ err: int -rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, + rpc_transport_pollin_t *msg) { rpcsvc_actor_t *actor = NULL; rpcsvc_request_t *req = NULL; int ret = -1; - if (!conn) + if (!trans || !svc) return -1; - req = rpcsvc_request_create (conn, msg); + req = rpcsvc_request_create (svc, trans, msg); if (!req) goto err; if (!rpcsvc_request_accepted (req)) goto err_reply; - actor = rpcsvc_program_actor (conn, req); + actor = rpcsvc_program_actor (req); if (!actor) goto err_reply; if (actor && (req->rpc_err == SUCCESS)) { + /* Before going to xlator code, set the THIS properly */ + THIS = svc->mydata; + if (req->count == 2) { if (actor->vector_actor) { - rpcsvc_conn_ref (conn); ret = actor->vector_actor (req, &req->msg[1], 1, req->iobref); } else { @@ -1153,16 +952,14 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) ret = RPCSVC_ACTOR_ERROR; } } else if (actor->actor) { - rpcsvc_conn_ref (req->conn); - /* Before going to xlator code, set the THIS properly */ - THIS = conn->svc->mydata; ret = actor->actor (req); } } err_reply: - if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) + if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) { ret = rpcsvc_error_reply (req); + } if (ret) gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply"); @@ -1170,39 +967,87 @@ err_reply: /* No need to propagate error beyond this function since the reply * has now been queued. */ ret = 0; + err: return ret; } int +rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans) +{ + rpcsvc_event_t event; + rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper; + int32_t ret = -1, i = 0, wrapper_count = 0; + rpcsvc_listener_t *listener = NULL; + + event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD + : RPCSVC_EVENT_DISCONNECT; + + pthread_mutex_lock (&svc->rpclock); + { + wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper), + gf_common_mt_rpcsvc_wrapper_t); + if (!wrappers) { + goto unlock; + } + + list_for_each_entry (wrapper, &svc->notify, list) { + if (wrapper->notify) { + wrappers[i++] = *wrapper; + } + } + + wrapper_count = i; + } +unlock: + pthread_mutex_unlock (&svc->rpclock); + + if (wrappers) { + for (i = 0; i < wrapper_count; i++) { + wrappers[i].notify (svc, wrappers[i].data, + event, trans); + } + + GF_FREE (wrappers); + } + + if (event == RPCSVC_EVENT_LISTENER_DEAD) { + listener = rpcsvc_get_listener (svc, -1, trans->listener); + rpcsvc_listener_destroy (listener); + } + + return ret; +} + + +int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) { - rpcsvc_conn_t *conn = NULL; int ret = -1; rpc_transport_pollin_t *msg = NULL; rpc_transport_t *new_trans = NULL; + rpcsvc_t *svc = NULL; - conn = mydata; - if (conn == NULL) { + svc = mydata; + if (svc == NULL) { goto out; } switch (event) { case RPC_TRANSPORT_ACCEPT: new_trans = data; - ret = rpcsvc_accept (conn, new_trans); + ret = rpcsvc_accept (svc, trans, new_trans); break; case RPC_TRANSPORT_DISCONNECT: - rpcsvc_conn_deinit (conn); - ret = 0; + ret = rpcsvc_handle_disconnect (svc, trans); break; case RPC_TRANSPORT_MSG_RECEIVED: msg = data; - ret = rpcsvc_handle_rpc_call (conn, msg); + ret = rpcsvc_handle_rpc_call (svc, trans, msg); break; case RPC_TRANSPORT_MSG_SENT: @@ -1274,16 +1119,16 @@ err: } -int -rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +inline int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, + int hdrcount, struct iovec *proghdr, int proghdrcount, + struct iovec *progpayload, int progpayloadcount, + struct iobref *iobref, void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) { + if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { goto out; } @@ -1296,15 +1141,7 @@ rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, reply.msg.iobref = iobref; reply.private = priv; - /* Now that we have both the RPC and Program buffers in xdr format - * lets hand it to the transmission layer. - */ - if (!rpcsvc_conn_check_active (conn)) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive"); - goto out; - } - - ret = rpc_transport_submit_reply (conn->trans, &reply); + ret = rpc_transport_submit_reply (trans, &reply); out: return ret; @@ -1319,6 +1156,7 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply) return -1; prog = rpcsvc_request_program (req); + rpc_fill_empty_reply (reply, req->xid); if (req->rpc_status == MSG_DENIED) @@ -1352,17 +1190,12 @@ rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; - rpcsvc_conn_t *conn = NULL; rpcsvc_t *svc = NULL; - if ((!req) || (!req->conn) || (!recbuf)) + if ((!req) || (!req->trans) || (!req->svc) || (!recbuf)) return NULL; - /* First, try to get a pointer into the buffer which the RPC - * layer can use. - */ - conn = req->conn; - svc = rpcsvc_conn_rpcsvc (conn); + svc = req->svc; replyiob = iobuf_get (svc->ctx->iobuf_pool); pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool); if (!replyiob) { @@ -1423,17 +1256,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { - int ret = -1, i = 0; - struct iobuf *replyiob = NULL; - struct iovec recordhdr = {0, }; - rpcsvc_conn_t *conn = NULL; - size_t msglen = 0; - char new_iobref = 0; - - if ((!req) || (!req->conn)) + int ret = -1, i = 0; + struct iobuf *replyiob = NULL; + struct iovec recordhdr = {0, }; + rpc_transport_t *trans = NULL; + size_t msglen = 0; + char new_iobref = 0; + + if ((!req) || (!req->trans)) return -1; - conn = req->conn; + trans = req->trans; for (i = 0; i < hdrcount; i++) { msglen += proghdr[i].iov_len; @@ -1465,11 +1298,9 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); - ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount, - payload, payloadcount, iobref, - req->trans_private); - - rpcsvc_request_destroy (conn, req); + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, + payload, payloadcount, iobref, + req->trans_private); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message"); @@ -1484,20 +1315,7 @@ disconnect_exit: iobref_unref (iobref); } - /* Note that a unref is called everytime a reply is sent. This is in - * response to the ref that is performed on the conn when a request is - * handed to the RPC program. - * - * The catch, however, is that if the reply is an rpc error, we must - * not unref. This is because the ref only contains - * references for the actors to which the request was handed plus one - * reference maintained by the RPC layer. By unrefing for a case where - * no actor was called, we will be losing the ref held for the RPC - * layer. - */ - if ((rpcsvc_request_accepted (req)) && - (rpcsvc_request_accepted_success (req))) - rpcsvc_conn_unref (conn); + rpcsvc_request_destroy (req); return ret; } @@ -1519,18 +1337,17 @@ rpcsvc_error_reply (rpcsvc_request_t *req) /* Register the program with the local portmapper service. */ -int -rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn) +inline int +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { int ret = 0; - struct sockaddr_in sa = {0, }; - if (!newprog || !conn->trans) { + if (!newprog) { goto out; } if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, - sa.sin_port))) { + port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" " portmap"); goto out; @@ -1559,10 +1376,11 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) rpcsvc_listener_t * -rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans) { - rpcsvc_listener_t *listener = NULL; - char found = 0; + rpcsvc_listener_t *listener = NULL; + char found = 0; + uint32_t listener_port = 0; if (!svc) { goto out; @@ -1571,13 +1389,42 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) pthread_mutex_lock (&svc->rpclock); { list_for_each_entry (listener, &svc->listeners, list) { - if (((struct sockaddr_in *)&listener->sa)->sin_port - == port) { + if (trans != NULL) { + if (listener->trans == trans) { + found = 1; + break; + } + + continue; + } + + switch (listener->trans->myinfo.sockaddr.ss_family) { + case AF_INET: + listener_port + = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; + break; + + case AF_INET6: + listener_port + = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port; + break; + + default: + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "invalid address family (%d)", + listener->trans->myinfo.sockaddr.ss_family); + goto unlock; + } + + listener_port = ntohs (listener_port); + + if (listener_port == port) { found = 1; break; } } } +unlock: pthread_mutex_unlock (&svc->rpclock); if (!found) { @@ -1600,7 +1447,7 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { - if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base)) + if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base)) return -1; return rpcsvc_submit_generic (req, proghdr, hdrcount, payload, @@ -1639,34 +1486,35 @@ err: } -int -rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen) +inline int +rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen) { - if (!conn || !conn->trans) + if (!trans) { return -1; + } - return rpc_transport_get_peername (conn->trans, hostname, hostlen); + return rpc_transport_get_peername (trans, hostname, hostlen); } -int -rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, - struct sockaddr *sa, socklen_t sasize) +inline int +rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, + struct sockaddr_storage *sa, socklen_t sasize) { - if (!conn || !conn->trans) + if (!trans) { return -1; + } - return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa, + return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa, sasize); } -rpcsvc_conn_t * -rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) +rpc_transport_t * +rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name) { - int ret = -1; + int ret = -1; rpc_transport_t *trans = NULL; - rpcsvc_conn_t *conn = NULL; trans = rpc_transport_load (svc->ctx, options, name); if (!trans) { @@ -1675,18 +1523,16 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) goto out; } - ret = rpc_transport_listen (trans); + ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "listening on transport failed"); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); goto out; } - conn = rpcsvc_conn_init (svc, trans); - if (!conn) { - ret = -1; + ret = rpc_transport_listen (trans); + if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "initializing connection for transport failed"); + "listening on transport failed"); goto out; } @@ -1694,13 +1540,14 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) out: if ((ret == -1) && (trans)) { rpc_transport_disconnect (trans); + trans = NULL; } - return conn; + return trans; } rpcsvc_listener_t * -rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) +rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans) { rpcsvc_listener_t *listener = NULL; @@ -1711,7 +1558,8 @@ rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) goto out; } - listener->conn = conn; + listener->trans = trans; + listener->svc = svc; INIT_LIST_HEAD (&listener->list); @@ -1728,27 +1576,26 @@ out: rpcsvc_listener_t * rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name) { - rpcsvc_conn_t *conn = NULL; + rpc_transport_t *trans = NULL; rpcsvc_listener_t *listener = NULL; if (!svc || !options) { goto out; } - conn = rpcsvc_conn_create (svc, options, name); - if (!conn) { + trans = rpcsvc_transport_create (svc, options, name); + if (!trans) { goto out; } - listener = rpcsvc_listener_alloc (svc, conn); + listener = rpcsvc_listener_alloc (svc, trans); if (listener == NULL) { goto out; } - conn->listener = listener; out: - if (!listener && conn) { - rpcsvc_conn_deinit (conn); + if (!listener && trans) { + rpc_transport_disconnect (trans); } return listener; @@ -1809,18 +1656,20 @@ out: } - int rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) { - rpcsvc_program_t *newprog = NULL; - int ret = -1; - rpcsvc_listener_t *listener = NULL; + rpcsvc_program_t *newprog = NULL; + int ret = -1; + rpcsvc_listener_t *listener = NULL; + data_t *listen_port_data = NULL; + uint16_t listen_port = 0; if (!svc) return -1; - newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t); + newprog = GF_CALLOC (1, sizeof(*newprog), + gf_common_mt_rpcsvc_program_t); if (!newprog) return -1; @@ -1829,9 +1678,76 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) memcpy (newprog, &program, sizeof (program)); - listener = svc->listener; + listen_port = RPCSVC_DEFAULT_LISTEN_PORT; + if (program.progport != 0) { + listen_port = program.progport; + } else if (program.options != NULL) { + /* + * FIXME: use a method which does not hard-code each transport's + * option keys. + */ + listen_port_data = dict_get (program.options, "listen-port"); + if (listen_port_data != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (program.options, + "transport.socket.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (program.options, + "transport.rdma.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } + } + + listener = rpcsvc_get_listener (svc, listen_port, NULL); + if ((listener == NULL) || (listener->trans == NULL)) { + if ((listener != NULL) && (listener->trans == NULL)) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "empty listener without transport found, " + "destroying it"); + rpcsvc_listener_destroy (listener); + } + + if (program.progport != 0) { + ret = dict_set (program.options, + "transport.socket.listen-port", + data_from_uint16 (listen_port)); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "setting listening port (%d) specified " + "by program (%s) in options dictionary " + "failed", listen_port, + program.progname); + goto free_prog; + } - ret = rpcsvc_program_register_portmap (newprog, listener->conn); + ret = dict_set (program.options, + "transport.rdma.listen-port", + data_from_uint16 (listen_port)); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "setting listening port (%d) specified " + "by program (%s) in options dictionary " + "failed", listen_port, + program.progname); + goto free_prog; + } + } + + listener = rpcsvc_create_listener (svc, program.options, + program.progname); + if (listener == NULL) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "creation of listener for program (%s) failed", + program.progname); + goto free_prog; + } + } + + ret = rpcsvc_program_register_portmap (newprog, program.progport); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of" " program failed"); @@ -1860,6 +1776,7 @@ free_prog: return ret; } + static void free_prog_details (gf_dump_rsp *rsp) { @@ -1882,10 +1799,10 @@ build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp) gf_prog_detail *prog = NULL; gf_prog_detail *prev = NULL; - if (!req || !req->conn || !req->conn->svc) + if (!req || !req->trans || !req->svc) goto out; - list_for_each_entry (program, &req->conn->svc->programs, program) { + list_for_each_entry (program, &req->svc->programs, program) { prog = GF_CALLOC (1, sizeof (*prog), 0); if (!prog) goto out; @@ -1958,9 +1875,11 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) rpcsvc_t * rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) { - rpcsvc_t *svc = NULL; - int ret = -1; - rpcsvc_listener_t *listener = NULL; + rpcsvc_t *svc = NULL; + int ret = -1, poolcount = 0; + rpcsvc_listener_t *listener = NULL; + uint32_t listen_port = 0; + data_t *listen_port_data = NULL; if ((!ctx) || (!options)) return NULL; @@ -1981,6 +1900,16 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) goto free_svc; } + poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor; + + gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); + svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); + /* TODO: leak */ + if (!svc->rxpool) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); + goto free_svc; + } + ret = rpcsvc_auth_init (svc, options); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init " @@ -1993,8 +1922,27 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) svc->ctx = ctx; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); + listen_port = RPCSVC_DEFAULT_LISTEN_PORT; + + /* + * FIXME: use a method which does not hard-code each transport's + * option keys. + */ + listen_port_data = dict_get (options, "listen-port"); + if (listen_port_data != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (options, "transport.socket.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (options, "transport.rdma.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } + /* One listen port per RPC */ - listener = rpcsvc_get_listener (svc, 0); + listener = rpcsvc_get_listener (svc, 0, NULL); if (!listener) { /* FIXME: listener is given the name of first program that * creates it. This is not always correct. For eg., multiple @@ -2009,13 +1957,13 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) } } - if (!listener->conn) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection " + if (!listener->trans) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport " "found"); goto free_svc; } - svc->listener = listener; + gluster_dump_prog.options = options; ret = rpcsvc_program_register (svc, gluster_dump_prog); if (ret) { diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 0ce837a281f..25381af7798 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -49,7 +49,7 @@ #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) #define RPCSVC_FRAGHDR_SIZE 4 /* 4-byte RPC fragment header size */ - +#define RPCSVC_DEFAULT_LISTEN_PORT 6996 #define RPCSVC_DEFAULT_MEMFACTOR 15 #define RPCSVC_EVENTPOOL_SIZE_MULT 1024 #define RPCSVC_POOLCOUNT_MULT 35 @@ -120,69 +120,23 @@ struct rpcsvc_notify_wrapper { }; typedef struct rpcsvc_notify_wrapper rpcsvc_notify_wrapper_t; -#define RPCSVC_CONNSTATE_CONNECTED 1 -#define RPCSVC_CONNSTATE_DISCONNECTED 2 - -#define rpcsvc_conn_check_active(conn) ((conn)->connstate==RPCSVC_CONNSTATE_CONNECTED) typedef struct rpcsvc_request rpcsvc_request_t; -typedef struct rpc_conn_state rpcsvc_conn_t; typedef struct { - rpcsvc_conn_t *conn; - struct sockaddr sa; - struct list_head list; + rpc_transport_t *trans; + rpcsvc_t *svc; + /* FIXME: remove address from this structure. Instead use get_myaddr + * interface implemented by individual transports. + */ + struct sockaddr_storage sa; + struct list_head list; } rpcsvc_listener_t; struct rpcsvc_config { int max_block_size; }; -/* Contains the state for each connection that is used for transmitting and - * receiving RPC messages. - * - * Anything that can be accessed by a RPC program must be synced through - * connlock. - */ -struct rpc_conn_state { - - /* Transport or connection state */ - rpc_transport_t *trans; - - rpcsvc_t *svc; - /* RPC Records and Fragments assembly state. - * All incoming data is staged here before being - * called a full RPC message. - */ - /* rpcsvc_record_state_t rstate; */ - - /* It is possible that a client disconnects while - * the higher layer RPC service is busy in a call. - * In this case, we cannot just free the conn - * structure, since the higher layer service could - * still have a reference to it. - * The refcount avoids freeing until all references - * have been given up, although the connection is clos()ed at the first - * call to unref. - */ - int connref; - pthread_mutex_t connlock; - int connstate; - - /* Memory pool for rpcsvc_request_t */ - struct mem_pool *rxpool; - - /* The request which hasnt yet been handed to the RPC program because - * this request is being treated as a vector request and so needs some - * more data to be got from the network. - */ - /* rpcsvc_request_t *vectoredreq; */ - rpcsvc_listener_t *listener; -}; - -#define RPCSVC_CONNSTATE_CONNECTED 1 -#define RPCSVC_CONNSTATE_DISCONNECTED 2 - #define RPCSVC_MAX_AUTH_BYTES 400 typedef struct rpcsvc_auth_data { int flavour; @@ -198,7 +152,9 @@ typedef struct rpcsvc_auth_data { * */ struct rpcsvc_request { /* connection over which this request came. */ - rpcsvc_conn_t *conn; + rpc_transport_t *trans; + + rpcsvc_t *svc; rpcsvc_program_t *prog; @@ -289,13 +245,10 @@ struct rpcsvc_request { #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) #define rpcsvc_request_program_private(req) (((rpcsvc_program_t *)((req)->program))->private) -#define rpcsvc_request_conn(req) (req)->conn #define rpcsvc_request_accepted(req) ((req)->rpc_status == MSG_ACCEPTED) #define rpcsvc_request_accepted_success(req) ((req)->rpc_err == SUCCESS) #define rpcsvc_request_uid(req) ((req)->uid) #define rpcsvc_request_gid(req) ((req)->gid) -#define rpcsvc_conn_rpcsvc(conn) ((conn)->svc) -#define rpcsvc_request_service(req) (rpcsvc_conn_rpcsvc(rpcsvc_request_conn(req))) #define rpcsvc_request_prog_minauth(req) (rpcsvc_request_program(req)->min_auth) #define rpcsvc_request_cred_flavour(req) (rpcsvc_auth_flavour(req->cred)) #define rpcsvc_request_verf_flavour(req) (rpcsvc_auth_flavour(req->verf)) @@ -443,8 +396,7 @@ extern rpcsvc_listener_t * rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name); extern int -rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, - rpcsvc_conn_t *conn); +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port); /* Inits the global RPC service data structures. * Called in main. @@ -480,17 +432,19 @@ rpcsvc_error_reply (rpcsvc_request_t *req); #define RPCSVC_AUTH_DONTCARE 3 extern int -rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen); +rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen); -extern int -rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, - struct sockaddr *returnsa, socklen_t sasize); +extern inline int +rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, + struct sockaddr_storage *returnsa, socklen_t sasize); extern int -rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn); +rpcsvc_transport_peer_check (dict_t *options, char *volname, + rpc_transport_t *trans); extern int -rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn); +rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, + rpc_transport_t *trans); #define rpcsvc_request_seterr(req, err) (req)->rpc_err = err #define rpcsvc_request_set_autherr(req, err) (req)->auth_err = err @@ -501,7 +455,7 @@ extern int rpcsvc_request_attach_vector (rpcsvc_request_t *req, struct iobref *ioref, int finalvector); -typedef int (*auth_init_conn) (rpcsvc_conn_t *conn, void *priv); +typedef int (*auth_init_trans) (rpc_transport_t *trans, void *priv); typedef int (*auth_init_request) (rpcsvc_request_t *req, void *priv); typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv); @@ -510,7 +464,7 @@ typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv); * each connection will end up using a different authentication scheme. */ typedef struct rpcsvc_auth_ops { - auth_init_conn conn_init; + auth_init_trans transport_init; auth_init_request request_init; auth_request_authenticate authenticate; } rpcsvc_auth_ops_t; @@ -546,7 +500,7 @@ extern int rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options); extern int -rpcsvc_auth_conn_init (rpcsvc_conn_t *xprt); +rpcsvc_auth_transport_init (rpc_transport_t *xprt); extern int rpcsvc_authenticate (rpcsvc_request_t *req); diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 961fcdd7ad7..ccddfbc8d76 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1663,6 +1663,7 @@ socket_server_event_handler (int fd, int idx, void *data, new_trans->xl = this->xl; new_trans->mydata = this->mydata; new_trans->notify = this->notify; + new_trans->listener = this; new_priv = new_trans->private; pthread_mutex_lock (&new_priv->lock); @@ -2204,7 +2205,7 @@ out: int32_t socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, socklen_t salen) + struct sockaddr_storage *sa, socklen_t salen) { int32_t ret = -1; @@ -2212,7 +2213,7 @@ socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, goto out; } - *sa = *((struct sockaddr *)&this->peerinfo.sockaddr); + *sa = this->peerinfo.sockaddr; if (peeraddr != NULL) { ret = socket_getpeername (this, peeraddr, addrlen); @@ -2245,7 +2246,7 @@ out: int32_t socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, - struct sockaddr *sa, socklen_t salen) + struct sockaddr_storage *sa, socklen_t salen) { int32_t ret = 0; @@ -2253,7 +2254,7 @@ socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, goto out; } - *sa = *((struct sockaddr *)&this->myinfo.sockaddr); + *sa = this->myinfo.sockaddr; if (myaddr != NULL) { ret = socket_getmyname (this, myaddr, addrlen); |