summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
authorAnand Avati <avati@gluster.com>2010-02-22 11:00:20 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-02-22 08:51:40 -0800
commit5ae4f11319de9a800a595175678762f9fc924755 (patch)
tree0e19ec86de320453e8441fe59fe3f23844eb7d5d /xlators/performance/io-threads/src/io-threads.c
parent5f524f4b2f0dbccfe6c8d4aab16ce425dd6d2b50 (diff)
io-threads: single queue/multi-thread model
This patch lets io-threads work with a single queue and multiple threads work on picking the next request from the queue and process it. Whenever the number of pending requests in the queue double, a new worker thread is spawned. Workers expire after a (configurable) timeout of inactivity Signed-off-by: Anand V. Avati <avati@amp.gluster.com> Signed-off-by: Anand V. Avati <avati@blackhole.gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 583 (filesystem access hangs while deleting large files) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=583
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c976
1 files changed, 140 insertions, 836 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index e99012cc0..5339ce9a8 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -33,283 +33,126 @@
#include <time.h>
#include "locking.h"
-typedef void *(*iot_worker_fn)(void*);
+void *iot_worker (void *arg);
+int iot_workers_scale (iot_conf_t *conf);
+int __iot_workers_scale (iot_conf_t *conf);
-void
-iot_stop_worker (iot_worker_t *worker);
-
-void
-iot_stop_workers (iot_worker_t **workers, int start_idx, int count);
-
-void
-_iot_queue (iot_worker_t *worker, iot_request_t *req);
-
-iot_request_t *
-iot_init_request (iot_worker_t *conf, call_stub_t *stub);
-int
-iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
- iot_worker_fn workerfunc);
-
-void *
-iot_worker_unordered (void *arg);
-
-void *
-iot_worker_ordered (void *arg);
-
-int
-iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc);
-
-void
-iot_destroy_request (iot_worker_t *worker, iot_request_t * req);
-
-void
-iot_notify_worker (iot_worker_t *worker)
+call_stub_t *
+__iot_dequeue (iot_conf_t *conf)
{
-#ifndef HAVE_SPINLOCK
- pthread_cond_broadcast (&worker->notifier);
-#else
- sem_post (&worker->notifier);
-#endif
-
- return;
-}
+ call_stub_t *stub = NULL;
-int
-iot_notify_wait (iot_worker_t *worker, int idletime)
-{
- struct timeval tv;
- struct timespec ts = {0, };
- int waitres = 0;
+ if (list_empty (&conf->req))
+ return NULL;
- gettimeofday (&tv, NULL);
- /* Slightly skew the idle time for threads so that, we dont
- * have all of them rushing to exit at the same time, if
- * they've been idle.
- */
- ts.tv_sec = skew_sec_idle_time (tv.tv_sec + idletime);
-
-#ifndef HAVE_SPINLOCK
- waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock,
- &ts);
-#else
- UNLOCK (&worker->qlock);
- errno = 0;
- waitres = sem_timedwait (&worker->notifier, &ts);
- LOCK (&worker->qlock);
- if (waitres < 0)
- waitres = errno;
-#endif
+ stub = list_entry (conf->req.next, call_stub_t, list);
+ list_del_init (&stub->list);
+ conf->queue_size--;
- return waitres;
+ return stub;
}
+
void
-iot_notify_init (iot_worker_t *worker)
+__iot_enqueue (iot_conf_t *conf, call_stub_t *stub)
{
- if (worker == NULL)
- return;
-
- LOCK_INIT (&worker->qlock);
-
-#ifndef HAVE_SPINLOCK
- pthread_cond_init (&worker->notifier, NULL);
-#else
- sem_init (&worker->notifier, 0, 0);
-#endif
+ list_add_tail (&stub->list, &conf->req);
+ conf->queue_size++;
return;
}
-/* I know this function modularizes things a bit too much,
- * but it is easier on the eyes to read this than see all that locking,
- * queueing, and thread firing in the same curly block, as was the
- * case before this function.
- */
-int
-iot_request_queue_and_thread_fire (iot_worker_t *worker,
- iot_worker_fn workerfunc, iot_request_t *req)
-{
- int ret = -1;
- LOCK (&worker->qlock);
- {
- if (iot_worker_active (worker)) {
- _iot_queue (worker, req);
- ret = 0;
- }else {
- ret = iot_startup_worker (worker, workerfunc);
- if (ret < 0) {
- goto unlock;
- }
- _iot_queue (worker, req);
- }
- }
-unlock:
- UNLOCK (&worker->qlock);
- return ret;
-}
+void *
+iot_worker (void *data)
+{
+ iot_conf_t *conf = NULL;
+ xlator_t *this = NULL;
+ call_stub_t *stub = NULL;
+ struct timespec sleep_till = {0, };
+ int ret = 0;
+ char timeout = 0;
+ char bye = 0;
+
+ conf = data;
+ this = conf->this;
+ THIS = this;
+
+ while (1) {
+ sleep_till.tv_sec = time (NULL) + conf->idle_time;
+
+ pthread_mutex_lock (&conf->mutex);
+ {
+ while (list_empty (&conf->req)) {
+ conf->sleep_count++;
+
+ ret = pthread_cond_timedwait (&conf->cond,
+ &conf->mutex,
+ &sleep_till);
+ conf->sleep_count--;
+
+ if (ret == -1 && errno == ETIMEDOUT) {
+ timeout = 1;
+ break;
+ }
+ }
+ if (timeout) {
+ if (conf->curr_count > IOT_MIN_THREADS) {
+ conf->curr_count--;
+ bye = 1;
+ } else {
+ timeout = 0;
+ }
+ }
-int
-iot_unordered_request_balancer (iot_conf_t *conf)
-{
- long int rand = 0;
- int idx = 0;
+ stub = __iot_dequeue (conf);
+ }
+ pthread_mutex_unlock (&conf->mutex);
- /* Decide which thread will service the request.
- * FIXME: This should change into some form of load-balancing.
- * */
- rand = random ();
+ if (stub) /* guard against spurious wakeups */
+ call_resume (stub);
- /* If scaling is on, we can choose from any thread
- * that has been allocated upto, max_o_threads, but
- * with scaling off, we'll never have threads more
- * than min_o_threads.
- */
- if (iot_unordered_scaling_on (conf))
- idx = (rand % conf->max_u_threads);
- else
- idx = (rand % conf->min_u_threads);
+ if (bye)
+ break;
+ }
- return idx;
+ return NULL;
}
int
-iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)
+iot_schedule (iot_conf_t *conf, call_stub_t *stub)
{
- int32_t idx = 0;
- iot_worker_t *selected_worker = NULL;
- iot_request_t *req = NULL;
- int ret = -1;
+ int ret = 0;
- idx = iot_unordered_request_balancer (conf);
- selected_worker = conf->uworkers[idx];
+ pthread_mutex_lock (&conf->mutex);
+ {
+ __iot_enqueue (conf, stub);
- req = iot_init_request (selected_worker, stub);
- if (req == NULL) {
- ret = -ENOMEM;
- goto out;
- }
+ pthread_cond_signal (&conf->cond);
- ret = iot_request_queue_and_thread_fire (selected_worker,
- iot_worker_unordered, req);
- if (ret < 0) {
- iot_destroy_request (selected_worker, req);
+ ret = __iot_workers_scale (conf);
}
-out:
- return ret;
-}
-
-
-/* Only to be used with ordered requests.
- */
-uint64_t
-iot_create_inode_worker_assoc (iot_conf_t * conf, inode_t * inode)
-{
- long int rand = 0;
- uint64_t idx = 0;
-
- rand = random ();
- /* If scaling is on, we can choose from any thread
- * that has been allocated upto, max_o_threads, but
- * with scaling off, we'll never have threads more
- * than min_o_threads.
- */
- if (iot_ordered_scaling_on (conf))
- idx = (rand % conf->max_o_threads);
- else
- idx = (rand % conf->min_o_threads);
+ pthread_mutex_unlock (&conf->mutex);
- __inode_ctx_put (inode, conf->this, idx);
-
- return idx;
+ return 0;
}
-/* Assumes inode lock is held. */
-int32_t
-iot_ordered_request_balancer (iot_conf_t *conf, inode_t *inode, uint64_t *idx)
+int
+iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)
{
- int ret = -1;
-
- if (__inode_ctx_get (inode, conf->this, idx) < 0)
- *idx = iot_create_inode_worker_assoc (conf, inode);
- else {
- /* Sanity check to ensure the idx received from the inode
- * context is within bounds. We're a bit optimistic in
- * assuming that if an index is within bounds, it is
- * not corrupted. idx is uint so we dont check for less
- * than 0.
- */
- if ((*idx >= (uint64_t)conf->max_o_threads)) {
- gf_log (conf->this->name, GF_LOG_DEBUG,
- "inode context returned insane thread index %"
- PRIu64, *idx);
- ret = -EINVAL;
- goto out;
- }
- }
- ret = 0;
-out:
- return ret;
+ return iot_schedule (conf, stub);
}
int
iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)
{
- uint64_t idx = 0;
- iot_worker_t *selected_worker = NULL;
- iot_request_t *req = NULL;
- int balstatus = 0, ret = -1;
-
- if (inode == NULL) {
- gf_log (conf->this->name, GF_LOG_DEBUG,
- "Got NULL inode for ordered request");
- ret = -EINVAL;
- goto out;
- }
-
- LOCK (&inode->lock);
- {
- balstatus = iot_ordered_request_balancer (conf, inode, &idx);
- if (balstatus < 0) {
- gf_log (conf->this->name, GF_LOG_DEBUG,
- "Insane worker index. Unwinding stack");
- ret = -ECANCELED;
- goto unlock_out;
- }
- /* inode lock once acquired, cannot be left here
- * because other gluster main threads might be
- * contending on it to append a request for this file.
- * So we'll also leave the lock only after we've
- * added the request to the worker queue.
- */
- selected_worker = conf->oworkers[idx];
-
- req = iot_init_request (selected_worker, stub);
- if (req == NULL) {
- gf_log (conf->this->name, GF_LOG_ERROR,"out of memory");
- ret = -ENOMEM;
- goto unlock_out;
- }
- ret = iot_request_queue_and_thread_fire (selected_worker,
- iot_worker_ordered,
- req);
- }
-unlock_out:
- UNLOCK (&inode->lock);
-
-out:
- if (ret < 0) {
- if (req != NULL) {
- iot_destroy_request (selected_worker, req);
- }
- }
- return ret;
+ return iot_schedule (conf, stub);
}
@@ -394,7 +237,7 @@ iot_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
struct stat *stbuf, int32_t valid)
{
call_stub_t *stub = NULL;
- int ret = -1;
+ int ret = -1;
stub = fop_setattr_stub (frame, iot_setattr_wrapper, loc, stbuf, valid);
if (!stub) {
@@ -445,7 +288,7 @@ iot_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,
struct stat *stbuf, int32_t valid)
{
call_stub_t *stub = NULL;
- int ret = -1;
+ int ret = -1;
stub = fop_fsetattr_stub (frame, iot_fsetattr_wrapper, fd, stbuf,
valid);
@@ -1442,7 +1285,7 @@ iot_checksum_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->checksum,
loc, flags);
-
+
return 0;
}
@@ -1494,7 +1337,6 @@ iot_unlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink,
loc);
-
return 0;
}
@@ -2190,355 +2032,58 @@ out:
}
-/* Must be called with worker lock held */
-void
-_iot_queue (iot_worker_t *worker, iot_request_t *req)
-{
- list_add_tail (&req->list, &worker->rqlist);
-
- /* dq_cond */
- worker->queue_size++;
- iot_notify_worker(worker);
-}
-
-
-iot_request_t *
-iot_init_request (iot_worker_t *worker, call_stub_t *stub)
-{
- iot_request_t *req = NULL;
-
- req = mem_get (worker->req_pool);
- if (req == NULL) {
- goto out;
- }
-
- req->stub = stub;
-out:
- return req;
-}
-
-
-void
-iot_destroy_request (iot_worker_t *worker, iot_request_t * req)
-{
- if ((req == NULL) || (worker == NULL))
- return;
-
- mem_put (worker->req_pool, req);
-}
-
-
-/* Must be called with worker lock held. */
-gf_boolean_t
-iot_can_ordered_exit (iot_worker_t * worker)
-{
- gf_boolean_t allow_exit = _gf_false;
- iot_conf_t *conf = NULL;
-
- conf = worker->conf;
- /* We dont want this thread to exit if its index is
- * below the min thread count.
- */
- if (worker->thread_idx >= conf->min_o_threads)
- allow_exit = _gf_true;
-
- return allow_exit;
-}
-
-/* Must be called with worker lock held. */
-gf_boolean_t
-iot_ordered_exit (int cond_waitres, iot_worker_t *worker)
-{
- gf_boolean_t allow_exit = _gf_false;
-
- if (worker->state == IOT_STATE_EXIT_REQUEST) {
- allow_exit = _gf_true;
- } else if (cond_waitres == ETIMEDOUT) {
- allow_exit = iot_can_ordered_exit (worker);
- }
-
- if (allow_exit) {
- worker->state = IOT_STATE_DEAD;
- worker->thread = 0;
- }
-
- return allow_exit;
-}
-
-
int
-iot_ordered_request_wait (iot_worker_t * worker)
+__iot_workers_scale (iot_conf_t *conf)
{
- int waitres = 0;
- int retstat = 0;
+ int log2 = 0;
+ int scale = 0;
+ int diff = 0;
+ pthread_t thread;
+ int ret = 0;
- if (worker->state == IOT_STATE_EXIT_REQUEST) {
- retstat = -1;
- goto out;
- }
+ log2 = log_base2 (conf->queue_size);
- waitres = iot_notify_wait (worker, worker->conf->o_idle_time);
- if (iot_ordered_exit (waitres, worker)) {
- retstat = -1;
- }
+ scale = log2;
-out:
- return retstat;
-}
-
-
-call_stub_t *
-iot_dequeue_ordered (iot_worker_t *worker)
-{
- call_stub_t *stub = NULL;
- iot_request_t *req = NULL;
- int waitstat = 0;
+ if (log2 < IOT_MIN_THREADS)
+ scale = IOT_MIN_THREADS;
- LOCK (&worker->qlock);
- {
- while (!worker->queue_size) {
- waitstat = 0;
- waitstat = iot_ordered_request_wait (worker);
- /* We must've timed out and are now required to
- * exit.
- */
- if (waitstat == -1)
- goto out;
- }
+ if (log2 > conf->max_count)
+ scale = IOT_MAX_THREADS;
- list_for_each_entry (req, &worker->rqlist, list)
- break;
- list_del (&req->list);
- stub = req->stub;
-
- worker->queue_size--;
+ if (conf->curr_count < scale) {
+ diff = scale - conf->curr_count;
}
-out:
- UNLOCK (&worker->qlock);
- iot_destroy_request (worker, req);
- return stub;
-}
-
-
-void *
-iot_worker_ordered (void *arg)
-{
- iot_worker_t *worker = arg;
- call_stub_t *stub = NULL;
-
- while (1) {
+ while (diff) {
+ diff --;
- stub = iot_dequeue_ordered (worker);
- /* If stub is NULL, we must've timed out waiting for a
- * request and have now been allowed to exit.
- */
- if (!stub)
+ ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf);
+ if (ret == 0)
+ conf->curr_count++;
+ else
break;
- call_resume (stub);
- }
-
- return NULL;
-}
-
-
-/* Must be called with worker lock held. */
-gf_boolean_t
-iot_can_unordered_exit (iot_worker_t * worker)
-{
- gf_boolean_t allow_exit = _gf_false;
- iot_conf_t *conf = NULL;
-
- conf = worker->conf;
- /* We dont want this thread to exit if its index is
- * below the min thread count.
- */
- if (worker->thread_idx >= conf->min_u_threads)
- allow_exit = _gf_true;
-
- return allow_exit;
-}
-
-
-/* Must be called with worker lock held. */
-gf_boolean_t
-iot_unordered_exit (int cond_waitres, iot_worker_t *worker)
-{
- gf_boolean_t allow_exit = _gf_false;
-
- if (worker->state == IOT_STATE_EXIT_REQUEST) {
- allow_exit = _gf_true;
- } else if (cond_waitres == ETIMEDOUT) {
- allow_exit = iot_can_unordered_exit (worker);
}
- if (allow_exit) {
- worker->state = IOT_STATE_DEAD;
- worker->thread = 0;
- }
-
- return allow_exit;
+ return diff;
}
int
-iot_unordered_request_wait (iot_worker_t * worker)
+iot_workers_scale (iot_conf_t *conf)
{
- int waitres = 0;
- int retstat = 0;
+ int ret = -1;
- if (worker->state == IOT_STATE_EXIT_REQUEST) {
- retstat = -1;
+ if (conf == NULL) {
+ ret = -EINVAL;
goto out;
}
- waitres = iot_notify_wait (worker, worker->conf->u_idle_time);
- if (iot_unordered_exit (waitres, worker)) {
- retstat = -1;
- }
-
-out:
- return retstat;
-}
-
-
-call_stub_t *
-iot_dequeue_unordered (iot_worker_t *worker)
-{
- call_stub_t *stub= NULL;
- iot_request_t *req = NULL;
- int waitstat = 0;
-
- LOCK (&worker->qlock);
+ pthread_mutex_lock (&conf->mutex);
{
- while (!worker->queue_size) {
- waitstat = 0;
- waitstat = iot_unordered_request_wait (worker);
- /* If -1, request wait must've timed
- * out.
- */
- if (waitstat == -1)
- goto out;
- }
-
- list_for_each_entry (req, &worker->rqlist, list)
- break;
- list_del (&req->list);
- stub = req->stub;
-
- worker->queue_size--;
+ ret = __iot_workers_scale (conf);
}
-out:
- UNLOCK (&worker->qlock);
- iot_destroy_request (worker, req);
-
- return stub;
-}
-
-
-void *
-iot_worker_unordered (void *arg)
-{
- iot_worker_t *worker = arg;
- call_stub_t *stub = NULL;
-
- while (1) {
-
- stub = iot_dequeue_unordered (worker);
- /* If no request was received, we must've timed out,
- * and can exit. */
- if (!stub)
- break;
-
- call_resume (stub);
- }
-
- return NULL;
-}
-
-
-void
-deallocate_worker_array (iot_worker_t **workers)
-{
- FREE (workers);
-}
-
-void
-deallocate_workers (iot_worker_t **workers,
- int start_alloc_idx, int count)
-{
- int i;
- int end_count;
-
- end_count = count + start_alloc_idx;
- for (i = start_alloc_idx; (i < end_count); i++) {
- if (workers[i] != NULL) {
- mem_pool_destroy (workers[i]->req_pool);
- FREE (workers[i]);
- workers[i] = NULL;
- }
- }
-
-}
-
-
-iot_worker_t **
-allocate_worker_array (int count)
-{
- iot_worker_t **warr = NULL;
-
- warr = CALLOC (count, sizeof (iot_worker_t *));
-
- return warr;
-}
-
-
-iot_worker_t *
-allocate_worker (iot_conf_t * conf)
-{
- iot_worker_t *wrk = NULL;
-
- wrk = CALLOC (1, sizeof (iot_worker_t));
- if (wrk == NULL) {
- gf_log (conf->this->name, GF_LOG_ERROR, "out of memory");
- goto out;
- }
-
- wrk->req_pool = mem_pool_new (iot_request_t, IOT_REQUEST_MEMPOOL_SIZE);
- if (wrk->req_pool == NULL)
- goto free_wrk;
-
- INIT_LIST_HEAD (&wrk->rqlist);
- wrk->conf = conf;
- iot_notify_init (wrk);
- wrk->state = IOT_STATE_DEAD;
-
-out:
- return wrk;
-
-free_wrk:
- FREE (wrk);
- return NULL;
-}
-
-
-int
-allocate_workers (iot_conf_t *conf, iot_worker_t **workers, int start_alloc_idx,
- int count)
-{
- int i;
- int end_count, ret = -1;
-
- end_count = count + start_alloc_idx;
- for (i = start_alloc_idx; i < end_count; i++) {
- workers[i] = allocate_worker (conf);
- if (workers[i] == NULL) {
- ret = -ENOMEM;
- goto out;
- }
- workers[i]->thread_idx = i;
- }
- ret = 0;
+ pthread_mutex_unlock (&conf->mutex);
out:
return ret;
@@ -2546,75 +2091,6 @@ out:
void
-iot_stop_worker (iot_worker_t *worker)
-{
- LOCK (&worker->qlock);
- {
- worker->state = IOT_STATE_EXIT_REQUEST;
- }
- UNLOCK (&worker->qlock);
-
- iot_notify_worker (worker);
- pthread_join (worker->thread, NULL);
-}
-
-
-void
-iot_stop_workers (iot_worker_t **workers, int start_idx, int count)
-{
- int i = 0;
- int end_idx = 0;
-
- end_idx = start_idx + count;
- for (i = start_idx; i < end_idx; i++) {
- if (workers[i] != NULL) {
- iot_stop_worker (workers[i]);
- }
- }
-}
-
-
-int
-iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc)
-{
- int ret = -1;
- ret = pthread_create (&worker->thread, &worker->conf->w_attr,
- workerfunc, worker);
- if (ret != 0) {
- gf_log (worker->conf->this->name, GF_LOG_ERROR,
- "cannot start worker (%s)", strerror (errno));
- ret = -ret;
- } else {
- worker->state = IOT_STATE_ACTIVE;
- }
-
- return ret;
-}
-
-
-int
-iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
- iot_worker_fn workerfunc)
-{
- int i = 0;
- int end_idx = 0;
- int ret = -1;
-
- end_idx = start_idx + count;
- for (i = start_idx; i < end_idx; i++) {
- ret = iot_startup_worker (workers[i], workerfunc);
- if (ret < 0) {
- goto out;
- }
- }
-
- ret = 0;
-out:
- return ret;
-}
-
-
-void
set_stack_size (iot_conf_t *conf)
{
int err = 0;
@@ -2624,99 +2100,8 @@ set_stack_size (iot_conf_t *conf)
err = pthread_attr_setstacksize (&conf->w_attr, stacksize);
if (err == EINVAL) {
gf_log (conf->this->name, GF_LOG_WARNING,
- "Using default thread stack size");
- }
-}
-
-
-void
-iot_cleanup_workers (iot_conf_t *conf)
-{
- if (conf->uworkers != NULL) {
- iot_stop_workers (conf->uworkers, 0,
- conf->max_u_threads);
-
- deallocate_workers (conf->uworkers, 0,
- conf->max_u_threads);
-
- deallocate_worker_array (conf->uworkers);
- }
-
- if (conf->oworkers != NULL) {
- iot_stop_workers (conf->oworkers, 0,
- conf->max_o_threads);
-
- deallocate_workers (conf->oworkers, 0,
- conf->max_o_threads);
-
- deallocate_worker_array (conf->oworkers);
- }
-}
-
-
-int
-workers_init (iot_conf_t *conf)
-{
- int ret = -1;
-
- if (conf == NULL) {
- ret = -EINVAL;
- goto err;
- }
-
- /* Initialize un-ordered workers */
- conf->uworkers = allocate_worker_array (conf->max_u_threads);
- if (conf->uworkers == NULL) {
- gf_log (conf->this->name, GF_LOG_ERROR, "out of memory");
- ret = -ENOMEM;
- goto err;
- }
-
- ret = allocate_workers (conf, conf->uworkers, 0,
- conf->max_u_threads);
- if (ret < 0) {
- gf_log (conf->this->name, GF_LOG_ERROR, "out of memory");
- goto err;
+ "Using default thread stack size");
}
-
- /* Initialize ordered workers */
- conf->oworkers = allocate_worker_array (conf->max_o_threads);
- if (conf->oworkers == NULL) {
- gf_log (conf->this->name, GF_LOG_ERROR, "out of memory");
- ret = -ENOMEM;
- goto err;
- }
-
- ret = allocate_workers (conf, conf->oworkers, 0,
- conf->max_o_threads);
- if (ret < 0) {
- gf_log (conf->this->name, GF_LOG_ERROR, "out of memory");
- goto err;
- }
-
- set_stack_size (conf);
- ret = iot_startup_workers (conf->oworkers, 0, conf->min_o_threads,
- iot_worker_ordered);
- if (ret == -1) {
- /* logged inside iot_startup_workers */
- goto err;
- }
-
- ret = iot_startup_workers (conf->uworkers, 0, conf->min_u_threads,
- iot_worker_unordered);
- if (ret == -1) {
- /* logged inside iot_startup_workers */
- goto err;
- }
-
- return 0;
-
-err:
- if (conf != NULL) {
- iot_cleanup_workers (conf);
- }
-
- return ret;
}
@@ -2725,11 +2110,10 @@ init (xlator_t *this)
{
iot_conf_t *conf = NULL;
dict_t *options = this->options;
- int thread_count = IOT_DEFAULT_THREADS;
- gf_boolean_t autoscaling = IOT_SCALING_OFF;
- char *scalestr = NULL;
- int min_threads, max_threads, ret = -1;
-
+ int thread_count = IOT_DEFAULT_THREADS;
+ int idle_time = IOT_DEFAULT_IDLE;
+ int ret = 0;
+
if (!this->children || this->children->next) {
gf_log ("io-threads", GF_LOG_ERROR,
"FATAL: iot not configured with exactly one child");
@@ -2748,100 +2132,33 @@ init (xlator_t *this)
goto out;
}
- if ((dict_get_str (options, "autoscaling", &scalestr)) == 0) {
- if ((gf_string2boolean (scalestr, &autoscaling)) == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "'autoscaling' option must be"
- " boolean");
- goto out;
- }
- }
+ thread_count = IOT_DEFAULT_THREADS;
if (dict_get (options, "thread-count")) {
thread_count = data_to_int32 (dict_get (options,
- "thread-count"));
- if (scalestr != NULL)
- gf_log (this->name, GF_LOG_WARNING,
- "'thread-count' is specified with "
- "'autoscaling' on. Ignoring"
- "'thread-count' option.");
- if (thread_count < 2)
+ "thread-count"));
+ if (thread_count < IOT_MIN_THREADS)
thread_count = IOT_MIN_THREADS;
- }
- min_threads = IOT_DEFAULT_THREADS;
- max_threads = IOT_MAX_THREADS;
- if (dict_get (options, "min-threads"))
- min_threads = data_to_int32 (dict_get (options,
- "min-threads"));
+ if (thread_count > IOT_MAX_THREADS)
+ thread_count = IOT_MAX_THREADS;
+ }
+ conf->max_count = thread_count;
- if (dict_get (options, "max-threads"))
- max_threads = data_to_int32 (dict_get (options,
- "max-threads"));
-
- if (min_threads > max_threads) {
- gf_log (this->name, GF_LOG_ERROR, " min-threads must be less "
- "than max-threads");
- goto out;
+ if (dict_get (options, "idle-time")) {
+ idle_time = data_to_int32 (dict_get (options,
+ "idle-time"));
+ if (idle_time < 0)
+ idle_time = 1;
}
+ conf->idle_time = idle_time;
- /* If autoscaling is off, then adjust the min and max
- * threads according to thread-count.
- * This is based on the assumption that despite autoscaling
- * being off, we still want to have separate pools for data
- * and meta-data threads.
- */
- if (!autoscaling)
- max_threads = min_threads = thread_count;
+ conf->this = this;
- /* If user specifies an odd number of threads, increase it by
- * one. The reason for having an even number of threads is
- * explained later.
- */
- if (max_threads % 2)
- max_threads++;
-
- if(min_threads % 2)
- min_threads++;
-
- /* If the user wants to have only a single thread for
- * some strange reason, make sure we set this count to
- * 2. Explained later.
- */
- if (min_threads < IOT_MIN_THREADS)
- min_threads = IOT_MIN_THREADS;
-
- /* Again, have atleast two. Read on. */
- if (max_threads < IOT_MIN_THREADS)
- max_threads = IOT_MIN_THREADS;
-
- /* This is why we need atleast two threads.
- * We're dividing the specified thread pool into
- * 2 halves, equally between ordered and unordered
- * pools.
- */
+ INIT_LIST_HEAD (&conf->req);
- /* Init params for un-ordered workers. */
- pthread_mutex_init (&conf->utlock, NULL);
- conf->max_u_threads = max_threads / 2;
- conf->min_u_threads = min_threads / 2;
- conf->u_idle_time = IOT_DEFAULT_IDLE;
- conf->u_scaling = autoscaling;
-
- /* Init params for ordered workers. */
- pthread_mutex_init (&conf->otlock, NULL);
- conf->max_o_threads = max_threads / 2;
- conf->min_o_threads = min_threads / 2;
- conf->o_idle_time = IOT_DEFAULT_IDLE;
- conf->o_scaling = autoscaling;
-
- gf_log (this->name, GF_LOG_DEBUG,
- "io-threads: Autoscaling: %s, "
- "min_threads: %d, max_threads: %d",
- (autoscaling) ? "on":"off", min_threads, max_threads);
+ ret = iot_workers_scale (conf);
- conf->this = this;
- ret = workers_init (conf);
if (ret == -1) {
gf_log (this->name, GF_LOG_ERROR,
"cannot initialize worker threads, exiting init");
@@ -2902,7 +2219,7 @@ struct xlator_fops fops = {
.opendir = iot_opendir, /* U */
.fsyncdir = iot_fsyncdir, /* O */
.statfs = iot_statfs, /* U */
- .setxattr = iot_setxattr, /* U */
+ .setxattr = iot_setxattr, /* U */
.getxattr = iot_getxattr, /* U */
.fgetxattr = iot_fgetxattr, /* O */
.fsetxattr = iot_fsetxattr, /* O */
@@ -2920,29 +2237,16 @@ struct xlator_cbks cbks = {
};
struct volume_options options[] = {
- { .key = {"thread-count"},
- .type = GF_OPTION_TYPE_INT,
- .min = IOT_MIN_THREADS,
+ { .key = {"thread-count"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
.max = IOT_MAX_THREADS
},
- { .key = {"autoscaling"},
- .type = GF_OPTION_TYPE_BOOL
- },
- { .key = {"min-threads"},
- .type = GF_OPTION_TYPE_INT,
- .min = IOT_MIN_THREADS,
- .max = IOT_MAX_THREADS,
- .description = "Minimum number of threads must be greater than or "
- "equal to 2. If the specified value is less than 2 "
- "it is adjusted upwards to 2. This is a requirement"
- " for the current model of threading in io-threads."
+ {.key = {"idle-time"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 0x7fffffff,
},
- { .key = {"max-threads"},
- .type = GF_OPTION_TYPE_INT,
- .min = IOT_MIN_THREADS,
- .max = IOT_MAX_THREADS,
- .description = "Maximum number of threads is advisory only so the "
- "user specified value will be used."
+ { .key = {NULL},
},
- { .key = {NULL} },
};