diff options
author | Anand Avati <avati@redhat.com> | 2013-10-20 08:45:18 -0700 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2013-10-28 00:33:19 -0700 |
commit | a4056292528db49a666422c7f8e0c032441cc83f (patch) | |
tree | 3d433a1ed29f1a07fdfc1425d2a52018e67328ad | |
parent | 0162933589d025ca1812e159368d107cfc355e8e (diff) |
rpcsvc: implement per-client RPC throttling
Implement a limit on the total number of outstanding RPC requests
from a given cient. Once the limit is reached the client socket
is removed from POLL-IN event polling.
Change-Id: I8071b8c89b78d02e830e6af5a540308199d6bdcd
BUG: 1008301
Signed-off-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/6114
Reviewed-by: Santosh Pradhan <spradhan@redhat.com>
Reviewed-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-by: Harshavardhana <harsha@harshavardhana.net>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 13 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 6 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 3 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 62 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 3 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 20 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 12 | ||||
-rw-r--r-- | xlators/nfs/server/src/nfs.c | 9 |
8 files changed, 128 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 89f3b3e8a0a..5f2e91c7099 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -69,6 +69,19 @@ out: return ret; } +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + int ret = 0; + + if (!this->ops->throttle) + return -ENOSYS; + + ret = this->ops->throttle (this, onoff); + + return ret; +} + int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen) diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index aa9df72b2dd..2db9072ae49 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -192,6 +192,8 @@ struct rpc_transport { pthread_mutex_t lock; int32_t refcount; + int32_t outstanding_rpc_count; + glusterfs_ctx_t *ctx; dict_t *options; char *name; @@ -235,6 +237,7 @@ struct rpc_transport_ops { int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t sasize); + int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff); }; @@ -288,6 +291,9 @@ int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen); +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff); + rpc_transport_pollin_t * rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, int count, struct iobuf *hdr_iobuf, diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 054e187c96d..53c1a8fe3b2 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -71,6 +71,9 @@ typedef struct rpcsvc_state { rpcsvc_notify_t notifyfn; struct mem_pool *rxpool; rpcsvc_drc_globals_t *drc; + + /* per-client limit of outstanding rpc requests */ + int outstanding_rpc_limit; } rpcsvc_t; /* DRC START */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 254a05d664d..8fe2e52bcdc 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); GF_FREE (req->auxgidlarge); @@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); + svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + + if (dict_get (options, "rpc.outstanding-rpc-limit")) { + ret = dict_get_str (options, "rpc.oustanding-rpc-limit", + &optstr); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing"); + goto out; + } + + ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s", + optstr); + goto out; + } + } + ret = 0; out: return ret; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index a08ee4b57d4..ac2f09beac5 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -38,6 +38,9 @@ #define MAX_IOVEC 16 #endif +#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 +#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536 + #define GF_RPCSVC "rpc-service" #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 8883ccbb4d0..93da3f29690 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -3261,6 +3261,25 @@ out: } +static int +socket_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + socket_private_t *priv = NULL; + + priv = this->private; + + /* The way we implement throttling is by taking off + POLLIN event from the polled flags. This way we + never get called with the POLLIN event and therefore + will never read() any more data until throttling + is turned off. + */ + priv->idx = event_select_on (this->ctx->event_pool, priv->sock, + priv->idx, (int) !onoff, -1); + return 0; +} + + struct rpc_transport_ops tops = { .listen = socket_listen, .connect = socket_connect, @@ -3271,6 +3290,7 @@ struct rpc_transport_ops tops = { .get_peeraddr = socket_getpeeraddr, .get_myname = socket_getmyname, .get_myaddr = socket_getmyaddr, + .throttle = socket_throttle, }; int diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 76d2333d7db..adf0f2922ee 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -808,6 +808,12 @@ struct volopt_map_entry glusterd_volopt_map[] = { .option = "statedump-path", .op_version = 1 }, + { .key = "server.outstanding-rpc-limit", + .voltype = "protocol/server", + .option = "rpc.outstanding-rpc-limit", + .type = GLOBAL_DOC, + .op_version = 3 + }, { .key = "features.lock-heal", .voltype = "protocol/server", .option = "lk-heal", @@ -1142,6 +1148,12 @@ struct volopt_map_entry glusterd_volopt_map[] = { .type = GLOBAL_DOC, .op_version = 1 }, + { .key = "nfs.outstanding-rpc-limit", + .voltype = "nfs/server", + .option = "rpc.outstanding-rpc-limit", + .type = GLOBAL_DOC, + .op_version = 3 + }, { .key = "nfs.port", .voltype = "nfs/server", .option = "nfs.port", diff --git a/xlators/nfs/server/src/nfs.c b/xlators/nfs/server/src/nfs.c index 8158f954f22..33bd36e7461 100644 --- a/xlators/nfs/server/src/nfs.c +++ b/xlators/nfs/server/src/nfs.c @@ -1359,6 +1359,15 @@ struct volume_options options[] = { "portmap service. Use this option to turn off portmap " "registration for Gluster NFS. On by default" }, + { .key = {"rpc.outstanding-rpc-limit"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .max = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT, + .default_value = TOSTRING(RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT), + .description = "Parameter to throttle the number of incoming RPC " + "requests from a client. 0 means no limit (can " + "potentially run out of memory)" + }, { .key = {"nfs.port"}, .type = GF_OPTION_TYPE_INT, .min = 1, |