diff options
author | Raghavendra G <raghavendra@zresearch.com> | 2009-05-15 05:50:09 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2009-05-20 11:37:37 -0700 |
commit | ef0af3ca33a007f2aae2016cc27b6d828367c987 (patch) | |
tree | bb9757c246237b317213f26c3727aa303a4782ae /xlators/performance/io-threads | |
parent | c22810911149506d972133c4e87dbcab01330daa (diff) |
io-threads: Add graceful shutdown of worker threads
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
Diffstat (limited to 'xlators/performance/io-threads')
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 95 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 9 |
2 files changed, 79 insertions, 25 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 6f0cb06b99a..813b7d73d81 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -35,6 +35,12 @@ typedef void *(*iot_worker_fn)(void*); void +iot_stop_worker (iot_worker_t *worker); + +void +iot_stop_workers (iot_worker_t **workers, int start_idx, int count); + +void _iot_queue (iot_worker_t *worker, iot_request_t *req); iot_request_t * @@ -2264,20 +2270,24 @@ iot_can_ordered_exit (iot_worker_t * worker) /* We dont want this thread to exit if its index is * below the min thread count. */ - if ((worker->thread_idx >= conf->min_o_threads)) + if (worker->thread_idx >= conf->min_o_threads) allow_exit = _gf_true; return allow_exit; } - /* Must be called with worker lock held. */ gf_boolean_t -iot_ordered_exit (iot_worker_t *worker) +iot_ordered_exit (int cond_waitres, iot_worker_t *worker) { gf_boolean_t allow_exit = _gf_false; - allow_exit = iot_can_ordered_exit (worker); + if (worker->state == IOT_STATE_EXIT_REQUEST) { + allow_exit = 1; + } else if (cond_waitres == ETIMEDOUT) { + allow_exit = iot_can_ordered_exit (worker); + } + if (allow_exit) { worker->state = IOT_STATE_DEAD; worker->thread = 0; @@ -2295,6 +2305,11 @@ iot_ordered_request_wait (iot_worker_t * worker) int waitres = 0; int retstat = 0; + if (worker->state == IOT_STATE_EXIT_REQUEST) { + retstat = -1; + goto out; + } + gettimeofday (&tv, NULL); ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time; /* Slightly skew the idle time for threads so that, we dont @@ -2304,10 +2319,11 @@ iot_ordered_request_wait (iot_worker_t * worker) ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, &ts); - if (waitres == ETIMEDOUT) - if (iot_ordered_exit (worker)) - retstat = -1; + if (iot_ordered_exit (waitres, worker)) { + retstat = -1; + } +out: return retstat; } @@ -2376,11 +2392,10 @@ iot_can_unordered_exit (iot_worker_t * worker) iot_conf_t *conf = NULL; conf = worker->conf; - /* We dont want this thread to exit if its index is * below the min thread count. */ - if ((worker->thread_idx >= conf->min_u_threads)) + if (worker->thread_idx >= conf->min_u_threads) allow_exit = _gf_true; return allow_exit; @@ -2389,11 +2404,16 @@ iot_can_unordered_exit (iot_worker_t * worker) /* Must be called with worker lock held. */ gf_boolean_t -iot_unordered_exit (iot_worker_t *worker) +iot_unordered_exit (int cond_waitres, iot_worker_t *worker) { gf_boolean_t allow_exit = _gf_false; - allow_exit = iot_can_unordered_exit (worker); + if (worker->state == IOT_STATE_EXIT_REQUEST) { + allow_exit = 1; + } else if (cond_waitres == ETIMEDOUT) { + allow_exit = iot_can_unordered_exit (worker); + } + if (allow_exit) { worker->state = IOT_STATE_DEAD; worker->thread = 0; @@ -2411,6 +2431,11 @@ iot_unordered_request_wait (iot_worker_t * worker) int waitres = 0; int retstat = 0; + if (worker->state == IOT_STATE_EXIT_REQUEST) { + retstat = -1; + goto out; + } + gettimeofday (&tv, NULL); ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; /* Slightly skew the idle time for threads so that, we dont @@ -2420,10 +2445,11 @@ iot_unordered_request_wait (iot_worker_t * worker) ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, &ts); - if (waitres == ETIMEDOUT) - if (iot_unordered_exit (worker)) - retstat = -1; + if (iot_unordered_exit (waitres, worker)) { + retstat = -1; + } +out: return retstat; } @@ -2564,6 +2590,35 @@ out: } +void +iot_stop_worker (iot_worker_t *worker) +{ + pthread_mutex_lock (&worker->qlock); + { + worker->state = IOT_STATE_EXIT_REQUEST; + } + pthread_mutex_unlock (&worker->qlock); + + pthread_cond_broadcast (&worker->dq_cond); + pthread_join (worker->thread, NULL); +} + + +void +iot_stop_workers (iot_worker_t **workers, int start_idx, int count) +{ + int i = 0; + int end_idx = 0; + + end_idx = start_idx + count; + for (i = start_idx; i < end_idx; i++) { + if (workers[i] != NULL) { + iot_stop_worker (workers[i]); + } + } +} + + int iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc) { @@ -2623,10 +2678,8 @@ void iot_cleanup_workers (iot_conf_t *conf) { if (conf->uworkers != NULL) { - /* - iot_stop_workers (conf->oworkers, 0, - conf->max_u_threads); - */ + iot_stop_workers (conf->uworkers, 0, + conf->max_u_threads); deallocate_workers (conf->uworkers, 0, conf->max_u_threads); @@ -2635,10 +2688,8 @@ iot_cleanup_workers (iot_conf_t *conf) } if (conf->oworkers != NULL) { - /* - iot_stop_workers (conf->uworkers, 0, - conf->max_o_threads); - */ + iot_stop_workers (conf->oworkers, 0, + conf->max_o_threads); deallocate_workers (conf->oworkers, 0, conf->max_o_threads); diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index f0210942854..8075972b455 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -49,8 +49,11 @@ struct iot_request { call_stub_t *stub; }; -#define IOT_STATE_ACTIVE 1 -#define IOT_STATE_DEAD 2 +typedef enum { + IOT_STATE_ACTIVE, + IOT_STATE_EXIT_REQUEST, + IOT_STATE_DEAD +}iot_state_t; #define iot_worker_active(wrk) ((wrk)->state == IOT_STATE_ACTIVE) #define MAX_IDLE_SKEW 1000 /* usecs */ @@ -76,7 +79,7 @@ struct iot_worker { pthread_mutex_t qlock; int32_t queue_size; pthread_t thread; - int state; /* What state is the thread in. */ + iot_state_t state; /* What state is the thread in. */ int thread_idx; /* Thread's index into the worker array. Since this will be thread local data, for ensuring that |