diff options
-rw-r--r-- | api/src/glfs.c | 3 | ||||
-rw-r--r-- | cli/src/cli.c | 3 | ||||
-rw-r--r-- | glusterfsd/src/glusterfsd.c | 3 | ||||
-rw-r--r-- | libglusterfs/src/event-epoll.c | 210 | ||||
-rw-r--r-- | libglusterfs/src/event-poll.c | 17 | ||||
-rw-r--r-- | libglusterfs/src/event.c | 21 | ||||
-rw-r--r-- | libglusterfs/src/event.h | 15 | ||||
-rw-r--r-- | libglusterfs/src/glusterfs.h | 2 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 8 | ||||
-rw-r--r-- | xlators/protocol/client/src/client.c | 39 | ||||
-rw-r--r-- | xlators/protocol/client/src/client.h | 3 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.c | 42 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.h | 3 |
13 files changed, 336 insertions, 33 deletions
diff --git a/api/src/glfs.c b/api/src/glfs.c index 7542d8b9fcd..48af2412b8b 100644 --- a/api/src/glfs.c +++ b/api/src/glfs.c @@ -95,7 +95,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx) goto err; } - ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); + ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, + STARTING_EVENT_THREADS); if (!ctx->event_pool) { goto err; } diff --git a/cli/src/cli.c b/cli/src/cli.c index b33ce950e11..cd2825e9c4a 100644 --- a/cli/src/cli.c +++ b/cli/src/cli.c @@ -114,7 +114,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx) if (!ctx->iobuf_pool) return -1; - ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); + ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, + STARTING_EVENT_THREADS); if (!ctx->event_pool) return -1; diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index e750d68dd98..a46385aa292 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -1340,7 +1340,8 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx) goto out; } - ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE); + ctx->event_pool = event_pool_new (DEFAULT_EVENT_POOL_SIZE, + STARTING_EVENT_THREADS); if (!ctx->event_pool) { gf_msg ("", GF_LOG_CRITICAL, 0, glusterfsd_msg_14, "event"); goto out; diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9082954e4e4..8d42fa71fb6 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -43,6 +43,10 @@ struct event_slot_epoll { gf_lock_t lock; }; +struct event_thread_data { + struct event_pool *event_pool; + int event_index; +}; static struct event_slot_epoll * __event_newtable (struct event_pool *event_pool, int table_idx) @@ -232,7 +236,7 @@ done: static struct event_pool * -event_pool_new_epoll (int count) +event_pool_new_epoll (int count, int eventthreadcount) { struct event_pool *event_pool = NULL; int epfd = -1; @@ -258,6 +262,8 @@ event_pool_new_epoll (int count) event_pool->count = count; + event_pool->eventthreadcount = eventthreadcount; + pthread_mutex_init (&event_pool->mutex, NULL); out: @@ -585,11 +591,45 @@ event_dispatch_epoll_worker (void *data) { struct epoll_event event; int ret = -1; - struct event_pool *event_pool = data; + struct event_thread_data *ev_data = data; + struct event_pool *event_pool; + int myindex = -1; + int timetodie = 0; + + GF_VALIDATE_OR_GOTO ("event", ev_data, out); + + event_pool = ev_data->event_pool; + myindex = ev_data->event_index; GF_VALIDATE_OR_GOTO ("event", event_pool, out); + gf_log ("epoll", GF_LOG_INFO, "Started thread with index %d", myindex); + for (;;) { + if (event_pool->eventthreadcount < myindex) { + /* ...time to die, thread count was decreased below + * this threads index */ + /* Start with extra safety at this point, reducing + * lock conention in normal case when threads are not + * reconfigured always */ + pthread_mutex_lock (&event_pool->mutex); + { + if (event_pool->eventthreadcount < + myindex) { + /* if found true in critical section, + * die */ + event_pool->pollers[myindex - 1] = 0; + timetodie = 1; + } + } + pthread_mutex_unlock (&event_pool->mutex); + if (timetodie) { + gf_log ("epoll", GF_LOG_INFO, + "Exited thread with index %d", myindex); + goto out; + } + } + ret = epoll_wait (event_pool->fd, &event, 1, -1); if (ret == 0) @@ -603,40 +643,164 @@ event_dispatch_epoll_worker (void *data) ret = event_dispatch_epoll_handler (event_pool, &event); } out: + if (ev_data) + GF_FREE (ev_data); return NULL; } - -#define GLUSTERFS_EPOLL_MAXTHREADS 2 - - +/* Attempts to start the # of configured pollers, ensuring at least the first + * is started in a joinable state */ static int event_dispatch_epoll (struct event_pool *event_pool) { - int i = 0; - pthread_t pollers[GLUSTERFS_EPOLL_MAXTHREADS]; - int ret = -1; - - for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) { - ret = pthread_create (&pollers[i], NULL, - event_dispatch_epoll_worker, - event_pool); - } + int i = 0; + pthread_t t_id; + int pollercount = 0; + int ret = -1; + struct event_thread_data *ev_data = NULL; + + /* Start the configured number of pollers */ + pthread_mutex_lock (&event_pool->mutex); + { + pollercount = event_pool->eventthreadcount; + + /* Set to MAX if greater */ + if (pollercount > EVENT_MAX_THREADS) + pollercount = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is incorrectly set */ + if (pollercount <= 0) + pollercount = 1; + + for (i = 0; i < pollercount; i++) { + ev_data = GF_CALLOC (1, sizeof (*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + gf_log ("epoll", GF_LOG_WARNING, + "Allocation failure for index %d", i); + if (i == 0) { + /* Need to suceed creating 0'th + * thread, to joinable and wait */ + break; + } else { + /* Inability to create other threads + * are a lesser evil, and ignored */ + continue; + } + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = pthread_create (&t_id, NULL, + event_dispatch_epoll_worker, + ev_data); + if (!ret) { + event_pool->pollers[i] = t_id; + + /* mark all threads other than one in index 0 + * as detachable. Errors can be ignored, they + * spend their time as zombies if not detched + * and the thread counts are decreased */ + if (i != 0) + pthread_detach (event_pool->pollers[i]); + } else { + gf_log ("epoll", GF_LOG_WARNING, + "Failed to start thread for index %d", + i); + if (i == 0) { + GF_FREE (ev_data); + break; + } else { + GF_FREE (ev_data); + continue; + } + } + } + } + pthread_mutex_unlock (&event_pool->mutex); - for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) - pthread_join (pollers[i], NULL); + /* Just wait for the first thread, that is created in a joinable state + * and will never die, ensuring this function never returns */ + if (event_pool->pollers[0] != 0) + pthread_join (event_pool->pollers[0], NULL); return ret; } +int +event_reconfigure_threads_epoll (struct event_pool *event_pool, int value) +{ + int i; + int ret; + pthread_t t_id; + int oldthreadcount; + struct event_thread_data *ev_data = NULL; + + /* Set to MAX if greater */ + if (value > EVENT_MAX_THREADS) + value = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is set incorrectly */ + if (value <= 0) + value = 1; + + pthread_mutex_lock (&event_pool->mutex); + { + oldthreadcount = event_pool->eventthreadcount; + + if (oldthreadcount < value) { + /* create more poll threads */ + for (i = oldthreadcount; i < value; i++) { + /* Start a thread if the index at this location + * is a 0, so that the older thread is confirmed + * as dead */ + if (event_pool->pollers[i] == 0) { + ev_data = GF_CALLOC (1, + sizeof (*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + gf_log ("epoll", GF_LOG_WARNING, + "Allocation failure for" + " index %d", i); + continue; + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = pthread_create (&t_id, NULL, + event_dispatch_epoll_worker, + ev_data); + if (ret) { + gf_log ("epoll", GF_LOG_WARNING, + "Failed to start thread for" + " index %d", i); + GF_FREE (ev_data); + } else { + pthread_detach (t_id); + event_pool->pollers[i] = t_id; + } + } + } + } + + /* if value decreases, threads will terminate, themselves */ + event_pool->eventthreadcount = value; + } + pthread_mutex_unlock (&event_pool->mutex); + + return 0; +} struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_unregister_close = event_unregister_close_epoll, - .event_dispatch = event_dispatch_epoll + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_unregister_close = event_unregister_close_epoll, + .event_dispatch = event_dispatch_epoll, + .event_reconfigure_threads = event_reconfigure_threads_epoll }; #endif diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index a7e2e663103..c91fa8487b5 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -95,7 +95,7 @@ out: static struct event_pool * -event_pool_new_poll (int count) +event_pool_new_poll (int count, int eventthreadcount) { struct event_pool *event_pool = NULL; int ret = -1; @@ -171,6 +171,12 @@ event_pool_new_poll (int count) return NULL; } + if (eventthreadcount > 1) { + gf_log ("poll", GF_LOG_INFO, + "Currently poll does not use multiple event processing" + " threads, thread count (%d) ignored", eventthreadcount); + } + return event_pool; } @@ -469,6 +475,12 @@ out: return -1; } +int +event_reconfigure_threads_poll (struct event_pool *event_pool, int value) +{ + /* No-op for poll */ + return 0; +} struct event_ops event_ops_poll = { .new = event_pool_new_poll, @@ -476,5 +488,6 @@ struct event_ops event_ops_poll = { .event_select_on = event_select_on_poll, .event_unregister = event_unregister_poll, .event_unregister_close = event_unregister_close_poll, - .event_dispatch = event_dispatch_poll + .event_dispatch = event_dispatch_poll, + .event_reconfigure_threads = event_reconfigure_threads_poll }; diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c index 6c253df3c1a..4dd0f991700 100644 --- a/libglusterfs/src/event.c +++ b/libglusterfs/src/event.c @@ -29,7 +29,7 @@ struct event_pool * -event_pool_new (int count) +event_pool_new (int count, int eventthreadcount) { struct event_pool *event_pool = NULL; extern struct event_ops event_ops_poll; @@ -37,7 +37,7 @@ event_pool_new (int count) #ifdef HAVE_SYS_EPOLL_H extern struct event_ops event_ops_epoll; - event_pool = event_ops_epoll.new (count); + event_pool = event_ops_epoll.new (count, eventthreadcount); if (event_pool) { event_pool->ops = &event_ops_epoll; @@ -48,7 +48,7 @@ event_pool_new (int count) #endif if (!event_pool) { - event_pool = event_ops_poll.new (count); + event_pool = event_ops_poll.new (count, eventthreadcount); if (event_pool) event_pool->ops = &event_ops_poll; @@ -129,3 +129,18 @@ event_dispatch (struct event_pool *event_pool) out: return ret; } + +int +event_reconfigure_threads (struct event_pool *event_pool, int value) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("event", event_pool, out); + + /* call event refresh function */ + ret = event_pool->ops->event_reconfigure_threads (event_pool, + value); + +out: + return ret; +} diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h index 3b3ab0e4b2f..930a7d1e28b 100644 --- a/libglusterfs/src/event.h +++ b/libglusterfs/src/event.h @@ -33,6 +33,7 @@ typedef int (*event_handler_t) (int fd, int idx, void *data, #define EVENT_EPOLL_TABLES 1024 #define EVENT_EPOLL_SLOTS 1024 +#define EVENT_MAX_THREADS 32 struct event_pool { struct event_ops *ops; @@ -53,10 +54,16 @@ struct event_pool { void *evcache; int evcache_size; + + /* NOTE: Currently used only when event processing is done using + * epoll. */ + int eventthreadcount; /* number of event threads to execute. */ + pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store, + * and live status */ }; struct event_ops { - struct event_pool * (*new) (int count); + struct event_pool * (*new) (int count, int eventthreadcount); int (*event_register) (struct event_pool *event_pool, int fd, event_handler_t handler, @@ -71,9 +78,12 @@ struct event_ops { int idx); int (*event_dispatch) (struct event_pool *event_pool); + + int (*event_reconfigure_threads) (struct event_pool *event_pool, + int newcount); }; -struct event_pool * event_pool_new (int count); +struct event_pool *event_pool_new (int count, int eventthreadcount); int event_select_on (struct event_pool *event_pool, int fd, int idx, int poll_in, int poll_out); int event_register (struct event_pool *event_pool, int fd, @@ -82,5 +92,6 @@ int event_register (struct event_pool *event_pool, int fd, int event_unregister (struct event_pool *event_pool, int fd, int idx); int event_unregister_close (struct event_pool *event_pool, int fd, int idx); int event_dispatch (struct event_pool *event_pool); +int event_reconfigure_threads (struct event_pool *event_pool, int value); #endif /* _EVENT_H_ */ diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 8059c976368..9c078e1d5f9 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -157,6 +157,8 @@ #define GLUSTERFS_RPC_REPLY_SIZE 24 +#define STARTING_EVENT_THREADS 1 + #define ZR_FILE_CONTENT_REQUEST(key) (!strncmp(key, ZR_FILE_CONTENT_STR, \ ZR_FILE_CONTENT_STRLEN)) diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 1d015a94698..e9473658176 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -836,6 +836,10 @@ struct volopt_map_entry glusterd_volopt_map[] = { .type = NO_DOC, .op_version = GD_OP_VERSION_3_7_0, }, + { .key = "client.event-threads", + .voltype = "protocol/client", + .op_version = GD_OP_VERSION_3_7_0, + }, /* Server xlator options */ { .key = "network.tcp-window-size", @@ -939,6 +943,10 @@ struct volopt_map_entry glusterd_volopt_map[] = { .type = NO_DOC, .op_version = GD_OP_VERSION_3_7_0, }, + { .key = "server.event-threads", + .voltype = "protocol/server", + .op_version = GD_OP_VERSION_3_7_0, + }, /* Generic transport options */ { .key = SSL_CERT_DEPTH_OPT, diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c index fbd0ff22737..999a4a5c836 100644 --- a/xlators/protocol/client/src/client.c +++ b/xlators/protocol/client/src/client.c @@ -20,6 +20,7 @@ #include "glusterfs.h" #include "statedump.h" #include "compat-errno.h" +#include "event.h" #include "xdr-rpc.h" #include "glusterfs3.h" @@ -2513,6 +2514,23 @@ out: } int +client_check_event_threads (xlator_t *this, dict_t *options, clnt_conf_t *conf) +{ + int ret = -1; + int eventthreads = 0; + + /* Read event-threads from the new configuration */ + ret = dict_get_int32 (options, "event-threads", &eventthreads); + if (!ret) { + conf->event_threads = eventthreads; + } + ret = event_reconfigure_threads (this->ctx->event_pool, + conf->event_threads); + + return ret; +} + +int reconfigure (xlator_t *this, dict_t *options) { clnt_conf_t *conf = NULL; @@ -2531,6 +2549,10 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("ping-timeout", conf->opt.ping_timeout, options, int32, out); + ret = client_check_event_threads (this, options, conf); + if (ret) + goto out; + ret = client_check_remote_host (this, options); if (ret) goto out; @@ -2609,6 +2631,13 @@ init (xlator_t *this) conf->grace_timer = NULL; conf->grace_timer_needed = _gf_true; + /* Set event threads to a default */ + conf->event_threads = STARTING_EVENT_THREADS; + + ret = client_check_event_threads (this, this->options, conf); + if (ret) + goto out; + ret = client_init_grace_timer (this, this->options, conf); if (ret) goto out; @@ -2936,5 +2965,15 @@ struct volume_options options[] = { .type = GF_OPTION_TYPE_BOOL, .default_value = "on", }, + { .key = {"event-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 32, + .default_value = "2", + .description = "Specifies the number of event threads to execute in" + "in parallel. Larger values would help process" + " responses faster, depending on available processing" + " power. Range 1-32 threads." + }, { .key = {NULL} }, }; diff --git a/xlators/protocol/client/src/client.h b/xlators/protocol/client/src/client.h index b4809310939..af70926b178 100644 --- a/xlators/protocol/client/src/client.h +++ b/xlators/protocol/client/src/client.h @@ -125,6 +125,9 @@ typedef struct clnt_conf { uint64_t setvol_count; gf_boolean_t send_gids; /* let the server resolve gids */ + + int event_threads; /* # of event threads + * configured */ } clnt_conf_t; typedef struct _client_fd_ctx { diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 7a2b7fa3297..92113c7c28b 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -25,6 +25,7 @@ #include "statedump.h" #include "defaults.h" #include "authenticate.h" +#include "event.h" void grace_time_handler (void *data) @@ -674,6 +675,24 @@ out: } int +server_check_event_threads (xlator_t *this, dict_t *options, + server_conf_t *conf) +{ + int ret = -1; + int eventthreads = 0; + + /* Read event-threads from the new configuration */ + ret = dict_get_int32 (options, "event-threads", &eventthreads); + if (!ret) { + conf->event_threads = eventthreads; + } + ret = event_reconfigure_threads (this->ctx->event_pool, + conf->event_threads); + + return ret; +} + +int reconfigure (xlator_t *this, dict_t *options) { @@ -693,6 +712,7 @@ reconfigure (xlator_t *this, dict_t *options) gf_log_callingfn (this->name, GF_LOG_DEBUG, "conf == null!!!"); goto out; } + if (dict_get_int32 ( options, "inode-lru-limit", &inode_lru_limit) == 0){ conf->inode_lru_limit = inode_lru_limit; gf_log (this->name, GF_LOG_TRACE, "Reconfigured inode-lru-limit" @@ -790,6 +810,11 @@ reconfigure (xlator_t *this, dict_t *options) "Reconfigure not found for transport" ); } } + + ret = server_check_event_threads (this, options, conf); + if (ret) + goto out; + ret = server_init_grace_timer (this, options, conf); out: @@ -846,6 +871,13 @@ init (xlator_t *this) INIT_LIST_HEAD (&conf->xprt_list); pthread_mutex_init (&conf->mutex, NULL); + /* Set event threads to a default */ + conf->event_threads = STARTING_EVENT_THREADS; + + ret = server_check_event_threads (this, this->options, conf); + if (ret) + goto out; + ret = server_init_grace_timer (this, this->options, conf); if (ret) goto out; @@ -1199,6 +1231,16 @@ struct volume_options options[] = { .default_value = "2", .description = "Timeout in seconds for the cached groups to expire." }, + { .key = {"event-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 32, + .default_value = "2", + .description = "Specifies the number of event threads to execute in" + "in parallel. Larger values would help process" + " responses faster, depending on available processing" + " power. Range 1-32 threads." + }, { .key = {NULL} }, }; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 3e1feacb94b..dc64edd0ab2 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -63,6 +63,9 @@ struct server_conf { gf_boolean_t server_manage_gids; /* resolve gids on brick */ gid_cache_t gid_cache; int32_t gid_cache_timeout; + + int event_threads; /* # of event threads + * configured */ }; typedef struct server_conf server_conf_t; |