diff options
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 399 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 33 | 
2 files changed, 334 insertions, 98 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 59fc642ca3a..899702f1bdd 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -28,6 +28,11 @@  #include "dict.h"  #include "xlator.h"  #include "io-threads.h" +#include <stdlib.h> +#include <sys/time.h> +#include <time.h> + +typedef void *(*iot_worker_fn)(void*);  static void  iot_queue (iot_worker_t *worker, @@ -36,6 +41,44 @@ iot_queue (iot_worker_t *worker,  static call_stub_t *  iot_dequeue (iot_worker_t *worker); +void _iot_queue (iot_worker_t *worker, iot_request_t *req); +iot_request_t * iot_init_request (call_stub_t *stub); +void iot_startup_workers (iot_worker_t **workers, int start_idx, int count, +                iot_worker_fn workerfunc); +void * iot_worker_unordered (void *arg); + +void +iot_schedule_unordered (iot_conf_t *conf, +                inode_t *inode, +                call_stub_t *stub) +{ +        int32_t         idx = 0; +        iot_worker_t    *selected_worker = NULL; +        iot_request_t   *req = NULL; + +        /* First decide which thread will service the request. +         * FIXME: This should change into some form of load-balancing. +         * */ +        idx = (random() % conf->max_u_threads); +        selected_worker = conf->uworkers[idx]; + +        req = iot_init_request (stub); +        /* Having decided that, we must check whether the thread is +         * active at all. +         */ +        pthread_mutex_lock (&selected_worker->qlock); +        { +                if (iot_worker_active (selected_worker)) +                        _iot_queue (selected_worker, req); +                else { +                        iot_startup_workers (conf->uworkers, idx, 1, +                                        iot_worker_unordered); +                        _iot_queue (selected_worker, req); +                } +        } +        pthread_mutex_unlock (&selected_worker->qlock); +} +  static void  iot_schedule (iot_conf_t *conf,                  inode_t *inode, @@ -92,7 +135,7 @@ iot_lookup (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -126,6 +169,7 @@ iot_chmod (call_frame_t *frame,                  mode_t mode)  {          call_stub_t     *stub = NULL; +        fd_t            *fd = NULL;          stub = fop_chmod_stub (frame, iot_chmod_wrapper, loc, mode);          if (!stub) { @@ -133,7 +177,15 @@ iot_chmod (call_frame_t *frame,                  STACK_UNWIND (frame, -1, ENOMEM, NULL);                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + +	fd = fd_lookup (loc->inode, frame->root->pid); +        if (fd == NULL) +                iot_schedule_unordered ((iot_conf_t *)this->private, +                                loc->inode, stub); +        else { +                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                fd_unref (fd); +        }          return 0;  } @@ -211,6 +263,7 @@ iot_chown (call_frame_t *frame,                  gid_t gid)  {          call_stub_t     *stub = NULL; +        fd_t            *fd = NULL;          stub = fop_chown_stub (frame, iot_chown_wrapper, loc, uid, gid);          if (!stub) { @@ -219,7 +272,14 @@ iot_chown (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        fd = fd_lookup (loc->inode, frame->root->pid); +        if (fd == NULL) +                iot_schedule_unordered ((iot_conf_t *)this->private, +                                loc->inode, stub); +        else { +                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +                fd_unref (fd); +        }          return 0;  } @@ -306,7 +366,7 @@ iot_access (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -349,7 +409,7 @@ iot_readlink (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -394,7 +454,7 @@ iot_mknod (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -438,7 +498,7 @@ iot_mkdir (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -477,7 +537,7 @@ iot_rmdir (call_frame_t *frame,                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -520,16 +580,7 @@ iot_symlink (call_frame_t *frame,                  return 0;          } -        /* Passing loc->inode does not make sense right now. -         * Why? because passing -         * loc->inode makes the request get ordered on the target -         * file's thread, while we shouldnt really worry because this -         * operation will not change the target in loc. For now, know -         * that this works. Such requests, which operate on a new -         * file/link, such as that in linkname, will be sent to a pool of -         * requests meant specifically for meta-data requests. -         */ -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -571,19 +622,8 @@ iot_rename (call_frame_t *frame,                  return 0;          } -        /* We should order this on the oldloc. rename() -         * allows the blocks of the newloc to be available till -         * the last process that might have it open, closes the file. -         * I suppose this is a trade-off and we weigh in favour of -         * ordering on oldloc because the client issuing a rename() -         * would expect the oldloc's contents to be available at the -         * new location after this request. rename()'s guarantee that -         * the current -         * newloc's block will not be released or over-written allows -         * any othe processes that have the newloc open, to continue -         * operating. -         */ -        iot_schedule ((iot_conf_t *)this->private, oldloc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, oldloc->inode, +                        stub);          return 0;  } @@ -627,7 +667,7 @@ iot_open (call_frame_t *frame,                  STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);                  return 0;          } -	iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +	iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);  	return 0;  } @@ -684,7 +724,7 @@ iot_create (call_frame_t *frame,                  STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);                  return 0;          } -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);          return 0;  } @@ -976,20 +1016,7 @@ iot_stat (call_frame_t *frame,  	call_stub_t *stub;  	fd_t *fd = NULL; -	fd = fd_lookup (loc->inode, frame->root->pid); - -	if (fd == NULL) { -		STACK_WIND(frame, -			   iot_stat_cbk, -			   FIRST_CHILD(this), -			   FIRST_CHILD(this)->fops->stat, -			   loc); -		return 0; -	}  -   -	fd_unref (fd); - -	stub = fop_stat_stub (frame, +        stub = fop_stat_stub (frame,  			      iot_stat_wrapper,  			      loc);  	if (!stub) { @@ -997,7 +1024,17 @@ iot_stat (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM, NULL);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + +	fd = fd_lookup (loc->inode, frame->root->pid); +        /* File is not open, so we can send it through unordered pool. +         */ +	if (fd == NULL) +                iot_schedule_unordered ((iot_conf_t *)this->private, +                                loc->inode, stub); +        else { +                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +	        fd_unref (fd); +        }  	return 0;  } @@ -1083,22 +1120,8 @@ iot_truncate (call_frame_t *frame,  {  	call_stub_t *stub;  	fd_t *fd = NULL; -   -	fd = fd_lookup (loc->inode, frame->root->pid); -	if (fd == NULL) { -		STACK_WIND(frame, -			   iot_truncate_cbk, -			   FIRST_CHILD(this), -			   FIRST_CHILD(this)->fops->truncate, -			   loc, -			   offset); -		return 0; -	}  -   -	fd_unref (fd); - -	stub = fop_truncate_stub (frame, +        stub = fop_truncate_stub (frame,  				  iot_truncate_wrapper,  				  loc,  				  offset); @@ -1107,7 +1130,15 @@ iot_truncate (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM, NULL);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + +	fd = fd_lookup (loc->inode, frame->root->pid); +	if (fd == NULL) +                iot_schedule_unordered ((iot_conf_t *)this->private, +                                loc->inode, stub); +        else { +                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +	        fd_unref (fd); +        }  	return 0;  } @@ -1196,20 +1227,6 @@ iot_utimens (call_frame_t *frame,  {  	call_stub_t *stub;  	fd_t *fd = NULL; -   -	fd = fd_lookup (loc->inode, frame->root->pid); - -	if (fd == NULL) { -		STACK_WIND(frame, -			   iot_utimens_cbk, -			   FIRST_CHILD(this), -			   FIRST_CHILD(this)->fops->utimens, -			   loc, -			   tv); -		return 0; -	}  -   -	fd_unref (fd);  	stub = fop_utimens_stub (frame,  				 iot_utimens_wrapper, @@ -1220,7 +1237,15 @@ iot_utimens (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM, NULL);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + +        fd = fd_lookup (loc->inode, frame->root->pid); +        if (fd == NULL) +                iot_schedule_unordered ((iot_conf_t *)this->private, +                                loc->inode, stub); +        else { +                iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +	        fd_unref (fd); +        }  	return 0;  } @@ -1272,7 +1297,7 @@ iot_checksum (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);  	return 0;  } @@ -1315,15 +1340,25 @@ iot_unlink (call_frame_t *frame,  		STACK_UNWIND (frame, -1, ENOMEM);  		return 0;  	} -        iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +        iot_schedule_unordered((iot_conf_t *)this->private, loc->inode, stub);  	return 0;  } +/* Must be called with worker lock held */ +void +_iot_queue (iot_worker_t *worker, +                iot_request_t *req) +{ +        list_add_tail (&req->list, &worker->rqlist); -static void -iot_queue (iot_worker_t *worker, -           call_stub_t *stub) +        /* dq_cond */ +        worker->queue_size++; +        pthread_cond_broadcast (&worker->dq_cond); +} + +iot_request_t * +iot_init_request (call_stub_t *stub)  {  	iot_request_t   *req = NULL; @@ -1331,13 +1366,19 @@ iot_queue (iot_worker_t *worker,          ERR_ABORT (req);          req->stub = stub; +        return req; +} + +static void +iot_queue (iot_worker_t *worker, +           call_stub_t *stub) +{ +        iot_request_t   *req = NULL; + +        req = iot_init_request (stub);          pthread_mutex_lock (&worker->qlock);          { -                list_add_tail (&req->list, &worker->rqlist); - -                /* dq_cond */ -                worker->queue_size++; -                pthread_cond_broadcast (&worker->dq_cond); +                _iot_queue (worker, req);          }  	pthread_mutex_unlock (&worker->qlock);  } @@ -1380,6 +1421,135 @@ iot_worker (void *arg)  	}  } +/* Must be called with worker lock held. */ +int +iot_can_unordered_exit (iot_worker_t * worker) +{ +        int             allow_exit = 0; +        iot_conf_t      *conf = NULL; + +        conf = worker->conf; +        if (worker->queue_size > 0) +                goto decided; + +        /* We dont want this thread to exit if its index is +         * below the min thread count. +         */ +        if (worker->thread_idx >= conf->min_u_threads) +                allow_exit = 1; + +decided: +        return allow_exit; +} + +int +iot_unordered_exit (iot_worker_t *worker) +{ +        int     allow_exit = 0; + +        /* It is possible that since the last time we timed out while +         * waiting for a request, a new request has been added to this +         * worker's request queue. Before we really exit, we must +         * check for those requests. +         */ +        pthread_mutex_lock (&worker->qlock); +        { +                allow_exit = iot_can_unordered_exit (worker); + +                if (allow_exit) { +                        worker->state = IOT_STATE_DEAD; +                        worker->thread = 0; +                } +       } +        pthread_mutex_unlock (&worker->qlock); + +        return allow_exit; +} + + +int +iot_request_wait_idleness (iot_worker_t * worker) +{ +        struct timeval tv; +        struct timespec ts; +        int waitres = 0; + +        gettimeofday (&tv, NULL); +        ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; +        /* Slightly skew the idle time for threads so that, we dont +         * have all of them rushing to exit at the same time, if +         * they've been idle. +         */ +        ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; +        waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock, +                        &ts); +        if (waitres == ETIMEDOUT) +                return -1; + +        return 0; +} + + +call_stub_t * +iot_dequeue_unordered (iot_worker_t *worker) +{ +        call_stub_t     *stub= NULL; +        iot_request_t   *req = NULL; +        int             waitstat = 0; + +	pthread_mutex_lock (&worker->qlock); +        { +                while (!worker->queue_size) { +                        waitstat = 0; +                        waitstat = iot_request_wait_idleness (worker); +                        /* If -1, request wait must've timed +                         * out. +                         */ +                        if (waitstat == -1) +                                goto out; +                } + +                list_for_each_entry (req, &worker->rqlist, list) +                        break; +                list_del (&req->list); +                stub = req->stub; + +                worker->queue_size--; +        } +out: +	pthread_mutex_unlock (&worker->qlock); + +	FREE (req); + +	return stub; +} + + +void * +iot_worker_unordered (void *arg) +{ +        iot_worker_t    *worker = arg; +        call_stub_t     *stub = NULL; + +	while (1) { + +		stub = iot_dequeue_unordered (worker); +                /* If no request was received, we must've timed out, +                 * if so, check if we can exit. +                 */ +                if (stub == NULL) { +                        if (iot_unordered_exit (worker)) +                                break; +                        else +                                continue; +                } + +		call_resume (stub); +	} +        return NULL; +} + +  static iot_worker_t **  allocate_worker_array (int count)  { @@ -1403,32 +1573,57 @@ allocate_worker (iot_conf_t * conf)          wrk->conf = conf;          pthread_cond_init (&wrk->dq_cond, NULL);          pthread_mutex_init (&wrk->qlock, NULL); +        wrk->state = IOT_STATE_DEAD;          return wrk;  }  static void  allocate_workers (iot_conf_t *conf, -                int count, -                int start_alloc_idx) +                iot_worker_t ** workers, +                int start_alloc_idx, +                int count)  {          int     i, end_count;          end_count = count + start_alloc_idx;          for (i = start_alloc_idx; i < end_count; i++) { -                conf->workers[i] = allocate_worker (conf); -                pthread_create (&conf->workers[i]->thread, NULL, iot_worker, -                                conf->workers[i]); +                workers[i] = allocate_worker (conf); +                workers[i]->thread_idx = i; +        } +} + + +void +iot_startup_workers (iot_worker_t **workers, int start_idx, int count, +                iot_worker_fn workerfunc) +{ +        int     i = 0; +        int     end_idx = 0; + +        end_idx = start_idx + count; +        for (i = start_idx; i < end_idx; i++) { +                workers[i]->state = IOT_STATE_ACTIVE; +                pthread_create (&workers[i]->thread, NULL, workerfunc, +                                workers[i]);          } +  }  static void  workers_init (iot_conf_t *conf)  {          conf->workers = allocate_worker_array (conf->thread_count); -        allocate_workers (conf, conf->thread_count, 0); -} +        allocate_workers (conf, conf->workers, 0, conf->thread_count); +        /* Initialize un-ordered workers */ +        conf->uworkers = allocate_worker_array (conf->max_u_threads); +        allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads); + +        iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker); +        iot_startup_workers (conf->uworkers, 0, conf->min_u_threads, +                        iot_worker_unordered); +}  int32_t  @@ -1463,6 +1658,14 @@ init (xlator_t *this)  			conf->thread_count);  	} +        /* Init params for un-ordered workers. These should be got from +         * the volfile options. +         */ +        pthread_mutex_init (&conf->utlock, NULL); +        conf->max_u_threads = IOT_MAX_THREADS; +        conf->min_u_threads = IOT_MIN_THREADS; +        conf->u_idle_time = IOT_DEFAULT_IDLE; +  	workers_init (conf);  	this->private = conf; @@ -1501,13 +1704,13 @@ struct xlator_fops fops = {  	.ftruncate   = iot_ftruncate,   /* O */  	.utimens     = iot_utimens,     /* V */  	.checksum    = iot_checksum,    /* U */ -	.unlink      = iot_unlink,      /* V */ +	.unlink      = iot_unlink,      /* U */          .lookup      = iot_lookup,      /* U */          .chmod       = iot_chmod,       /* V */          .fchmod      = iot_fchmod,      /* O */          .chown       = iot_chown,       /* V */          .fchown      = iot_fchown,      /* O */ -        .access      = iot_access,      /* V */ +        .access      = iot_access,      /* U */          .readlink    = iot_readlink,    /* U */          .mknod       = iot_mknod,       /* U */          .mkdir       = iot_mkdir,       /* U */ diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 7975529007a..f02b641f415 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -33,6 +33,7 @@  #include "xlator.h"  #include "common-utils.h"  #include "list.h" +#include <stdlib.h>  #define min(a,b) ((a)<(b)?(a):(b))  #define max(a,b) ((a)>(b)?(a):(b)) @@ -48,6 +49,17 @@ struct iot_request {    call_stub_t *stub;  }; +#define IOT_STATE_ACTIVE        1 +#define IOT_STATE_DEAD          2 +#define iot_worker_active(wrk)  ((wrk)->state == IOT_STATE_ACTIVE) + +#define MAX_IDLE_SKEW                   1000    /* usecs */ +#define skew_usec_idle_time(usec)       ((usec) + (random () % MAX_IDLE_SKEW)) +#define IOT_DEFAULT_IDLE                180     /* In secs. */ + +#define IOT_MIN_THREADS         32 +#define IOT_MAX_THREADS         512 +  struct iot_worker {    struct list_head rqlist;      /* List of requests assigned to me. */    struct iot_conf *conf; @@ -56,11 +68,32 @@ struct iot_worker {    pthread_mutex_t qlock;    int32_t queue_size;    pthread_t thread; +  int state;            /* What state is the thread in. */ +  int thread_idx;       /* Thread's index into the worker array. Since this +                         will be thread local data, for ensuring that number +                         of threads dont fall below a minimum, we just dont +                         allow threads with specific indices to exit. +                         Helps us in eliminating one place where otherwise +                         a lock would have been required to update centralized +                         state inside conf. +                         */  };  struct iot_conf {    int32_t thread_count;    struct iot_worker ** workers; + +  pthread_mutex_t utlock;       /* Used for scaling un-ordered threads. */ +  struct iot_worker **uworkers; /* Un-ordered thread pool. */ +  int max_u_threads;            /* Number of unordered threads will not be +                                   higher than this. +                                   */ +  int min_u_threads;            /* Number of unordered threads should not +                                   fall below this value. */ +  int u_idle_time;              /* If an unordered thread does not get a +                                   request for this amount of secs, it should +                                   try to die. +                                   */  };  typedef struct iot_conf iot_conf_t;  | 
