diff options
author | Raghavendra Gowdappa <rgowdapp@redhat.com> | 2018-10-31 16:10:58 +0530 |
---|---|---|
committer | Raghavendra G <rgowdapp@redhat.com> | 2018-11-29 01:19:12 +0000 |
commit | 95e380eca19b9f0d03a53429535f15556e5724ad (patch) | |
tree | be32fca4fbfa7d29e8571545af26e784d34e294c /rpc | |
parent | f0232d07f7e6543b56830be28f6e80f9085e6241 (diff) |
rpcsvc: provide each request handler thread its own queue
A single global per program queue is contended by all request handler
threads and event threads. This can lead to high contention. So,
reduce the contention by providing each request handler thread its own
private queue.
Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a
single queue with a fixed request-handler-thread and event-thread,
which brought down the performance regression due to overhead of
queuing significantly.
Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on
how to communicate the event-thread death to request-handler-thread.
Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running
the perf benchmarks to qualify that performance regression introduced
by ping-timer-fixes is fixed with this patch and patiently running
many iterations of regression tests while RCAing the issue.
Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running
the many iterations of perf benchmarking tests while RCAing the
regression caused by ping-timer-expiry fixes.
Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5
Fixes: bz#1644629
Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-lib/src/autoscale-threads.c | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/libgfrpc.sym | 1 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 6 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 4 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 3 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 412 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 34 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 28 |
8 files changed, 348 insertions, 141 deletions
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c index 337f002df10..d629a1cd430 100644 --- a/rpc/rpc-lib/src/autoscale-threads.c +++ b/rpc/rpc-lib/src/autoscale-threads.c @@ -19,5 +19,4 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr) pool->auto_thread_count += incr; (void)event_reconfigure_threads(pool, thread_count + incr); - rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount); } diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym index a7cb5f6b5cb..4f42485044f 100644 --- a/rpc/rpc-lib/src/libgfrpc.sym +++ b/rpc/rpc-lib/src/libgfrpc.sym @@ -51,7 +51,6 @@ rpcsvc_transport_connect rpcsvc_transport_getpeeraddr rpcsvc_unregister_notify rpcsvc_volume_allowed -rpcsvc_ownthread_reconf rpc_transport_count rpc_transport_connect rpc_transport_disconnect diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 2505998b3d4..b26d645bb12 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -969,6 +969,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata, */ ret = 0; break; + + case RPC_TRANSPORT_EVENT_THREAD_DIED: + /* only meaningful on a server, no need of handling this event on a + * client */ + ret = 0; + break; } out: diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index d70334476c7..54636dcbf00 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -266,6 +266,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) goto fail; } + if (dict_get(options, "notify-poller-death")) { + trans->notify_poller_death = 1; + } + gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name); handle = dlopen(name, RTLD_NOW); diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c238501b5c7..fd737d0c764 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -97,6 +97,7 @@ typedef enum { RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */ RPC_TRANSPORT_CONNECT, /* client is connected to server */ RPC_TRANSPORT_MSG_SENT, + RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */ } rpc_transport_event_t; struct rpc_transport_msg { @@ -213,6 +214,8 @@ struct rpc_transport { * layer or in client management notification handler functions */ gf_boolean_t connect_failed; + char notify_poller_death; + char poller_death_accept; }; struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index c6545193a11..d678bca43a8 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -36,6 +36,7 @@ #include <fnmatch.h> #include <stdarg.h> #include <stdio.h> +#include <math.h> #ifdef IPV6_DEFAULT #include <netconfig.h> @@ -63,10 +64,76 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans); int rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...); +void * +rpcsvc_request_handler(void *arg); static int rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr); +void +rpcsvc_toggle_queue_status(rpcsvc_program_t *prog, + rpcsvc_request_queue_t *queue, char status[]) +{ + int queue_index = 0, status_index = 0, set_bit = 0; + + if (queue != &prog->request_queue[0]) { + queue_index = (queue - &prog->request_queue[0]); + } + + status_index = queue_index / 8; + set_bit = queue_index % 8; + + status[status_index] ^= (1 << set_bit); + + return; +} + +static int +get_rightmost_set_bit(int n) +{ + return log2(n & -n); +} + +int +rpcsvc_get_free_queue_index(rpcsvc_program_t *prog) +{ + int queue_index = 0, max_index = 0, i = 0; + unsigned int right_most_unset_bit = 0; + + right_most_unset_bit = 8; + + max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8; + for (i = 0; i < max_index; i++) { + if (prog->request_queue_status[i] == 0) { + right_most_unset_bit = 0; + break; + } else { + right_most_unset_bit = get_rightmost_set_bit( + ~prog->request_queue_status[i]); + if (right_most_unset_bit < 8) { + break; + } + } + } + + if (right_most_unset_bit > 7) { + queue_index = -1; + } else { + queue_index = i * 8; + queue_index += right_most_unset_bit; + + if (queue_index > EVENT_MAX_THREADS) { + queue_index = -1; + } + } + + if (queue_index != -1) { + prog->request_queue_status[i] |= (0x1 << right_most_unset_bit); + } + + return queue_index; +} + rpcsvc_notify_wrapper_t * rpcsvc_notify_wrapper_alloc(void) { @@ -575,6 +642,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque) return 0; } +void +rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen) +{ + rpcsvc_request_queue_t *queue = NULL; + int num = 0; + void *value = NULL; + rpcsvc_request_t *req = NULL; + char empty = 0; + + value = pthread_getspecific(prog->req_queue_key); + if (value == NULL) { + return; + } + + num = ((unsigned long)value) - 1; + + queue = &prog->request_queue[num]; + + if (queue->gen == gen) { + /* duplicate event */ + gf_log(GF_RPCSVC, GF_LOG_INFO, + "not queuing duplicate event thread death. " + "queue %d program %s", + num, prog->progname); + return; + } + + rpcsvc_alloc_request(svc, req); + req->prognum = RPCSVC_INFRA_PROGRAM; + req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH; + gf_log(GF_RPCSVC, GF_LOG_INFO, + "queuing event thread death request to queue %d of program %s", num, + prog->progname); + + pthread_mutex_lock(&queue->queue_lock); + { + empty = list_empty(&queue->request_queue); + + list_add_tail(&req->request_list, &queue->request_queue); + queue->gen = gen; + + if (empty && queue->waiting) + pthread_cond_signal(&queue->queue_cond); + } + pthread_mutex_unlock(&queue->queue_lock); + + return; +} + +int +rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen) +{ + rpcsvc_program_t *prog = NULL; + + pthread_rwlock_rdlock(&svc->rpclock); + { + list_for_each_entry(prog, &svc->programs, program) + { + if (prog->ownthread) + rpcsvc_queue_event_thread_death(svc, prog, gen); + } + } + pthread_rwlock_unlock(&svc->rpclock); + + return 0; +} + int rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) @@ -585,9 +719,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, int ret = -1; uint16_t port = 0; gf_boolean_t is_unix = _gf_false, empty = _gf_false; - gf_boolean_t unprivileged = _gf_false; + gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0; drc_cached_op_t *reply = NULL; rpcsvc_drc_globals_t *drc = NULL; + rpcsvc_request_queue_t *queue = NULL; + long num = 0; + void *value = NULL; if (!trans || !svc) return -1; @@ -700,19 +837,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans, ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn, rpcsvc_check_and_reply_error, NULL, req); } else if (req->ownthread) { - pthread_mutex_lock(&req->prog->queue_lock); + value = pthread_getspecific(req->prog->req_queue_key); + if (value == NULL) { + pthread_mutex_lock(&req->prog->thr_lock); + { + num = rpcsvc_get_free_queue_index(req->prog); + if (num != -1) { + num++; + value = (void *)num; + ret = pthread_setspecific(req->prog->req_queue_key, + value); + if (ret < 0) { + gf_log(GF_RPCSVC, GF_LOG_WARNING, + "setting request queue in TLS failed"); + rpcsvc_toggle_queue_status( + req->prog, &req->prog->request_queue[num - 1], + req->prog->request_queue_status); + num = -1; + } else { + spawn_request_handler = 1; + } + } + } + pthread_mutex_unlock(&req->prog->thr_lock); + } + + if (num == -1) + goto noqueue; + + num = ((unsigned long)value) - 1; + + queue = &req->prog->request_queue[num]; + + if (spawn_request_handler) { + ret = gf_thread_create(&queue->thread, NULL, + rpcsvc_request_handler, queue, + "rpcrqhnd"); + if (!ret) { + gf_log(GF_RPCSVC, GF_LOG_INFO, + "spawned a request handler thread for queue %d", + (int)num); + + req->prog->threadcount++; + } else { + gf_log( + GF_RPCSVC, GF_LOG_INFO, + "spawning a request handler thread for queue %d failed", + (int)num); + ret = pthread_setspecific(req->prog->req_queue_key, 0); + if (ret < 0) { + gf_log(GF_RPCSVC, GF_LOG_WARNING, + "resetting request queue in TLS failed"); + } + + rpcsvc_toggle_queue_status( + req->prog, &req->prog->request_queue[num - 1], + req->prog->request_queue_status); + + goto noqueue; + } + } + + pthread_mutex_lock(&queue->queue_lock); { - empty = list_empty(&req->prog->request_queue); + empty = list_empty(&queue->request_queue); - list_add_tail(&req->request_list, &req->prog->request_queue); + list_add_tail(&req->request_list, &queue->request_queue); - if (empty) - pthread_cond_signal(&req->prog->queue_cond); + if (empty && queue->waiting) + pthread_cond_signal(&queue->queue_cond); } - pthread_mutex_unlock(&req->prog->queue_lock); + pthread_mutex_unlock(&queue->queue_lock); ret = 0; } else { + noqueue: ret = actor_fn(req); } } @@ -839,6 +1038,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, "got MAP_XID event, which should have not come"); ret = 0; break; + + case RPC_TRANSPORT_EVENT_THREAD_DIED: + rpcsvc_handle_event_thread_death(svc, trans, + (int)(unsigned long)data); + ret = 0; + break; } out: @@ -1877,6 +2082,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name) goto out; } + dict_del(options, "notify-poller-death"); GF_FREE(transport_name); transport_name = NULL; count++; @@ -1961,55 +2167,86 @@ out: void * rpcsvc_request_handler(void *arg) { - rpcsvc_program_t *program = arg; - rpcsvc_request_t *req = NULL; + rpcsvc_request_queue_t *queue = NULL; + rpcsvc_program_t *program = NULL; + rpcsvc_request_t *req = NULL, *tmp_req = NULL; rpcsvc_actor_t *actor = NULL; gf_boolean_t done = _gf_false; int ret = 0; + struct list_head tmp_list = { + 0, + }; + + queue = arg; + program = queue->program; + + INIT_LIST_HEAD(&tmp_list); if (!program) return NULL; while (1) { - pthread_mutex_lock(&program->queue_lock); + pthread_mutex_lock(&queue->queue_lock); { - if (!program->alive && list_empty(&program->request_queue)) { + if (!program->alive && list_empty(&queue->request_queue)) { done = 1; goto unlock; } - while (list_empty(&program->request_queue) && - (program->threadcount <= program->eventthreadcount)) { - pthread_cond_wait(&program->queue_cond, &program->queue_lock); + while (list_empty(&queue->request_queue)) { + queue->waiting = _gf_true; + pthread_cond_wait(&queue->queue_cond, &queue->queue_lock); } - if (program->threadcount > program->eventthreadcount) { - done = 1; - program->threadcount--; + queue->waiting = _gf_false; - gf_log(GF_RPCSVC, GF_LOG_INFO, - "program '%s' thread terminated; " - "total count:%d", - program->progname, program->threadcount); - } else if (!list_empty(&program->request_queue)) { - req = list_entry(program->request_queue.next, typeof(*req), - request_list); - - list_del_init(&req->request_list); + if (!list_empty(&queue->request_queue)) { + INIT_LIST_HEAD(&tmp_list); + list_splice_init(&queue->request_queue, &tmp_list); } } unlock: - pthread_mutex_unlock(&program->queue_lock); - - if (req) { - THIS = req->svc->xl; - actor = rpcsvc_program_actor(req); - ret = actor->actor(req); + pthread_mutex_unlock(&queue->queue_lock); - if (ret != 0) { - rpcsvc_check_and_reply_error(ret, NULL, req); + list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list) + { + list_del_init(&req->request_list); + + if (req) { + if (req->prognum == RPCSVC_INFRA_PROGRAM) { + switch (req->procnum) { + case RPCSVC_PROC_EVENT_THREAD_DEATH: + gf_log(GF_RPCSVC, GF_LOG_INFO, + "event thread died, exiting request handler " + "thread for queue %d of program %s", + (int)(queue - &program->request_queue[0]), + program->progname); + done = 1; + pthread_mutex_lock(&program->thr_lock); + { + rpcsvc_toggle_queue_status( + program, queue, + program->request_queue_status); + program->threadcount--; + } + pthread_mutex_unlock(&program->thr_lock); + rpcsvc_request_destroy(req); + break; + + default: + break; + } + } else { + THIS = req->svc->xl; + actor = rpcsvc_program_actor(req); + ret = actor->actor(req); + + if (ret != 0) { + rpcsvc_check_and_reply_error(ret, NULL, req); + } + req = NULL; + } } - req = NULL; } if (done) @@ -2020,59 +2257,10 @@ rpcsvc_request_handler(void *arg) } int -rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program) -{ - int ret = 0, delta = 0, creates = 0; - - if (!program || !svc) - goto out; - - pthread_mutex_lock(&program->queue_lock); - { - delta = program->eventthreadcount - program->threadcount; - - if (delta >= 0) { - while (delta--) { - ret = gf_thread_create(&program->thread, NULL, - rpcsvc_request_handler, program, - "rpcrqhnd"); - if (!ret) { - program->threadcount++; - creates++; - } - } - - if (creates) { - gf_log(GF_RPCSVC, GF_LOG_INFO, - "spawned %d threads for program '%s'; " - "total count:%d", - creates, program->progname, program->threadcount); - } - } else { - gf_log(GF_RPCSVC, GF_LOG_INFO, - "terminating %d threads for program '%s'", -delta, - program->progname); - - /* this signal is to just wake up the threads so they - * test for the change in eventthreadcount and kill - * themselves until the program thread count becomes - * equal to the event thread count - */ - pthread_cond_broadcast(&program->queue_cond); - } - } - pthread_mutex_unlock(&program->queue_lock); - -out: - return creates; -} - -int rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, gf_boolean_t add_to_head) { - int ret = -1; - int creates = -1; + int ret = -1, i = 0; rpcsvc_program_t *newprog = NULL; char already_registered = 0; @@ -2110,9 +2298,16 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, memcpy(newprog, program, sizeof(*program)); INIT_LIST_HEAD(&newprog->program); - INIT_LIST_HEAD(&newprog->request_queue); - pthread_mutex_init(&newprog->queue_lock, NULL); - pthread_cond_init(&newprog->queue_cond, NULL); + + for (i = 0; i < EVENT_MAX_THREADS; i++) { + INIT_LIST_HEAD(&newprog->request_queue[i].request_queue); + pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL); + pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL); + newprog->request_queue[i].program = newprog; + } + + pthread_mutex_init(&newprog->thr_lock, NULL); + pthread_cond_init(&newprog->thr_cond, NULL); newprog->alive = _gf_true; @@ -2121,12 +2316,11 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program, newprog->ownthread = _gf_false; if (newprog->ownthread) { - newprog->eventthreadcount = 1; - creates = rpcsvc_spawn_threads(svc, newprog); + struct event_pool *ep = svc->ctx->event_pool; + newprog->eventthreadcount = ep->eventthreadcount; - if (creates < 1) { - goto out; - } + pthread_key_create(&newprog->req_queue_key, NULL); + newprog->thr_queue = 1; } pthread_rwlock_wrlock(&svc->rpclock); @@ -3003,38 +3197,6 @@ out: return ret; } -/* During reconfigure, Make sure to call this function after event-threads are - * reconfigured as programs' threadcount will be made equal to event threads. - */ - -int -rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount) -{ - int ret = -1; - rpcsvc_program_t *program = NULL; - - if (!svc) { - ret = 0; - goto out; - } - - pthread_rwlock_wrlock(&svc->rpclock); - { - list_for_each_entry(program, &svc->programs, program) - { - if (program->ownthread) { - program->eventthreadcount = new_eventthreadcount; - rpcsvc_spawn_threads(svc, program); - } - } - } - pthread_rwlock_unlock(&svc->rpclock); - - ret = 0; -out: - return ret; -} - rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = { [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index ebb836fba3f..8388dd404c5 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -33,6 +33,16 @@ #define MAX_IOVEC 16 #endif +/* TODO: we should store prognums at a centralized location to avoid conflict + or use a robust random number generator to avoid conflicts +*/ + +#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */ + +typedef enum { + RPCSVC_PROC_EVENT_THREAD_DEATH = 0, +} rpcsvc_infra_procnum_t; + #define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT \ 64 /* Default for protocol/server */ #define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */ @@ -362,6 +372,16 @@ typedef struct rpcsvc_actor_desc { drc_op_type_t op_type; } rpcsvc_actor_t; +typedef struct rpcsvc_request_queue { + int gen; + struct list_head request_queue; + pthread_mutex_t queue_lock; + pthread_cond_t queue_cond; + pthread_t thread; + struct rpcsvc_program *program; + gf_boolean_t waiting; +} rpcsvc_request_queue_t; + /* Describes a program and its version along with the function pointers * required to handle the procedures/actors of each program/version. * Never changed ever by any thread so no need for a lock. @@ -421,11 +441,14 @@ struct rpcsvc_program { gf_boolean_t synctask; /* list member to link to list of registered services with rpcsvc */ struct list_head program; - struct list_head request_queue; - pthread_mutex_t queue_lock; - pthread_cond_t queue_cond; - pthread_t thread; + rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS]; + char request_queue_status[EVENT_MAX_THREADS / 8 + 1]; + pthread_mutex_t thr_lock; + pthread_cond_t thr_cond; int threadcount; + int thr_queue; + pthread_key_t req_queue_key; + /* eventthreadcount is just a readonly copy of the actual value * owned by the event sub-system * It is used to control the scaling of rpcsvc_request_handler threads @@ -652,9 +675,6 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum, uint32_t progver, int procnum); -extern int -rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount); - void rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr); #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index dc227137d57..776e647d4f6 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -2859,7 +2859,7 @@ socket_complete_connection(rpc_transport_t *this) /* reads rpc_requests during pollin */ static int socket_event_handler(int fd, int idx, int gen, void *data, int poll_in, - int poll_out, int poll_err) + int poll_out, int poll_err, char event_thread_died) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; @@ -2869,6 +2869,11 @@ socket_event_handler(int fd, int idx, int gen, void *data, int poll_in, this = data; + if (event_thread_died) { + /* to avoid duplicate notifications, notify only for listener sockets */ + return 0; + } + GF_VALIDATE_OR_GOTO("socket", this, out); GF_VALIDATE_OR_GOTO("socket", this->private, out); GF_VALIDATE_OR_GOTO("socket", this->xl, out); @@ -2967,7 +2972,7 @@ out: static int socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, - int poll_out, int poll_err) + int poll_out, int poll_err, char event_thread_died) { rpc_transport_t *this = NULL; socket_private_t *priv = NULL; @@ -2991,6 +2996,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, priv = this->private; ctx = this->ctx; + if (event_thread_died) { + rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED, + (void *)(unsigned long)gen); + return 0; + } + /* NOTE: * We have done away with the critical section in this function. since * there's little that it helps with. There's no other code that @@ -3099,6 +3110,7 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, new_trans->mydata = this->mydata; new_trans->notify = this->notify; new_trans->listener = this; + new_trans->notify_poller_death = this->poller_death_accept; new_priv = new_trans->private; if (new_sockaddr.ss_family == AF_UNIX) { @@ -3149,9 +3161,9 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in, ret = rpc_transport_notify(this, RPC_TRANSPORT_ACCEPT, new_trans); if (ret != -1) { - new_priv->idx = event_register(ctx->event_pool, new_sock, - socket_event_handler, new_trans, - 1, 0); + new_priv->idx = event_register( + ctx->event_pool, new_sock, socket_event_handler, new_trans, + 1, 0, new_trans->notify_poller_death); if (new_priv->idx == -1) { ret = -1; gf_log(this->name, GF_LOG_ERROR, @@ -3530,7 +3542,8 @@ socket_connect(rpc_transport_t *this, int port) this->listener = this; priv->idx = event_register(ctx->event_pool, priv->sock, - socket_event_handler, this, 1, 1); + socket_event_handler, this, 1, 1, + this->notify_poller_death); if (priv->idx == -1) { gf_log("", GF_LOG_WARNING, "failed to register the event; " @@ -3709,7 +3722,8 @@ socket_listen(rpc_transport_t *this) rpc_transport_ref(this); priv->idx = event_register(ctx->event_pool, priv->sock, - socket_server_event_handler, this, 1, 0); + socket_server_event_handler, this, 1, 0, + this->notify_poller_death); if (priv->idx == -1) { gf_log(this->name, GF_LOG_WARNING, |