diff options
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 13 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 6 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 3 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 62 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 3 |
5 files changed, 87 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index 89f3b3e8a0a..5f2e91c7099 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -69,6 +69,19 @@ out: return ret; } +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff) +{ + int ret = 0; + + if (!this->ops->throttle) + return -ENOSYS; + + ret = this->ops->throttle (this, onoff); + + return ret; +} + int32_t rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen) diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index aa9df72b2dd..2db9072ae49 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -192,6 +192,8 @@ struct rpc_transport { pthread_mutex_t lock; int32_t refcount; + int32_t outstanding_rpc_count; + glusterfs_ctx_t *ctx; dict_t *options; char *name; @@ -235,6 +237,7 @@ struct rpc_transport_ops { int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, socklen_t sasize); + int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff); }; @@ -288,6 +291,9 @@ int32_t rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, struct sockaddr_storage *sa, size_t salen); +int +rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff); + rpc_transport_pollin_t * rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, int count, struct iobuf *hdr_iobuf, diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h index 054e187c96d..53c1a8fe3b2 100644 --- a/rpc/rpc-lib/src/rpcsvc-common.h +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -71,6 +71,9 @@ typedef struct rpcsvc_state { rpcsvc_notify_t notifyfn; struct mem_pool *rxpool; rpcsvc_drc_globals_t *drc; + + /* per-client limit of outstanding rpc requests */ + int outstanding_rpc_limit; } rpcsvc_t; /* DRC START */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 254a05d664d..8fe2e52bcdc 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, return NULL; } +int +rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta) +{ + int ret = 0; + int old_count = 0; + int new_count = 0; + int limit = 0; + + pthread_mutex_lock (&trans->lock); + { + limit = svc->outstanding_rpc_limit; + if (!limit) + goto unlock; + + old_count = trans->outstanding_rpc_count; + trans->outstanding_rpc_count += delta; + new_count = trans->outstanding_rpc_count; + + if (old_count <= limit && new_count > limit) + ret = rpc_transport_throttle (trans, _gf_true); + + if (old_count > limit && new_count <= limit) + ret = rpc_transport_throttle (trans, _gf_false); + } +unlock: + pthread_mutex_unlock (&trans->lock); + + return ret; +} + + /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. @@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req) if (req->hdr_iobuf) iobuf_unref (req->hdr_iobuf); + /* This marks the "end" of an RPC request. Reply is + completely written to the socket and is on the way + to the client. It is time to decrement the + outstanding request counter by 1. + */ + rpcsvc_request_outstanding (req->svc, req->trans, -1); + rpc_transport_unref (req->trans); GF_FREE (req->auxgidlarge); @@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, goto err; } + /* We just received a new request from the wire. Account for + it in the outsanding request counter to make sure we don't + ingest too many concurrent requests from the same client. + */ + ret = rpcsvc_request_outstanding (svc, trans, +1); + msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; @@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); + svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT; + + if (dict_get (options, "rpc.outstanding-rpc-limit")) { + ret = dict_get_str (options, "rpc.oustanding-rpc-limit", + &optstr); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing"); + goto out; + } + + ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit); + if (ret < 0) { + gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s", + optstr); + goto out; + } + } + ret = 0; out: return ret; diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index a08ee4b57d4..ac2f09beac5 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -38,6 +38,9 @@ #define MAX_IOVEC 16 #endif +#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 +#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536 + #define GF_RPCSVC "rpc-service" #define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) |