summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
authorShehjar Tikoo <shehjart@zresearch.com>2009-04-01 13:59:43 -0700
committerAnand V. Avati <avati@amp.gluster.com>2009-04-02 19:19:18 +0530
commitb64fa35870b54b8d9e62afd71c92e35b1f7a4cfd (patch)
tree813474ecff343fb2cb9dca2fa11890aee32507be /xlators/performance/io-threads/src/io-threads.c
parentee79908d3b577c061b497e35481b8d1523502077 (diff)
io-threads: Add ordered threadpool state and code
Now we have the remaining fops going through the ordered thread-pool. To route a request through ordered thread, we use iot_schedule_ordered(..) and the worker thread for ordered requests is iot_worker_ordered(..) Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c220
1 files changed, 162 insertions, 58 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 899702f1b..1d255d972 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -34,18 +34,12 @@
typedef void *(*iot_worker_fn)(void*);
-static void
-iot_queue (iot_worker_t *worker,
- call_stub_t *stub);
-
-static call_stub_t *
-iot_dequeue (iot_worker_t *worker);
-
void _iot_queue (iot_worker_t *worker, iot_request_t *req);
iot_request_t * iot_init_request (call_stub_t *stub);
void iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
iot_worker_fn workerfunc);
void * iot_worker_unordered (void *arg);
+void * iot_worker_ordered (void *arg);
void
iot_schedule_unordered (iot_conf_t *conf,
@@ -79,19 +73,49 @@ iot_schedule_unordered (iot_conf_t *conf,
pthread_mutex_unlock (&selected_worker->qlock);
}
-static void
-iot_schedule (iot_conf_t *conf,
+void
+iot_schedule_ordered (iot_conf_t *conf,
inode_t *inode,
call_stub_t *stub)
{
- int32_t idx = 0;
+ uint64_t idx = 0;
iot_worker_t *selected_worker = NULL;
+ iot_request_t * req = NULL;
+ int ctxret = 0;
- if (inode)
- idx = (inode->ino % conf->thread_count);
- selected_worker = conf->workers[idx];
-
- iot_queue (selected_worker, stub);
+ if (inode == NULL) {
+ gf_log (conf->this->name, GF_LOG_ERROR,
+ "Got NULL inode for ordered request");
+ return;
+ }
+ req = iot_init_request (stub);
+ LOCK (&inode->lock);
+ {
+ ctxret = __inode_ctx_get (inode, conf->this, &idx);
+ if (ctxret < 0) {
+ idx = (random () % conf->max_o_threads);
+ __inode_ctx_put (inode, conf->this, idx);
+ }
+ /* inode lock once acquired, cannot be left here
+ * because other gluster main threads might be
+ * contending on it to append a request for this file.
+ * So we'll also leave the lock only after we've
+ * added the request to the worker queue.
+ */
+ selected_worker = conf->oworkers[idx];
+ pthread_mutex_lock (&selected_worker->qlock);
+ {
+ if (iot_worker_active (selected_worker))
+ _iot_queue (selected_worker, req);
+ else {
+ iot_startup_workers (conf->oworkers, idx, 1,
+ iot_worker_ordered);
+ _iot_queue (selected_worker, req);
+ }
+ }
+ pthread_mutex_unlock (&selected_worker->qlock);
+ }
+ UNLOCK (&inode->lock);
}
int32_t
@@ -183,7 +207,8 @@ iot_chmod (call_frame_t *frame,
iot_schedule_unordered ((iot_conf_t *)this->private,
loc->inode, stub);
else {
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode,
+ stub);
fd_unref (fd);
}
return 0;
@@ -227,7 +252,7 @@ iot_fchmod (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -277,7 +302,8 @@ iot_chown (call_frame_t *frame,
iot_schedule_unordered ((iot_conf_t *)this->private,
loc->inode, stub);
else {
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode,
+ stub);
fd_unref (fd);
}
@@ -324,7 +350,7 @@ iot_fchown (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -780,7 +806,7 @@ iot_readv (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -823,7 +849,7 @@ iot_flush (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -870,7 +896,7 @@ iot_fsync (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -923,7 +949,7 @@ iot_writev (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -977,7 +1003,7 @@ iot_lk (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -1032,7 +1058,8 @@ iot_stat (call_frame_t *frame,
iot_schedule_unordered ((iot_conf_t *)this->private,
loc->inode, stub);
else {
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode,
+ stub);
fd_unref (fd);
}
@@ -1080,7 +1107,7 @@ iot_fstat (call_frame_t *frame,
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -1136,7 +1163,8 @@ iot_truncate (call_frame_t *frame,
iot_schedule_unordered ((iot_conf_t *)this->private,
loc->inode, stub);
else {
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode,
+ stub);
fd_unref (fd);
}
@@ -1186,7 +1214,7 @@ iot_ftruncate (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -1243,7 +1271,7 @@ iot_utimens (call_frame_t *frame,
iot_schedule_unordered ((iot_conf_t *)this->private,
loc->inode, stub);
else {
- iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ iot_schedule_ordered ((iot_conf_t *)this->private, loc->inode, stub);
fd_unref (fd);
}
@@ -1369,30 +1397,88 @@ iot_init_request (call_stub_t *stub)
return req;
}
-static void
-iot_queue (iot_worker_t *worker,
- call_stub_t *stub)
+/* Must be called with worker lock held. */
+int
+iot_can_ordered_exit (iot_worker_t * worker)
{
- iot_request_t *req = NULL;
+ int allow_exit = 0;
+ iot_conf_t *conf = NULL;
- req = iot_init_request (stub);
+ conf = worker->conf;
+ if (worker->queue_size > 0)
+ goto decided;
+
+ /* We dont want this thread to exit if its index is
+ * below the min thread count.
+ */
+ if (worker->thread_idx >= conf->min_o_threads)
+ allow_exit = 1;
+
+decided:
+ return allow_exit;
+}
+
+int
+iot_ordered_exit (iot_worker_t *worker)
+{
+ int allow_exit = 0;
+
+ /* It is possible that since the last time we timed out while
+ * waiting for a request, a new request has been added to this
+ * worker's request queue. Before we really exit, we must
+ * check for those requests.
+ */
pthread_mutex_lock (&worker->qlock);
{
- _iot_queue (worker, req);
- }
- pthread_mutex_unlock (&worker->qlock);
+ allow_exit = iot_can_ordered_exit (worker);
+
+ if (allow_exit) {
+ worker->state = IOT_STATE_DEAD;
+ worker->thread = 0;
+ }
+ }
+ pthread_mutex_unlock (&worker->qlock);
+
+ return allow_exit;
}
-static call_stub_t *
-iot_dequeue (iot_worker_t *worker)
+int
+iot_ordered_request_wait (iot_worker_t * worker)
{
- call_stub_t *stub = NULL;
- iot_request_t *req = NULL;
+ struct timeval tv;
+ struct timespec ts;
+ int waitres = 0;
+
+ gettimeofday (&tv, NULL);
+ ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time;
+ /* Slightly skew the idle time for threads so that, we dont
+ * have all of them rushing to exit at the same time, if
+ * they've been idle.
+ */
+ ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
+ waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
+ &ts);
+ if (waitres == ETIMEDOUT)
+ return -1;
+
+ return 0;
+}
+
+call_stub_t *
+iot_dequeue_ordered (iot_worker_t *worker)
+{
+ call_stub_t *stub = NULL;
+ iot_request_t *req = NULL;
+ int waitstat = 0;
pthread_mutex_lock (&worker->qlock);
{
- while (!worker->queue_size)
- pthread_cond_wait (&worker->dq_cond, &worker->qlock);
+ while (!worker->queue_size) {
+ waitstat = 0;
+ waitstat = iot_ordered_request_wait (worker);
+ if (waitstat == -1)
+ goto out;
+ }
list_for_each_entry (req, &worker->rqlist, list)
break;
@@ -1401,6 +1487,7 @@ iot_dequeue (iot_worker_t *worker)
worker->queue_size--;
}
+out:
pthread_mutex_unlock (&worker->qlock);
FREE (req);
@@ -1408,17 +1495,25 @@ iot_dequeue (iot_worker_t *worker)
return stub;
}
-static void *
-iot_worker (void *arg)
+void *
+iot_worker_ordered (void *arg)
{
- iot_worker_t *worker = arg;
+ iot_worker_t *worker = arg;
+ call_stub_t *stub = NULL;
while (1) {
- call_stub_t *stub;
- stub = iot_dequeue (worker);
+ stub = iot_dequeue_ordered (worker);
+ if (stub == NULL) {
+ if (iot_ordered_exit (worker))
+ break;
+ else
+ continue;
+ }
call_resume (stub);
}
+
+ return NULL;
}
/* Must be called with worker lock held. */
@@ -1460,7 +1555,7 @@ iot_unordered_exit (iot_worker_t *worker)
worker->state = IOT_STATE_DEAD;
worker->thread = 0;
}
- }
+ }
pthread_mutex_unlock (&worker->qlock);
return allow_exit;
@@ -1468,11 +1563,11 @@ iot_unordered_exit (iot_worker_t *worker)
int
-iot_request_wait_idleness (iot_worker_t * worker)
+iot_unordered_request_wait (iot_worker_t * worker)
{
- struct timeval tv;
+ struct timeval tv;
struct timespec ts;
- int waitres = 0;
+ int waitres = 0;
gettimeofday (&tv, NULL);
ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time;
@@ -1481,7 +1576,7 @@ iot_request_wait_idleness (iot_worker_t * worker)
* they've been idle.
*/
ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
- waitres = pthread_cond_timedwait (&worker->dq_cond,&worker->qlock,
+ waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
&ts);
if (waitres == ETIMEDOUT)
return -1;
@@ -1501,7 +1596,7 @@ iot_dequeue_unordered (iot_worker_t *worker)
{
while (!worker->queue_size) {
waitstat = 0;
- waitstat = iot_request_wait_idleness (worker);
+ waitstat = iot_unordered_request_wait (worker);
/* If -1, request wait must've timed
* out.
*/
@@ -1613,14 +1708,16 @@ iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
static void
workers_init (iot_conf_t *conf)
{
- conf->workers = allocate_worker_array (conf->thread_count);
- allocate_workers (conf, conf->workers, 0, conf->thread_count);
-
/* Initialize un-ordered workers */
conf->uworkers = allocate_worker_array (conf->max_u_threads);
allocate_workers (conf, conf->uworkers, 0, conf->max_u_threads);
- iot_startup_workers (conf->workers, 0, conf->thread_count, iot_worker);
+ /* Initialize ordered workers */
+ conf->oworkers = allocate_worker_array (conf->max_o_threads);
+ allocate_workers (conf, conf->oworkers, 0, conf->max_o_threads);
+
+ iot_startup_workers (conf->oworkers, 0, conf->min_o_threads,
+ iot_worker_ordered);
iot_startup_workers (conf->uworkers, 0, conf->min_u_threads,
iot_worker_unordered);
}
@@ -1666,6 +1763,13 @@ init (xlator_t *this)
conf->min_u_threads = IOT_MIN_THREADS;
conf->u_idle_time = IOT_DEFAULT_IDLE;
+ /* Init params for ordered workers. */
+ pthread_mutex_init (&conf->otlock, NULL);
+ conf->max_o_threads = IOT_MAX_THREADS;
+ conf->min_o_threads = IOT_MIN_THREADS;
+ conf->o_idle_time = IOT_DEFAULT_IDLE;
+
+ conf->this = this;
workers_init (conf);
this->private = conf;