diff options
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 220 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 20 | 
2 files changed, 182 insertions, 58 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 899702f1bdd..1d255d97267 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -34,18 +34,12 @@  typedef void *(*iot_worker_fn)(void*); -static void -iot_queue (iot_worker_t *worker, -           call_stub_t *stub); - -static call_stub_t * -iot_dequeue (iot_worker_t *worker); -  void _iot_queue (iot_worker_t *worker, iot_request_t *req);  iot_request_t * iot_init_request (call_stub_t *stub);  void iot_startup_workers (iot_worker_t **workers, int start_idx, int count,                  iot_worker_fn workerfunc);  void * iot_worker_unordered (void *arg); +void * iot_worker_ordered (void *arg);  void  iot_schedule_unordered (iot_conf_t *conf, @@ -79,19 +73,49 @@ iot_schedule_unordered (iot_conf_t *conf,          pthread_mutex_unlock (&selected_worker->qlock);  } -static void -iot_schedule (iot_conf_t *conf, +void +iot_schedule_ordered (iot_conf_t *conf,                  inode_t *inode,                  call_stub_t *stub)  { -        int32_t         idx = 0; +        uint64_t        idx = 0;          iot_worker_t    *selected_worker = NULL; +        iot_request_t   * req = NULL; +        int             ctxret = 0; -        if (inode) -                idx = (inode->ino % conf->thread_count); -        selected_worker = conf->workers[idx]; - -        iot_queue (selected_worker, stub); +        if (inode == NULL) { +                gf_log (conf->this->name, GF_LOG_ERROR, +                                "Got NULL inode for ordered request"); +                return; +        } +        req = iot_init_request (stub); +        LOCK (&inode->lock); +        { +                ctxret = __inode_ctx_get (inode, conf->this, &idx); +                if (ctxret < 0) { +                        idx = (random () % conf->max_o_threads); +                        __inode_ctx_put (inode, conf->this, idx); +                } +                /* inode lock once acquired, cannot be left here +                 * because other gluster main threads might be +                 * contending on it to append a request for this file. +                 * So we'll also leave the lock only after we've +                 * added the request to the worker queue. +                 */ +                selected_worker = conf->oworkers[idx]; +                pthread_mutex_lock (&selected_worker->qlock); +                { +                        if (iot_worker_active (selected_worker)) +                                _iot_queue (selected_worker, req); +                        else { +                                iot_startup_workers (conf->oworkers, idx, 1, +                                                iot_worker_ordered); +                                _iot_queue (selected_worker, req); +                        } +                } +                pthread_mutex_unlock (&selected_worker->qlock); +        } +        UNLOCK (&inode->lock);  }  int32_t @@ -183,7 +207,8 @@ iot_chmod (call_frame_t *frame,                  iot_schedule_unordered ((iot_conf_t *)this->private,                                  loc->inode, stub);          else { -                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, +                                stub);                  fd_unref (fd);          }          return 0; @@ -227,7 +252,7 @@ iot_fchmod (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);          return 0;  } @@ -277,7 +302,8 @@ iot_chown (call_frame_t *frame,                  iot_schedule_unordered ((iot_conf_t *)this->private,                                  loc->inode, stub);          else { -                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, +                                stub);                  fd_unref (fd);          } @@ -324,7 +350,7 @@ iot_fchown (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);          return 0;  } @@ -780,7 +806,7 @@ iot_readv (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -823,7 +849,7 @@ iot_flush (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -870,7 +896,7 @@ iot_fsync (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -923,7 +949,7 @@ iot_writev (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -977,7 +1003,7 @@ iot_lk (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -1032,7 +1058,8 @@ iot_stat (call_frame_t *frame,                  iot_schedule_unordered ((iot_conf_t *)this->private,                                  loc->inode, stub);          else { -                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, +                                stub);  	        fd_unref (fd);          } @@ -1080,7 +1107,7 @@ iot_fstat (call_frame_t *frame,  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -1136,7 +1163,8 @@ iot_truncate (call_frame_t *frame,                  iot_schedule_unordered ((iot_conf_t *)this->private,                                  loc->inode, stub);          else { -                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, +                                stub);  	        fd_unref (fd);          } @@ -1186,7 +1214,7 @@ iot_ftruncate (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM, NULL);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +        iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);  	return 0;  } @@ -1243,7 +1271,7 @@ iot_utimens (call_frame_t *frame,                  iot_schedule_unordered ((iot_conf_t *)this->private,                                  loc->inode, stub);          else { -                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, stub);  	        fd_unref (fd);          } @@ -1369,30 +1397,88 @@ iot_init_request (call_stub_t *stub)          return req;  } -static void -iot_queue (iot_worker_t *worker, -           call_stub_t *stub) +/* Must be called with worker lock held. */ +int +iot_can_ordered_exit (iot_worker_t * worker)  { -        iot_request_t   *req = NULL; +        int             allow_exit = 0; +        iot_conf_t      *conf = NULL; -        req = iot_init_request (stub); +        conf = worker->conf; +        if (worker->queue_size > 0) +                goto decided; + +        /* We dont want this thread to exit if its index is +         * below the min thread count. +         */ +        if (worker->thread_idx >= conf->min_o_threads) +                allow_exit = 1; + +decided: +        return allow_exit; +} + +int +iot_ordered_exit (iot_worker_t *worker) +{ +        int     allow_exit = 0; + +        /* It is possible that since the last time we timed out while +         * waiting for a request, a new request has been added to this +         * worker's request queue. Before we really exit, we must +         * check for those requests. +         */          pthread_mutex_lock (&worker->qlock);          { -                _iot_queue (worker, req); -        } -	pthread_mutex_unlock (&worker->qlock); +                allow_exit = iot_can_ordered_exit (worker); + +                if (allow_exit) { +                        worker->state = IOT_STATE_DEAD; +                        worker->thread = 0; +                } +       } +        pthread_mutex_unlock (&worker->qlock); + +        return allow_exit;  } -static call_stub_t * -iot_dequeue (iot_worker_t *worker) +int +iot_ordered_request_wait (iot_worker_t * worker)  { -	call_stub_t *stub = NULL; -	iot_request_t *req = NULL; +        struct timeval tv; +        struct timespec ts; +        int waitres = 0; + +        gettimeofday (&tv, NULL); +        ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time; +        /* Slightly skew the idle time for threads so that, we dont +         * have all of them rushing to exit at the same time, if +         * they've been idle. +         */ +        ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; +        waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, +                        &ts); +        if (waitres == ETIMEDOUT) +                return -1; + +        return 0; +} + +call_stub_t * +iot_dequeue_ordered (iot_worker_t *worker) +{ +	call_stub_t     *stub = NULL; +	iot_request_t   *req = NULL; +        int             waitstat = 0;  	pthread_mutex_lock (&worker->qlock);          { -                while (!worker->queue_size) -                        pthread_cond_wait (&worker->dq_cond, &worker->qlock); +                while (!worker->queue_size) { +                        waitstat = 0; +                        waitstat = iot_ordered_request_wait (worker); +                        if (waitstat == -1) +                                goto out; +                }                  list_for_each_entry (req, &worker->rqlist, list)                          break; @@ -1401,6 +1487,7 @@ iot_dequeue (iot_worker_t *worker)                  worker->queue_size--;          } +out:  	pthread_mutex_unlock (&worker->qlock);  	FREE (req); @@ -1408,17 +1495,25 @@ iot_dequeue (iot_worker_t *worker)  	return stub;  } -static void * -iot_worker (void *arg) +void * +iot_worker_ordered (void *arg)  { -	iot_worker_t *worker = arg; +        iot_worker_t    *worker = arg; +        call_stub_t     *stub = NULL;  	while (1) { -		call_stub_t *stub; -		stub = iot_dequeue (worker); +                stub = iot_dequeue_ordered (worker); +                if (stub == NULL) { +                        if (iot_ordered_exit (worker)) +                                break; +                        else +                                continue; +                }  		call_resume (stub);  	} + +        return NULL;  }  /* Must be called with worker lock held. */ @@ -1460,7 +1555,7 @@ iot_unordered_exit (iot_worker_t *worker)                          worker->state = IOT_STATE_DEAD;                          worker->thread = 0;                  } -       } +        }          pthread_mutex_unlock (&worker->qlock);          return allow_exit; @@ -1468,11 +1563,11 @@ iot_unordered_exit (iot_worker_t *worker)  int -iot_request_wait_idleness (iot_worker_t * worker) +iot_unordered_request_wait (iot_worker_t * worker)  { -        struct timeval tv; +        struct timeval  tv;          struct timespec ts; -        int waitres = 0; +        int             waitres = 0;          gettimeofday (&tv, NULL);          ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; @@ -1481,7 +1576,7 @@ iot_request_wait_idleness (iot_worker_t * worker)           * they've been idle.           */          ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; -        waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock, +        waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,                          &ts);          if (waitres == ETIMEDOUT)                  return -1; @@ -1501,7 +1596,7 @@ iot_dequeue_unordered (iot_worker_t *worker)          {                  while (!worker->queue_size) {                          waitstat = 0; -                        waitstat = iot_request_wait_idleness (worker); +                        waitstat = iot_unordered_request_wait (worker);                          /* If -1, request wait must've timed                           * out.                           */ @@ -1613,14 +1708,16 @@ iot_startup_workers (iot_worker_t **workers, int start_idx, int count,  static void  workers_init (iot_conf_t *conf)  { -        conf->workers = allocate_worker_array (conf->thread_count); -        allocate_workers (conf, conf->workers, 0, conf->thread_count); -          /* Initialize un-ordered workers */          conf->uworkers = allocate_worker_array (conf->max_u_threads);          allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads); -        iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker); +        /* Initialize ordered workers */ +        conf->oworkers = allocate_worker_array (conf->max_o_threads); +        allocate_workers (conf, conf->oworkers, 0, conf->max_o_threads); + +        iot_startup_workers (conf->oworkers, 0, conf->min_o_threads, +                        iot_worker_ordered);          iot_startup_workers (conf->uworkers, 0, conf->min_u_threads,                          iot_worker_unordered);  } @@ -1666,6 +1763,13 @@ init (xlator_t *this)          conf->min_u_threads = IOT_MIN_THREADS;          conf->u_idle_time = IOT_DEFAULT_IDLE; +        /* Init params for ordered workers. */ +        pthread_mutex_init (&conf->otlock, NULL); +        conf->max_o_threads = IOT_MAX_THREADS; +        conf->min_o_threads = IOT_MIN_THREADS; +        conf->o_idle_time = IOT_DEFAULT_IDLE; + +        conf->this = this;  	workers_init (conf);  	this->private = conf; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index f02b641f415..412b3cc7638 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -83,6 +83,26 @@ struct iot_conf {    int32_t thread_count;    struct iot_worker ** workers; +  xlator_t *this; +  /* Config state for ordered threads. */ +  pthread_mutex_t otlock;       /* Used to sync any state that needs to be +                                   changed by the ordered threads. +                                   */ + +  int max_o_threads;            /* Max. number of ordered threads */ +  int min_o_threads;            /* Min. number of ordered threads. Ordered +                                   thread count never falls below this +                                   threshold. +                                   */ + +  int o_idle_time;              /* in Secs. The idle time after which an +                                   ordered thread exits. +                                   */ + +  struct iot_worker **oworkers; /* Ordered thread pool. */ + + +  /* Config state for unordered threads */    pthread_mutex_t utlock;       /* Used for scaling un-ordered threads. */    struct iot_worker **uworkers; /* Un-ordered thread pool. */    int max_u_threads;            /* Number of unordered threads will not be  | 
