diff options
author | Anand Avati <avati@redhat.com> | 2013-02-21 00:17:26 -0800 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2013-02-21 22:57:04 -0800 |
commit | 72ad9a3a8b684595dc394252c88c76c859919a45 (patch) | |
tree | ef6eb57e7d817b9bf3f8f942ea9ba137cd61d16f | |
parent | 1dbe9a05feac5032990457058f7cef686a293973 (diff) |
synctask: support for (assymetric) counted barriers
This patch introduces a new set of primitives:
- synctask_barrier_init (stub)
- synctask_barrier_waitfor (stub, count)
- synctask_barrier_wake (stub)
Unlike pthread_barrier_t, this barrier has an explicit notion of
"waiter" and "waker". The "waiter" waits for @count number of
"wakers" to call synctask_barrier_wake() before returning. The
wait performed by the waiter via synctask_barrier_waitfor() is
co-operative in nature and yields the thread for scheduling other
synctasks in the mean time.
Intended use case:
Eliminate excessive serialization in glusterd and allow for
concurrent RPC transactions.
Code which are currently in this format:
---old---
list_for_each_entry (peerinfo, peers, op_peers_list) {
...
GD_SYNCOP (peerinfo->rpc, stub, rpc_cbk, ...);
}
...
int rpc_cbk (rpc, stub, ...)
{
...
__wake (stub);
}
---old---
Can be restructred into the format:
---new---
synctask_barrier_init (stub);
{
list_for_each_entry (peerinfo, peers, op_peers_list) {
...
rpc_submit (peerinfo->rpc, stub, rpc_cbk, ...);
count++;
}
}
synctask_barrier_wait (stub, count);
...
int rpc_cbk (rpc, stub, ...)
{
...
synctask_barrier_wake (stub);
}
---new---
In the above structure, from the synctask's point of view, the region
between synctask_barrier_init() and synctask_barrier_wait() are spawning
off asynchronous "threads" (or RPC) and keep count of how many such
threads have been spawned. Each of those threads are expected to make
one call to synctask_barrier_wake(). The call to synctask_barrier_wait()
makes the synctask thread co-operatively wait/sleep till @count such threads
call their wake function.
This way, the synctask thread retains the "synchronous" flow in the code,
yet at the same time allows for asynchronous "threads" to acheive parallelism
over RPC.
Change-Id: Ie037f99b2d306b71e63e3a56353daec06fb0bf41
BUG: 913662
Signed-off-by: Anand Avati <avati@redhat.com>
Reviewed-on: http://review.gluster.org/4558
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Krishnan Parthasarathi <kparthas@redhat.com>
Tested-by: Krishnan Parthasarathi <kparthas@redhat.com>
-rw-r--r-- | libglusterfs/src/syncop.c | 41 | ||||
-rw-r--r-- | libglusterfs/src/syncop.h | 51 |
2 files changed, 69 insertions, 23 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index c996b8fdd01..115debbfbf7 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -80,15 +80,24 @@ __wait (struct synctask *task) void -synctask_yield (struct synctask *task) +synctask_waitfor (struct synctask *task, int waitfor) { + struct syncenv *env = NULL; xlator_t *oldTHIS = THIS; + env = task->env; + #if defined(__NetBSD__) && defined(_UC_TLSBASE) /* Preserve pthread private pointer through swapcontex() */ task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif + pthread_mutex_lock (&env->mutex); + { + task->waitfor = waitfor; + } + pthread_mutex_unlock (&env->mutex); + if (swapcontext (&task->ctx, &task->proc->sched) < 0) { gf_log ("syncop", GF_LOG_ERROR, "swapcontext failed (%s)", strerror (errno)); @@ -99,6 +108,29 @@ synctask_yield (struct synctask *task) void +synctask_yield (struct synctask *task) +{ + synctask_waitfor (task, 1); +} + + +void +synctask_yawn (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + pthread_mutex_lock (&env->mutex); + { + task->woken = 0; + task->waitfor = 0; + } + pthread_mutex_unlock (&env->mutex); +} + + +void synctask_wake (struct synctask *task) { struct syncenv *env = NULL; @@ -107,9 +139,9 @@ synctask_wake (struct synctask *task) pthread_mutex_lock (&env->mutex); { - task->woken = 1; + task->woken++; - if (task->slept) + if (task->slept && task->woken >= task->waitfor) __run (task); } pthread_mutex_unlock (&env->mutex); @@ -338,6 +370,7 @@ synctask_switchto (struct synctask *task) task->woken = 0; task->slept = 0; + task->waitfor = 0; #if defined(__NetBSD__) && defined(_UC_TLSBASE) /* Preserve pthread private pointer through swapcontex() */ @@ -356,7 +389,7 @@ synctask_switchto (struct synctask *task) pthread_mutex_lock (&env->mutex); { - if (task->woken) { + if (task->woken >= task->waitfor) { __run (task); } else { task->slept = 1; diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 001c68ff5f0..ba0440cd790 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -57,6 +57,7 @@ struct synctask { void *stack; int woken; int slept; + int waitfor; int ret; uid_t uid; @@ -118,15 +119,18 @@ struct syncargs { struct synctask *task; pthread_mutex_t mutex; pthread_cond_t cond; - int done; + int wakecnt; }; #define __yawn(args) do { \ - if (!args->task) { \ + args->task = synctask_get (); \ + if (args->task) { \ + synctask_yawn (args->task); \ + } else { \ pthread_mutex_init (&args->mutex, NULL); \ pthread_cond_init (&args->cond, NULL); \ - args->done = 0; \ + args->wakecnt = 0; \ } \ } while (0) @@ -137,7 +141,7 @@ struct syncargs { } else { \ pthread_mutex_lock (&args->mutex); \ { \ - args->done = 1; \ + args->wakecnt++; \ pthread_cond_signal (&args->cond); \ } \ pthread_mutex_unlock (&args->mutex); \ @@ -145,21 +149,24 @@ struct syncargs { } while (0) -#define __yield(args) do { \ - if (args->task) { \ - synctask_yield (args->task); \ - } else { \ - pthread_mutex_lock (&args->mutex); \ - { \ - while (!args->done) \ - pthread_cond_wait (&args->cond, \ - &args->mutex); \ - } \ - pthread_mutex_unlock (&args->mutex); \ - pthread_mutex_destroy (&args->mutex); \ - pthread_cond_destroy (&args->cond); \ - } \ - } while (0) +#define __waitfor(args, cnt) do { \ + if (args->task) { \ + synctask_waitfor (args->task, cnt); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + while (args->wakecnt < cnt) \ + pthread_cond_wait (&args->cond, \ + &args->mutex); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + pthread_mutex_destroy (&args->mutex); \ + pthread_cond_destroy (&args->cond); \ + } \ + } while (0) + + +#define __yield(args) __waitfor(args, 1) #define SYNCOP(subvol, stb, cbk, op, params ...) do { \ @@ -202,6 +209,12 @@ void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_waitfor (struct synctask *task, int count); + +#define synctask_barrier_init(args) __yawn (args) +#define synctask_barrier_wait(args, n) __waitfor (args, n) +#define synctask_barrier_wake(args) __wake (args) int synctask_setid (struct synctask *task, uid_t uid, gid_t gid); #define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid); |