diff options
-rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
-rw-r--r-- | libglusterfs/src/globals.c | 43 | ||||
-rw-r--r-- | libglusterfs/src/globals.h | 5 | ||||
-rw-r--r-- | libglusterfs/src/syncop.c | 350 | ||||
-rw-r--r-- | libglusterfs/src/syncop.h | 164 |
5 files changed, 564 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 7a13d6955dd..f6c2cf8555b 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -6,9 +6,9 @@ libglusterfs_la_LIBADD = @LEXLIB@ lib_LTLIBRARIES = libglusterfs.la -libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c +libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c -noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h +noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h syncop.h EXTRA_DIST = graph.l graph.y diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c index c0040db1448..e845b3dcb66 100644 --- a/libglusterfs/src/globals.c +++ b/libglusterfs/src/globals.c @@ -267,6 +267,45 @@ glusterfs_central_log_flag_unset () } + +/* SYNCTASK */ + +static pthread_key_t synctask_key; + + +int +synctask_init () +{ + int ret = 0; + + ret = pthread_key_create (&synctask_key, NULL); + + return ret; +} + + +void * +synctask_get () +{ + void *synctask = NULL; + + synctask = pthread_getspecific (synctask_key); + + return synctask; +} + + +int +synctask_set (void *synctask) +{ + int ret = 0; + + pthread_setspecific (synctask_key, synctask); + + return ret; +} + + int glusterfs_globals_init () { @@ -288,6 +327,10 @@ glusterfs_globals_init () gf_mem_acct_enable_set (); + ret = synctask_init (); + if (ret) + goto out; + out: return ret; } diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h index e09c695113c..bdd9e891046 100644 --- a/libglusterfs/src/globals.h +++ b/libglusterfs/src/globals.h @@ -42,10 +42,15 @@ xlator_t **__glusterfs_this_location (); xlator_t *glusterfs_this_get (); int glusterfs_this_set (xlator_t *); +/* central log */ + void glusterfs_central_log_flag_set (); long glusterfs_central_log_flag_get (); void glusterfs_central_log_flag_unset (); +/* task */ +void *synctask_get (); +int synctask_set (void *); /* init */ int glusterfs_globals_init (void); diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c new file mode 100644 index 00000000000..beb5d9db4a1 --- /dev/null +++ b/libglusterfs/src/syncop.c @@ -0,0 +1,350 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "syncop.h" + + +void +synctask_yield (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + if (swapcontext (&task->ctx, &env->sched) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } +} + + +void +synctask_yawn (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + pthread_mutex_lock (&env->mutex); + { + list_del_init (&task->all_tasks); + list_add (&task->all_tasks, &env->waitq); + } + pthread_mutex_unlock (&env->mutex); +} + + +void +synctask_zzzz (struct synctask *task) +{ + synctask_yawn (task); + + synctask_yield (task); +} + + +void +synctask_wake (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + pthread_mutex_lock (&env->mutex); + { + list_del_init (&task->all_tasks); + list_add_tail (&task->all_tasks, &env->runq); + } + pthread_mutex_unlock (&env->mutex); + + pthread_cond_broadcast (&env->cond); +} + + +void +synctask_wrap (struct synctask *task) +{ + int ret; + + ret = task->syncfn (task->opaque); + task->synccbk (ret, task->opaque); + + /* cannot destroy @task right here as we are + in the execution stack of @task itself + */ + task->complete = 1; + synctask_wake (task); +} + + +void +synctask_destroy (struct synctask *task) +{ + if (!task) + return; + + if (task->stack) + FREE (task); + FREE (task); +} + + +int +synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk, + void *opaque) +{ + struct synctask *newtask = NULL; + + newtask = CALLOC (1, sizeof (*newtask)); + if (!newtask) + return -ENOMEM; + + newtask->env = env; + newtask->xl = THIS; + newtask->syncfn = fn; + newtask->synccbk = cbk; + newtask->opaque = opaque; + + INIT_LIST_HEAD (&newtask->all_tasks); + + if (getcontext (&newtask->ctx) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "getcontext failed (%s)", + strerror (errno)); + goto err; + } + + newtask->stack = CALLOC (1, env->stacksize); + if (!newtask->stack) { + gf_log ("syncop", GF_LOG_ERROR, + "out of memory for stack"); + goto err; + } + + newtask->ctx.uc_stack.ss_sp = newtask->stack; + newtask->ctx.uc_stack.ss_size = env->stacksize; + + makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask); + + synctask_wake (newtask); + + return 0; +err: + if (newtask) { + if (newtask->stack) + FREE (newtask->stack); + FREE (newtask); + } + return -1; +} + + +struct synctask * +syncenv_task (struct syncenv *env) +{ + struct synctask *task = NULL; + + pthread_mutex_lock (&env->mutex); + { + while (list_empty (&env->runq)) + pthread_cond_wait (&env->cond, &env->mutex); + + task = list_entry (env->runq.next, struct synctask, all_tasks); + + list_del_init (&task->all_tasks); + } + pthread_mutex_unlock (&env->mutex); + + return task; +} + + +void +synctask_switchto (struct synctask *task) +{ + struct syncenv *env = NULL; + + env = task->env; + + synctask_set (task); + THIS = task->xl; + + if (swapcontext (&env->sched, &task->ctx) < 0) { + gf_log ("syncop", GF_LOG_ERROR, + "swapcontext failed (%s)", strerror (errno)); + } +} + + +void * +syncenv_processor (void *thdata) +{ + struct syncenv *env = NULL; + struct synctask *task = NULL; + + env = thdata; + + for (;;) { + task = syncenv_task (env); + + if (task->complete) { + synctask_destroy (task); + continue; + } + + synctask_switchto (task); + } + + return NULL; +} + + +void +syncenv_destroy (struct syncenv *env) +{ + +} + + +struct syncenv * +syncenv_new (size_t stacksize) +{ + struct syncenv *newenv = NULL; + int ret = 0; + + newenv = CALLOC (1, sizeof (*newenv)); + + if (!newenv) + return NULL; + + pthread_mutex_init (&newenv->mutex, NULL); + pthread_cond_init (&newenv->cond, NULL); + + INIT_LIST_HEAD (&newenv->runq); + INIT_LIST_HEAD (&newenv->waitq); + + newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE; + if (stacksize) + newenv->stacksize = stacksize; + + ret = pthread_create (&newenv->processor, NULL, + syncenv_processor, newenv); + + if (ret != 0) + syncenv_destroy (newenv); + + return newenv; +} + + +/* FOPS */ + + +int +syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, inode_t *inode, + struct iatt *iatt, dict_t *xattr, struct iatt *parent) +{ + struct syncargs *args = NULL; + + args = cookie; + + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (op_ret == 0) { + args->iatt1 = *iatt; + args->xattr = xattr; + args->iatt2 = *parent; + } + + __wake (args); + + return 0; +} + + +int +syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, + struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent) +{ + struct syncargs args = {0, }; + + SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup, + loc, xattr_req); + + if (iatt) + *iatt = args.iatt1; + if (xattr_rsp) + *xattr_rsp = args.xattr; + if (parent) + *parent = args.iatt2; + + errno = args.op_errno; + return args.op_ret; +} + + + +int +syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, + struct iatt *preop, struct iatt *postop) +{ + struct syncargs *args = NULL; + + args = cookie; + + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (op_ret == 0) { + args->iatt1 = *preop; + args->iatt2 = *postop; + } + + __wake (args); + + return 0; +} + + +int +syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, + struct iatt *preop, struct iatt *postop) +{ + struct syncargs args = {0, }; + + SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr, + loc, iatt, valid); + + if (preop) + *preop = args.iatt1; + if (postop) + *postop = args.iatt2; + + errno = args.op_errno; + return args.op_ret; +} + diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h new file mode 100644 index 00000000000..ce364b07301 --- /dev/null +++ b/libglusterfs/src/syncop.h @@ -0,0 +1,164 @@ +/* + Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +#ifndef _SYNCOP_H +#define _SYNCOP_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "xlator.h" +#include <sys/time.h> +#include <pthread.h> +#include <ucontext.h> + + +struct synctask; +struct syncenv; + + +typedef int (*synctask_cbk_t) (int ret, void *opaque); + +typedef int (*synctask_fn_t) (void *opaque); + + +/* for one sequential execution of @syncfn */ +struct synctask { + struct list_head all_tasks; + struct syncenv *env; + xlator_t *xl; + synctask_cbk_t synccbk; + synctask_fn_t syncfn; + void *opaque; + void *stack; + int complete; + + ucontext_t ctx; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { + pthread_t processor; + struct synctask *current; + + struct list_head runq; + struct list_head waitq; + + pthread_mutex_t mutex; + pthread_cond_t cond; + + ucontext_t sched; + size_t stacksize; +}; + + +struct syncargs { + int op_ret; + int op_errno; + struct iatt iatt1; + struct iatt iatt2; + dict_t *xattr; + + /* do not touch */ + pthread_mutex_t mutex; + char complete; + pthread_cond_t cond; + struct synctask *task; +}; + + +#define __yawn(args) do { \ + struct synctask *task = NULL; \ + \ + task = synctask_get (); \ + if (task) { \ + args->task = task; \ + synctask_yawn (task); \ + } else { \ + pthread_mutex_init (&args->mutex, NULL); \ + pthread_cond_init (&args->cond, NULL); \ + } \ +} while (0) + + +#define __yield(args) do { \ + if (args->task) { \ + synctask_yield (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + while (!args->complete) \ + pthread_cond_wait (&args->cond, \ + &args->mutex); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + \ + pthread_mutex_destroy (&args->mutex); \ + pthread_cond_destroy (&args->cond); \ + } \ +} while (0) + + +#define __wake(args) do { \ + if (args->task) { \ + synctask_wake (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + args->complete = 1; \ + pthread_cond_broadcast (&args->cond); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + } \ +} while (0) + + +#define SYNCOP(subvol, stb, cbk, op, params ...) do { \ + call_frame_t *frame = NULL; \ + \ + frame = create_frame (THIS, THIS->ctx->pool); \ + \ + __yawn (stb); \ + STACK_WIND_COOKIE (frame, (void *)stb, cbk, subvol, op, params);\ + __yield (stb); \ +} while (0) + + +#define SYNCENV_DEFAULT_STACKSIZE (16 * 1024) + +struct syncenv * syncenv_new (); +void syncenv_destroy (struct syncenv *); + +int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, void *); +void synctask_zzzz (struct synctask *task); +void synctask_yawn (struct synctask *task); +void synctask_wake (struct synctask *task); +void synctask_yield (struct synctask *task); + +int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, + /* out */ + struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent); + +int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, + /* out */ + struct iatt *preop, struct iatt *postop); + +#endif /* _SYNCOP_H */ |