diff options
author | Shehjar Tikoo <shehjart@zresearch.com> | 2009-04-01 13:59:24 -0700 |
---|---|---|
committer | Anand V. Avati <avati@amp.gluster.com> | 2009-04-02 19:19:14 +0530 |
commit | ee79908d3b577c061b497e35481b8d1523502077 (patch) | |
tree | 9397ee47bcfe31cceae0014423ebfe8ee5b59bfe | |
parent | e27f7f344e12d0885a48fcca8dfce36440f5e9e8 (diff) |
io-threads: Add un-ordered thread-pool.
This commit adds everything needed to:
a. Get un-ordered request going through the un-ordered
thread-pool. This happens through, the
iot_schedule_unordered(..). The unordered thread-pool
consists of thread running the iot_worker_unordered(..)
function.
b. Make threads in the un-ordered thread pool start-up
and exit depending on the thread state.
Note that at this point the requests that need
ordering are still going through iot_schedule(..).
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 399 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 33 |
2 files changed, 334 insertions, 98 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 59fc642ca..899702f1b 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -28,6 +28,11 @@ #include "dict.h" #include "xlator.h" #include "io-threads.h" +#include <stdlib.h> +#include <sys/time.h> +#include <time.h> + +typedef void *(*iot_worker_fn)(void*); static void iot_queue (iot_worker_t *worker, @@ -36,6 +41,44 @@ iot_queue (iot_worker_t *worker, static call_stub_t * iot_dequeue (iot_worker_t *worker); +void _iot_queue (iot_worker_t *worker, iot_request_t *req); +iot_request_t * iot_init_request (call_stub_t *stub); +void iot_startup_workers (iot_worker_t **workers, int start_idx, int count, + iot_worker_fn workerfunc); +void * iot_worker_unordered (void *arg); + +void +iot_schedule_unordered (iot_conf_t *conf, + inode_t *inode, + call_stub_t *stub) +{ + int32_t idx = 0; + iot_worker_t *selected_worker = NULL; + iot_request_t *req = NULL; + + /* First decide which thread will service the request. + * FIXME: This should change into some form of load-balancing. + * */ + idx = (random() % conf->max_u_threads); + selected_worker = conf->uworkers[idx]; + + req = iot_init_request (stub); + /* Having decided that, we must check whether the thread is + * active at all. + */ + pthread_mutex_lock (&selected_worker->qlock); + { + if (iot_worker_active (selected_worker)) + _iot_queue (selected_worker, req); + else { + iot_startup_workers (conf->uworkers, idx, 1, + iot_worker_unordered); + _iot_queue (selected_worker, req); + } + } + pthread_mutex_unlock (&selected_worker->qlock); +} + static void iot_schedule (iot_conf_t *conf, inode_t *inode, @@ -92,7 +135,7 @@ iot_lookup (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -126,6 +169,7 @@ iot_chmod (call_frame_t *frame, mode_t mode) { call_stub_t *stub = NULL; + fd_t *fd = NULL; stub = fop_chmod_stub (frame, iot_chmod_wrapper, loc, mode); if (!stub) { @@ -133,7 +177,15 @@ iot_chmod (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + + fd = fd_lookup (loc->inode, frame->root->pid); + if (fd == NULL) + iot_schedule_unordered ((iot_conf_t *)this->private, + loc->inode, stub); + else { + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd_unref (fd); + } return 0; } @@ -211,6 +263,7 @@ iot_chown (call_frame_t *frame, gid_t gid) { call_stub_t *stub = NULL; + fd_t *fd = NULL; stub = fop_chown_stub (frame, iot_chown_wrapper, loc, uid, gid); if (!stub) { @@ -219,7 +272,14 @@ iot_chown (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd = fd_lookup (loc->inode, frame->root->pid); + if (fd == NULL) + iot_schedule_unordered ((iot_conf_t *)this->private, + loc->inode, stub); + else { + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd_unref (fd); + } return 0; } @@ -306,7 +366,7 @@ iot_access (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -349,7 +409,7 @@ iot_readlink (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -394,7 +454,7 @@ iot_mknod (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -438,7 +498,7 @@ iot_mkdir (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -477,7 +537,7 @@ iot_rmdir (call_frame_t *frame, return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -520,16 +580,7 @@ iot_symlink (call_frame_t *frame, return 0; } - /* Passing loc->inode does not make sense right now. - * Why? because passing - * loc->inode makes the request get ordered on the target - * file's thread, while we shouldnt really worry because this - * operation will not change the target in loc. For now, know - * that this works. Such requests, which operate on a new - * file/link, such as that in linkname, will be sent to a pool of - * requests meant specifically for meta-data requests. - */ - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -571,19 +622,8 @@ iot_rename (call_frame_t *frame, return 0; } - /* We should order this on the oldloc. rename() - * allows the blocks of the newloc to be available till - * the last process that might have it open, closes the file. - * I suppose this is a trade-off and we weigh in favour of - * ordering on oldloc because the client issuing a rename() - * would expect the oldloc's contents to be available at the - * new location after this request. rename()'s guarantee that - * the current - * newloc's block will not be released or over-written allows - * any othe processes that have the newloc open, to continue - * operating. - */ - iot_schedule ((iot_conf_t *)this->private, oldloc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, oldloc->inode, + stub); return 0; } @@ -627,7 +667,7 @@ iot_open (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -684,7 +724,7 @@ iot_create (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -976,20 +1016,7 @@ iot_stat (call_frame_t *frame, call_stub_t *stub; fd_t *fd = NULL; - fd = fd_lookup (loc->inode, frame->root->pid); - - if (fd == NULL) { - STACK_WIND(frame, - iot_stat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, - loc); - return 0; - } - - fd_unref (fd); - - stub = fop_stat_stub (frame, + stub = fop_stat_stub (frame, iot_stat_wrapper, loc); if (!stub) { @@ -997,7 +1024,17 @@ iot_stat (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + + fd = fd_lookup (loc->inode, frame->root->pid); + /* File is not open, so we can send it through unordered pool. + */ + if (fd == NULL) + iot_schedule_unordered ((iot_conf_t *)this->private, + loc->inode, stub); + else { + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd_unref (fd); + } return 0; } @@ -1083,22 +1120,8 @@ iot_truncate (call_frame_t *frame, { call_stub_t *stub; fd_t *fd = NULL; - - fd = fd_lookup (loc->inode, frame->root->pid); - if (fd == NULL) { - STACK_WIND(frame, - iot_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, - offset); - return 0; - } - - fd_unref (fd); - - stub = fop_truncate_stub (frame, + stub = fop_truncate_stub (frame, iot_truncate_wrapper, loc, offset); @@ -1107,7 +1130,15 @@ iot_truncate (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + + fd = fd_lookup (loc->inode, frame->root->pid); + if (fd == NULL) + iot_schedule_unordered ((iot_conf_t *)this->private, + loc->inode, stub); + else { + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd_unref (fd); + } return 0; } @@ -1196,20 +1227,6 @@ iot_utimens (call_frame_t *frame, { call_stub_t *stub; fd_t *fd = NULL; - - fd = fd_lookup (loc->inode, frame->root->pid); - - if (fd == NULL) { - STACK_WIND(frame, - iot_utimens_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->utimens, - loc, - tv); - return 0; - } - - fd_unref (fd); stub = fop_utimens_stub (frame, iot_utimens_wrapper, @@ -1220,7 +1237,15 @@ iot_utimens (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + + fd = fd_lookup (loc->inode, frame->root->pid); + if (fd == NULL) + iot_schedule_unordered ((iot_conf_t *)this->private, + loc->inode, stub); + else { + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + fd_unref (fd); + } return 0; } @@ -1272,7 +1297,7 @@ iot_checksum (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -1315,15 +1340,25 @@ iot_unlink (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM); return 0; } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + iot_schedule_unordered((iot_conf_t *)this->private, loc->inode, stub); return 0; } +/* Must be called with worker lock held */ +void +_iot_queue (iot_worker_t *worker, + iot_request_t *req) +{ + list_add_tail (&req->list, &worker->rqlist); -static void -iot_queue (iot_worker_t *worker, - call_stub_t *stub) + /* dq_cond */ + worker->queue_size++; + pthread_cond_broadcast (&worker->dq_cond); +} + +iot_request_t * +iot_init_request (call_stub_t *stub) { iot_request_t *req = NULL; @@ -1331,13 +1366,19 @@ iot_queue (iot_worker_t *worker, ERR_ABORT (req); req->stub = stub; + return req; +} + +static void +iot_queue (iot_worker_t *worker, + call_stub_t *stub) +{ + iot_request_t *req = NULL; + + req = iot_init_request (stub); pthread_mutex_lock (&worker->qlock); { - list_add_tail (&req->list, &worker->rqlist); - - /* dq_cond */ - worker->queue_size++; - pthread_cond_broadcast (&worker->dq_cond); + _iot_queue (worker, req); } pthread_mutex_unlock (&worker->qlock); } @@ -1380,6 +1421,135 @@ iot_worker (void *arg) } } +/* Must be called with worker lock held. */ +int +iot_can_unordered_exit (iot_worker_t * worker) +{ + int allow_exit = 0; + iot_conf_t *conf = NULL; + + conf = worker->conf; + if (worker->queue_size > 0) + goto decided; + + /* We dont want this thread to exit if its index is + * below the min thread count. + */ + if (worker->thread_idx >= conf->min_u_threads) + allow_exit = 1; + +decided: + return allow_exit; +} + +int +iot_unordered_exit (iot_worker_t *worker) +{ + int allow_exit = 0; + + /* It is possible that since the last time we timed out while + * waiting for a request, a new request has been added to this + * worker's request queue. Before we really exit, we must + * check for those requests. + */ + pthread_mutex_lock (&worker->qlock); + { + allow_exit = iot_can_unordered_exit (worker); + + if (allow_exit) { + worker->state = IOT_STATE_DEAD; + worker->thread = 0; + } + } + pthread_mutex_unlock (&worker->qlock); + + return allow_exit; +} + + +int +iot_request_wait_idleness (iot_worker_t * worker) +{ + struct timeval tv; + struct timespec ts; + int waitres = 0; + + gettimeofday (&tv, NULL); + ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; + /* Slightly skew the idle time for threads so that, we dont + * have all of them rushing to exit at the same time, if + * they've been idle. + */ + ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; + waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock, + &ts); + if (waitres == ETIMEDOUT) + return -1; + + return 0; +} + + +call_stub_t * +iot_dequeue_unordered (iot_worker_t *worker) +{ + call_stub_t *stub= NULL; + iot_request_t *req = NULL; + int waitstat = 0; + + pthread_mutex_lock (&worker->qlock); + { + while (!worker->queue_size) { + waitstat = 0; + waitstat = iot_request_wait_idleness (worker); + /* If -1, request wait must've timed + * out. + */ + if (waitstat == -1) + goto out; + } + + list_for_each_entry (req, &worker->rqlist, list) + break; + list_del (&req->list); + stub = req->stub; + + worker->queue_size--; + } +out: + pthread_mutex_unlock (&worker->qlock); + + FREE (req); + + return stub; +} + + +void * +iot_worker_unordered (void *arg) +{ + iot_worker_t *worker = arg; + call_stub_t *stub = NULL; + + while (1) { + + stub = iot_dequeue_unordered (worker); + /* If no request was received, we must've timed out, + * if so, check if we can exit. + */ + if (stub == NULL) { + if (iot_unordered_exit (worker)) + break; + else + continue; + } + + call_resume (stub); + } + return NULL; +} + + static iot_worker_t ** allocate_worker_array (int count) { @@ -1403,32 +1573,57 @@ allocate_worker (iot_conf_t * conf) wrk->conf = conf; pthread_cond_init (&wrk->dq_cond, NULL); pthread_mutex_init (&wrk->qlock, NULL); + wrk->state = IOT_STATE_DEAD; return wrk; } static void allocate_workers (iot_conf_t *conf, - int count, - int start_alloc_idx) + iot_worker_t ** workers, + int start_alloc_idx, + int count) { int i, end_count; end_count = count + start_alloc_idx; for (i = start_alloc_idx; i < end_count; i++) { - conf->workers[i] = allocate_worker (conf); - pthread_create (&conf->workers[i]->thread, NULL, iot_worker, - conf->workers[i]); + workers[i] = allocate_worker (conf); + workers[i]->thread_idx = i; + } +} + + +void +iot_startup_workers (iot_worker_t **workers, int start_idx, int count, + iot_worker_fn workerfunc) +{ + int i = 0; + int end_idx = 0; + + end_idx = start_idx + count; + for (i = start_idx; i < end_idx; i++) { + workers[i]->state = IOT_STATE_ACTIVE; + pthread_create (&workers[i]->thread, NULL, workerfunc, + workers[i]); } + } static void workers_init (iot_conf_t *conf) { conf->workers = allocate_worker_array (conf->thread_count); - allocate_workers (conf, conf->thread_count, 0); -} + allocate_workers (conf, conf->workers, 0, conf->thread_count); + /* Initialize un-ordered workers */ + conf->uworkers = allocate_worker_array (conf->max_u_threads); + allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads); + + iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker); + iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, + iot_worker_unordered); +} int32_t @@ -1463,6 +1658,14 @@ init (xlator_t *this) conf->thread_count); } + /* Init params for un-ordered workers. These should be got from + * the volfile options. + */ + pthread_mutex_init (&conf->utlock, NULL); + conf->max_u_threads = IOT_MAX_THREADS; + conf->min_u_threads = IOT_MIN_THREADS; + conf->u_idle_time = IOT_DEFAULT_IDLE; + workers_init (conf); this->private = conf; @@ -1501,13 +1704,13 @@ struct xlator_fops fops = { .ftruncate = iot_ftruncate, /* O */ .utimens = iot_utimens, /* V */ .checksum = iot_checksum, /* U */ - .unlink = iot_unlink, /* V */ + .unlink = iot_unlink, /* U */ .lookup = iot_lookup, /* U */ .chmod = iot_chmod, /* V */ .fchmod = iot_fchmod, /* O */ .chown = iot_chown, /* V */ .fchown = iot_fchown, /* O */ - .access = iot_access, /* V */ + .access = iot_access, /* U */ .readlink = iot_readlink, /* U */ .mknod = iot_mknod, /* U */ .mkdir = iot_mkdir, /* U */ diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 797552900..f02b641f4 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -33,6 +33,7 @@ #include "xlator.h" #include "common-utils.h" #include "list.h" +#include <stdlib.h> #define min(a,b) ((a)<(b)?(a):(b)) #define max(a,b) ((a)>(b)?(a):(b)) @@ -48,6 +49,17 @@ struct iot_request { call_stub_t *stub; }; +#define IOT_STATE_ACTIVE 1 +#define IOT_STATE_DEAD 2 +#define iot_worker_active(wrk) ((wrk)->state == IOT_STATE_ACTIVE) + +#define MAX_IDLE_SKEW 1000 /* usecs */ +#define skew_usec_idle_time(usec) ((usec) + (random () % MAX_IDLE_SKEW)) +#define IOT_DEFAULT_IDLE 180 /* In secs. */ + +#define IOT_MIN_THREADS 32 +#define IOT_MAX_THREADS 512 + struct iot_worker { struct list_head rqlist; /* List of requests assigned to me. */ struct iot_conf *conf; @@ -56,11 +68,32 @@ struct iot_worker { pthread_mutex_t qlock; int32_t queue_size; pthread_t thread; + int state; /* What state is the thread in. */ + int thread_idx; /* Thread's index into the worker array. Since this + will be thread local data, for ensuring that number + of threads dont fall below a minimum, we just dont + allow threads with specific indices to exit. + Helps us in eliminating one place where otherwise + a lock would have been required to update centralized + state inside conf. + */ }; struct iot_conf { int32_t thread_count; struct iot_worker ** workers; + + pthread_mutex_t utlock; /* Used for scaling un-ordered threads. */ + struct iot_worker **uworkers; /* Un-ordered thread pool. */ + int max_u_threads; /* Number of unordered threads will not be + higher than this. + */ + int min_u_threads; /* Number of unordered threads should not + fall below this value. */ + int u_idle_time; /* If an unordered thread does not get a + request for this amount of secs, it should + try to die. + */ }; typedef struct iot_conf iot_conf_t; |