diff options
author | Shehjar Tikoo <shehjart@zresearch.com> | 2009-04-01 13:59:43 -0700 |
---|---|---|
committer | Anand V. Avati <avati@amp.gluster.com> | 2009-04-02 19:19:18 +0530 |
commit | b64fa35870b54b8d9e62afd71c92e35b1f7a4cfd (patch) | |
tree | 813474ecff343fb2cb9dca2fa11890aee32507be /xlators/performance/io-threads | |
parent | ee79908d3b577c061b497e35481b8d1523502077 (diff) |
io-threads: Add ordered threadpool state and code
Now we have the remaining fops going through the ordered
thread-pool.
To route a request through ordered thread, we use
iot_schedule_ordered(..) and the worker thread for
ordered requests is iot_worker_ordered(..)
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Diffstat (limited to 'xlators/performance/io-threads')
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 220 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 20 |
2 files changed, 182 insertions, 58 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 899702f1bdd..1d255d97267 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -34,18 +34,12 @@ typedef void *(*iot_worker_fn)(void*); -static void -iot_queue (iot_worker_t *worker, - call_stub_t *stub); - -static call_stub_t * -iot_dequeue (iot_worker_t *worker); - void _iot_queue (iot_worker_t *worker, iot_request_t *req); iot_request_t * iot_init_request (call_stub_t *stub); void iot_startup_workers (iot_worker_t **workers, int start_idx, int count, iot_worker_fn workerfunc); void * iot_worker_unordered (void *arg); +void * iot_worker_ordered (void *arg); void iot_schedule_unordered (iot_conf_t *conf, @@ -79,19 +73,49 @@ iot_schedule_unordered (iot_conf_t *conf, pthread_mutex_unlock (&selected_worker->qlock); } -static void -iot_schedule (iot_conf_t *conf, +void +iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) { - int32_t idx = 0; + uint64_t idx = 0; iot_worker_t *selected_worker = NULL; + iot_request_t * req = NULL; + int ctxret = 0; - if (inode) - idx = (inode->ino % conf->thread_count); - selected_worker = conf->workers[idx]; - - iot_queue (selected_worker, stub); + if (inode == NULL) { + gf_log (conf->this->name, GF_LOG_ERROR, + "Got NULL inode for ordered request"); + return; + } + req = iot_init_request (stub); + LOCK (&inode->lock); + { + ctxret = __inode_ctx_get (inode, conf->this, &idx); + if (ctxret < 0) { + idx = (random () % conf->max_o_threads); + __inode_ctx_put (inode, conf->this, idx); + } + /* inode lock once acquired, cannot be left here + * because other gluster main threads might be + * contending on it to append a request for this file. + * So we'll also leave the lock only after we've + * added the request to the worker queue. + */ + selected_worker = conf->oworkers[idx]; + pthread_mutex_lock (&selected_worker->qlock); + { + if (iot_worker_active (selected_worker)) + _iot_queue (selected_worker, req); + else { + iot_startup_workers (conf->oworkers, idx, 1, + iot_worker_ordered); + _iot_queue (selected_worker, req); + } + } + pthread_mutex_unlock (&selected_worker->qlock); + } + UNLOCK (&inode->lock); } int32_t @@ -183,7 +207,8 @@ iot_chmod (call_frame_t *frame, iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); else { - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, + stub); fd_unref (fd); } return 0; @@ -227,7 +252,7 @@ iot_fchmod (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -277,7 +302,8 @@ iot_chown (call_frame_t *frame, iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); else { - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, + stub); fd_unref (fd); } @@ -324,7 +350,7 @@ iot_fchown (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -780,7 +806,7 @@ iot_readv (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -823,7 +849,7 @@ iot_flush (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -870,7 +896,7 @@ iot_fsync (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -923,7 +949,7 @@ iot_writev (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -977,7 +1003,7 @@ iot_lk (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -1032,7 +1058,8 @@ iot_stat (call_frame_t *frame, iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); else { - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, + stub); fd_unref (fd); } @@ -1080,7 +1107,7 @@ iot_fstat (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -1136,7 +1163,8 @@ iot_truncate (call_frame_t *frame, iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); else { - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, + stub); fd_unref (fd); } @@ -1186,7 +1214,7 @@ iot_ftruncate (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -1243,7 +1271,7 @@ iot_utimens (call_frame_t *frame, iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); else { - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, stub); fd_unref (fd); } @@ -1369,30 +1397,88 @@ iot_init_request (call_stub_t *stub) return req; } -static void -iot_queue (iot_worker_t *worker, - call_stub_t *stub) +/* Must be called with worker lock held. */ +int +iot_can_ordered_exit (iot_worker_t * worker) { - iot_request_t *req = NULL; + int allow_exit = 0; + iot_conf_t *conf = NULL; - req = iot_init_request (stub); + conf = worker->conf; + if (worker->queue_size > 0) + goto decided; + + /* We dont want this thread to exit if its index is + * below the min thread count. + */ + if (worker->thread_idx >= conf->min_o_threads) + allow_exit = 1; + +decided: + return allow_exit; +} + +int +iot_ordered_exit (iot_worker_t *worker) +{ + int allow_exit = 0; + + /* It is possible that since the last time we timed out while + * waiting for a request, a new request has been added to this + * worker's request queue. Before we really exit, we must + * check for those requests. + */ pthread_mutex_lock (&worker->qlock); { - _iot_queue (worker, req); - } - pthread_mutex_unlock (&worker->qlock); + allow_exit = iot_can_ordered_exit (worker); + + if (allow_exit) { + worker->state = IOT_STATE_DEAD; + worker->thread = 0; + } + } + pthread_mutex_unlock (&worker->qlock); + + return allow_exit; } -static call_stub_t * -iot_dequeue (iot_worker_t *worker) +int +iot_ordered_request_wait (iot_worker_t * worker) { - call_stub_t *stub = NULL; - iot_request_t *req = NULL; + struct timeval tv; + struct timespec ts; + int waitres = 0; + + gettimeofday (&tv, NULL); + ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time; + /* Slightly skew the idle time for threads so that, we dont + * have all of them rushing to exit at the same time, if + * they've been idle. + */ + ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; + waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, + &ts); + if (waitres == ETIMEDOUT) + return -1; + + return 0; +} + +call_stub_t * +iot_dequeue_ordered (iot_worker_t *worker) +{ + call_stub_t *stub = NULL; + iot_request_t *req = NULL; + int waitstat = 0; pthread_mutex_lock (&worker->qlock); { - while (!worker->queue_size) - pthread_cond_wait (&worker->dq_cond, &worker->qlock); + while (!worker->queue_size) { + waitstat = 0; + waitstat = iot_ordered_request_wait (worker); + if (waitstat == -1) + goto out; + } list_for_each_entry (req, &worker->rqlist, list) break; @@ -1401,6 +1487,7 @@ iot_dequeue (iot_worker_t *worker) worker->queue_size--; } +out: pthread_mutex_unlock (&worker->qlock); FREE (req); @@ -1408,17 +1495,25 @@ iot_dequeue (iot_worker_t *worker) return stub; } -static void * -iot_worker (void *arg) +void * +iot_worker_ordered (void *arg) { - iot_worker_t *worker = arg; + iot_worker_t *worker = arg; + call_stub_t *stub = NULL; while (1) { - call_stub_t *stub; - stub = iot_dequeue (worker); + stub = iot_dequeue_ordered (worker); + if (stub == NULL) { + if (iot_ordered_exit (worker)) + break; + else + continue; + } call_resume (stub); } + + return NULL; } /* Must be called with worker lock held. */ @@ -1460,7 +1555,7 @@ iot_unordered_exit (iot_worker_t *worker) worker->state = IOT_STATE_DEAD; worker->thread = 0; } - } + } pthread_mutex_unlock (&worker->qlock); return allow_exit; @@ -1468,11 +1563,11 @@ iot_unordered_exit (iot_worker_t *worker) int -iot_request_wait_idleness (iot_worker_t * worker) +iot_unordered_request_wait (iot_worker_t * worker) { - struct timeval tv; + struct timeval tv; struct timespec ts; - int waitres = 0; + int waitres = 0; gettimeofday (&tv, NULL); ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; @@ -1481,7 +1576,7 @@ iot_request_wait_idleness (iot_worker_t * worker) * they've been idle. */ ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; - waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock, + waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, &ts); if (waitres == ETIMEDOUT) return -1; @@ -1501,7 +1596,7 @@ iot_dequeue_unordered (iot_worker_t *worker) { while (!worker->queue_size) { waitstat = 0; - waitstat = iot_request_wait_idleness (worker); + waitstat = iot_unordered_request_wait (worker); /* If -1, request wait must've timed * out. */ @@ -1613,14 +1708,16 @@ iot_startup_workers (iot_worker_t **workers, int start_idx, int count, static void workers_init (iot_conf_t *conf) { - conf->workers = allocate_worker_array (conf->thread_count); - allocate_workers (conf, conf->workers, 0, conf->thread_count); - /* Initialize un-ordered workers */ conf->uworkers = allocate_worker_array (conf->max_u_threads); allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads); - iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker); + /* Initialize ordered workers */ + conf->oworkers = allocate_worker_array (conf->max_o_threads); + allocate_workers (conf, conf->oworkers, 0, conf->max_o_threads); + + iot_startup_workers (conf->oworkers, 0, conf->min_o_threads, + iot_worker_ordered); iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, iot_worker_unordered); } @@ -1666,6 +1763,13 @@ init (xlator_t *this) conf->min_u_threads = IOT_MIN_THREADS; conf->u_idle_time = IOT_DEFAULT_IDLE; + /* Init params for ordered workers. */ + pthread_mutex_init (&conf->otlock, NULL); + conf->max_o_threads = IOT_MAX_THREADS; + conf->min_o_threads = IOT_MIN_THREADS; + conf->o_idle_time = IOT_DEFAULT_IDLE; + + conf->this = this; workers_init (conf); this->private = conf; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index f02b641f415..412b3cc7638 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -83,6 +83,26 @@ struct iot_conf { int32_t thread_count; struct iot_worker ** workers; + xlator_t *this; + /* Config state for ordered threads. */ + pthread_mutex_t otlock; /* Used to sync any state that needs to be + changed by the ordered threads. + */ + + int max_o_threads; /* Max. number of ordered threads */ + int min_o_threads; /* Min. number of ordered threads. Ordered + thread count never falls below this + threshold. + */ + + int o_idle_time; /* in Secs. The idle time after which an + ordered thread exits. + */ + + struct iot_worker **oworkers; /* Ordered thread pool. */ + + + /* Config state for unordered threads */ pthread_mutex_t utlock; /* Used for scaling un-ordered threads. */ struct iot_worker **uworkers; /* Un-ordered thread pool. */ int max_u_threads; /* Number of unordered threads will not be |