diff options
| -rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
| -rw-r--r-- | libglusterfs/src/globals.c | 43 | ||||
| -rw-r--r-- | libglusterfs/src/globals.h | 5 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.c | 350 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 164 | 
5 files changed, 564 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 7a13d6955dd..f6c2cf8555b 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -6,9 +6,9 @@ libglusterfs_la_LIBADD = @LEXLIB@  lib_LTLIBRARIES = libglusterfs.la -libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c  hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c +libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c  hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c -noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h  xlator.h  stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h +noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h  xlator.h  stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h syncop.h  EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c index c0040db1448..e845b3dcb66 100644 --- a/libglusterfs/src/globals.c +++ b/libglusterfs/src/globals.c @@ -267,6 +267,45 @@ glusterfs_central_log_flag_unset ()  } + +/* SYNCTASK */ + +static pthread_key_t synctask_key; + + +int +synctask_init () +{ +        int  ret = 0; + +        ret = pthread_key_create (&synctask_key, NULL); + +        return ret; +} + + +void * +synctask_get () +{ +        void   *synctask = NULL; + +        synctask = pthread_getspecific (synctask_key); + +        return synctask; +} + + +int +synctask_set (void *synctask) +{ +        int     ret = 0; + +        pthread_setspecific (synctask_key, synctask); + +        return ret; +} + +  int  glusterfs_globals_init ()  { @@ -288,6 +327,10 @@ glusterfs_globals_init ()          gf_mem_acct_enable_set (); +        ret = synctask_init (); +        if (ret) +                goto out; +  out:          return ret;  } diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h index e09c695113c..bdd9e891046 100644 --- a/libglusterfs/src/globals.h +++ b/libglusterfs/src/globals.h @@ -42,10 +42,15 @@ xlator_t **__glusterfs_this_location ();  xlator_t *glusterfs_this_get ();  int glusterfs_this_set (xlator_t *); +/* central log */ +  void glusterfs_central_log_flag_set ();  long glusterfs_central_log_flag_get ();  void glusterfs_central_log_flag_unset (); +/* task */ +void *synctask_get (); +int synctask_set (void *);  /* init */  int glusterfs_globals_init (void); diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c new file mode 100644 index 00000000000..beb5d9db4a1 --- /dev/null +++ b/libglusterfs/src/syncop.c @@ -0,0 +1,350 @@ +/* +   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "syncop.h" + + +void +synctask_yield (struct synctask *task) +{ +        struct syncenv   *env = NULL; + +        env = task->env; + +        if (swapcontext (&task->ctx, &env->sched) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        } +} + + +void +synctask_yawn (struct synctask *task) +{ +        struct syncenv  *env = NULL; + +        env  = task->env; + +        pthread_mutex_lock (&env->mutex); +        { +                list_del_init (&task->all_tasks); +                list_add (&task->all_tasks, &env->waitq); +        } +        pthread_mutex_unlock (&env->mutex); +} + + +void +synctask_zzzz (struct synctask *task) +{ +        synctask_yawn (task); + +        synctask_yield (task); +} + + +void +synctask_wake (struct synctask *task) +{ +        struct syncenv *env = NULL; + +        env = task->env; + +        pthread_mutex_lock (&env->mutex); +        { +                list_del_init (&task->all_tasks); +                list_add_tail (&task->all_tasks, &env->runq); +        } +        pthread_mutex_unlock (&env->mutex); + +        pthread_cond_broadcast (&env->cond); +} + + +void +synctask_wrap (struct synctask *task) +{ +        int              ret; + +        ret = task->syncfn (task->opaque); +        task->synccbk (ret, task->opaque); + +        /* cannot destroy @task right here as we are +           in the execution stack of @task itself +        */ +        task->complete = 1; +        synctask_wake (task); +} + + +void +synctask_destroy (struct synctask *task) +{ +        if (!task) +                return; + +        if (task->stack) +                FREE (task); +        FREE (task); +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, +              void *opaque) +{ +        struct synctask *newtask = NULL; + +        newtask = CALLOC (1, sizeof (*newtask)); +        if (!newtask) +                return -ENOMEM; + +        newtask->env        = env; +        newtask->xl         = THIS; +        newtask->syncfn     = fn; +        newtask->synccbk    = cbk; +        newtask->opaque     = opaque; + +        INIT_LIST_HEAD (&newtask->all_tasks); + +        if (getcontext (&newtask->ctx) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "getcontext failed (%s)", +                        strerror (errno)); +                goto err; +        } + +        newtask->stack = CALLOC (1, env->stacksize); +        if (!newtask->stack) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "out of memory for stack"); +                goto err; +        } + +        newtask->ctx.uc_stack.ss_sp   = newtask->stack; +        newtask->ctx.uc_stack.ss_size = env->stacksize; + +        makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + +        synctask_wake (newtask); + +        return 0; +err: +        if (newtask) { +                if (newtask->stack) +                        FREE (newtask->stack); +                FREE (newtask); +        } +        return -1; +} + + +struct synctask * +syncenv_task (struct syncenv *env) +{ +        struct synctask  *task = NULL; + +        pthread_mutex_lock (&env->mutex); +        { +                while (list_empty (&env->runq)) +                        pthread_cond_wait (&env->cond, &env->mutex); + +                task = list_entry (env->runq.next, struct synctask, all_tasks); + +                list_del_init (&task->all_tasks); +        } +        pthread_mutex_unlock (&env->mutex); + +        return task; +} + + +void +synctask_switchto (struct synctask *task) +{ +        struct syncenv *env = NULL; + +        env = task->env; + +        synctask_set (task); +        THIS = task->xl; + +        if (swapcontext (&env->sched, &task->ctx) < 0) { +                gf_log ("syncop", GF_LOG_ERROR, +                        "swapcontext failed (%s)", strerror (errno)); +        } +} + + +void * +syncenv_processor (void *thdata) +{ +        struct syncenv  *env = NULL; +        struct synctask *task = NULL; + +        env = thdata; + +        for (;;) { +                task = syncenv_task (env); + +                if (task->complete) { +                        synctask_destroy (task); +                        continue; +                } + +                synctask_switchto (task); +        } + +        return NULL; +} + + +void +syncenv_destroy (struct syncenv *env) +{ + +} + + +struct syncenv * +syncenv_new (size_t stacksize) +{ +        struct syncenv *newenv = NULL; +        int             ret = 0; + +        newenv = CALLOC (1, sizeof (*newenv)); + +        if (!newenv) +                return NULL; + +        pthread_mutex_init (&newenv->mutex, NULL); +        pthread_cond_init (&newenv->cond, NULL); + +        INIT_LIST_HEAD (&newenv->runq); +        INIT_LIST_HEAD (&newenv->waitq); + +        newenv->stacksize    = SYNCENV_DEFAULT_STACKSIZE; +        if (stacksize) +                newenv->stacksize = stacksize; + +        ret = pthread_create (&newenv->processor, NULL, +                              syncenv_processor, newenv); + +        if (ret != 0) +                syncenv_destroy (newenv); + +        return newenv; +} + + +/* FOPS */ + + +int +syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                   int op_ret, int op_errno, inode_t *inode, +                   struct iatt *iatt, dict_t *xattr, struct iatt *parent) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (op_ret == 0) { +                args->iatt1  = *iatt; +                args->xattr  = xattr; +                args->iatt2  = *parent; +        } + +        __wake (args); + +        return 0; +} + + +int +syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, +               struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup, +                loc, xattr_req); + +        if (iatt) +                *iatt = args.iatt1; +        if (xattr_rsp) +                *xattr_rsp = args.xattr; +        if (parent) +                *parent = args.iatt2; + +        errno = args.op_errno; +        return args.op_ret; +} + + + +int +syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int op_ret, int op_errno, +                    struct iatt *preop, struct iatt *postop) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (op_ret == 0) { +                args->iatt1  = *preop; +                args->iatt2  = *postop; +        } + +        __wake (args); + +        return 0; +} + + +int +syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, +                struct iatt *preop, struct iatt *postop) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr, +                loc, iatt, valid); + +        if (preop) +                *preop = args.iatt1; +        if (postop) +                *postop = args.iatt2; + +        errno = args.op_errno; +        return args.op_ret; +} + diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h new file mode 100644 index 00000000000..ce364b07301 --- /dev/null +++ b/libglusterfs/src/syncop.h @@ -0,0 +1,164 @@ +/* +   Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +   This file is part of GlusterFS. + +   GlusterFS is free software; you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published +   by the Free Software Foundation; either version 3 of the License, +   or (at your option) any later version. + +   GlusterFS is distributed in the hope that it will be useful, but +   WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +   General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see +   <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SYNCOP_H +#define _SYNCOP_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include <sys/time.h> +#include <pthread.h> +#include <ucontext.h> + + +struct synctask; +struct syncenv; + + +typedef int (*synctask_cbk_t) (int ret, void *opaque); + +typedef int (*synctask_fn_t) (void *opaque); + + +/* for one sequential execution of @syncfn */ +struct synctask { +        struct list_head    all_tasks; +        struct syncenv     *env; +        xlator_t           *xl; +        synctask_cbk_t      synccbk; +        synctask_fn_t       syncfn; +        void               *opaque; +        void               *stack; +        int                 complete; + +        ucontext_t          ctx; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { +        pthread_t           processor; +        struct synctask    *current; + +        struct list_head    runq; +        struct list_head    waitq; + +        pthread_mutex_t     mutex; +        pthread_cond_t      cond; + +        ucontext_t          sched; +        size_t              stacksize; +}; + + +struct syncargs { +        int                 op_ret; +        int                 op_errno; +        struct iatt         iatt1; +        struct iatt         iatt2; +        dict_t             *xattr; + +        /* do not touch */ +        pthread_mutex_t     mutex; +        char                complete; +        pthread_cond_t      cond; +        struct synctask    *task; +}; + + +#define __yawn(args) do {                                               \ +        struct synctask *task = NULL;                                   \ +                                                                        \ +        task = synctask_get ();                                         \ +        if (task) {                                                     \ +                args->task = task;                                      \ +                synctask_yawn (task);                                   \ +        } else {                                                        \ +                pthread_mutex_init (&args->mutex, NULL);                \ +                pthread_cond_init (&args->cond, NULL);                  \ +        }                                                               \ +} while (0) + + +#define __yield(args) do {                                              \ +        if (args->task) {                                               \ +                synctask_yield (args->task);                            \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        while (!args->complete)                         \ +                                pthread_cond_wait (&args->cond,         \ +                                                   &args->mutex);       \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +                                                                        \ +                pthread_mutex_destroy (&args->mutex);                   \ +                pthread_cond_destroy (&args->cond);                     \ +        }                                                               \ +} while (0) + + +#define __wake(args) do {                                               \ +        if (args->task) {                                               \ +                synctask_wake (args->task);                             \ +        } else {                                                        \ +                pthread_mutex_lock (&args->mutex);                      \ +                {                                                       \ +                        args->complete = 1;                             \ +                        pthread_cond_broadcast (&args->cond);           \ +                }                                                       \ +                pthread_mutex_unlock (&args->mutex);                    \ +        }                                                               \ +} while (0) + + +#define SYNCOP(subvol, stb, cbk, op, params ...) do {                   \ +        call_frame_t    *frame = NULL;                                  \ +                                                                        \ +        frame = create_frame (THIS, THIS->ctx->pool);                   \ +                                                                        \ +        __yawn (stb);                                                   \ +        STACK_WIND_COOKIE (frame, (void *)stb, cbk, subvol, op, params);\ +        __yield (stb);                                                  \ +} while (0) + + +#define SYNCENV_DEFAULT_STACKSIZE (16 * 1024) + +struct syncenv * syncenv_new (); +void syncenv_destroy (struct syncenv *); + +int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, void *); +void synctask_zzzz (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_wake (struct synctask *task); +void synctask_yield (struct synctask *task); + +int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, +                   /* out */ +                   struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent); + +int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, +                    /* out */ +                    struct iatt *preop, struct iatt *postop); + +#endif /* _SYNCOP_H */  | 
