summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--glusterfsd/src/Makefile.am1
-rw-r--r--glusterfsd/src/glusterfsd-mgmt.c11
-rw-r--r--libglusterfs/src/event-poll.c7
-rw-r--r--rpc/rpc-lib/src/Makefile.am1
-rw-r--r--rpc/rpc-lib/src/autoscale-threads.c5
-rw-r--r--rpc/rpc-lib/src/libgfrpc.sym1
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c131
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h10
-rw-r--r--xlators/protocol/server/src/server.c9
9 files changed, 148 insertions, 28 deletions
diff --git a/glusterfsd/src/Makefile.am b/glusterfsd/src/Makefile.am
index 3286e639bcf..eb92e66e989 100644
--- a/glusterfsd/src/Makefile.am
+++ b/glusterfsd/src/Makefile.am
@@ -25,7 +25,6 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) \
-I$(top_srcdir)/rpc/xdr/src \
-I$(top_builddir)/rpc/xdr/src \
-I$(top_srcdir)/xlators/nfs/server/src \
- -I$(top_srcdir)/xlators/protocol/server/src \
-I$(top_srcdir)/api/src
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 3404a5931f0..e007149c08c 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -33,7 +33,6 @@
#include "xlator.h"
#include "syscall.h"
#include "monitoring.h"
-#include "server.h"
static gf_boolean_t is_mgmt_rpc_reconnect = _gf_false;
int need_emancipate = 0;
@@ -834,8 +833,7 @@ glusterfs_handle_attach (rpcsvc_request_t *req)
xlator_t *nextchild = NULL;
glusterfs_graph_t *newgraph = NULL;
glusterfs_ctx_t *ctx = NULL;
- xlator_t *srv_xl = NULL;
- server_conf_t *srv_conf = NULL;
+ xlator_t *protocol_server = NULL;
GF_ASSERT (req);
this = THIS;
@@ -876,10 +874,9 @@ glusterfs_handle_attach (rpcsvc_request_t *req)
/* we need a protocol/server xlator as
* nextchild
*/
- srv_xl = this->ctx->active->first;
- srv_conf = (server_conf_t *)srv_xl->private;
- rpcsvc_autoscale_threads (this->ctx,
- srv_conf->rpc, 1);
+ protocol_server = this->ctx->active->first;
+ rpcsvc_autoscale_threads (this->ctx, 1,
+ protocol_server);
}
} else {
gf_log (this->name, GF_LOG_WARNING,
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 3bffc4784d7..b1aca826759 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -173,6 +173,13 @@ event_pool_new_poll (int count, int eventthreadcount)
"thread count (%d) ignored", eventthreadcount);
}
+ /* although, eventhreadcount for poll implementaiton is always
+ * going to be 1, eventthreadcount needs to be set to 1 so that
+ * rpcsvc_request_handler() thread scaling works flawlessly in
+ * both epoll and poll models
+ */
+ event_pool->eventthreadcount = 1;
+
return event_pool;
}
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am
index 81a96476883..f784c2c1ff3 100644
--- a/rpc/rpc-lib/src/Makefile.am
+++ b/rpc/rpc-lib/src/Makefile.am
@@ -22,6 +22,7 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
-I$(top_srcdir)/rpc/xdr/src \
-I$(top_builddir)/rpc/xdr/src \
-DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" \
+ -I$(top_srcdir)/xlators/protocol/server/src \
-I$(top_srcdir)/contrib/rbtree
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c
index e0e89586160..9e20b37ac63 100644
--- a/rpc/rpc-lib/src/autoscale-threads.c
+++ b/rpc/rpc-lib/src/autoscale-threads.c
@@ -10,13 +10,16 @@
#include "event.h"
#include "rpcsvc.h"
+#include "server.h"
void
-rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr)
+rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this)
{
struct event_pool *pool = ctx->event_pool;
+ server_conf_t *conf = this->private;
int thread_count = pool->eventthreadcount;
pool->auto_thread_count += incr;
(void) event_reconfigure_threads (pool, thread_count+incr);
+ rpcsvc_ownthread_reconf (conf->rpc, pool->eventthreadcount);
}
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index 540181dabb6..4fab688c66d 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -53,6 +53,7 @@ rpcsvc_transport_connect
rpcsvc_transport_getpeeraddr
rpcsvc_unregister_notify
rpcsvc_volume_allowed
+rpcsvc_ownthread_reconf
rpc_transport_count
rpc_transport_connect
rpc_transport_disconnect
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 8766da47b7b..34e7563e163 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1970,33 +1970,98 @@ rpcsvc_request_handler (void *arg)
goto unlock;
}
- while (list_empty (&program->request_queue))
+ while (list_empty (&program->request_queue) &&
+ (program->threadcount <=
+ program->eventthreadcount)) {
pthread_cond_wait (&program->queue_cond,
&program->queue_lock);
+ }
- req = list_entry (program->request_queue.next,
- typeof (*req), request_list);
-
- list_del_init (&req->request_list);
+ if (program->threadcount > program->eventthreadcount) {
+ done = 1;
+ program->threadcount--;
+
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "program '%s' thread terminated; "
+ "total count:%d",
+ program->progname,
+ program->threadcount);
+ } else if (!list_empty (&program->request_queue)) {
+ req = list_entry (program->request_queue.next,
+ typeof (*req), request_list);
+
+ list_del_init (&req->request_list);
+ }
}
unlock:
pthread_mutex_unlock (&program->queue_lock);
+ if (req) {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor (req);
+ ret = actor->actor (req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error (ret, NULL, req);
+ }
+ req = NULL;
+ }
+
if (done)
break;
+ }
- THIS = req->svc->xl;
+ return NULL;
+}
- actor = rpcsvc_program_actor (req);
+int
+rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program)
+{
+ int ret = 0, delta = 0, creates = 0;
- ret = actor->actor (req);
+ if (!program || !svc)
+ goto out;
- if (ret != 0) {
- rpcsvc_check_and_reply_error (ret, NULL, req);
+ pthread_mutex_lock (&program->queue_lock);
+ {
+ delta = program->eventthreadcount - program->threadcount;
+
+ if (delta >= 0) {
+ while (delta--) {
+ ret = gf_thread_create (&program->thread, NULL,
+ rpcsvc_request_handler,
+ program, "rpcrqhnd");
+ if (!ret) {
+ program->threadcount++;
+ creates++;
+ }
+ }
+
+ if (creates) {
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "spawned %d threads for program '%s'; "
+ "total count:%d",
+ creates,
+ program->progname,
+ program->threadcount);
+ }
+ } else {
+ gf_log (GF_RPCSVC, GF_LOG_INFO,
+ "terminating %d threads for program '%s'",
+ -delta, program->progname);
+
+ /* this signal is to just wake up the threads so they
+ * test for the change in eventthreadcount and kill
+ * themselves until the program thread count becomes
+ * equal to the event thread count
+ */
+ pthread_cond_broadcast (&program->queue_cond);
}
}
+ pthread_mutex_unlock (&program->queue_lock);
- return NULL;
+out:
+ return creates;
}
int
@@ -2004,6 +2069,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,
gf_boolean_t add_to_head)
{
int ret = -1;
+ int creates = -1;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
@@ -2051,12 +2117,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- ret = gf_thread_create (&newprog->thread, NULL,
- rpcsvc_request_handler,
- newprog, "rpcsvcrh");
- if (ret != 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "error creating request handler thread");
+ newprog->eventthreadcount = 1;
+ creates = rpcsvc_spawn_threads (svc, newprog);
+
+ if (creates < 1) {
+ goto out;
}
}
@@ -2924,6 +2989,38 @@ out:
return ret;
}
+/* During reconfigure, Make sure to call this function after event-threads are
+ * reconfigured as programs' threadcount will be made equal to event threads.
+ */
+
+int
+rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount)
+{
+ int ret = -1;
+ rpcsvc_program_t *program = NULL;
+
+ if (!svc) {
+ ret = 0;
+ goto out;
+ }
+
+ pthread_rwlock_wrlock (&svc->rpclock);
+ {
+ list_for_each_entry (program, &svc->programs, program) {
+ if (program->ownthread) {
+ program->eventthreadcount =
+ new_eventthreadcount;
+ rpcsvc_spawn_threads (svc, program);
+ }
+ }
+ }
+ pthread_rwlock_unlock (&svc->rpclock);
+
+ ret = 0;
+out:
+ return ret;
+}
+
rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
[GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index b2814de4c45..578b4e13025 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -422,6 +422,12 @@ struct rpcsvc_program {
pthread_mutex_t queue_lock;
pthread_cond_t queue_cond;
pthread_t thread;
+ int threadcount;
+ /* eventthreadcount is just a readonly copy of the actual value
+ * owned by the event sub-system
+ * It is used to control the scaling of rpcsvc_request_handler threads
+ */
+ int eventthreadcount;
};
typedef struct rpcsvc_cbk_program {
@@ -642,6 +648,8 @@ rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
uint32_t progver, int procnum);
void
-rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr);
+rpcsvc_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this);
+extern int
+rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount);
#endif
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index db96363123b..6f1d2728847 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -844,6 +844,12 @@ do_rpc:
if (ret)
goto out;
+ /* rpcsvc thread reconfigure should be after events thread
+ * reconfigure
+ */
+ new_nthread =
+ ((struct event_pool *)(this->ctx->event_pool))->eventthreadcount;
+ ret = rpcsvc_ownthread_reconf (rpc_conf, new_nthread);
out:
THIS = oldTHIS;
gf_msg_debug ("", 0, "returning %d", ret);
@@ -1499,7 +1505,8 @@ server_notify (xlator_t *this, int32_t event, void *data, ...)
if (victim_found)
(*trav_p) = (*trav_p)->next;
rpc_clnt_mgmt_pmap_signout (ctx, victim->name);
- rpcsvc_autoscale_threads (ctx, conf->rpc, -1);
+ /* we need the protocol/server xlator here as 'this' */
+ rpcsvc_autoscale_threads (ctx, -1, this);
default_notify (victim, GF_EVENT_CLEANUP, data);
}
break;