diff options
author | Susant Palai <spalai@redhat.com> | 2018-03-09 20:07:19 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@redhat.com> | 2018-04-10 01:09:29 +0000 |
commit | 48623a33a0ef38f6c99208b0580954d7d7c80e76 (patch) | |
tree | bbe9aa572b36a41c3c64b93c43f2d43fd2634bd1 | |
parent | f946d98a95249c8d906323e6419ec8538467d2ab (diff) |
experimental/cloudsync: Download xlator for archival feature
spec-files:
https://review.gluster.org/#/c/18854/
Overview:
* Cloudsync maintains three file states in it's inode-ctx i.e
1 - LOCAL,
2 - REMOTE,
3 - DOWNLOADING.
* A data modifying fop is allowed only if the state is LOCAL.
If the state is REMOTE or DOWNLOADING, client will download
or wait for the download to finish initiated by other client.
* Multiple download and upload from different clients are synchronized
by inodelk.
* In POSIX a state check is done (part of different commit)before
allowing the fop to continue. If the state is remote/downloading the
fop is unwound with EREMOTE. The client will then download the file
and continue with the fop again.
* Basic Algo for fop (let's say write fop):
- If LOCAL -> resume fop
- If REMOTE ->
- INODELK
- STAT (this gets state and heal the state if needed)
- DOWNLOAD
- resume fop
Note:
* Developers will need to write plugins for download, based on the
remote store they choose. In phase-1, support will be added for
one remote store per volume. In future, more options for multiple
remote stores will be explored.
TODOs:
- Implement stat/lookup/readdirp to return size info from xattr
- Make plugins configurable
- Implement unlink fop
- Add metrics collection
- Add sharding support
Design Contributions:
Aravinda V K <avishwan@redhat.com>
Amar Tumballi <amarts@redhat.com>
Ram Ankireddypalle <areddy@commvault.com>
Susant Palai <spalai@redhat.com>
updates: #387
Change-Id: Iddf711ee7ab4e946ae3e472ff62791a7b85e6d4b
Signed-off-by: Susant Palai <spalai@redhat.com>
21 files changed, 2468 insertions, 4 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index 3f662f91944..a51949739d8 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -284,6 +284,11 @@ P: Karthik US <ksubrahm@redhat.com> S: Maintained F: xlators/features/read-only/ +Cloudsync +M: Susant Kumar Palai <spalai@redhat.com> +S: Maintained +F: xlators/features/cloudsync/ + Experimental Features: ---------------------- diff --git a/configure.ac b/configure.ac index 6624bd63cb1..81eca4976a1 100644 --- a/configure.ac +++ b/configure.ac @@ -176,6 +176,8 @@ AC_CONFIG_FILES([Makefile xlators/features/bit-rot/src/bitd/Makefile xlators/features/leases/Makefile xlators/features/leases/src/Makefile + xlators/features/cloudsync/Makefile + xlators/features/cloudsync/src/Makefile xlators/playground/Makefile xlators/playground/template/Makefile xlators/playground/template/src/Makefile diff --git a/glusterfs.spec.in b/glusterfs.spec.in index d7938c24edd..a465f5445d6 100644 --- a/glusterfs.spec.in +++ b/glusterfs.spec.in @@ -1092,6 +1092,7 @@ exit 0 %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/shard.so %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/snapview-client.so %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/worm.so + %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/cloudsync.so %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/meta.so %dir %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/performance/io-cache.so diff --git a/libglusterfs/src/generator.py b/libglusterfs/src/generator.py index c9311ea3856..3072382c7a5 100755 --- a/libglusterfs/src/generator.py +++ b/libglusterfs/src/generator.py @@ -714,7 +714,7 @@ def get_error_arg (type_str): return "NULL" return "-1" -def get_subs (names, types): +def get_subs (names, types, cbktypes=None): sdict = {} sdict["@SHORT_ARGS@"] = string.join(names,", ") # Convert two separate tuples to one of (name, type) sub-tuples. @@ -725,6 +725,9 @@ def get_subs (names, types): sdict["@LONG_ARGS@"] = string.join(as_strings,",\n\t") # So much more readable than string.join(map(string.join,zip(...)))) sdict["@ERROR_ARGS@"] = string.join(map(get_error_arg,types),", ") + if cbktypes is not None: + sdict["@CBK_ERROR_ARGS@"] = string.join(map( + get_error_arg,cbktypes),", ") return sdict def generate (tmpl, name, subs): @@ -732,6 +735,8 @@ def generate (tmpl, name, subs): if name == "writev": # More spurious inconsistency. text = text.replace("@UPNAME@","WRITE") + elif name == "readv": + text = text.replace("@UPNAME@","READ") else: text = text.replace("@UPNAME@",name.upper()) for old, new in subs[name].iteritems(): @@ -747,7 +752,8 @@ for name, args in ops.iteritems(): # Create the necessary substitution strings for fops. arg_names = [ a[1] for a in args if a[0] == 'fop-arg'] arg_types = [ a[2] for a in args if a[0] == 'fop-arg'] - fop_subs[name] = get_subs(arg_names,arg_types) + cbk_types = [ a[2] for a in args if a[0] == 'cbk-arg'] + fop_subs[name] = get_subs(arg_names,arg_types,cbk_types) # Same thing for callbacks. arg_names = [ a[1] for a in args if a[0] == 'cbk-arg'] diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 43e6f48d905..5b9f2a45405 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -358,6 +358,22 @@ enum gf_internal_fop_indicator { } \ } while (0) \ +#define GF_CS_OBJECT_SIZE "trusted.glusterfs.cs.object_size" + +#define GF_CS_OBJECT_UPLOAD_COMPLETE "trusted.glusterfs.csou.complete" +#define GF_CS_OBJECT_REMOTE "trusted.glusterfs.cs.remote" +#define GF_CS_OBJECT_DOWNLOADING "trusted.glusterfs.cs.downloading" +#define GF_CS_OBJECT_DOWNLOADED "trusted.glusterfs.cs.downloaded" +#define GF_CS_OBJECT_STATUS "trusted.glusterfs.cs.status" +#define GF_CS_OBJECT_REPAIR "trusted.glusterfs.cs.repair" + +typedef enum { + GF_CS_LOCAL = 1, + GF_CS_REMOTE = 2, + GF_CS_REPAIR = 4, + GF_CS_DOWNLOADING = 8, + GF_CS_ERROR = 16, +} gf_cs_obj_state; typedef enum { GF_FOP_PRI_UNSPEC = -1, /* Priority not specified */ diff --git a/tests/basic/cloudsync-sanity.t b/tests/basic/cloudsync-sanity.t new file mode 100644 index 00000000000..3cf719da011 --- /dev/null +++ b/tests/basic/cloudsync-sanity.t @@ -0,0 +1,22 @@ +#!/bin/bash + +. $(dirname $0)/../include.rc +. $(dirname $0)/../volume.rc + +cleanup; + +TEST glusterd +TEST pidof glusterd + +TEST $CLI volume create $V0 replica 3 $H0:$B0/${V0}{1,2,3,4,5,6,7,8,9}; +TEST $CLI volume set $V0 features.cloudsync enable; +TEST $CLI volume start $V0; + +## Mount FUSE +TEST $GFS -s $H0 --volfile-id $V0 $M1; + +# This test covers lookup, mkdir, mknod, symlink, link, rename, +# create operations +TEST $(dirname $0)/rpc-coverage.sh $M1 + +cleanup; diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am index 1a2bc70ec68..f164efc1218 100644 --- a/xlators/features/Makefile.am +++ b/xlators/features/Makefile.am @@ -1,5 +1,5 @@ SUBDIRS = locks quota read-only quiesce marker index barrier \ arbiter compress changelog changetimerecorder \ gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \ - trash shard bit-rot leases selinux sdfs namespace + trash shard bit-rot leases selinux sdfs namespace cloudsync CLEANFILES = diff --git a/xlators/features/cloudsync/Makefile.am b/xlators/features/cloudsync/Makefile.am new file mode 100644 index 00000000000..a985f42a877 --- /dev/null +++ b/xlators/features/cloudsync/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/features/cloudsync/src/Makefile.am b/xlators/features/cloudsync/src/Makefile.am new file mode 100644 index 00000000000..da660d7401b --- /dev/null +++ b/xlators/features/cloudsync/src/Makefile.am @@ -0,0 +1,47 @@ +xlator_LTLIBRARIES = cloudsync.la + +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +CLOUDSYNC_SRC = $(top_srcdir)/xlators/features/cloudsync/src + +cloudsync_sources = cloudsync.c + +CLOUDSYNC_SRC = $(top_srcdir)/xlators/features/cloudsync/src +CLOUDSYNC_BLD = $(top_builddir)/xlators/features/cloudsync/src + +cloudsynccommon_sources = $(CLOUDSYNC_SRC)/cloudsync-common.c + +noinst_HEADERS = $(CLOUDSYNC_BLD)/cloudsync.h \ + $(CLOUDSYNC_BLD)/cloudsync-mem-types.h \ + $(CLOUDSYNC_BLD)/cloudsync-messages.h \ + $(CLOUDSYNC_BLD)/cloudsync-common.h + +cloudsync_la_SOURCES = $(cloudsync_sources) $(cloudsynccommon_sources) + +nodist_cloudsync_la_SOURCES = cloudsync-autogen-fops.c cloudsync-autogen-fops.h +BUILT_SOURCES = cloudsync-autogen-fops.h + +cloudsync_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS) + +cloudsync_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src + +AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS) + +noinst_PYTHON = cloudsync-fops-c.py cloudsync-fops-h.py +EXTRA_DIST = cloudsync-autogen-fops-tmpl.c cloudsync-autogen-fops-tmpl.h + +cloudsync-autogen-fops.c: cloudsync-fops-c.py cloudsync-autogen-fops-tmpl.c + $(PYTHON) $(CLOUDSYNC_SRC)/cloudsync-fops-c.py \ + $(CLOUDSYNC_SRC)/cloudsync-autogen-fops-tmpl.c > $@ + +cloudsync-autogen-fops.h: cloudsync-fops-h.py cloudsync-autogen-fops-tmpl.h + $(PYTHON) $(CLOUDSYNC_SRC)/cloudsync-fops-h.py \ + $(CLOUDSYNC_SRC)/cloudsync-autogen-fops-tmpl.h > $@ + +CLEANFILES = $(nodist_cloudsync_la_SOURCES) + +uninstall-local: + rm -f $(DESTDIR)$(xlatordir)/cloudsync.so + diff --git a/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c new file mode 100644 index 00000000000..6bb68cd170c --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c @@ -0,0 +1,30 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* File: cloudsync-autogen-fops-tmpl.c + * This file contains the CLOUDSYNC autogenerated FOPs. This is run through + * the code generator, generator.py to generate the required FOPs. + */ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <dlfcn.h> + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "cloudsync.h" +#include "cloudsync-common.h" +#include "call-stub.h" + +#pragma generate diff --git a/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h new file mode 100644 index 00000000000..2db2a9c88c7 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h @@ -0,0 +1,24 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* File: clousync-autogen-fops-tmpl.h + * This file contains the cloudsync autogenerated FOPs declarations. + */ + +#ifndef _CLOUDSYNC_AUTOGEN_FOPS_H +#define _CLOUDSYNC_AUTOGEN_FOPS_H + +#include "xlator.h" +#include "cloudsync.h" +#include "cloudsync-common.h" + +#pragma generate + +#endif /* _CLOUDSYNC_AUTOGEN_FOPS_H */ diff --git a/xlators/features/cloudsync/src/cloudsync-common.c b/xlators/features/cloudsync/src/cloudsync-common.c new file mode 100644 index 00000000000..d0d00decca3 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-common.c @@ -0,0 +1,44 @@ +/* + Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "cloudsync-common.h" + +void +cs_local_wipe (xlator_t *this, cs_local_t *local) +{ + if (!local) + return; + + loc_wipe (&local->loc); + + if (local->fd) { + fd_unref (local->fd); + local->fd = NULL; + } + + if (local->stub) { + call_stub_destroy (local->stub); + local->stub = NULL; + } + + if (local->xattr_req) + dict_unref (local->xattr_req); + + if (local->xattr_rsp) + dict_unref (local->xattr_rsp); + + if (local->dlfd) + fd_unref (local->dlfd); + + if (local->remotepath) + GF_FREE (local->remotepath); + + mem_put (local); +} diff --git a/xlators/features/cloudsync/src/cloudsync-common.h b/xlators/features/cloudsync/src/cloudsync-common.h new file mode 100644 index 00000000000..3298ab0a6f2 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-common.h @@ -0,0 +1,97 @@ +/* + Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CLOUDSYNC_COMMON_H +#define _CLOUDSYNC_COMMON_H + +#include "glusterfs.h" +#include "call-stub.h" +#include "xlator.h" +#include "syncop.h" +#include "cloudsync-mem-types.h" +#include "cloudsync-messages.h" + +typedef struct cs_local { + loc_t loc; + fd_t *fd; + call_stub_t *stub; + call_frame_t *main_frame; + int op_errno; + int op_ret; + fd_t *dlfd; + off_t dloffset; + struct iatt stbuf; + dict_t *xattr_rsp; + dict_t *xattr_req; + glusterfs_fop_t fop; + gf_boolean_t locked; + int call_cnt; + inode_t *inode; + char *remotepath; +} cs_local_t; + +typedef int (*fop_download_t) (call_frame_t *frame, void *config); + +typedef void *(*store_init) (xlator_t *this); + +typedef int (*store_reconfigure) (xlator_t *this, dict_t *options); + +typedef void (*store_fini) (void *config); + +struct cs_remote_stores { + char *name; /* store name */ + void *config; /* store related information */ + fop_download_t dlfop; /* store specific download function */ + store_init init; /* store init to initialize store config */ + store_reconfigure reconfigure; /* reconfigure store config */ + store_fini fini; + void *handle; /* shared library handle*/ +}; + +typedef struct cs_private { + xlator_t *this; + struct cs_remote_stores *stores; + gf_boolean_t abortdl; + pthread_spinlock_t lock; +} cs_private_t; + +void +cs_local_wipe (xlator_t *this, cs_local_t *local); + +#define CS_STACK_UNWIND(fop, frame, params ...) do { \ + cs_local_t *__local = NULL; \ + xlator_t *__xl = NULL; \ + if (frame) { \ + __xl = frame->this; \ + __local = frame->local; \ + frame->local = NULL; \ + } \ + STACK_UNWIND_STRICT (fop, frame, params); \ + cs_local_wipe (__xl, __local); \ +} while (0) + +#define CS_STACK_DESTROY(frame) do { \ + cs_local_t *__local = NULL; \ + xlator_t *__xl = NULL; \ + __xl = frame->this; \ + __local = frame->local; \ + frame->local = NULL; \ + STACK_DESTROY (frame->root); \ + cs_local_wipe (__xl, __local); \ +} while (0) + +typedef struct store_methods { + int (*fop_download) (call_frame_t *frame, void *config); + /* return type should be the store config */ + void *(*fop_init) (xlator_t *this); + int (*fop_reconfigure) (xlator_t *this, dict_t *options); + void (*fop_fini) (void *config); +} store_methods_t; + +#endif /* _CLOUDSYNC_COMMON_H */ diff --git a/xlators/features/cloudsync/src/cloudsync-fops-c.py b/xlators/features/cloudsync/src/cloudsync-fops-c.py new file mode 100644 index 00000000000..e3030724468 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-fops-c.py @@ -0,0 +1,305 @@ +#!/usr/bin/python + +import os +import sys + +curdir = os.path.dirname(sys.argv[0]) +gendir = os.path.join(curdir, '../../../../libglusterfs/src') +sys.path.append(gendir) +from generator import ops, fop_subs, cbk_subs, generate + +FD_DATA_MODIFYING_OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + int op_errno = -1; + cs_local_t *local = NULL; + int ret = 0; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = -1; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + VALIDATE_OR_GOTO (fd, err); + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_@UPNAME@); + if (!local) { + + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + op_errno = ENOMEM; + goto err; + } + + __cs_inode_ctx_get (this, fd->inode, &ctx); + + if (ctx) + state = __cs_get_file_state (this, fd->inode, ctx); + else + state = GF_CS_LOCAL; + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + local->stub = fop_@NAME@_stub (frame, cs_resume_@NAME@, + @SHORT_ARGS@); + if (!local->stub) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + op_errno = ENOMEM; + goto err; + } + + + if (state == GF_CS_LOCAL) { + STACK_WIND (frame, cs_@NAME@_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + } else { + local->call_cnt++; + ret = locate_and_execute (frame); + if (ret) { + op_errno = ENOMEM; + goto err; + } + } + + return 0; + +err: + CS_STACK_UNWIND (@NAME@, frame, -1, op_errno, @CBK_ERROR_ARGS@); + + return 0; +} +""" + +FD_DATA_MODIFYING_RESUME_OP_FOP_TEMPLATE = """ +int32_t +cs_resume_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + int ret = 0; + + ret = cs_resume_postprocess (this, frame, fd->inode); + if (ret) { + goto unwind; + } + + cs_inodelk_unlock (frame); + + STACK_WIND (frame, cs_@NAME@_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + + return 0; + +unwind: + + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} +""" +FD_DATA_MODIFYING_OP_FOP_CBK_TEMPLATE = """ +int32_t +cs_@NAME@_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + cs_local_t *local = NULL; + int ret = 0; + uint64_t val = 0; + fd_t *fd = NULL; + + local = frame->local; + fd = local->fd; + + /* Do we need lock here? */ + local->call_cnt++; + + if (op_ret == -1) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "could not get file state, unwinding"); + op_ret = -1; + op_errno = EIO; + goto unwind; + } else { + __cs_inode_ctx_update (this, fd->inode, val); + gf_msg (this->name, GF_LOG_INFO, 0, 0, + " state = %ld", val); + + if (local->call_cnt == 1 && + (val == GF_CS_REMOTE || + val == GF_CS_DOWNLOADING)) { + gf_msg (this->name, GF_LOG_INFO, 0, + 0, " will repair and download " + "the file, current state : %ld", + val); + goto repair; + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "second @NAME@, Unwinding"); + goto unwind; + } + } + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "file state " + "could not be figured, unwinding"); + goto unwind; + } + } else { + /* successful @NAME@ => file is local */ + __cs_inode_ctx_update (this, fd->inode, GF_CS_LOCAL); + gf_msg (this->name, GF_LOG_INFO, 0, 0, "state : GF_CS_LOCAL" + ", @NAME@ successful"); + + goto unwind; + } + +repair: + ret = locate_and_execute (frame); + if (ret) { + goto unwind; + } + + return 0; + +unwind: + CS_STACK_UNWIND (@NAME@, frame, op_ret, op_errno, @SHORT_ARGS@); + + return 0; +} +""" + +LOC_STAT_OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_@UPNAME@); + if (!local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local is NULL"); + goto err; + } + + if (loc->inode->ia_type == IA_IFDIR) + goto wind; + + local->xattr_req = xdata ? dict_ref (xdata) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + +wind: + STACK_WIND (frame, cs_@NAME@_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + + return 0; +err: + CS_STACK_UNWIND (@NAME@, frame, -1, errno, @CBK_ERROR_ARGS@); + + return 0; +} +""" + +LOC_STAT_OP_FOP_CBK_TEMPLATE = """ +int32_t +cs_@NAME@_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + int ret = 0; + uint64_t val = 0; + loc_t *loc = NULL; + cs_local_t *local = NULL; + + local = frame->local; + + loc = &local->loc; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + ret = __cs_inode_ctx_update (this, loc->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, loc->inode); + } + + CS_STACK_UNWIND (@NAME@, frame, op_ret, op_errno, @SHORT_ARGS@); + + return 0; +} +""" + +# All xlator FOPs are covered in the following section just to create a clarity +# The lists themselves are not used. +entry_ops = ['mknod', 'mkdir', 'unlink', 'rmdir', 'symlink', 'rename', 'link', + 'create'] +special_ops = ['statfs', 'lookup', 'ipc', 'compound', 'icreate', 'namelink'] +ignored_ops = ['getspec'] +inode_ops = ['stat', 'readlink', 'truncate', 'open', 'setxattr', 'getxattr', + 'removexattr', 'opendir', 'access', 'inodelk', 'entrylk', + 'xattrop', 'setattr', 'lease', 'getactivelk', 'setactivelk', + 'discover'] +fd_ops = ['readv', 'writev', 'flush', 'fsync', 'fsyncdir', 'ftruncate', + 'fstat', 'lk', 'readdir', 'finodelk', 'fentrylk', 'fxattrop', + 'fsetxattr', 'fgetxattr', 'rchecksum', 'fsetattr', 'readdirp', + 'fremovexattr', 'fallocate', 'discard', 'zerofill', 'seek'] + + +# These are the current actual lists used to generate the code + +# The following list contains fops which are fd based that modifies data +fd_data_modify_op_fop_template = ['readv', 'writev', 'flush', 'fsync', + 'ftruncate', 'rchecksum', 'fallocate', + 'discard', 'zerofill', 'seek'] + +# The following list contains fops which are entry based that does not change +# data +loc_stat_op_fop_template = ['lookup', 'stat', 'discover', 'access', 'setattr', + 'getattr'] + +# These fops need a separate implementation +special_fops = ['readdirp', 'statfs', 'setxattr', 'unlink', 'getxattr', + 'truncate', 'fstat'] + +def gen_defaults(): + for name in ops: + if name in fd_data_modify_op_fop_template: + print generate(FD_DATA_MODIFYING_OP_FOP_CBK_TEMPLATE, name, cbk_subs) + print generate(FD_DATA_MODIFYING_RESUME_OP_FOP_TEMPLATE, name, fop_subs) + print generate(FD_DATA_MODIFYING_OP_FOP_TEMPLATE, name, fop_subs) + elif name in loc_stat_op_fop_template: + print generate(LOC_STAT_OP_FOP_CBK_TEMPLATE, name, cbk_subs) + print generate(LOC_STAT_OP_FOP_TEMPLATE, name, fop_subs) + +for l in open(sys.argv[1], 'r').readlines(): + if l.find('#pragma generate') != -1: + print "/* BEGIN GENERATED CODE - DO NOT MODIFY */" + gen_defaults() + print "/* END GENERATED CODE */" + else: + print l[:-1] diff --git a/xlators/features/cloudsync/src/cloudsync-fops-h.py b/xlators/features/cloudsync/src/cloudsync-fops-h.py new file mode 100644 index 00000000000..552c6b58e3a --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-fops-h.py @@ -0,0 +1,30 @@ +#!/usr/bin/python + +import os +import sys + +curdir = os.path.dirname(sys.argv[0]) +gendir = os.path.join(curdir, '../../../../libglusterfs/src') +sys.path.append(gendir) +from generator import ops, fop_subs, cbk_subs, generate + +OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@); +""" + +def gen_defaults(): + for name, value in ops.iteritems(): + if name == 'getspec': + continue + print generate(OP_FOP_TEMPLATE, name, fop_subs) + + +for l in open(sys.argv[1], 'r').readlines(): + if l.find('#pragma generate') != -1: + print "/* BEGIN GENERATED CODE - DO NOT MODIFY */" + gen_defaults() + print "/* END GENERATED CODE */" + else: + print l[:-1] diff --git a/xlators/features/cloudsync/src/cloudsync-mem-types.h b/xlators/features/cloudsync/src/cloudsync-mem-types.h new file mode 100644 index 00000000000..6ebcb16552b --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-mem-types.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_MEM_TYPES_H__ +#define __CLOUDSYNC_MEM_TYPES_H__ + +#include "mem-types.h" +enum cs_mem_types_ { + gf_cs_mt_cs_private_t = gf_common_mt_end + 1, + gf_cs_mt_cs_remote_stores_t, + gf_cs_mt_cs_inode_ctx_t, + gf_cs_mt_end +}; +#endif /* __CLOUDSYNC_MEM_TYPES_H__ */ + diff --git a/xlators/features/cloudsync/src/cloudsync-messages.h b/xlators/features/cloudsync/src/cloudsync-messages.h new file mode 100644 index 00000000000..ad4b7d2e0b8 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-messages.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_MESSAGES_H__ +#define __CLOUDSYNC_MESSAGES_H__ + +/*TODO: define relevant message ids */ + + +#endif /* __CLOUDSYNC_MESSAGES_H__ */ + diff --git a/xlators/features/cloudsync/src/cloudsync.c b/xlators/features/cloudsync/src/cloudsync.c new file mode 100644 index 00000000000..8d74202706e --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync.c @@ -0,0 +1,1673 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "cloudsync.h" +#include "cloudsync-common.h" +#include "call-stub.h" +#include "cloudsync-autogen-fops.h" + +#include <dlfcn.h> + +void +cs_cleanup_private (cs_private_t *priv) +{ + if (priv) { + if (priv->stores) { + priv->stores->fini (priv->stores->config); + GF_FREE (priv->stores); + } + + pthread_spin_destroy (&priv->lock); + GF_FREE (priv); + } + + return; +} + +int +cs_init (xlator_t *this) +{ + cs_private_t *priv = NULL; + gf_boolean_t per_vol = _gf_false; + int ret = 0; + char *libpath = ("libaws.so"); + store_methods_t *store_methods = NULL; + void *handle = NULL; + + priv = GF_CALLOC (1, sizeof (*priv), gf_cs_mt_cs_private_t); + if (!priv) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + goto out; + } + + priv->this = this; + + this->local_pool = mem_pool_new (cs_local_t, 512); + if (!this->local_pool) { + gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, + "initialisation failed."); + ret = -1; + goto out; + } + + this->private = priv; + + /* temp workaround. Should be configurable through glusterd*/ + per_vol = _gf_true; + + if (per_vol) { + /*TODO:Need to make it configurable. This is a temp workaround*/ + handle = dlopen (libpath, RTLD_NOW); + if (!handle) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "could not load" + " the required library. %s", dlerror ()); + goto out; + } else { + gf_msg (this->name, GF_LOG_INFO, 0, 0, + "loading library:%s successful", libpath); + } + + + priv->stores = GF_CALLOC (1, sizeof (struct cs_remote_stores), + gf_cs_mt_cs_remote_stores_t); + if (!priv->stores) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Could not " + "allocate memory for priv->stores"); + ret = -1; + goto out; + } + + (void) dlerror (); /* clear out previous error string */ + + /* load library methods */ + store_methods = (store_methods_t *) dlsym (handle, "store_ops"); + if (!store_methods) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "null store_methods %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + + priv->stores->dlfop = store_methods->fop_download; + if (!priv->stores->dlfop) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " download fop %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + priv->stores->init = store_methods->fop_init; + if (!priv->stores->init) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " init fop %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + priv->stores->reconfigure = store_methods->fop_reconfigure; + if (!priv->stores->reconfigure) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " reconfigure fop %s", dlerror ()); + ret = -1; + goto out; + } + + priv->stores->handle = handle; + + priv->stores->config = (void *) ((priv->stores->init) (this)); + if (!priv->stores->config) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null config"); + ret = -1; + goto out; + } + } + +out: + if (ret == -1) { + if (this->local_pool) + mem_pool_destroy (this->local_pool); + + cs_cleanup_private (priv); + } + + return ret; +} + +void +cs_fini (xlator_t *this) +{ + cs_private_t *priv = NULL; + priv = this->private; + + cs_cleanup_private (priv); +} + +int +cs_reconfigure (xlator_t *this, dict_t *options) +{ + cs_private_t *priv = NULL; + int ret = 0; + + priv = this->private; + if (!priv) { + ret = -1; + goto out; + } + + /* needed only for per volume configuration*/ + ret = priv->stores->reconfigure (this, options); + +out: + return ret; +} + +int32_t +cs_mem_acct_init (xlator_t *this) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("cloudsync", this, out); + + ret = xlator_mem_acct_init (this, gf_cs_mt_end + 1); + + if (ret != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "Memory accounting init failed"); + return ret; + } +out: + return ret; +} + +int32_t +cs_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, + dict_t *xdata) +{ + gf_dirent_t *tmp = NULL; + char *sxattr = NULL; + uint64_t ia_size = 0; + int ret = 0; + + list_for_each_entry (tmp, &entries->list, list) { + ret = dict_get_str (tmp->dict, GF_CS_OBJECT_SIZE, &sxattr); + if (ret) { + gf_msg_trace (this->name, 0, "size xattr found"); + continue; + } + + ia_size = atoll (sxattr); + tmp->d_stat.ia_size = ia_size; + } + + STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, + entries, xdata); + return 0; +} + + +int32_t +cs_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t off, dict_t *xdata) +{ + int ret = 0; + int op_errno = ENOMEM; + + if (!xdata) { + xdata = dict_new (); + if (!xdata) { + goto err; + } + } + + ret = dict_set_int32 (xdata, GF_CS_OBJECT_SIZE, 1); + if (ret) { + goto err; + } + + STACK_WIND (frame, cs_readdirp_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, + fd, size, off, xdata); + return 0; +err: + STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, + NULL, NULL); + return 0; +} + + +int32_t +cs_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + cs_local_t *local = NULL; + int ret = 0; + uint64_t val = 0; + + local = frame->local; + + /* Do we need lock here? */ + local->call_cnt++; + + if (op_ret == -1) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "truncate failed"); + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "could not get file state, unwinding"); + op_ret = -1; + op_errno = EIO; + goto unwind; + } else { + __cs_inode_ctx_update (this, local->loc.inode, + val); + gf_msg (this->name, GF_LOG_INFO, 0, 0, + " state = %ld", val); + + if (local->call_cnt == 1 && + (val == GF_CS_REMOTE || + val == GF_CS_DOWNLOADING)) { + gf_msg (this->name, GF_LOG_WARNING, 0, + 0, "will repair and download " + "the file, current state : %ld", + val); + goto repair; + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "second truncate, Unwinding"); + goto unwind; + } + } + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "file state " + "could not be figured, unwinding"); + goto unwind; + } + } else { + /* successful write => file is local */ + __cs_inode_ctx_update (this, local->loc.inode, GF_CS_LOCAL); + gf_msg (this->name, GF_LOG_INFO, 0, 0, "state : GF_CS_LOCAL" + ", truncate successful"); + + goto unwind; + } + +repair: + ret = locate_and_execute (frame); + if (ret) { + goto unwind; + } + + return 0; + +unwind: + CS_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + + +int32_t +cs_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) +{ + int op_errno = -1; + cs_local_t *local = NULL; + int ret = 0; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = -1; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + VALIDATE_OR_GOTO (loc, err); + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_TRUNCATE); + if (!local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + op_errno = ENOMEM; + goto err; + } + + __cs_inode_ctx_get (this, loc->inode, &ctx); + + if (ctx) + state = __cs_get_file_state (this, loc->inode, ctx); + else + state = GF_CS_LOCAL; + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + local->stub = fop_truncate_stub (frame, cs_resume_truncate, loc, offset, + xdata); + if (!local->stub) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + op_errno = ENOMEM; + goto err; + } + + if (state == GF_CS_LOCAL) { + STACK_WIND (frame, cs_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, offset, xdata); + + } else { + local->call_cnt++; + ret = locate_and_execute (frame); + if (ret) { + op_errno = ENOMEM; + goto err; + } + } + + return 0; +err: + CS_STACK_UNWIND (truncate, frame, -1, op_errno, NULL, NULL, NULL); + return 0; +} + +int32_t +cs_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct statvfs *buf, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, + buf, xdata); + return 0; +} + + +int32_t +cs_statfs (call_frame_t *frame, xlator_t *this, + loc_t *loc, + dict_t *xdata) +{ + STACK_WIND (frame, cs_statfs_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs, + loc, xdata); + return 0; +} + + +int32_t +cs_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, + dict, xdata); + return 0; +} + + +int32_t +cs_getxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, + const char *name, + dict_t *xattr_req) +{ + STACK_WIND (frame, cs_getxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, + loc, name, xattr_req); + return 0; +} + +int32_t +cs_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_local_t *local = NULL; + + local = frame->local; + + if (local->locked) + cs_inodelk_unlock (frame); + + CS_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + + return 0; +} + + +int32_t +cs_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + data_t *tmp = NULL; + cs_local_t *local = NULL; + int ret = 0; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_SETXATTR); + if (!local) { + ret = -1; + goto err; + } + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + tmp = dict_get (dict, GF_CS_OBJECT_UPLOAD_COMPLETE); + if (tmp) { + /* Value of key should be the atime */ + local->stub = fop_setxattr_stub (frame, cs_resume_setxattr, + loc, dict, flags, xdata); + + if (!local->stub) + goto err; + + ret = locate_and_execute (frame); + if (ret) { + goto err; + } + + return 0; + } + + STACK_WIND (frame, cs_setxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, + loc, dict, flags, xdata); + return 0; +err: + CS_STACK_UNWIND (setxattr, frame, -1, errno, NULL); + return 0; +} + + +int32_t +cs_fgetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (fgetxattr, frame, op_ret, op_errno, + dict, xdata); + return 0; +} + + +int32_t +cs_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, + dict_t *xdata) +{ + STACK_WIND (frame, cs_fgetxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr, + fd, name, xdata); + return 0; +} + + + +int32_t +cs_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, + xdata); + return 0; +} + + +int32_t +cs_fsetxattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, + dict_t *dict, + int32_t flags, + dict_t *xdata) +{ + STACK_WIND (frame, cs_fsetxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, + fd, dict, flags, xdata); + return 0; +} + +int32_t +cs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, + preparent, postparent, xdata); + return 0; +} + + +int32_t +cs_unlink (call_frame_t *frame, xlator_t *this, + loc_t *loc, + int32_t flags, + dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_UNLINK); + if (!local) + goto err; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + STACK_WIND (frame, cs_unlink_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, + loc, flags, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (unlink, frame, -1, errno, NULL, NULL, NULL); + return 0; +} + + +int32_t +cs_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, fd_t *fd, + dict_t *xdata) +{ + int ret = 0; + uint64_t val = 0; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + ret = __cs_inode_ctx_update (this, fd->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, fd->inode); + } + + CS_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); + return 0; +} + + +int32_t +cs_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, + int32_t flags, + fd_t *fd, + dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_OPEN); + if (!local) + goto err; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + STACK_WIND (frame, cs_open_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, + loc, flags, fd, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (open, frame, -1, errno, NULL, NULL); + return 0; +} + + +int32_t +cs_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + dict_t *xdata) +{ + int ret = 0; + uint64_t val = 0; + fd_t *fd = NULL; + cs_local_t *local = NULL; + + local = frame->local; + + fd = local->fd; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + gf_msg_debug (this->name, 0, "state %ld", val); + ret = __cs_inode_ctx_update (this, fd->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, fd->inode); + } + + CS_STACK_UNWIND (fstat, frame, op_ret, op_errno, buf, xdata); + + return 0; +} + + +int32_t +cs_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_FSTAT); + if (!local) + goto err; + + if (fd->inode->ia_type == IA_IFDIR) + goto wind; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + +wind: + STACK_WIND (frame, cs_fstat_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, + fd, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (fstat, frame, -1, errno, NULL, NULL); + return 0; +} + +cs_local_t * +cs_local_init (xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd, + glusterfs_fop_t fop) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = mem_get0 (this->local_pool); + if (!local) + goto out; + + if (loc) { + ret = loc_copy (&local->loc, loc); + if (ret) + goto out; + } + + if (fd) { + local->fd = fd_ref (fd); + } + + local->op_ret = -1; + local->op_errno = EUCLEAN; + local->fop = fop; + local->dloffset = 0; + frame->local = local; + local->locked = _gf_false; + local->call_cnt = 0; +out: + if (ret) { + if (local) + mem_put (local); + local = NULL; + } + + return local; +} + +call_frame_t * +cs_lock_frame (call_frame_t *parent_frame) +{ + call_frame_t *lock_frame = NULL; + + lock_frame = copy_frame (parent_frame); + + if (lock_frame == NULL) + goto out; + + set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root); + +out: + return lock_frame; + +} + +void +cs_lock_wipe (call_frame_t *lock_frame) +{ + CS_STACK_DESTROY (lock_frame); +} + + +int32_t +cs_inodelk_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_lock_wipe (frame); + + return 0; +} + +int +cs_inodelk_unlock (call_frame_t *main_frame) +{ + xlator_t *this = NULL; + struct gf_flock flock = {0,}; + call_frame_t *lock_frame = NULL; + cs_local_t *lock_local = NULL; + cs_local_t *main_local = NULL; + int ret = 0; + + this = main_frame->this; + main_local = main_frame->local; + + lock_frame = cs_lock_frame (main_frame); + if (!lock_frame) + goto out; + + lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); + if (!lock_local) + goto out; + + ret = cs_build_loc (&lock_local->loc, main_frame); + if (ret) { + goto out; + } + + flock.l_type = F_UNLCK; + + main_local->locked = _gf_false; + + STACK_WIND (lock_frame, cs_inodelk_unlock_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, + &lock_local->loc, F_SETLKW, &flock, NULL); + + return 0; + +out: + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Stale lock would be found on" + " server"); + + if (lock_frame) + cs_lock_wipe (lock_frame); + + return 0; +} + +void * +cs_download_task (void *arg) +{ + call_frame_t *frame = NULL; + xlator_t *this = NULL; + cs_private_t *priv = NULL; + int ret = -1; + char *sign_req = NULL; + fd_t *fd = NULL; + cs_local_t *local = NULL; + dict_t *dict = NULL; + int *retval = NULL; + + frame = (call_frame_t *)arg; + + this = frame->this; + + priv = this->private; + + local = frame->local; + + retval = GF_CALLOC (1, sizeof(int), gf_common_mt_int); + if (!retval) { + gf_log (this->name, GF_LOG_ERROR, "insufficient memory"); + ret = -1; + goto out; + } + + if (local->fd) + fd = fd_anonymous (local->fd->inode); + else + fd = fd_anonymous (local->loc.inode); + + if (!fd) { + gf_msg ("CS", GF_LOG_ERROR, 0, 0, "fd creation failed"); + ret = -1; + goto out; + } + + local->dlfd = fd; + local->dloffset = 0; + + dict = dict_new (); + if (!dict) { + gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, "failed to create " + "dict"); + ret = -1; + goto out; + } + + ret = dict_set_uint32 (dict, GF_CS_OBJECT_DOWNLOADING, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); + ret = -1; + goto out; + } + + ret = syncop_fsetxattr (this, local->fd, dict, 0, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "fsetxattr failed " + "key %s", GF_CS_OBJECT_DOWNLOADING); + ret = -1; + goto out; + } + /*this calling method is for per volume setting */ + ret = priv->stores->dlfop (frame, priv->stores->config); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "download failed" + ", remotepath: %s", local->remotepath); + + /*using dlfd as it is anonymous and have RDWR flag*/ + ret = syncop_ftruncate (FIRST_CHILD (this), local->dlfd, 0, + NULL, NULL, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "ftruncate failed"); + } else { + gf_msg_debug (this->name, 0, "ftruncate succeed"); + } + + ret = -1; + goto out; + } else { + gf_msg (this->name, GF_LOG_INFO, 0, 0, "download success, path" + " : %s", local->remotepath); + + ret = syncop_fremovexattr (this, local->fd, + GF_CS_OBJECT_REMOTE, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "removexattr failed, remotexattr"); + ret = -1; + goto out; + } else { + gf_msg_debug (this->name, 0, "fremovexattr success, " + "path : %s", local->remotepath); + } + + ret = syncop_fremovexattr (this, local->fd, + GF_CS_OBJECT_DOWNLOADING, NULL, + NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "removexattr failed, downloading xattr, path %s" + , local->remotepath); + ret = -1; + goto out; + } else { + gf_msg_debug (this->name, 0, "fremovexattr success" + " path %s", local->remotepath); + } + } + +out: + GF_FREE (sign_req); + + if (dict) + dict_unref (dict); + + if (fd) { + fd_unref (fd); + local->dlfd = NULL; + } + + if (retval) { + *retval = ret; + pthread_exit (retval); + } else { + pthread_exit (&ret); + } +} + +int +cs_download (call_frame_t *frame) +{ + int *retval = NULL; + int ret = 0; + pthread_t dthread; + cs_local_t *local = NULL; + xlator_t *this = NULL; + + local = frame->local; + this = frame->this; + + if (!local->remotepath) { + ret = -1; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "remote path not" + " available. Check posix logs to resolve"); + goto out; + } + + ret = gf_thread_create (&dthread, NULL, &cs_download_task, + (void *)frame, "downloadthread"); + + pthread_join (dthread, (void **)&retval); + + ret = *retval; + +out: + if (retval) + GF_FREE (retval); + + return ret; +} + +int +cs_stat_check_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, struct iatt *stbuf, dict_t *xdata) +{ + cs_local_t *local = NULL; + call_stub_t *stub = NULL; + char *filepath = NULL; + int ret = 0; + inode_t *inode = NULL; + uint64_t val = 0; + + local = frame->local; + + if (op_ret == -1) { + local->op_ret = op_ret; + local->op_errno = op_errno; + gf_msg (this->name, GF_LOG_ERROR, 0, op_errno, + "stat check failed"); + goto err; + } else { + if (local->fd) + inode = local->fd->inode; + else + inode = local->loc.inode; + + if (!inode) { + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null inode " + "returned"); + goto err; + } + + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + cs_inode_ctx_reset (this, inode); + local->op_ret = -1; + local->op_errno = EIO; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "status = GF_CS_ERROR. failed to get " + " file state"); + goto err; + } else { + ret = __cs_inode_ctx_update (this, inode, val); + gf_msg_debug (this->name, 0, "status : %lu", + val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + } + } else { + gf_msg_debug (this->name, 0, + "status not found in dict"); + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + + ret = dict_get_str (xdata, GF_CS_OBJECT_REMOTE, &filepath); + if (filepath) { + gf_msg_debug (this->name, 0, "filepath returned %s", + filepath); + local->remotepath = gf_strdup (filepath); + if (!local->remotepath) { + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + } else { + gf_msg_debug (this->name, 0, "NULL filepath"); + } + + local->op_ret = 0; + local->xattr_rsp = dict_ref (xdata); + memcpy (&local->stbuf, stbuf, sizeof (struct iatt)); + } + + stub = local->stub; + local->stub = NULL; + call_resume (stub); + + return 0; +err: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + +int +cs_do_stat_check (call_frame_t *main_frame) +{ + cs_local_t *local = NULL; + xlator_t *this = NULL; + int ret = 0; + + local = main_frame->local; + this = main_frame->this; + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_REPAIR, 256); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); + goto err; + } + + if (local->fd) { + STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fstat, local->fd, + local->xattr_req); + } else { + STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->stat, &local->loc, + local->xattr_req); + } + + return 0; + +err: + cs_inodelk_unlock (main_frame); + + cs_common_cbk (main_frame); + + return 0; +} + +void +cs_common_cbk (call_frame_t *frame) +{ + glusterfs_fop_t fop = -1; + cs_local_t *local = NULL; + + local = frame->local; + + fop = local->fop; + + /*Note: Only the failure case needs to be handled here. Since for + * successful stat check the fop will resume anyway. The unwind can + * happen from the fop_cbk and each cbk can unlock the inodelk in case + * a lock was taken before. The lock status can be stored in frame */ + + /* for failure case */ + + /*TODO: add other fops*/ + switch (fop) { + case GF_FOP_WRITE: + CS_STACK_UNWIND (writev, frame, local->op_ret, + local->op_errno, NULL, NULL, NULL); + break; + + case GF_FOP_SETXATTR: + CS_STACK_UNWIND (setxattr, frame, local->op_ret, + local->op_errno, NULL); + break; + case GF_FOP_READ: + CS_STACK_UNWIND (readv, frame, local->op_ret, + local->op_errno, NULL, 0, NULL, NULL, + NULL); + break; + case GF_FOP_FTRUNCATE: + CS_STACK_UNWIND (ftruncate, frame, local->op_ret, + local->op_errno, NULL, NULL, NULL); + break; + + + default: + break; + } + + return; +} + +int +cs_blocking_inodelk_cbk (call_frame_t *lock_frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_local_t *main_local = NULL; + call_frame_t *main_frame = NULL; + cs_local_t *lock_local = NULL; + + lock_local = lock_frame->local; + + main_frame = lock_local->main_frame; + main_local = main_frame->local; + + if (op_ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "inodelk failed"); + main_local->op_errno = op_errno; + main_local->op_ret = op_ret; + goto err; + } + + main_local->locked = _gf_true; + + cs_lock_wipe (lock_frame); + + cs_do_stat_check (main_frame); + + return 0; +err: + cs_common_cbk (main_frame); + + cs_lock_wipe (lock_frame); + + return 0; +} + +int +cs_build_loc (loc_t *loc, call_frame_t *frame) +{ + cs_local_t *local = NULL; + int ret = -1; + + local = frame->local; + + if (local->fd) { + loc->inode = inode_ref (local->fd->inode); + if (loc->inode) { + gf_uuid_copy (loc->gfid, loc->inode->gfid); + ret = 0; + goto out; + } else { + ret = -1; + goto out; + } + } else { + loc->inode = inode_ref (local->loc.inode); + if (loc->inode) { + gf_uuid_copy (loc->gfid, loc->inode->gfid); + ret = 0; + goto out; + } else { + ret = -1; + goto out; + } + } +out: + return ret; +} + +int +cs_blocking_inodelk (call_frame_t *parent_frame) +{ + call_frame_t *lock_frame = NULL; + cs_local_t *lock_local = NULL; + xlator_t *this = NULL; + struct gf_flock flock = {0,}; + int ret = 0; + + lock_frame = cs_lock_frame (parent_frame); + if (!lock_frame) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insuffcient memory"); + goto err; + } + + this = parent_frame->this; + + lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); + if (!lock_local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + goto err; + } + + lock_local->main_frame = parent_frame; + + flock.l_type = F_WRLCK; + + ret = cs_build_loc (&lock_local->loc, parent_frame); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "build_loc failed"); + goto err; + } + + STACK_WIND (lock_frame, cs_blocking_inodelk_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, + &lock_local->loc, F_SETLKW, &flock, NULL); + + return 0; +err: + if (lock_frame) + cs_lock_wipe (lock_frame); + + return -1; +} + +int +locate_and_execute (call_frame_t *frame) +{ + int ret = 0; + + ret = cs_blocking_inodelk (frame); + + if (ret) + return -1; + else + return 0; +} + +int32_t +cs_resume_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = frame->local; + + ret = cs_resume_postprocess (this, frame, loc->inode); + if (ret) { + goto unwind; + } + + cs_inodelk_unlock (frame); + + STACK_WIND (frame, cs_truncate_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, + loc, offset, local->xattr_req); + + return 0; + +unwind: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + + +int32_t +cs_resume_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata) +{ + cs_local_t *local = NULL; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = GF_CS_ERROR; + + local = frame->local; + + __cs_inode_ctx_get (this, loc->inode, &ctx); + + state = __cs_get_file_state (this, loc->inode, ctx); + + if (state == GF_CS_ERROR) { + /* file is already remote */ + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_WARNING, 0, 0, + "file %s , could not figure file state", loc->path); + goto unwind; + } + + + if (state == GF_CS_REMOTE) { + /* file is already remote */ + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_WARNING, 0, EINVAL, + "file %s is already remote", loc->path); + goto unwind; + } + + if (state == GF_CS_DOWNLOADING) { + gf_msg (this->name, GF_LOG_WARNING, 0, 0, + " file is in downloading state."); + local->op_ret = -1; + local->op_errno = EINVAL; + goto unwind; + } + + STACK_WIND (frame, cs_setxattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, + local->xattr_req); + + return 0; +unwind: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + + + + +gf_cs_obj_state +__cs_get_file_state (xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx) +{ + gf_cs_obj_state state = -1; + + if (!ctx) + return GF_CS_ERROR; + + LOCK (&inode->lock); + { + state = ctx->state; + } + UNLOCK (&inode->lock); + + return state; +} + +void +__cs_inode_ctx_get (xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx) +{ + uint64_t ctxint = 0; + int ret = 0; + + LOCK (&inode->lock); + { + ret = __inode_ctx_get (inode, this, &ctxint); + } + UNLOCK (&inode->lock); + + if (ret) + *ctx = NULL; + else + *ctx = (cs_inode_ctx_t *)ctxint; + + return; +} + +int +__cs_inode_ctx_update (xlator_t *this, inode_t *inode, uint64_t val) +{ + cs_inode_ctx_t *ctx = NULL; + uint64_t ctxint = 0; + int ret = 0; + + LOCK (&inode->lock); + { + ret = __inode_ctx_get (inode, this, &ctxint); + if (ret) { + ctx = GF_CALLOC (1, sizeof (*ctx), gf_cs_mt_cs_inode_ctx_t); + if (!ctx) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx allocation failed"); + ret = -1; + goto out; + } + + ctx->state = val; + + ctxint = (uint64_t) ctx; + + ret = __inode_ctx_set (inode, this, &ctxint); + if (ret) { + GF_FREE (ctx); + goto out; + } + } else { + ctx = (cs_inode_ctx_t *) ctxint; + + ctx->state = val; + } + + } + +out: + UNLOCK (&inode->lock); + + return ret; +} + +int +cs_inode_ctx_reset (xlator_t *this, inode_t *inode) +{ + cs_inode_ctx_t *ctx = NULL; + uint64_t ctxint = 0; + + inode_ctx_del (inode, this, &ctxint); + if (!ctxint) { + return 0; + } + + ctx = (cs_inode_ctx_t *)ctxint; + + GF_FREE (ctx); + return 0; +} + +int +cs_resume_postprocess (xlator_t *this, call_frame_t *frame, inode_t *inode) +{ + cs_local_t *local = NULL; + gf_cs_obj_state state = -1; + cs_inode_ctx_t *ctx = NULL; + int ret = 0; + + local = frame->local; + if (!local) { + ret = -1; + goto out; + } + + __cs_inode_ctx_get (this, inode, &ctx); + + state = __cs_get_file_state (this, inode, ctx); + if (state == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "status is GF_CS_ERROR." + " Aborting write"); + local->op_ret = -1; + local->op_errno = EREMOTE; + ret = -1; + goto out; + } + + if (state == GF_CS_REMOTE || state == GF_CS_DOWNLOADING) { + gf_msg_debug (this->name, 0, "status is %d", state); + ret = cs_download (frame); + if (ret == 0) { + gf_msg_debug (this->name, 0, "Winding for Final Write"); + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + " download failed, unwinding writev"); + local->op_ret = -1; + local->op_errno = EREMOTE; + ret = -1; + } + } +out: + return ret; +} +int32_t +cs_fdctx_to_dict (xlator_t *this, + fd_t *fd, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_inode (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_inode_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_history (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_fd (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_fd_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_fdctx (xlator_t *this, + fd_t *fd) +{ + return 0; +} + + +int32_t +cs_inodectx (xlator_t *this, + inode_t *ino) +{ + return 0; +} + + +int32_t +cs_inodectx_to_dict (xlator_t *this, + inode_t *ino, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_priv_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_priv (xlator_t *this) +{ + return 0; +} + +int +cs_notify (xlator_t *this, int event, void *data, ...) +{ + return default_notify (this, event, data); +} + + +struct xlator_fops cs_fops = { + .stat = cs_stat, + .readdirp = cs_readdirp, + .truncate = cs_truncate, + .seek = cs_seek, + .statfs = cs_statfs, + .fallocate = cs_fallocate, + .discard = cs_discard, + .getxattr = cs_getxattr, + .writev = cs_writev, + .setxattr = cs_setxattr, + .fgetxattr = cs_fgetxattr, + .lookup = cs_lookup, + .fsetxattr = cs_fsetxattr, + .readv = cs_readv, + .ftruncate = cs_ftruncate, + .rchecksum = cs_rchecksum, + .unlink = cs_unlink, + .open = cs_open, + .fstat = cs_fstat, + .zerofill = cs_zerofill, +}; + +struct xlator_cbks cs_cbks = { +}; + +struct xlator_dumpops cs_dumpops = { + .fdctx_to_dict = cs_fdctx_to_dict, + .inode = cs_inode, + .inode_to_dict = cs_inode_to_dict, + .history = cs_history, + .fd = cs_fd, + .fd_to_dict = cs_fd_to_dict, + .fdctx = cs_fdctx, + .inodectx = cs_inodectx, + .inodectx_to_dict = cs_inodectx_to_dict, + .priv_to_dict = cs_priv_to_dict, + .priv = cs_priv, +}; + +struct volume_options cs_options[] = { + { .key = {NULL} }, +}; + +xlator_api_t xlator_api = { + .init = cs_init, + .fini = cs_fini, + .notify = cs_notify, + .reconfigure = cs_reconfigure, + .mem_acct_init = cs_mem_acct_init, + .dumpops = &cs_dumpops, + .fops = &cs_fops, + .cbks = &cs_cbks, + .options = cs_options, + .identifier = "cloudsync", +}; diff --git a/xlators/features/cloudsync/src/cloudsync.h b/xlators/features/cloudsync/src/cloudsync.h new file mode 100644 index 00000000000..bd54d760864 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_H__ +#define __CLOUDSYNC_H__ + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "syncop.h" +#include "call-stub.h" +#include "cloudsync-common.h" +#include "cloudsync-autogen-fops.h" + + +#define CS_LOCK_DOMAIN "cs.protect.file.stat" +typedef struct cs_dlstore { + off_t off; + struct iovec *vector; + int32_t count; + struct iobref *iobref; + uint32_t flags; +} cs_dlstore; + +typedef struct cs_inode_ctx { + gf_cs_obj_state state; +} cs_inode_ctx_t; + +cs_local_t * +cs_local_init (xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd, + glusterfs_fop_t fop); + +int +locate_and_execute (call_frame_t *frame); + + +int32_t +cs_resume_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata); + +int32_t +cs_inodelk_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata); + +size_t +cs_write_callback (void *lcurlbuf, size_t size, size_t nitems, void *frame); + +void +cs_common_cbk (call_frame_t *frame); + +gf_boolean_t +cs_is_file_remote (struct iatt *stbuf, dict_t *xattr); + +int32_t +cs_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata); +int +cs_build_loc (loc_t *loc, call_frame_t *frame); + +int +cs_blocking_inodelk_cbk (call_frame_t *lock_frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata); + +int cs_read_authinfo(xlator_t *this); + +int +__cs_inode_ctx_update (xlator_t *this, inode_t *inode, uint64_t val); + +int +cs_inode_ctx_reset (xlator_t *this, inode_t *inode); + +void +__cs_inode_ctx_get (xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx); + +gf_cs_obj_state +__cs_get_file_state (xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx); + +int +cs_inodelk_unlock (call_frame_t *main_frame); + +int +cs_resume_postprocess (xlator_t *this, call_frame_t *frame, inode_t *inode); + +int32_t +cs_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata); +int32_t +cs_resume_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xattr_req); +#endif /* __CLOUDSYNC_H__ */ + diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index caf421165f8..f31311b3b5f 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -4679,6 +4679,18 @@ client_graph_builder (volgen_graph_t *graph, glusterd_volinfo_t *volinfo, if (ret) return -1; + ret = dict_get_str_boolean (set_dict, "features.cloudsync", _gf_false); + if (ret == -1) + goto out; + + if (ret) { + xl = volgen_graph_add (graph, "features/cloudsync", volname); + if (!xl) { + ret = -1; + goto out; + } + } + ret = -1; xl = volgen_graph_add_as (graph, "debug/io-stats", volname); if (!xl) diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 1c448809641..db4f7e4bcc8 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -3766,7 +3766,12 @@ struct volopt_map_entry glusterd_volopt_map[] = { .flags = VOLOPT_FLAG_CLIENT_OPT | VOLOPT_FLAG_XLATOR_OPT, .type = NO_DOC, }, - + { .key = "features.cloudsync", + .voltype = "features/cloudsync", + .value = "off", + .op_version = GD_OP_VERSION_4_1_0, + .flags = VOLOPT_FLAG_CLIENT_OPT + }, { .key = NULL } }; |