diff options
author | Anand Avati <avati@redhat.com> | 2012-02-21 09:25:14 +0530 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2012-02-20 21:12:08 -0800 |
commit | 1206437fcfc1f3e1bd4a6faec3341c240bae5cf2 (patch) | |
tree | aec03c585583007ee57d3053b62dfe40e06700ef | |
parent | dfc88bf3727fb33e2fc273bd7f24401e0209f39e (diff) |
syncop: Multi-processor support in syncenv
This patch introduces:
- multithreading of syncop processors permitting synctasks to be executed
concurrently if the runqueue has many tasks.
- Auto scaling of syncop processors based on runqueue length.
- Execute a synctask (synctask_new) in a blocking way if callback function
is set NULL. The return value of the syncfn will be the return value
of synctask_new()
Change-Id: Iff369709af9adfd07be3386842876a24e1a5a9b5
BUG: 763820
Reviewed-on: http://review.gluster.com/443
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
-rw-r--r-- | libglusterfs/src/syncop.c | 232 | ||||
-rw-r--r-- | libglusterfs/src/syncop.h | 51 |
2 files changed, 216 insertions, 67 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 712e5b1f239..096c29efe0a 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -39,42 +39,76 @@ syncop_create_frame () return (call_frame_t *)frame; } -void -synctask_yield (struct synctask *task) + +static void +__run (struct synctask *task) { - struct syncenv *env = NULL; + struct syncenv *env = NULL; env = task->env; - if (swapcontext (&task->ctx, &env->sched) < 0) { - gf_log ("syncop", GF_LOG_ERROR, - "swapcontext failed (%s)", strerror (errno)); - } -} - - -void -synctask_yawn (struct synctask *task) + list_del_init (&task->all_tasks); + switch (task->state) { + case SYNCTASK_INIT: + break; + case SYNCTASK_RUN: + gf_log (task->xl->name, GF_LOG_WARNING, + "re-running already running task"); + env->runcount--; + break; + case SYNCTASK_WAIT: + env->waitcount--; + break; + case SYNCTASK_DONE: + gf_log (task->xl->name, GF_LOG_WARNING, + "running completed task"); + break; + } + + list_add_tail (&task->all_tasks, &env->runq); + env->runcount++; + task->state = SYNCTASK_RUN; +} + + +static void +__wait (struct synctask *task) { - struct syncenv *env = NULL; + struct syncenv *env = NULL; - env = task->env; + env = task->env; - pthread_mutex_lock (&env->mutex); - { - list_del_init (&task->all_tasks); - list_add (&task->all_tasks, &env->waitq); - } - pthread_mutex_unlock (&env->mutex); + list_del_init (&task->all_tasks); + switch (task->state) { + case SYNCTASK_INIT: + break; + case SYNCTASK_RUN: + env->runcount--; + break; + case SYNCTASK_WAIT: + gf_log (task->xl->name, GF_LOG_WARNING, + "re-waiting already waiting task"); + env->waitcount--; + break; + case SYNCTASK_DONE: + gf_log (task->xl->name, GF_LOG_WARNING, + "running completed task"); + break; + } + + list_add_tail (&task->all_tasks, &env->waitq); + env->waitcount++; + task->state = SYNCTASK_WAIT; } void -synctask_zzzz (struct synctask *task) +synctask_yield (struct synctask *task) { - synctask_yawn (task); - - synctask_yield (task); + if (swapcontext (&task->ctx, &task->proc->sched) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } } @@ -87,8 +121,10 @@ synctask_wake (struct synctask *task) pthread_mutex_lock (&env->mutex); { - list_del_init (&task->all_tasks); - list_add_tail (&task->all_tasks, &env->runq); + task->woken = 1; + + if (task->slept) + __run (task); } pthread_mutex_unlock (&env->mutex); @@ -99,21 +135,17 @@ synctask_wake (struct synctask *task) void synctask_wrap (struct synctask *old_task) { - int ret; struct synctask *task = NULL; /* Do not trust the pointer received. It may be wrong and can lead to crashes. */ task = synctask_get (); - ret = task->syncfn (task->opaque); - task->synccbk (ret, task->frame, task->opaque); + task->ret = task->syncfn (task->opaque); + if (task->synccbk) + task->synccbk (task->ret, task->frame, task->opaque); - /* cannot destroy @task right here as we are - in the execution stack of @task itself - */ - task->complete = 1; - synctask_wake (task); + task->state = SYNCTASK_DONE; synctask_yield (task); } @@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task) if (task->stack) FREE (task->stack); + + pthread_mutex_destroy (&task->mutex); + + pthread_cond_destroy (&task->cond); + FREE (task); } +void +synctask_done (struct synctask *task) +{ + if (task->synccbk) { + synctask_destroy (task); + return; + } + + pthread_mutex_lock (&task->mutex); + { + task->done = 1; + pthread_cond_broadcast (&task->cond); + } + pthread_mutex_unlock (&task->mutex); +} + + int synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, call_frame_t *frame, void *opaque) { struct synctask *newtask = NULL; xlator_t *this = THIS; + int ret = 0; VALIDATE_OR_GOTO (env, err); VALIDATE_OR_GOTO (fn, err); - VALIDATE_OR_GOTO (cbk, err); VALIDATE_OR_GOTO (frame, err); newtask = CALLOC (1, sizeof (*newtask)); @@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, newtask->env = env; newtask->xl = this; newtask->syncfn = fn; - newtask->synccbk = cbk; + newtask->synccbk = cbk; newtask->opaque = opaque; newtask->frame = frame; @@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + newtask->state = SYNCTASK_INIT; + + newtask->slept = 1; + + if (!cbk) { + pthread_mutex_init (&newtask->mutex, NULL); + pthread_cond_init (&newtask->cond, NULL); + newtask->done = 0; + } + synctask_wake (newtask); - return 0; + if (!cbk) { + pthread_mutex_lock (&newtask->mutex); + { + while (!newtask->done) { + pthread_cond_wait (&newtask->cond, &newtask->mutex); + } + } + pthread_mutex_unlock (&newtask->mutex); + + ret = newtask->ret; + + synctask_destroy (newtask); + } + + return ret; err: if (newtask) { if (newtask->stack) @@ -189,10 +267,13 @@ err: struct synctask * -syncenv_task (struct syncenv *env) +syncenv_task (struct syncproc *proc) { + struct syncenv *env = NULL; struct synctask *task = NULL; + env = proc->env; + pthread_mutex_lock (&env->mutex); { while (list_empty (&env->runq)) @@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env) task = list_entry (env->runq.next, struct synctask, all_tasks); list_del_init (&task->all_tasks); + env->runcount--; + + task->proc = proc; } pthread_mutex_unlock (&env->mutex); @@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task) synctask_set (task); THIS = task->xl; - if (swapcontext (&env->sched, &task->ctx) < 0) { + task->woken = 0; + task->slept = 0; + + if (swapcontext (&task->proc->sched, &task->ctx) < 0) { gf_log ("syncop", GF_LOG_ERROR, "swapcontext failed (%s)", strerror (errno)); } + + if (task->state == SYNCTASK_DONE) { + synctask_done (task); + return; + } + + pthread_mutex_lock (&env->mutex); + { + if (task->woken) { + __run (task); + } else { + task->slept = 1; + __wait (task); + } + } + pthread_mutex_unlock (&env->mutex); } @@ -229,19 +332,18 @@ void * syncenv_processor (void *thdata) { struct syncenv *env = NULL; + struct syncproc *proc = NULL; struct synctask *task = NULL; - env = thdata; + proc = thdata; + env = proc->env; for (;;) { - task = syncenv_task (env); - - if (task->complete) { - synctask_destroy (task); - continue; - } + task = syncenv_task (proc); synctask_switchto (task); + + syncenv_scale (env); } return NULL; @@ -249,6 +351,33 @@ syncenv_processor (void *thdata) void +syncenv_scale (struct syncenv *env) +{ + int thmax = 0; + int i = 0; + int ret = 0; + + pthread_mutex_lock (&env->mutex); + { + if (env->procs > env->runcount) + goto unlock; + + thmax = max (env->runcount, SYNCENV_PROC_MAX); + for (i = env->procs; i < thmax; i++) { + env->proc[i].env = env; + ret = pthread_create (&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i]); + if (ret) + break; + env->procs++; + } + } +unlock: + pthread_mutex_unlock (&env->mutex); +} + + +void syncenv_destroy (struct syncenv *env) { @@ -260,6 +389,7 @@ syncenv_new (size_t stacksize) { struct syncenv *newenv = NULL; int ret = 0; + int i = 0; newenv = CALLOC (1, sizeof (*newenv)); @@ -276,8 +406,14 @@ syncenv_new (size_t stacksize) if (stacksize) newenv->stacksize = stacksize; - ret = pthread_create (&newenv->processor, NULL, - syncenv_processor, newenv); + for (i = 0; i < SYNCENV_PROC_MIN; i++) { + newenv->proc[i].env = newenv; + ret = pthread_create (&newenv->proc[i].processor, NULL, + syncenv_processor, &newenv->proc[i]); + if (ret) + break; + newenv->procs++; + } if (ret != 0) syncenv_destroy (newenv); diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 7d8a2cb0230..9554edb7250 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -30,8 +30,11 @@ #include <pthread.h> #include <ucontext.h> +#define SYNCENV_PROC_MAX 16 +#define SYNCENV_PROC_MIN 2 struct synctask; +struct syncproc; struct syncenv; @@ -40,6 +43,13 @@ typedef int (*synctask_cbk_t) (int ret, call_frame_t *frame, void *opaque); typedef int (*synctask_fn_t) (void *opaque); +typedef enum { + SYNCTASK_INIT = 0, + SYNCTASK_RUN, + SYNCTASK_WAIT, + SYNCTASK_DONE, +} synctask_state_t; + /* for one sequential execution of @syncfn */ struct synctask { struct list_head all_tasks; @@ -48,25 +58,43 @@ struct synctask { call_frame_t *frame; synctask_cbk_t synccbk; synctask_fn_t syncfn; + synctask_state_t state; void *opaque; void *stack; + int woken; + int slept; int complete; + int ret; ucontext_t ctx; + struct syncproc *proc; + + pthread_mutex_t mutex; /* for synchronous spawning of synctask */ + pthread_cond_t cond; + int done; }; -/* hosts the scheduler thread and framework for executing synctasks */ -struct syncenv { + +struct syncproc { pthread_t processor; + ucontext_t sched; + struct syncenv *env; struct synctask *current; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { + struct syncproc proc[SYNCENV_PROC_MAX]; + int procs; struct list_head runq; + int runcount; struct list_head waitq; + int waitcount; pthread_mutex_t mutex; pthread_cond_t cond; - ucontext_t sched; size_t stacksize; }; @@ -92,20 +120,6 @@ struct syncargs { }; -#define __yawn(args) do { \ - struct synctask *task = NULL; \ - \ - task = synctask_get (); \ - if (task) { \ - args->task = task; \ - synctask_yawn (task); \ - } else { \ - pthread_mutex_init (&args->mutex, NULL); \ - pthread_cond_init (&args->cond, NULL); \ - } \ - } while (0) - - #define __yield(args) do { \ if (args->task) { \ synctask_yield (args->task); \ @@ -143,7 +157,6 @@ struct syncargs { \ frame = syncop_create_frame (); \ \ - __yawn (stb); \ STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, op, params); \ __yield (stb); \ } while (0) @@ -153,10 +166,10 @@ struct syncargs { struct syncenv * syncenv_new (); void syncenv_destroy (struct syncenv *); +void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); void synctask_zzzz (struct synctask *task); -void synctask_yawn (struct synctask *task); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); |