summaryrefslogtreecommitdiffstats
path: root/libglusterfs
diff options
context:
space:
mode:
Diffstat (limited to 'libglusterfs')
-rw-r--r--libglusterfs/src/syncop.c232
-rw-r--r--libglusterfs/src/syncop.h51
2 files changed, 216 insertions, 67 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 712e5b1f239..096c29efe0a 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -39,42 +39,76 @@ syncop_create_frame ()
return (call_frame_t *)frame;
}
-void
-synctask_yield (struct synctask *task)
+
+static void
+__run (struct synctask *task)
{
- struct syncenv *env = NULL;
+ struct syncenv *env = NULL;
env = task->env;
- if (swapcontext (&task->ctx, &env->sched) < 0) {
- gf_log ("syncop", GF_LOG_ERROR,
- "swapcontext failed (%s)", strerror (errno));
- }
-}
-
-
-void
-synctask_yawn (struct synctask *task)
+ list_del_init (&task->all_tasks);
+ switch (task->state) {
+ case SYNCTASK_INIT:
+ break;
+ case SYNCTASK_RUN:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "re-running already running task");
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ break;
+ }
+
+ list_add_tail (&task->all_tasks, &env->runq);
+ env->runcount++;
+ task->state = SYNCTASK_RUN;
+}
+
+
+static void
+__wait (struct synctask *task)
{
- struct syncenv *env = NULL;
+ struct syncenv *env = NULL;
- env = task->env;
+ env = task->env;
- pthread_mutex_lock (&env->mutex);
- {
- list_del_init (&task->all_tasks);
- list_add (&task->all_tasks, &env->waitq);
- }
- pthread_mutex_unlock (&env->mutex);
+ list_del_init (&task->all_tasks);
+ switch (task->state) {
+ case SYNCTASK_INIT:
+ break;
+ case SYNCTASK_RUN:
+ env->runcount--;
+ break;
+ case SYNCTASK_WAIT:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "re-waiting already waiting task");
+ env->waitcount--;
+ break;
+ case SYNCTASK_DONE:
+ gf_log (task->xl->name, GF_LOG_WARNING,
+ "running completed task");
+ break;
+ }
+
+ list_add_tail (&task->all_tasks, &env->waitq);
+ env->waitcount++;
+ task->state = SYNCTASK_WAIT;
}
void
-synctask_zzzz (struct synctask *task)
+synctask_yield (struct synctask *task)
{
- synctask_yawn (task);
-
- synctask_yield (task);
+ if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
}
@@ -87,8 +121,10 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- list_del_init (&task->all_tasks);
- list_add_tail (&task->all_tasks, &env->runq);
+ task->woken = 1;
+
+ if (task->slept)
+ __run (task);
}
pthread_mutex_unlock (&env->mutex);
@@ -99,21 +135,17 @@ synctask_wake (struct synctask *task)
void
synctask_wrap (struct synctask *old_task)
{
- int ret;
struct synctask *task = NULL;
/* Do not trust the pointer received. It may be
wrong and can lead to crashes. */
task = synctask_get ();
- ret = task->syncfn (task->opaque);
- task->synccbk (ret, task->frame, task->opaque);
+ task->ret = task->syncfn (task->opaque);
+ if (task->synccbk)
+ task->synccbk (task->ret, task->frame, task->opaque);
- /* cannot destroy @task right here as we are
- in the execution stack of @task itself
- */
- task->complete = 1;
- synctask_wake (task);
+ task->state = SYNCTASK_DONE;
synctask_yield (task);
}
@@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task)
if (task->stack)
FREE (task->stack);
+
+ pthread_mutex_destroy (&task->mutex);
+
+ pthread_cond_destroy (&task->cond);
+
FREE (task);
}
+void
+synctask_done (struct synctask *task)
+{
+ if (task->synccbk) {
+ synctask_destroy (task);
+ return;
+ }
+
+ pthread_mutex_lock (&task->mutex);
+ {
+ task->done = 1;
+ pthread_cond_broadcast (&task->cond);
+ }
+ pthread_mutex_unlock (&task->mutex);
+}
+
+
int
synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
+ int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
- VALIDATE_OR_GOTO (cbk, err);
VALIDATE_OR_GOTO (frame, err);
newtask = CALLOC (1, sizeof (*newtask));
@@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->env = env;
newtask->xl = this;
newtask->syncfn = fn;
- newtask->synccbk = cbk;
+ newtask->synccbk = cbk;
newtask->opaque = opaque;
newtask->frame = frame;
@@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
+ newtask->state = SYNCTASK_INIT;
+
+ newtask->slept = 1;
+
+ if (!cbk) {
+ pthread_mutex_init (&newtask->mutex, NULL);
+ pthread_cond_init (&newtask->cond, NULL);
+ newtask->done = 0;
+ }
+
synctask_wake (newtask);
- return 0;
+ if (!cbk) {
+ pthread_mutex_lock (&newtask->mutex);
+ {
+ while (!newtask->done) {
+ pthread_cond_wait (&newtask->cond, &newtask->mutex);
+ }
+ }
+ pthread_mutex_unlock (&newtask->mutex);
+
+ ret = newtask->ret;
+
+ synctask_destroy (newtask);
+ }
+
+ return ret;
err:
if (newtask) {
if (newtask->stack)
@@ -189,10 +267,13 @@ err:
struct synctask *
-syncenv_task (struct syncenv *env)
+syncenv_task (struct syncproc *proc)
{
+ struct syncenv *env = NULL;
struct synctask *task = NULL;
+ env = proc->env;
+
pthread_mutex_lock (&env->mutex);
{
while (list_empty (&env->runq))
@@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env)
task = list_entry (env->runq.next, struct synctask, all_tasks);
list_del_init (&task->all_tasks);
+ env->runcount--;
+
+ task->proc = proc;
}
pthread_mutex_unlock (&env->mutex);
@@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task)
synctask_set (task);
THIS = task->xl;
- if (swapcontext (&env->sched, &task->ctx) < 0) {
+ task->woken = 0;
+ task->slept = 0;
+
+ if (swapcontext (&task->proc->sched, &task->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
}
+
+ if (task->state == SYNCTASK_DONE) {
+ synctask_done (task);
+ return;
+ }
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ if (task->woken) {
+ __run (task);
+ } else {
+ task->slept = 1;
+ __wait (task);
+ }
+ }
+ pthread_mutex_unlock (&env->mutex);
}
@@ -229,19 +332,18 @@ void *
syncenv_processor (void *thdata)
{
struct syncenv *env = NULL;
+ struct syncproc *proc = NULL;
struct synctask *task = NULL;
- env = thdata;
+ proc = thdata;
+ env = proc->env;
for (;;) {
- task = syncenv_task (env);
-
- if (task->complete) {
- synctask_destroy (task);
- continue;
- }
+ task = syncenv_task (proc);
synctask_switchto (task);
+
+ syncenv_scale (env);
}
return NULL;
@@ -249,6 +351,33 @@ syncenv_processor (void *thdata)
void
+syncenv_scale (struct syncenv *env)
+{
+ int thmax = 0;
+ int i = 0;
+ int ret = 0;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ if (env->procs > env->runcount)
+ goto unlock;
+
+ thmax = max (env->runcount, SYNCENV_PROC_MAX);
+ for (i = env->procs; i < thmax; i++) {
+ env->proc[i].env = env;
+ ret = pthread_create (&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i]);
+ if (ret)
+ break;
+ env->procs++;
+ }
+ }
+unlock:
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
syncenv_destroy (struct syncenv *env)
{
@@ -260,6 +389,7 @@ syncenv_new (size_t stacksize)
{
struct syncenv *newenv = NULL;
int ret = 0;
+ int i = 0;
newenv = CALLOC (1, sizeof (*newenv));
@@ -276,8 +406,14 @@ syncenv_new (size_t stacksize)
if (stacksize)
newenv->stacksize = stacksize;
- ret = pthread_create (&newenv->processor, NULL,
- syncenv_processor, newenv);
+ for (i = 0; i < SYNCENV_PROC_MIN; i++) {
+ newenv->proc[i].env = newenv;
+ ret = pthread_create (&newenv->proc[i].processor, NULL,
+ syncenv_processor, &newenv->proc[i]);
+ if (ret)
+ break;
+ newenv->procs++;
+ }
if (ret != 0)
syncenv_destroy (newenv);
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 7d8a2cb0230..9554edb7250 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -30,8 +30,11 @@
#include <pthread.h>
#include <ucontext.h>
+#define SYNCENV_PROC_MAX 16
+#define SYNCENV_PROC_MIN 2
struct synctask;
+struct syncproc;
struct syncenv;
@@ -40,6 +43,13 @@ typedef int (*synctask_cbk_t) (int ret, call_frame_t *frame, void *opaque);
typedef int (*synctask_fn_t) (void *opaque);
+typedef enum {
+ SYNCTASK_INIT = 0,
+ SYNCTASK_RUN,
+ SYNCTASK_WAIT,
+ SYNCTASK_DONE,
+} synctask_state_t;
+
/* for one sequential execution of @syncfn */
struct synctask {
struct list_head all_tasks;
@@ -48,25 +58,43 @@ struct synctask {
call_frame_t *frame;
synctask_cbk_t synccbk;
synctask_fn_t syncfn;
+ synctask_state_t state;
void *opaque;
void *stack;
+ int woken;
+ int slept;
int complete;
+ int ret;
ucontext_t ctx;
+ struct syncproc *proc;
+
+ pthread_mutex_t mutex; /* for synchronous spawning of synctask */
+ pthread_cond_t cond;
+ int done;
};
-/* hosts the scheduler thread and framework for executing synctasks */
-struct syncenv {
+
+struct syncproc {
pthread_t processor;
+ ucontext_t sched;
+ struct syncenv *env;
struct synctask *current;
+};
+
+/* hosts the scheduler thread and framework for executing synctasks */
+struct syncenv {
+ struct syncproc proc[SYNCENV_PROC_MAX];
+ int procs;
struct list_head runq;
+ int runcount;
struct list_head waitq;
+ int waitcount;
pthread_mutex_t mutex;
pthread_cond_t cond;
- ucontext_t sched;
size_t stacksize;
};
@@ -92,20 +120,6 @@ struct syncargs {
};
-#define __yawn(args) do { \
- struct synctask *task = NULL; \
- \
- task = synctask_get (); \
- if (task) { \
- args->task = task; \
- synctask_yawn (task); \
- } else { \
- pthread_mutex_init (&args->mutex, NULL); \
- pthread_cond_init (&args->cond, NULL); \
- } \
- } while (0)
-
-
#define __yield(args) do { \
if (args->task) { \
synctask_yield (args->task); \
@@ -143,7 +157,6 @@ struct syncargs {
\
frame = syncop_create_frame (); \
\
- __yawn (stb); \
STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, op, params); \
__yield (stb); \
} while (0)
@@ -153,10 +166,10 @@ struct syncargs {
struct syncenv * syncenv_new ();
void syncenv_destroy (struct syncenv *);
+void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
void synctask_zzzz (struct synctask *task);
-void synctask_yawn (struct synctask *task);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);