diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 346 |
1 files changed, 307 insertions, 39 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index af01c1715..037c157f2 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -28,6 +28,7 @@ #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" +#include "rpc-drc.h" #include <errno.h> #include <pthread.h> @@ -41,8 +42,7 @@ #include <stdio.h> #include "xdr-rpcclnt.h" - -#define ACL_PROGRAM 100227 +#include "glusterfs-acl.h" struct rpcsvc_program gluster_dump_prog; @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -279,8 +310,17 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); + GF_FREE (req->auxgidlarge); + mem_put (req); out: @@ -363,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -422,6 +468,7 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; + req->reply = NULL; ret = 0; err: if (ret == -1) { @@ -461,13 +508,15 @@ int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { - rpcsvc_actor_t *actor = NULL; - rpcsvc_actor actor_fn = NULL; - rpcsvc_request_t *req = NULL; - int ret = -1; - uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; - gf_boolean_t unprivileged = _gf_false; + rpcsvc_actor_t *actor = NULL; + rpcsvc_actor actor_fn = NULL; + rpcsvc_request_t *req = NULL; + int ret = -1; + uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; + gf_boolean_t unprivileged = _gf_false; + drc_cached_op_t *reply = NULL; + rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; @@ -503,7 +552,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req = rpcsvc_request_create (svc, trans, msg); if (!req) - goto err; + goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; @@ -521,6 +570,39 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, return -1; } + /* DRC */ + if (rpcsvc_need_drc (req)) { + drc = req->svc->drc; + + LOCK (&drc->lock); + reply = rpcsvc_drc_lookup (req); + + /* retransmission of completed request, send cached reply */ + if (reply && reply->state == DRC_OP_CACHED) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" + " XID: 0x%x", req->xid); + ret = rpcsvc_send_cached_reply (req, reply); + drc->cache_hits++; + UNLOCK (&drc->lock); + goto out; + + } /* retransmitted request, original op in transit, drop it */ + else if (reply && reply->state == DRC_OP_IN_TRANSIT) { + gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," + " discarding. XID: 0x%x", req->xid); + ret = 0; + drc->intransit_hits++; + rpcsvc_request_destroy (req); + UNLOCK (&drc->lock); + goto out; + + } /* fresh request, cache it as in-transit and proceed */ + else { + ret = rpcsvc_cache_request (req); + } + UNLOCK (&drc->lock); + } + if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->mydata; @@ -547,7 +629,6 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, req); } else { ret = actor_fn (req); - req->hdr_iobuf = NULL; } } @@ -558,7 +639,7 @@ err_reply: * has now been queued. */ ret = 0; -err: +out: return ret; } @@ -905,21 +986,22 @@ out: return ret; } -static inline int -rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, - int hdrcount, struct iovec *proghdr, int proghdrcount, - struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *priv) +int +rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, + int rpchdrcount, struct iovec *proghdr, + int proghdrcount, struct iovec *progpayload, + int progpayloadcount, struct iobref *iobref, + void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; - if ((!trans) || (!hdrvec) || (!hdrvec->iov_base)) { + if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } - reply.msg.rpchdr = hdrvec; - reply.msg.rpchdrcount = hdrcount; + reply.msg.rpchdr = rpchdr; + reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; @@ -1065,6 +1147,7 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; + rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; @@ -1099,20 +1182,31 @@ rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, iobref_add (iobref, replyiob); + /* cache the request in the duplicate request cache for appropriate ops */ + if (req->reply) { + drc = req->svc->drc; + + LOCK (&drc->lock); + ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, + proghdr, hdrcount, + payload, payloadcount); + UNLOCK (&drc->lock); + } + ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message " - "(XID: 0x%ux, Program: %s, ProgVers: %d, Proc: %d) to " + "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to " "rpc-transport (%s)", req->xid, req->prog ? req->prog->progname : "(not matched)", req->prog ? req->prog->progver : 0, req->procnum, trans->name); } else { gf_log (GF_RPCSVC, GF_LOG_TRACE, - "submitted reply for rpc-message (XID: 0x%ux, " + "submitted reply for rpc-message (XID: 0x%x, " "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport " "(%s)", req->xid, req->prog ? req->prog->progname: "-", req->prog ? req->prog->progver : 0, @@ -1155,12 +1249,13 @@ rpcsvc_error_reply (rpcsvc_request_t *req) inline int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { - int ret = 0; + int ret = -1; /* FAIL */ if (!newprog) { goto out; } + /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */ if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" @@ -1168,16 +1263,16 @@ rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) goto out; } - ret = 0; + ret = 0; /* SUCCESS */ out: return ret; } -static inline int +inline int rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) { - int ret = 0; + int ret = -1; if (!prog) goto out; @@ -1795,12 +1890,92 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); - ret = 0; + ret = rpcsvc_set_outstanding_rpc_limit (svc, options); out: return ret; } int +rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options) +{ + xlator_t *xlator = NULL; + xlator_list_t *volentry = NULL; + char *srchkey = NULL; + char *keyval = NULL; + int ret = -1; + + if ((!svc) || (!svc->options) || (!options)) + return (-1); + + /* Fetch the xlator from svc */ + xlator = (xlator_t *) svc->mydata; + if (!xlator) + return (-1); + + /* Reconfigure the volume specific rpc-auth.addr allow part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + /* Reconfigure the volume specific rpc-auth.addr reject part */ + volentry = xlator->children; + while (volentry) { + ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject", + volentry->xlator->name); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return (-1); + } + + /* If found the srchkey, delete old key/val pair + * and set the key with new value. + */ + if (!dict_get_str (options, srchkey, &keyval)) { + dict_del (svc->options, srchkey); + ret = dict_set_str (svc->options, srchkey, keyval); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, + "dict_set_str error"); + GF_FREE (srchkey); + return (-1); + } + } + + GF_FREE (srchkey); + volentry = volentry->next; + } + + ret = rpcsvc_init_options (svc, options); + if (ret) + return (-1); + + return rpcsvc_auth_reconf (svc, options); +} + +int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath) { dict_t *dict = NULL; @@ -1846,6 +2021,48 @@ out: return ret; } +/* + * Reconfigure() the rpc.outstanding-rpc-limit param. + */ +int +rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options) +{ + int ret = -1; /* FAILURE */ + int rpclim = 0; + static char *rpclimkey = "rpc.outstanding-rpc-limit"; + + if ((!svc) || (!options)) + return (-1); + + /* Reconfigure() the rpc.outstanding-rpc-limit param */ + ret = dict_get_int32 (options, rpclimkey, &rpclim); + if (ret < 0) { + /* Fall back to default for FAILURE */ + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else { + /* SUCCESS: round off to multiple of 8. + * If the input value fails Boundary check, fall back to + * default i.e. RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT. + * NB: value 0 is special, means its unset i.e. unlimited. + */ + rpclim = ((rpclim + 8 - 1) >> 3) * 8; + if (rpclim < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + } else if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) { + rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT; + } + } + + if (svc->outstanding_rpc_limit != rpclim) { + svc->outstanding_rpc_limit = rpclim; + gf_log (GF_RPCSVC, GF_LOG_INFO, + "Configured %s with value %d", + rpclimkey, rpclim); + } + + return (0); +} + /* The global RPC service initializer. */ rpcsvc_t * @@ -1906,6 +2123,7 @@ rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, "failed to register DUMP program"); goto free_svc; } + ret = 0; free_svc: if (ret == -1) { @@ -1976,7 +2194,7 @@ err: } -int +static int rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *ip, char *hostname) { @@ -2005,7 +2223,7 @@ out: return ret; } -int +static int rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *ip, char *hostname) { @@ -2057,7 +2275,7 @@ rpcsvc_combine_allow_reject_volume_check (int allow, int reject) } int -rpcsvc_auth_check (dict_t *options, char *volname, +rpcsvc_auth_check (rpcsvc_t *svc, char *volname, rpc_transport_t *trans) { int ret = RPCSVC_AUTH_REJECT; @@ -2066,8 +2284,17 @@ rpcsvc_auth_check (dict_t *options, char *volname, char *hostname = NULL; char *ip = NULL; char client_ip[RPCSVC_PEER_STRLEN] = {0}; + char *allow_str = NULL; + char *reject_str = NULL; + char *srchstr = NULL; + dict_t *options = NULL; - if (!options || !volname || !trans) + if (!svc || !volname || !trans) + return ret; + + /* Fetch the options from svc struct and validate */ + options = svc->options; + if (!options) return ret; ret = rpcsvc_transport_peername (trans, client_ip, RPCSVC_PEER_STRLEN); @@ -2077,12 +2304,51 @@ rpcsvc_auth_check (dict_t *options, char *volname, return RPCSVC_AUTH_REJECT; } - get_host_name (client_ip, &ip); + /* Accept if its the default case: Allow all, Reject none + * The default volfile always contains a 'allow *' rule + * for each volume. If allow rule is missing (which implies + * there is some bad volfile generating code doing this), we + * assume no one is allowed mounts, and thus, we reject mounts. + */ + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return RPCSVC_AUTH_REJECT; + } - /* addr-namelookup disabled by default */ - ret = dict_get_str_boolean (options, "rpc-auth.addr.namelookup", 0); - if (ret == _gf_true) - gf_get_hostname_from_ip (ip, &hostname); + ret = dict_get_str (options, srchstr, &allow_str); + GF_FREE (srchstr); + if (ret < 0) + return RPCSVC_AUTH_REJECT; + + ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); + if (ret == -1) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); + return RPCSVC_AUTH_REJECT; + } + + ret = dict_get_str (options, srchstr, &reject_str); + GF_FREE (srchstr); + if (reject_str == NULL && !strcmp ("*", allow_str)) + return RPCSVC_AUTH_ACCEPT; + + /* Non-default rule, authenticate */ + if (!get_host_name (client_ip, &ip)) + ip = client_ip; + + /* addr-namelookup check */ + if (svc->addr_namelookup == _gf_true) { + ret = gf_get_hostname_from_ip (ip, &hostname); + if (ret) { + if (hostname) + GF_FREE (hostname); + /* failed to get hostname, but hostname auth + * is enabled, so authentication will not be + * 100% correct. reject mounts + */ + return RPCSVC_AUTH_REJECT; + } + } accept = rpcsvc_transport_peer_check_allow (options, volname, ip, hostname); @@ -2090,6 +2356,8 @@ rpcsvc_auth_check (dict_t *options, char *volname, reject = rpcsvc_transport_peer_check_reject (options, volname, ip, hostname); + if (hostname) + GF_FREE (hostname); return rpcsvc_combine_allow_reject_volume_check (accept, reject); } @@ -2197,9 +2465,9 @@ out: rpcsvc_actor_t gluster_dump_actors[] = { - [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0}, - [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0}, - [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0}, + [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, + [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, + [GF_DUMP_MAXVALUE] = {"MAXVALUE", GF_DUMP_MAXVALUE, NULL, NULL, 0, DRC_NA}, }; |
