summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c13
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h6
-rw-r--r--rpc/rpc-lib/src/rpcsvc-common.h3
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c62
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h3
5 files changed, 87 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 89f3b3e8a0a..5f2e91c7099 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -69,6 +69,19 @@ out:
return ret;
}
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ int ret = 0;
+
+ if (!this->ops->throttle)
+ return -ENOSYS;
+
+ ret = this->ops->throttle (this, onoff);
+
+ return ret;
+}
+
int32_t
rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen)
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index aa9df72b2dd..2db9072ae49 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -192,6 +192,8 @@ struct rpc_transport {
pthread_mutex_t lock;
int32_t refcount;
+ int32_t outstanding_rpc_count;
+
glusterfs_ctx_t *ctx;
dict_t *options;
char *name;
@@ -235,6 +237,7 @@ struct rpc_transport_ops {
int32_t (*get_myaddr) (rpc_transport_t *this, char *peeraddr,
int addrlen, struct sockaddr_storage *sa,
socklen_t sasize);
+ int32_t (*throttle) (rpc_transport_t *this, gf_boolean_t onoff);
};
@@ -288,6 +291,9 @@ int32_t
rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen);
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff);
+
rpc_transport_pollin_t *
rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
int count, struct iobuf *hdr_iobuf,
diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h
index 054e187c96d..53c1a8fe3b2 100644
--- a/rpc/rpc-lib/src/rpcsvc-common.h
+++ b/rpc/rpc-lib/src/rpcsvc-common.h
@@ -71,6 +71,9 @@ typedef struct rpcsvc_state {
rpcsvc_notify_t notifyfn;
struct mem_pool *rxpool;
rpcsvc_drc_globals_t *drc;
+
+ /* per-client limit of outstanding rpc requests */
+ int outstanding_rpc_limit;
} rpcsvc_t;
/* DRC START */
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 254a05d664d..8fe2e52bcdc 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -129,6 +129,37 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
return NULL;
}
+int
+rpcsvc_request_outstanding (rpcsvc_t *svc, rpc_transport_t *trans, int delta)
+{
+ int ret = 0;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
+
+ pthread_mutex_lock (&trans->lock);
+ {
+ limit = svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
+
+ old_count = trans->outstanding_rpc_count;
+ trans->outstanding_rpc_count += delta;
+ new_count = trans->outstanding_rpc_count;
+
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle (trans, _gf_true);
+
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle (trans, _gf_false);
+ }
+unlock:
+ pthread_mutex_unlock (&trans->lock);
+
+ return ret;
+}
+
+
/* This needs to change to returning errors, since
* we need to return RPC specific error messages when some
* of the pointers below are NULL.
@@ -279,6 +310,13 @@ rpcsvc_request_destroy (rpcsvc_request_t *req)
if (req->hdr_iobuf)
iobuf_unref (req->hdr_iobuf);
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ rpcsvc_request_outstanding (req->svc, req->trans, -1);
+
rpc_transport_unref (req->trans);
GF_FREE (req->auxgidlarge);
@@ -365,6 +403,12 @@ rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
goto err;
}
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ ret = rpcsvc_request_outstanding (svc, trans, +1);
+
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
@@ -1845,6 +1889,24 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
"disabled");
+ svc->outstanding_rpc_limit = RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT;
+
+ if (dict_get (options, "rpc.outstanding-rpc-limit")) {
+ ret = dict_get_str (options, "rpc.oustanding-rpc-limit",
+ &optstr);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Value went missing");
+ goto out;
+ }
+
+ ret = gf_string2int32 (optstr, &svc->outstanding_rpc_limit);
+ if (ret < 0) {
+ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid RPC limit %s",
+ optstr);
+ goto out;
+ }
+ }
+
ret = 0;
out:
return ret;
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index a08ee4b57d4..ac2f09beac5 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -38,6 +38,9 @@
#define MAX_IOVEC 16
#endif
+#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64
+#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
+
#define GF_RPCSVC "rpc-service"
#define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB))