diff options
| author | Pavan Sondur <pavan@gluster.com> | 2010-07-25 18:13:08 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-07-28 23:58:13 -0700 | 
| commit | fac3ff8bfb3958a3bdc34dc9bff7cb281597e40f (patch) | |
| tree | ed16cfdbdb386ef81f957533f9d563076da953c7 | |
| parent | cadd1256355f4e1a5bd43c3a71dbd6cb97dff66d (diff) | |
syncop: initial implementation
Resending Avati's syncop patch with a few bug fixes.
(please do not skip the IMPORTANT NOTES section)
* Framework for SYNChronous OPerations
--------------------------------------
This patch provides a framework for performing synchronous operations
over the underlying actual asynchronous GlusterFS FOPS.
* Use cases
-----------
1. Convenient implementation of crawler thread in replicate/pump
2. Convenient implementation of high level control flow in DVM
* Background
------------
All (almost) threads in GlusterFS are hosts for executing aysnchronous
file operations using the STACK_WIND and STACK_UNWIND primitives - as calls
and callbacks.
While the STACK_WIND and STACK_UNWIND macros provide high control for
efficiently implementing file operations in a clustered/parallel environment,
there are tasks where the nature of the task itself is sequential and
the execution performance of the task is not critical. In these cases the
complexity to implement the task with STACK_WIND/STACK_UNWIND based operations
as calls and callbacks is an overkill.
* Introduction
---------------
syncop: are wrappers around the STACK_WIND/STACK_UNWIND based asynchronous fops.
synctask: a sequential task (a C function) which uses syncops.
syncenv: an environement to schedule and execute synctasks.
The synchronicity is implemented via ucontext.h based continuations.
Execution of synchronous tasks is possible only in a synchronous environment.
Therefore, the first step  is to create such an environment -
        struct syncenv *env = syncenv_new (0);
This creates a synchronous environment, with a thread (scheduler) to host the
synchronous tasks. Creation of this environment is generally to be done at the
time of process initialization. Next is to spawn a synchronous task in this
environment -
        int slow_self_heal (void *data);
        int completion_func (int ret, void *data);
        ret = synctask_new (env, slow_self_heal, completion_func, data);
Here slow_self_heal is a task which is implemented using synchronous operations.
When slow_self_heal() completes, completion_func() is called with the first
parameter as the return value of slow_self_heal(). Both these functions get
the @data argument as the same value passed to synctask_new().
        int
        slow_self_heal (void *data)
        {
            xlator_t *child = FIRST_CHILD (THIS);
            fd_t     *dir = NULL;
            ...
            dir = syncop_opendir (child, loc);
            entry = syncop_readdir (dir);
            ...
            return ret;
        }
* IMPORTANT NOTES
-----------------
- calling syncops in code executing outside the synchronous environment will
  very likely cause and undesired blocking of the executing thread leading to
  deadlocks!!
  The synchronous environment is a special thread where such sleeps are safe,
  and these sleeps result in the scheduler to 'swap in' other synctasks.
- syncops can put the task to sleep. DO NOT issue syncops while holding mutexes.
  This is very similar to the blunder of holding a mutex and doing STACK_WIND.
- It works best when synctasks use only syncops. If a call_frame is created and
  STACK_WIND'ed, the callback would very likely happen in a thread outside the
  synchronous enviroment, at an undefined time - as expected. So note that the
  synchronous environment does not tame the notorious behaviour of STACK_WIND.
Signed-off-by: Anand V. Avati <avati@blackhole.gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 971 (dynamic volume management)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=971
| -rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
| -rw-r--r-- | libglusterfs/src/globals.c | 43 | ||||
| -rw-r--r-- | libglusterfs/src/globals.h | 5 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.c | 350 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 164 | 
5 files changed, 564 insertions, 2 deletions
| diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 7a13d6955dd..f6c2cf8555b 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -6,9 +6,9 @@ libglusterfs_la_LIBADD = @LEXLIB@  lib_LTLIBRARIES = libglusterfs.la -libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c  hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c +libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c  hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c -noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h  xlator.h  stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h +noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h  xlator.h  stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h syncop.h  EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c index c0040db1448..e845b3dcb66 100644 --- a/libglusterfs/src/globals.c +++ b/libglusterfs/src/globals.c @@ -267,6 +267,45 @@ glusterfs_central_log_flag_unset ()  } + +/* SYNCTASK */ + +static pthread_key_t synctask_key; + + +int +synctask_init () +{ +        int  ret = 0; + +        ret = pthread_key_create (&synctask_key, NULL); + +        return ret; +} + + +void * +synctask_get () +{ +        void   *synctask = NULL; + +        synctask = pthread_getspecific (synctask_key); + +        return synctask; +} + + +int +synctask_set (void *synctask) +{ +        int     ret = 0; + +        pthread_setspecific (synctask_key, synctask); + +        return ret; +} + +  int  glusterfs_globals_init ()  { @@ -288,6 +327,10 @@ glusterfs_globals_init ()          gf_mem_acct_enable_set (); +        ret = synctask_init (); +        if (ret) +                goto out; +  out:          return ret;  } diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h index e09c695113c..bdd9e891046 100644 --- a/libglusterfs/src/globals.h +++ b/libglusterfs/src/globals.h @@ -42,10 +42,15 @@ xlator_t **__glusterfs_this_location ();  xlator_t *glusterfs_this_get ();  int glusterfs_this_set (xlator_t *); +/* central log */ +  void glusterfs_central_log_flag_set ();  long glusterfs_central_log_flag_get ();  void glusterfs_central_log_flag_unset (); +/* task */ +void *synctask_get (); +int synctask_set (void *);  /* init */  int glusterfs_globals_init (void); diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c new file mode 100644 index 00000000000..beb5d9db4a1 --- /dev/null +++ b/libglusterfs/src/syncop.c @@ -0,0 +1,350 @@ +/* +   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "syncop.h" + + +void +synctask_yield (struct synctask *task) +{ +        struct syncenv   *env = NULL; + +        env = task->env; + +        if (swapcontext (&task->ctx, &env->sched) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        } +} + + +void +synctask_yawn (struct synctask *task) +{ +        struct syncenv  *env = NULL; + +        env  = task->env; + +        pthread_mutex_lock (&env->mutex); +        { +                list_del_init (&task->all_tasks); +                list_add (&task->all_tasks, &env->waitq); +        } +        pthread_mutex_unlock (&env->mutex); +} + + +void +synctask_zzzz (struct synctask *task) +{ +        synctask_yawn (task); + +        synctask_yield (task); +} + + +void +synctask_wake (struct synctask *task) +{ +        struct syncenv *env = NULL; + +        env = task->env; + +        pthread_mutex_lock (&env->mutex); +        { +                list_del_init (&task->all_tasks); +                list_add_tail (&task->all_tasks, &env->runq); +        } +        pthread_mutex_unlock (&env->mutex); + +        pthread_cond_broadcast (&env->cond); +} + + +void +synctask_wrap (struct synctask *task) +{ +        int              ret; + +        ret = task->syncfn (task->opaque); +        task->synccbk (ret, task->opaque); + +        /* cannot destroy @task right here as we are +           in the execution stack of @task itself +        */ +        task->complete = 1; +        synctask_wake (task); +} + + +void +synctask_destroy (struct synctask *task) +{ +        if (!task) +                return; + +        if (task->stack) +                FREE (task); +        FREE (task); +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, +              void *opaque) +{ +        struct synctask *newtask = NULL; + +        newtask = CALLOC (1, sizeof (*newtask)); +        if (!newtask) +                return -ENOMEM; + +        newtask->env        = env; +        newtask->xl         = THIS; +        newtask->syncfn     = fn; +        newtask->synccbk    = cbk; +        newtask->opaque     = opaque; + +        INIT_LIST_HEAD (&newtask->all_tasks); + +        if (getcontext (&newtask->ctx) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "getcontext failed (%s)", +                        strerror (errno)); +                goto err; +        } + +        newtask->stack = CALLOC (1, env->stacksize); +        if (!newtask->stack) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "out of memory for stack"); +                goto err; +        } + +        newtask->ctx.uc_stack.ss_sp   = newtask->stack; +        newtask->ctx.uc_stack.ss_size = env->stacksize; + +        makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + +        synctask_wake (newtask); + +        return 0; +err: +        if (newtask) { +                if (newtask->stack) +                        FREE (newtask->stack); +                FREE (newtask); +        } +        return -1; +} + + +struct synctask * +syncenv_task (struct syncenv *env) +{ +        struct synctask  *task = NULL; + +        pthread_mutex_lock (&env->mutex); +        { +                while (list_empty (&env->runq)) +                        pthread_cond_wait (&env->cond, &env->mutex); + +                task = list_entry (env->runq.next, struct synctask, all_tasks); + +                list_del_init (&task->all_tasks); +        } +        pthread_mutex_unlock (&env->mutex); + +        return task; +} + + +void +synctask_switchto (struct synctask *task) +{ +        struct syncenv *env = NULL; + +        env = task->env; + +        synctask_set (task); +        THIS = task->xl; + +        if (swapcontext (&env->sched, &task->ctx) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        } +} + + +void * +syncenv_processor (void *thdata) +{ +        struct syncenv  *env = NULL; +        struct synctask *task = NULL; + +        env = thdata; + +        for (;;) { +                task = syncenv_task (env); + +                if (task->complete) { +                        synctask_destroy (task); +                        continue; +                } + +                synctask_switchto (task); +        } + +        return NULL; +} + + +void +syncenv_destroy (struct syncenv *env) +{ + +} + + +struct syncenv * +syncenv_new (size_t stacksize) +{ +        struct syncenv *newenv = NULL; +        int             ret = 0; + +        newenv = CALLOC (1, sizeof (*newenv)); + +        if (!newenv) +                return NULL; + +        pthread_mutex_init (&newenv->mutex, NULL); +        pthread_cond_init (&newenv->cond, NULL); + +        INIT_LIST_HEAD (&newenv->runq); +        INIT_LIST_HEAD (&newenv->waitq); + +        newenv->stacksize    = SYNCENV_DEFAULT_STACKSIZE; +        if (stacksize) +                newenv->stacksize = stacksize; + +        ret = pthread_create (&newenv->processor, NULL, +                              syncenv_processor, newenv); + +        if (ret != 0) +                syncenv_destroy (newenv); + +        return newenv; +} + + +/* FOPS */ + + +int +syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                   int op_ret, int op_errno, inode_t *inode, +                   struct iatt *iatt, dict_t *xattr, struct iatt *parent) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (op_ret == 0) { +                args->iatt1  = *iatt; +                args->xattr  = xattr; +                args->iatt2  = *parent; +        } + +        __wake (args); + +        return 0; +} + + +int +syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, +               struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup, +                loc, xattr_req); + +        if (iatt) +                *iatt = args.iatt1; +        if (xattr_rsp) +                *xattr_rsp = args.xattr; +        if (parent) +                *parent = args.iatt2; + +        errno = args.op_errno; +        return args.op_ret; +} + + + +int +syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int op_ret, int op_errno, +                    struct iatt *preop, struct iatt *postop) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (op_ret == 0) { +                args->iatt1  = *preop; +                args->iatt2  = *postop; +        } + +        __wake (args); + +        return 0; +} + + +int +syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, +                struct iatt *preop, struct iatt *postop) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr, +                loc, iatt, valid); + +        if (preop) +                *preop = args.iatt1; +        if (postop) +                *postop = args.iatt2; + +        errno = args.op_errno; +        return args.op_ret; +} + diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h new file mode 100644 index 00000000000..ce364b07301 --- /dev/null +++ b/libglusterfs/src/syncop.h @@ -0,0 +1,164 @@ +/* +   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SYNCOP_H +#define _SYNCOP_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include <sys/time.h> +#include <pthread.h> +#include <ucontext.h> + + +struct synctask; +struct syncenv; + + +typedef int (*synctask_cbk_t) (int ret, void *opaque); + +typedef int (*synctask_fn_t) (void *opaque); + + +/* for one sequential execution of @syncfn */ +struct synctask { +        struct list_head    all_tasks; +        struct syncenv     *env; +        xlator_t           *xl; +        synctask_cbk_t      synccbk; +        synctask_fn_t       syncfn; +        void               *opaque; +        void               *stack; +        int                 complete; + +        ucontext_t          ctx; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { +        pthread_t           processor; +        struct synctask    *current; + +        struct list_head    runq; +        struct list_head    waitq; + +        pthread_mutex_t     mutex; +        pthread_cond_t      cond; + +        ucontext_t          sched; +        size_t              stacksize; +}; + + +struct syncargs { +        int                 op_ret; +        int                 op_errno; +        struct iatt         iatt1; +        struct iatt         iatt2; +        dict_t             *xattr; + +        /* do not touch */ +        pthread_mutex_t     mutex; +        char                complete; +        pthread_cond_t      cond; +        struct synctask    *task; +}; + + +#define __yawn(args) do {                                               \ +        struct synctask *task = NULL;                                   \ +                                                                        \ +        task = synctask_get ();                                         \ +        if (task) {                                                     \ +                args->task = task;                                      \ +                synctask_yawn (task);                                   \ +        } else {                                                        \ +                pthread_mutex_init (&args->mutex, NULL);                \ +                pthread_cond_init (&args->cond, NULL);                  \ +        }                                                               \ +} while (0) + + +#define __yield(args) do {                                              \ +        if (args->task) {                                               \ +                synctask_yield (args->task);                            \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        while (!args->complete)                         \ +                                pthread_cond_wait (&args->cond,         \ +                                                   &args->mutex);       \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +                                                                        \ +                pthread_mutex_destroy (&args->mutex);                   \ +                pthread_cond_destroy (&args->cond);                     \ +        }                                                               \ +} while (0) + + +#define __wake(args) do {                                               \ +        if (args->task) {                                               \ +                synctask_wake (args->task);                             \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        args->complete = 1;                             \ +                        pthread_cond_broadcast (&args->cond);           \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +        }                                                               \ +} while (0) + + +#define SYNCOP(subvol, stb, cbk, op, params ...) do {                   \ +        call_frame_t    *frame = NULL;                                  \ +                                                                        \ +        frame = create_frame (THIS, THIS->ctx->pool);                   \ +                                                                        \ +        __yawn (stb);                                                   \ +        STACK_WIND_COOKIE (frame, (void *)stb, cbk, subvol, op, params);\ +        __yield (stb);                                                  \ +} while (0) + + +#define SYNCENV_DEFAULT_STACKSIZE (16 * 1024) + +struct syncenv * syncenv_new (); +void syncenv_destroy (struct syncenv *); + +int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, void *); +void synctask_zzzz (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_wake (struct synctask *task); +void synctask_yield (struct synctask *task); + +int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, +                   /* out */ +                   struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent); + +int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, +                    /* out */ +                    struct iatt *preop, struct iatt *postop); + +#endif /* _SYNCOP_H */ | 
