diff options
| author | Susant Palai <spalai@redhat.com> | 2018-03-09 20:07:19 +0530 | 
|---|---|---|
| committer | Amar Tumballi <amarts@redhat.com> | 2018-04-10 01:09:29 +0000 | 
| commit | 48623a33a0ef38f6c99208b0580954d7d7c80e76 (patch) | |
| tree | bbe9aa572b36a41c3c64b93c43f2d43fd2634bd1 /xlators/features/cloudsync/src/cloudsync.c | |
| parent | f946d98a95249c8d906323e6419ec8538467d2ab (diff) | |
experimental/cloudsync: Download xlator for archival feature
spec-files:
https://review.gluster.org/#/c/18854/
Overview:
* Cloudsync maintains three file states in it's inode-ctx i.e
  1 - LOCAL,
  2 - REMOTE,
  3 - DOWNLOADING.
* A data modifying fop is allowed only if the state is LOCAL.
  If the state is REMOTE or DOWNLOADING, client will download
  or wait for the download to finish initiated by other client.
* Multiple download and upload from different clients are synchronized
  by inodelk.
* In POSIX a state check is done (part of different commit)before
  allowing the fop to continue. If the state is remote/downloading the
  fop is unwound with EREMOTE. The client will then download the file
  and continue with the fop again.
* Basic Algo for fop (let's say write fop):
  - If LOCAL -> resume fop
  - If REMOTE ->
	- INODELK
	- STAT (this gets state and heal the state if needed)
	- DOWNLOAD
	- resume fop
Note:
* Developers will need to write plugins for download, based on the
remote store they choose. In phase-1, support will be added for
one remote store per volume. In future, more options for multiple
remote stores will be explored.
TODOs:
 - Implement stat/lookup/readdirp to return size info from xattr
 - Make plugins configurable
 - Implement unlink fop
 - Add metrics collection
 - Add sharding support
Design Contributions:
Aravinda V K <avishwan@redhat.com>
Amar Tumballi <amarts@redhat.com>
Ram Ankireddypalle <areddy@commvault.com>
Susant Palai <spalai@redhat.com>
updates: #387
Change-Id: Iddf711ee7ab4e946ae3e472ff62791a7b85e6d4b
Signed-off-by: Susant Palai <spalai@redhat.com>
Diffstat (limited to 'xlators/features/cloudsync/src/cloudsync.c')
| -rw-r--r-- | xlators/features/cloudsync/src/cloudsync.c | 1673 | 
1 files changed, 1673 insertions, 0 deletions
diff --git a/xlators/features/cloudsync/src/cloudsync.c b/xlators/features/cloudsync/src/cloudsync.c new file mode 100644 index 00000000000..8d74202706e --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync.c @@ -0,0 +1,1673 @@ +/* + *   Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + *   This file is part of GlusterFS. + * + *   This file is licensed to you under your choice of the GNU Lesser + *   General Public License, version 3 or any later version (LGPLv3 or + *   later), or the GNU General Public License, version 2 (GPLv2), in all + *   cases as published by the Free Software Foundation. + */ + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "cloudsync.h" +#include "cloudsync-common.h" +#include "call-stub.h" +#include "cloudsync-autogen-fops.h" + +#include <dlfcn.h> + +void +cs_cleanup_private (cs_private_t *priv) +{ +        if (priv) { +                if (priv->stores) { +                        priv->stores->fini (priv->stores->config); +                        GF_FREE (priv->stores); +                } + +                pthread_spin_destroy (&priv->lock); +                GF_FREE (priv); +        } + +        return; +} + +int +cs_init (xlator_t *this) +{ +        cs_private_t            *priv = NULL; +        gf_boolean_t            per_vol = _gf_false; +        int                     ret = 0; +        char                    *libpath = ("libaws.so"); +        store_methods_t         *store_methods = NULL; +        void                    *handle = NULL; + +        priv = GF_CALLOC (1, sizeof (*priv), gf_cs_mt_cs_private_t); +        if (!priv) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); +                goto out; +        } + +        priv->this = this; + +        this->local_pool = mem_pool_new (cs_local_t, 512); +        if (!this->local_pool) { +                gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, +                        "initialisation failed."); +                ret = -1; +                goto out; +        } + +        this->private = priv; + +        /* temp workaround. Should be configurable through glusterd*/ +        per_vol = _gf_true; + +        if (per_vol) { +                /*TODO:Need to make it configurable. This is a temp workaround*/ +                handle = dlopen (libpath, RTLD_NOW); +                if (!handle) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "could not load" +                                " the required library. %s", dlerror ()); +                        goto out; +                } else { +                        gf_msg (this->name, GF_LOG_INFO, 0, 0, +                                "loading library:%s successful", libpath); +                } + + +                priv->stores = GF_CALLOC (1, sizeof (struct cs_remote_stores), +                                          gf_cs_mt_cs_remote_stores_t); +                if (!priv->stores) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Could not " +                                "allocate memory for priv->stores"); +                        ret = -1; +                        goto out; +                } + +                (void) dlerror (); /* clear out previous error string */ + +                /* load library methods */ +                store_methods = (store_methods_t *) dlsym (handle, "store_ops"); +                if (!store_methods) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                "null store_methods %s", dlerror ()); +                        ret = -1; +                        goto out; +                } + +                (void) dlerror (); + +                priv->stores->dlfop = store_methods->fop_download; +                if (!priv->stores->dlfop) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" +                                " download fop %s", dlerror ()); +                        ret = -1; +                        goto out; +                } + +                (void) dlerror (); +                priv->stores->init = store_methods->fop_init; +                if (!priv->stores->init) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" +                                " init fop %s", dlerror ()); +                        ret = -1; +                        goto out; +                } + +                (void) dlerror (); +                priv->stores->reconfigure = store_methods->fop_reconfigure; +                if (!priv->stores->reconfigure) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" +                                " reconfigure fop %s", dlerror ()); +                        ret = -1; +                        goto out; +                } + +                priv->stores->handle = handle; + +                priv->stores->config = (void *) ((priv->stores->init) (this)); +                if (!priv->stores->config) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null config"); +                        ret = -1; +                        goto out; +                } +        } + +out: +        if (ret == -1) { +                if (this->local_pool) +                        mem_pool_destroy (this->local_pool); + +                cs_cleanup_private (priv); +        } + +        return ret; +} + +void +cs_fini (xlator_t *this) +{ +        cs_private_t *priv = NULL; +        priv = this->private; + +        cs_cleanup_private (priv); +} + +int +cs_reconfigure (xlator_t *this, dict_t *options) +{ +        cs_private_t    *priv = NULL; +        int              ret = 0; + +        priv = this->private; +        if (!priv) { +                ret = -1; +                goto out; +        } + +        /* needed only for per volume configuration*/ +        ret = priv->stores->reconfigure (this, options); + +out: +        return ret; +} + +int32_t +cs_mem_acct_init (xlator_t *this) +{ +        int     ret = -1; + +        GF_VALIDATE_OR_GOTO ("cloudsync", this, out); + +        ret = xlator_mem_acct_init (this, gf_cs_mt_end + 1); + +        if (ret != 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                        "Memory accounting init failed"); +                return ret; +        } +out: +        return ret; +} + +int32_t +cs_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, +                 dict_t *xdata) +{ +        gf_dirent_t *tmp = NULL; +        char        *sxattr = NULL; +        uint64_t     ia_size = 0; +        int          ret = 0; + +        list_for_each_entry (tmp, &entries->list, list) { +                ret = dict_get_str (tmp->dict, GF_CS_OBJECT_SIZE, &sxattr); +                if (ret) { +                        gf_msg_trace (this->name, 0, "size xattr found"); +                        continue; +                } + +                ia_size = atoll (sxattr); +                tmp->d_stat.ia_size = ia_size; +        } + +        STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, +                             entries, xdata); +        return 0; +} + + +int32_t +cs_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, +             off_t off, dict_t *xdata) +{ +        int ret = 0; +        int op_errno = ENOMEM; + +        if (!xdata) { +                xdata = dict_new (); +                if (!xdata) { +                        goto err; +                } +        } + +        ret = dict_set_int32 (xdata, GF_CS_OBJECT_SIZE, 1); +        if (ret) { +                goto err; +        } + +        STACK_WIND (frame, cs_readdirp_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, +                    fd, size, off, xdata); +        return 0; +err: +        STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, +                             NULL, NULL); +        return 0; +} + + +int32_t +cs_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, struct iatt *prebuf, +                 struct iatt *postbuf, dict_t *xdata) +{ +        cs_local_t      *local = NULL; +        int              ret = 0; +        uint64_t         val = 0; + +        local = frame->local; + +        /* Do we need lock here? */ +        local->call_cnt++; + +        if (op_ret == -1) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "truncate failed"); +                ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); +                if (ret == 0) { +                        if (val == GF_CS_ERROR) { +                                gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                        "could not get file state, unwinding"); +                                op_ret = -1; +                                op_errno = EIO; +                                goto unwind; +                        } else { +                                __cs_inode_ctx_update (this, local->loc.inode, +                                                       val); +                                gf_msg (this->name, GF_LOG_INFO, 0, 0, +                                        " state = %ld", val); + +                                if (local->call_cnt == 1 && +                                    (val == GF_CS_REMOTE || +                                     val == GF_CS_DOWNLOADING))  { +                                        gf_msg (this->name, GF_LOG_WARNING, 0, +                                                0, "will repair and download " +                                                "the file, current state : %ld", +                                                val); +                                        goto repair; +                                } else { +                                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                                "second truncate, Unwinding"); +                                        goto unwind; +                                } +                        } +                } else { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "file state " +                                "could not be figured, unwinding"); +                        goto unwind; +                } +        } else { +                /* successful write => file is local */ +                __cs_inode_ctx_update (this, local->loc.inode, GF_CS_LOCAL); +                gf_msg (this->name, GF_LOG_INFO, 0, 0, "state : GF_CS_LOCAL" +                        ", truncate successful"); + +                goto unwind; +        } + +repair: +        ret = locate_and_execute (frame); +        if (ret) { +                goto unwind; +        } + +        return 0; + +unwind: +        CS_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, +                         xdata); +        return 0; +} + + +int32_t +cs_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, +             dict_t *xdata) +{ +        int                     op_errno        = -1; +        cs_local_t             *local           = NULL; +        int                     ret             = 0; +        cs_inode_ctx_t         *ctx             = NULL; +        gf_cs_obj_state            state           = -1; + +        VALIDATE_OR_GOTO (frame, err); +        VALIDATE_OR_GOTO (this, err); +        VALIDATE_OR_GOTO (loc, err); + +        local = cs_local_init (this, frame, loc, NULL, GF_FOP_TRUNCATE); +        if (!local) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); +                op_errno = ENOMEM; +                goto err; +        } + +        __cs_inode_ctx_get (this, loc->inode, &ctx); + +        if (ctx) +                state = __cs_get_file_state (this, loc->inode, ctx); +        else +                state = GF_CS_LOCAL; + +        local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + +        ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" +                        " %s", GF_CS_OBJECT_STATUS); +                goto err; +        } + +        local->stub = fop_truncate_stub (frame, cs_resume_truncate, loc, offset, +                                         xdata); +        if (!local->stub) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); +                op_errno = ENOMEM; +                goto err; +        } + +        if (state == GF_CS_LOCAL) { +                STACK_WIND (frame, cs_truncate_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->truncate, +                            loc, offset, xdata); + +        } else { +                local->call_cnt++; +                ret = locate_and_execute (frame); +                if (ret) { +                        op_errno = ENOMEM; +                        goto err; +                } +        } + +        return 0; +err: +        CS_STACK_UNWIND (truncate, frame, -1, op_errno, NULL, NULL, NULL); +        return 0; +} + +int32_t +cs_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, struct statvfs *buf, +        dict_t *xdata) +{ +        STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, +                             buf, xdata); +        return 0; +} + + +int32_t +cs_statfs (call_frame_t *frame, xlator_t *this, +        loc_t *loc, +        dict_t *xdata) +{ +        STACK_WIND (frame, cs_statfs_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs, +                    loc, xdata); +        return 0; +} + + +int32_t +cs_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, dict_t *dict, +        dict_t *xdata) +{ +        STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, +                             dict, xdata); +        return 0; +} + + +int32_t +cs_getxattr (call_frame_t *frame, xlator_t *this, +        loc_t *loc, +        const char *name, +        dict_t *xattr_req) +{ +        STACK_WIND (frame, cs_getxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, +                    loc, name, xattr_req); +        return 0; +} + +int32_t +cs_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        cs_local_t      *local = NULL; + +        local = frame->local; + +        if (local->locked) +                cs_inodelk_unlock (frame); + +        CS_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + +        return 0; +} + + +int32_t +cs_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, +             int32_t flags, dict_t *xdata) +{ +        data_t          *tmp = NULL; +        cs_local_t      *local = NULL; +        int              ret = 0; + +        VALIDATE_OR_GOTO (frame, err); +        VALIDATE_OR_GOTO (this, err); + +        local = cs_local_init (this, frame, loc, NULL, GF_FOP_SETXATTR); +        if (!local) { +                ret = -1; +                goto err; +        } + +        local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + +        tmp = dict_get (dict, GF_CS_OBJECT_UPLOAD_COMPLETE); +        if (tmp) { +                /* Value of key should be the atime */ +                local->stub = fop_setxattr_stub (frame, cs_resume_setxattr, +                                                 loc, dict, flags, xdata); + +                if (!local->stub) +                        goto err; + +                ret = locate_and_execute (frame); +                if (ret) { +                        goto err; +                } + +                return 0; +        } + +        STACK_WIND (frame, cs_setxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, +                    loc, dict, flags, xdata); +        return 0; +err: +        CS_STACK_UNWIND (setxattr, frame, -1, errno, NULL); +        return 0; +} + + +int32_t +cs_fgetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, dict_t *dict, +        dict_t *xdata) +{ +        STACK_UNWIND_STRICT (fgetxattr, frame, op_ret, op_errno, +                             dict, xdata); +        return 0; +} + + +int32_t +cs_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, +              dict_t *xdata) +{ +        STACK_WIND (frame, cs_fgetxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr, +                    fd, name, xdata); +        return 0; +} + + + +int32_t +cs_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, +                             xdata); +        return 0; +} + + +int32_t +cs_fsetxattr (call_frame_t *frame, xlator_t *this, +        fd_t *fd, +        dict_t *dict, +        int32_t flags, +        dict_t *xdata) +{ +        STACK_WIND (frame, cs_fsetxattr_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, +                    fd, dict, flags, xdata); +        return 0; +} + +int32_t +cs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, struct iatt *preparent, +        struct iatt *postparent, +        dict_t *xdata) +{ +        STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, +                             preparent, postparent, xdata); +        return 0; +} + + +int32_t +cs_unlink (call_frame_t *frame, xlator_t *this, +        loc_t *loc, +        int32_t flags, +        dict_t *xattr_req) +{ +        cs_local_t      *local = NULL; +        int              ret   = 0; + +        local = cs_local_init (this, frame, loc, NULL, GF_FOP_UNLINK); +        if (!local) +                goto err; + +        local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + +        ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" +                        " %s", GF_CS_OBJECT_STATUS); +                goto err; +        } +        STACK_WIND (frame, cs_unlink_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, +                    loc, flags, local->xattr_req); +        return 0; +err: +        CS_STACK_UNWIND (unlink, frame, -1, errno, NULL, NULL, NULL); +        return 0; +} + + +int32_t +cs_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, fd_t *fd, +        dict_t *xdata) +{ +        int              ret = 0; +        uint64_t         val = 0; + +        if (op_ret == 0) { +                ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); +                if (!ret) { +                        ret = __cs_inode_ctx_update (this, fd->inode, val); +                        if (ret) { +                                gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                        "ctx update failed"); +                        } +                } +        } else { +                cs_inode_ctx_reset (this, fd->inode); +        } + +        CS_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); +        return 0; +} + + +int32_t +cs_open (call_frame_t *frame, xlator_t *this, +        loc_t *loc, +        int32_t flags, +        fd_t *fd, +        dict_t *xattr_req) +{ +        cs_local_t      *local = NULL; +        int              ret   = 0; + +        local = cs_local_init (this, frame, NULL, fd, GF_FOP_OPEN); +        if (!local) +                goto err; + +        local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + +        ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" +                        " %s", GF_CS_OBJECT_STATUS); +                goto err; +        } + +        STACK_WIND (frame, cs_open_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, +                    loc, flags, fd, local->xattr_req); +        return 0; +err: +        CS_STACK_UNWIND (open, frame, -1, errno, NULL, NULL); +        return 0; +} + + +int32_t +cs_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +        int32_t op_ret, int32_t op_errno, struct iatt *buf, +        dict_t *xdata) +{ +        int              ret = 0; +        uint64_t         val = 0; +        fd_t            *fd  = NULL; +        cs_local_t      *local = NULL; + +        local = frame->local; + +        fd = local->fd; + +        if (op_ret == 0) { +                ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); +                if (!ret) { +                        gf_msg_debug (this->name, 0, "state %ld", val); +                        ret = __cs_inode_ctx_update (this, fd->inode, val); +                        if (ret) { +                                gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                        "ctx update failed"); +                        } +                } +        } else { +                cs_inode_ctx_reset (this, fd->inode); +        } + +        CS_STACK_UNWIND (fstat, frame, op_ret, op_errno, buf, xdata); + +        return 0; +} + + +int32_t +cs_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr_req) +{ +        cs_local_t      *local = NULL; +        int              ret   = 0; + +        local = cs_local_init (this, frame, NULL, fd, GF_FOP_FSTAT); +        if (!local) +                goto err; + +        if (fd->inode->ia_type == IA_IFDIR) +                goto wind; + +        local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + +        ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" +                        " %s", GF_CS_OBJECT_STATUS); +                goto err; +        } + +wind: +        STACK_WIND (frame, cs_fstat_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, +                    fd, local->xattr_req); +        return 0; +err: +        CS_STACK_UNWIND (fstat, frame, -1, errno, NULL, NULL); +        return 0; +} + +cs_local_t * +cs_local_init (xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd, +               glusterfs_fop_t fop) +{ +        cs_local_t      *local = NULL; +        int              ret   = 0; + +        local = mem_get0 (this->local_pool); +        if (!local) +                goto out; + +        if (loc) { +                ret = loc_copy (&local->loc, loc); +                if (ret) +                        goto out; +        } + +        if (fd) { +                local->fd = fd_ref (fd); +        } + +        local->op_ret   = -1; +        local->op_errno = EUCLEAN; +        local->fop      = fop; +        local->dloffset = 0; +        frame->local = local; +        local->locked = _gf_false; +        local->call_cnt = 0; +out: +        if (ret) { +                if (local) +                        mem_put (local); +                local = NULL; +        } + +        return local; +} + +call_frame_t * +cs_lock_frame (call_frame_t *parent_frame) +{ +        call_frame_t    *lock_frame = NULL; + +        lock_frame = copy_frame (parent_frame); + +        if (lock_frame == NULL) +                goto out; + +        set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root); + +out: +        return lock_frame; + +} + +void +cs_lock_wipe (call_frame_t *lock_frame) +{ +        CS_STACK_DESTROY (lock_frame); +} + + +int32_t +cs_inodelk_unlock_cbk (call_frame_t *frame, void *cookie,  xlator_t *this, +                       int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        cs_lock_wipe (frame); + +        return 0; +} + +int +cs_inodelk_unlock (call_frame_t *main_frame) +{ +        xlator_t        *this   = NULL; +        struct gf_flock  flock  = {0,}; +        call_frame_t    *lock_frame  = NULL; +        cs_local_t      *lock_local  = NULL; +        cs_local_t      *main_local  = NULL; +        int              ret    = 0; + +        this = main_frame->this; +        main_local = main_frame->local; + +        lock_frame = cs_lock_frame (main_frame); +        if (!lock_frame) +                goto out; + +        lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); +        if (!lock_local) +                goto out; + +        ret = cs_build_loc (&lock_local->loc, main_frame); +        if (ret) { +                goto out; +        } + +        flock.l_type = F_UNLCK; + +        main_local->locked = _gf_false; + +        STACK_WIND (lock_frame, cs_inodelk_unlock_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, +                    &lock_local->loc, F_SETLKW, &flock, NULL); + +        return 0; + +out: +        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Stale lock would be found on" +                " server"); + +        if (lock_frame) +                cs_lock_wipe (lock_frame); + +        return 0; +} + +void * +cs_download_task (void *arg) +{ +        call_frame_t    *frame          = NULL; +        xlator_t        *this           = NULL; +        cs_private_t    *priv           = NULL; +        int              ret            = -1; +        char            *sign_req       = NULL; +        fd_t            *fd             = NULL; +        cs_local_t      *local          = NULL; +        dict_t          *dict           = NULL; +        int             *retval         = NULL; + +        frame = (call_frame_t *)arg; + +        this = frame->this; + +        priv = this->private; + +        local = frame->local; + +        retval = GF_CALLOC (1, sizeof(int), gf_common_mt_int); +        if (!retval) { +                gf_log (this->name, GF_LOG_ERROR, "insufficient memory"); +                ret = -1; +                goto out; +        } + +        if (local->fd) +                fd = fd_anonymous (local->fd->inode); +        else +                fd = fd_anonymous (local->loc.inode); + +        if (!fd) { +                gf_msg ("CS", GF_LOG_ERROR, 0, 0, "fd creation failed"); +                ret = -1; +                goto out; +        } + +        local->dlfd = fd; +        local->dloffset = 0; + +        dict = dict_new (); +        if (!dict) { +                gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, "failed to create " +                        "dict"); +                ret = -1; +                goto out; +        } + +        ret = dict_set_uint32 (dict, GF_CS_OBJECT_DOWNLOADING, 1); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); +                ret = -1; +                goto out; +        } + +        ret = syncop_fsetxattr (this, local->fd, dict, 0, NULL, NULL); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "fsetxattr failed " +                        "key %s", GF_CS_OBJECT_DOWNLOADING); +                ret = -1; +                goto out; +        } +        /*this calling method is for per volume setting */ +        ret = priv->stores->dlfop (frame, priv->stores->config); +        if (ret) { +                 gf_msg (this->name, GF_LOG_ERROR, 0, 0, "download failed" +                         ", remotepath: %s", local->remotepath); + +                 /*using dlfd as it is anonymous and have RDWR flag*/ +                 ret = syncop_ftruncate (FIRST_CHILD (this), local->dlfd, 0, +                                         NULL, NULL, NULL, NULL); +                 if (ret) { +                         gf_msg (this->name, GF_LOG_ERROR, 0, -ret, +                                 "ftruncate failed"); +                 } else { +                         gf_msg_debug (this->name, 0, "ftruncate succeed"); +                 } + +                 ret = -1; +                 goto out; +        } else { +                gf_msg (this->name, GF_LOG_INFO, 0, 0, "download success, path" +                        " : %s", local->remotepath); + +                ret = syncop_fremovexattr (this, local->fd, +                                           GF_CS_OBJECT_REMOTE, NULL, NULL); +                if (ret) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, -ret, +                                "removexattr failed, remotexattr"); +                        ret = -1; +                        goto out; +                } else { +                        gf_msg_debug (this->name, 0, "fremovexattr success, " +                                      "path : %s", local->remotepath); +                } + +                ret = syncop_fremovexattr (this, local->fd, +                                           GF_CS_OBJECT_DOWNLOADING, NULL, +                                           NULL); +                if (ret) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, -ret, +                                "removexattr failed, downloading xattr, path %s" +                                , local->remotepath); +                        ret = -1; +                        goto out; +                } else { +                        gf_msg_debug (this->name, 0, "fremovexattr success" +                                      " path  %s", local->remotepath); +                } +        } + +out: +        GF_FREE (sign_req); + +        if (dict) +                dict_unref (dict); + +        if (fd) { +                fd_unref (fd); +                local->dlfd = NULL; +        } + +        if (retval) { +                *retval = ret; +                pthread_exit (retval); +        } else { +                pthread_exit (&ret); +        } +} + +int +cs_download (call_frame_t *frame) +{ +        int             *retval = NULL; +        int             ret = 0; +        pthread_t       dthread; +        cs_local_t      *local = NULL; +        xlator_t        *this = NULL; + +        local = frame->local; +        this = frame->this; + +        if (!local->remotepath) { +                ret = -1; +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "remote path not" +                        " available. Check posix logs to resolve"); +                goto out; +        } + +        ret = gf_thread_create (&dthread, NULL, &cs_download_task, +                                (void *)frame, "downloadthread"); + +        pthread_join (dthread, (void **)&retval); + +        ret = *retval; + +out: +        if (retval) +                GF_FREE (retval); + +        return ret; +} + +int +cs_stat_check_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                   int op_ret, int op_errno, struct iatt *stbuf, dict_t *xdata) +{ +        cs_local_t      *local = NULL; +        call_stub_t     *stub  = NULL; +        char            *filepath = NULL; +        int              ret = 0; +        inode_t         *inode = NULL; +        uint64_t         val = 0; + +        local = frame->local; + +        if (op_ret == -1) { +                local->op_ret = op_ret; +                local->op_errno = op_errno; +                gf_msg (this->name, GF_LOG_ERROR, 0, op_errno, +                        "stat check failed"); +                goto err; +        } else { +                if (local->fd) +                        inode = local->fd->inode; +                else +                        inode = local->loc.inode; + +                if (!inode) { +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null inode " +                                "returned"); +                        goto err; +                } + +                ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); +                if (ret == 0) { +                        if (val == GF_CS_ERROR) { +                                cs_inode_ctx_reset (this, inode); +                                local->op_ret = -1; +                                local->op_errno = EIO; +                                gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                        "status = GF_CS_ERROR. failed to get " +                                        " file state"); +                                goto err; +                        } else { +                                ret = __cs_inode_ctx_update (this, inode, val); +                                gf_msg_debug (this->name, 0, "status : %lu", +                                              val); +                                if (ret) { +                                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                                "ctx update failed"); +                                        local->op_ret = -1; +                                        local->op_errno = ENOMEM; +                                        goto err; +                                } +                        } +                } else { +                        gf_msg_debug (this->name, 0, +                                      "status not found in dict"); +                        local->op_ret = -1; +                        local->op_errno = ENOMEM; +                        goto err; +                } + +                ret = dict_get_str (xdata, GF_CS_OBJECT_REMOTE, &filepath); +                if (filepath) { +                        gf_msg_debug (this->name, 0, "filepath returned %s", +                                      filepath); +                        local->remotepath = gf_strdup (filepath); +                        if (!local->remotepath) { +                                local->op_ret = -1; +                                local->op_errno = ENOMEM; +                                goto err; +                        } +                } else { +                        gf_msg_debug (this->name, 0, "NULL filepath"); +                } + +                local->op_ret = 0; +                local->xattr_rsp = dict_ref (xdata); +                memcpy (&local->stbuf, stbuf, sizeof (struct iatt)); +        } + +        stub = local->stub; +        local->stub = NULL; +        call_resume (stub); + +        return 0; +err: +        cs_inodelk_unlock (frame); + +        cs_common_cbk (frame); + +        return 0; +} + +int +cs_do_stat_check (call_frame_t *main_frame) +{ +        cs_local_t       *local  = NULL; +        xlator_t         *this   = NULL; +        int               ret   = 0; + +        local = main_frame->local; +        this = main_frame->this; + +        ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_REPAIR, 256); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); +                goto err; +        } + +        if (local->fd) { +                STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), +                            FIRST_CHILD (this)->fops->fstat, local->fd, +                            local->xattr_req); +        } else { +                STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), +                            FIRST_CHILD (this)->fops->stat, &local->loc, +                            local->xattr_req); +        } + +        return 0; + +err: +        cs_inodelk_unlock (main_frame); + +        cs_common_cbk (main_frame); + +        return 0; +} + +void +cs_common_cbk (call_frame_t *frame) +{ +        glusterfs_fop_t fop = -1; +        cs_local_t      *local = NULL; + +        local = frame->local; + +        fop = local->fop; + +        /*Note: Only the failure case needs to be handled here. Since for +         * successful stat check the fop will resume anyway. The unwind can +         * happen from the fop_cbk and each cbk can unlock the inodelk in case +         * a lock was taken before. The lock status can be stored in frame */ + +        /* for failure case  */ + +        /*TODO: add other fops*/ +        switch (fop) { +        case GF_FOP_WRITE: +                CS_STACK_UNWIND (writev, frame, local->op_ret, +                                 local->op_errno, NULL, NULL, NULL); +                break; + +        case GF_FOP_SETXATTR: +                CS_STACK_UNWIND (setxattr, frame, local->op_ret, +                                 local->op_errno, NULL); +                break; +        case GF_FOP_READ: +                CS_STACK_UNWIND (readv, frame, local->op_ret, +                                 local->op_errno, NULL, 0, NULL, NULL, +                                 NULL); +                break; +        case GF_FOP_FTRUNCATE: +                CS_STACK_UNWIND (ftruncate, frame, local->op_ret, +                                 local->op_errno, NULL, NULL, NULL); +                break; + + +        default: +                break; +        } + +        return; +} + +int +cs_blocking_inodelk_cbk (call_frame_t *lock_frame, void *cookie, xlator_t *this, +                         int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        cs_local_t      *main_local = NULL; +        call_frame_t    *main_frame = NULL; +        cs_local_t      *lock_local = NULL; + +        lock_local = lock_frame->local; + +        main_frame = lock_local->main_frame; +        main_local = main_frame->local; + +        if (op_ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "inodelk failed"); +                main_local->op_errno = op_errno; +                main_local->op_ret = op_ret; +                goto err; +        } + +        main_local->locked = _gf_true; + +        cs_lock_wipe (lock_frame); + +        cs_do_stat_check (main_frame); + +        return 0; +err: +        cs_common_cbk (main_frame); + +        cs_lock_wipe (lock_frame); + +        return 0; +} + +int +cs_build_loc (loc_t *loc, call_frame_t *frame) +{ +        cs_local_t      *local  = NULL; +        int              ret    = -1; + +        local = frame->local; + +        if (local->fd) { +                loc->inode = inode_ref (local->fd->inode); +                if (loc->inode) { +                        gf_uuid_copy (loc->gfid, loc->inode->gfid); +                        ret = 0; +                        goto out; +                } else { +                        ret = -1; +                        goto out; +                } +        } else { +                loc->inode = inode_ref (local->loc.inode); +                if (loc->inode) { +                        gf_uuid_copy (loc->gfid, loc->inode->gfid); +                        ret = 0; +                        goto out; +                } else { +                        ret = -1; +                        goto out; +                } +        } +out: +        return ret; +} + +int +cs_blocking_inodelk (call_frame_t *parent_frame) +{ +        call_frame_t    *lock_frame = NULL; +        cs_local_t      *lock_local = NULL; +        xlator_t        *this       = NULL; +        struct gf_flock  flock      = {0,}; +        int              ret        = 0; + +        lock_frame = cs_lock_frame (parent_frame); +        if (!lock_frame) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insuffcient memory"); +                goto err; +        } + +        this = parent_frame->this; + +        lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); +        if (!lock_local) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); +                goto err; +        } + +        lock_local->main_frame = parent_frame; + +        flock.l_type = F_WRLCK; + +        ret = cs_build_loc (&lock_local->loc, parent_frame); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "build_loc failed"); +                goto err; +        } + +        STACK_WIND (lock_frame, cs_blocking_inodelk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, +                    &lock_local->loc, F_SETLKW, &flock, NULL); + +        return 0; +err: +        if (lock_frame) +                cs_lock_wipe (lock_frame); + +        return -1; +} + +int +locate_and_execute (call_frame_t *frame) +{ +        int     ret = 0; + +        ret = cs_blocking_inodelk (frame); + +        if (ret) +                return -1; +        else +                return 0; +} + +int32_t +cs_resume_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    off_t offset, dict_t *xattr_req) +{ +        cs_local_t      *local  = NULL; +        int              ret    = 0; + +        local = frame->local; + +        ret = cs_resume_postprocess (this, frame, loc->inode); +        if (ret) { +                goto unwind; +        } + +        cs_inodelk_unlock (frame); + +        STACK_WIND (frame, cs_truncate_cbk, +                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, +                    loc, offset, local->xattr_req); + +        return 0; + +unwind: +        cs_inodelk_unlock (frame); + +        cs_common_cbk (frame); + +        return 0; +} + + +int32_t +cs_resume_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    dict_t *dict, int32_t flags, dict_t *xdata) +{ +        cs_local_t      *local = NULL; +        cs_inode_ctx_t  *ctx   = NULL; +        gf_cs_obj_state  state = GF_CS_ERROR; + +        local = frame->local; + +        __cs_inode_ctx_get (this, loc->inode, &ctx); + +        state = __cs_get_file_state (this, loc->inode, ctx); + +        if (state == GF_CS_ERROR) { +                /* file is already remote */ +                local->op_ret = -1; +                local->op_errno = EINVAL; +                gf_msg (this->name, GF_LOG_WARNING, 0, 0, +                        "file %s , could not figure file state", loc->path); +                goto unwind; +        } + + +        if (state == GF_CS_REMOTE) { +                /* file is already remote */ +                local->op_ret = -1; +                local->op_errno = EINVAL; +                gf_msg (this->name, GF_LOG_WARNING, 0, EINVAL, +                        "file %s is already remote", loc->path); +                goto unwind; +        } + +        if (state == GF_CS_DOWNLOADING) { +                gf_msg (this->name, GF_LOG_WARNING, 0, 0, +                        " file is in downloading state."); +                local->op_ret = -1; +                local->op_errno = EINVAL; +                goto unwind; +        } + +        STACK_WIND (frame, cs_setxattr_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, +                    local->xattr_req); + +        return 0; +unwind: +        cs_inodelk_unlock (frame); + +        cs_common_cbk (frame); + +        return 0; +} + + + + +gf_cs_obj_state +__cs_get_file_state (xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx) +{ +        gf_cs_obj_state    state = -1; + +        if (!ctx) +                return GF_CS_ERROR; + +        LOCK (&inode->lock); +        { +                state = ctx->state; +        } +        UNLOCK (&inode->lock); + +        return state; +} + +void +__cs_inode_ctx_get (xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx) +{ +        uint64_t        ctxint = 0; +        int             ret = 0; + +        LOCK (&inode->lock); +        { +        ret = __inode_ctx_get (inode, this, &ctxint); +        } +        UNLOCK (&inode->lock); + +        if (ret) +                *ctx = NULL; +        else +                *ctx = (cs_inode_ctx_t *)ctxint; + +        return; +} + +int +__cs_inode_ctx_update (xlator_t *this, inode_t *inode, uint64_t val) +{ +        cs_inode_ctx_t  *ctx = NULL; +        uint64_t         ctxint = 0; +        int              ret = 0; + +        LOCK (&inode->lock); +        { +        ret = __inode_ctx_get (inode, this, &ctxint); +        if (ret) { +                ctx = GF_CALLOC (1, sizeof (*ctx), gf_cs_mt_cs_inode_ctx_t); +                if (!ctx) { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                "ctx allocation failed"); +                        ret = -1; +                        goto out; +                } + +                ctx->state = val; + +                ctxint = (uint64_t) ctx; + +                ret = __inode_ctx_set (inode, this, &ctxint); +                if (ret) { +                        GF_FREE (ctx); +                        goto out; +                } +        } else { +                ctx = (cs_inode_ctx_t *) ctxint; + +                ctx->state = val; +        } + +        } + +out: +        UNLOCK (&inode->lock); + +        return ret; +} + +int +cs_inode_ctx_reset (xlator_t *this, inode_t *inode) +{ +        cs_inode_ctx_t  *ctx = NULL; +        uint64_t         ctxint = 0; + +        inode_ctx_del (inode, this, &ctxint); +        if (!ctxint) { +                return 0; +        } + +        ctx = (cs_inode_ctx_t *)ctxint; + +        GF_FREE (ctx); +        return 0; +} + +int +cs_resume_postprocess (xlator_t *this, call_frame_t *frame, inode_t *inode) +{ +        cs_local_t      *local = NULL; +        gf_cs_obj_state  state = -1; +        cs_inode_ctx_t  *ctx   = NULL; +        int              ret   = 0; + +        local = frame->local; +        if (!local) { +                ret = -1; +                goto out; +        } + +        __cs_inode_ctx_get (this, inode, &ctx); + +        state = __cs_get_file_state (this, inode, ctx); +        if (state == GF_CS_ERROR) { +                gf_msg (this->name, GF_LOG_ERROR, 0, 0, "status is GF_CS_ERROR." +                        " Aborting write"); +                local->op_ret = -1; +                local->op_errno = EREMOTE; +                ret = -1; +                goto out; +        } + +        if (state == GF_CS_REMOTE || state == GF_CS_DOWNLOADING) { +                gf_msg_debug (this->name, 0, "status is %d", state); +                ret = cs_download (frame); +                if (ret == 0) { +                        gf_msg_debug (this->name, 0, "Winding for Final Write"); +                } else { +                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, +                                " download failed, unwinding writev"); +                        local->op_ret = -1; +                        local->op_errno = EREMOTE; +                        ret = -1; +                } +        } +out: +        return ret; +} +int32_t +cs_fdctx_to_dict (xlator_t *this, +        fd_t *fd, +        dict_t *dict) +{ +        return 0; +} + + +int32_t +cs_inode (xlator_t *this) +{ +        return 0; +} + + +int32_t +cs_inode_to_dict (xlator_t *this, +        dict_t *dict) +{ +        return 0; +} + + +int32_t +cs_history (xlator_t *this) +{ +        return 0; +} + + +int32_t +cs_fd (xlator_t *this) +{ +        return 0; +} + + +int32_t +cs_fd_to_dict (xlator_t *this, +        dict_t *dict) +{ +        return 0; +} + + +int32_t +cs_fdctx (xlator_t *this, +        fd_t *fd) +{ +        return 0; +} + + +int32_t +cs_inodectx (xlator_t *this, +        inode_t *ino) +{ +        return 0; +} + + +int32_t +cs_inodectx_to_dict (xlator_t *this, +        inode_t *ino, +        dict_t *dict) +{ +        return 0; +} + + +int32_t +cs_priv_to_dict (xlator_t *this, +        dict_t *dict) +{ +        return 0; +} + + +int32_t +cs_priv (xlator_t *this) +{ +        return 0; +} + +int +cs_notify (xlator_t *this, int event, void *data, ...) +{ +        return default_notify (this, event, data); +} + + +struct xlator_fops cs_fops = { +        .stat                 = cs_stat, +        .readdirp             = cs_readdirp, +        .truncate             = cs_truncate, +        .seek                 = cs_seek, +        .statfs               = cs_statfs, +        .fallocate            = cs_fallocate, +        .discard              = cs_discard, +        .getxattr             = cs_getxattr, +        .writev               = cs_writev, +        .setxattr             = cs_setxattr, +        .fgetxattr            = cs_fgetxattr, +        .lookup               = cs_lookup, +        .fsetxattr            = cs_fsetxattr, +        .readv                = cs_readv, +        .ftruncate            = cs_ftruncate, +        .rchecksum            = cs_rchecksum, +        .unlink               = cs_unlink, +        .open                 = cs_open, +        .fstat                = cs_fstat, +        .zerofill             = cs_zerofill, +}; + +struct xlator_cbks cs_cbks = { +}; + +struct xlator_dumpops cs_dumpops = { +        .fdctx_to_dict        = cs_fdctx_to_dict, +        .inode                = cs_inode, +        .inode_to_dict        = cs_inode_to_dict, +        .history              = cs_history, +        .fd                   = cs_fd, +        .fd_to_dict           = cs_fd_to_dict, +        .fdctx                = cs_fdctx, +        .inodectx             = cs_inodectx, +        .inodectx_to_dict     = cs_inodectx_to_dict, +        .priv_to_dict         = cs_priv_to_dict, +        .priv                 = cs_priv, +}; + +struct volume_options cs_options[] = { +       { .key  = {NULL} }, +}; + +xlator_api_t xlator_api = { +        .init           = cs_init, +        .fini           = cs_fini, +        .notify         = cs_notify, +        .reconfigure    = cs_reconfigure, +        .mem_acct_init  = cs_mem_acct_init, +        .dumpops        = &cs_dumpops, +        .fops           = &cs_fops, +        .cbks           = &cs_cbks, +        .options        = cs_options, +        .identifier     = "cloudsync", +};  | 
