summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/syncop.c43
-rw-r--r--libglusterfs/src/syncop.h155
2 files changed, 122 insertions, 76 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index b15ee31ba..a8faa65e0 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -80,15 +80,24 @@ __wait (struct synctask *task)
void
-synctask_yield (struct synctask *task)
+synctask_waitfor (struct synctask *task, int waitfor)
{
- xlator_t *oldTHIS = THIS;
+ struct syncenv *env = NULL;
+ xlator_t *oldTHIS = THIS;
+
+ env = task->env;
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
#endif
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->waitfor = waitfor;
+ }
+ pthread_mutex_unlock (&env->mutex);
+
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
@@ -99,6 +108,29 @@ synctask_yield (struct synctask *task)
void
+synctask_yield (struct synctask *task)
+{
+ synctask_waitfor (task, 1);
+}
+
+
+void
+synctask_yawn (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->woken = 0;
+ task->waitfor = 0;
+ }
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
synctask_wake (struct synctask *task)
{
struct syncenv *env = NULL;
@@ -107,9 +139,9 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- task->woken = 1;
+ task->woken++;
- if (task->slept)
+ if (task->slept && task->woken >= task->waitfor)
__run (task);
}
pthread_mutex_unlock (&env->mutex);
@@ -338,6 +370,7 @@ synctask_switchto (struct synctask *task)
task->woken = 0;
task->slept = 0;
+ task->waitfor = 0;
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
@@ -356,7 +389,7 @@ synctask_switchto (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- if (task->woken) {
+ if (task->woken >= task->waitfor) {
__run (task);
} else {
task->slept = 1;
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 37e2b0e28..d4086291a 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -36,11 +36,11 @@ typedef int (*synctask_fn_t) (void *opaque);
typedef enum {
- SYNCTASK_INIT = 0,
- SYNCTASK_RUN,
+ SYNCTASK_INIT = 0,
+ SYNCTASK_RUN,
SYNCTASK_SUSPEND,
- SYNCTASK_WAIT,
- SYNCTASK_DONE,
+ SYNCTASK_WAIT,
+ SYNCTASK_DONE,
} synctask_state_t;
/* for one sequential execution of @syncfn */
@@ -52,22 +52,23 @@ struct synctask {
call_frame_t *opframe;
synctask_cbk_t synccbk;
synctask_fn_t syncfn;
- synctask_state_t state;
+ synctask_state_t state;
void *opaque;
void *stack;
int woken;
int slept;
- int ret;
+ int waitfor;
+ int ret;
- uid_t uid;
- gid_t gid;
+ uid_t uid;
+ gid_t gid;
ucontext_t ctx;
- struct syncproc *proc;
+ struct syncproc *proc;
- pthread_mutex_t mutex; /* for synchronous spawning of synctask */
- pthread_cond_t cond;
- int done;
+ pthread_mutex_t mutex; /* for synchronous spawning of synctask */
+ pthread_cond_t cond;
+ int done;
};
@@ -116,79 +117,85 @@ struct syncargs {
/* do not touch */
struct synctask *task;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- int done;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int wakecnt;
};
-#define __yawn(args) do { \
- if (!args->task) { \
- pthread_mutex_init (&args->mutex, NULL); \
- pthread_cond_init (&args->cond, NULL); \
- args->done = 0; \
- } \
- } while (0)
-
-
-#define __wake(args) do { \
- if (args->task) { \
- synctask_wake (args->task); \
- } else { \
- pthread_mutex_lock (&args->mutex); \
- { \
- args->done = 1; \
- pthread_cond_signal (&args->cond); \
- } \
- pthread_mutex_unlock (&args->mutex); \
- } \
- } while (0)
-
-
-#define __yield(args) do { \
- if (args->task) { \
- synctask_yield (args->task); \
- } else { \
- pthread_mutex_lock (&args->mutex); \
- { \
- while (!args->done) \
- pthread_cond_wait (&args->cond, \
- &args->mutex); \
- } \
- pthread_mutex_unlock (&args->mutex); \
- pthread_mutex_destroy (&args->mutex); \
- pthread_cond_destroy (&args->cond); \
- } \
- } while (0)
+#define __yawn(args) do { \
+ args->task = synctask_get (); \
+ if (args->task) { \
+ synctask_yawn (args->task); \
+ } else { \
+ pthread_mutex_init (&args->mutex, NULL); \
+ pthread_cond_init (&args->cond, NULL); \
+ args->wakecnt = 0; \
+ } \
+ } while (0)
+
+
+#define __wake(args) do { \
+ if (args->task) { \
+ synctask_wake (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ args->wakecnt++; \
+ pthread_cond_signal (&args->cond); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ } \
+ } while (0)
+
+
+#define __waitfor(args, cnt) do { \
+ if (args->task) { \
+ synctask_waitfor (args->task, cnt); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ while (args->wakecnt < cnt) \
+ pthread_cond_wait (&args->cond, \
+ &args->mutex); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ pthread_mutex_destroy (&args->mutex); \
+ pthread_cond_destroy (&args->cond); \
+ } \
+ } while (0)
+
+
+#define __yield(args) __waitfor(args, 1)
#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
struct synctask *task = NULL; \
- call_frame_t *frame = NULL; \
+ call_frame_t *frame = NULL; \
\
task = synctask_get (); \
stb->task = task; \
- if (task) \
- frame = task->opframe; \
- else \
- frame = create_frame (THIS, THIS->ctx->pool); \
+ if (task) \
+ frame = task->opframe; \
+ else \
+ frame = create_frame (THIS, THIS->ctx->pool); \
if (task) { \
frame->root->uid = task->uid; \
frame->root->gid = task->gid; \
} \
- \
- __yawn (stb); \
\
- STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \
- op, params); \
- if (task) \
- task->state = SYNCTASK_SUSPEND; \
- \
- __yield (stb); \
- if (task) \
- STACK_RESET (frame->root); \
- else \
- STACK_DESTROY (frame->root); \
+ __yawn (stb); \
+ \
+ STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \
+ op, params); \
+ if (task) \
+ task->state = SYNCTASK_SUSPEND; \
+ \
+ __yield (stb); \
+ if (task) \
+ STACK_RESET (frame->root); \
+ else \
+ STACK_DESTROY (frame->root); \
} while (0)
@@ -201,6 +208,12 @@ void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);
+void synctask_yawn (struct synctask *task);
+void synctask_waitfor (struct synctask *task, int count);
+
+#define synctask_barrier_init(args) __yawn (args)
+#define synctask_barrier_wait(args, n) __waitfor (args, n)
+#define synctask_barrier_wake(args) __wake (args)
int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);
#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid);
@@ -264,7 +277,7 @@ int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf);
int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf);
int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath,
- dict_t *dict);
+ dict_t *dict);
int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size);
int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
dict_t *dict);