diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 254a05d664d..8fe2e52bcdc 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); GF_FREE (req->auxgidlarge); @@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); + svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + + if (dict_get (options, "rpc.outstanding-rpc-limit")) { + ret = dict_get_str (options, "rpc.oustanding-rpc-limit", + &optstr); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing"); + goto out; + } + + ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s", + optstr); + goto out; + } + } + ret = 0; out: return ret; |