diff options
| author | Krishnan Parthasarathi <kparthas@redhat.com> | 2013-03-06 11:45:48 +0530 | 
|---|---|---|
| committer | Anand Avati <avati@redhat.com> | 2013-03-06 21:32:34 -0800 | 
| commit | 6f2dc529faba92f10a5fee618bed05ebf752ef9e (patch) | |
| tree | babf8cd8f1cb2234b84b4eae2379af6eb080a1e2 /libglusterfs/src/syncop.h | |
| parent | e52dc374bce30ed8a223e89628324ae18f433b96 (diff) | |
synctask: support for (assymetric) counted barriers
[Backport of Avati's patch on master - http://review.gluster.org/4558]
This patch introduces a new set of primitives:
  - synctask_barrier_init (stub)
  - synctask_barrier_waitfor (stub, count)
  - synctask_barrier_wake (stub)
Unlike pthread_barrier_t, this barrier has an explicit notion of
"waiter" and "waker". The "waiter" waits for @count number of
"wakers" to call synctask_barrier_wake() before returning. The
wait performed by the waiter via synctask_barrier_waitfor() is
co-operative in nature and yields the thread for scheduling other
synctasks in the mean time.
Intended use case:
  Eliminate excessive serialization in glusterd and allow for
concurrent RPC transactions.
  Code which are currently in this format:
---old---
  list_for_each_entry (peerinfo, peers, op_peers_list) {
          ...
          GD_SYNCOP (peerinfo->rpc, stub, rpc_cbk, ...);
  }
  ...
  int rpc_cbk (rpc, stub, ...)
  {
          ...
          __wake (stub);
  }
---old---
  Can be restructred into the format:
---new---
  synctask_barrier_init (stub);
  {
          list_for_each_entry (peerinfo, peers, op_peers_list) {
                  ...
                  rpc_submit (peerinfo->rpc, stub, rpc_cbk, ...);
                  count++;
           }
   }
   synctask_barrier_wait (stub, count);
   ...
   int rpc_cbk (rpc, stub, ...)
   {
           ...
           synctask_barrier_wake (stub);
   }
---new---
  In the above structure, from the synctask's point of view, the region
between synctask_barrier_init() and synctask_barrier_wait() are spawning
off asynchronous "threads" (or RPC) and keep count of how many such
threads have been spawned. Each of those threads are expected to make
one call to synctask_barrier_wake(). The call to synctask_barrier_wait()
makes the synctask thread co-operatively wait/sleep till @count such threads
call their wake function.
  This way, the synctask thread retains the "synchronous" flow in the code,
yet at the same time allows for asynchronous "threads" to acheive parallelism
over RPC.
Change-Id: Ie037f99b2d306b71e63e3a56353daec06fb0bf41
BUG: 913662
Signed-off-by: Anand Avati <avati@redhat.com>
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4636
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'libglusterfs/src/syncop.h')
| -rw-r--r-- | libglusterfs/src/syncop.h | 155 | 
1 files changed, 84 insertions, 71 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 37e2b0e288a..d4086291a47 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -36,11 +36,11 @@ typedef int (*synctask_fn_t) (void *opaque);  typedef enum { -	SYNCTASK_INIT = 0, -	SYNCTASK_RUN, +        SYNCTASK_INIT = 0, +        SYNCTASK_RUN,          SYNCTASK_SUSPEND, -	SYNCTASK_WAIT, -	SYNCTASK_DONE, +        SYNCTASK_WAIT, +        SYNCTASK_DONE,  } synctask_state_t;  /* for one sequential execution of @syncfn */ @@ -52,22 +52,23 @@ struct synctask {          call_frame_t       *opframe;          synctask_cbk_t      synccbk;          synctask_fn_t       syncfn; -	synctask_state_t    state; +        synctask_state_t    state;          void               *opaque;          void               *stack;          int                 woken;          int                 slept; -	int                 ret; +        int                 waitfor; +        int                 ret; -	uid_t               uid; -	gid_t               gid; +        uid_t               uid; +        gid_t               gid;          ucontext_t          ctx; -	struct syncproc    *proc; +        struct syncproc    *proc; -	pthread_mutex_t     mutex; /* for synchronous spawning of synctask */ -	pthread_cond_t      cond; -	int                 done; +        pthread_mutex_t     mutex; /* for synchronous spawning of synctask */ +        pthread_cond_t      cond; +        int                 done;  }; @@ -116,79 +117,85 @@ struct syncargs {          /* do not touch */          struct synctask    *task; -	pthread_mutex_t     mutex; -	pthread_cond_t      cond; -	int                 done; +        pthread_mutex_t     mutex; +        pthread_cond_t      cond; +        int                 wakecnt;  }; -#define __yawn(args) do {					\ -	if (!args->task) {					\ -		pthread_mutex_init (&args->mutex, NULL);	\ -		pthread_cond_init (&args->cond, NULL);		\ -		args->done = 0;					\ -	}							\ -	} while (0) - - -#define __wake(args) do {					\ -	if (args->task) {					\ -		synctask_wake (args->task);			\ -	} else {						\ -		pthread_mutex_lock (&args->mutex);		\ -		{						\ -			args->done = 1;				\ -			pthread_cond_signal (&args->cond);	\ -		}						\ -		pthread_mutex_unlock (&args->mutex);		\ -	}							\ -	} while (0) - - -#define __yield(args) do {						\ -	if (args->task) {						\ -		synctask_yield (args->task);				\ -	} else {							\ -		pthread_mutex_lock (&args->mutex);			\ -		{							\ -			while (!args->done)				\ -				pthread_cond_wait (&args->cond,		\ -						   &args->mutex);	\ -		}							\ -		pthread_mutex_unlock (&args->mutex);			\ -		pthread_mutex_destroy (&args->mutex);			\ -		pthread_cond_destroy (&args->cond);			\ -	}								\ -	} while (0) +#define __yawn(args) do {                                       \ +        args->task = synctask_get ();                           \ +        if (args->task) {                                       \ +                synctask_yawn (args->task);                     \ +        } else {                                                \ +                pthread_mutex_init (&args->mutex, NULL);        \ +                pthread_cond_init (&args->cond, NULL);          \ +                args->wakecnt = 0;                              \ +        }                                                       \ +        } while (0) + + +#define __wake(args) do {                                       \ +        if (args->task) {                                       \ +                synctask_wake (args->task);                     \ +        } else {                                                \ +                pthread_mutex_lock (&args->mutex);              \ +                {                                               \ +                        args->wakecnt++;                        \ +                        pthread_cond_signal (&args->cond);      \ +                }                                               \ +                pthread_mutex_unlock (&args->mutex);            \ +        }                                                       \ +        } while (0) + + +#define __waitfor(args, cnt) do {                                       \ +        if (args->task) {                                               \ +                synctask_waitfor (args->task, cnt);                     \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        while (args->wakecnt < cnt)                     \ +                                pthread_cond_wait (&args->cond,         \ +                                                   &args->mutex);       \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +                pthread_mutex_destroy (&args->mutex);                   \ +                pthread_cond_destroy (&args->cond);                     \ +        }                                                               \ +        } while (0) + + +#define __yield(args) __waitfor(args, 1)  #define SYNCOP(subvol, stb, cbk, op, params ...) do {                   \                  struct  synctask        *task = NULL;                   \ -		call_frame_t            *frame = NULL;			\ +                call_frame_t            *frame = NULL;                  \                                                                          \                  task = synctask_get ();                                 \                  stb->task = task;                                       \ -		if (task)						\ -			frame = task->opframe;				\ -		else							\ -			frame = create_frame (THIS, THIS->ctx->pool);	\ +                if (task)                                               \ +                        frame = task->opframe;                          \ +                else                                                    \ +                        frame = create_frame (THIS, THIS->ctx->pool);   \                  if (task) {                                             \                          frame->root->uid = task->uid;                   \                          frame->root->gid = task->gid;                   \                  }                                                       \ -									\ -		__yawn (stb);						\                                                                          \ -                STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol,	\ -				   op, params);				\ -		if (task)						\ -			task->state = SYNCTASK_SUSPEND;			\ -									\ -                __yield (stb);						\ -		if (task)						\ -			STACK_RESET (frame->root);			\ -		else							\ -			STACK_DESTROY (frame->root);			\ +                __yawn (stb);                                           \ +                                                                        \ +                STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol,     \ +                                   op, params);                         \ +                if (task)                                               \ +                        task->state = SYNCTASK_SUSPEND;                 \ +                                                                        \ +                __yield (stb);                                          \ +                if (task)                                               \ +                        STACK_RESET (frame->root);                      \ +                else                                                    \ +                        STACK_DESTROY (frame->root);                    \          } while (0) @@ -201,6 +208,12 @@ void syncenv_scale (struct syncenv *env);  int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);  void synctask_wake (struct synctask *task);  void synctask_yield (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_waitfor (struct synctask *task, int count); + +#define synctask_barrier_init(args) __yawn (args) +#define synctask_barrier_wait(args, n) __waitfor (args, n) +#define synctask_barrier_wake(args) __wake (args)  int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);  #define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid); @@ -264,7 +277,7 @@ int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf);  int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf);  int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, -		    dict_t *dict); +                    dict_t *dict);  int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size);  int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,                    dict_t *dict);  | 
