diff options
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r-- | libglusterfs/src/syncop.c | 232 |
1 files changed, 184 insertions, 48 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 712e5b1f239..096c29efe0a 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -39,42 +39,76 @@ syncop_create_frame () return (call_frame_t *)frame; } -void -synctask_yield (struct synctask *task) + +static void +__run (struct synctask *task) { - struct syncenv *env = NULL; + struct syncenv *env = NULL; env = task->env; - if (swapcontext (&task->ctx, &env->sched) < 0) { - gf_log ("syncop", GF_LOG_ERROR, - "swapcontext failed (%s)", strerror (errno)); - } -} - - -void -synctask_yawn (struct synctask *task) + list_del_init (&task->all_tasks); + switch (task->state) { + case SYNCTASK_INIT: + break; + case SYNCTASK_RUN: + gf_log (task->xl->name, GF_LOG_WARNING, + "re-running already running task"); + env->runcount--; + break; + case SYNCTASK_WAIT: + env->waitcount--; + break; + case SYNCTASK_DONE: + gf_log (task->xl->name, GF_LOG_WARNING, + "running completed task"); + break; + } + + list_add_tail (&task->all_tasks, &env->runq); + env->runcount++; + task->state = SYNCTASK_RUN; +} + + +static void +__wait (struct synctask *task) { - struct syncenv *env = NULL; + struct syncenv *env = NULL; - env = task->env; + env = task->env; - pthread_mutex_lock (&env->mutex); - { - list_del_init (&task->all_tasks); - list_add (&task->all_tasks, &env->waitq); - } - pthread_mutex_unlock (&env->mutex); + list_del_init (&task->all_tasks); + switch (task->state) { + case SYNCTASK_INIT: + break; + case SYNCTASK_RUN: + env->runcount--; + break; + case SYNCTASK_WAIT: + gf_log (task->xl->name, GF_LOG_WARNING, + "re-waiting already waiting task"); + env->waitcount--; + break; + case SYNCTASK_DONE: + gf_log (task->xl->name, GF_LOG_WARNING, + "running completed task"); + break; + } + + list_add_tail (&task->all_tasks, &env->waitq); + env->waitcount++; + task->state = SYNCTASK_WAIT; } void -synctask_zzzz (struct synctask *task) +synctask_yield (struct synctask *task) { - synctask_yawn (task); - - synctask_yield (task); + if (swapcontext (&task->ctx, &task->proc->sched) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } } @@ -87,8 +121,10 @@ synctask_wake (struct synctask *task) pthread_mutex_lock (&env->mutex); { - list_del_init (&task->all_tasks); - list_add_tail (&task->all_tasks, &env->runq); + task->woken = 1; + + if (task->slept) + __run (task); } pthread_mutex_unlock (&env->mutex); @@ -99,21 +135,17 @@ synctask_wake (struct synctask *task) void synctask_wrap (struct synctask *old_task) { - int ret; struct synctask *task = NULL; /* Do not trust the pointer received. It may be wrong and can lead to crashes. */ task = synctask_get (); - ret = task->syncfn (task->opaque); - task->synccbk (ret, task->frame, task->opaque); + task->ret = task->syncfn (task->opaque); + if (task->synccbk) + task->synccbk (task->ret, task->frame, task->opaque); - /* cannot destroy @task right here as we are - in the execution stack of @task itself - */ - task->complete = 1; - synctask_wake (task); + task->state = SYNCTASK_DONE; synctask_yield (task); } @@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task) if (task->stack) FREE (task->stack); + + pthread_mutex_destroy (&task->mutex); + + pthread_cond_destroy (&task->cond); + FREE (task); } +void +synctask_done (struct synctask *task) +{ + if (task->synccbk) { + synctask_destroy (task); + return; + } + + pthread_mutex_lock (&task->mutex); + { + task->done = 1; + pthread_cond_broadcast (&task->cond); + } + pthread_mutex_unlock (&task->mutex); +} + + int synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, call_frame_t *frame, void *opaque) { struct synctask *newtask = NULL; xlator_t *this = THIS; + int ret = 0; VALIDATE_OR_GOTO (env, err); VALIDATE_OR_GOTO (fn, err); - VALIDATE_OR_GOTO (cbk, err); VALIDATE_OR_GOTO (frame, err); newtask = CALLOC (1, sizeof (*newtask)); @@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, newtask->env = env; newtask->xl = this; newtask->syncfn = fn; - newtask->synccbk = cbk; + newtask->synccbk = cbk; newtask->opaque = opaque; newtask->frame = frame; @@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + newtask->state = SYNCTASK_INIT; + + newtask->slept = 1; + + if (!cbk) { + pthread_mutex_init (&newtask->mutex, NULL); + pthread_cond_init (&newtask->cond, NULL); + newtask->done = 0; + } + synctask_wake (newtask); - return 0; + if (!cbk) { + pthread_mutex_lock (&newtask->mutex); + { + while (!newtask->done) { + pthread_cond_wait (&newtask->cond, &newtask->mutex); + } + } + pthread_mutex_unlock (&newtask->mutex); + + ret = newtask->ret; + + synctask_destroy (newtask); + } + + return ret; err: if (newtask) { if (newtask->stack) @@ -189,10 +267,13 @@ err: struct synctask * -syncenv_task (struct syncenv *env) +syncenv_task (struct syncproc *proc) { + struct syncenv *env = NULL; struct synctask *task = NULL; + env = proc->env; + pthread_mutex_lock (&env->mutex); { while (list_empty (&env->runq)) @@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env) task = list_entry (env->runq.next, struct synctask, all_tasks); list_del_init (&task->all_tasks); + env->runcount--; + + task->proc = proc; } pthread_mutex_unlock (&env->mutex); @@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task) synctask_set (task); THIS = task->xl; - if (swapcontext (&env->sched, &task->ctx) < 0) { + task->woken = 0; + task->slept = 0; + + if (swapcontext (&task->proc->sched, &task->ctx) < 0) { gf_log ("syncop", GF_LOG_ERROR, "swapcontext failed (%s)", strerror (errno)); } + + if (task->state == SYNCTASK_DONE) { + synctask_done (task); + return; + } + + pthread_mutex_lock (&env->mutex); + { + if (task->woken) { + __run (task); + } else { + task->slept = 1; + __wait (task); + } + } + pthread_mutex_unlock (&env->mutex); } @@ -229,19 +332,18 @@ void * syncenv_processor (void *thdata) { struct syncenv *env = NULL; + struct syncproc *proc = NULL; struct synctask *task = NULL; - env = thdata; + proc = thdata; + env = proc->env; for (;;) { - task = syncenv_task (env); - - if (task->complete) { - synctask_destroy (task); - continue; - } + task = syncenv_task (proc); synctask_switchto (task); + + syncenv_scale (env); } return NULL; @@ -249,6 +351,33 @@ syncenv_processor (void *thdata) void +syncenv_scale (struct syncenv *env) +{ + int thmax = 0; + int i = 0; + int ret = 0; + + pthread_mutex_lock (&env->mutex); + { + if (env->procs > env->runcount) + goto unlock; + + thmax = max (env->runcount, SYNCENV_PROC_MAX); + for (i = env->procs; i < thmax; i++) { + env->proc[i].env = env; + ret = pthread_create (&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i]); + if (ret) + break; + env->procs++; + } + } +unlock: + pthread_mutex_unlock (&env->mutex); +} + + +void syncenv_destroy (struct syncenv *env) { @@ -260,6 +389,7 @@ syncenv_new (size_t stacksize) { struct syncenv *newenv = NULL; int ret = 0; + int i = 0; newenv = CALLOC (1, sizeof (*newenv)); @@ -276,8 +406,14 @@ syncenv_new (size_t stacksize) if (stacksize) newenv->stacksize = stacksize; - ret = pthread_create (&newenv->processor, NULL, - syncenv_processor, newenv); + for (i = 0; i < SYNCENV_PROC_MIN; i++) { + newenv->proc[i].env = newenv; + ret = pthread_create (&newenv->proc[i].processor, NULL, + syncenv_processor, &newenv->proc[i]); + if (ret) + break; + newenv->procs++; + } if (ret != 0) syncenv_destroy (newenv); |