From bbcdbd8c36c7756f39bb7464cd1c69b67e66cfaa Mon Sep 17 00:00:00 2001 From: Anand Avati Date: Fri, 15 Mar 2013 11:20:53 -0700 Subject: synctask: minor enhancements - Enhance syncenv_new() to accept scaling parameters of syncproc. Previously the scaling parameters were hardcoded and decided at compile time. - New API synctask_create() which returns the created synctask. This is similar to synctask_new which only returned the status of whether a synctask could be created or not. The meaning of NULL cbk in synctask_create() means the task is "joinable". Until synctask_join() is called on such a synctask, the task is not reaped and resources are not destroyed. The task would be in a zombie state after synctask_fn returns and before synctask_join() is called. Change-Id: I368ec9037de9510d2ba951f0aad86aaf18d9a6b6 BUG: 986775 Signed-off-by: Anand Avati Reviewed-on: http://review.gluster.org/5365 Tested-by: Gluster Build System Reviewed-by: Brian Foster --- libglusterfs/src/syncop.c | 103 ++++++++++++++++++++++++++++++++-------------- libglusterfs/src/syncop.h | 9 +++- 2 files changed, 81 insertions(+), 31 deletions(-) (limited to 'libglusterfs') diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 9cceaf55..d2c8381a 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -28,7 +28,7 @@ __run (struct synctask *task) case SYNCTASK_SUSPEND: break; case SYNCTASK_RUN: - gf_log (task->xl->name, GF_LOG_WARNING, + gf_log (task->xl->name, GF_LOG_DEBUG, "re-running already running task"); env->runcount--; break; @@ -38,7 +38,11 @@ __run (struct synctask *task) case SYNCTASK_DONE: gf_log (task->xl->name, GF_LOG_WARNING, "running completed task"); - break; + return; + case SYNCTASK_ZOMBIE: + gf_log (task->xl->name, GF_LOG_WARNING, + "attempted to wake up zombie!!"); + return; } list_add_tail (&task->all_tasks, &env->runq); @@ -70,7 +74,11 @@ __wait (struct synctask *task) case SYNCTASK_DONE: gf_log (task->xl->name, GF_LOG_WARNING, "running completed task"); - break; + return; + case SYNCTASK_ZOMBIE: + gf_log (task->xl->name, GF_LOG_WARNING, + "attempted to sleep a zombie!!"); + return; } list_add_tail (&task->all_tasks, &env->waitq); @@ -168,6 +176,7 @@ synctask_done (struct synctask *task) pthread_mutex_lock (&task->mutex); { + task->state = SYNCTASK_ZOMBIE; task->done = 1; pthread_cond_broadcast (&task->cond); } @@ -191,20 +200,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid) } -int -synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, - call_frame_t *frame, void *opaque) +struct synctask * +synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + call_frame_t *frame, void *opaque) { struct synctask *newtask = NULL; xlator_t *this = THIS; - int ret = 0; VALIDATE_OR_GOTO (env, err); VALIDATE_OR_GOTO (fn, err); newtask = CALLOC (1, sizeof (*newtask)); if (!newtask) - return -ENOMEM; + return NULL; newtask->frame = frame; if (!frame) { @@ -263,21 +271,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, */ syncenv_scale(env); - if (!cbk) { - pthread_mutex_lock (&newtask->mutex); - { - while (!newtask->done) { - pthread_cond_wait (&newtask->cond, &newtask->mutex); - } - } - pthread_mutex_unlock (&newtask->mutex); - - ret = newtask->ret; - - synctask_destroy (newtask); - } - - return ret; + return newtask; err: if (newtask) { FREE (newtask->stack); @@ -285,7 +279,46 @@ err: STACK_DESTROY (newtask->opframe->root); FREE (newtask); } - return -1; + + return NULL; +} + + +int +synctask_join (struct synctask *task) +{ + int ret = 0; + + pthread_mutex_lock (&task->mutex); + { + while (!task->done) + pthread_cond_wait (&task->cond, &task->mutex); + } + pthread_mutex_unlock (&task->mutex); + + ret = task->ret; + + synctask_destroy (task); + + return ret; +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + call_frame_t *frame, void *opaque) +{ + struct synctask *newtask = NULL; + int ret = 0; + + newtask = synctask_create (env, fn, cbk, frame, opaque); + if (!newtask) + return -1; + + if (!cbk) + ret = synctask_join (newtask); + + return ret; } @@ -308,7 +341,7 @@ syncenv_task (struct syncproc *proc) if (!list_empty (&env->runq)) break; if ((ret == ETIMEDOUT) && - (env->procs > SYNCENV_PROC_MIN)) { + (env->procs > env->procmin)) { task = NULL; env->procs--; memset (proc, 0, sizeof (*proc)); @@ -408,13 +441,13 @@ syncenv_scale (struct syncenv *env) goto unlock; scale = env->runcount; - if (scale > SYNCENV_PROC_MAX) - scale = SYNCENV_PROC_MAX; + if (scale > env->procmax) + scale = env->procmax; if (scale > env->procs) diff = scale - env->procs; while (diff) { diff--; - for (; (i < SYNCENV_PROC_MAX); i++) { + for (; (i < env->procmax); i++) { if (env->proc[i].processor == 0) break; } @@ -441,12 +474,20 @@ syncenv_destroy (struct syncenv *env) struct syncenv * -syncenv_new (size_t stacksize) +syncenv_new (size_t stacksize, int procmin, int procmax) { struct syncenv *newenv = NULL; int ret = 0; int i = 0; + if (!procmin || procmin < 0) + procmin = SYNCENV_PROC_MIN; + if (!procmax || procmax > SYNCENV_PROC_MAX) + procmax = SYNCENV_PROC_MAX; + + if (procmin > procmax) + return NULL; + newenv = CALLOC (1, sizeof (*newenv)); if (!newenv) @@ -461,8 +502,10 @@ syncenv_new (size_t stacksize) newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE; if (stacksize) newenv->stacksize = stacksize; + newenv->procmin = procmin; + newenv->procmax = procmax; - for (i = 0; i < SYNCENV_PROC_MIN; i++) { + for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; ret = pthread_create (&newenv->proc[i].processor, NULL, syncenv_processor, &newenv->proc[i]); diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index c4b339ee..64350030 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -41,6 +41,7 @@ typedef enum { SYNCTASK_SUSPEND, SYNCTASK_WAIT, SYNCTASK_DONE, + SYNCTASK_ZOMBIE, } synctask_state_t; /* for one sequential execution of @syncfn */ @@ -90,6 +91,9 @@ struct syncenv { struct list_head waitq; int waitcount; + int procmin; + int procmax; + pthread_mutex_t mutex; pthread_cond_t cond; @@ -219,11 +223,14 @@ struct syncargs { #define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024) -struct syncenv * syncenv_new (); +struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax); void syncenv_destroy (struct syncenv *); void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); +struct synctask *synctask_create (struct syncenv *, synctask_fn_t, + synctask_cbk_t, call_frame_t *, void *); +int synctask_join (struct synctask *task); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); void synctask_waitfor (struct synctask *task, int count); -- cgit