summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2010-08-17 05:35:42 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-08-18 00:37:03 -0700
commit4e01a54eaa6da1bd6817d62dcc51a75e22699e2b (patch)
tree7832f4070729ef43f0ee1560bf3c737b2e123d49 /rpc
parentd8a8a66523e06abc0f44e1cdfe528cbf28d881a9 (diff)
rpc - cleanup and changes related to rdma
- remove rpc_conn_state structure. - add a member to point struct rpc_req in rpc_transport_req structure. This is needed for rdma to store rdma specific per request data. Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/auth-glusterfs.c2
-rw-r--r--rpc/rpc-lib/src/auth-null.c2
-rw-r--r--rpc/rpc-lib/src/auth-unix.c2
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c5
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c4
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h18
-rw-r--r--rpc/rpc-lib/src/rpcsvc-auth.c2
-rw-r--r--rpc/rpc-lib/src/rpcsvc-common.h4
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c768
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h92
-rw-r--r--rpc/rpc-transport/socket/src/socket.c9
11 files changed, 409 insertions, 499 deletions
diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c
index 4748a318e74..459cad87791 100644
--- a/rpc/rpc-lib/src/auth-glusterfs.c
+++ b/rpc/rpc-lib/src/auth-glusterfs.c
@@ -180,7 +180,7 @@ err:
}
rpcsvc_auth_ops_t auth_glusterfs_ops = {
- .conn_init = NULL,
+ .transport_init = NULL,
.request_init = auth_glusterfs_request_init,
.authenticate = auth_glusterfs_authenticate
};
diff --git a/rpc/rpc-lib/src/auth-null.c b/rpc/rpc-lib/src/auth-null.c
index bfdabaa840c..20dd7e77c8b 100644
--- a/rpc/rpc-lib/src/auth-null.c
+++ b/rpc/rpc-lib/src/auth-null.c
@@ -50,7 +50,7 @@ int auth_null_authenticate (rpcsvc_request_t *req, void *priv)
}
rpcsvc_auth_ops_t auth_null_ops = {
- .conn_init = NULL,
+ .transport_init = NULL,
.request_init = auth_null_request_init,
.authenticate = auth_null_authenticate
};
diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c
index 4e99c1a5b6f..30b395bd4fa 100644
--- a/rpc/rpc-lib/src/auth-unix.c
+++ b/rpc/rpc-lib/src/auth-unix.c
@@ -70,7 +70,7 @@ err:
}
rpcsvc_auth_ops_t auth_unix_ops = {
- .conn_init = NULL,
+ .transport_init = NULL,
.request_init = auth_unix_request_init,
.authenticate = auth_unix_authenticate
};
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index da25d33c1ed..bac8b0246a8 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -418,7 +418,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
if (ret == -1) {
gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved "
- "frame corresponding to xid (%d)", info->xid);
+ "frame corresponding to xid (%d) for msg arrived on "
+ "transport %s",
+ info->xid, clnt->conn.trans->name);
goto out;
}
@@ -1268,6 +1270,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
req.rsp.rsp_payload = rsp_payload;
req.rsp.rsp_payload_count = rsp_payload_count;
req.rsp.rsp_iobref = rsp_iobref;
+ req.rpc_req = rpcreq;
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 200ff383052..e3bc519fc97 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -587,7 +587,7 @@ validate_volume_options (char *name, dict_t *options, volume_option_t *opt)
int32_t
rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen)
+ struct sockaddr_storage *sa, size_t salen)
{
if (!this)
return -1;
@@ -614,7 +614,7 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen)
int32_t
rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen)
+ struct sockaddr_storage *sa, size_t salen)
{
if (!this)
return -1;
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 7ef3abb7320..a6a3441dcdd 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -125,8 +125,9 @@ struct rpc_transport_rsp {
typedef struct rpc_transport_rsp rpc_transport_rsp_t;
struct rpc_transport_req {
- rpc_transport_msg_t msg;
- rpc_transport_rsp_t rsp;
+ rpc_transport_msg_t msg;
+ rpc_transport_rsp_t rsp;
+ struct rpc_req *rpc_req;
};
typedef struct rpc_transport_req rpc_transport_req_t;
@@ -168,6 +169,11 @@ typedef int (*rpc_transport_notify_t) (rpc_transport_t *, void *mydata,
rpc_transport_event_t, void *data, ...);
struct rpc_transport {
struct rpc_transport_ops *ops;
+ rpc_transport_t *listener; /* listener transport to which
+ * request for creation of this
+ * transport came from. valid only
+ * on server process.
+ */
void *private;
void *xl_private;
void *xl; /* Used for THIS */
@@ -202,12 +208,12 @@ struct rpc_transport_ops {
int32_t (*get_peername) (rpc_transport_t *this, char *hostname,
int hostlen);
int32_t (*get_peeraddr) (rpc_transport_t *this, char *peeraddr,
- int addrlen, struct sockaddr *sa,
+ int addrlen, struct sockaddr_storage *sa,
socklen_t sasize);
int32_t (*get_myname) (rpc_transport_t *this, char *hostname,
int hostlen);
int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr,
- int addrlen, struct sockaddr *sa,
+ int addrlen, struct sockaddr_storage *sa,
socklen_t sasize);
};
@@ -253,14 +259,14 @@ rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen);
int32_t
rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen);
+ struct sockaddr_storage *sa, size_t salen);
int32_t
rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen);
int32_t
rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen);
+ struct sockaddr_storage *sa, size_t salen);
rpc_transport_pollin_t *
rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c
index d14b91f3a01..5cfa255ba95 100644
--- a/rpc/rpc-lib/src/rpcsvc-auth.c
+++ b/rpc/rpc-lib/src/rpcsvc-auth.c
@@ -210,7 +210,7 @@ __rpcsvc_auth_get_handler (rpcsvc_request_t *req)
if (!req)
return NULL;
- svc = rpcsvc_request_service (req);
+ svc = req->svc;
if (!svc) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "!svc");
goto err;
diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h
index aef791cfa0d..7e72bc3ae44 100644
--- a/rpc/rpc-lib/src/rpcsvc-common.h
+++ b/rpc/rpc-lib/src/rpcsvc-common.h
@@ -62,8 +62,6 @@ typedef struct rpcsvc_state {
glusterfs_ctx_t *ctx;
- void *listener;
-
/* list of connections which will listen for incoming connections */
struct list_head listeners;
@@ -76,7 +74,7 @@ typedef struct rpcsvc_state {
void *mydata; /* This is xlator */
rpcsvc_notify_t notifyfn;
-
+ struct mem_pool *rxpool;
} rpcsvc_t;
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 57e9b12aaa6..82a19bbd19d 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -48,16 +48,17 @@
struct rpcsvc_program gluster_dump_prog;
-
-#define rpcsvc_alloc_request(con, request) \
+#define rpcsvc_alloc_request(svc, request) \
do { \
- request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \
+ request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \
memset (request, 0, sizeof (rpcsvc_request_t)); \
} while (0)
+rpcsvc_listener_t *
+rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
-rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr)
+rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr)
{
int ret = -1;
char *addrtok = NULL;
@@ -103,7 +104,7 @@ err:
int
-rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr)
+rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr)
{
int ret = RPCSVC_AUTH_DONTCARE;
char *srchstr = NULL;
@@ -126,7 +127,7 @@ rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr)
} else
srchstr = globalrule;
- ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr);
+ ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
if (volname)
GF_FREE (srchstr);
@@ -139,7 +140,7 @@ out:
}
int
-rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr)
+rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr)
{
int ret = RPCSVC_AUTH_DONTCARE;
char *srchstr = NULL;
@@ -158,7 +159,7 @@ rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr)
} else
srchstr = generalrule;
- ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr);
+ ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr);
if (volname)
GF_FREE (srchstr);
@@ -301,18 +302,18 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec)
int
-rpcsvc_conn_peer_check_name (dict_t *options, char *volname,
- rpcsvc_conn_t *conn)
+rpcsvc_transport_peer_check_name (dict_t *options, char *volname,
+ rpc_transport_t *trans)
{
int ret = RPCSVC_AUTH_REJECT;
int aret = RPCSVC_AUTH_REJECT;
int rjret = RPCSVC_AUTH_REJECT;
char clstr[RPCSVC_PEER_STRLEN];
- if (!conn)
+ if (!trans)
return ret;
- ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN);
+ ret = rpc_transport_get_peername (trans, clstr, RPCSVC_PEER_STRLEN);
if (ret != 0) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
"%s", gai_strerror (ret));
@@ -320,8 +321,8 @@ rpcsvc_conn_peer_check_name (dict_t *options, char *volname,
goto err;
}
- aret = rpcsvc_conn_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr);
+ aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
+ rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
@@ -331,17 +332,19 @@ err:
int
-rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn)
+rpcsvc_transport_peer_check_addr (dict_t *options, char *volname,
+ rpc_transport_t *trans)
{
int ret = RPCSVC_AUTH_REJECT;
int aret = RPCSVC_AUTH_DONTCARE;
int rjret = RPCSVC_AUTH_REJECT;
char clstr[RPCSVC_PEER_STRLEN];
- if (!conn)
+ if (!trans)
return ret;
- ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0);
+ ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, NULL,
+ 0);
if (ret != 0) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: "
"%s", gai_strerror (ret));
@@ -349,8 +352,8 @@ rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn)
goto err;
}
- aret = rpcsvc_conn_peer_check_allow (options, volname, clstr);
- rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr);
+ aret = rpcsvc_transport_peer_check_allow (options, volname, clstr);
+ rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr);
ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret);
err:
@@ -359,8 +362,8 @@ err:
int
-rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,
- rpcsvc_conn_t *conn)
+rpcsvc_transport_check_volume_specific (dict_t *options, char *volname,
+ rpc_transport_t *trans)
{
int namechk = RPCSVC_AUTH_REJECT;
int addrchk = RPCSVC_AUTH_REJECT;
@@ -368,7 +371,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,
char *namestr = NULL;
int ret = 0;
- if ((!options) || (!volname) || (!conn))
+ if ((!options) || (!volname) || (!trans))
return RPCSVC_AUTH_REJECT;
/* Enabled by default */
@@ -389,8 +392,9 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,
* specific which will over-ride the network address rules.
*/
if (namelookup)
- namechk = rpcsvc_conn_peer_check_name (options, volname, conn);
- addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn);
+ namechk = rpcsvc_transport_peer_check_name (options, volname,
+ trans);
+ addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans);
if (namelookup)
ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
@@ -402,7 +406,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname,
int
-rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)
+rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans)
{
int addrchk = RPCSVC_AUTH_REJECT;
int namechk = RPCSVC_AUTH_REJECT;
@@ -410,7 +414,7 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)
char *namestr = NULL;
int ret = 0;
- if ((!options) || (!conn))
+ if ((!options) || (!trans))
return RPCSVC_AUTH_REJECT;
/* Enabled by default */
@@ -431,8 +435,10 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)
* specific which will over-ride the network address rules.
*/
if (namelookup)
- namechk = rpcsvc_conn_peer_check_name (options, NULL, conn);
- addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn);
+ namechk = rpcsvc_transport_peer_check_name (options, NULL,
+ trans);
+
+ addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans);
if (namelookup)
ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk);
@@ -443,17 +449,18 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn)
}
int
-rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn)
+rpcsvc_transport_peer_check (dict_t *options, char *volname,
+ rpc_transport_t *trans)
{
int general_chk = RPCSVC_AUTH_REJECT;
int specific_chk = RPCSVC_AUTH_REJECT;
- if ((!options) || (!volname) || (!conn))
+ if ((!options) || (!volname) || (!trans))
return RPCSVC_AUTH_REJECT;
- general_chk = rpcsvc_conn_check_volume_general (options, conn);
- specific_chk = rpcsvc_conn_check_volume_specific (options, volname,
- conn);
+ general_chk = rpcsvc_transport_check_volume_general (options, trans);
+ specific_chk = rpcsvc_transport_check_volume_specific (options, volname,
+ trans);
return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk);
}
@@ -494,68 +501,10 @@ out:
}
-
-/* Initialize the core of a connection */
-rpcsvc_conn_t *
-rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
-{
- rpcsvc_conn_t *conn = NULL;
- int ret = -1;
- unsigned int poolcount = 0;
-
- conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t);
- if (!conn) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed");
- return NULL;
- }
-
- conn->trans = trans;
- conn->svc = svc;
- poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
- conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
- /* TODO: leak */
- if (!conn->rxpool) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
- goto free_conn;
- }
-
- /* Cannot consider a connection connected unless the user of this
- * connection decides it is ready to use. It is possible that we have
- * to free this connection soon after. That free will not happpen
- * unless the state is disconnected.
- */
- conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED;
- pthread_mutex_init (&conn->connlock, NULL);
- conn->connref = 0;
-
- ret = 0;
-
-free_conn:
- if (ret == -1) {
- GF_FREE (conn);
- conn = NULL;
- }
-
- return conn;
-}
-
int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...);
-void
-rpcsvc_conn_state_init (rpcsvc_conn_t *conn)
-{
- if (!conn)
- return;
-
- ++conn->connref;
- conn->connstate = RPCSVC_CONNSTATE_CONNECTED;
-}
-
-
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc (void)
{
@@ -582,11 +531,7 @@ rpcsvc_listener_destroy (rpcsvc_listener_t *listener)
goto out;
}
- if (!listener->conn) {
- goto listener_free;
- }
-
- svc = listener->conn->svc;
+ svc = listener->svc;
if (!svc) {
goto listener_free;
}
@@ -604,162 +549,11 @@ out:
}
-void
-rpcsvc_conn_destroy (rpcsvc_conn_t *conn)
-{
- rpcsvc_listener_t *listener = NULL;
-
- if (!conn || !conn->rxpool || !conn->listener)
- goto out;
-
- if (conn->trans)
- rpc_transport_destroy (conn->trans);
-
- mem_pool_destroy (conn->rxpool);
-
- listener = conn->listener;
- if (listener->conn == conn) {
- rpcsvc_listener_destroy (listener);
- }
-
- /* Need to destory record state, txlists etc. */
- GF_FREE (conn);
-out:
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed");
-}
-
-
-rpcsvc_conn_t *
-rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans)
-{
- int ret = -1;
- rpcsvc_conn_t *conn = NULL;
-
- conn = rpcsvc_conn_alloc (svc, trans);
- if (!conn) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection");
- goto out;
- }
-
- ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
- rpcsvc_conn_destroy (conn);
- conn = NULL;
- goto out;
- }
-
- rpcsvc_conn_state_init (conn);
-
-out:
- return conn;
-}
-
-
-int
-__rpcsvc_conn_unref (rpcsvc_conn_t *conn)
-{
- --conn->connref;
- return conn->connref;
-}
-
-
-void
-__rpcsvc_conn_deinit (rpcsvc_conn_t *conn)
-{
- if (!conn)
- return;
-
- if (rpcsvc_conn_check_active (conn)) {
- conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED;
- }
-
- if (conn->trans) {
- rpc_transport_disconnect (conn->trans);
- conn->trans = NULL;
- }
-}
-
-
-void
-rpcsvc_conn_deinit (rpcsvc_conn_t *conn)
-{
- int ref = 0;
-
- if (!conn)
- return;
-
- pthread_mutex_lock (&conn->connlock);
- {
- __rpcsvc_conn_deinit (conn);
- ref = __rpcsvc_conn_unref (conn);
- }
- pthread_mutex_unlock (&conn->connlock);
-
- if (ref == 0)
- rpcsvc_conn_destroy (conn);
-
- return;
-}
-
-
-void
-rpcsvc_conn_unref (rpcsvc_conn_t *conn)
-{
- int ref = 0;
- if (!conn)
- return;
-
- pthread_mutex_lock (&conn->connlock);
- {
- ref = __rpcsvc_conn_unref (conn);
- }
- pthread_mutex_unlock (&conn->connlock);
-
- if (ref == 0) {
- rpcsvc_conn_destroy (conn);
- }
-}
-
-
int
-rpcsvc_conn_active (rpcsvc_conn_t *conn)
+rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans)
{
- int status = 0;
-
- if (!conn)
- return 0;
-
- pthread_mutex_lock (&conn->connlock);
- {
- status = rpcsvc_conn_check_active (conn);
- }
- pthread_mutex_unlock (&conn->connlock);
-
- return status;
-}
-
-
-void
-rpcsvc_conn_ref (rpcsvc_conn_t *conn)
-{
- if (!conn)
- return;
-
- pthread_mutex_lock (&conn->connlock);
- {
- ++conn->connref;
- }
- pthread_mutex_unlock (&conn->connlock);
-
- return;
-}
-
-
-int
-rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)
-{
- struct sockaddr_in sa;
+ struct sockaddr_storage sa = {0, };
int ret = RPCSVC_AUTH_REJECT;
socklen_t sasize = sizeof (sa);
char *srchstr = NULL;
@@ -769,11 +563,11 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)
uint16_t port = 0;
gf_boolean_t insecure = _gf_false;
- if ((!svc) || (!volname) || (!conn))
+ if ((!svc) || (!volname) || (!trans))
return ret;
- ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa,
- sasize);
+ ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sa,
+ sasize);
if (ret != 0) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s",
gai_strerror (ret));
@@ -781,7 +575,12 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn)
goto err;
}
- port = ntohs (sa.sin_port);
+ if (sa.ss_family == AF_INET) {
+ port = ((struct sockaddr_in *)&sa)->sin_port;
+ } else {
+ port = ((struct sockaddr_in6 *)&sa)->sin6_port;
+ }
+
gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
/* If the port is already a privileged one, dont bother with checking
* options.
@@ -851,7 +650,7 @@ err:
* of the pointers below are NULL.
*/
rpcsvc_actor_t *
-rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
+rpcsvc_program_actor (rpcsvc_request_t *req)
{
rpcsvc_program_t *program = NULL;
int err = SYSTEM_ERR;
@@ -859,10 +658,10 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
rpcsvc_t *svc = NULL;
char found = 0;
- if ((!conn) || (!req))
+ if (!req)
goto err;
- svc = conn->svc;
+ svc = req->svc;
pthread_mutex_lock (&svc->rpclock);
{
list_for_each_entry (program, &svc->programs, program) {
@@ -938,9 +737,9 @@ rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,
goto out;
}
- list_for_each_entry (wrapper, &listener->list, list) {
+ list_for_each_entry (wrapper, &listener->svc->notify, list) {
if (wrapper->notify) {
- wrapper->notify (listener->conn->svc,
+ wrapper->notify (listener->svc,
wrapper->data,
event, data);
}
@@ -951,37 +750,29 @@ out:
}
-int
-rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans)
+inline int
+rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans,
+ rpc_transport_t *new_trans)
{
rpcsvc_listener_t *listener = NULL;
- rpcsvc_conn_t *conn = NULL;
- char clstr[RPCSVC_PEER_STRLEN];
-
- listener = listen_conn->listener;
- conn = rpcsvc_conn_init (listen_conn->svc, new_trans);
- if (!conn) {
- rpc_transport_disconnect (new_trans);
- memset (clstr, 0, RPCSVC_PEER_STRLEN);
- rpc_transport_get_peername (new_trans, clstr,
- RPCSVC_PEER_STRLEN);
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for "
- "new transport (%s) failed", clstr);
+ int32_t ret = -1;
+
+ listener = rpcsvc_get_listener (svc, -1, listen_trans);
+ if (listener == NULL) {
goto out;
}
- conn->listener = listener;
-
- //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn);
+ rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans);
+ ret = 0;
out:
- return 0;
+ return ret;
}
void
-rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
+rpcsvc_request_destroy (rpcsvc_request_t *req)
{
- if (!conn || !req) {
+ if (!req) {
goto out;
}
@@ -989,18 +780,21 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req)
iobref_unref (req->iobref);
}
- mem_put (conn->rxpool, req);
+ mem_put (req->svc->rxpool, req);
+
+ rpc_transport_unref (req->trans);
out:
return;
}
rpcsvc_request_t *
-rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
+rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
+ struct rpc_msg *callmsg,
struct iovec progmsg, rpc_transport_pollin_t *msg,
rpcsvc_request_t *req)
{
- if ((!conn) || (!callmsg)|| (!req) || (!msg))
+ if ((!trans) || (!callmsg)|| (!req) || (!msg))
return NULL;
/* We start a RPC request as always denied. */
@@ -1009,7 +803,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
req->prognum = rpc_call_program (callmsg);
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
- req->conn = conn;
+ req->trans = rpc_transport_ref (trans);
req->count = msg->count;
req->msg[0] = progmsg;
req->iobref = iobref_ref (msg->iobref);
@@ -1017,6 +811,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
req->msg[1] = msg->vector[1];
}
+ req->svc = svc;
req->trans_private = msg->private;
INIT_LIST_HEAD (&req->txlist);
@@ -1038,7 +833,8 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg,
rpcsvc_request_t *
-rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
+rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
{
char *msgbuf = NULL;
struct rpc_msg rpcmsg;
@@ -1047,7 +843,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
size_t msglen = 0;
int ret = -1;
- if (!conn)
+ if (!svc || !trans)
return NULL;
/* We need to allocate the request before actually calling
@@ -1056,7 +852,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
* This avoids a need to keep a temp buffer into which the auth data
* would've been copied otherwise.
*/
- rpcsvc_alloc_request (conn, req);
+ rpcsvc_alloc_request (svc, req);
if (!req) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request");
goto err;
@@ -1075,7 +871,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
}
ret = -1;
- rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req);
+ rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req);
gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld,"
" ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
@@ -1120,30 +916,33 @@ err:
int
-rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
+rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
{
rpcsvc_actor_t *actor = NULL;
rpcsvc_request_t *req = NULL;
int ret = -1;
- if (!conn)
+ if (!trans || !svc)
return -1;
- req = rpcsvc_request_create (conn, msg);
+ req = rpcsvc_request_create (svc, trans, msg);
if (!req)
goto err;
if (!rpcsvc_request_accepted (req))
goto err_reply;
- actor = rpcsvc_program_actor (conn, req);
+ actor = rpcsvc_program_actor (req);
if (!actor)
goto err_reply;
if (actor && (req->rpc_err == SUCCESS)) {
+ /* Before going to xlator code, set the THIS properly */
+ THIS = svc->mydata;
+
if (req->count == 2) {
if (actor->vector_actor) {
- rpcsvc_conn_ref (conn);
ret = actor->vector_actor (req, &req->msg[1], 1,
req->iobref);
} else {
@@ -1153,16 +952,14 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg)
ret = RPCSVC_ACTOR_ERROR;
}
} else if (actor->actor) {
- rpcsvc_conn_ref (req->conn);
- /* Before going to xlator code, set the THIS properly */
- THIS = conn->svc->mydata;
ret = actor->actor (req);
}
}
err_reply:
- if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS))
+ if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) {
ret = rpcsvc_error_reply (req);
+ }
if (ret)
gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply");
@@ -1170,39 +967,87 @@ err_reply:
/* No need to propagate error beyond this function since the reply
* has now been queued. */
ret = 0;
+
err:
return ret;
}
int
+rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans)
+{
+ rpcsvc_event_t event;
+ rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper;
+ int32_t ret = -1, i = 0, wrapper_count = 0;
+ rpcsvc_listener_t *listener = NULL;
+
+ event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD
+ : RPCSVC_EVENT_DISCONNECT;
+
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper),
+ gf_common_mt_rpcsvc_wrapper_t);
+ if (!wrappers) {
+ goto unlock;
+ }
+
+ list_for_each_entry (wrapper, &svc->notify, list) {
+ if (wrapper->notify) {
+ wrappers[i++] = *wrapper;
+ }
+ }
+
+ wrapper_count = i;
+ }
+unlock:
+ pthread_mutex_unlock (&svc->rpclock);
+
+ if (wrappers) {
+ for (i = 0; i < wrapper_count; i++) {
+ wrappers[i].notify (svc, wrappers[i].data,
+ event, trans);
+ }
+
+ GF_FREE (wrappers);
+ }
+
+ if (event == RPCSVC_EVENT_LISTENER_DEAD) {
+ listener = rpcsvc_get_listener (svc, -1, trans->listener);
+ rpcsvc_listener_destroy (listener);
+ }
+
+ return ret;
+}
+
+
+int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
{
- rpcsvc_conn_t *conn = NULL;
int ret = -1;
rpc_transport_pollin_t *msg = NULL;
rpc_transport_t *new_trans = NULL;
+ rpcsvc_t *svc = NULL;
- conn = mydata;
- if (conn == NULL) {
+ svc = mydata;
+ if (svc == NULL) {
goto out;
}
switch (event) {
case RPC_TRANSPORT_ACCEPT:
new_trans = data;
- ret = rpcsvc_accept (conn, new_trans);
+ ret = rpcsvc_accept (svc, trans, new_trans);
break;
case RPC_TRANSPORT_DISCONNECT:
- rpcsvc_conn_deinit (conn);
- ret = 0;
+ ret = rpcsvc_handle_disconnect (svc, trans);
break;
case RPC_TRANSPORT_MSG_RECEIVED:
msg = data;
- ret = rpcsvc_handle_rpc_call (conn, msg);
+ ret = rpcsvc_handle_rpc_call (svc, trans, msg);
break;
case RPC_TRANSPORT_MSG_SENT:
@@ -1274,16 +1119,16 @@ err:
}
-int
-rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec,
- int hdrcount, struct iovec *proghdr, int proghdrcount,
- struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *priv)
+inline int
+rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec,
+ int hdrcount, struct iovec *proghdr, int proghdrcount,
+ struct iovec *progpayload, int progpayloadcount,
+ struct iobref *iobref, void *priv)
{
int ret = -1;
rpc_transport_reply_t reply = {{0, }};
- if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) {
+ if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) {
goto out;
}
@@ -1296,15 +1141,7 @@ rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec,
reply.msg.iobref = iobref;
reply.private = priv;
- /* Now that we have both the RPC and Program buffers in xdr format
- * lets hand it to the transmission layer.
- */
- if (!rpcsvc_conn_check_active (conn)) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive");
- goto out;
- }
-
- ret = rpc_transport_submit_reply (conn->trans, &reply);
+ ret = rpc_transport_submit_reply (trans, &reply);
out:
return ret;
@@ -1319,6 +1156,7 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
return -1;
prog = rpcsvc_request_program (req);
+
rpc_fill_empty_reply (reply, req->xid);
if (req->rpc_status == MSG_DENIED)
@@ -1352,17 +1190,12 @@ rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload,
char *record = NULL;
struct iovec recordhdr = {0, };
size_t pagesize = 0;
- rpcsvc_conn_t *conn = NULL;
rpcsvc_t *svc = NULL;
- if ((!req) || (!req->conn) || (!recbuf))
+ if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))
return NULL;
- /* First, try to get a pointer into the buffer which the RPC
- * layer can use.
- */
- conn = req->conn;
- svc = rpcsvc_conn_rpcsvc (conn);
+ svc = req->svc;
replyiob = iobuf_get (svc->ctx->iobuf_pool);
pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool);
if (!replyiob) {
@@ -1423,17 +1256,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
int hdrcount, struct iovec *payload, int payloadcount,
struct iobref *iobref)
{
- int ret = -1, i = 0;
- struct iobuf *replyiob = NULL;
- struct iovec recordhdr = {0, };
- rpcsvc_conn_t *conn = NULL;
- size_t msglen = 0;
- char new_iobref = 0;
-
- if ((!req) || (!req->conn))
+ int ret = -1, i = 0;
+ struct iobuf *replyiob = NULL;
+ struct iovec recordhdr = {0, };
+ rpc_transport_t *trans = NULL;
+ size_t msglen = 0;
+ char new_iobref = 0;
+
+ if ((!req) || (!req->trans))
return -1;
- conn = req->conn;
+ trans = req->trans;
for (i = 0; i < hdrcount; i++) {
msglen += proghdr[i].iov_len;
@@ -1465,11 +1298,9 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
iobref_add (iobref, replyiob);
- ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount,
- payload, payloadcount, iobref,
- req->trans_private);
-
- rpcsvc_request_destroy (conn, req);
+ ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,
+ payload, payloadcount, iobref,
+ req->trans_private);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message");
@@ -1484,20 +1315,7 @@ disconnect_exit:
iobref_unref (iobref);
}
- /* Note that a unref is called everytime a reply is sent. This is in
- * response to the ref that is performed on the conn when a request is
- * handed to the RPC program.
- *
- * The catch, however, is that if the reply is an rpc error, we must
- * not unref. This is because the ref only contains
- * references for the actors to which the request was handed plus one
- * reference maintained by the RPC layer. By unrefing for a case where
- * no actor was called, we will be losing the ref held for the RPC
- * layer.
- */
- if ((rpcsvc_request_accepted (req)) &&
- (rpcsvc_request_accepted_success (req)))
- rpcsvc_conn_unref (conn);
+ rpcsvc_request_destroy (req);
return ret;
}
@@ -1519,18 +1337,17 @@ rpcsvc_error_reply (rpcsvc_request_t *req)
/* Register the program with the local portmapper service. */
-int
-rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn)
+inline int
+rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
{
int ret = 0;
- struct sockaddr_in sa = {0, };
- if (!newprog || !conn->trans) {
+ if (!newprog) {
goto out;
}
if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP,
- sa.sin_port))) {
+ port))) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"
" portmap");
goto out;
@@ -1559,10 +1376,11 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
rpcsvc_listener_t *
-rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port)
+rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
{
- rpcsvc_listener_t *listener = NULL;
- char found = 0;
+ rpcsvc_listener_t *listener = NULL;
+ char found = 0;
+ uint32_t listener_port = 0;
if (!svc) {
goto out;
@@ -1571,13 +1389,42 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port)
pthread_mutex_lock (&svc->rpclock);
{
list_for_each_entry (listener, &svc->listeners, list) {
- if (((struct sockaddr_in *)&listener->sa)->sin_port
- == port) {
+ if (trans != NULL) {
+ if (listener->trans == trans) {
+ found = 1;
+ break;
+ }
+
+ continue;
+ }
+
+ switch (listener->trans->myinfo.sockaddr.ss_family) {
+ case AF_INET:
+ listener_port
+ = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
+ break;
+
+ case AF_INET6:
+ listener_port
+ = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
+ break;
+
+ default:
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "invalid address family (%d)",
+ listener->trans->myinfo.sockaddr.ss_family);
+ goto unlock;
+ }
+
+ listener_port = ntohs (listener_port);
+
+ if (listener_port == port) {
found = 1;
break;
}
}
}
+unlock:
pthread_mutex_unlock (&svc->rpclock);
if (!found) {
@@ -1600,7 +1447,7 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int hdrcount, struct iovec *payload, int payloadcount,
struct iobref *iobref)
{
- if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base))
+ if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base))
return -1;
return rpcsvc_submit_generic (req, proghdr, hdrcount, payload,
@@ -1639,34 +1486,35 @@ err:
}
-int
-rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen)
+inline int
+rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen)
{
- if (!conn || !conn->trans)
+ if (!trans) {
return -1;
+ }
- return rpc_transport_get_peername (conn->trans, hostname, hostlen);
+ return rpc_transport_get_peername (trans, hostname, hostlen);
}
-int
-rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen,
- struct sockaddr *sa, socklen_t sasize)
+inline int
+rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen,
+ struct sockaddr_storage *sa, socklen_t sasize)
{
- if (!conn || !conn->trans)
+ if (!trans) {
return -1;
+ }
- return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa,
+ return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa,
sasize);
}
-rpcsvc_conn_t *
-rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)
+rpc_transport_t *
+rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)
{
- int ret = -1;
+ int ret = -1;
rpc_transport_t *trans = NULL;
- rpcsvc_conn_t *conn = NULL;
trans = rpc_transport_load (svc->ctx, options, name);
if (!trans) {
@@ -1675,18 +1523,16 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
- ret = rpc_transport_listen (trans);
+ ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "listening on transport failed");
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
goto out;
}
- conn = rpcsvc_conn_init (svc, trans);
- if (!conn) {
- ret = -1;
+ ret = rpc_transport_listen (trans);
+ if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "initializing connection for transport failed");
+ "listening on transport failed");
goto out;
}
@@ -1694,13 +1540,14 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name)
out:
if ((ret == -1) && (trans)) {
rpc_transport_disconnect (trans);
+ trans = NULL;
}
- return conn;
+ return trans;
}
rpcsvc_listener_t *
-rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn)
+rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
{
rpcsvc_listener_t *listener = NULL;
@@ -1711,7 +1558,8 @@ rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn)
goto out;
}
- listener->conn = conn;
+ listener->trans = trans;
+ listener->svc = svc;
INIT_LIST_HEAD (&listener->list);
@@ -1728,27 +1576,26 @@ out:
rpcsvc_listener_t *
rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
{
- rpcsvc_conn_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
rpcsvc_listener_t *listener = NULL;
if (!svc || !options) {
goto out;
}
- conn = rpcsvc_conn_create (svc, options, name);
- if (!conn) {
+ trans = rpcsvc_transport_create (svc, options, name);
+ if (!trans) {
goto out;
}
- listener = rpcsvc_listener_alloc (svc, conn);
+ listener = rpcsvc_listener_alloc (svc, trans);
if (listener == NULL) {
goto out;
}
- conn->listener = listener;
out:
- if (!listener && conn) {
- rpcsvc_conn_deinit (conn);
+ if (!listener && trans) {
+ rpc_transport_disconnect (trans);
}
return listener;
@@ -1809,18 +1656,20 @@ out:
}
-
int
rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)
{
- rpcsvc_program_t *newprog = NULL;
- int ret = -1;
- rpcsvc_listener_t *listener = NULL;
+ rpcsvc_program_t *newprog = NULL;
+ int ret = -1;
+ rpcsvc_listener_t *listener = NULL;
+ data_t *listen_port_data = NULL;
+ uint16_t listen_port = 0;
if (!svc)
return -1;
- newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t);
+ newprog = GF_CALLOC (1, sizeof(*newprog),
+ gf_common_mt_rpcsvc_program_t);
if (!newprog)
return -1;
@@ -1829,9 +1678,76 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)
memcpy (newprog, &program, sizeof (program));
- listener = svc->listener;
+ listen_port = RPCSVC_DEFAULT_LISTEN_PORT;
+ if (program.progport != 0) {
+ listen_port = program.progport;
+ } else if (program.options != NULL) {
+ /*
+ * FIXME: use a method which does not hard-code each transport's
+ * option keys.
+ */
+ listen_port_data = dict_get (program.options, "listen-port");
+ if (listen_port_data != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else if ((listen_port_data
+ = dict_get (program.options,
+ "transport.socket.listen-port"))
+ != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else if ((listen_port_data
+ = dict_get (program.options,
+ "transport.rdma.listen-port"))
+ != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+ }
+
+ listener = rpcsvc_get_listener (svc, listen_port, NULL);
+ if ((listener == NULL) || (listener->trans == NULL)) {
+ if ((listener != NULL) && (listener->trans == NULL)) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "empty listener without transport found, "
+ "destroying it");
+ rpcsvc_listener_destroy (listener);
+ }
+
+ if (program.progport != 0) {
+ ret = dict_set (program.options,
+ "transport.socket.listen-port",
+ data_from_uint16 (listen_port));
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "setting listening port (%d) specified "
+ "by program (%s) in options dictionary "
+ "failed", listen_port,
+ program.progname);
+ goto free_prog;
+ }
- ret = rpcsvc_program_register_portmap (newprog, listener->conn);
+ ret = dict_set (program.options,
+ "transport.rdma.listen-port",
+ data_from_uint16 (listen_port));
+ if (ret == -1) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "setting listening port (%d) specified "
+ "by program (%s) in options dictionary "
+ "failed", listen_port,
+ program.progname);
+ goto free_prog;
+ }
+ }
+
+ listener = rpcsvc_create_listener (svc, program.options,
+ program.progname);
+ if (listener == NULL) {
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "creation of listener for program (%s) failed",
+ program.progname);
+ goto free_prog;
+ }
+ }
+
+ ret = rpcsvc_program_register_portmap (newprog, program.progport);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of"
" program failed");
@@ -1860,6 +1776,7 @@ free_prog:
return ret;
}
+
static void
free_prog_details (gf_dump_rsp *rsp)
{
@@ -1882,10 +1799,10 @@ build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp)
gf_prog_detail *prog = NULL;
gf_prog_detail *prev = NULL;
- if (!req || !req->conn || !req->conn->svc)
+ if (!req || !req->trans || !req->svc)
goto out;
- list_for_each_entry (program, &req->conn->svc->programs, program) {
+ list_for_each_entry (program, &req->svc->programs, program) {
prog = GF_CALLOC (1, sizeof (*prog), 0);
if (!prog)
goto out;
@@ -1958,9 +1875,11 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
rpcsvc_t *
rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
{
- rpcsvc_t *svc = NULL;
- int ret = -1;
- rpcsvc_listener_t *listener = NULL;
+ rpcsvc_t *svc = NULL;
+ int ret = -1, poolcount = 0;
+ rpcsvc_listener_t *listener = NULL;
+ uint32_t listen_port = 0;
+ data_t *listen_port_data = NULL;
if ((!ctx) || (!options))
return NULL;
@@ -1981,6 +1900,16 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
goto free_svc;
}
+ poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
+
+ gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
+ svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
+ /* TODO: leak */
+ if (!svc->rxpool) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
+ goto free_svc;
+ }
+
ret = rpcsvc_auth_init (svc, options);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init "
@@ -1993,8 +1922,27 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
svc->ctx = ctx;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
+ listen_port = RPCSVC_DEFAULT_LISTEN_PORT;
+
+ /*
+ * FIXME: use a method which does not hard-code each transport's
+ * option keys.
+ */
+ listen_port_data = dict_get (options, "listen-port");
+ if (listen_port_data != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else if ((listen_port_data
+ = dict_get (options, "transport.socket.listen-port"))
+ != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ } else if ((listen_port_data
+ = dict_get (options, "transport.rdma.listen-port"))
+ != NULL) {
+ listen_port = data_to_uint16 (listen_port_data);
+ }
+
/* One listen port per RPC */
- listener = rpcsvc_get_listener (svc, 0);
+ listener = rpcsvc_get_listener (svc, 0, NULL);
if (!listener) {
/* FIXME: listener is given the name of first program that
* creates it. This is not always correct. For eg., multiple
@@ -2009,13 +1957,13 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
}
}
- if (!listener->conn) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection "
+ if (!listener->trans) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport "
"found");
goto free_svc;
}
- svc->listener = listener;
+ gluster_dump_prog.options = options;
ret = rpcsvc_program_register (svc, gluster_dump_prog);
if (ret) {
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 0ce837a281f..25381af7798 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -49,7 +49,7 @@
#define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB))
#define RPCSVC_FRAGHDR_SIZE 4 /* 4-byte RPC fragment header size */
-
+#define RPCSVC_DEFAULT_LISTEN_PORT 6996
#define RPCSVC_DEFAULT_MEMFACTOR 15
#define RPCSVC_EVENTPOOL_SIZE_MULT 1024
#define RPCSVC_POOLCOUNT_MULT 35
@@ -120,69 +120,23 @@ struct rpcsvc_notify_wrapper {
};
typedef struct rpcsvc_notify_wrapper rpcsvc_notify_wrapper_t;
-#define RPCSVC_CONNSTATE_CONNECTED 1
-#define RPCSVC_CONNSTATE_DISCONNECTED 2
-
-#define rpcsvc_conn_check_active(conn) ((conn)->connstate==RPCSVC_CONNSTATE_CONNECTED)
typedef struct rpcsvc_request rpcsvc_request_t;
-typedef struct rpc_conn_state rpcsvc_conn_t;
typedef struct {
- rpcsvc_conn_t *conn;
- struct sockaddr sa;
- struct list_head list;
+ rpc_transport_t *trans;
+ rpcsvc_t *svc;
+ /* FIXME: remove address from this structure. Instead use get_myaddr
+ * interface implemented by individual transports.
+ */
+ struct sockaddr_storage sa;
+ struct list_head list;
} rpcsvc_listener_t;
struct rpcsvc_config {
int max_block_size;
};
-/* Contains the state for each connection that is used for transmitting and
- * receiving RPC messages.
- *
- * Anything that can be accessed by a RPC program must be synced through
- * connlock.
- */
-struct rpc_conn_state {
-
- /* Transport or connection state */
- rpc_transport_t *trans;
-
- rpcsvc_t *svc;
- /* RPC Records and Fragments assembly state.
- * All incoming data is staged here before being
- * called a full RPC message.
- */
- /* rpcsvc_record_state_t rstate; */
-
- /* It is possible that a client disconnects while
- * the higher layer RPC service is busy in a call.
- * In this case, we cannot just free the conn
- * structure, since the higher layer service could
- * still have a reference to it.
- * The refcount avoids freeing until all references
- * have been given up, although the connection is clos()ed at the first
- * call to unref.
- */
- int connref;
- pthread_mutex_t connlock;
- int connstate;
-
- /* Memory pool for rpcsvc_request_t */
- struct mem_pool *rxpool;
-
- /* The request which hasnt yet been handed to the RPC program because
- * this request is being treated as a vector request and so needs some
- * more data to be got from the network.
- */
- /* rpcsvc_request_t *vectoredreq; */
- rpcsvc_listener_t *listener;
-};
-
-#define RPCSVC_CONNSTATE_CONNECTED 1
-#define RPCSVC_CONNSTATE_DISCONNECTED 2
-
#define RPCSVC_MAX_AUTH_BYTES 400
typedef struct rpcsvc_auth_data {
int flavour;
@@ -198,7 +152,9 @@ typedef struct rpcsvc_auth_data {
* */
struct rpcsvc_request {
/* connection over which this request came. */
- rpcsvc_conn_t *conn;
+ rpc_transport_t *trans;
+
+ rpcsvc_t *svc;
rpcsvc_program_t *prog;
@@ -289,13 +245,10 @@ struct rpcsvc_request {
#define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))
#define rpcsvc_request_program_private(req) (((rpcsvc_program_t *)((req)->program))->private)
-#define rpcsvc_request_conn(req) (req)->conn
#define rpcsvc_request_accepted(req) ((req)->rpc_status == MSG_ACCEPTED)
#define rpcsvc_request_accepted_success(req) ((req)->rpc_err == SUCCESS)
#define rpcsvc_request_uid(req) ((req)->uid)
#define rpcsvc_request_gid(req) ((req)->gid)
-#define rpcsvc_conn_rpcsvc(conn) ((conn)->svc)
-#define rpcsvc_request_service(req) (rpcsvc_conn_rpcsvc(rpcsvc_request_conn(req)))
#define rpcsvc_request_prog_minauth(req) (rpcsvc_request_program(req)->min_auth)
#define rpcsvc_request_cred_flavour(req) (rpcsvc_auth_flavour(req->cred))
#define rpcsvc_request_verf_flavour(req) (rpcsvc_auth_flavour(req->verf))
@@ -443,8 +396,7 @@ extern rpcsvc_listener_t *
rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name);
extern int
-rpcsvc_program_register_portmap (rpcsvc_program_t *newprog,
- rpcsvc_conn_t *conn);
+rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port);
/* Inits the global RPC service data structures.
* Called in main.
@@ -480,17 +432,19 @@ rpcsvc_error_reply (rpcsvc_request_t *req);
#define RPCSVC_AUTH_DONTCARE 3
extern int
-rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen);
+rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen);
-extern int
-rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen,
- struct sockaddr *returnsa, socklen_t sasize);
+extern inline int
+rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen,
+ struct sockaddr_storage *returnsa, socklen_t sasize);
extern int
-rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn);
+rpcsvc_transport_peer_check (dict_t *options, char *volname,
+ rpc_transport_t *trans);
extern int
-rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn);
+rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname,
+ rpc_transport_t *trans);
#define rpcsvc_request_seterr(req, err) (req)->rpc_err = err
#define rpcsvc_request_set_autherr(req, err) (req)->auth_err = err
@@ -501,7 +455,7 @@ extern int rpcsvc_request_attach_vector (rpcsvc_request_t *req,
struct iobref *ioref, int finalvector);
-typedef int (*auth_init_conn) (rpcsvc_conn_t *conn, void *priv);
+typedef int (*auth_init_trans) (rpc_transport_t *trans, void *priv);
typedef int (*auth_init_request) (rpcsvc_request_t *req, void *priv);
typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv);
@@ -510,7 +464,7 @@ typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv);
* each connection will end up using a different authentication scheme.
*/
typedef struct rpcsvc_auth_ops {
- auth_init_conn conn_init;
+ auth_init_trans transport_init;
auth_init_request request_init;
auth_request_authenticate authenticate;
} rpcsvc_auth_ops_t;
@@ -546,7 +500,7 @@ extern int
rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options);
extern int
-rpcsvc_auth_conn_init (rpcsvc_conn_t *xprt);
+rpcsvc_auth_transport_init (rpc_transport_t *xprt);
extern int
rpcsvc_authenticate (rpcsvc_request_t *req);
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 961fcdd7ad7..ccddfbc8d76 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1663,6 +1663,7 @@ socket_server_event_handler (int fd, int idx, void *data,
new_trans->xl = this->xl;
new_trans->mydata = this->mydata;
new_trans->notify = this->notify;
+ new_trans->listener = this;
new_priv = new_trans->private;
pthread_mutex_lock (&new_priv->lock);
@@ -2204,7 +2205,7 @@ out:
int32_t
socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, socklen_t salen)
+ struct sockaddr_storage *sa, socklen_t salen)
{
int32_t ret = -1;
@@ -2212,7 +2213,7 @@ socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
goto out;
}
- *sa = *((struct sockaddr *)&this->peerinfo.sockaddr);
+ *sa = this->peerinfo.sockaddr;
if (peeraddr != NULL) {
ret = socket_getpeername (this, peeraddr, addrlen);
@@ -2245,7 +2246,7 @@ out:
int32_t
socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
- struct sockaddr *sa, socklen_t salen)
+ struct sockaddr_storage *sa, socklen_t salen)
{
int32_t ret = 0;
@@ -2253,7 +2254,7 @@ socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen,
goto out;
}
- *sa = *((struct sockaddr *)&this->myinfo.sockaddr);
+ *sa = this->myinfo.sockaddr;
if (myaddr != NULL) {
ret = socket_getmyname (this, myaddr, addrlen);