diff options
Diffstat (limited to 'libglusterfs/src/syncop.h')
| -rw-r--r-- | libglusterfs/src/syncop.h | 329 |
1 files changed, 260 insertions, 69 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 627fb6197..f790981f0 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -1,20 +1,11 @@ /* - Copyright (c) 2010-2011 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #ifndef _SYNCOP_H @@ -30,8 +21,19 @@ #include <pthread.h> #include <ucontext.h> +#define SYNCENV_PROC_MAX 16 +#define SYNCENV_PROC_MIN 2 +#define SYNCPROC_IDLE_TIME 600 + +/* + * Flags for syncopctx valid elements + */ +#define SYNCOPCTX_UID 0x00000001 +#define SYNCOPCTX_GID 0x00000002 +#define SYNCOPCTX_GROUPS 0x00000004 struct synctask; +struct syncproc; struct syncenv; @@ -40,37 +42,91 @@ typedef int (*synctask_cbk_t) (int ret, call_frame_t *frame, void *opaque); typedef int (*synctask_fn_t) (void *opaque); +typedef enum { + SYNCTASK_INIT = 0, + SYNCTASK_RUN, + SYNCTASK_SUSPEND, + SYNCTASK_WAIT, + SYNCTASK_DONE, + SYNCTASK_ZOMBIE, +} synctask_state_t; + /* for one sequential execution of @syncfn */ struct synctask { struct list_head all_tasks; struct syncenv *env; xlator_t *xl; call_frame_t *frame; + call_frame_t *opframe; synctask_cbk_t synccbk; synctask_fn_t syncfn; + synctask_state_t state; void *opaque; void *stack; - int complete; + int woken; + int slept; + int ret; + + uid_t uid; + gid_t gid; ucontext_t ctx; + struct syncproc *proc; + + pthread_mutex_t mutex; /* for synchronous spawning of synctask */ + pthread_cond_t cond; + int done; + + struct list_head waitq; /* can wait only "once" at a time */ }; -/* hosts the scheduler thread and framework for executing synctasks */ -struct syncenv { + +struct syncproc { pthread_t processor; + ucontext_t sched; + struct syncenv *env; struct synctask *current; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { + struct syncproc proc[SYNCENV_PROC_MAX]; + int procs; struct list_head runq; + int runcount; struct list_head waitq; + int waitcount; + + int procmin; + int procmax; pthread_mutex_t mutex; pthread_cond_t cond; - ucontext_t sched; size_t stacksize; }; +struct synclock { + pthread_mutex_t guard; /* guard the remaining members, pair @cond */ + pthread_cond_t cond; /* waiting non-synctasks */ + struct list_head waitq; /* waiting synctasks */ + gf_boolean_t lock; /* _gf_true or _gf_false, lock status */ + struct synctask *owner; /* NULL if current owner is not a synctask */ +}; +typedef struct synclock synclock_t; + + +struct syncbarrier { + pthread_mutex_t guard; /* guard the remaining members, pair @cond */ + pthread_cond_t cond; /* waiting non-synctasks */ + struct list_head waitq; /* waiting synctasks */ + int count; /* count the number of wakes */ +}; +typedef struct syncbarrier syncbarrier_t; + + struct syncargs { int op_ret; int op_errno; @@ -83,82 +139,197 @@ struct syncargs { int count; struct iobref *iobref; char *buffer; + dict_t *xdata; + struct gf_flock flock; + + /* some more _cbk needs */ + uuid_t uuid; + char *errstr; + dict_t *dict; + pthread_mutex_t lock_dict; + + syncbarrier_t barrier; /* do not touch */ + struct synctask *task; pthread_mutex_t mutex; - char complete; pthread_cond_t cond; - struct synctask *task; + int done; }; +struct syncopctx { + unsigned int valid; /* valid flags for elements that are set */ + uid_t uid; + gid_t gid; + int grpsize; + int ngrps; + gid_t *groups; +}; -#define __yawn(args) do { \ - struct synctask *task = NULL; \ - \ - task = synctask_get (); \ - if (task) { \ - args->task = task; \ - synctask_yawn (task); \ - } else { \ - pthread_mutex_init (&args->mutex, NULL); \ - pthread_cond_init (&args->cond, NULL); \ - } \ +#define __yawn(args) do { \ + args->task = synctask_get (); \ + if (args->task) \ + break; \ + pthread_mutex_init (&args->mutex, NULL); \ + pthread_cond_init (&args->cond, NULL); \ + args->done = 0; \ } while (0) -#define __yield(args) do { \ - if (args->task) { \ - synctask_yield (args->task); \ - } else { \ - pthread_mutex_lock (&args->mutex); \ - { \ - while (!args->complete) \ - pthread_cond_wait (&args->cond, \ - &args->mutex); \ - } \ - pthread_mutex_unlock (&args->mutex); \ - \ - pthread_mutex_destroy (&args->mutex); \ - pthread_cond_destroy (&args->cond); \ - } \ +#define __wake(args) do { \ + if (args->task) { \ + synctask_wake (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + args->done = 1; \ + pthread_cond_signal (&args->cond); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + } \ } while (0) -#define __wake(args) do { \ - if (args->task) { \ - synctask_wake (args->task); \ - } else { \ - pthread_mutex_lock (&args->mutex); \ - { \ - args->complete = 1; \ - pthread_cond_broadcast (&args->cond); \ - } \ - pthread_mutex_unlock (&args->mutex); \ - } \ - } while (0) +#define __yield(args) do { \ + if (args->task) { \ + synctask_yield (args->task); \ + } else { \ + pthread_mutex_lock (&args->mutex); \ + { \ + while (!args->done) \ + pthread_cond_wait (&args->cond, \ + &args->mutex); \ + } \ + pthread_mutex_unlock (&args->mutex); \ + pthread_mutex_destroy (&args->mutex); \ + pthread_cond_destroy (&args->cond); \ + } \ + } while (0) #define SYNCOP(subvol, stb, cbk, op, params ...) do { \ - call_frame_t *frame = NULL; \ + struct synctask *task = NULL; \ + call_frame_t *frame = NULL; \ + \ + task = synctask_get (); \ + stb->task = task; \ + if (task) \ + frame = task->opframe; \ + else \ + frame = syncop_create_frame (THIS); \ \ - frame = syncop_create_frame (); \ + if (task) { \ + frame->root->uid = task->uid; \ + frame->root->gid = task->gid; \ + } \ \ __yawn (stb); \ - STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, op, params); \ + \ + STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \ + op, params); \ + \ __yield (stb); \ + if (task) \ + STACK_RESET (frame->root); \ + else \ + STACK_DESTROY (frame->root); \ } while (0) #define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024) -struct syncenv * syncenv_new (); +struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax); void syncenv_destroy (struct syncenv *); +void syncenv_scale (struct syncenv *env); int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *); -void synctask_zzzz (struct synctask *task); -void synctask_yawn (struct synctask *task); +struct synctask *synctask_create (struct syncenv *, synctask_fn_t, + synctask_cbk_t, call_frame_t *, void *); +int synctask_join (struct synctask *task); void synctask_wake (struct synctask *task); void synctask_yield (struct synctask *task); +void synctask_waitfor (struct synctask *task, int count); + +#define synctask_barrier_init(args) syncbarrier_init (&args->barrier) +#define synctask_barrier_wait(args, n) syncbarrier_wait (&args->barrier, n) +#define synctask_barrier_wake(args) syncbarrier_wake (&args->barrier) + +int synctask_setid (struct synctask *task, uid_t uid, gid_t gid); +#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid); + +int syncopctx_setfsuid (void *uid); +int syncopctx_setfsgid (void *gid); +int syncopctx_setfsgroups (int count, const void *groups); + +static inline call_frame_t * +syncop_create_frame (xlator_t *this) +{ + call_frame_t *frame = NULL; + int ngrps = -1; + struct syncopctx *opctx = NULL; + + frame = create_frame (this, this->ctx->pool); + if (!frame) + return NULL; + + frame->root->pid = getpid (); + + opctx = syncopctx_getctx (); + if (opctx && (opctx->valid & SYNCOPCTX_UID)) + frame->root->uid = opctx->uid; + else + frame->root->uid = geteuid (); + + if (opctx && (opctx->valid & SYNCOPCTX_GID)) + frame->root->gid = opctx->gid; + else + frame->root->gid = getegid (); + + if (opctx && (opctx->valid & SYNCOPCTX_GROUPS)) { + ngrps = opctx->ngrps; + + if (ngrps != 0 && opctx->groups != NULL) { + if (call_stack_alloc_groups (frame->root, ngrps) != 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + memcpy (frame->root->groups, opctx->groups, + (sizeof (gid_t) * ngrps)); + } + } + else { + ngrps = getgroups (0, 0); + if (ngrps < 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + if (call_stack_alloc_groups (frame->root, ngrps) != 0) { + STACK_DESTROY (frame->root); + return NULL; + } + + if (getgroups (ngrps, frame->root->groups) < 0) { + STACK_DESTROY (frame->root); + return NULL; + } + } + + return frame; +} + +int synclock_init (synclock_t *lock); +int synclock_destory (synclock_t *lock); +int synclock_lock (synclock_t *lock); +int synclock_trylock (synclock_t *lock); +int synclock_unlock (synclock_t *lock); + + +int syncbarrier_init (syncbarrier_t *barrier); +int syncbarrier_wait (syncbarrier_t *barrier, int waitfor); +int syncbarrier_wake (syncbarrier_t *barrier); +int syncbarrier_destroy (syncbarrier_t *barrier); int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req, /* out */ @@ -169,6 +340,9 @@ int syncop_readdirp (xlator_t *subvol, fd_t *fd, size_t size, off_t off, /* out */ gf_dirent_t *entries); +int syncop_readdir (xlator_t *subvol, fd_t *fd, size_t size, off_t off, + gf_dirent_t *entries); + int syncop_opendir (xlator_t *subvol, loc_t *loc, fd_t *fd); int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, @@ -190,13 +364,13 @@ int syncop_removexattr (xlator_t *subvol, loc_t *loc, const char *name); int syncop_fremovexattr (xlator_t *subvol, fd_t *fd, const char *name); int syncop_create (xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode, - fd_t *fd, dict_t *dict); + fd_t *fd, dict_t *dict, struct iatt *iatt); int syncop_open (xlator_t *subvol, loc_t *loc, int32_t flags, fd_t *fd); int syncop_close (fd_t *fd); int syncop_write (xlator_t *subvol, fd_t *fd, const char *buf, int size, off_t offset, struct iobref *iobref, uint32_t flags); -int syncop_writev (xlator_t *subvol, fd_t *fd, struct iovec *vector, +int syncop_writev (xlator_t *subvol, fd_t *fd, const struct iovec *vector, int32_t count, off_t offset, struct iobref *iobref, uint32_t flags); int syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off, @@ -208,14 +382,31 @@ int syncop_ftruncate (xlator_t *subvol, fd_t *fd, off_t offset); int syncop_truncate (xlator_t *subvol, loc_t *loc, off_t offset); int syncop_unlink (xlator_t *subvol, loc_t *loc); +int syncop_rmdir (xlator_t *subvol, loc_t *loc); -int syncop_fsync (xlator_t *subvol, fd_t *fd); +int syncop_fsync (xlator_t *subvol, fd_t *fd, int dataonly); +int syncop_flush (xlator_t *subvol, fd_t *fd); int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf); int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf); -int syncop_symlink (xlator_t *subvol, loc_t *loc, char *newpath, dict_t *dict); +int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath, + dict_t *dict, struct iatt *iatt); int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size); int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev, - dict_t *dict); + dict_t *dict, struct iatt *iatt); +int syncop_mkdir (xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *dict, + struct iatt *iatt); +int syncop_link (xlator_t *subvol, loc_t *oldloc, loc_t *newloc); +int syncop_fsyncdir (xlator_t *subvol, fd_t *fd, int datasync); +int syncop_access (xlator_t *subvol, loc_t *loc, int32_t mask); +int syncop_fallocate(xlator_t *subvol, fd_t *fd, int32_t keep_size, off_t offset, + size_t len); +int syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len); + +int syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, size_t len); + +int syncop_rename (xlator_t *subvol, loc_t *oldloc, loc_t *newloc); + +int syncop_lk (xlator_t *subvol, fd_t *fd, int cmd, struct gf_flock *flock); #endif /* _SYNCOP_H */ |
