summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@zresearch.com>2009-05-15 05:50:09 +0000
committerAnand V. Avati <avati@dev.gluster.com>2009-05-20 11:37:37 -0700
commitef0af3ca33a007f2aae2016cc27b6d828367c987 (patch)
treebb9757c246237b317213f26c3727aa303a4782ae /xlators
parentc22810911149506d972133c4e87dbcab01330daa (diff)
io-threads: Add graceful shutdown of worker threads
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c95
-rw-r--r--xlators/performance/io-threads/src/io-threads.h9
2 files changed, 79 insertions, 25 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 6f0cb06b9..813b7d73d 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -35,6 +35,12 @@
typedef void *(*iot_worker_fn)(void*);
void
+iot_stop_worker (iot_worker_t *worker);
+
+void
+iot_stop_workers (iot_worker_t **workers, int start_idx, int count);
+
+void
_iot_queue (iot_worker_t *worker, iot_request_t *req);
iot_request_t *
@@ -2264,20 +2270,24 @@ iot_can_ordered_exit (iot_worker_t * worker)
/* We dont want this thread to exit if its index is
* below the min thread count.
*/
- if ((worker->thread_idx >= conf->min_o_threads))
+ if (worker->thread_idx >= conf->min_o_threads)
allow_exit = _gf_true;
return allow_exit;
}
-
/* Must be called with worker lock held. */
gf_boolean_t
-iot_ordered_exit (iot_worker_t *worker)
+iot_ordered_exit (int cond_waitres, iot_worker_t *worker)
{
gf_boolean_t allow_exit = _gf_false;
- allow_exit = iot_can_ordered_exit (worker);
+ if (worker->state == IOT_STATE_EXIT_REQUEST) {
+ allow_exit = 1;
+ } else if (cond_waitres == ETIMEDOUT) {
+ allow_exit = iot_can_ordered_exit (worker);
+ }
+
if (allow_exit) {
worker->state = IOT_STATE_DEAD;
worker->thread = 0;
@@ -2295,6 +2305,11 @@ iot_ordered_request_wait (iot_worker_t * worker)
int waitres = 0;
int retstat = 0;
+ if (worker->state == IOT_STATE_EXIT_REQUEST) {
+ retstat = -1;
+ goto out;
+ }
+
gettimeofday (&tv, NULL);
ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time;
/* Slightly skew the idle time for threads so that, we dont
@@ -2304,10 +2319,11 @@ iot_ordered_request_wait (iot_worker_t * worker)
ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
&ts);
- if (waitres == ETIMEDOUT)
- if (iot_ordered_exit (worker))
- retstat = -1;
+ if (iot_ordered_exit (waitres, worker)) {
+ retstat = -1;
+ }
+out:
return retstat;
}
@@ -2376,11 +2392,10 @@ iot_can_unordered_exit (iot_worker_t * worker)
iot_conf_t *conf = NULL;
conf = worker->conf;
-
/* We dont want this thread to exit if its index is
* below the min thread count.
*/
- if ((worker->thread_idx >= conf->min_u_threads))
+ if (worker->thread_idx >= conf->min_u_threads)
allow_exit = _gf_true;
return allow_exit;
@@ -2389,11 +2404,16 @@ iot_can_unordered_exit (iot_worker_t * worker)
/* Must be called with worker lock held. */
gf_boolean_t
-iot_unordered_exit (iot_worker_t *worker)
+iot_unordered_exit (int cond_waitres, iot_worker_t *worker)
{
gf_boolean_t allow_exit = _gf_false;
- allow_exit = iot_can_unordered_exit (worker);
+ if (worker->state == IOT_STATE_EXIT_REQUEST) {
+ allow_exit = 1;
+ } else if (cond_waitres == ETIMEDOUT) {
+ allow_exit = iot_can_unordered_exit (worker);
+ }
+
if (allow_exit) {
worker->state = IOT_STATE_DEAD;
worker->thread = 0;
@@ -2411,6 +2431,11 @@ iot_unordered_request_wait (iot_worker_t * worker)
int waitres = 0;
int retstat = 0;
+ if (worker->state == IOT_STATE_EXIT_REQUEST) {
+ retstat = -1;
+ goto out;
+ }
+
gettimeofday (&tv, NULL);
ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time;
/* Slightly skew the idle time for threads so that, we dont
@@ -2420,10 +2445,11 @@ iot_unordered_request_wait (iot_worker_t * worker)
ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
&ts);
- if (waitres == ETIMEDOUT)
- if (iot_unordered_exit (worker))
- retstat = -1;
+ if (iot_unordered_exit (waitres, worker)) {
+ retstat = -1;
+ }
+out:
return retstat;
}
@@ -2564,6 +2590,35 @@ out:
}
+void
+iot_stop_worker (iot_worker_t *worker)
+{
+ pthread_mutex_lock (&worker->qlock);
+ {
+ worker->state = IOT_STATE_EXIT_REQUEST;
+ }
+ pthread_mutex_unlock (&worker->qlock);
+
+ pthread_cond_broadcast (&worker->dq_cond);
+ pthread_join (worker->thread, NULL);
+}
+
+
+void
+iot_stop_workers (iot_worker_t **workers, int start_idx, int count)
+{
+ int i = 0;
+ int end_idx = 0;
+
+ end_idx = start_idx + count;
+ for (i = start_idx; i < end_idx; i++) {
+ if (workers[i] != NULL) {
+ iot_stop_worker (workers[i]);
+ }
+ }
+}
+
+
int
iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc)
{
@@ -2623,10 +2678,8 @@ void
iot_cleanup_workers (iot_conf_t *conf)
{
if (conf->uworkers != NULL) {
- /*
- iot_stop_workers (conf->oworkers, 0,
- conf->max_u_threads);
- */
+ iot_stop_workers (conf->uworkers, 0,
+ conf->max_u_threads);
deallocate_workers (conf->uworkers, 0,
conf->max_u_threads);
@@ -2635,10 +2688,8 @@ iot_cleanup_workers (iot_conf_t *conf)
}
if (conf->oworkers != NULL) {
- /*
- iot_stop_workers (conf->uworkers, 0,
- conf->max_o_threads);
- */
+ iot_stop_workers (conf->oworkers, 0,
+ conf->max_o_threads);
deallocate_workers (conf->oworkers, 0,
conf->max_o_threads);
diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h
index f02109428..8075972b4 100644
--- a/xlators/performance/io-threads/src/io-threads.h
+++ b/xlators/performance/io-threads/src/io-threads.h
@@ -49,8 +49,11 @@ struct iot_request {
call_stub_t *stub;
};
-#define IOT_STATE_ACTIVE 1
-#define IOT_STATE_DEAD 2
+typedef enum {
+ IOT_STATE_ACTIVE,
+ IOT_STATE_EXIT_REQUEST,
+ IOT_STATE_DEAD
+}iot_state_t;
#define iot_worker_active(wrk) ((wrk)->state == IOT_STATE_ACTIVE)
#define MAX_IDLE_SKEW 1000 /* usecs */
@@ -76,7 +79,7 @@ struct iot_worker {
pthread_mutex_t qlock;
int32_t queue_size;
pthread_t thread;
- int state; /* What state is the thread in. */
+ iot_state_t state; /* What state is the thread in. */
int thread_idx; /* Thread's index into the worker
array. Since this will be thread
local data, for ensuring that