summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
authorShehjar Tikoo <shehjart@zresearch.com>2009-04-01 13:59:24 -0700
committerAnand V. Avati <avati@amp.gluster.com>2009-04-02 19:19:14 +0530
commitee79908d3b577c061b497e35481b8d1523502077 (patch)
tree9397ee47bcfe31cceae0014423ebfe8ee5b59bfe /xlators/performance/io-threads/src/io-threads.c
parente27f7f344e12d0885a48fcca8dfce36440f5e9e8 (diff)
io-threads: Add un-ordered thread-pool.
This commit adds everything needed to: a. Get un-ordered request going through the un-ordered thread-pool. This happens through, the iot_schedule_unordered(..). The unordered thread-pool consists of thread running the iot_worker_unordered(..) function. b. Make threads in the un-ordered thread pool start-up and exit depending on the thread state. Note that at this point the requests that need ordering are still going through iot_schedule(..). Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c399
1 files changed, 301 insertions, 98 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 59fc642ca..899702f1b 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -28,6 +28,11 @@
#include "dict.h"
#include "xlator.h"
#include "io-threads.h"
+#include <stdlib.h>
+#include <sys/time.h>
+#include <time.h>
+
+typedef void *(*iot_worker_fn)(void*);
static void
iot_queue (iot_worker_t *worker,
@@ -36,6 +41,44 @@ iot_queue (iot_worker_t *worker,
static call_stub_t *
iot_dequeue (iot_worker_t *worker);
+void _iot_queue (iot_worker_t *worker, iot_request_t *req);
+iot_request_t * iot_init_request (call_stub_t *stub);
+void iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
+ iot_worker_fn workerfunc);
+void * iot_worker_unordered (void *arg);
+
+void
+iot_schedule_unordered (iot_conf_t *conf,
+ inode_t *inode,
+ call_stub_t *stub)
+{
+ int32_t idx = 0;
+ iot_worker_t *selected_worker = NULL;
+ iot_request_t *req = NULL;
+
+ /* First decide which thread will service the request.
+ * FIXME: This should change into some form of load-balancing.
+ * */
+ idx = (random() % conf->max_u_threads);
+ selected_worker = conf->uworkers[idx];
+
+ req = iot_init_request (stub);
+ /* Having decided that, we must check whether the thread is
+ * active at all.
+ */
+ pthread_mutex_lock (&selected_worker->qlock);
+ {
+ if (iot_worker_active (selected_worker))
+ _iot_queue (selected_worker, req);
+ else {
+ iot_startup_workers (conf->uworkers, idx, 1,
+ iot_worker_unordered);
+ _iot_queue (selected_worker, req);
+ }
+ }
+ pthread_mutex_unlock (&selected_worker->qlock);
+}
+
static void
iot_schedule (iot_conf_t *conf,
inode_t *inode,
@@ -92,7 +135,7 @@ iot_lookup (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -126,6 +169,7 @@ iot_chmod (call_frame_t *frame,
mode_t mode)
{
call_stub_t *stub = NULL;
+ fd_t *fd = NULL;
stub = fop_chmod_stub (frame, iot_chmod_wrapper, loc, mode);
if (!stub) {
@@ -133,7 +177,15 @@ iot_chmod (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+ if (fd == NULL)
+ iot_schedule_unordered ((iot_conf_t *)this->private,
+ loc->inode, stub);
+ else {
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd_unref (fd);
+ }
return 0;
}
@@ -211,6 +263,7 @@ iot_chown (call_frame_t *frame,
gid_t gid)
{
call_stub_t *stub = NULL;
+ fd_t *fd = NULL;
stub = fop_chown_stub (frame, iot_chown_wrapper, loc, uid, gid);
if (!stub) {
@@ -219,7 +272,14 @@ iot_chown (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd = fd_lookup (loc->inode, frame->root->pid);
+ if (fd == NULL)
+ iot_schedule_unordered ((iot_conf_t *)this->private,
+ loc->inode, stub);
+ else {
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd_unref (fd);
+ }
return 0;
}
@@ -306,7 +366,7 @@ iot_access (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -349,7 +409,7 @@ iot_readlink (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -394,7 +454,7 @@ iot_mknod (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -438,7 +498,7 @@ iot_mkdir (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -477,7 +537,7 @@ iot_rmdir (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -520,16 +580,7 @@ iot_symlink (call_frame_t *frame,
return 0;
}
- /* Passing loc->inode does not make sense right now.
- * Why? because passing
- * loc->inode makes the request get ordered on the target
- * file's thread, while we shouldnt really worry because this
- * operation will not change the target in loc. For now, know
- * that this works. Such requests, which operate on a new
- * file/link, such as that in linkname, will be sent to a pool of
- * requests meant specifically for meta-data requests.
- */
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -571,19 +622,8 @@ iot_rename (call_frame_t *frame,
return 0;
}
- /* We should order this on the oldloc. rename()
- * allows the blocks of the newloc to be available till
- * the last process that might have it open, closes the file.
- * I suppose this is a trade-off and we weigh in favour of
- * ordering on oldloc because the client issuing a rename()
- * would expect the oldloc's contents to be available at the
- * new location after this request. rename()'s guarantee that
- * the current
- * newloc's block will not be released or over-written allows
- * any othe processes that have the newloc open, to continue
- * operating.
- */
- iot_schedule ((iot_conf_t *)this->private, oldloc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, oldloc->inode,
+ stub);
return 0;
}
@@ -627,7 +667,7 @@ iot_open (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -684,7 +724,7 @@ iot_create (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -976,20 +1016,7 @@ iot_stat (call_frame_t *frame,
call_stub_t *stub;
fd_t *fd = NULL;
- fd = fd_lookup (loc->inode, frame->root->pid);
-
- if (fd == NULL) {
- STACK_WIND(frame,
- iot_stat_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->stat,
- loc);
- return 0;
- }
-
- fd_unref (fd);
-
- stub = fop_stat_stub (frame,
+ stub = fop_stat_stub (frame,
iot_stat_wrapper,
loc);
if (!stub) {
@@ -997,7 +1024,17 @@ iot_stat (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+ /* File is not open, so we can send it through unordered pool.
+ */
+ if (fd == NULL)
+ iot_schedule_unordered ((iot_conf_t *)this->private,
+ loc->inode, stub);
+ else {
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd_unref (fd);
+ }
return 0;
}
@@ -1083,22 +1120,8 @@ iot_truncate (call_frame_t *frame,
{
call_stub_t *stub;
fd_t *fd = NULL;
-
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (fd == NULL) {
- STACK_WIND(frame,
- iot_truncate_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->truncate,
- loc,
- offset);
- return 0;
- }
-
- fd_unref (fd);
-
- stub = fop_truncate_stub (frame,
+ stub = fop_truncate_stub (frame,
iot_truncate_wrapper,
loc,
offset);
@@ -1107,7 +1130,15 @@ iot_truncate (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+ if (fd == NULL)
+ iot_schedule_unordered ((iot_conf_t *)this->private,
+ loc->inode, stub);
+ else {
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd_unref (fd);
+ }
return 0;
}
@@ -1196,20 +1227,6 @@ iot_utimens (call_frame_t *frame,
{
call_stub_t *stub;
fd_t *fd = NULL;
-
- fd = fd_lookup (loc->inode, frame->root->pid);
-
- if (fd == NULL) {
- STACK_WIND(frame,
- iot_utimens_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->utimens,
- loc,
- tv);
- return 0;
- }
-
- fd_unref (fd);
stub = fop_utimens_stub (frame,
iot_utimens_wrapper,
@@ -1220,7 +1237,15 @@ iot_utimens (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+
+ fd = fd_lookup (loc->inode, frame->root->pid);
+ if (fd == NULL)
+ iot_schedule_unordered ((iot_conf_t *)this->private,
+ loc->inode, stub);
+ else {
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ fd_unref (fd);
+ }
return 0;
}
@@ -1272,7 +1297,7 @@ iot_checksum (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -1315,15 +1340,25 @@ iot_unlink (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_unordered((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
+/* Must be called with worker lock held */
+void
+_iot_queue (iot_worker_t *worker,
+ iot_request_t *req)
+{
+ list_add_tail (&req->list, &worker->rqlist);
-static void
-iot_queue (iot_worker_t *worker,
- call_stub_t *stub)
+ /* dq_cond */
+ worker->queue_size++;
+ pthread_cond_broadcast (&worker->dq_cond);
+}
+
+iot_request_t *
+iot_init_request (call_stub_t *stub)
{
iot_request_t *req = NULL;
@@ -1331,13 +1366,19 @@ iot_queue (iot_worker_t *worker,
ERR_ABORT (req);
req->stub = stub;
+ return req;
+}
+
+static void
+iot_queue (iot_worker_t *worker,
+ call_stub_t *stub)
+{
+ iot_request_t *req = NULL;
+
+ req = iot_init_request (stub);
pthread_mutex_lock (&worker->qlock);
{
- list_add_tail (&req->list, &worker->rqlist);
-
- /* dq_cond */
- worker->queue_size++;
- pthread_cond_broadcast (&worker->dq_cond);
+ _iot_queue (worker, req);
}
pthread_mutex_unlock (&worker->qlock);
}
@@ -1380,6 +1421,135 @@ iot_worker (void *arg)
}
}
+/* Must be called with worker lock held. */
+int
+iot_can_unordered_exit (iot_worker_t * worker)
+{
+ int allow_exit = 0;
+ iot_conf_t *conf = NULL;
+
+ conf = worker->conf;
+ if (worker->queue_size > 0)
+ goto decided;
+
+ /* We dont want this thread to exit if its index is
+ * below the min thread count.
+ */
+ if (worker->thread_idx >= conf->min_u_threads)
+ allow_exit = 1;
+
+decided:
+ return allow_exit;
+}
+
+int
+iot_unordered_exit (iot_worker_t *worker)
+{
+ int allow_exit = 0;
+
+ /* It is possible that since the last time we timed out while
+ * waiting for a request, a new request has been added to this
+ * worker's request queue. Before we really exit, we must
+ * check for those requests.
+ */
+ pthread_mutex_lock (&worker->qlock);
+ {
+ allow_exit = iot_can_unordered_exit (worker);
+
+ if (allow_exit) {
+ worker->state = IOT_STATE_DEAD;
+ worker->thread = 0;
+ }
+ }
+ pthread_mutex_unlock (&worker->qlock);
+
+ return allow_exit;
+}
+
+
+int
+iot_request_wait_idleness (iot_worker_t * worker)
+{
+ struct timeval tv;
+ struct timespec ts;
+ int waitres = 0;
+
+ gettimeofday (&tv, NULL);
+ ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time;
+ /* Slightly skew the idle time for threads so that, we dont
+ * have all of them rushing to exit at the same time, if
+ * they've been idle.
+ */
+ ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
+ waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock,
+ &ts);
+ if (waitres == ETIMEDOUT)
+ return -1;
+
+ return 0;
+}
+
+
+call_stub_t *
+iot_dequeue_unordered (iot_worker_t *worker)
+{
+ call_stub_t *stub= NULL;
+ iot_request_t *req = NULL;
+ int waitstat = 0;
+
+ pthread_mutex_lock (&worker->qlock);
+ {
+ while (!worker->queue_size) {
+ waitstat = 0;
+ waitstat = iot_request_wait_idleness (worker);
+ /* If -1, request wait must've timed
+ * out.
+ */
+ if (waitstat == -1)
+ goto out;
+ }
+
+ list_for_each_entry (req, &worker->rqlist, list)
+ break;
+ list_del (&req->list);
+ stub = req->stub;
+
+ worker->queue_size--;
+ }
+out:
+ pthread_mutex_unlock (&worker->qlock);
+
+ FREE (req);
+
+ return stub;
+}
+
+
+void *
+iot_worker_unordered (void *arg)
+{
+ iot_worker_t *worker = arg;
+ call_stub_t *stub = NULL;
+
+ while (1) {
+
+ stub = iot_dequeue_unordered (worker);
+ /* If no request was received, we must've timed out,
+ * if so, check if we can exit.
+ */
+ if (stub == NULL) {
+ if (iot_unordered_exit (worker))
+ break;
+ else
+ continue;
+ }
+
+ call_resume (stub);
+ }
+ return NULL;
+}
+
+
static iot_worker_t **
allocate_worker_array (int count)
{
@@ -1403,32 +1573,57 @@ allocate_worker (iot_conf_t * conf)
wrk->conf = conf;
pthread_cond_init (&wrk->dq_cond, NULL);
pthread_mutex_init (&wrk->qlock, NULL);
+ wrk->state = IOT_STATE_DEAD;
return wrk;
}
static void
allocate_workers (iot_conf_t *conf,
- int count,
- int start_alloc_idx)
+ iot_worker_t ** workers,
+ int start_alloc_idx,
+ int count)
{
int i, end_count;
end_count = count + start_alloc_idx;
for (i = start_alloc_idx; i < end_count; i++) {
- conf->workers[i] = allocate_worker (conf);
- pthread_create (&conf->workers[i]->thread, NULL, iot_worker,
- conf->workers[i]);
+ workers[i] = allocate_worker (conf);
+ workers[i]->thread_idx = i;
+ }
+}
+
+
+void
+iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
+ iot_worker_fn workerfunc)
+{
+ int i = 0;
+ int end_idx = 0;
+
+ end_idx = start_idx + count;
+ for (i = start_idx; i < end_idx; i++) {
+ workers[i]->state = IOT_STATE_ACTIVE;
+ pthread_create (&workers[i]->thread, NULL, workerfunc,
+ workers[i]);
}
+
}
static void
workers_init (iot_conf_t *conf)
{
conf->workers = allocate_worker_array (conf->thread_count);
- allocate_workers (conf, conf->thread_count, 0);
-}
+ allocate_workers (conf, conf->workers, 0, conf->thread_count);
+ /* Initialize un-ordered workers */
+ conf->uworkers = allocate_worker_array (conf->max_u_threads);
+ allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads);
+
+ iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker);
+ iot_startup_workers (conf->uworkers, 0, conf->min_u_threads,
+ iot_worker_unordered);
+}
int32_t
@@ -1463,6 +1658,14 @@ init (xlator_t *this)
conf->thread_count);
}
+ /* Init params for un-ordered workers. These should be got from
+ * the volfile options.
+ */
+ pthread_mutex_init (&conf->utlock, NULL);
+ conf->max_u_threads = IOT_MAX_THREADS;
+ conf->min_u_threads = IOT_MIN_THREADS;
+ conf->u_idle_time = IOT_DEFAULT_IDLE;
+
workers_init (conf);
this->private = conf;
@@ -1501,13 +1704,13 @@ struct xlator_fops fops = {
.ftruncate = iot_ftruncate, /* O */
.utimens = iot_utimens, /* V */
.checksum = iot_checksum, /* U */
- .unlink = iot_unlink, /* V */
+ .unlink = iot_unlink, /* U */
.lookup = iot_lookup, /* U */
.chmod = iot_chmod, /* V */
.fchmod = iot_fchmod, /* O */
.chown = iot_chown, /* V */
.fchown = iot_fchown, /* O */
- .access = iot_access, /* V */
+ .access = iot_access, /* U */
.readlink = iot_readlink, /* U */
.mknod = iot_mknod, /* U */
.mkdir = iot_mkdir, /* U */