diff options
author | Anand Avati <avati@gluster.com> | 2010-02-22 11:00:20 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-02-22 08:51:40 -0800 |
commit | 5ae4f11319de9a800a595175678762f9fc924755 (patch) | |
tree | 0e19ec86de320453e8441fe59fe3f23844eb7d5d | |
parent | 5f524f4b2f0dbccfe6c8d4aab16ce425dd6d2b50 (diff) |
io-threads: single queue/multi-thread model
This patch lets io-threads work with a single queue and multiple
threads work on picking the next request from the queue and process
it.
Whenever the number of pending requests in the queue double, a new
worker thread is spawned.
Workers expire after a (configurable) timeout of inactivity
Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Signed-off-by: Anand V. Avati <avati@blackhole.gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 583 (filesystem access hangs while deleting large files)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=583
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 976 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 134 |
2 files changed, 154 insertions, 956 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index e99012cc0..5339ce9a8 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -33,283 +33,126 @@ #include <time.h> #include "locking.h" -typedef void *(*iot_worker_fn)(void*); +void *iot_worker (void *arg); +int iot_workers_scale (iot_conf_t *conf); +int __iot_workers_scale (iot_conf_t *conf); -void -iot_stop_worker (iot_worker_t *worker); - -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count); - -void -_iot_queue (iot_worker_t *worker, iot_request_t *req); - -iot_request_t * -iot_init_request (iot_worker_t *conf, call_stub_t *stub); -int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, - iot_worker_fn workerfunc); - -void * -iot_worker_unordered (void *arg); - -void * -iot_worker_ordered (void *arg); - -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc); - -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req); - -void -iot_notify_worker (iot_worker_t *worker) +call_stub_t * +__iot_dequeue (iot_conf_t *conf) { -#ifndef HAVE_SPINLOCK - pthread_cond_broadcast (&worker->notifier); -#else - sem_post (&worker->notifier); -#endif - - return; -} + call_stub_t *stub = NULL; -int -iot_notify_wait (iot_worker_t *worker, int idletime) -{ - struct timeval tv; - struct timespec ts = {0, }; - int waitres = 0; + if (list_empty (&conf->req)) + return NULL; - gettimeofday (&tv, NULL); - /* Slightly skew the idle time for threads so that, we dont - * have all of them rushing to exit at the same time, if - * they've been idle. - */ - ts.tv_sec = skew_sec_idle_time (tv.tv_sec + idletime); - -#ifndef HAVE_SPINLOCK - waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock, - &ts); -#else - UNLOCK (&worker->qlock); - errno = 0; - waitres = sem_timedwait (&worker->notifier, &ts); - LOCK (&worker->qlock); - if (waitres < 0) - waitres = errno; -#endif + stub = list_entry (conf->req.next, call_stub_t, list); + list_del_init (&stub->list); + conf->queue_size--; - return waitres; + return stub; } + void -iot_notify_init (iot_worker_t *worker) +__iot_enqueue (iot_conf_t *conf, call_stub_t *stub) { - if (worker == NULL) - return; - - LOCK_INIT (&worker->qlock); - -#ifndef HAVE_SPINLOCK - pthread_cond_init (&worker->notifier, NULL); -#else - sem_init (&worker->notifier, 0, 0); -#endif + list_add_tail (&stub->list, &conf->req); + conf->queue_size++; return; } -/* I know this function modularizes things a bit too much, - * but it is easier on the eyes to read this than see all that locking, - * queueing, and thread firing in the same curly block, as was the - * case before this function. - */ -int -iot_request_queue_and_thread_fire (iot_worker_t *worker, - iot_worker_fn workerfunc, iot_request_t *req) -{ - int ret = -1; - LOCK (&worker->qlock); - { - if (iot_worker_active (worker)) { - _iot_queue (worker, req); - ret = 0; - }else { - ret = iot_startup_worker (worker, workerfunc); - if (ret < 0) { - goto unlock; - } - _iot_queue (worker, req); - } - } -unlock: - UNLOCK (&worker->qlock); - return ret; -} +void * +iot_worker (void *data) +{ + iot_conf_t *conf = NULL; + xlator_t *this = NULL; + call_stub_t *stub = NULL; + struct timespec sleep_till = {0, }; + int ret = 0; + char timeout = 0; + char bye = 0; + + conf = data; + this = conf->this; + THIS = this; + + while (1) { + sleep_till.tv_sec = time (NULL) + conf->idle_time; + + pthread_mutex_lock (&conf->mutex); + { + while (list_empty (&conf->req)) { + conf->sleep_count++; + + ret = pthread_cond_timedwait (&conf->cond, + &conf->mutex, + &sleep_till); + conf->sleep_count--; + + if (ret == -1 && errno == ETIMEDOUT) { + timeout = 1; + break; + } + } + if (timeout) { + if (conf->curr_count > IOT_MIN_THREADS) { + conf->curr_count--; + bye = 1; + } else { + timeout = 0; + } + } -int -iot_unordered_request_balancer (iot_conf_t *conf) -{ - long int rand = 0; - int idx = 0; + stub = __iot_dequeue (conf); + } + pthread_mutex_unlock (&conf->mutex); - /* Decide which thread will service the request. - * FIXME: This should change into some form of load-balancing. - * */ - rand = random (); + if (stub) /* guard against spurious wakeups */ + call_resume (stub); - /* If scaling is on, we can choose from any thread - * that has been allocated upto, max_o_threads, but - * with scaling off, we'll never have threads more - * than min_o_threads. - */ - if (iot_unordered_scaling_on (conf)) - idx = (rand % conf->max_u_threads); - else - idx = (rand % conf->min_u_threads); + if (bye) + break; + } - return idx; + return NULL; } int -iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) +iot_schedule (iot_conf_t *conf, call_stub_t *stub) { - int32_t idx = 0; - iot_worker_t *selected_worker = NULL; - iot_request_t *req = NULL; - int ret = -1; + int ret = 0; - idx = iot_unordered_request_balancer (conf); - selected_worker = conf->uworkers[idx]; + pthread_mutex_lock (&conf->mutex); + { + __iot_enqueue (conf, stub); - req = iot_init_request (selected_worker, stub); - if (req == NULL) { - ret = -ENOMEM; - goto out; - } + pthread_cond_signal (&conf->cond); - ret = iot_request_queue_and_thread_fire (selected_worker, - iot_worker_unordered, req); - if (ret < 0) { - iot_destroy_request (selected_worker, req); + ret = __iot_workers_scale (conf); } -out: - return ret; -} - - -/* Only to be used with ordered requests. - */ -uint64_t -iot_create_inode_worker_assoc (iot_conf_t * conf, inode_t * inode) -{ - long int rand = 0; - uint64_t idx = 0; - - rand = random (); - /* If scaling is on, we can choose from any thread - * that has been allocated upto, max_o_threads, but - * with scaling off, we'll never have threads more - * than min_o_threads. - */ - if (iot_ordered_scaling_on (conf)) - idx = (rand % conf->max_o_threads); - else - idx = (rand % conf->min_o_threads); + pthread_mutex_unlock (&conf->mutex); - __inode_ctx_put (inode, conf->this, idx); - - return idx; + return 0; } -/* Assumes inode lock is held. */ -int32_t -iot_ordered_request_balancer (iot_conf_t *conf, inode_t *inode, uint64_t *idx) +int +iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) { - int ret = -1; - - if (__inode_ctx_get (inode, conf->this, idx) < 0) - *idx = iot_create_inode_worker_assoc (conf, inode); - else { - /* Sanity check to ensure the idx received from the inode - * context is within bounds. We're a bit optimistic in - * assuming that if an index is within bounds, it is - * not corrupted. idx is uint so we dont check for less - * than 0. - */ - if ((*idx >= (uint64_t)conf->max_o_threads)) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "inode context returned insane thread index %" - PRIu64, *idx); - ret = -EINVAL; - goto out; - } - } - ret = 0; -out: - return ret; + return iot_schedule (conf, stub); } int iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) { - uint64_t idx = 0; - iot_worker_t *selected_worker = NULL; - iot_request_t *req = NULL; - int balstatus = 0, ret = -1; - - if (inode == NULL) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "Got NULL inode for ordered request"); - ret = -EINVAL; - goto out; - } - - LOCK (&inode->lock); - { - balstatus = iot_ordered_request_balancer (conf, inode, &idx); - if (balstatus < 0) { - gf_log (conf->this->name, GF_LOG_DEBUG, - "Insane worker index. Unwinding stack"); - ret = -ECANCELED; - goto unlock_out; - } - /* inode lock once acquired, cannot be left here - * because other gluster main threads might be - * contending on it to append a request for this file. - * So we'll also leave the lock only after we've - * added the request to the worker queue. - */ - selected_worker = conf->oworkers[idx]; - - req = iot_init_request (selected_worker, stub); - if (req == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR,"out of memory"); - ret = -ENOMEM; - goto unlock_out; - } - ret = iot_request_queue_and_thread_fire (selected_worker, - iot_worker_ordered, - req); - } -unlock_out: - UNLOCK (&inode->lock); - -out: - if (ret < 0) { - if (req != NULL) { - iot_destroy_request (selected_worker, req); - } - } - return ret; + return iot_schedule (conf, stub); } @@ -394,7 +237,7 @@ iot_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, struct stat *stbuf, int32_t valid) { call_stub_t *stub = NULL; - int ret = -1; + int ret = -1; stub = fop_setattr_stub (frame, iot_setattr_wrapper, loc, stbuf, valid); if (!stub) { @@ -445,7 +288,7 @@ iot_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, struct stat *stbuf, int32_t valid) { call_stub_t *stub = NULL; - int ret = -1; + int ret = -1; stub = fop_fsetattr_stub (frame, iot_fsetattr_wrapper, fd, stbuf, valid); @@ -1442,7 +1285,7 @@ iot_checksum_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, FIRST_CHILD(this), FIRST_CHILD(this)->fops->checksum, loc, flags); - + return 0; } @@ -1494,7 +1337,6 @@ iot_unlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc) FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, loc); - return 0; } @@ -2190,355 +2032,58 @@ out: } -/* Must be called with worker lock held */ -void -_iot_queue (iot_worker_t *worker, iot_request_t *req) -{ - list_add_tail (&req->list, &worker->rqlist); - - /* dq_cond */ - worker->queue_size++; - iot_notify_worker(worker); -} - - -iot_request_t * -iot_init_request (iot_worker_t *worker, call_stub_t *stub) -{ - iot_request_t *req = NULL; - - req = mem_get (worker->req_pool); - if (req == NULL) { - goto out; - } - - req->stub = stub; -out: - return req; -} - - -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req) -{ - if ((req == NULL) || (worker == NULL)) - return; - - mem_put (worker->req_pool, req); -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_ordered_exit (iot_worker_t * worker) -{ - gf_boolean_t allow_exit = _gf_false; - iot_conf_t *conf = NULL; - - conf = worker->conf; - /* We dont want this thread to exit if its index is - * below the min thread count. - */ - if (worker->thread_idx >= conf->min_o_threads) - allow_exit = _gf_true; - - return allow_exit; -} - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_ordered_exit (int cond_waitres, iot_worker_t *worker) -{ - gf_boolean_t allow_exit = _gf_false; - - if (worker->state == IOT_STATE_EXIT_REQUEST) { - allow_exit = _gf_true; - } else if (cond_waitres == ETIMEDOUT) { - allow_exit = iot_can_ordered_exit (worker); - } - - if (allow_exit) { - worker->state = IOT_STATE_DEAD; - worker->thread = 0; - } - - return allow_exit; -} - - int -iot_ordered_request_wait (iot_worker_t * worker) +__iot_workers_scale (iot_conf_t *conf) { - int waitres = 0; - int retstat = 0; + int log2 = 0; + int scale = 0; + int diff = 0; + pthread_t thread; + int ret = 0; - if (worker->state == IOT_STATE_EXIT_REQUEST) { - retstat = -1; - goto out; - } + log2 = log_base2 (conf->queue_size); - waitres = iot_notify_wait (worker, worker->conf->o_idle_time); - if (iot_ordered_exit (waitres, worker)) { - retstat = -1; - } + scale = log2; -out: - return retstat; -} - - -call_stub_t * -iot_dequeue_ordered (iot_worker_t *worker) -{ - call_stub_t *stub = NULL; - iot_request_t *req = NULL; - int waitstat = 0; + if (log2 < IOT_MIN_THREADS) + scale = IOT_MIN_THREADS; - LOCK (&worker->qlock); - { - while (!worker->queue_size) { - waitstat = 0; - waitstat = iot_ordered_request_wait (worker); - /* We must've timed out and are now required to - * exit. - */ - if (waitstat == -1) - goto out; - } + if (log2 > conf->max_count) + scale = IOT_MAX_THREADS; - list_for_each_entry (req, &worker->rqlist, list) - break; - list_del (&req->list); - stub = req->stub; - - worker->queue_size--; + if (conf->curr_count < scale) { + diff = scale - conf->curr_count; } -out: - UNLOCK (&worker->qlock); - iot_destroy_request (worker, req); - return stub; -} - - -void * -iot_worker_ordered (void *arg) -{ - iot_worker_t *worker = arg; - call_stub_t *stub = NULL; - - while (1) { + while (diff) { + diff --; - stub = iot_dequeue_ordered (worker); - /* If stub is NULL, we must've timed out waiting for a - * request and have now been allowed to exit. - */ - if (!stub) + ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf); + if (ret == 0) + conf->curr_count++; + else break; - call_resume (stub); - } - - return NULL; -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_unordered_exit (iot_worker_t * worker) -{ - gf_boolean_t allow_exit = _gf_false; - iot_conf_t *conf = NULL; - - conf = worker->conf; - /* We dont want this thread to exit if its index is - * below the min thread count. - */ - if (worker->thread_idx >= conf->min_u_threads) - allow_exit = _gf_true; - - return allow_exit; -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_unordered_exit (int cond_waitres, iot_worker_t *worker) -{ - gf_boolean_t allow_exit = _gf_false; - - if (worker->state == IOT_STATE_EXIT_REQUEST) { - allow_exit = _gf_true; - } else if (cond_waitres == ETIMEDOUT) { - allow_exit = iot_can_unordered_exit (worker); } - if (allow_exit) { - worker->state = IOT_STATE_DEAD; - worker->thread = 0; - } - - return allow_exit; + return diff; } int -iot_unordered_request_wait (iot_worker_t * worker) +iot_workers_scale (iot_conf_t *conf) { - int waitres = 0; - int retstat = 0; + int ret = -1; - if (worker->state == IOT_STATE_EXIT_REQUEST) { - retstat = -1; + if (conf == NULL) { + ret = -EINVAL; goto out; } - waitres = iot_notify_wait (worker, worker->conf->u_idle_time); - if (iot_unordered_exit (waitres, worker)) { - retstat = -1; - } - -out: - return retstat; -} - - -call_stub_t * -iot_dequeue_unordered (iot_worker_t *worker) -{ - call_stub_t *stub= NULL; - iot_request_t *req = NULL; - int waitstat = 0; - - LOCK (&worker->qlock); + pthread_mutex_lock (&conf->mutex); { - while (!worker->queue_size) { - waitstat = 0; - waitstat = iot_unordered_request_wait (worker); - /* If -1, request wait must've timed - * out. - */ - if (waitstat == -1) - goto out; - } - - list_for_each_entry (req, &worker->rqlist, list) - break; - list_del (&req->list); - stub = req->stub; - - worker->queue_size--; + ret = __iot_workers_scale (conf); } -out: - UNLOCK (&worker->qlock); - iot_destroy_request (worker, req); - - return stub; -} - - -void * -iot_worker_unordered (void *arg) -{ - iot_worker_t *worker = arg; - call_stub_t *stub = NULL; - - while (1) { - - stub = iot_dequeue_unordered (worker); - /* If no request was received, we must've timed out, - * and can exit. */ - if (!stub) - break; - - call_resume (stub); - } - - return NULL; -} - - -void -deallocate_worker_array (iot_worker_t **workers) -{ - FREE (workers); -} - -void -deallocate_workers (iot_worker_t **workers, - int start_alloc_idx, int count) -{ - int i; - int end_count; - - end_count = count + start_alloc_idx; - for (i = start_alloc_idx; (i < end_count); i++) { - if (workers[i] != NULL) { - mem_pool_destroy (workers[i]->req_pool); - FREE (workers[i]); - workers[i] = NULL; - } - } - -} - - -iot_worker_t ** -allocate_worker_array (int count) -{ - iot_worker_t **warr = NULL; - - warr = CALLOC (count, sizeof (iot_worker_t *)); - - return warr; -} - - -iot_worker_t * -allocate_worker (iot_conf_t * conf) -{ - iot_worker_t *wrk = NULL; - - wrk = CALLOC (1, sizeof (iot_worker_t)); - if (wrk == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto out; - } - - wrk->req_pool = mem_pool_new (iot_request_t, IOT_REQUEST_MEMPOOL_SIZE); - if (wrk->req_pool == NULL) - goto free_wrk; - - INIT_LIST_HEAD (&wrk->rqlist); - wrk->conf = conf; - iot_notify_init (wrk); - wrk->state = IOT_STATE_DEAD; - -out: - return wrk; - -free_wrk: - FREE (wrk); - return NULL; -} - - -int -allocate_workers (iot_conf_t *conf, iot_worker_t **workers, int start_alloc_idx, - int count) -{ - int i; - int end_count, ret = -1; - - end_count = count + start_alloc_idx; - for (i = start_alloc_idx; i < end_count; i++) { - workers[i] = allocate_worker (conf); - if (workers[i] == NULL) { - ret = -ENOMEM; - goto out; - } - workers[i]->thread_idx = i; - } - ret = 0; + pthread_mutex_unlock (&conf->mutex); out: return ret; @@ -2546,75 +2091,6 @@ out: void -iot_stop_worker (iot_worker_t *worker) -{ - LOCK (&worker->qlock); - { - worker->state = IOT_STATE_EXIT_REQUEST; - } - UNLOCK (&worker->qlock); - - iot_notify_worker (worker); - pthread_join (worker->thread, NULL); -} - - -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count) -{ - int i = 0; - int end_idx = 0; - - end_idx = start_idx + count; - for (i = start_idx; i < end_idx; i++) { - if (workers[i] != NULL) { - iot_stop_worker (workers[i]); - } - } -} - - -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc) -{ - int ret = -1; - ret = pthread_create (&worker->thread, &worker->conf->w_attr, - workerfunc, worker); - if (ret != 0) { - gf_log (worker->conf->this->name, GF_LOG_ERROR, - "cannot start worker (%s)", strerror (errno)); - ret = -ret; - } else { - worker->state = IOT_STATE_ACTIVE; - } - - return ret; -} - - -int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, - iot_worker_fn workerfunc) -{ - int i = 0; - int end_idx = 0; - int ret = -1; - - end_idx = start_idx + count; - for (i = start_idx; i < end_idx; i++) { - ret = iot_startup_worker (workers[i], workerfunc); - if (ret < 0) { - goto out; - } - } - - ret = 0; -out: - return ret; -} - - -void set_stack_size (iot_conf_t *conf) { int err = 0; @@ -2624,99 +2100,8 @@ set_stack_size (iot_conf_t *conf) err = pthread_attr_setstacksize (&conf->w_attr, stacksize); if (err == EINVAL) { gf_log (conf->this->name, GF_LOG_WARNING, - "Using default thread stack size"); - } -} - - -void -iot_cleanup_workers (iot_conf_t *conf) -{ - if (conf->uworkers != NULL) { - iot_stop_workers (conf->uworkers, 0, - conf->max_u_threads); - - deallocate_workers (conf->uworkers, 0, - conf->max_u_threads); - - deallocate_worker_array (conf->uworkers); - } - - if (conf->oworkers != NULL) { - iot_stop_workers (conf->oworkers, 0, - conf->max_o_threads); - - deallocate_workers (conf->oworkers, 0, - conf->max_o_threads); - - deallocate_worker_array (conf->oworkers); - } -} - - -int -workers_init (iot_conf_t *conf) -{ - int ret = -1; - - if (conf == NULL) { - ret = -EINVAL; - goto err; - } - - /* Initialize un-ordered workers */ - conf->uworkers = allocate_worker_array (conf->max_u_threads); - if (conf->uworkers == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - ret = -ENOMEM; - goto err; - } - - ret = allocate_workers (conf, conf->uworkers, 0, - conf->max_u_threads); - if (ret < 0) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto err; + "Using default thread stack size"); } - - /* Initialize ordered workers */ - conf->oworkers = allocate_worker_array (conf->max_o_threads); - if (conf->oworkers == NULL) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - ret = -ENOMEM; - goto err; - } - - ret = allocate_workers (conf, conf->oworkers, 0, - conf->max_o_threads); - if (ret < 0) { - gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); - goto err; - } - - set_stack_size (conf); - ret = iot_startup_workers (conf->oworkers, 0, conf->min_o_threads, - iot_worker_ordered); - if (ret == -1) { - /* logged inside iot_startup_workers */ - goto err; - } - - ret = iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, - iot_worker_unordered); - if (ret == -1) { - /* logged inside iot_startup_workers */ - goto err; - } - - return 0; - -err: - if (conf != NULL) { - iot_cleanup_workers (conf); - } - - return ret; } @@ -2725,11 +2110,10 @@ init (xlator_t *this) { iot_conf_t *conf = NULL; dict_t *options = this->options; - int thread_count = IOT_DEFAULT_THREADS; - gf_boolean_t autoscaling = IOT_SCALING_OFF; - char *scalestr = NULL; - int min_threads, max_threads, ret = -1; - + int thread_count = IOT_DEFAULT_THREADS; + int idle_time = IOT_DEFAULT_IDLE; + int ret = 0; + if (!this->children || this->children->next) { gf_log ("io-threads", GF_LOG_ERROR, "FATAL: iot not configured with exactly one child"); @@ -2748,100 +2132,33 @@ init (xlator_t *this) goto out; } - if ((dict_get_str (options, "autoscaling", &scalestr)) == 0) { - if ((gf_string2boolean (scalestr, &autoscaling)) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'autoscaling' option must be" - " boolean"); - goto out; - } - } + thread_count = IOT_DEFAULT_THREADS; if (dict_get (options, "thread-count")) { thread_count = data_to_int32 (dict_get (options, - "thread-count")); - if (scalestr != NULL) - gf_log (this->name, GF_LOG_WARNING, - "'thread-count' is specified with " - "'autoscaling' on. Ignoring" - "'thread-count' option."); - if (thread_count < 2) + "thread-count")); + if (thread_count < IOT_MIN_THREADS) thread_count = IOT_MIN_THREADS; - } - min_threads = IOT_DEFAULT_THREADS; - max_threads = IOT_MAX_THREADS; - if (dict_get (options, "min-threads")) - min_threads = data_to_int32 (dict_get (options, - "min-threads")); + if (thread_count > IOT_MAX_THREADS) + thread_count = IOT_MAX_THREADS; + } + conf->max_count = thread_count; - if (dict_get (options, "max-threads")) - max_threads = data_to_int32 (dict_get (options, - "max-threads")); - - if (min_threads > max_threads) { - gf_log (this->name, GF_LOG_ERROR, " min-threads must be less " - "than max-threads"); - goto out; + if (dict_get (options, "idle-time")) { + idle_time = data_to_int32 (dict_get (options, + "idle-time")); + if (idle_time < 0) + idle_time = 1; } + conf->idle_time = idle_time; - /* If autoscaling is off, then adjust the min and max - * threads according to thread-count. - * This is based on the assumption that despite autoscaling - * being off, we still want to have separate pools for data - * and meta-data threads. - */ - if (!autoscaling) - max_threads = min_threads = thread_count; + conf->this = this; - /* If user specifies an odd number of threads, increase it by - * one. The reason for having an even number of threads is - * explained later. - */ - if (max_threads % 2) - max_threads++; - - if(min_threads % 2) - min_threads++; - - /* If the user wants to have only a single thread for - * some strange reason, make sure we set this count to - * 2. Explained later. - */ - if (min_threads < IOT_MIN_THREADS) - min_threads = IOT_MIN_THREADS; - - /* Again, have atleast two. Read on. */ - if (max_threads < IOT_MIN_THREADS) - max_threads = IOT_MIN_THREADS; - - /* This is why we need atleast two threads. - * We're dividing the specified thread pool into - * 2 halves, equally between ordered and unordered - * pools. - */ + INIT_LIST_HEAD (&conf->req); - /* Init params for un-ordered workers. */ - pthread_mutex_init (&conf->utlock, NULL); - conf->max_u_threads = max_threads / 2; - conf->min_u_threads = min_threads / 2; - conf->u_idle_time = IOT_DEFAULT_IDLE; - conf->u_scaling = autoscaling; - - /* Init params for ordered workers. */ - pthread_mutex_init (&conf->otlock, NULL); - conf->max_o_threads = max_threads / 2; - conf->min_o_threads = min_threads / 2; - conf->o_idle_time = IOT_DEFAULT_IDLE; - conf->o_scaling = autoscaling; - - gf_log (this->name, GF_LOG_DEBUG, - "io-threads: Autoscaling: %s, " - "min_threads: %d, max_threads: %d", - (autoscaling) ? "on":"off", min_threads, max_threads); + ret = iot_workers_scale (conf); - conf->this = this; - ret = workers_init (conf); if (ret == -1) { gf_log (this->name, GF_LOG_ERROR, "cannot initialize worker threads, exiting init"); @@ -2902,7 +2219,7 @@ struct xlator_fops fops = { .opendir = iot_opendir, /* U */ .fsyncdir = iot_fsyncdir, /* O */ .statfs = iot_statfs, /* U */ - .setxattr = iot_setxattr, /* U */ + .setxattr = iot_setxattr, /* U */ .getxattr = iot_getxattr, /* U */ .fgetxattr = iot_fgetxattr, /* O */ .fsetxattr = iot_fsetxattr, /* O */ @@ -2920,29 +2237,16 @@ struct xlator_cbks cbks = { }; struct volume_options options[] = { - { .key = {"thread-count"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, + { .key = {"thread-count"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, .max = IOT_MAX_THREADS }, - { .key = {"autoscaling"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"min-threads"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, - .max = IOT_MAX_THREADS, - .description = "Minimum number of threads must be greater than or " - "equal to 2. If the specified value is less than 2 " - "it is adjusted upwards to 2. This is a requirement" - " for the current model of threading in io-threads." + {.key = {"idle-time"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 0x7fffffff, }, - { .key = {"max-threads"}, - .type = GF_OPTION_TYPE_INT, - .min = IOT_MIN_THREADS, - .max = IOT_MAX_THREADS, - .description = "Maximum number of threads is advisory only so the " - "user specified value will be used." + { .key = {NULL}, }, - { .key = {NULL} }, }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 3cd959069..8b9985be1 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -37,144 +37,38 @@ #include "locking.h" #include <semaphore.h> -#define min(a,b) ((a)<(b)?(a):(b)) -#define max(a,b) ((a)>(b)?(a):(b)) struct iot_conf; -struct iot_worker; -struct iot_request; - -struct iot_request { - struct list_head list; /* Attaches this request to the list of - requests. - */ - call_stub_t *stub; -}; - -typedef enum { - IOT_STATE_ACTIVE, - IOT_STATE_EXIT_REQUEST, - IOT_STATE_DEAD -}iot_state_t; -#define iot_worker_active(wrk) ((wrk)->state == IOT_STATE_ACTIVE) #define MAX_IDLE_SKEW 4 /* In secs */ #define skew_sec_idle_time(sec) ((sec) + (random () % MAX_IDLE_SKEW)) #define IOT_DEFAULT_IDLE 180 /* In secs. */ -#define IOT_MIN_THREADS 2 -#define IOT_DEFAULT_THREADS 16 +#define IOT_MIN_THREADS 1 +#define IOT_DEFAULT_THREADS 8 #define IOT_MAX_THREADS 64 -#define IOT_SCALING_OFF _gf_false -#define IOT_SCALING_ON _gf_true -#define iot_ordered_scaling_on(conf) ((conf)->o_scaling == IOT_SCALING_ON) -#define iot_unordered_scaling_on(conf) ((conf)->u_scaling == IOT_SCALING_ON) #define IOT_THREAD_STACK_SIZE ((size_t)(1024*1024)) -/* This signifies the max number of outstanding request we're expecting - * at a point for every worker thread. - * For an idea of the memory foot-print, consider at most 16 Bytes per - * iot_request_t on a 64-bit system with another 16 bytes per chunk in the - * header. For 64 slots in the pool, we'll use up 2 KiB, with 64 threads this - * goes up to 128 KiB. - * - * Note that this size defines the size of the per-worker mem pool. The - * advantage is that, we're not only reducing the rate of small iot_request_t - * allocations from the heap but also reducing the contention on the libc heap - * by having a mem pool, though small, for each worker. - */ -#define IOT_REQUEST_MEMPOOL_SIZE 64 - -struct iot_worker { - struct list_head rqlist; /* List of requests assigned to me. */ - struct iot_conf *conf; -#ifndef HAVE_SPINLOCK - pthread_cond_t notifier; -#else - sem_t notifier; -#endif - int64_t q,dq; - gf_lock_t qlock; - int32_t queue_size; - pthread_t thread; - iot_state_t state; /* What state is the thread in. */ - int thread_idx; /* Thread's index into the worker - array. Since this will be thread - local data, for ensuring that - number of threads dont fall below - a minimum, we just dont allow - threads with specific indices to - exit. Helps us in eliminating one - place where otherwise a lock - would have been required to update - centralized state inside conf. - */ - struct mem_pool *req_pool; /* iot_request_t's come from here. */ -}; struct iot_conf { - int32_t thread_count; - struct iot_worker **workers; + pthread_mutex_t mutex; + pthread_cond_t cond; + + int32_t max_count; /* configured maximum */ + int32_t curr_count; /* actual number of threads running */ + int32_t sleep_count; + + int32_t idle_time; /* in seconds */ + + struct list_head req; + int queue_size; + pthread_attr_t w_attr; xlator_t *this; - /* Config state for ordered threads. */ - pthread_mutex_t otlock; /* Used to sync any state that needs - to be changed by the ordered - threads. - */ - - int max_o_threads; /* Max. number of ordered threads */ - int min_o_threads; /* Min. number of ordered threads. - Ordered thread count never falls - below this threshold. - */ - - int o_idle_time; /* in Secs. The idle time after - which an ordered thread exits. - */ - gf_boolean_t o_scaling; /* Set to IOT_SCALING_OFF if user - does not want thread scaling on - ordered threads. If scaling is - off, io-threads maintains at - least min_o_threads number of - threads and never lets any thread - exit. - */ - struct iot_worker **oworkers; /* Ordered thread pool. */ - - - /* Config state for unordered threads */ - pthread_mutex_t utlock; /* Used for scaling un-ordered - threads. */ - struct iot_worker **uworkers; /* Un-ordered thread pool. */ - int max_u_threads; /* Number of unordered threads will - not be higher than this. */ - int min_u_threads; /* Number of unordered threads - should not fall below this value. - */ - int u_idle_time; /* If an unordered thread does not - get a request for this amount of - secs, it should try to die. - */ - gf_boolean_t u_scaling; /* Set to IOT_SCALING_OFF if user - does not want thread scaling on - unordered threads. If scaling is - off, io-threads maintains at - least min_u_threads number of - threads and never lets any thread - exit. - */ - - pthread_attr_t w_attr; /* Used to reduce the stack size of - the pthread worker down from the - default of 8MiB. - */ }; typedef struct iot_conf iot_conf_t; -typedef struct iot_worker iot_worker_t; -typedef struct iot_request iot_request_t; #endif /* __IOT_H */ |