diff options
Diffstat (limited to 'libglusterfs/src/event-epoll.c')
-rw-r--r-- | libglusterfs/src/event-epoll.c | 210 |
1 files changed, 187 insertions, 23 deletions
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c index 9082954e4e4..8d42fa71fb6 100644 --- a/libglusterfs/src/event-epoll.c +++ b/libglusterfs/src/event-epoll.c @@ -43,6 +43,10 @@ struct event_slot_epoll { gf_lock_t lock; }; +struct event_thread_data { + struct event_pool *event_pool; + int event_index; +}; static struct event_slot_epoll * __event_newtable (struct event_pool *event_pool, int table_idx) @@ -232,7 +236,7 @@ done: static struct event_pool * -event_pool_new_epoll (int count) +event_pool_new_epoll (int count, int eventthreadcount) { struct event_pool *event_pool = NULL; int epfd = -1; @@ -258,6 +262,8 @@ event_pool_new_epoll (int count) event_pool->count = count; + event_pool->eventthreadcount = eventthreadcount; + pthread_mutex_init (&event_pool->mutex, NULL); out: @@ -585,11 +591,45 @@ event_dispatch_epoll_worker (void *data) { struct epoll_event event; int ret = -1; - struct event_pool *event_pool = data; + struct event_thread_data *ev_data = data; + struct event_pool *event_pool; + int myindex = -1; + int timetodie = 0; + + GF_VALIDATE_OR_GOTO ("event", ev_data, out); + + event_pool = ev_data->event_pool; + myindex = ev_data->event_index; GF_VALIDATE_OR_GOTO ("event", event_pool, out); + gf_log ("epoll", GF_LOG_INFO, "Started thread with index %d", myindex); + for (;;) { + if (event_pool->eventthreadcount < myindex) { + /* ...time to die, thread count was decreased below + * this threads index */ + /* Start with extra safety at this point, reducing + * lock conention in normal case when threads are not + * reconfigured always */ + pthread_mutex_lock (&event_pool->mutex); + { + if (event_pool->eventthreadcount < + myindex) { + /* if found true in critical section, + * die */ + event_pool->pollers[myindex - 1] = 0; + timetodie = 1; + } + } + pthread_mutex_unlock (&event_pool->mutex); + if (timetodie) { + gf_log ("epoll", GF_LOG_INFO, + "Exited thread with index %d", myindex); + goto out; + } + } + ret = epoll_wait (event_pool->fd, &event, 1, -1); if (ret == 0) @@ -603,40 +643,164 @@ event_dispatch_epoll_worker (void *data) ret = event_dispatch_epoll_handler (event_pool, &event); } out: + if (ev_data) + GF_FREE (ev_data); return NULL; } - -#define GLUSTERFS_EPOLL_MAXTHREADS 2 - - +/* Attempts to start the # of configured pollers, ensuring at least the first + * is started in a joinable state */ static int event_dispatch_epoll (struct event_pool *event_pool) { - int i = 0; - pthread_t pollers[GLUSTERFS_EPOLL_MAXTHREADS]; - int ret = -1; - - for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) { - ret = pthread_create (&pollers[i], NULL, - event_dispatch_epoll_worker, - event_pool); - } + int i = 0; + pthread_t t_id; + int pollercount = 0; + int ret = -1; + struct event_thread_data *ev_data = NULL; + + /* Start the configured number of pollers */ + pthread_mutex_lock (&event_pool->mutex); + { + pollercount = event_pool->eventthreadcount; + + /* Set to MAX if greater */ + if (pollercount > EVENT_MAX_THREADS) + pollercount = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is incorrectly set */ + if (pollercount <= 0) + pollercount = 1; + + for (i = 0; i < pollercount; i++) { + ev_data = GF_CALLOC (1, sizeof (*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + gf_log ("epoll", GF_LOG_WARNING, + "Allocation failure for index %d", i); + if (i == 0) { + /* Need to suceed creating 0'th + * thread, to joinable and wait */ + break; + } else { + /* Inability to create other threads + * are a lesser evil, and ignored */ + continue; + } + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = pthread_create (&t_id, NULL, + event_dispatch_epoll_worker, + ev_data); + if (!ret) { + event_pool->pollers[i] = t_id; + + /* mark all threads other than one in index 0 + * as detachable. Errors can be ignored, they + * spend their time as zombies if not detched + * and the thread counts are decreased */ + if (i != 0) + pthread_detach (event_pool->pollers[i]); + } else { + gf_log ("epoll", GF_LOG_WARNING, + "Failed to start thread for index %d", + i); + if (i == 0) { + GF_FREE (ev_data); + break; + } else { + GF_FREE (ev_data); + continue; + } + } + } + } + pthread_mutex_unlock (&event_pool->mutex); - for (i = 0; i < GLUSTERFS_EPOLL_MAXTHREADS; i++) - pthread_join (pollers[i], NULL); + /* Just wait for the first thread, that is created in a joinable state + * and will never die, ensuring this function never returns */ + if (event_pool->pollers[0] != 0) + pthread_join (event_pool->pollers[0], NULL); return ret; } +int +event_reconfigure_threads_epoll (struct event_pool *event_pool, int value) +{ + int i; + int ret; + pthread_t t_id; + int oldthreadcount; + struct event_thread_data *ev_data = NULL; + + /* Set to MAX if greater */ + if (value > EVENT_MAX_THREADS) + value = EVENT_MAX_THREADS; + + /* Default pollers to 1 in case this is set incorrectly */ + if (value <= 0) + value = 1; + + pthread_mutex_lock (&event_pool->mutex); + { + oldthreadcount = event_pool->eventthreadcount; + + if (oldthreadcount < value) { + /* create more poll threads */ + for (i = oldthreadcount; i < value; i++) { + /* Start a thread if the index at this location + * is a 0, so that the older thread is confirmed + * as dead */ + if (event_pool->pollers[i] == 0) { + ev_data = GF_CALLOC (1, + sizeof (*ev_data), + gf_common_mt_event_pool); + if (!ev_data) { + gf_log ("epoll", GF_LOG_WARNING, + "Allocation failure for" + " index %d", i); + continue; + } + + ev_data->event_pool = event_pool; + ev_data->event_index = i + 1; + + ret = pthread_create (&t_id, NULL, + event_dispatch_epoll_worker, + ev_data); + if (ret) { + gf_log ("epoll", GF_LOG_WARNING, + "Failed to start thread for" + " index %d", i); + GF_FREE (ev_data); + } else { + pthread_detach (t_id); + event_pool->pollers[i] = t_id; + } + } + } + } + + /* if value decreases, threads will terminate, themselves */ + event_pool->eventthreadcount = value; + } + pthread_mutex_unlock (&event_pool->mutex); + + return 0; +} struct event_ops event_ops_epoll = { - .new = event_pool_new_epoll, - .event_register = event_register_epoll, - .event_select_on = event_select_on_epoll, - .event_unregister = event_unregister_epoll, - .event_unregister_close = event_unregister_close_epoll, - .event_dispatch = event_dispatch_epoll + .new = event_pool_new_epoll, + .event_register = event_register_epoll, + .event_select_on = event_select_on_epoll, + .event_unregister = event_unregister_epoll, + .event_unregister_close = event_unregister_close_epoll, + .event_dispatch = event_dispatch_epoll, + .event_reconfigure_threads = event_reconfigure_threads_epoll }; #endif |