diff options
Diffstat (limited to 'libglusterfs/src/glusterfs/syncop.h')
| -rw-r--r-- | libglusterfs/src/glusterfs/syncop.h | 637 | 
1 files changed, 637 insertions, 0 deletions
diff --git a/libglusterfs/src/glusterfs/syncop.h b/libglusterfs/src/glusterfs/syncop.h new file mode 100644 index 00000000000..203abe92b57 --- /dev/null +++ b/libglusterfs/src/glusterfs/syncop.h @@ -0,0 +1,637 @@ +/* +  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef _SYNCOP_H +#define _SYNCOP_H + +#include "glusterfs/xlator.h" +#include <sys/time.h> +#include <pthread.h> +#include <ucontext.h> + +#define SYNCENV_PROC_MAX 16 +#define SYNCENV_PROC_MIN 2 +#define SYNCPROC_IDLE_TIME 600 + +/* + * Flags for syncopctx valid elements + */ +#define SYNCOPCTX_UID 0x00000001 +#define SYNCOPCTX_GID 0x00000002 +#define SYNCOPCTX_GROUPS 0x00000004 +#define SYNCOPCTX_PID 0x00000008 +#define SYNCOPCTX_LKOWNER 0x00000010 + +struct synctask; +struct syncproc; +struct syncenv; + +typedef int (*synctask_cbk_t)(int ret, call_frame_t *frame, void *opaque); + +typedef int (*synctask_fn_t)(void *opaque); + +typedef enum { +    SYNCTASK_INIT = 0, +    SYNCTASK_RUN, +    SYNCTASK_SUSPEND, +    SYNCTASK_WAIT, +    SYNCTASK_DONE, +    SYNCTASK_ZOMBIE, +} synctask_state_t; + +/* for one sequential execution of @syncfn */ +struct synctask { +    struct list_head all_tasks; +    struct syncenv *env; +    xlator_t *xl; +    call_frame_t *frame; +    call_frame_t *opframe; +    synctask_cbk_t synccbk; +    synctask_fn_t syncfn; +    synctask_state_t state; +    void *opaque; +    void *stack; +    int woken; +    int slept; +    int ret; + +    uid_t uid; +    gid_t gid; + +    ucontext_t ctx; +    struct syncproc *proc; + +    pthread_mutex_t mutex; /* for synchronous spawning of synctask */ +    pthread_cond_t cond; +    int done; + +    struct list_head waitq; /* can wait only "once" at a time */ +    char btbuf[GF_BACKTRACE_LEN]; +}; + +struct syncproc { +    pthread_t processor; +    ucontext_t sched; +    struct syncenv *env; +    struct synctask *current; +}; + +/* hosts the scheduler thread and framework for executing synctasks */ +struct syncenv { +    struct syncproc proc[SYNCENV_PROC_MAX]; +    int procs; + +    struct list_head runq; +    int runcount; +    struct list_head waitq; +    int waitcount; + +    int procmin; +    int procmax; + +    pthread_mutex_t mutex; +    pthread_cond_t cond; + +    size_t stacksize; + +    int destroy; /* FLAG to mark syncenv is in destroy mode +                    so that no more synctasks are accepted*/ +}; + +typedef enum { LOCK_NULL = 0, LOCK_TASK, LOCK_THREAD } lock_type_t; + +typedef enum { +    SYNC_LOCK_DEFAULT = 0, +    SYNC_LOCK_RECURSIVE, /*it allows recursive locking*/ +} lock_attr_t; + +struct synclock { +    pthread_mutex_t guard;  /* guard the remaining members, pair @cond */ +    pthread_cond_t cond;    /* waiting non-synctasks */ +    struct list_head waitq; /* waiting synctasks */ +    volatile int lock;      /* true(non zero) or false(zero), lock status */ +    lock_attr_t attr; +    struct synctask *owner; /* NULL if current owner is not a synctask */ +    pthread_t owner_tid; +    lock_type_t type; +}; +typedef struct synclock synclock_t; + +struct syncbarrier { +    gf_boolean_t initialized; /*Set on successful initialization*/ +    pthread_mutex_t guard;    /* guard the remaining members, pair @cond */ +    pthread_cond_t cond;      /* waiting non-synctasks */ +    struct list_head waitq;   /* waiting synctasks */ +    int count;                /* count the number of wakes */ +    int waitfor;              /* no. of wakes until which task can be in +                                 waitq before being woken up. */ +}; +typedef struct syncbarrier syncbarrier_t; + +struct syncargs { +    int op_ret; +    int op_errno; +    struct iatt iatt1; +    struct iatt iatt2; +    dict_t *xattr; +    struct statvfs statvfs_buf; +    struct iovec *vector; +    int count; +    struct iobref *iobref; +    char *buffer; +    dict_t *xdata; +    struct gf_flock flock; +    struct gf_lease lease; +    dict_t *dict_out; + +    /* some more _cbk needs */ +    uuid_t uuid; +    char *errstr; +    dict_t *dict; +    pthread_mutex_t lock_dict; + +    syncbarrier_t barrier; + +    /* do not touch */ +    struct synctask *task; +    pthread_mutex_t mutex; +    pthread_cond_t cond; +    int done; + +    gf_dirent_t entries; +    off_t offset; + +    lock_migration_info_t locklist; +}; + +struct syncopctx { +    unsigned int valid; /* valid flags for elements that are set */ +    uid_t uid; +    gid_t gid; +    int grpsize; +    int ngrps; +    gid_t *groups; +    pid_t pid; +    gf_lkowner_t lk_owner; +}; + +#define __yawn(args)                                                           \ +    do {                                                                       \ +        args->task = synctask_get();                                           \ +        if (args->task)                                                        \ +            break;                                                             \ +        pthread_mutex_init(&args->mutex, NULL);                                \ +        pthread_cond_init(&args->cond, NULL);                                  \ +        args->done = 0;                                                        \ +    } while (0) + +#define __wake(args)                                                           \ +    do {                                                                       \ +        if (args->task) {                                                      \ +            synctask_wake(args->task);                                         \ +        } else {                                                               \ +            pthread_mutex_lock(&args->mutex);                                  \ +            {                                                                  \ +                args->done = 1;                                                \ +                pthread_cond_signal(&args->cond);                              \ +            }                                                                  \ +            pthread_mutex_unlock(&args->mutex);                                \ +        }                                                                      \ +    } while (0) + +#define __yield(args)                                                          \ +    do {                                                                       \ +        if (args->task) {                                                      \ +            synctask_yield(args->task);                                        \ +        } else {                                                               \ +            pthread_mutex_lock(&args->mutex);                                  \ +            {                                                                  \ +                while (!args->done)                                            \ +                    pthread_cond_wait(&args->cond, &args->mutex);              \ +            }                                                                  \ +            pthread_mutex_unlock(&args->mutex);                                \ +            pthread_mutex_destroy(&args->mutex);                               \ +            pthread_cond_destroy(&args->cond);                                 \ +        }                                                                      \ +    } while (0) + +#define SYNCOP(subvol, stb, cbk, fn_op, params...)                             \ +    do {                                                                       \ +        struct synctask *task = NULL;                                          \ +        call_frame_t *frame = NULL;                                            \ +                                                                               \ +        task = synctask_get();                                                 \ +        stb->task = task;                                                      \ +        if (task)                                                              \ +            frame = task->opframe;                                             \ +        else                                                                   \ +            frame = syncop_create_frame(THIS);                                 \ +                                                                               \ +        if (task) {                                                            \ +            frame->root->uid = task->uid;                                      \ +            frame->root->gid = task->gid;                                      \ +        }                                                                      \ +                                                                               \ +        __yawn(stb);                                                           \ +                                                                               \ +        frame->op = get_fop_index_from_fn(subvol, fn_op);                      \ +        STACK_WIND_COOKIE(frame, cbk, (void *)stb, subvol, fn_op, params);     \ +                                                                               \ +        __yield(stb);                                                          \ +        if (task)                                                              \ +            STACK_RESET(frame->root);                                          \ +        else                                                                   \ +            STACK_DESTROY(frame->root);                                        \ +    } while (0) + +/* + * syncop_xxx() calls are executed in two ways, one is inside a synctask where + * the executing function will do 'swapcontext' and the other is without + * synctask where the executing thread is made to wait using pthread_cond_wait. + * Executing thread may change when syncop_xxx() is executed inside a synctask. + * This leads to errno_location change i.e. errno may give errno of + * non-executing thread. So errno is not touched inside a synctask execution. + * All gfapi calls are executed using the second way of executing syncop_xxx() + * where the executing thread waits using pthread_cond_wait so it is ok to set + * errno in these cases. The following macro makes syncop_xxx() behave just + * like a system call, where -1 is returned and errno is set when a failure + * occurs. + */ +#define DECODE_SYNCOP_ERR(ret)                                                 \ +    do {                                                                       \ +        if (ret < 0) {                                                         \ +            errno = -ret;                                                      \ +            ret = -1;                                                          \ +        } else {                                                               \ +            errno = 0;                                                         \ +        }                                                                      \ +    } while (0) + +#define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024) + +struct syncenv * +syncenv_new(size_t stacksize, int procmin, int procmax); +void +syncenv_destroy(struct syncenv *); +void +syncenv_scale(struct syncenv *env); + +int +synctask_new1(struct syncenv *, size_t stacksize, synctask_fn_t, synctask_cbk_t, +              call_frame_t *frame, void *); +int +synctask_new(struct syncenv *, synctask_fn_t, synctask_cbk_t, +             call_frame_t *frame, void *); +struct synctask * +synctask_create(struct syncenv *, size_t stacksize, synctask_fn_t, +                synctask_cbk_t, call_frame_t *, void *); +int +synctask_join(struct synctask *task); +void +synctask_wake(struct synctask *task); +void +synctask_yield(struct synctask *task); +void +synctask_waitfor(struct synctask *task, int count); + +#define synctask_barrier_init(args) syncbarrier_init(&args->barrier) +#define synctask_barrier_wait(args, n) syncbarrier_wait(&args->barrier, n) +#define synctask_barrier_wake(args) syncbarrier_wake(&args->barrier) + +int +synctask_setid(struct synctask *task, uid_t uid, gid_t gid); +#define SYNCTASK_SETID(uid, gid) synctask_setid(synctask_get(), uid, gid); + +int +syncopctx_setfsuid(void *uid); +int +syncopctx_setfsgid(void *gid); +int +syncopctx_setfsgroups(int count, const void *groups); +int +syncopctx_setfspid(void *pid); +int +syncopctx_setfslkowner(gf_lkowner_t *lk_owner); + +static inline call_frame_t * +syncop_create_frame(xlator_t *this) +{ +    call_frame_t *frame = NULL; +    int ngrps = -1; +    struct syncopctx *opctx = NULL; + +    frame = create_frame(this, this->ctx->pool); +    if (!frame) +        return NULL; + +    frame->root->type = GF_OP_TYPE_FOP; +    opctx = syncopctx_getctx(); + +    if (opctx && (opctx->valid & SYNCOPCTX_PID)) +        frame->root->pid = opctx->pid; +    else +        frame->root->pid = getpid(); + +    if (opctx && (opctx->valid & SYNCOPCTX_UID)) +        frame->root->uid = opctx->uid; +    else +        frame->root->uid = geteuid(); + +    if (opctx && (opctx->valid & SYNCOPCTX_GID)) +        frame->root->gid = opctx->gid; +    else +        frame->root->gid = getegid(); + +    if (opctx && (opctx->valid & SYNCOPCTX_GROUPS)) { +        ngrps = opctx->ngrps; + +        if (ngrps != 0 && opctx->groups != NULL) { +            if (call_stack_alloc_groups(frame->root, ngrps) != 0) { +                STACK_DESTROY(frame->root); +                return NULL; +            } + +            memcpy(frame->root->groups, opctx->groups, (sizeof(gid_t) * ngrps)); +        } +    } else { +        ngrps = getgroups(0, 0); +        if (ngrps < 0) { +            STACK_DESTROY(frame->root); +            return NULL; +        } + +        if (call_stack_alloc_groups(frame->root, ngrps) != 0) { +            STACK_DESTROY(frame->root); +            return NULL; +        } + +        if (getgroups(ngrps, frame->root->groups) < 0) { +            STACK_DESTROY(frame->root); +            return NULL; +        } +    } + +    if (opctx && (opctx->valid & SYNCOPCTX_LKOWNER)) +        frame->root->lk_owner = opctx->lk_owner; + +    return frame; +} + +int +synclock_init(synclock_t *lock, lock_attr_t attr); +int +synclock_destroy(synclock_t *lock); +int +synclock_lock(synclock_t *lock); +int +synclock_trylock(synclock_t *lock); +int +synclock_unlock(synclock_t *lock); + +int +syncbarrier_init(syncbarrier_t *barrier); +int +syncbarrier_wait(syncbarrier_t *barrier, int waitfor); +int +syncbarrier_wake(syncbarrier_t *barrier); +int +syncbarrier_destroy(syncbarrier_t *barrier); + +int +syncop_lookup(xlator_t *subvol, loc_t *loc, +              /* out */ +              struct iatt *iatt, struct iatt *parent, +              /* xdata */ +              dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_readdirp(xlator_t *subvol, fd_t *fd, size_t size, off_t off, +                /* out */ +                gf_dirent_t *entries, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_readdir(xlator_t *subvol, fd_t *fd, size_t size, off_t off, +               gf_dirent_t *entries, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_opendir(xlator_t *subvol, loc_t *loc, fd_t *fd, dict_t *xdata_in, +               dict_t **xdata_out); + +int +syncop_setattr(xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid, +               /* out */ +               struct iatt *preop, struct iatt *postop, dict_t *xdata_in, +               dict_t **xdata_out); + +int +syncop_fsetattr(xlator_t *subvol, fd_t *fd, struct iatt *iatt, int valid, +                /* out */ +                struct iatt *preop, struct iatt *postop, dict_t *xdata_in, +                dict_t **xdata_out); + +int +syncop_statfs(xlator_t *subvol, loc_t *loc, +              /* out */ +              struct statvfs *buf, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_setxattr(xlator_t *subvol, loc_t *loc, dict_t *dict, int32_t flags, +                dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_fsetxattr(xlator_t *subvol, fd_t *fd, dict_t *dict, int32_t flags, +                 dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_listxattr(xlator_t *subvol, loc_t *loc, dict_t **dict, dict_t *xdata_in, +                 dict_t **xdata_out); + +int +syncop_getxattr(xlator_t *xl, loc_t *loc, dict_t **dict, const char *key, +                dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_fgetxattr(xlator_t *xl, fd_t *fd, dict_t **dict, const char *key, +                 dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_removexattr(xlator_t *subvol, loc_t *loc, const char *name, +                   dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_fremovexattr(xlator_t *subvol, fd_t *fd, const char *name, +                    dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_create(xlator_t *subvol, loc_t *loc, int32_t flags, mode_t mode, +              fd_t *fd, struct iatt *iatt, dict_t *xdata_in, +              dict_t **xdata_out); + +int +syncop_open(xlator_t *subvol, loc_t *loc, int32_t flags, fd_t *fd, +            dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_close(fd_t *fd); + +int +syncop_write(xlator_t *subvol, fd_t *fd, const char *buf, int size, +             off_t offset, struct iobref *iobref, uint32_t flags, +             dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_writev(xlator_t *subvol, fd_t *fd, const struct iovec *vector, +              int32_t count, off_t offset, struct iobref *iobref, +              uint32_t flags, struct iatt *preiatt, struct iatt *postiatt, +              dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_readv(xlator_t *subvol, fd_t *fd, size_t size, off_t off, uint32_t flags, +             /* out */ +             struct iovec **vector, int *count, struct iobref **iobref, +             struct iatt *iatt, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_ftruncate(xlator_t *subvol, fd_t *fd, off_t offset, struct iatt *preiatt, +                 struct iatt *postiatt, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_truncate(xlator_t *subvol, loc_t *loc, off_t offset, dict_t *xdata_in, +                dict_t **xdata_out); + +int +syncop_unlink(xlator_t *subvol, loc_t *loc, dict_t *xdata_in, +              dict_t **xdata_out); + +int +syncop_rmdir(xlator_t *subvol, loc_t *loc, int flags, dict_t *xdata_in, +             dict_t **xdata_out); + +int +syncop_fsync(xlator_t *subvol, fd_t *fd, int dataonly, struct iatt *preiatt, +             struct iatt *postiatt, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_flush(xlator_t *subvol, fd_t *fd, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_fstat(xlator_t *subvol, fd_t *fd, struct iatt *stbuf, dict_t *xdata_in, +             dict_t **xdata_out); + +int +syncop_stat(xlator_t *subvol, loc_t *loc, struct iatt *stbuf, dict_t *xdata_in, +            dict_t **xdata_out); + +int +syncop_symlink(xlator_t *subvol, loc_t *loc, const char *newpath, +               struct iatt *iatt, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_readlink(xlator_t *subvol, loc_t *loc, char **buffer, size_t size, +                dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_mknod(xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev, +             struct iatt *iatt, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_mkdir(xlator_t *subvol, loc_t *loc, mode_t mode, struct iatt *iatt, +             dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_link(xlator_t *subvol, loc_t *oldloc, loc_t *newloc, struct iatt *iatt, +            dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_fsyncdir(xlator_t *subvol, fd_t *fd, int datasync, dict_t *xdata_in, +                dict_t **xdata_out); + +int +syncop_access(xlator_t *subvol, loc_t *loc, int32_t mask, dict_t *xdata_in, +              dict_t **xdata_out); + +int +syncop_fallocate(xlator_t *subvol, fd_t *fd, int32_t keep_size, off_t offset, +                 size_t len, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_discard(xlator_t *subvol, fd_t *fd, off_t offset, size_t len, +               dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_zerofill(xlator_t *subvol, fd_t *fd, off_t offset, off_t len, +                dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_rename(xlator_t *subvol, loc_t *oldloc, loc_t *newloc, dict_t *xdata_in, +              dict_t **xdata_out); + +int +syncop_lk(xlator_t *subvol, fd_t *fd, int cmd, struct gf_flock *flock, +          dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_inodelk(xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd, +               struct gf_flock *lock, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_lease(xlator_t *subvol, loc_t *loc, struct gf_lease *lease, +             dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_ipc(xlator_t *subvol, int op, dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_xattrop(xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags, +               dict_t *dict, dict_t *xdata_in, dict_t **dict_out, +               dict_t **xdata_out); + +int +syncop_fxattrop(xlator_t *subvol, fd_t *fd, gf_xattrop_flags_t flags, +                dict_t *dict, dict_t *xdata_in, dict_t **dict_out, +                dict_t **xdata_out); + +int +syncop_seek(xlator_t *subvol, fd_t *fd, off_t offset, gf_seek_what_t what, +            dict_t *xdata_in, off_t *off); + +int +syncop_getactivelk(xlator_t *subvol, loc_t *loc, +                   lock_migration_info_t *locklist, dict_t *xdata_in, +                   dict_t **xdata_out); + +int +syncop_setactivelk_cbk(call_frame_t *frame, void *cookie, xlator_t *this, +                       int32_t op_ret, int32_t op_errno, dict_t *xdata); + +int +syncop_setactivelk(xlator_t *subvol, loc_t *loc, +                   lock_migration_info_t *locklist, dict_t *xdata_in, +                   dict_t **xdata_out); + +int +syncop_put(xlator_t *subvol, loc_t *loc, mode_t mode, mode_t umask, +           uint32_t flags, struct iovec *vector, int32_t count, off_t offset, +           struct iobref *iobref, dict_t *xattr, struct iatt *iatt, +           dict_t *xdata_in, dict_t **xdata_out); + +int +syncop_setactivelk_cbk(call_frame_t *frame, void *cookie, xlator_t *this, +                       int32_t op_ret, int32_t op_errno, dict_t *xdata); + +int +syncop_icreate(xlator_t *subvol, loc_t *loc, mode_t mode, dict_t *xdata_out); + +int +syncop_entrylk(xlator_t *subvol, const char *volume, loc_t *loc, +               const char *basename, entrylk_cmd cmd, entrylk_type type, +               dict_t *xdata_in, dict_t **xdata_out); + +#endif /* _SYNCOP_H */  | 
