diff options
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 976 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 134 | 
2 files changed, 154 insertions, 956 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index e99012cc0d6..5339ce9a8c1 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -33,283 +33,126 @@  #include <time.h>  #include "locking.h" -typedef void *(*iot_worker_fn)(void*); +void *iot_worker (void *arg); +int iot_workers_scale (iot_conf_t *conf); +int __iot_workers_scale (iot_conf_t *conf); -void -iot_stop_worker (iot_worker_t *worker); - -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count); - -void -_iot_queue (iot_worker_t *worker, iot_request_t *req); - -iot_request_t * -iot_init_request (iot_worker_t *conf, call_stub_t *stub); -int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, -                     iot_worker_fn workerfunc); - -void * -iot_worker_unordered (void *arg); - -void * -iot_worker_ordered (void *arg); - -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc); - -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req); - -void -iot_notify_worker (iot_worker_t *worker) +call_stub_t * +__iot_dequeue (iot_conf_t *conf)  { -#ifndef HAVE_SPINLOCK -        pthread_cond_broadcast (&worker->notifier); -#else -        sem_post (&worker->notifier); -#endif - -        return; -} +        call_stub_t  *stub = NULL; -int -iot_notify_wait (iot_worker_t *worker, int idletime) -{ -        struct timeval  tv; -        struct timespec ts = {0, }; -        int             waitres = 0; +        if (list_empty (&conf->req)) +                return NULL; -        gettimeofday (&tv, NULL); -        /* Slightly skew the idle time for threads so that, we dont -         * have all of them rushing to exit at the same time, if -         * they've been idle. -         */ -        ts.tv_sec = skew_sec_idle_time (tv.tv_sec + idletime); - -#ifndef HAVE_SPINLOCK -        waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock, -                                          &ts); -#else -        UNLOCK (&worker->qlock); -        errno = 0; -        waitres = sem_timedwait (&worker->notifier, &ts); -        LOCK (&worker->qlock); -        if (waitres < 0) -                waitres = errno; -#endif +        stub = list_entry (conf->req.next, call_stub_t, list); +        list_del_init (&stub->list); +        conf->queue_size--; -        return waitres; +        return stub;  } +  void -iot_notify_init (iot_worker_t *worker) +__iot_enqueue (iot_conf_t *conf, call_stub_t *stub)  { -        if (worker == NULL) -                return; - -        LOCK_INIT (&worker->qlock); - -#ifndef HAVE_SPINLOCK -        pthread_cond_init (&worker->notifier, NULL); -#else -        sem_init (&worker->notifier, 0, 0); -#endif +        list_add_tail (&stub->list, &conf->req); +        conf->queue_size++;          return;  } -/* I know this function modularizes things a bit too much, - * but it is easier on the eyes to read this than see all that locking, - * queueing, and thread firing in the same curly block, as was the - * case before this function. - */ -int -iot_request_queue_and_thread_fire (iot_worker_t *worker, -                                   iot_worker_fn workerfunc, iot_request_t *req) -{ -        int     ret = -1;  -        LOCK (&worker->qlock); -        { -                if (iot_worker_active (worker)) { -                        _iot_queue (worker, req); -                        ret = 0; -                }else { -                        ret = iot_startup_worker (worker, workerfunc); -                        if (ret < 0) { -                                goto unlock; -                        } -                        _iot_queue (worker, req); -                } -        } -unlock: -        UNLOCK (&worker->qlock); -        return ret; -} +void * +iot_worker (void *data) +{ +        iot_conf_t       *conf = NULL; +        xlator_t         *this = NULL; +        call_stub_t      *stub = NULL; +        struct timespec   sleep_till = {0, }; +        int               ret = 0; +        char              timeout = 0; +        char              bye = 0; + +        conf = data; +        this = conf->this; +        THIS = this; + +        while (1) { +                sleep_till.tv_sec = time (NULL) + conf->idle_time; + +                pthread_mutex_lock (&conf->mutex); +                { +                        while (list_empty (&conf->req)) { +                                conf->sleep_count++; + +                                ret = pthread_cond_timedwait (&conf->cond, +                                                              &conf->mutex, +                                                              &sleep_till); +                                conf->sleep_count--; + +                                if (ret == -1 && errno == ETIMEDOUT) { +                                        timeout = 1; +                                        break; +                                } +                        } +                        if (timeout) { +                                if (conf->curr_count > IOT_MIN_THREADS) { +                                        conf->curr_count--; +                                        bye = 1; +                                } else { +                                        timeout = 0; +                                } +                        } -int -iot_unordered_request_balancer (iot_conf_t *conf) -{ -        long int        rand = 0; -        int             idx = 0; +                        stub = __iot_dequeue (conf); +                } +                pthread_mutex_unlock (&conf->mutex); -        /* Decide which thread will service the request. -         * FIXME: This should change into some form of load-balancing. -         * */ -        rand = random (); +                if (stub) /* guard against spurious wakeups */ +                        call_resume (stub); -        /* If scaling is on, we can choose from any thread -        * that has been allocated upto, max_o_threads, but -        * with scaling off, we'll never have threads more -        * than min_o_threads. -        */ -        if (iot_unordered_scaling_on (conf)) -                idx = (rand % conf->max_u_threads); -        else -                idx = (rand % conf->min_u_threads); +                if (bye) +                        break; +        } -        return idx; +        return NULL;  }  int -iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub) +iot_schedule (iot_conf_t *conf, call_stub_t *stub)  { -        int32_t          idx = 0; -        iot_worker_t    *selected_worker = NULL; -        iot_request_t   *req = NULL; -        int             ret = -1; +        int   ret = 0; -        idx = iot_unordered_request_balancer (conf); -        selected_worker = conf->uworkers[idx]; +        pthread_mutex_lock (&conf->mutex); +        { +                __iot_enqueue (conf, stub); -        req = iot_init_request (selected_worker, stub); -        if (req == NULL) { -                ret = -ENOMEM; -                goto out; -        } +                pthread_cond_signal (&conf->cond); -        ret = iot_request_queue_and_thread_fire (selected_worker, -                                                 iot_worker_unordered, req); -        if (ret < 0) { -                iot_destroy_request (selected_worker, req); +                ret = __iot_workers_scale (conf);          } -out: -        return ret; -} - - -/* Only to be used with ordered requests. - */ -uint64_t -iot_create_inode_worker_assoc (iot_conf_t * conf, inode_t * inode) -{ -        long int        rand = 0; -        uint64_t        idx = 0; - -        rand = random (); -        /* If scaling is on, we can choose from any thread -        * that has been allocated upto, max_o_threads, but -        * with scaling off, we'll never have threads more -        * than min_o_threads. -        */ -        if (iot_ordered_scaling_on (conf)) -                idx = (rand % conf->max_o_threads); -        else -                idx = (rand % conf->min_o_threads); +        pthread_mutex_unlock (&conf->mutex); -        __inode_ctx_put (inode, conf->this, idx); - -        return idx; +        return 0;  } -/* Assumes inode lock is held. */ -int32_t -iot_ordered_request_balancer (iot_conf_t *conf, inode_t *inode, uint64_t *idx) +int +iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)  { -        int ret = -1; - -        if (__inode_ctx_get (inode, conf->this, idx) < 0) -                *idx = iot_create_inode_worker_assoc (conf, inode); -        else { -                /* Sanity check to ensure the idx received from the inode -                * context is within bounds. We're a bit optimistic in -                * assuming that if an index is within bounds, it is -                * not corrupted. idx is uint so we dont check for less -                * than 0. -                */ -                if ((*idx >= (uint64_t)conf->max_o_threads)) { -                        gf_log (conf->this->name, GF_LOG_DEBUG, -                                "inode context returned insane thread index %" -                                PRIu64, *idx); -                        ret = -EINVAL; -                        goto out; -                } -        } -        ret = 0; -out: -        return ret; +        return iot_schedule (conf, stub);  }  int  iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)  { -        uint64_t         idx = 0; -        iot_worker_t    *selected_worker = NULL; -        iot_request_t   *req = NULL; -        int              balstatus = 0, ret = -1; - -        if (inode == NULL) { -                gf_log (conf->this->name, GF_LOG_DEBUG, -                        "Got NULL inode for ordered request"); -                ret = -EINVAL; -                goto out; -        } - -        LOCK (&inode->lock); -        { -                balstatus = iot_ordered_request_balancer (conf, inode, &idx); -                if (balstatus < 0) { -                        gf_log (conf->this->name, GF_LOG_DEBUG, -                                "Insane worker index. Unwinding stack"); -                        ret = -ECANCELED; -                        goto unlock_out; -                } -                /* inode lock once acquired, cannot be left here -                 * because other gluster main threads might be -                 * contending on it to append a request for this file. -                 * So we'll also leave the lock only after we've -                 * added the request to the worker queue. -                 */ -                selected_worker = conf->oworkers[idx]; - -                req = iot_init_request (selected_worker, stub); -                if (req == NULL) { -                        gf_log (conf->this->name, GF_LOG_ERROR,"out of memory"); -                        ret = -ENOMEM; -                        goto unlock_out; -                } -                ret = iot_request_queue_and_thread_fire (selected_worker, -                                                         iot_worker_ordered, -                                                         req); -        } -unlock_out: -        UNLOCK (&inode->lock); - -out: -        if (ret < 0) { -                if (req != NULL) { -                        iot_destroy_request (selected_worker, req); -                } -        } -        return ret; +        return iot_schedule (conf, stub);  } @@ -394,7 +237,7 @@ iot_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,               struct stat *stbuf, int32_t valid)  {          call_stub_t     *stub = NULL; -        int             ret = -1; +        int              ret = -1;          stub = fop_setattr_stub (frame, iot_setattr_wrapper, loc, stbuf, valid);          if (!stub) { @@ -445,7 +288,7 @@ iot_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,                struct stat *stbuf, int32_t valid)  {          call_stub_t     *stub = NULL; -        int             ret = -1; +        int              ret = -1;          stub = fop_fsetattr_stub (frame, iot_fsetattr_wrapper, fd, stbuf,                                    valid); @@ -1442,7 +1285,7 @@ iot_checksum_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,  		    FIRST_CHILD(this),  		    FIRST_CHILD(this)->fops->checksum,  		    loc, flags); -   +  	return 0;  } @@ -1494,7 +1337,6 @@ iot_unlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)  		    FIRST_CHILD(this),  		    FIRST_CHILD(this)->fops->unlink,  		    loc); -    	return 0;  } @@ -2190,355 +2032,58 @@ out:  } -/* Must be called with worker lock held */ -void -_iot_queue (iot_worker_t *worker, iot_request_t *req) -{ -        list_add_tail (&req->list, &worker->rqlist); - -        /* dq_cond */ -        worker->queue_size++; -        iot_notify_worker(worker); -} - - -iot_request_t * -iot_init_request (iot_worker_t *worker, call_stub_t *stub) -{ -	iot_request_t   *req = NULL; - -        req = mem_get (worker->req_pool); -        if (req == NULL) { -                goto out; -        } - -        req->stub = stub; -out: -        return req; -} - - -void -iot_destroy_request (iot_worker_t *worker, iot_request_t * req) -{ -        if ((req == NULL) || (worker == NULL)) -                return; - -        mem_put (worker->req_pool, req); -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_ordered_exit (iot_worker_t * worker) -{ -        gf_boolean_t     allow_exit = _gf_false; -        iot_conf_t      *conf = NULL; - -        conf = worker->conf; -        /* We dont want this thread to exit if its index is -         * below the min thread count. -         */ -        if (worker->thread_idx >= conf->min_o_threads) -                allow_exit = _gf_true; - -        return allow_exit; -} - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_ordered_exit (int cond_waitres, iot_worker_t *worker) -{ -        gf_boolean_t     allow_exit = _gf_false; - -        if (worker->state == IOT_STATE_EXIT_REQUEST) { -                allow_exit = _gf_true; -        } else if (cond_waitres == ETIMEDOUT) { -                allow_exit = iot_can_ordered_exit (worker); -        } - -        if (allow_exit) { -                worker->state = IOT_STATE_DEAD; -                worker->thread = 0; -        } - -        return allow_exit; -} - -  int -iot_ordered_request_wait (iot_worker_t * worker) +__iot_workers_scale (iot_conf_t *conf)  { -        int             waitres = 0; -        int             retstat = 0; +        int       log2 = 0; +        int       scale = 0; +        int       diff = 0; +        pthread_t thread; +        int       ret = 0; -        if (worker->state == IOT_STATE_EXIT_REQUEST) { -                retstat = -1; -                goto out; -        } +        log2 = log_base2 (conf->queue_size); -        waitres = iot_notify_wait (worker, worker->conf->o_idle_time); -        if (iot_ordered_exit (waitres, worker)) { -                retstat = -1; -        } +        scale = log2; -out: -        return retstat; -} - - -call_stub_t * -iot_dequeue_ordered (iot_worker_t *worker) -{ -	call_stub_t     *stub = NULL; -	iot_request_t   *req = NULL; -        int              waitstat = 0; +        if (log2 < IOT_MIN_THREADS) +                scale = IOT_MIN_THREADS; -	LOCK (&worker->qlock); -        { -                while (!worker->queue_size) { -                        waitstat = 0; -                        waitstat = iot_ordered_request_wait (worker); -                        /* We must've timed out and are now required to -                         * exit. -                         */ -                        if (waitstat == -1) -                                goto out; -                } +        if (log2 > conf->max_count) +                scale = IOT_MAX_THREADS; -                list_for_each_entry (req, &worker->rqlist, list) -                        break; -                list_del (&req->list); -                stub = req->stub; - -                worker->queue_size--; +        if (conf->curr_count < scale) { +                diff = scale - conf->curr_count;          } -out: -	UNLOCK (&worker->qlock); -        iot_destroy_request (worker, req); -	return stub; -} - - -void * -iot_worker_ordered (void *arg) -{ -        iot_worker_t    *worker = arg; -        call_stub_t     *stub = NULL; - -	while (1) { +        while (diff) { +                diff --; -                stub = iot_dequeue_ordered (worker); -                /* If stub is NULL, we must've timed out waiting for a -                 * request and have now been allowed to exit. -                 */ -                if (!stub) +                ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf); +                if (ret == 0) +                        conf->curr_count++; +                else                          break; -		call_resume (stub); -	} - -        return NULL; -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_can_unordered_exit (iot_worker_t * worker) -{ -        gf_boolean_t    allow_exit = _gf_false; -        iot_conf_t      *conf = NULL; - -        conf = worker->conf; -        /* We dont want this thread to exit if its index is -         * below the min thread count. -         */ -        if (worker->thread_idx >= conf->min_u_threads) -                allow_exit = _gf_true; - -        return allow_exit; -} - - -/* Must be called with worker lock held. */ -gf_boolean_t -iot_unordered_exit (int cond_waitres, iot_worker_t *worker) -{ -        gf_boolean_t     allow_exit = _gf_false; - -        if (worker->state == IOT_STATE_EXIT_REQUEST) { -                allow_exit = _gf_true; -        } else if (cond_waitres == ETIMEDOUT) { -                allow_exit = iot_can_unordered_exit (worker);          } -        if (allow_exit) { -                worker->state = IOT_STATE_DEAD; -                worker->thread = 0; -        } - -        return allow_exit; +        return diff;  }  int -iot_unordered_request_wait (iot_worker_t * worker) +iot_workers_scale (iot_conf_t *conf)  { -        int             waitres = 0; -        int             retstat = 0; +        int     ret = -1; -        if (worker->state == IOT_STATE_EXIT_REQUEST) { -                retstat = -1; +        if (conf == NULL) { +                ret = -EINVAL;                  goto out;          } -        waitres = iot_notify_wait (worker, worker->conf->u_idle_time); -        if (iot_unordered_exit (waitres, worker)) { -                retstat = -1; -        } - -out: -        return retstat; -} - - -call_stub_t * -iot_dequeue_unordered (iot_worker_t *worker) -{ -        call_stub_t     *stub= NULL; -        iot_request_t   *req = NULL; -        int              waitstat = 0; - -	LOCK (&worker->qlock); +        pthread_mutex_lock (&conf->mutex);          { -                while (!worker->queue_size) { -                        waitstat = 0; -                        waitstat = iot_unordered_request_wait (worker); -                        /* If -1, request wait must've timed -                         * out. -                         */ -                        if (waitstat == -1) -                                goto out; -                } - -                list_for_each_entry (req, &worker->rqlist, list) -                        break; -                list_del (&req->list); -                stub = req->stub; - -                worker->queue_size--; +                ret = __iot_workers_scale (conf);          } -out: -	UNLOCK (&worker->qlock); -        iot_destroy_request (worker, req); - -	return stub; -} - - -void * -iot_worker_unordered (void *arg) -{ -        iot_worker_t    *worker = arg; -        call_stub_t     *stub = NULL; - -	while (1) { - -		stub = iot_dequeue_unordered (worker); -                /* If no request was received, we must've timed out, -                 * and can exit. */ -                if (!stub) -                        break; - -		call_resume (stub); -	} - -        return NULL; -} - - -void -deallocate_worker_array (iot_worker_t **workers) -{ -        FREE (workers); -} - -void -deallocate_workers (iot_worker_t **workers, -                    int start_alloc_idx, int count) -{ -        int     i; -        int     end_count; - -        end_count = count + start_alloc_idx; -        for (i = start_alloc_idx; (i < end_count); i++) { -                if (workers[i] != NULL) { -                        mem_pool_destroy (workers[i]->req_pool); -                        FREE (workers[i]); -                        workers[i] = NULL; -                } -        } -         -} - - -iot_worker_t ** -allocate_worker_array (int count) -{ -        iot_worker_t    **warr = NULL; - -        warr = CALLOC (count, sizeof (iot_worker_t *)); - -        return warr; -} - - -iot_worker_t * -allocate_worker (iot_conf_t * conf) -{ -        iot_worker_t    *wrk = NULL; - -        wrk = CALLOC (1, sizeof (iot_worker_t)); -        if (wrk == NULL) { -                gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); -                goto out; -        } - -        wrk->req_pool = mem_pool_new (iot_request_t, IOT_REQUEST_MEMPOOL_SIZE); -        if (wrk->req_pool == NULL) -                goto free_wrk; - -        INIT_LIST_HEAD (&wrk->rqlist); -        wrk->conf = conf; -        iot_notify_init (wrk); -        wrk->state = IOT_STATE_DEAD; - -out: -        return wrk; - -free_wrk: -        FREE (wrk); -        return NULL; -} - - -int -allocate_workers (iot_conf_t *conf, iot_worker_t **workers, int start_alloc_idx, -                  int count) -{ -        int     i; -        int     end_count, ret = -1; - -        end_count = count + start_alloc_idx; -        for (i = start_alloc_idx; i < end_count; i++) { -                workers[i] = allocate_worker (conf); -                if (workers[i] == NULL) { -                        ret = -ENOMEM; -                        goto out; -                } -                workers[i]->thread_idx = i; -        } -        ret = 0; +        pthread_mutex_unlock (&conf->mutex);  out:          return ret; @@ -2546,75 +2091,6 @@ out:  void -iot_stop_worker (iot_worker_t *worker) -{ -        LOCK (&worker->qlock); -        { -                worker->state = IOT_STATE_EXIT_REQUEST; -        } -        UNLOCK (&worker->qlock); - -        iot_notify_worker (worker); -        pthread_join (worker->thread, NULL); -} - - -void -iot_stop_workers (iot_worker_t **workers, int start_idx, int count) -{ -        int     i = 0; -        int     end_idx = 0; - -        end_idx = start_idx + count; -        for (i = start_idx; i < end_idx; i++) { -                if (workers[i] != NULL) { -                        iot_stop_worker (workers[i]); -                } -        } -} - - -int -iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc) -{ -        int     ret = -1; -        ret = pthread_create (&worker->thread, &worker->conf->w_attr, -                              workerfunc, worker); -        if (ret != 0) { -                gf_log (worker->conf->this->name, GF_LOG_ERROR, -                        "cannot start worker (%s)", strerror (errno)); -                ret = -ret; -        } else { -                worker->state = IOT_STATE_ACTIVE; -        } - -        return ret; -} - - -int -iot_startup_workers (iot_worker_t **workers, int start_idx, int count, -                     iot_worker_fn workerfunc) -{ -        int     i = 0; -        int     end_idx = 0; -        int     ret = -1;  - -        end_idx = start_idx + count; -        for (i = start_idx; i < end_idx; i++) { -                ret = iot_startup_worker (workers[i], workerfunc); -                if (ret < 0) { -                        goto out; -                } -        } - -        ret = 0; -out: -        return ret; -} - - -void  set_stack_size (iot_conf_t *conf)  {          int     err = 0; @@ -2624,99 +2100,8 @@ set_stack_size (iot_conf_t *conf)          err = pthread_attr_setstacksize (&conf->w_attr, stacksize);          if (err == EINVAL) {                  gf_log (conf->this->name, GF_LOG_WARNING, -                                "Using default thread stack size"); -        } -} - - -void -iot_cleanup_workers (iot_conf_t *conf) -{ -        if (conf->uworkers != NULL) { -                iot_stop_workers (conf->uworkers, 0, -                                  conf->max_u_threads); -                     -                deallocate_workers (conf->uworkers, 0, -                                    conf->max_u_threads); - -                deallocate_worker_array (conf->uworkers); -        } -                 -        if (conf->oworkers != NULL) { -                iot_stop_workers (conf->oworkers, 0, -                                  conf->max_o_threads); -                         -                deallocate_workers (conf->oworkers, 0, -                                    conf->max_o_threads); -                         -                deallocate_worker_array (conf->oworkers); -        } -} - - -int -workers_init (iot_conf_t *conf) -{ -        int     ret = -1; - -        if (conf == NULL) { -                ret = -EINVAL; -                goto err; -        } - -        /* Initialize un-ordered workers */ -        conf->uworkers = allocate_worker_array (conf->max_u_threads); -        if (conf->uworkers == NULL) { -                gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); -                ret = -ENOMEM; -                goto err; -        } - -        ret = allocate_workers (conf, conf->uworkers, 0, -                                conf->max_u_threads); -        if (ret < 0) { -                gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); -                goto err; +                        "Using default thread stack size");          } - -        /* Initialize ordered workers */ -        conf->oworkers = allocate_worker_array (conf->max_o_threads); -        if (conf->oworkers == NULL) { -                gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); -                ret = -ENOMEM; -                goto err; -        } - -        ret = allocate_workers (conf, conf->oworkers, 0, -                                conf->max_o_threads); -        if (ret < 0) { -                gf_log (conf->this->name, GF_LOG_ERROR, "out of memory"); -                goto err; -        } - -        set_stack_size (conf); -        ret = iot_startup_workers (conf->oworkers, 0, conf->min_o_threads, -                                   iot_worker_ordered); -        if (ret == -1) { -                /* logged inside iot_startup_workers */ -                goto err; -        } - -        ret = iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, -                                   iot_worker_unordered); -        if (ret == -1) { -                /* logged inside iot_startup_workers */ -                goto err; -        } - -        return 0; - -err: -        if (conf != NULL)  { -                iot_cleanup_workers (conf); -        } - -        return ret;  } @@ -2725,11 +2110,10 @@ init (xlator_t *this)  {          iot_conf_t      *conf = NULL;          dict_t          *options = this->options; -        int             thread_count = IOT_DEFAULT_THREADS; -        gf_boolean_t    autoscaling = IOT_SCALING_OFF; -        char            *scalestr = NULL; -        int             min_threads, max_threads, ret = -1; -         +        int              thread_count = IOT_DEFAULT_THREADS; +        int              idle_time = IOT_DEFAULT_IDLE; +        int              ret = 0; +  	if (!this->children || this->children->next) {  		gf_log ("io-threads", GF_LOG_ERROR,  			"FATAL: iot not configured with exactly one child"); @@ -2748,100 +2132,33 @@ init (xlator_t *this)                  goto out;          } -        if ((dict_get_str (options, "autoscaling", &scalestr)) == 0) { -                if ((gf_string2boolean (scalestr, &autoscaling)) == -1) { -                        gf_log (this->name, GF_LOG_ERROR, -                                        "'autoscaling' option must be" -                                        " boolean"); -                        goto out; -                } -        } +        thread_count = IOT_DEFAULT_THREADS;  	if (dict_get (options, "thread-count")) {                  thread_count = data_to_int32 (dict_get (options, -                                        "thread-count")); -                if (scalestr != NULL) -                        gf_log (this->name, GF_LOG_WARNING, -                                        "'thread-count' is specified with " -                                        "'autoscaling' on. Ignoring" -                                        "'thread-count' option."); -                if (thread_count < 2) +                                                        "thread-count")); +                if (thread_count < IOT_MIN_THREADS)                          thread_count = IOT_MIN_THREADS; -        } -        min_threads = IOT_DEFAULT_THREADS; -        max_threads = IOT_MAX_THREADS; -        if (dict_get (options, "min-threads")) -                min_threads = data_to_int32 (dict_get (options, -                                        "min-threads")); +                if (thread_count > IOT_MAX_THREADS) +                        thread_count = IOT_MAX_THREADS; +        } +        conf->max_count = thread_count; -        if (dict_get (options, "max-threads")) -                max_threads = data_to_int32 (dict_get (options, -                                        "max-threads")); -        -        if (min_threads > max_threads) { -                gf_log (this->name, GF_LOG_ERROR, " min-threads must be less " -                                "than max-threads"); -                goto out; +	if (dict_get (options, "idle-time")) { +                idle_time = data_to_int32 (dict_get (options, +                                                     "idle-time")); +                if (idle_time < 0) +                        idle_time = 1;          } +        conf->idle_time = idle_time; -        /* If autoscaling is off, then adjust the min and max -         * threads according to thread-count. -         * This is based on the assumption that despite autoscaling -         * being off, we still want to have separate pools for data -         * and meta-data threads. -         */ -        if (!autoscaling) -                max_threads = min_threads = thread_count; +        conf->this = this; -        /* If user specifies an odd number of threads, increase it by -         * one. The reason for having an even number of threads is -         * explained later. -         */ -        if (max_threads % 2) -                max_threads++; - -        if(min_threads % 2) -                min_threads++; - -        /* If the user wants to have only a single thread for -        * some strange reason, make sure we set this count to -        * 2. Explained later. -        */ -        if (min_threads < IOT_MIN_THREADS) -                min_threads = IOT_MIN_THREADS; - -        /* Again, have atleast two. Read on. */ -        if (max_threads < IOT_MIN_THREADS) -                max_threads = IOT_MIN_THREADS; - -        /* This is why we need atleast two threads. -         * We're dividing the specified thread pool into -         * 2 halves, equally between ordered and unordered -         * pools. -         */ +        INIT_LIST_HEAD (&conf->req); -        /* Init params for un-ordered workers. */ -        pthread_mutex_init (&conf->utlock, NULL); -        conf->max_u_threads = max_threads / 2; -        conf->min_u_threads = min_threads / 2; -        conf->u_idle_time = IOT_DEFAULT_IDLE; -        conf->u_scaling = autoscaling; - -        /* Init params for ordered workers. */ -        pthread_mutex_init (&conf->otlock, NULL); -        conf->max_o_threads = max_threads / 2; -        conf->min_o_threads = min_threads / 2; -        conf->o_idle_time = IOT_DEFAULT_IDLE; -        conf->o_scaling = autoscaling; - -        gf_log (this->name, GF_LOG_DEBUG, -                "io-threads: Autoscaling: %s, " -                "min_threads: %d, max_threads: %d", -                (autoscaling) ? "on":"off", min_threads, max_threads); +	ret = iot_workers_scale (conf); -        conf->this = this; -	ret = workers_init (conf);          if (ret == -1) {                  gf_log (this->name, GF_LOG_ERROR,                          "cannot initialize worker threads, exiting init"); @@ -2902,7 +2219,7 @@ struct xlator_fops fops = {          .opendir     = iot_opendir,     /* U */          .fsyncdir    = iot_fsyncdir,    /* O */          .statfs      = iot_statfs,      /* U */ -        .setxattr    = iot_setxattr,     /* U */ +        .setxattr    = iot_setxattr,    /* U */          .getxattr    = iot_getxattr,    /* U */          .fgetxattr   = iot_fgetxattr,   /* O */          .fsetxattr   = iot_fsetxattr,   /* O */ @@ -2920,29 +2237,16 @@ struct xlator_cbks cbks = {  };  struct volume_options options[] = { -	{ .key  = {"thread-count"},  -	  .type = GF_OPTION_TYPE_INT,  -	  .min  = IOT_MIN_THREADS,  +	{ .key  = {"thread-count"}, +	  .type = GF_OPTION_TYPE_INT, +	  .min  = IOT_MIN_THREADS,  	  .max  = IOT_MAX_THREADS  	}, -        { .key  = {"autoscaling"}, -          .type = GF_OPTION_TYPE_BOOL -        }, -        { .key          = {"min-threads"}, -          .type         = GF_OPTION_TYPE_INT, -          .min          = IOT_MIN_THREADS, -          .max          = IOT_MAX_THREADS, -          .description  = "Minimum number of threads must be greater than or " -                          "equal to 2. If the specified value is less than 2 " -                          "it is adjusted upwards to 2. This is a requirement" -                          " for the current model of threading in io-threads." +        {.key   = {"idle-time"}, +         .type  = GF_OPTION_TYPE_INT, +         .min   = 1, +         .max   = 0x7fffffff,          }, -        { .key          = {"max-threads"}, -          .type         = GF_OPTION_TYPE_INT, -          .min          = IOT_MIN_THREADS, -          .max          = IOT_MAX_THREADS, -          .description  = "Maximum number of threads is advisory only so the " -                          "user specified value will be used." +	{ .key  = {NULL},          }, -	{ .key  = {NULL} },  }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 3cd959069ba..8b9985be16c 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -37,144 +37,38 @@  #include "locking.h"  #include <semaphore.h> -#define min(a,b) ((a)<(b)?(a):(b)) -#define max(a,b) ((a)>(b)?(a):(b))  struct iot_conf; -struct iot_worker; -struct iot_request; - -struct iot_request { -        struct list_head list;        /* Attaches this request to the list of -                                         requests. -                                      */ -        call_stub_t *stub; -}; - -typedef enum { -        IOT_STATE_ACTIVE, -        IOT_STATE_EXIT_REQUEST, -        IOT_STATE_DEAD -}iot_state_t; -#define iot_worker_active(wrk)  ((wrk)->state == IOT_STATE_ACTIVE)  #define MAX_IDLE_SKEW                   4       /* In secs */  #define skew_sec_idle_time(sec)         ((sec) + (random () % MAX_IDLE_SKEW))  #define IOT_DEFAULT_IDLE                180     /* In secs. */ -#define IOT_MIN_THREADS         2 -#define IOT_DEFAULT_THREADS     16 +#define IOT_MIN_THREADS         1 +#define IOT_DEFAULT_THREADS     8  #define IOT_MAX_THREADS         64 -#define IOT_SCALING_OFF                 _gf_false -#define IOT_SCALING_ON                  _gf_true -#define iot_ordered_scaling_on(conf)    ((conf)->o_scaling == IOT_SCALING_ON) -#define iot_unordered_scaling_on(conf)  ((conf)->u_scaling == IOT_SCALING_ON)  #define IOT_THREAD_STACK_SIZE   ((size_t)(1024*1024)) -/* This signifies the max number of outstanding request we're expecting - * at a point for every worker thread. - * For an idea of the memory foot-print, consider at most 16 Bytes per - * iot_request_t on a 64-bit system with another 16 bytes per chunk in the - * header. For 64 slots in the pool, we'll use up 2 KiB, with 64 threads this - * goes up to 128 KiB. - * - * Note that this size defines the size of the per-worker mem pool. The - * advantage is that, we're not only reducing the rate of small iot_request_t - * allocations from the heap but also reducing the contention on the libc heap - * by having a mem pool, though small, for each worker. - */ -#define IOT_REQUEST_MEMPOOL_SIZE        64 - -struct iot_worker { -        struct list_head rqlist;      /* List of requests assigned to me. */ -        struct iot_conf  *conf; -#ifndef HAVE_SPINLOCK -        pthread_cond_t   notifier; -#else -        sem_t            notifier; -#endif -        int64_t          q,dq; -        gf_lock_t        qlock; -        int32_t          queue_size; -        pthread_t        thread; -        iot_state_t      state;            /* What state is the thread in. */ -        int              thread_idx;       /* Thread's index into the worker -                                              array. Since this will be thread -                                              local data, for ensuring that -                                              number of threads dont fall below -                                              a minimum, we just dont allow -                                              threads with specific indices to -                                              exit. Helps us in eliminating one -                                              place where otherwise a lock -                                              would have been required to update -                                              centralized state inside conf. -                                           */ -        struct mem_pool  *req_pool;    /* iot_request_t's come from here. */ -};  struct iot_conf { -        int32_t              thread_count; -        struct iot_worker  **workers; +        pthread_mutex_t      mutex; +        pthread_cond_t       cond; + +        int32_t              max_count;   /* configured maximum */ +        int32_t              curr_count;  /* actual number of threads running */ +        int32_t              sleep_count; + +        int32_t              idle_time;   /* in seconds */ + +        struct list_head     req; +        int                  queue_size; +        pthread_attr_t       w_attr;          xlator_t            *this; -        /* Config state for ordered threads. */ -        pthread_mutex_t      otlock;       /* Used to sync any state that needs -                                              to be changed by the ordered -                                              threads. -                                           */ - -        int                  max_o_threads; /* Max. number of ordered threads */ -        int                  min_o_threads; /* Min. number of ordered threads. -                                               Ordered thread count never falls -                                               below this threshold. -                                            */ - -        int                  o_idle_time;   /* in Secs. The idle time after -                                               which an ordered thread exits. -                                            */ -        gf_boolean_t         o_scaling;     /* Set to IOT_SCALING_OFF if user -                                               does not want thread scaling on -                                               ordered threads. If scaling is -                                               off, io-threads maintains at -                                               least min_o_threads number of -                                               threads and never lets any thread -                                               exit. -                                            */ -        struct iot_worker  **oworkers;      /* Ordered thread pool. */ - - -        /* Config state for unordered threads */ -        pthread_mutex_t      utlock;       /* Used for scaling un-ordered -                                              threads. */ -        struct iot_worker  **uworkers;     /* Un-ordered thread pool. */ -        int                  max_u_threads; /* Number of unordered threads will -                                               not be higher than this. */ -        int                  min_u_threads; /* Number of unordered threads -                                               should not fall below this value. -                                            */ -        int                  u_idle_time;   /* If an unordered thread does not -                                               get a request for this amount of -                                               secs, it should try to die. -                                            */ -        gf_boolean_t         u_scaling;     /* Set to IOT_SCALING_OFF if user -                                               does not want thread scaling on -                                               unordered threads. If scaling is -                                               off, io-threads maintains at -                                               least min_u_threads number of -                                               threads and never lets any thread -                                               exit. -                                            */ - -        pthread_attr_t       w_attr;        /* Used to reduce the stack size of -                                               the pthread worker down from the -                                               default of 8MiB. -                                            */  };  typedef struct iot_conf iot_conf_t; -typedef struct iot_worker iot_worker_t; -typedef struct iot_request iot_request_t;  #endif /* __IOT_H */  | 
