summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/Makefile.am4
-rw-r--r--libglusterfs/src/globals.c43
-rw-r--r--libglusterfs/src/globals.h5
-rw-r--r--libglusterfs/src/syncop.c350
-rw-r--r--libglusterfs/src/syncop.h164
5 files changed, 564 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 7a13d6955..f6c2cf855 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -6,9 +6,9 @@ libglusterfs_la_LIBADD = @LEXLIB@
lib_LTLIBRARIES = libglusterfs.la
-libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c
+libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c
-noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h
+noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h syncop.h
EXTRA_DIST = graph.l graph.y
diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c
index c0040db14..e845b3dcb 100644
--- a/libglusterfs/src/globals.c
+++ b/libglusterfs/src/globals.c
@@ -267,6 +267,45 @@ glusterfs_central_log_flag_unset ()
}
+
+/* SYNCTASK */
+
+static pthread_key_t synctask_key;
+
+
+int
+synctask_init ()
+{
+ int ret = 0;
+
+ ret = pthread_key_create (&synctask_key, NULL);
+
+ return ret;
+}
+
+
+void *
+synctask_get ()
+{
+ void *synctask = NULL;
+
+ synctask = pthread_getspecific (synctask_key);
+
+ return synctask;
+}
+
+
+int
+synctask_set (void *synctask)
+{
+ int ret = 0;
+
+ pthread_setspecific (synctask_key, synctask);
+
+ return ret;
+}
+
+
int
glusterfs_globals_init ()
{
@@ -288,6 +327,10 @@ glusterfs_globals_init ()
gf_mem_acct_enable_set ();
+ ret = synctask_init ();
+ if (ret)
+ goto out;
+
out:
return ret;
}
diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h
index e09c69511..bdd9e8910 100644
--- a/libglusterfs/src/globals.h
+++ b/libglusterfs/src/globals.h
@@ -42,10 +42,15 @@ xlator_t **__glusterfs_this_location ();
xlator_t *glusterfs_this_get ();
int glusterfs_this_set (xlator_t *);
+/* central log */
+
void glusterfs_central_log_flag_set ();
long glusterfs_central_log_flag_get ();
void glusterfs_central_log_flag_unset ();
+/* task */
+void *synctask_get ();
+int synctask_set (void *);
/* init */
int glusterfs_globals_init (void);
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
new file mode 100644
index 000000000..beb5d9db4
--- /dev/null
+++ b/libglusterfs/src/syncop.c
@@ -0,0 +1,350 @@
+/*
+ Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "syncop.h"
+
+
+void
+synctask_yield (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ if (swapcontext (&task->ctx, &env->sched) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
+}
+
+
+void
+synctask_yawn (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ list_del_init (&task->all_tasks);
+ list_add (&task->all_tasks, &env->waitq);
+ }
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
+synctask_zzzz (struct synctask *task)
+{
+ synctask_yawn (task);
+
+ synctask_yield (task);
+}
+
+
+void
+synctask_wake (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ list_del_init (&task->all_tasks);
+ list_add_tail (&task->all_tasks, &env->runq);
+ }
+ pthread_mutex_unlock (&env->mutex);
+
+ pthread_cond_broadcast (&env->cond);
+}
+
+
+void
+synctask_wrap (struct synctask *task)
+{
+ int ret;
+
+ ret = task->syncfn (task->opaque);
+ task->synccbk (ret, task->opaque);
+
+ /* cannot destroy @task right here as we are
+ in the execution stack of @task itself
+ */
+ task->complete = 1;
+ synctask_wake (task);
+}
+
+
+void
+synctask_destroy (struct synctask *task)
+{
+ if (!task)
+ return;
+
+ if (task->stack)
+ FREE (task);
+ FREE (task);
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ void *opaque)
+{
+ struct synctask *newtask = NULL;
+
+ newtask = CALLOC (1, sizeof (*newtask));
+ if (!newtask)
+ return -ENOMEM;
+
+ newtask->env = env;
+ newtask->xl = THIS;
+ newtask->syncfn = fn;
+ newtask->synccbk = cbk;
+ newtask->opaque = opaque;
+
+ INIT_LIST_HEAD (&newtask->all_tasks);
+
+ if (getcontext (&newtask->ctx) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "getcontext failed (%s)",
+ strerror (errno));
+ goto err;
+ }
+
+ newtask->stack = CALLOC (1, env->stacksize);
+ if (!newtask->stack) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "out of memory for stack");
+ goto err;
+ }
+
+ newtask->ctx.uc_stack.ss_sp = newtask->stack;
+ newtask->ctx.uc_stack.ss_size = env->stacksize;
+
+ makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
+
+ synctask_wake (newtask);
+
+ return 0;
+err:
+ if (newtask) {
+ if (newtask->stack)
+ FREE (newtask->stack);
+ FREE (newtask);
+ }
+ return -1;
+}
+
+
+struct synctask *
+syncenv_task (struct syncenv *env)
+{
+ struct synctask *task = NULL;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ while (list_empty (&env->runq))
+ pthread_cond_wait (&env->cond, &env->mutex);
+
+ task = list_entry (env->runq.next, struct synctask, all_tasks);
+
+ list_del_init (&task->all_tasks);
+ }
+ pthread_mutex_unlock (&env->mutex);
+
+ return task;
+}
+
+
+void
+synctask_switchto (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ synctask_set (task);
+ THIS = task->xl;
+
+ if (swapcontext (&env->sched, &task->ctx) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
+}
+
+
+void *
+syncenv_processor (void *thdata)
+{
+ struct syncenv *env = NULL;
+ struct synctask *task = NULL;
+
+ env = thdata;
+
+ for (;;) {
+ task = syncenv_task (env);
+
+ if (task->complete) {
+ synctask_destroy (task);
+ continue;
+ }
+
+ synctask_switchto (task);
+ }
+
+ return NULL;
+}
+
+
+void
+syncenv_destroy (struct syncenv *env)
+{
+
+}
+
+
+struct syncenv *
+syncenv_new (size_t stacksize)
+{
+ struct syncenv *newenv = NULL;
+ int ret = 0;
+
+ newenv = CALLOC (1, sizeof (*newenv));
+
+ if (!newenv)
+ return NULL;
+
+ pthread_mutex_init (&newenv->mutex, NULL);
+ pthread_cond_init (&newenv->cond, NULL);
+
+ INIT_LIST_HEAD (&newenv->runq);
+ INIT_LIST_HEAD (&newenv->waitq);
+
+ newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
+ if (stacksize)
+ newenv->stacksize = stacksize;
+
+ ret = pthread_create (&newenv->processor, NULL,
+ syncenv_processor, newenv);
+
+ if (ret != 0)
+ syncenv_destroy (newenv);
+
+ return newenv;
+}
+
+
+/* FOPS */
+
+
+int
+syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, inode_t *inode,
+ struct iatt *iatt, dict_t *xattr, struct iatt *parent)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (op_ret == 0) {
+ args->iatt1 = *iatt;
+ args->xattr = xattr;
+ args->iatt2 = *parent;
+ }
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req,
+ struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup,
+ loc, xattr_req);
+
+ if (iatt)
+ *iatt = args.iatt1;
+ if (xattr_rsp)
+ *xattr_rsp = args.xattr;
+ if (parent)
+ *parent = args.iatt2;
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+
+int
+syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno,
+ struct iatt *preop, struct iatt *postop)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (op_ret == 0) {
+ args->iatt1 = *preop;
+ args->iatt2 = *postop;
+ }
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid,
+ struct iatt *preop, struct iatt *postop)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr,
+ loc, iatt, valid);
+
+ if (preop)
+ *preop = args.iatt1;
+ if (postop)
+ *postop = args.iatt2;
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
new file mode 100644
index 000000000..ce364b073
--- /dev/null
+++ b/libglusterfs/src/syncop.h
@@ -0,0 +1,164 @@
+/*
+ Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SYNCOP_H
+#define _SYNCOP_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include <sys/time.h>
+#include <pthread.h>
+#include <ucontext.h>
+
+
+struct synctask;
+struct syncenv;
+
+
+typedef int (*synctask_cbk_t) (int ret, void *opaque);
+
+typedef int (*synctask_fn_t) (void *opaque);
+
+
+/* for one sequential execution of @syncfn */
+struct synctask {
+ struct list_head all_tasks;
+ struct syncenv *env;
+ xlator_t *xl;
+ synctask_cbk_t synccbk;
+ synctask_fn_t syncfn;
+ void *opaque;
+ void *stack;
+ int complete;
+
+ ucontext_t ctx;
+};
+
+/* hosts the scheduler thread and framework for executing synctasks */
+struct syncenv {
+ pthread_t processor;
+ struct synctask *current;
+
+ struct list_head runq;
+ struct list_head waitq;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+
+ ucontext_t sched;
+ size_t stacksize;
+};
+
+
+struct syncargs {
+ int op_ret;
+ int op_errno;
+ struct iatt iatt1;
+ struct iatt iatt2;
+ dict_t *xattr;
+
+ /* do not touch */
+ pthread_mutex_t mutex;
+ char complete;
+ pthread_cond_t cond;
+ struct synctask *task;
+};
+
+
+#define __yawn(args) do { \
+ struct synctask *task = NULL; \
+ \
+ task = synctask_get (); \
+ if (task) { \
+ args->task = task; \
+ synctask_yawn (task); \
+ } else { \
+ pthread_mutex_init (&args->mutex, NULL); \
+ pthread_cond_init (&args->cond, NULL); \
+ } \
+} while (0)
+
+
+#define __yield(args) do { \
+ if (args->task) { \
+ synctask_yield (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ while (!args->complete) \
+ pthread_cond_wait (&args->cond, \
+ &args->mutex); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ \
+ pthread_mutex_destroy (&args->mutex); \
+ pthread_cond_destroy (&args->cond); \
+ } \
+} while (0)
+
+
+#define __wake(args) do { \
+ if (args->task) { \
+ synctask_wake (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ args->complete = 1; \
+ pthread_cond_broadcast (&args->cond); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ } \
+} while (0)
+
+
+#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
+ call_frame_t *frame = NULL; \
+ \
+ frame = create_frame (THIS, THIS->ctx->pool); \
+ \
+ __yawn (stb); \
+ STACK_WIND_COOKIE (frame, (void *)stb, cbk, subvol, op, params);\
+ __yield (stb); \
+} while (0)
+
+
+#define SYNCENV_DEFAULT_STACKSIZE (16 * 1024)
+
+struct syncenv * syncenv_new ();
+void syncenv_destroy (struct syncenv *);
+
+int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, void *);
+void synctask_zzzz (struct synctask *task);
+void synctask_yawn (struct synctask *task);
+void synctask_wake (struct synctask *task);
+void synctask_yield (struct synctask *task);
+
+int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req,
+ /* out */
+ struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent);
+
+int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid,
+ /* out */
+ struct iatt *preop, struct iatt *postop);
+
+#endif /* _SYNCOP_H */