diff options
-rw-r--r-- | api/src/glfs.c | 2 | ||||
-rw-r--r-- | glusterfsd/src/glusterfsd.c | 2 | ||||
-rw-r--r-- | libglusterfs/src/syncop.c | 103 | ||||
-rw-r--r-- | libglusterfs/src/syncop.h | 9 | ||||
-rw-r--r-- | xlators/cluster/afr/src/pump.c | 5 |
5 files changed, 84 insertions, 37 deletions
diff --git a/api/src/glfs.c b/api/src/glfs.c index 9fa0a467b02..2200358c651 100644 --- a/api/src/glfs.c +++ b/api/src/glfs.c @@ -85,7 +85,7 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx) goto err; } - ctx->env = syncenv_new (0); + ctx->env = syncenv_new (0, 0, 0); if (!ctx->env) { goto err; } diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index 9ef2d00a3c9..7b2ccd6eb1a 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -1957,7 +1957,7 @@ main (int argc, char *argv[]) if (ret) goto out; - ctx->env = syncenv_new (0); + ctx->env = syncenv_new (0, 0, 0); if (!ctx->env) { gf_log ("", GF_LOG_ERROR, "Could not create new sync-environment"); diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 9cceaf55c5c..d2c8381a3d3 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -28,7 +28,7 @@ __run (struct synctask *task) case SYNCTASK_SUSPEND: break; case SYNCTASK_RUN: - gf_log (task->xl->name, GF_LOG_WARNING, + gf_log (task->xl->name, GF_LOG_DEBUG, "re-running already running task"); env->runcount--; break; @@ -38,7 +38,11 @@ __run (struct synctask *task) case SYNCTASK_DONE: gf_log (task->xl->name, GF_LOG_WARNING, "running completed task"); - break; + return; + case SYNCTASK_ZOMBIE: + gf_log (task->xl->name, GF_LOG_WARNING, + "attempted to wake up zombie!!"); + return; } list_add_tail (&task->all_tasks, &env->runq); @@ -70,7 +74,11 @@ __wait (struct synctask *task) case SYNCTASK_DONE: gf_log (task->xl->name, GF_LOG_WARNING, "running completed task"); - break; + return; + case SYNCTASK_ZOMBIE: + gf_log (task->xl->name, GF_LOG_WARNING, + "attempted to sleep a zombie!!"); + return; } list_add_tail (&task->all_tasks, &env->waitq); @@ -168,6 +176,7 @@ synctask_done (struct synctask *task) pthread_mutex_lock (&task->mutex); { + task->state = SYNCTASK_ZOMBIE; task->done = 1; pthread_cond_broadcast (&task->cond); } @@ -191,20 +200,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid) } -int -synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, - call_frame_t *frame, void *opaque) +struct synctask * +synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + call_frame_t *frame, void *opaque) { struct synctask *newtask = NULL; xlator_t *this = THIS; - int ret = 0; VALIDATE_OR_GOTO (env, err); VALIDATE_OR_GOTO (fn, err); newtask = CALLOC (1, sizeof (*newtask)); if (!newtask) - return -ENOMEM; + return NULL; newtask->frame = frame; if (!frame) { @@ -263,21 +271,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, */ syncenv_scale(env); - if (!cbk) { - pthread_mutex_lock (&newtask->mutex); - { - while (!newtask->done) { - pthread_cond_wait (&newtask->cond, &newtask->mutex); - } - } - pthread_mutex_unlock (&newtask->mutex); - - ret = newtask->ret; - - synctask_destroy (newtask); - } - - return ret; + return newtask; err: if (newtask) { FREE (newtask->stack); @@ -285,7 +279,46 @@ err: STACK_DESTROY (newtask->opframe->root); FREE (newtask); } - return -1; + + return NULL; +} + + +int +synctask_join (struct synctask *task) +{ + int ret = 0; + + pthread_mutex_lock (&task->mutex); + { + while (!task->done) + pthread_cond_wait (&task->cond, &task->mutex); + } + pthread_mutex_unlock (&task->mutex); + + ret = task->ret; + + synctask_destroy (task); + + return ret; +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + call_frame_t *frame, void *opaque) +{ + struct synctask *newtask = NULL; + int ret = 0; + + newtask = synctask_create (env, fn, cbk, frame, opaque); + if (!newtask) + return -1; + + if (!cbk) + ret = synctask_join (newtask); + + return ret; } @@ -308,7 +341,7 @@ syncenv_task (struct syncproc *proc) if (!list_empty (&env->runq)) break; if ((ret == ETIMEDOUT) && - (env->procs > SYNCENV_PROC_MIN)) { + (env->procs > env->procmin)) { task = NULL; env->procs--; memset (proc, 0, sizeof (*proc)); @@ -408,13 +441,13 @@ syncenv_scale (struct syncenv *env) goto unlock; scale = env->runcount; - if (scale > SYNCENV_PROC_MAX) - scale = SYNCENV_PROC_MAX; + if (scale > env->procmax) + scale = env->procmax; if (scale > env->procs) diff = scale - env->procs; while (diff) { diff--; - for (; (i < SYNCENV_PROC_MAX); i++) { + for (; (i < env->procmax); i++) { if (env->proc[i].processor == 0) break; } @@ -441,12 +474,20 @@ syncenv_destroy (struct syncenv *env) struct syncenv * -syncenv_new (size_t stacksize) +syncenv_new (size_t stacksize, int procmin, int procmax) { struct syncenv *newenv = NULL; int ret = 0; int i = 0; + if (!procmin || procmin < 0) + procmin = SYNCENV_PROC_MIN; + if (!procmax || procmax > SYNCENV_PROC_MAX) + procmax = SYNCENV_PROC_MAX; + + if (procmin > procmax) + return NULL; + newenv = CALLOC (1, sizeof (*newenv)); if (!newenv) @@ -461,8 +502,10 @@ syncenv_new (size_t stacksize) newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE; if (stacksize) newenv->stacksize = stacksize; + newenv->procmin = procmin; + newenv->procmax = procmax; - for (i = 0; i < SYNCENV_PROC_MIN; i++) { + for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; ret = pthread_create (&newenv->proc[i].processor, NULL, syncenv_processor, &newenv->proc[i]); diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index c4b339ee7ba..64350030e56 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -41,6 +41,7 @@ typedef enum { SYNCTASK_SUSPEND, SYNCTASK_WAIT, SYNCTASK_DONE, + SYNCTASK_ZOMBIE, } synctask_state_t; /* for one sequential execution of @syncfn */ @@ -90,6 +91,9 @@ struct syncenv { struct list_head waitq; int waitcount; + int procmin; + int procmax; + pthread_mutex_t mutex; pthread_cond_t cond; @@ -219,11 +223,14 @@ struct syncargs { #define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024) -struct syncenv * syncenv_new (); +struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax); void syncenv_destroy (struct syncenv *); void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); +struct synctask *synctask_create (struct syncenv *, synctask_fn_t, + synctask_cbk_t, call_frame_t *, void *); +int synctask_join (struct synctask *task); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); void synctask_waitfor (struct synctask *task, int count); diff --git a/xlators/cluster/afr/src/pump.c b/xlators/cluster/afr/src/pump.c index c7b85869114..a7f72fb30e9 100644 --- a/xlators/cluster/afr/src/pump.c +++ b/xlators/cluster/afr/src/pump.c @@ -2538,7 +2538,7 @@ init (xlator_t *this) goto out; } - pump_priv->env = syncenv_new (0); + pump_priv->env = this->ctx->env; if (!pump_priv->env) { gf_log (this->name, GF_LOG_ERROR, "Could not create new sync-environment"); @@ -2579,9 +2579,6 @@ fini (xlator_t *this) if (!pump_priv) goto afr_priv; - if (pump_priv->env) - syncenv_destroy (pump_priv->env); - GF_FREE (pump_priv->resume_path); LOCK_DESTROY (&pump_priv->resume_path_lock); LOCK_DESTROY (&pump_priv->pump_state_lock); |