summaryrefslogtreecommitdiffstats
path: root/libglusterfs
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs')
-rw-r--r--libglusterfs/src/syncop.c103
-rw-r--r--libglusterfs/src/syncop.h9
2 files changed, 81 insertions, 31 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 9cceaf55..d2c8381a 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -28,7 +28,7 @@ __run (struct synctask *task)
case SYNCTASK_SUSPEND:
break;
case SYNCTASK_RUN:
- gf_log (task->xl->name, GF_LOG_WARNING,
+ gf_log (task->xl->name, GF_LOG_DEBUG,
"re-running already running task");
env->runcount--;
break;
@@ -38,7 +38,11 @@ __run (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to wake up zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->runq);
@@ -70,7 +74,11 @@ __wait (struct synctask *task)
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
- break;
+ return;
+ case SYNCTASK_ZOMBIE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "attempted to sleep a zombie!!");
+ return;
}
list_add_tail (&task->all_tasks, &env->waitq);
@@ -168,6 +176,7 @@ synctask_done (struct synctask *task)
pthread_mutex_lock (&task->mutex);
{
+ task->state = SYNCTASK_ZOMBIE;
task->done = 1;
pthread_cond_broadcast (&task->cond);
}
@@ -191,20 +200,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid)
}
-int
-synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
- call_frame_t *frame, void *opaque)
+struct synctask *
+synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
- int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
newtask = CALLOC (1, sizeof (*newtask));
if (!newtask)
- return -ENOMEM;
+ return NULL;
newtask->frame = frame;
if (!frame) {
@@ -263,21 +271,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
*/
syncenv_scale(env);
- if (!cbk) {
- pthread_mutex_lock (&newtask->mutex);
- {
- while (!newtask->done) {
- pthread_cond_wait (&newtask->cond, &newtask->mutex);
- }
- }
- pthread_mutex_unlock (&newtask->mutex);
-
- ret = newtask->ret;
-
- synctask_destroy (newtask);
- }
-
- return ret;
+ return newtask;
err:
if (newtask) {
FREE (newtask->stack);
@@ -285,7 +279,46 @@ err:
STACK_DESTROY (newtask->opframe->root);
FREE (newtask);
}
- return -1;
+
+ return NULL;
+}
+
+
+int
+synctask_join (struct synctask *task)
+{
+ int ret = 0;
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ while (!task->done)
+ pthread_cond_wait (&task->cond, &task->mutex);
+ }
+ pthread_mutex_unlock (&task->mutex);
+
+ ret = task->ret;
+
+ synctask_destroy (task);
+
+ return ret;
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ call_frame_t *frame, void *opaque)
+{
+ struct synctask *newtask = NULL;
+ int ret = 0;
+
+ newtask = synctask_create (env, fn, cbk, frame, opaque);
+ if (!newtask)
+ return -1;
+
+ if (!cbk)
+ ret = synctask_join (newtask);
+
+ return ret;
}
@@ -308,7 +341,7 @@ syncenv_task (struct syncproc *proc)
if (!list_empty (&env->runq))
break;
if ((ret == ETIMEDOUT) &&
- (env->procs > SYNCENV_PROC_MIN)) {
+ (env->procs > env->procmin)) {
task = NULL;
env->procs--;
memset (proc, 0, sizeof (*proc));
@@ -408,13 +441,13 @@ syncenv_scale (struct syncenv *env)
goto unlock;
scale = env->runcount;
- if (scale > SYNCENV_PROC_MAX)
- scale = SYNCENV_PROC_MAX;
+ if (scale > env->procmax)
+ scale = env->procmax;
if (scale > env->procs)
diff = scale - env->procs;
while (diff) {
diff--;
- for (; (i < SYNCENV_PROC_MAX); i++) {
+ for (; (i < env->procmax); i++) {
if (env->proc[i].processor == 0)
break;
}
@@ -441,12 +474,20 @@ syncenv_destroy (struct syncenv *env)
struct syncenv *
-syncenv_new (size_t stacksize)
+syncenv_new (size_t stacksize, int procmin, int procmax)
{
struct syncenv *newenv = NULL;
int ret = 0;
int i = 0;
+ if (!procmin || procmin < 0)
+ procmin = SYNCENV_PROC_MIN;
+ if (!procmax || procmax > SYNCENV_PROC_MAX)
+ procmax = SYNCENV_PROC_MAX;
+
+ if (procmin > procmax)
+ return NULL;
+
newenv = CALLOC (1, sizeof (*newenv));
if (!newenv)
@@ -461,8 +502,10 @@ syncenv_new (size_t stacksize)
newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
if (stacksize)
newenv->stacksize = stacksize;
+ newenv->procmin = procmin;
+ newenv->procmax = procmax;
- for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ for (i = 0; i < newenv->procmin; i++) {
newenv->proc[i].env = newenv;
ret = pthread_create (&newenv->proc[i].processor, NULL,
syncenv_processor, &newenv->proc[i]);
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index c4b339ee..64350030 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -41,6 +41,7 @@ typedef enum {
SYNCTASK_SUSPEND,
SYNCTASK_WAIT,
SYNCTASK_DONE,
+ SYNCTASK_ZOMBIE,
} synctask_state_t;
/* for one sequential execution of @syncfn */
@@ -90,6 +91,9 @@ struct syncenv {
struct list_head waitq;
int waitcount;
+ int procmin;
+ int procmax;
+
pthread_mutex_t mutex;
pthread_cond_t cond;
@@ -219,11 +223,14 @@ struct syncargs {
#define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024)
-struct syncenv * syncenv_new ();
+struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax);
void syncenv_destroy (struct syncenv *);
void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
+struct synctask *synctask_create (struct syncenv *, synctask_fn_t,
+ synctask_cbk_t, call_frame_t *, void *);
+int synctask_join (struct synctask *task);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);
void synctask_waitfor (struct synctask *task, int count);