diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 768 |
1 files changed, 358 insertions, 410 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 57e9b12aaa6..82a19bbd19d 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -48,16 +48,17 @@ struct rpcsvc_program gluster_dump_prog; - -#define rpcsvc_alloc_request(con, request) \ +#define rpcsvc_alloc_request(svc, request) \ do { \ - request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \ + request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \ memset (request, 0, sizeof (rpcsvc_request_t)); \ } while (0) +rpcsvc_listener_t * +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans); int -rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr) +rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *clstr) { int ret = -1; char *addrtok = NULL; @@ -103,7 +104,7 @@ err: int -rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *clstr) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; @@ -126,7 +127,7 @@ rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) } else srchstr = globalrule; - ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); if (volname) GF_FREE (srchstr); @@ -139,7 +140,7 @@ out: } int -rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) +rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *clstr) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; @@ -158,7 +159,7 @@ rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) } else srchstr = generalrule; - ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); + ret = rpcsvc_transport_peer_check_search (options, srchstr, clstr); if (volname) GF_FREE (srchstr); @@ -301,18 +302,18 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec) int -rpcsvc_conn_peer_check_name (dict_t *options, char *volname, - rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_name (dict_t *options, char *volname, + rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; int aret = RPCSVC_AUTH_REJECT; int rjret = RPCSVC_AUTH_REJECT; char clstr[RPCSVC_PEER_STRLEN]; - if (!conn) + if (!trans) return ret; - ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN); + ret = rpc_transport_get_peername (trans, clstr, RPCSVC_PEER_STRLEN); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " "%s", gai_strerror (ret)); @@ -320,8 +321,8 @@ rpcsvc_conn_peer_check_name (dict_t *options, char *volname, goto err; } - aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); + rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); @@ -331,17 +332,19 @@ err: int -rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check_addr (dict_t *options, char *volname, + rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; int aret = RPCSVC_AUTH_DONTCARE; int rjret = RPCSVC_AUTH_REJECT; char clstr[RPCSVC_PEER_STRLEN]; - if (!conn) + if (!trans) return ret; - ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0); + ret = rpcsvc_transport_peeraddr (trans, clstr, RPCSVC_PEER_STRLEN, NULL, + 0); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " "%s", gai_strerror (ret)); @@ -349,8 +352,8 @@ rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) goto err; } - aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); - rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + aret = rpcsvc_transport_peer_check_allow (options, volname, clstr); + rjret = rpcsvc_transport_peer_check_reject (options, volname, clstr); ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); err: @@ -359,8 +362,8 @@ err: int -rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, - rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_specific (dict_t *options, char *volname, + rpc_transport_t *trans) { int namechk = RPCSVC_AUTH_REJECT; int addrchk = RPCSVC_AUTH_REJECT; @@ -368,7 +371,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, char *namestr = NULL; int ret = 0; - if ((!options) || (!volname) || (!conn)) + if ((!options) || (!volname) || (!trans)) return RPCSVC_AUTH_REJECT; /* Enabled by default */ @@ -389,8 +392,9 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, * specific which will over-ride the network address rules. */ if (namelookup) - namechk = rpcsvc_conn_peer_check_name (options, volname, conn); - addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn); + namechk = rpcsvc_transport_peer_check_name (options, volname, + trans); + addrchk = rpcsvc_transport_peer_check_addr (options, volname, trans); if (namelookup) ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -402,7 +406,7 @@ rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, int -rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) +rpcsvc_transport_check_volume_general (dict_t *options, rpc_transport_t *trans) { int addrchk = RPCSVC_AUTH_REJECT; int namechk = RPCSVC_AUTH_REJECT; @@ -410,7 +414,7 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) char *namestr = NULL; int ret = 0; - if ((!options) || (!conn)) + if ((!options) || (!trans)) return RPCSVC_AUTH_REJECT; /* Enabled by default */ @@ -431,8 +435,10 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) * specific which will over-ride the network address rules. */ if (namelookup) - namechk = rpcsvc_conn_peer_check_name (options, NULL, conn); - addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn); + namechk = rpcsvc_transport_peer_check_name (options, NULL, + trans); + + addrchk = rpcsvc_transport_peer_check_addr (options, NULL, trans); if (namelookup) ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); @@ -443,17 +449,18 @@ rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) } int -rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn) +rpcsvc_transport_peer_check (dict_t *options, char *volname, + rpc_transport_t *trans) { int general_chk = RPCSVC_AUTH_REJECT; int specific_chk = RPCSVC_AUTH_REJECT; - if ((!options) || (!volname) || (!conn)) + if ((!options) || (!volname) || (!trans)) return RPCSVC_AUTH_REJECT; - general_chk = rpcsvc_conn_check_volume_general (options, conn); - specific_chk = rpcsvc_conn_check_volume_specific (options, volname, - conn); + general_chk = rpcsvc_transport_check_volume_general (options, trans); + specific_chk = rpcsvc_transport_check_volume_specific (options, volname, + trans); return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk); } @@ -494,68 +501,10 @@ out: } - -/* Initialize the core of a connection */ -rpcsvc_conn_t * -rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans) -{ - rpcsvc_conn_t *conn = NULL; - int ret = -1; - unsigned int poolcount = 0; - - conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t); - if (!conn) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); - return NULL; - } - - conn->trans = trans; - conn->svc = svc; - poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor; - - gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); - conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); - /* TODO: leak */ - if (!conn->rxpool) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); - goto free_conn; - } - - /* Cannot consider a connection connected unless the user of this - * connection decides it is ready to use. It is possible that we have - * to free this connection soon after. That free will not happpen - * unless the state is disconnected. - */ - conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; - pthread_mutex_init (&conn->connlock, NULL); - conn->connref = 0; - - ret = 0; - -free_conn: - if (ret == -1) { - GF_FREE (conn); - conn = NULL; - } - - return conn; -} - int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...); -void -rpcsvc_conn_state_init (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - ++conn->connref; - conn->connstate = RPCSVC_CONNSTATE_CONNECTED; -} - - rpcsvc_notify_wrapper_t * rpcsvc_notify_wrapper_alloc (void) { @@ -582,11 +531,7 @@ rpcsvc_listener_destroy (rpcsvc_listener_t *listener) goto out; } - if (!listener->conn) { - goto listener_free; - } - - svc = listener->conn->svc; + svc = listener->svc; if (!svc) { goto listener_free; } @@ -604,162 +549,11 @@ out: } -void -rpcsvc_conn_destroy (rpcsvc_conn_t *conn) -{ - rpcsvc_listener_t *listener = NULL; - - if (!conn || !conn->rxpool || !conn->listener) - goto out; - - if (conn->trans) - rpc_transport_destroy (conn->trans); - - mem_pool_destroy (conn->rxpool); - - listener = conn->listener; - if (listener->conn == conn) { - rpcsvc_listener_destroy (listener); - } - - /* Need to destory record state, txlists etc. */ - GF_FREE (conn); -out: - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed"); -} - - -rpcsvc_conn_t * -rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans) -{ - int ret = -1; - rpcsvc_conn_t *conn = NULL; - - conn = rpcsvc_conn_alloc (svc, trans); - if (!conn) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection"); - goto out; - } - - ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); - rpcsvc_conn_destroy (conn); - conn = NULL; - goto out; - } - - rpcsvc_conn_state_init (conn); - -out: - return conn; -} - - -int -__rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ - --conn->connref; - return conn->connref; -} - - -void -__rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - if (rpcsvc_conn_check_active (conn)) { - conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; - } - - if (conn->trans) { - rpc_transport_disconnect (conn->trans); - conn->trans = NULL; - } -} - - -void -rpcsvc_conn_deinit (rpcsvc_conn_t *conn) -{ - int ref = 0; - - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - __rpcsvc_conn_deinit (conn); - ref = __rpcsvc_conn_unref (conn); - } - pthread_mutex_unlock (&conn->connlock); - - if (ref == 0) - rpcsvc_conn_destroy (conn); - - return; -} - - -void -rpcsvc_conn_unref (rpcsvc_conn_t *conn) -{ - int ref = 0; - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - ref = __rpcsvc_conn_unref (conn); - } - pthread_mutex_unlock (&conn->connlock); - - if (ref == 0) { - rpcsvc_conn_destroy (conn); - } -} - - int -rpcsvc_conn_active (rpcsvc_conn_t *conn) +rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, + rpc_transport_t *trans) { - int status = 0; - - if (!conn) - return 0; - - pthread_mutex_lock (&conn->connlock); - { - status = rpcsvc_conn_check_active (conn); - } - pthread_mutex_unlock (&conn->connlock); - - return status; -} - - -void -rpcsvc_conn_ref (rpcsvc_conn_t *conn) -{ - if (!conn) - return; - - pthread_mutex_lock (&conn->connlock); - { - ++conn->connref; - } - pthread_mutex_unlock (&conn->connlock); - - return; -} - - -int -rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) -{ - struct sockaddr_in sa; + struct sockaddr_storage sa = {0, }; int ret = RPCSVC_AUTH_REJECT; socklen_t sasize = sizeof (sa); char *srchstr = NULL; @@ -769,11 +563,11 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) uint16_t port = 0; gf_boolean_t insecure = _gf_false; - if ((!svc) || (!volname) || (!conn)) + if ((!svc) || (!volname) || (!trans)) return ret; - ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa, - sasize); + ret = rpcsvc_transport_peeraddr (trans, NULL, 0, &sa, + sasize); if (ret != 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s", gai_strerror (ret)); @@ -781,7 +575,12 @@ rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) goto err; } - port = ntohs (sa.sin_port); + if (sa.ss_family == AF_INET) { + port = ((struct sockaddr_in *)&sa)->sin_port; + } else { + port = ((struct sockaddr_in6 *)&sa)->sin6_port; + } + gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port); /* If the port is already a privileged one, dont bother with checking * options. @@ -851,7 +650,7 @@ err: * of the pointers below are NULL. */ rpcsvc_actor_t * -rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_program_actor (rpcsvc_request_t *req) { rpcsvc_program_t *program = NULL; int err = SYSTEM_ERR; @@ -859,10 +658,10 @@ rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) rpcsvc_t *svc = NULL; char found = 0; - if ((!conn) || (!req)) + if (!req) goto err; - svc = conn->svc; + svc = req->svc; pthread_mutex_lock (&svc->rpclock); { list_for_each_entry (program, &svc->programs, program) { @@ -938,9 +737,9 @@ rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event, goto out; } - list_for_each_entry (wrapper, &listener->list, list) { + list_for_each_entry (wrapper, &listener->svc->notify, list) { if (wrapper->notify) { - wrapper->notify (listener->conn->svc, + wrapper->notify (listener->svc, wrapper->data, event, data); } @@ -951,37 +750,29 @@ out: } -int -rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans) +inline int +rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans, + rpc_transport_t *new_trans) { rpcsvc_listener_t *listener = NULL; - rpcsvc_conn_t *conn = NULL; - char clstr[RPCSVC_PEER_STRLEN]; - - listener = listen_conn->listener; - conn = rpcsvc_conn_init (listen_conn->svc, new_trans); - if (!conn) { - rpc_transport_disconnect (new_trans); - memset (clstr, 0, RPCSVC_PEER_STRLEN); - rpc_transport_get_peername (new_trans, clstr, - RPCSVC_PEER_STRLEN); - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for " - "new transport (%s) failed", clstr); + int32_t ret = -1; + + listener = rpcsvc_get_listener (svc, -1, listen_trans); + if (listener == NULL) { goto out; } - conn->listener = listener; - - //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn); + rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans); + ret = 0; out: - return 0; + return ret; } void -rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +rpcsvc_request_destroy (rpcsvc_request_t *req) { - if (!conn || !req) { + if (!req) { goto out; } @@ -989,18 +780,21 @@ rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) iobref_unref (req->iobref); } - mem_put (conn->rxpool, req); + mem_put (req->svc->rxpool, req); + + rpc_transport_unref (req->trans); out: return; } rpcsvc_request_t * -rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, +rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, + struct rpc_msg *callmsg, struct iovec progmsg, rpc_transport_pollin_t *msg, rpcsvc_request_t *req) { - if ((!conn) || (!callmsg)|| (!req) || (!msg)) + if ((!trans) || (!callmsg)|| (!req) || (!msg)) return NULL; /* We start a RPC request as always denied. */ @@ -1009,7 +803,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, req->prognum = rpc_call_program (callmsg); req->progver = rpc_call_progver (callmsg); req->procnum = rpc_call_progproc (callmsg); - req->conn = conn; + req->trans = rpc_transport_ref (trans); req->count = msg->count; req->msg[0] = progmsg; req->iobref = iobref_ref (msg->iobref); @@ -1017,6 +811,7 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, req->msg[1] = msg->vector[1]; } + req->svc = svc; req->trans_private = msg->private; INIT_LIST_HEAD (&req->txlist); @@ -1038,7 +833,8 @@ rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, rpcsvc_request_t * -rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, + rpc_transport_pollin_t *msg) { char *msgbuf = NULL; struct rpc_msg rpcmsg; @@ -1047,7 +843,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) size_t msglen = 0; int ret = -1; - if (!conn) + if (!svc || !trans) return NULL; /* We need to allocate the request before actually calling @@ -1056,7 +852,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) * This avoids a need to keep a temp buffer into which the auth data * would've been copied otherwise. */ - rpcsvc_alloc_request (conn, req); + rpcsvc_alloc_request (svc, req); if (!req) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request"); goto err; @@ -1075,7 +871,7 @@ rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) } ret = -1; - rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req); + rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req); gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld," " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), @@ -1120,30 +916,33 @@ err: int -rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, + rpc_transport_pollin_t *msg) { rpcsvc_actor_t *actor = NULL; rpcsvc_request_t *req = NULL; int ret = -1; - if (!conn) + if (!trans || !svc) return -1; - req = rpcsvc_request_create (conn, msg); + req = rpcsvc_request_create (svc, trans, msg); if (!req) goto err; if (!rpcsvc_request_accepted (req)) goto err_reply; - actor = rpcsvc_program_actor (conn, req); + actor = rpcsvc_program_actor (req); if (!actor) goto err_reply; if (actor && (req->rpc_err == SUCCESS)) { + /* Before going to xlator code, set the THIS properly */ + THIS = svc->mydata; + if (req->count == 2) { if (actor->vector_actor) { - rpcsvc_conn_ref (conn); ret = actor->vector_actor (req, &req->msg[1], 1, req->iobref); } else { @@ -1153,16 +952,14 @@ rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) ret = RPCSVC_ACTOR_ERROR; } } else if (actor->actor) { - rpcsvc_conn_ref (req->conn); - /* Before going to xlator code, set the THIS properly */ - THIS = conn->svc->mydata; ret = actor->actor (req); } } err_reply: - if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) + if ((ret == RPCSVC_ACTOR_ERROR) || (req->rpc_err != SUCCESS)) { ret = rpcsvc_error_reply (req); + } if (ret) gf_log ("rpcsvc", GF_LOG_DEBUG, "failed to queue error reply"); @@ -1170,39 +967,87 @@ err_reply: /* No need to propagate error beyond this function since the reply * has now been queued. */ ret = 0; + err: return ret; } int +rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans) +{ + rpcsvc_event_t event; + rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper; + int32_t ret = -1, i = 0, wrapper_count = 0; + rpcsvc_listener_t *listener = NULL; + + event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD + : RPCSVC_EVENT_DISCONNECT; + + pthread_mutex_lock (&svc->rpclock); + { + wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper), + gf_common_mt_rpcsvc_wrapper_t); + if (!wrappers) { + goto unlock; + } + + list_for_each_entry (wrapper, &svc->notify, list) { + if (wrapper->notify) { + wrappers[i++] = *wrapper; + } + } + + wrapper_count = i; + } +unlock: + pthread_mutex_unlock (&svc->rpclock); + + if (wrappers) { + for (i = 0; i < wrapper_count; i++) { + wrappers[i].notify (svc, wrappers[i].data, + event, trans); + } + + GF_FREE (wrappers); + } + + if (event == RPCSVC_EVENT_LISTENER_DEAD) { + listener = rpcsvc_get_listener (svc, -1, trans->listener); + rpcsvc_listener_destroy (listener); + } + + return ret; +} + + +int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) { - rpcsvc_conn_t *conn = NULL; int ret = -1; rpc_transport_pollin_t *msg = NULL; rpc_transport_t *new_trans = NULL; + rpcsvc_t *svc = NULL; - conn = mydata; - if (conn == NULL) { + svc = mydata; + if (svc == NULL) { goto out; } switch (event) { case RPC_TRANSPORT_ACCEPT: new_trans = data; - ret = rpcsvc_accept (conn, new_trans); + ret = rpcsvc_accept (svc, trans, new_trans); break; case RPC_TRANSPORT_DISCONNECT: - rpcsvc_conn_deinit (conn); - ret = 0; + ret = rpcsvc_handle_disconnect (svc, trans); break; case RPC_TRANSPORT_MSG_RECEIVED: msg = data; - ret = rpcsvc_handle_rpc_call (conn, msg); + ret = rpcsvc_handle_rpc_call (svc, trans, msg); break; case RPC_TRANSPORT_MSG_SENT: @@ -1274,16 +1119,16 @@ err: } -int -rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +inline int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, + int hdrcount, struct iovec *proghdr, int proghdrcount, + struct iovec *progpayload, int progpayloadcount, + struct iobref *iobref, void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) { + if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { goto out; } @@ -1296,15 +1141,7 @@ rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, reply.msg.iobref = iobref; reply.private = priv; - /* Now that we have both the RPC and Program buffers in xdr format - * lets hand it to the transmission layer. - */ - if (!rpcsvc_conn_check_active (conn)) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive"); - goto out; - } - - ret = rpc_transport_submit_reply (conn->trans, &reply); + ret = rpc_transport_submit_reply (trans, &reply); out: return ret; @@ -1319,6 +1156,7 @@ rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply) return -1; prog = rpcsvc_request_program (req); + rpc_fill_empty_reply (reply, req->xid); if (req->rpc_status == MSG_DENIED) @@ -1352,17 +1190,12 @@ rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; - rpcsvc_conn_t *conn = NULL; rpcsvc_t *svc = NULL; - if ((!req) || (!req->conn) || (!recbuf)) + if ((!req) || (!req->trans) || (!req->svc) || (!recbuf)) return NULL; - /* First, try to get a pointer into the buffer which the RPC - * layer can use. - */ - conn = req->conn; - svc = rpcsvc_conn_rpcsvc (conn); + svc = req->svc; replyiob = iobuf_get (svc->ctx->iobuf_pool); pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool); if (!replyiob) { @@ -1423,17 +1256,17 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { - int ret = -1, i = 0; - struct iobuf *replyiob = NULL; - struct iovec recordhdr = {0, }; - rpcsvc_conn_t *conn = NULL; - size_t msglen = 0; - char new_iobref = 0; - - if ((!req) || (!req->conn)) + int ret = -1, i = 0; + struct iobuf *replyiob = NULL; + struct iovec recordhdr = {0, }; + rpc_transport_t *trans = NULL; + size_t msglen = 0; + char new_iobref = 0; + + if ((!req) || (!req->trans)) return -1; - conn = req->conn; + trans = req->trans; for (i = 0; i < hdrcount; i++) { msglen += proghdr[i].iov_len; @@ -1465,11 +1298,9 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); - ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount, - payload, payloadcount, iobref, - req->trans_private); - - rpcsvc_request_destroy (conn, req); + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, + payload, payloadcount, iobref, + req->trans_private); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message"); @@ -1484,20 +1315,7 @@ disconnect_exit: iobref_unref (iobref); } - /* Note that a unref is called everytime a reply is sent. This is in - * response to the ref that is performed on the conn when a request is - * handed to the RPC program. - * - * The catch, however, is that if the reply is an rpc error, we must - * not unref. This is because the ref only contains - * references for the actors to which the request was handed plus one - * reference maintained by the RPC layer. By unrefing for a case where - * no actor was called, we will be losing the ref held for the RPC - * layer. - */ - if ((rpcsvc_request_accepted (req)) && - (rpcsvc_request_accepted_success (req))) - rpcsvc_conn_unref (conn); + rpcsvc_request_destroy (req); return ret; } @@ -1519,18 +1337,17 @@ rpcsvc_error_reply (rpcsvc_request_t *req) /* Register the program with the local portmapper service. */ -int -rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn) +inline int +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { int ret = 0; - struct sockaddr_in sa = {0, }; - if (!newprog || !conn->trans) { + if (!newprog) { goto out; } if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, - sa.sin_port))) { + port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" " portmap"); goto out; @@ -1559,10 +1376,11 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) rpcsvc_listener_t * -rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans) { - rpcsvc_listener_t *listener = NULL; - char found = 0; + rpcsvc_listener_t *listener = NULL; + char found = 0; + uint32_t listener_port = 0; if (!svc) { goto out; @@ -1571,13 +1389,42 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) pthread_mutex_lock (&svc->rpclock); { list_for_each_entry (listener, &svc->listeners, list) { - if (((struct sockaddr_in *)&listener->sa)->sin_port - == port) { + if (trans != NULL) { + if (listener->trans == trans) { + found = 1; + break; + } + + continue; + } + + switch (listener->trans->myinfo.sockaddr.ss_family) { + case AF_INET: + listener_port + = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; + break; + + case AF_INET6: + listener_port + = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port; + break; + + default: + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "invalid address family (%d)", + listener->trans->myinfo.sockaddr.ss_family); + goto unlock; + } + + listener_port = ntohs (listener_port); + + if (listener_port == port) { found = 1; break; } } } +unlock: pthread_mutex_unlock (&svc->rpclock); if (!found) { @@ -1600,7 +1447,7 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { - if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base)) + if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base)) return -1; return rpcsvc_submit_generic (req, proghdr, hdrcount, payload, @@ -1639,34 +1486,35 @@ err: } -int -rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen) +inline int +rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen) { - if (!conn || !conn->trans) + if (!trans) { return -1; + } - return rpc_transport_get_peername (conn->trans, hostname, hostlen); + return rpc_transport_get_peername (trans, hostname, hostlen); } -int -rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, - struct sockaddr *sa, socklen_t sasize) +inline int +rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, + struct sockaddr_storage *sa, socklen_t sasize) { - if (!conn || !conn->trans) + if (!trans) { return -1; + } - return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa, + return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa, sasize); } -rpcsvc_conn_t * -rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) +rpc_transport_t * +rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name) { - int ret = -1; + int ret = -1; rpc_transport_t *trans = NULL; - rpcsvc_conn_t *conn = NULL; trans = rpc_transport_load (svc->ctx, options, name); if (!trans) { @@ -1675,18 +1523,16 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) goto out; } - ret = rpc_transport_listen (trans); + ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "listening on transport failed"); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); goto out; } - conn = rpcsvc_conn_init (svc, trans); - if (!conn) { - ret = -1; + ret = rpc_transport_listen (trans); + if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "initializing connection for transport failed"); + "listening on transport failed"); goto out; } @@ -1694,13 +1540,14 @@ rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) out: if ((ret == -1) && (trans)) { rpc_transport_disconnect (trans); + trans = NULL; } - return conn; + return trans; } rpcsvc_listener_t * -rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) +rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans) { rpcsvc_listener_t *listener = NULL; @@ -1711,7 +1558,8 @@ rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) goto out; } - listener->conn = conn; + listener->trans = trans; + listener->svc = svc; INIT_LIST_HEAD (&listener->list); @@ -1728,27 +1576,26 @@ out: rpcsvc_listener_t * rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name) { - rpcsvc_conn_t *conn = NULL; + rpc_transport_t *trans = NULL; rpcsvc_listener_t *listener = NULL; if (!svc || !options) { goto out; } - conn = rpcsvc_conn_create (svc, options, name); - if (!conn) { + trans = rpcsvc_transport_create (svc, options, name); + if (!trans) { goto out; } - listener = rpcsvc_listener_alloc (svc, conn); + listener = rpcsvc_listener_alloc (svc, trans); if (listener == NULL) { goto out; } - conn->listener = listener; out: - if (!listener && conn) { - rpcsvc_conn_deinit (conn); + if (!listener && trans) { + rpc_transport_disconnect (trans); } return listener; @@ -1809,18 +1656,20 @@ out: } - int rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) { - rpcsvc_program_t *newprog = NULL; - int ret = -1; - rpcsvc_listener_t *listener = NULL; + rpcsvc_program_t *newprog = NULL; + int ret = -1; + rpcsvc_listener_t *listener = NULL; + data_t *listen_port_data = NULL; + uint16_t listen_port = 0; if (!svc) return -1; - newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t); + newprog = GF_CALLOC (1, sizeof(*newprog), + gf_common_mt_rpcsvc_program_t); if (!newprog) return -1; @@ -1829,9 +1678,76 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) memcpy (newprog, &program, sizeof (program)); - listener = svc->listener; + listen_port = RPCSVC_DEFAULT_LISTEN_PORT; + if (program.progport != 0) { + listen_port = program.progport; + } else if (program.options != NULL) { + /* + * FIXME: use a method which does not hard-code each transport's + * option keys. + */ + listen_port_data = dict_get (program.options, "listen-port"); + if (listen_port_data != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (program.options, + "transport.socket.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (program.options, + "transport.rdma.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } + } + + listener = rpcsvc_get_listener (svc, listen_port, NULL); + if ((listener == NULL) || (listener->trans == NULL)) { + if ((listener != NULL) && (listener->trans == NULL)) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "empty listener without transport found, " + "destroying it"); + rpcsvc_listener_destroy (listener); + } + + if (program.progport != 0) { + ret = dict_set (program.options, + "transport.socket.listen-port", + data_from_uint16 (listen_port)); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "setting listening port (%d) specified " + "by program (%s) in options dictionary " + "failed", listen_port, + program.progname); + goto free_prog; + } - ret = rpcsvc_program_register_portmap (newprog, listener->conn); + ret = dict_set (program.options, + "transport.rdma.listen-port", + data_from_uint16 (listen_port)); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "setting listening port (%d) specified " + "by program (%s) in options dictionary " + "failed", listen_port, + program.progname); + goto free_prog; + } + } + + listener = rpcsvc_create_listener (svc, program.options, + program.progname); + if (listener == NULL) { + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "creation of listener for program (%s) failed", + program.progname); + goto free_prog; + } + } + + ret = rpcsvc_program_register_portmap (newprog, program.progport); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of" " program failed"); @@ -1860,6 +1776,7 @@ free_prog: return ret; } + static void free_prog_details (gf_dump_rsp *rsp) { @@ -1882,10 +1799,10 @@ build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp) gf_prog_detail *prog = NULL; gf_prog_detail *prev = NULL; - if (!req || !req->conn || !req->conn->svc) + if (!req || !req->trans || !req->svc) goto out; - list_for_each_entry (program, &req->conn->svc->programs, program) { + list_for_each_entry (program, &req->svc->programs, program) { prog = GF_CALLOC (1, sizeof (*prog), 0); if (!prog) goto out; @@ -1958,9 +1875,11 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) rpcsvc_t * rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) { - rpcsvc_t *svc = NULL; - int ret = -1; - rpcsvc_listener_t *listener = NULL; + rpcsvc_t *svc = NULL; + int ret = -1, poolcount = 0; + rpcsvc_listener_t *listener = NULL; + uint32_t listen_port = 0; + data_t *listen_port_data = NULL; if ((!ctx) || (!options)) return NULL; @@ -1981,6 +1900,16 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) goto free_svc; } + poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor; + + gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); + svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); + /* TODO: leak */ + if (!svc->rxpool) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); + goto free_svc; + } + ret = rpcsvc_auth_init (svc, options); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init " @@ -1993,8 +1922,27 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) svc->ctx = ctx; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); + listen_port = RPCSVC_DEFAULT_LISTEN_PORT; + + /* + * FIXME: use a method which does not hard-code each transport's + * option keys. + */ + listen_port_data = dict_get (options, "listen-port"); + if (listen_port_data != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (options, "transport.socket.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } else if ((listen_port_data + = dict_get (options, "transport.rdma.listen-port")) + != NULL) { + listen_port = data_to_uint16 (listen_port_data); + } + /* One listen port per RPC */ - listener = rpcsvc_get_listener (svc, 0); + listener = rpcsvc_get_listener (svc, 0, NULL); if (!listener) { /* FIXME: listener is given the name of first program that * creates it. This is not always correct. For eg., multiple @@ -2009,13 +1957,13 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) } } - if (!listener->conn) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection " + if (!listener->trans) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport " "found"); goto free_svc; } - svc->listener = listener; + gluster_dump_prog.options = options; ret = rpcsvc_program_register (svc, gluster_dump_prog); if (ret) { |