diff options
Diffstat (limited to 'rpc/rpc-lib')
-rw-r--r-- | rpc/rpc-lib/src/libgfrpc.sym | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 131 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 8 |
3 files changed, 123 insertions, 17 deletions
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym index 89923b22192..7d878abfd4d 100644 --- a/rpc/rpc-lib/src/libgfrpc.sym +++ b/rpc/rpc-lib/src/libgfrpc.sym @@ -53,6 +53,7 @@ rpcsvc_transport_connect rpcsvc_transport_getpeeraddr rpcsvc_unregister_notify rpcsvc_volume_allowed +rpcsvc_ownthread_reconf rpc_transport_count rpc_transport_connect rpc_transport_disconnect diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 8766da47b7b..34e7563e163 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1970,33 +1970,98 @@ rpcsvc_request_handler (void *arg) goto unlock; } - while (list_empty (&program->request_queue)) + while (list_empty (&program->request_queue) && + (program->threadcount <= + program->eventthreadcount)) { pthread_cond_wait (&program->queue_cond, &program->queue_lock); + } - req = list_entry (program->request_queue.next, - typeof (*req), request_list); - - list_del_init (&req->request_list); + if (program->threadcount > program->eventthreadcount) { + done = 1; + program->threadcount--; + + gf_log (GF_RPCSVC, GF_LOG_INFO, + "program '%s' thread terminated; " + "total count:%d", + program->progname, + program->threadcount); + } else if (!list_empty (&program->request_queue)) { + req = list_entry (program->request_queue.next, + typeof (*req), request_list); + + list_del_init (&req->request_list); + } } unlock: pthread_mutex_unlock (&program->queue_lock); + if (req) { + THIS = req->svc->xl; + actor = rpcsvc_program_actor (req); + ret = actor->actor (req); + + if (ret != 0) { + rpcsvc_check_and_reply_error (ret, NULL, req); + } + req = NULL; + } + if (done) break; + } - THIS = req->svc->xl; + return NULL; +} - actor = rpcsvc_program_actor (req); +int +rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program) +{ + int ret = 0, delta = 0, creates = 0; - ret = actor->actor (req); + if (!program || !svc) + goto out; - if (ret != 0) { - rpcsvc_check_and_reply_error (ret, NULL, req); + pthread_mutex_lock (&program->queue_lock); + { + delta = program->eventthreadcount - program->threadcount; + + if (delta >= 0) { + while (delta--) { + ret = gf_thread_create (&program->thread, NULL, + rpcsvc_request_handler, + program, "rpcrqhnd"); + if (!ret) { + program->threadcount++; + creates++; + } + } + + if (creates) { + gf_log (GF_RPCSVC, GF_LOG_INFO, + "spawned %d threads for program '%s'; " + "total count:%d", + creates, + program->progname, + program->threadcount); + } + } else { + gf_log (GF_RPCSVC, GF_LOG_INFO, + "terminating %d threads for program '%s'", + -delta, program->progname); + + /* this signal is to just wake up the threads so they + * test for the change in eventthreadcount and kill + * themselves until the program thread count becomes + * equal to the event thread count + */ + pthread_cond_broadcast (&program->queue_cond); } } + pthread_mutex_unlock (&program->queue_lock); - return NULL; +out: + return creates; } int @@ -2004,6 +2069,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program, gf_boolean_t add_to_head) { int ret = -1; + int creates = -1; rpcsvc_program_t *newprog = NULL; char already_registered = 0; @@ -2051,12 +2117,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program, newprog->ownthread = _gf_false; if (newprog->ownthread) { - ret = gf_thread_create (&newprog->thread, NULL, - rpcsvc_request_handler, - newprog, "rpcsvcrh"); - if (ret != 0) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, - "error creating request handler thread"); + newprog->eventthreadcount = 1; + creates = rpcsvc_spawn_threads (svc, newprog); + + if (creates < 1) { + goto out; } } @@ -2924,6 +2989,38 @@ out: return ret; } +/* During reconfigure, Make sure to call this function after event-threads are + * reconfigured as programs' threadcount will be made equal to event threads. + */ + +int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount) +{ + int ret = -1; + rpcsvc_program_t *program = NULL; + + if (!svc) { + ret = 0; + goto out; + } + + pthread_rwlock_wrlock (&svc->rpclock); + { + list_for_each_entry (program, &svc->programs, program) { + if (program->ownthread) { + program->eventthreadcount = + new_eventthreadcount; + rpcsvc_spawn_threads (svc, program); + } + } + } + pthread_rwlock_unlock (&svc->rpclock); + + ret = 0; +out: + return ret; +} + rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = { [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 878eea28999..ec76b659965 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -422,6 +422,12 @@ struct rpcsvc_program { pthread_mutex_t queue_lock; pthread_cond_t queue_cond; pthread_t thread; + int threadcount; + /* eventthreadcount is just a readonly copy of the actual value + * owned by the event sub-system + * It is used to control the scaling of rpcsvc_request_handler threads + */ + int eventthreadcount; }; typedef struct rpcsvc_cbk_program { @@ -639,4 +645,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, int procnum); +extern int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount); #endif |