diff options
Diffstat (limited to 'xlators/performance/io-threads')
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 455 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 38 |
2 files changed, 124 insertions, 369 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 3d172f9b4..3bebb3185 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -36,21 +36,18 @@ iot_queue (iot_worker_t *worker, static call_stub_t * iot_dequeue (iot_worker_t *worker); -static iot_worker_t * +static void iot_schedule (iot_conf_t *conf, - iot_file_t *file, - ino_t ino) + inode_t *inode, + call_stub_t *stub) { - int32_t cnt = (ino % conf->thread_count); - iot_worker_t *trav = conf->workers.next; + int32_t idx = 0; + iot_worker_t *selected_worker = NULL; - for (; cnt; cnt--) - trav = trav->next; - - if (file) - file->worker = trav; - trav->fd_count++; - return trav; + idx = (inode->ino % conf->thread_count); + selected_worker = conf->workers[idx]; + + iot_queue (selected_worker, stub); } int32_t @@ -61,28 +58,22 @@ iot_open_cbk (call_frame_t *frame, int32_t op_errno, fd_t *fd) { - iot_conf_t *conf = this->private; - - if (op_ret >= 0) { - iot_file_t *file = CALLOC (1, sizeof (*file)); - ERR_ABORT (file); - - iot_schedule (conf, file, fd->inode->ino); - file->fd = fd; - - fd_ctx_set (fd, this, (uint64_t)(long)file); - - pthread_mutex_lock (&conf->files_lock); - file->next = &conf->files; - file->prev = file->next->prev; - file->next->prev = file; - file->prev->next = file; - pthread_mutex_unlock (&conf->files_lock); - } STACK_UNWIND (frame, op_ret, op_errno, fd); return 0; } +static int32_t +iot_open_wrapper (call_frame_t * frame, + xlator_t * this, + loc_t *loc, + int32_t flags, + fd_t * fd) +{ + STACK_WIND (frame, iot_open_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->open, loc, flags, fd); + return 0; +} + int32_t iot_open (call_frame_t *frame, xlator_t *this, @@ -90,13 +81,16 @@ iot_open (call_frame_t *frame, int32_t flags, fd_t *fd) { - STACK_WIND (frame, - iot_open_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->open, - loc, - flags, - fd); + call_stub_t *stub = NULL; + + stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd); + if (!stub) { + gf_log (this->name, GF_LOG_ERROR, + "cannot get open call stub"); + STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); + } + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + return 0; } @@ -111,30 +105,12 @@ iot_create_cbk (call_frame_t *frame, inode_t *inode, struct stat *stbuf) { - iot_conf_t *conf = this->private; - - if (op_ret >= 0) { - iot_file_t *file = CALLOC (1, sizeof (*file)); - ERR_ABORT (file); - - iot_schedule (conf, file, fd->inode->ino); - file->fd = fd; - - fd_ctx_set (fd, this, (uint64_t)(long)file); - - pthread_mutex_lock (&conf->files_lock); - file->next = &conf->files; - file->prev = file->next->prev; - file->next->prev = file; - file->prev->next = file; - pthread_mutex_unlock (&conf->files_lock); - } STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf); return 0; } int32_t -iot_create (call_frame_t *frame, +iot_create_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, @@ -152,7 +128,26 @@ iot_create (call_frame_t *frame, return 0; } +int32_t +iot_create (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, + mode_t mode, + fd_t *fd) +{ + call_stub_t *stub = NULL; + stub = fop_create_stub (frame, iot_create_wrapper, loc, flags, mode, + fd); + if (!stub) { + gf_log (this->name, GF_LOG_ERROR, + "cannot get create call stub"); + STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); + } + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + return 0; +} int32_t iot_readv_cbk (call_frame_t *frame, @@ -164,10 +159,6 @@ iot_readv_cbk (call_frame_t *frame, int32_t count, struct stat *stbuf) { - iot_local_t *local = frame->local; - - local->frame_size = 0; //iov_length (vector, count); - STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); return 0; @@ -198,25 +189,6 @@ iot_readv (call_frame_t *frame, off_t offset) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - stub = fop_readv_stub (frame, iot_readv_wrapper, fd, @@ -229,8 +201,7 @@ iot_readv (call_frame_t *frame, return 0; } - iot_queue (worker, stub); - + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -264,26 +235,6 @@ iot_flush (call_frame_t *frame, fd_t *fd) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - - frame->local = local; - stub = fop_flush_stub (frame, iot_flush_wrapper, fd); @@ -292,8 +243,8 @@ iot_flush (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -330,26 +281,6 @@ iot_fsync (call_frame_t *frame, int32_t datasync) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - - frame->local = local; - stub = fop_fsync_stub (frame, iot_fsync_wrapper, fd, @@ -359,8 +290,8 @@ iot_fsync (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -372,10 +303,6 @@ iot_writev_cbk (call_frame_t *frame, int32_t op_errno, struct stat *stbuf) { - iot_local_t *local = frame->local; - - local->frame_size = 0; /* hehe, caught me! */ - STACK_UNWIND (frame, op_ret, op_errno, stbuf); return 0; } @@ -408,30 +335,6 @@ iot_writev (call_frame_t *frame, off_t offset) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - - if (frame->root->req_refs) - local->frame_size = dict_serialized_length (frame->root->req_refs); - else - local->frame_size = iov_length (vector, count); - frame->local = local; - stub = fop_writev_stub (frame, iot_writev_wrapper, fd, vector, count, offset); @@ -441,7 +344,7 @@ iot_writev (call_frame_t *frame, return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -486,25 +389,6 @@ iot_lk (call_frame_t *frame, struct flock *flock) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - stub = fop_lk_stub (frame, iot_lk_wrapper, fd, cmd, flock); @@ -513,9 +397,8 @@ iot_lk (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -552,17 +435,8 @@ iot_stat (call_frame_t *frame, loc_t *loc) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_worker_t *worker = NULL; - iot_conf_t *conf; fd_t *fd = NULL; - conf = this->private; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - fd = fd_lookup (loc->inode, frame->root->pid); if (fd == NULL) { @@ -576,8 +450,6 @@ iot_stat (call_frame_t *frame, fd_unref (fd); - worker = iot_schedule (conf, NULL, loc->inode->ino); - stub = fop_stat_stub (frame, iot_stat_wrapper, loc); @@ -586,7 +458,7 @@ iot_stat (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -623,24 +495,6 @@ iot_fstat (call_frame_t *frame, fd_t *fd) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; stub = fop_fstat_stub (frame, iot_fstat_wrapper, fd); @@ -650,7 +504,7 @@ iot_fstat (call_frame_t *frame, return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -689,16 +543,8 @@ iot_truncate (call_frame_t *frame, off_t offset) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_worker_t *worker = NULL; - iot_conf_t *conf; fd_t *fd = NULL; - conf = this->private; - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - fd = fd_lookup (loc->inode, frame->root->pid); if (fd == NULL) { @@ -713,8 +559,6 @@ iot_truncate (call_frame_t *frame, fd_unref (fd); - worker = iot_schedule (conf, NULL, loc->inode->ino); - stub = fop_truncate_stub (frame, iot_truncate_wrapper, loc, @@ -724,7 +568,7 @@ iot_truncate (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -763,25 +607,6 @@ iot_ftruncate (call_frame_t *frame, off_t offset) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_file_t *file = NULL; - iot_worker_t *worker = NULL; - uint64_t tmp_file = 0; - - if (fd_ctx_get (fd, this, &tmp_file)) { - gf_log (this->name, GF_LOG_ERROR, - "fd context is NULL, returning EBADFD"); - STACK_UNWIND (frame, -1, EBADFD); - return 0; - } - - file = (iot_file_t *)(long)tmp_file; - worker = file->worker; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - stub = fop_ftruncate_stub (frame, iot_ftruncate_wrapper, fd, @@ -791,7 +616,7 @@ iot_ftruncate (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); return 0; } @@ -831,17 +656,8 @@ iot_utimens (call_frame_t *frame, struct timespec tv[2]) { call_stub_t *stub; - iot_local_t *local = NULL; - iot_worker_t *worker = NULL; - iot_conf_t *conf; fd_t *fd = NULL; - conf = this->private; - - local = CALLOC (1, sizeof (*local)); - ERR_ABORT (local); - frame->local = local; - fd = fd_lookup (loc->inode, frame->root->pid); if (fd == NULL) { @@ -856,8 +672,6 @@ iot_utimens (call_frame_t *frame, fd_unref (fd); - worker = iot_schedule (conf, NULL, loc->inode->ino); - stub = fop_utimens_stub (frame, iot_utimens_wrapper, loc, @@ -867,7 +681,7 @@ iot_utimens (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -909,16 +723,6 @@ iot_checksum (call_frame_t *frame, int32_t flags) { call_stub_t *stub = NULL; - iot_local_t *local = NULL; - iot_worker_t *worker = NULL; - iot_conf_t *conf = NULL; - - conf = this->private; - - local = CALLOC (1, sizeof (*local)); - frame->local = local; - - worker = iot_schedule (conf, NULL, conf->misc_thread_index++); stub = fop_checksum_stub (frame, iot_checksum_wrapper, @@ -929,7 +733,7 @@ iot_checksum (call_frame_t *frame, STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); return 0; } - iot_queue (worker, stub); + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); return 0; } @@ -966,52 +770,14 @@ iot_unlink (call_frame_t *frame, loc_t *loc) { call_stub_t *stub = NULL; - iot_local_t *local = NULL; - iot_worker_t *worker = NULL; - iot_conf_t *conf = NULL; - - conf = this->private; - - local = CALLOC (1, sizeof (*local)); - frame->local = local; - - worker = iot_schedule (conf, NULL, conf->misc_thread_index++); - stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub"); STACK_UNWIND (frame, -1, ENOMEM); return 0; } - iot_queue (worker, stub); - - return 0; -} - -int32_t -iot_release (xlator_t *this, - fd_t *fd) -{ - iot_file_t *file = NULL; - iot_conf_t *conf = NULL; - uint64_t tmp_file = 0; - int ret = 0; - - conf = this->private; - ret = fd_ctx_del (fd, this, &tmp_file); - if (ret) - return 0; - - file = (iot_file_t *)(long)tmp_file; - - pthread_mutex_lock (&conf->files_lock); - { - (file->prev)->next = file->next; - (file->next)->prev = file->prev; - } - pthread_mutex_unlock (&conf->files_lock); + iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - FREE (file); return 0; } @@ -1020,24 +786,18 @@ static void iot_queue (iot_worker_t *worker, call_stub_t *stub) { - iot_queue_t *queue; + iot_request_t *req = NULL; - queue = CALLOC (1, sizeof (*queue)); - ERR_ABORT (queue); - queue->stub = stub; + req = CALLOC (1, sizeof (iot_request_t)); + ERR_ABORT (req); + req->stub = stub; pthread_mutex_lock (&worker->qlock); { - queue->next = &worker->queue; - queue->prev = worker->queue.prev; - - queue->next->prev = queue; - queue->prev->next = queue; + list_add_tail (&req->list, &worker->rqlist); /* dq_cond */ worker->queue_size++; - worker->q++; - pthread_cond_broadcast (&worker->dq_cond); } pthread_mutex_unlock (&worker->qlock); @@ -1047,25 +807,23 @@ static call_stub_t * iot_dequeue (iot_worker_t *worker) { call_stub_t *stub = NULL; - iot_queue_t *queue = NULL; + iot_request_t *req = NULL; pthread_mutex_lock (&worker->qlock); { while (!worker->queue_size) pthread_cond_wait (&worker->dq_cond, &worker->qlock); - queue = worker->queue.next; - queue->next->prev = queue->prev; - queue->prev->next = queue->next; - - stub = queue->stub; + list_for_each_entry (req, &worker->rqlist, list) + break; + list_del (&req->list); + stub = req->stub; worker->queue_size--; - worker->dq++; } pthread_mutex_unlock (&worker->qlock); - FREE (queue); + FREE (req); return stub; } @@ -1083,35 +841,57 @@ iot_worker (void *arg) } } -static void -workers_init (iot_conf_t *conf) +static iot_worker_t ** +allocate_worker_array (int count) { - int i; + iot_worker_t ** warr = NULL; - conf->workers.next = &conf->workers; - conf->workers.prev = &conf->workers; + warr = CALLOC (count, sizeof(iot_worker_t *)); + ERR_ABORT (warr); - for (i=0; i<conf->thread_count; i++) { + return warr; +} - iot_worker_t *worker = CALLOC (1, sizeof (*worker)); - ERR_ABORT (worker); +static iot_worker_t * +allocate_worker (iot_conf_t * conf) +{ + iot_worker_t *wrk = NULL; - worker->next = &conf->workers; - worker->prev = conf->workers.prev; - worker->next->prev = worker; - worker->prev->next = worker; + wrk = CALLOC (1, sizeof (iot_worker_t)); + ERR_ABORT (wrk); - worker->queue.next = &worker->queue; - worker->queue.prev = &worker->queue; + INIT_LIST_HEAD (&wrk->rqlist); + wrk->conf = conf; + pthread_cond_init (&wrk->dq_cond, NULL); + pthread_mutex_init (&wrk->qlock, NULL); - pthread_mutex_init (&worker->qlock, NULL); - pthread_cond_init (&worker->dq_cond, NULL); - worker->conf = conf; + return wrk; +} - pthread_create (&worker->thread, NULL, iot_worker, worker); - } +static void +allocate_workers (iot_conf_t *conf, + int count, + int start_alloc_idx) +{ + int i, end_count; + + end_count = count + start_alloc_idx; + for (i = start_alloc_idx; i < end_count; i++) { + conf->workers[i] = allocate_worker (conf); + pthread_create (&conf->workers[i]->thread, NULL, iot_worker, + conf->workers[i]); + } } +static void +workers_init (iot_conf_t *conf) +{ + conf->workers = allocate_worker_array (conf->thread_count); + allocate_workers (conf, conf->thread_count, 0); +} + + + int32_t init (xlator_t *this) { @@ -1144,10 +924,6 @@ init (xlator_t *this) conf->thread_count); } - conf->files.next = &conf->files; - conf->files.prev = &conf->files; - pthread_mutex_init (&conf->files_lock, NULL); - workers_init (conf); this->private = conf; @@ -1186,7 +962,6 @@ struct xlator_mops mops = { }; struct xlator_cbks cbks = { - .release = iot_release, }; struct volume_options options[] = { diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 7606d0625..797552900 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -32,59 +32,39 @@ #include "dict.h" #include "xlator.h" #include "common-utils.h" +#include "list.h" #define min(a,b) ((a)<(b)?(a):(b)) #define max(a,b) ((a)>(b)?(a):(b)) struct iot_conf; struct iot_worker; -struct iot_queue; -struct iot_local; -struct iot_file; +struct iot_request; -struct iot_local { - struct iot_file *file; - size_t frame_size; -}; - -struct iot_queue { - struct iot_queue *next, *prev; +struct iot_request { + struct list_head list; /* Attaches this request to the list of + requests. + */ call_stub_t *stub; }; struct iot_worker { - struct iot_worker *next, *prev; - struct iot_queue queue; + struct list_head rqlist; /* List of requests assigned to me. */ struct iot_conf *conf; int64_t q,dq; pthread_cond_t dq_cond; pthread_mutex_t qlock; - int32_t fd_count; int32_t queue_size; pthread_t thread; }; -struct iot_file { - struct iot_file *next, *prev; /* all open files via this xlator */ - struct iot_worker *worker; - fd_t *fd; - int32_t pending_ops; -}; - struct iot_conf { int32_t thread_count; - int32_t misc_thread_index; /* Used to schedule the miscellaneous calls like checksum */ - struct iot_worker workers; - struct iot_file files; - pthread_mutex_t files_lock; - - pthread_cond_t q_cond; + struct iot_worker ** workers; }; -typedef struct iot_file iot_file_t; typedef struct iot_conf iot_conf_t; -typedef struct iot_local iot_local_t; typedef struct iot_worker iot_worker_t; -typedef struct iot_queue iot_queue_t; +typedef struct iot_request iot_request_t; #endif /* __IOT_H */ |
