diff options
Diffstat (limited to 'xlators/features/cloudsync/src')
-rw-r--r-- | xlators/features/cloudsync/src/Makefile.am | 47 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c | 30 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h | 24 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-common.c | 44 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-common.h | 97 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-fops-c.py | 305 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-fops-h.py | 30 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-mem-types.h | 23 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync-messages.h | 19 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync.c | 1673 | ||||
-rw-r--r-- | xlators/features/cloudsync/src/cloudsync.h | 100 |
11 files changed, 2392 insertions, 0 deletions
diff --git a/xlators/features/cloudsync/src/Makefile.am b/xlators/features/cloudsync/src/Makefile.am new file mode 100644 index 00000000000..da660d7401b --- /dev/null +++ b/xlators/features/cloudsync/src/Makefile.am @@ -0,0 +1,47 @@ +xlator_LTLIBRARIES = cloudsync.la + +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +CLOUDSYNC_SRC = $(top_srcdir)/xlators/features/cloudsync/src + +cloudsync_sources = cloudsync.c + +CLOUDSYNC_SRC = $(top_srcdir)/xlators/features/cloudsync/src +CLOUDSYNC_BLD = $(top_builddir)/xlators/features/cloudsync/src + +cloudsynccommon_sources = $(CLOUDSYNC_SRC)/cloudsync-common.c + +noinst_HEADERS = $(CLOUDSYNC_BLD)/cloudsync.h \ + $(CLOUDSYNC_BLD)/cloudsync-mem-types.h \ + $(CLOUDSYNC_BLD)/cloudsync-messages.h \ + $(CLOUDSYNC_BLD)/cloudsync-common.h + +cloudsync_la_SOURCES = $(cloudsync_sources) $(cloudsynccommon_sources) + +nodist_cloudsync_la_SOURCES = cloudsync-autogen-fops.c cloudsync-autogen-fops.h +BUILT_SOURCES = cloudsync-autogen-fops.h + +cloudsync_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS) + +cloudsync_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src + +AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS) + +noinst_PYTHON = cloudsync-fops-c.py cloudsync-fops-h.py +EXTRA_DIST = cloudsync-autogen-fops-tmpl.c cloudsync-autogen-fops-tmpl.h + +cloudsync-autogen-fops.c: cloudsync-fops-c.py cloudsync-autogen-fops-tmpl.c + $(PYTHON) $(CLOUDSYNC_SRC)/cloudsync-fops-c.py \ + $(CLOUDSYNC_SRC)/cloudsync-autogen-fops-tmpl.c > $@ + +cloudsync-autogen-fops.h: cloudsync-fops-h.py cloudsync-autogen-fops-tmpl.h + $(PYTHON) $(CLOUDSYNC_SRC)/cloudsync-fops-h.py \ + $(CLOUDSYNC_SRC)/cloudsync-autogen-fops-tmpl.h > $@ + +CLEANFILES = $(nodist_cloudsync_la_SOURCES) + +uninstall-local: + rm -f $(DESTDIR)$(xlatordir)/cloudsync.so + diff --git a/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c new file mode 100644 index 00000000000..6bb68cd170c --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.c @@ -0,0 +1,30 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* File: cloudsync-autogen-fops-tmpl.c + * This file contains the CLOUDSYNC autogenerated FOPs. This is run through + * the code generator, generator.py to generate the required FOPs. + */ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <dlfcn.h> + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "cloudsync.h" +#include "cloudsync-common.h" +#include "call-stub.h" + +#pragma generate diff --git a/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h new file mode 100644 index 00000000000..2db2a9c88c7 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-autogen-fops-tmpl.h @@ -0,0 +1,24 @@ +/* + Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +/* File: clousync-autogen-fops-tmpl.h + * This file contains the cloudsync autogenerated FOPs declarations. + */ + +#ifndef _CLOUDSYNC_AUTOGEN_FOPS_H +#define _CLOUDSYNC_AUTOGEN_FOPS_H + +#include "xlator.h" +#include "cloudsync.h" +#include "cloudsync-common.h" + +#pragma generate + +#endif /* _CLOUDSYNC_AUTOGEN_FOPS_H */ diff --git a/xlators/features/cloudsync/src/cloudsync-common.c b/xlators/features/cloudsync/src/cloudsync-common.c new file mode 100644 index 00000000000..d0d00decca3 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-common.c @@ -0,0 +1,44 @@ +/* + Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "cloudsync-common.h" + +void +cs_local_wipe (xlator_t *this, cs_local_t *local) +{ + if (!local) + return; + + loc_wipe (&local->loc); + + if (local->fd) { + fd_unref (local->fd); + local->fd = NULL; + } + + if (local->stub) { + call_stub_destroy (local->stub); + local->stub = NULL; + } + + if (local->xattr_req) + dict_unref (local->xattr_req); + + if (local->xattr_rsp) + dict_unref (local->xattr_rsp); + + if (local->dlfd) + fd_unref (local->dlfd); + + if (local->remotepath) + GF_FREE (local->remotepath); + + mem_put (local); +} diff --git a/xlators/features/cloudsync/src/cloudsync-common.h b/xlators/features/cloudsync/src/cloudsync-common.h new file mode 100644 index 00000000000..3298ab0a6f2 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-common.h @@ -0,0 +1,97 @@ +/* + Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ +#ifndef _CLOUDSYNC_COMMON_H +#define _CLOUDSYNC_COMMON_H + +#include "glusterfs.h" +#include "call-stub.h" +#include "xlator.h" +#include "syncop.h" +#include "cloudsync-mem-types.h" +#include "cloudsync-messages.h" + +typedef struct cs_local { + loc_t loc; + fd_t *fd; + call_stub_t *stub; + call_frame_t *main_frame; + int op_errno; + int op_ret; + fd_t *dlfd; + off_t dloffset; + struct iatt stbuf; + dict_t *xattr_rsp; + dict_t *xattr_req; + glusterfs_fop_t fop; + gf_boolean_t locked; + int call_cnt; + inode_t *inode; + char *remotepath; +} cs_local_t; + +typedef int (*fop_download_t) (call_frame_t *frame, void *config); + +typedef void *(*store_init) (xlator_t *this); + +typedef int (*store_reconfigure) (xlator_t *this, dict_t *options); + +typedef void (*store_fini) (void *config); + +struct cs_remote_stores { + char *name; /* store name */ + void *config; /* store related information */ + fop_download_t dlfop; /* store specific download function */ + store_init init; /* store init to initialize store config */ + store_reconfigure reconfigure; /* reconfigure store config */ + store_fini fini; + void *handle; /* shared library handle*/ +}; + +typedef struct cs_private { + xlator_t *this; + struct cs_remote_stores *stores; + gf_boolean_t abortdl; + pthread_spinlock_t lock; +} cs_private_t; + +void +cs_local_wipe (xlator_t *this, cs_local_t *local); + +#define CS_STACK_UNWIND(fop, frame, params ...) do { \ + cs_local_t *__local = NULL; \ + xlator_t *__xl = NULL; \ + if (frame) { \ + __xl = frame->this; \ + __local = frame->local; \ + frame->local = NULL; \ + } \ + STACK_UNWIND_STRICT (fop, frame, params); \ + cs_local_wipe (__xl, __local); \ +} while (0) + +#define CS_STACK_DESTROY(frame) do { \ + cs_local_t *__local = NULL; \ + xlator_t *__xl = NULL; \ + __xl = frame->this; \ + __local = frame->local; \ + frame->local = NULL; \ + STACK_DESTROY (frame->root); \ + cs_local_wipe (__xl, __local); \ +} while (0) + +typedef struct store_methods { + int (*fop_download) (call_frame_t *frame, void *config); + /* return type should be the store config */ + void *(*fop_init) (xlator_t *this); + int (*fop_reconfigure) (xlator_t *this, dict_t *options); + void (*fop_fini) (void *config); +} store_methods_t; + +#endif /* _CLOUDSYNC_COMMON_H */ diff --git a/xlators/features/cloudsync/src/cloudsync-fops-c.py b/xlators/features/cloudsync/src/cloudsync-fops-c.py new file mode 100644 index 00000000000..e3030724468 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-fops-c.py @@ -0,0 +1,305 @@ +#!/usr/bin/python + +import os +import sys + +curdir = os.path.dirname(sys.argv[0]) +gendir = os.path.join(curdir, '../../../../libglusterfs/src') +sys.path.append(gendir) +from generator import ops, fop_subs, cbk_subs, generate + +FD_DATA_MODIFYING_OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + int op_errno = -1; + cs_local_t *local = NULL; + int ret = 0; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = -1; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + VALIDATE_OR_GOTO (fd, err); + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_@UPNAME@); + if (!local) { + + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + op_errno = ENOMEM; + goto err; + } + + __cs_inode_ctx_get (this, fd->inode, &ctx); + + if (ctx) + state = __cs_get_file_state (this, fd->inode, ctx); + else + state = GF_CS_LOCAL; + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + local->stub = fop_@NAME@_stub (frame, cs_resume_@NAME@, + @SHORT_ARGS@); + if (!local->stub) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + op_errno = ENOMEM; + goto err; + } + + + if (state == GF_CS_LOCAL) { + STACK_WIND (frame, cs_@NAME@_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + } else { + local->call_cnt++; + ret = locate_and_execute (frame); + if (ret) { + op_errno = ENOMEM; + goto err; + } + } + + return 0; + +err: + CS_STACK_UNWIND (@NAME@, frame, -1, op_errno, @CBK_ERROR_ARGS@); + + return 0; +} +""" + +FD_DATA_MODIFYING_RESUME_OP_FOP_TEMPLATE = """ +int32_t +cs_resume_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + int ret = 0; + + ret = cs_resume_postprocess (this, frame, fd->inode); + if (ret) { + goto unwind; + } + + cs_inodelk_unlock (frame); + + STACK_WIND (frame, cs_@NAME@_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + + return 0; + +unwind: + + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} +""" +FD_DATA_MODIFYING_OP_FOP_CBK_TEMPLATE = """ +int32_t +cs_@NAME@_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + cs_local_t *local = NULL; + int ret = 0; + uint64_t val = 0; + fd_t *fd = NULL; + + local = frame->local; + fd = local->fd; + + /* Do we need lock here? */ + local->call_cnt++; + + if (op_ret == -1) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "could not get file state, unwinding"); + op_ret = -1; + op_errno = EIO; + goto unwind; + } else { + __cs_inode_ctx_update (this, fd->inode, val); + gf_msg (this->name, GF_LOG_INFO, 0, 0, + " state = %ld", val); + + if (local->call_cnt == 1 && + (val == GF_CS_REMOTE || + val == GF_CS_DOWNLOADING)) { + gf_msg (this->name, GF_LOG_INFO, 0, + 0, " will repair and download " + "the file, current state : %ld", + val); + goto repair; + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "second @NAME@, Unwinding"); + goto unwind; + } + } + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "file state " + "could not be figured, unwinding"); + goto unwind; + } + } else { + /* successful @NAME@ => file is local */ + __cs_inode_ctx_update (this, fd->inode, GF_CS_LOCAL); + gf_msg (this->name, GF_LOG_INFO, 0, 0, "state : GF_CS_LOCAL" + ", @NAME@ successful"); + + goto unwind; + } + +repair: + ret = locate_and_execute (frame); + if (ret) { + goto unwind; + } + + return 0; + +unwind: + CS_STACK_UNWIND (@NAME@, frame, op_ret, op_errno, @SHORT_ARGS@); + + return 0; +} +""" + +LOC_STAT_OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_@UPNAME@); + if (!local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local is NULL"); + goto err; + } + + if (loc->inode->ia_type == IA_IFDIR) + goto wind; + + local->xattr_req = xdata ? dict_ref (xdata) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + +wind: + STACK_WIND (frame, cs_@NAME@_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + + return 0; +err: + CS_STACK_UNWIND (@NAME@, frame, -1, errno, @CBK_ERROR_ARGS@); + + return 0; +} +""" + +LOC_STAT_OP_FOP_CBK_TEMPLATE = """ +int32_t +cs_@NAME@_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + int ret = 0; + uint64_t val = 0; + loc_t *loc = NULL; + cs_local_t *local = NULL; + + local = frame->local; + + loc = &local->loc; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + ret = __cs_inode_ctx_update (this, loc->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, loc->inode); + } + + CS_STACK_UNWIND (@NAME@, frame, op_ret, op_errno, @SHORT_ARGS@); + + return 0; +} +""" + +# All xlator FOPs are covered in the following section just to create a clarity +# The lists themselves are not used. +entry_ops = ['mknod', 'mkdir', 'unlink', 'rmdir', 'symlink', 'rename', 'link', + 'create'] +special_ops = ['statfs', 'lookup', 'ipc', 'compound', 'icreate', 'namelink'] +ignored_ops = ['getspec'] +inode_ops = ['stat', 'readlink', 'truncate', 'open', 'setxattr', 'getxattr', + 'removexattr', 'opendir', 'access', 'inodelk', 'entrylk', + 'xattrop', 'setattr', 'lease', 'getactivelk', 'setactivelk', + 'discover'] +fd_ops = ['readv', 'writev', 'flush', 'fsync', 'fsyncdir', 'ftruncate', + 'fstat', 'lk', 'readdir', 'finodelk', 'fentrylk', 'fxattrop', + 'fsetxattr', 'fgetxattr', 'rchecksum', 'fsetattr', 'readdirp', + 'fremovexattr', 'fallocate', 'discard', 'zerofill', 'seek'] + + +# These are the current actual lists used to generate the code + +# The following list contains fops which are fd based that modifies data +fd_data_modify_op_fop_template = ['readv', 'writev', 'flush', 'fsync', + 'ftruncate', 'rchecksum', 'fallocate', + 'discard', 'zerofill', 'seek'] + +# The following list contains fops which are entry based that does not change +# data +loc_stat_op_fop_template = ['lookup', 'stat', 'discover', 'access', 'setattr', + 'getattr'] + +# These fops need a separate implementation +special_fops = ['readdirp', 'statfs', 'setxattr', 'unlink', 'getxattr', + 'truncate', 'fstat'] + +def gen_defaults(): + for name in ops: + if name in fd_data_modify_op_fop_template: + print generate(FD_DATA_MODIFYING_OP_FOP_CBK_TEMPLATE, name, cbk_subs) + print generate(FD_DATA_MODIFYING_RESUME_OP_FOP_TEMPLATE, name, fop_subs) + print generate(FD_DATA_MODIFYING_OP_FOP_TEMPLATE, name, fop_subs) + elif name in loc_stat_op_fop_template: + print generate(LOC_STAT_OP_FOP_CBK_TEMPLATE, name, cbk_subs) + print generate(LOC_STAT_OP_FOP_TEMPLATE, name, fop_subs) + +for l in open(sys.argv[1], 'r').readlines(): + if l.find('#pragma generate') != -1: + print "/* BEGIN GENERATED CODE - DO NOT MODIFY */" + gen_defaults() + print "/* END GENERATED CODE */" + else: + print l[:-1] diff --git a/xlators/features/cloudsync/src/cloudsync-fops-h.py b/xlators/features/cloudsync/src/cloudsync-fops-h.py new file mode 100644 index 00000000000..552c6b58e3a --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-fops-h.py @@ -0,0 +1,30 @@ +#!/usr/bin/python + +import os +import sys + +curdir = os.path.dirname(sys.argv[0]) +gendir = os.path.join(curdir, '../../../../libglusterfs/src') +sys.path.append(gendir) +from generator import ops, fop_subs, cbk_subs, generate + +OP_FOP_TEMPLATE = """ +int32_t +cs_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@); +""" + +def gen_defaults(): + for name, value in ops.iteritems(): + if name == 'getspec': + continue + print generate(OP_FOP_TEMPLATE, name, fop_subs) + + +for l in open(sys.argv[1], 'r').readlines(): + if l.find('#pragma generate') != -1: + print "/* BEGIN GENERATED CODE - DO NOT MODIFY */" + gen_defaults() + print "/* END GENERATED CODE */" + else: + print l[:-1] diff --git a/xlators/features/cloudsync/src/cloudsync-mem-types.h b/xlators/features/cloudsync/src/cloudsync-mem-types.h new file mode 100644 index 00000000000..6ebcb16552b --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-mem-types.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_MEM_TYPES_H__ +#define __CLOUDSYNC_MEM_TYPES_H__ + +#include "mem-types.h" +enum cs_mem_types_ { + gf_cs_mt_cs_private_t = gf_common_mt_end + 1, + gf_cs_mt_cs_remote_stores_t, + gf_cs_mt_cs_inode_ctx_t, + gf_cs_mt_end +}; +#endif /* __CLOUDSYNC_MEM_TYPES_H__ */ + diff --git a/xlators/features/cloudsync/src/cloudsync-messages.h b/xlators/features/cloudsync/src/cloudsync-messages.h new file mode 100644 index 00000000000..ad4b7d2e0b8 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync-messages.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_MESSAGES_H__ +#define __CLOUDSYNC_MESSAGES_H__ + +/*TODO: define relevant message ids */ + + +#endif /* __CLOUDSYNC_MESSAGES_H__ */ + diff --git a/xlators/features/cloudsync/src/cloudsync.c b/xlators/features/cloudsync/src/cloudsync.c new file mode 100644 index 00000000000..8d74202706e --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync.c @@ -0,0 +1,1673 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "cloudsync.h" +#include "cloudsync-common.h" +#include "call-stub.h" +#include "cloudsync-autogen-fops.h" + +#include <dlfcn.h> + +void +cs_cleanup_private (cs_private_t *priv) +{ + if (priv) { + if (priv->stores) { + priv->stores->fini (priv->stores->config); + GF_FREE (priv->stores); + } + + pthread_spin_destroy (&priv->lock); + GF_FREE (priv); + } + + return; +} + +int +cs_init (xlator_t *this) +{ + cs_private_t *priv = NULL; + gf_boolean_t per_vol = _gf_false; + int ret = 0; + char *libpath = ("libaws.so"); + store_methods_t *store_methods = NULL; + void *handle = NULL; + + priv = GF_CALLOC (1, sizeof (*priv), gf_cs_mt_cs_private_t); + if (!priv) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + goto out; + } + + priv->this = this; + + this->local_pool = mem_pool_new (cs_local_t, 512); + if (!this->local_pool) { + gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, + "initialisation failed."); + ret = -1; + goto out; + } + + this->private = priv; + + /* temp workaround. Should be configurable through glusterd*/ + per_vol = _gf_true; + + if (per_vol) { + /*TODO:Need to make it configurable. This is a temp workaround*/ + handle = dlopen (libpath, RTLD_NOW); + if (!handle) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "could not load" + " the required library. %s", dlerror ()); + goto out; + } else { + gf_msg (this->name, GF_LOG_INFO, 0, 0, + "loading library:%s successful", libpath); + } + + + priv->stores = GF_CALLOC (1, sizeof (struct cs_remote_stores), + gf_cs_mt_cs_remote_stores_t); + if (!priv->stores) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Could not " + "allocate memory for priv->stores"); + ret = -1; + goto out; + } + + (void) dlerror (); /* clear out previous error string */ + + /* load library methods */ + store_methods = (store_methods_t *) dlsym (handle, "store_ops"); + if (!store_methods) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "null store_methods %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + + priv->stores->dlfop = store_methods->fop_download; + if (!priv->stores->dlfop) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " download fop %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + priv->stores->init = store_methods->fop_init; + if (!priv->stores->init) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " init fop %s", dlerror ()); + ret = -1; + goto out; + } + + (void) dlerror (); + priv->stores->reconfigure = store_methods->fop_reconfigure; + if (!priv->stores->reconfigure) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "failed to get" + " reconfigure fop %s", dlerror ()); + ret = -1; + goto out; + } + + priv->stores->handle = handle; + + priv->stores->config = (void *) ((priv->stores->init) (this)); + if (!priv->stores->config) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null config"); + ret = -1; + goto out; + } + } + +out: + if (ret == -1) { + if (this->local_pool) + mem_pool_destroy (this->local_pool); + + cs_cleanup_private (priv); + } + + return ret; +} + +void +cs_fini (xlator_t *this) +{ + cs_private_t *priv = NULL; + priv = this->private; + + cs_cleanup_private (priv); +} + +int +cs_reconfigure (xlator_t *this, dict_t *options) +{ + cs_private_t *priv = NULL; + int ret = 0; + + priv = this->private; + if (!priv) { + ret = -1; + goto out; + } + + /* needed only for per volume configuration*/ + ret = priv->stores->reconfigure (this, options); + +out: + return ret; +} + +int32_t +cs_mem_acct_init (xlator_t *this) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("cloudsync", this, out); + + ret = xlator_mem_acct_init (this, gf_cs_mt_end + 1); + + if (ret != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "Memory accounting init failed"); + return ret; + } +out: + return ret; +} + +int32_t +cs_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, + dict_t *xdata) +{ + gf_dirent_t *tmp = NULL; + char *sxattr = NULL; + uint64_t ia_size = 0; + int ret = 0; + + list_for_each_entry (tmp, &entries->list, list) { + ret = dict_get_str (tmp->dict, GF_CS_OBJECT_SIZE, &sxattr); + if (ret) { + gf_msg_trace (this->name, 0, "size xattr found"); + continue; + } + + ia_size = atoll (sxattr); + tmp->d_stat.ia_size = ia_size; + } + + STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, + entries, xdata); + return 0; +} + + +int32_t +cs_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t off, dict_t *xdata) +{ + int ret = 0; + int op_errno = ENOMEM; + + if (!xdata) { + xdata = dict_new (); + if (!xdata) { + goto err; + } + } + + ret = dict_set_int32 (xdata, GF_CS_OBJECT_SIZE, 1); + if (ret) { + goto err; + } + + STACK_WIND (frame, cs_readdirp_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp, + fd, size, off, xdata); + return 0; +err: + STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, + NULL, NULL); + return 0; +} + + +int32_t +cs_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + cs_local_t *local = NULL; + int ret = 0; + uint64_t val = 0; + + local = frame->local; + + /* Do we need lock here? */ + local->call_cnt++; + + if (op_ret == -1) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "truncate failed"); + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "could not get file state, unwinding"); + op_ret = -1; + op_errno = EIO; + goto unwind; + } else { + __cs_inode_ctx_update (this, local->loc.inode, + val); + gf_msg (this->name, GF_LOG_INFO, 0, 0, + " state = %ld", val); + + if (local->call_cnt == 1 && + (val == GF_CS_REMOTE || + val == GF_CS_DOWNLOADING)) { + gf_msg (this->name, GF_LOG_WARNING, 0, + 0, "will repair and download " + "the file, current state : %ld", + val); + goto repair; + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "second truncate, Unwinding"); + goto unwind; + } + } + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "file state " + "could not be figured, unwinding"); + goto unwind; + } + } else { + /* successful write => file is local */ + __cs_inode_ctx_update (this, local->loc.inode, GF_CS_LOCAL); + gf_msg (this->name, GF_LOG_INFO, 0, 0, "state : GF_CS_LOCAL" + ", truncate successful"); + + goto unwind; + } + +repair: + ret = locate_and_execute (frame); + if (ret) { + goto unwind; + } + + return 0; + +unwind: + CS_STACK_UNWIND (truncate, frame, op_ret, op_errno, prebuf, postbuf, + xdata); + return 0; +} + + +int32_t +cs_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) +{ + int op_errno = -1; + cs_local_t *local = NULL; + int ret = 0; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = -1; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + VALIDATE_OR_GOTO (loc, err); + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_TRUNCATE); + if (!local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + op_errno = ENOMEM; + goto err; + } + + __cs_inode_ctx_get (this, loc->inode, &ctx); + + if (ctx) + state = __cs_get_file_state (this, loc->inode, ctx); + else + state = GF_CS_LOCAL; + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + local->stub = fop_truncate_stub (frame, cs_resume_truncate, loc, offset, + xdata); + if (!local->stub) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insufficient memory"); + op_errno = ENOMEM; + goto err; + } + + if (state == GF_CS_LOCAL) { + STACK_WIND (frame, cs_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, offset, xdata); + + } else { + local->call_cnt++; + ret = locate_and_execute (frame); + if (ret) { + op_errno = ENOMEM; + goto err; + } + } + + return 0; +err: + CS_STACK_UNWIND (truncate, frame, -1, op_errno, NULL, NULL, NULL); + return 0; +} + +int32_t +cs_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct statvfs *buf, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, + buf, xdata); + return 0; +} + + +int32_t +cs_statfs (call_frame_t *frame, xlator_t *this, + loc_t *loc, + dict_t *xdata) +{ + STACK_WIND (frame, cs_statfs_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs, + loc, xdata); + return 0; +} + + +int32_t +cs_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, + dict, xdata); + return 0; +} + + +int32_t +cs_getxattr (call_frame_t *frame, xlator_t *this, + loc_t *loc, + const char *name, + dict_t *xattr_req) +{ + STACK_WIND (frame, cs_getxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->getxattr, + loc, name, xattr_req); + return 0; +} + +int32_t +cs_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_local_t *local = NULL; + + local = frame->local; + + if (local->locked) + cs_inodelk_unlock (frame); + + CS_STACK_UNWIND (setxattr, frame, op_ret, op_errno, xdata); + + return 0; +} + + +int32_t +cs_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + data_t *tmp = NULL; + cs_local_t *local = NULL; + int ret = 0; + + VALIDATE_OR_GOTO (frame, err); + VALIDATE_OR_GOTO (this, err); + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_SETXATTR); + if (!local) { + ret = -1; + goto err; + } + + local->xattr_req = xdata ? dict_ref (xdata) : (xdata = dict_new ()); + + tmp = dict_get (dict, GF_CS_OBJECT_UPLOAD_COMPLETE); + if (tmp) { + /* Value of key should be the atime */ + local->stub = fop_setxattr_stub (frame, cs_resume_setxattr, + loc, dict, flags, xdata); + + if (!local->stub) + goto err; + + ret = locate_and_execute (frame); + if (ret) { + goto err; + } + + return 0; + } + + STACK_WIND (frame, cs_setxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr, + loc, dict, flags, xdata); + return 0; +err: + CS_STACK_UNWIND (setxattr, frame, -1, errno, NULL); + return 0; +} + + +int32_t +cs_fgetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *dict, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (fgetxattr, frame, op_ret, op_errno, + dict, xdata); + return 0; +} + + +int32_t +cs_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, + dict_t *xdata) +{ + STACK_WIND (frame, cs_fgetxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fgetxattr, + fd, name, xdata); + return 0; +} + + + +int32_t +cs_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno, + xdata); + return 0; +} + + +int32_t +cs_fsetxattr (call_frame_t *frame, xlator_t *this, + fd_t *fd, + dict_t *dict, + int32_t flags, + dict_t *xdata) +{ + STACK_WIND (frame, cs_fsetxattr_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetxattr, + fd, dict, flags, xdata); + return 0; +} + +int32_t +cs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *preparent, + struct iatt *postparent, + dict_t *xdata) +{ + STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, + preparent, postparent, xdata); + return 0; +} + + +int32_t +cs_unlink (call_frame_t *frame, xlator_t *this, + loc_t *loc, + int32_t flags, + dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, loc, NULL, GF_FOP_UNLINK); + if (!local) + goto err; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + STACK_WIND (frame, cs_unlink_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink, + loc, flags, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (unlink, frame, -1, errno, NULL, NULL, NULL); + return 0; +} + + +int32_t +cs_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, fd_t *fd, + dict_t *xdata) +{ + int ret = 0; + uint64_t val = 0; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + ret = __cs_inode_ctx_update (this, fd->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, fd->inode); + } + + CS_STACK_UNWIND (open, frame, op_ret, op_errno, fd, xdata); + return 0; +} + + +int32_t +cs_open (call_frame_t *frame, xlator_t *this, + loc_t *loc, + int32_t flags, + fd_t *fd, + dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_OPEN); + if (!local) + goto err; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + + STACK_WIND (frame, cs_open_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, + loc, flags, fd, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (open, frame, -1, errno, NULL, NULL); + return 0; +} + + +int32_t +cs_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *buf, + dict_t *xdata) +{ + int ret = 0; + uint64_t val = 0; + fd_t *fd = NULL; + cs_local_t *local = NULL; + + local = frame->local; + + fd = local->fd; + + if (op_ret == 0) { + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (!ret) { + gf_msg_debug (this->name, 0, "state %ld", val); + ret = __cs_inode_ctx_update (this, fd->inode, val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + } + } + } else { + cs_inode_ctx_reset (this, fd->inode); + } + + CS_STACK_UNWIND (fstat, frame, op_ret, op_errno, buf, xdata); + + return 0; +} + + +int32_t +cs_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = cs_local_init (this, frame, NULL, fd, GF_FOP_FSTAT); + if (!local) + goto err; + + if (fd->inode->ia_type == IA_IFDIR) + goto wind; + + local->xattr_req = xattr_req ? dict_ref (xattr_req) : dict_new (); + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_STATUS, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed key:" + " %s", GF_CS_OBJECT_STATUS); + goto err; + } + +wind: + STACK_WIND (frame, cs_fstat_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, + fd, local->xattr_req); + return 0; +err: + CS_STACK_UNWIND (fstat, frame, -1, errno, NULL, NULL); + return 0; +} + +cs_local_t * +cs_local_init (xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd, + glusterfs_fop_t fop) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = mem_get0 (this->local_pool); + if (!local) + goto out; + + if (loc) { + ret = loc_copy (&local->loc, loc); + if (ret) + goto out; + } + + if (fd) { + local->fd = fd_ref (fd); + } + + local->op_ret = -1; + local->op_errno = EUCLEAN; + local->fop = fop; + local->dloffset = 0; + frame->local = local; + local->locked = _gf_false; + local->call_cnt = 0; +out: + if (ret) { + if (local) + mem_put (local); + local = NULL; + } + + return local; +} + +call_frame_t * +cs_lock_frame (call_frame_t *parent_frame) +{ + call_frame_t *lock_frame = NULL; + + lock_frame = copy_frame (parent_frame); + + if (lock_frame == NULL) + goto out; + + set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root); + +out: + return lock_frame; + +} + +void +cs_lock_wipe (call_frame_t *lock_frame) +{ + CS_STACK_DESTROY (lock_frame); +} + + +int32_t +cs_inodelk_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_lock_wipe (frame); + + return 0; +} + +int +cs_inodelk_unlock (call_frame_t *main_frame) +{ + xlator_t *this = NULL; + struct gf_flock flock = {0,}; + call_frame_t *lock_frame = NULL; + cs_local_t *lock_local = NULL; + cs_local_t *main_local = NULL; + int ret = 0; + + this = main_frame->this; + main_local = main_frame->local; + + lock_frame = cs_lock_frame (main_frame); + if (!lock_frame) + goto out; + + lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); + if (!lock_local) + goto out; + + ret = cs_build_loc (&lock_local->loc, main_frame); + if (ret) { + goto out; + } + + flock.l_type = F_UNLCK; + + main_local->locked = _gf_false; + + STACK_WIND (lock_frame, cs_inodelk_unlock_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, + &lock_local->loc, F_SETLKW, &flock, NULL); + + return 0; + +out: + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Stale lock would be found on" + " server"); + + if (lock_frame) + cs_lock_wipe (lock_frame); + + return 0; +} + +void * +cs_download_task (void *arg) +{ + call_frame_t *frame = NULL; + xlator_t *this = NULL; + cs_private_t *priv = NULL; + int ret = -1; + char *sign_req = NULL; + fd_t *fd = NULL; + cs_local_t *local = NULL; + dict_t *dict = NULL; + int *retval = NULL; + + frame = (call_frame_t *)arg; + + this = frame->this; + + priv = this->private; + + local = frame->local; + + retval = GF_CALLOC (1, sizeof(int), gf_common_mt_int); + if (!retval) { + gf_log (this->name, GF_LOG_ERROR, "insufficient memory"); + ret = -1; + goto out; + } + + if (local->fd) + fd = fd_anonymous (local->fd->inode); + else + fd = fd_anonymous (local->loc.inode); + + if (!fd) { + gf_msg ("CS", GF_LOG_ERROR, 0, 0, "fd creation failed"); + ret = -1; + goto out; + } + + local->dlfd = fd; + local->dloffset = 0; + + dict = dict_new (); + if (!dict) { + gf_msg (this->name, GF_LOG_ERROR, 0, ENOMEM, "failed to create " + "dict"); + ret = -1; + goto out; + } + + ret = dict_set_uint32 (dict, GF_CS_OBJECT_DOWNLOADING, 1); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); + ret = -1; + goto out; + } + + ret = syncop_fsetxattr (this, local->fd, dict, 0, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "fsetxattr failed " + "key %s", GF_CS_OBJECT_DOWNLOADING); + ret = -1; + goto out; + } + /*this calling method is for per volume setting */ + ret = priv->stores->dlfop (frame, priv->stores->config); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "download failed" + ", remotepath: %s", local->remotepath); + + /*using dlfd as it is anonymous and have RDWR flag*/ + ret = syncop_ftruncate (FIRST_CHILD (this), local->dlfd, 0, + NULL, NULL, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "ftruncate failed"); + } else { + gf_msg_debug (this->name, 0, "ftruncate succeed"); + } + + ret = -1; + goto out; + } else { + gf_msg (this->name, GF_LOG_INFO, 0, 0, "download success, path" + " : %s", local->remotepath); + + ret = syncop_fremovexattr (this, local->fd, + GF_CS_OBJECT_REMOTE, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "removexattr failed, remotexattr"); + ret = -1; + goto out; + } else { + gf_msg_debug (this->name, 0, "fremovexattr success, " + "path : %s", local->remotepath); + } + + ret = syncop_fremovexattr (this, local->fd, + GF_CS_OBJECT_DOWNLOADING, NULL, + NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, -ret, + "removexattr failed, downloading xattr, path %s" + , local->remotepath); + ret = -1; + goto out; + } else { + gf_msg_debug (this->name, 0, "fremovexattr success" + " path %s", local->remotepath); + } + } + +out: + GF_FREE (sign_req); + + if (dict) + dict_unref (dict); + + if (fd) { + fd_unref (fd); + local->dlfd = NULL; + } + + if (retval) { + *retval = ret; + pthread_exit (retval); + } else { + pthread_exit (&ret); + } +} + +int +cs_download (call_frame_t *frame) +{ + int *retval = NULL; + int ret = 0; + pthread_t dthread; + cs_local_t *local = NULL; + xlator_t *this = NULL; + + local = frame->local; + this = frame->this; + + if (!local->remotepath) { + ret = -1; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "remote path not" + " available. Check posix logs to resolve"); + goto out; + } + + ret = gf_thread_create (&dthread, NULL, &cs_download_task, + (void *)frame, "downloadthread"); + + pthread_join (dthread, (void **)&retval); + + ret = *retval; + +out: + if (retval) + GF_FREE (retval); + + return ret; +} + +int +cs_stat_check_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, struct iatt *stbuf, dict_t *xdata) +{ + cs_local_t *local = NULL; + call_stub_t *stub = NULL; + char *filepath = NULL; + int ret = 0; + inode_t *inode = NULL; + uint64_t val = 0; + + local = frame->local; + + if (op_ret == -1) { + local->op_ret = op_ret; + local->op_errno = op_errno; + gf_msg (this->name, GF_LOG_ERROR, 0, op_errno, + "stat check failed"); + goto err; + } else { + if (local->fd) + inode = local->fd->inode; + else + inode = local->loc.inode; + + if (!inode) { + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "null inode " + "returned"); + goto err; + } + + ret = dict_get_uint64 (xdata, GF_CS_OBJECT_STATUS, &val); + if (ret == 0) { + if (val == GF_CS_ERROR) { + cs_inode_ctx_reset (this, inode); + local->op_ret = -1; + local->op_errno = EIO; + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "status = GF_CS_ERROR. failed to get " + " file state"); + goto err; + } else { + ret = __cs_inode_ctx_update (this, inode, val); + gf_msg_debug (this->name, 0, "status : %lu", + val); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx update failed"); + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + } + } else { + gf_msg_debug (this->name, 0, + "status not found in dict"); + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + + ret = dict_get_str (xdata, GF_CS_OBJECT_REMOTE, &filepath); + if (filepath) { + gf_msg_debug (this->name, 0, "filepath returned %s", + filepath); + local->remotepath = gf_strdup (filepath); + if (!local->remotepath) { + local->op_ret = -1; + local->op_errno = ENOMEM; + goto err; + } + } else { + gf_msg_debug (this->name, 0, "NULL filepath"); + } + + local->op_ret = 0; + local->xattr_rsp = dict_ref (xdata); + memcpy (&local->stbuf, stbuf, sizeof (struct iatt)); + } + + stub = local->stub; + local->stub = NULL; + call_resume (stub); + + return 0; +err: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + +int +cs_do_stat_check (call_frame_t *main_frame) +{ + cs_local_t *local = NULL; + xlator_t *this = NULL; + int ret = 0; + + local = main_frame->local; + this = main_frame->this; + + ret = dict_set_uint32 (local->xattr_req, GF_CS_OBJECT_REPAIR, 256); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "dict_set failed"); + goto err; + } + + if (local->fd) { + STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->fstat, local->fd, + local->xattr_req); + } else { + STACK_WIND (main_frame, cs_stat_check_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->stat, &local->loc, + local->xattr_req); + } + + return 0; + +err: + cs_inodelk_unlock (main_frame); + + cs_common_cbk (main_frame); + + return 0; +} + +void +cs_common_cbk (call_frame_t *frame) +{ + glusterfs_fop_t fop = -1; + cs_local_t *local = NULL; + + local = frame->local; + + fop = local->fop; + + /*Note: Only the failure case needs to be handled here. Since for + * successful stat check the fop will resume anyway. The unwind can + * happen from the fop_cbk and each cbk can unlock the inodelk in case + * a lock was taken before. The lock status can be stored in frame */ + + /* for failure case */ + + /*TODO: add other fops*/ + switch (fop) { + case GF_FOP_WRITE: + CS_STACK_UNWIND (writev, frame, local->op_ret, + local->op_errno, NULL, NULL, NULL); + break; + + case GF_FOP_SETXATTR: + CS_STACK_UNWIND (setxattr, frame, local->op_ret, + local->op_errno, NULL); + break; + case GF_FOP_READ: + CS_STACK_UNWIND (readv, frame, local->op_ret, + local->op_errno, NULL, 0, NULL, NULL, + NULL); + break; + case GF_FOP_FTRUNCATE: + CS_STACK_UNWIND (ftruncate, frame, local->op_ret, + local->op_errno, NULL, NULL, NULL); + break; + + + default: + break; + } + + return; +} + +int +cs_blocking_inodelk_cbk (call_frame_t *lock_frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ + cs_local_t *main_local = NULL; + call_frame_t *main_frame = NULL; + cs_local_t *lock_local = NULL; + + lock_local = lock_frame->local; + + main_frame = lock_local->main_frame; + main_local = main_frame->local; + + if (op_ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "inodelk failed"); + main_local->op_errno = op_errno; + main_local->op_ret = op_ret; + goto err; + } + + main_local->locked = _gf_true; + + cs_lock_wipe (lock_frame); + + cs_do_stat_check (main_frame); + + return 0; +err: + cs_common_cbk (main_frame); + + cs_lock_wipe (lock_frame); + + return 0; +} + +int +cs_build_loc (loc_t *loc, call_frame_t *frame) +{ + cs_local_t *local = NULL; + int ret = -1; + + local = frame->local; + + if (local->fd) { + loc->inode = inode_ref (local->fd->inode); + if (loc->inode) { + gf_uuid_copy (loc->gfid, loc->inode->gfid); + ret = 0; + goto out; + } else { + ret = -1; + goto out; + } + } else { + loc->inode = inode_ref (local->loc.inode); + if (loc->inode) { + gf_uuid_copy (loc->gfid, loc->inode->gfid); + ret = 0; + goto out; + } else { + ret = -1; + goto out; + } + } +out: + return ret; +} + +int +cs_blocking_inodelk (call_frame_t *parent_frame) +{ + call_frame_t *lock_frame = NULL; + cs_local_t *lock_local = NULL; + xlator_t *this = NULL; + struct gf_flock flock = {0,}; + int ret = 0; + + lock_frame = cs_lock_frame (parent_frame); + if (!lock_frame) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "insuffcient memory"); + goto err; + } + + this = parent_frame->this; + + lock_local = cs_local_init (this, lock_frame, NULL, NULL, 0); + if (!lock_local) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local init failed"); + goto err; + } + + lock_local->main_frame = parent_frame; + + flock.l_type = F_WRLCK; + + ret = cs_build_loc (&lock_local->loc, parent_frame); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "build_loc failed"); + goto err; + } + + STACK_WIND (lock_frame, cs_blocking_inodelk_cbk, FIRST_CHILD (this), + FIRST_CHILD (this)->fops->inodelk, CS_LOCK_DOMAIN, + &lock_local->loc, F_SETLKW, &flock, NULL); + + return 0; +err: + if (lock_frame) + cs_lock_wipe (lock_frame); + + return -1; +} + +int +locate_and_execute (call_frame_t *frame) +{ + int ret = 0; + + ret = cs_blocking_inodelk (frame); + + if (ret) + return -1; + else + return 0; +} + +int32_t +cs_resume_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xattr_req) +{ + cs_local_t *local = NULL; + int ret = 0; + + local = frame->local; + + ret = cs_resume_postprocess (this, frame, loc->inode); + if (ret) { + goto unwind; + } + + cs_inodelk_unlock (frame); + + STACK_WIND (frame, cs_truncate_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, + loc, offset, local->xattr_req); + + return 0; + +unwind: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + + +int32_t +cs_resume_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata) +{ + cs_local_t *local = NULL; + cs_inode_ctx_t *ctx = NULL; + gf_cs_obj_state state = GF_CS_ERROR; + + local = frame->local; + + __cs_inode_ctx_get (this, loc->inode, &ctx); + + state = __cs_get_file_state (this, loc->inode, ctx); + + if (state == GF_CS_ERROR) { + /* file is already remote */ + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_WARNING, 0, 0, + "file %s , could not figure file state", loc->path); + goto unwind; + } + + + if (state == GF_CS_REMOTE) { + /* file is already remote */ + local->op_ret = -1; + local->op_errno = EINVAL; + gf_msg (this->name, GF_LOG_WARNING, 0, EINVAL, + "file %s is already remote", loc->path); + goto unwind; + } + + if (state == GF_CS_DOWNLOADING) { + gf_msg (this->name, GF_LOG_WARNING, 0, 0, + " file is in downloading state."); + local->op_ret = -1; + local->op_errno = EINVAL; + goto unwind; + } + + STACK_WIND (frame, cs_setxattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, + local->xattr_req); + + return 0; +unwind: + cs_inodelk_unlock (frame); + + cs_common_cbk (frame); + + return 0; +} + + + + +gf_cs_obj_state +__cs_get_file_state (xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx) +{ + gf_cs_obj_state state = -1; + + if (!ctx) + return GF_CS_ERROR; + + LOCK (&inode->lock); + { + state = ctx->state; + } + UNLOCK (&inode->lock); + + return state; +} + +void +__cs_inode_ctx_get (xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx) +{ + uint64_t ctxint = 0; + int ret = 0; + + LOCK (&inode->lock); + { + ret = __inode_ctx_get (inode, this, &ctxint); + } + UNLOCK (&inode->lock); + + if (ret) + *ctx = NULL; + else + *ctx = (cs_inode_ctx_t *)ctxint; + + return; +} + +int +__cs_inode_ctx_update (xlator_t *this, inode_t *inode, uint64_t val) +{ + cs_inode_ctx_t *ctx = NULL; + uint64_t ctxint = 0; + int ret = 0; + + LOCK (&inode->lock); + { + ret = __inode_ctx_get (inode, this, &ctxint); + if (ret) { + ctx = GF_CALLOC (1, sizeof (*ctx), gf_cs_mt_cs_inode_ctx_t); + if (!ctx) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "ctx allocation failed"); + ret = -1; + goto out; + } + + ctx->state = val; + + ctxint = (uint64_t) ctx; + + ret = __inode_ctx_set (inode, this, &ctxint); + if (ret) { + GF_FREE (ctx); + goto out; + } + } else { + ctx = (cs_inode_ctx_t *) ctxint; + + ctx->state = val; + } + + } + +out: + UNLOCK (&inode->lock); + + return ret; +} + +int +cs_inode_ctx_reset (xlator_t *this, inode_t *inode) +{ + cs_inode_ctx_t *ctx = NULL; + uint64_t ctxint = 0; + + inode_ctx_del (inode, this, &ctxint); + if (!ctxint) { + return 0; + } + + ctx = (cs_inode_ctx_t *)ctxint; + + GF_FREE (ctx); + return 0; +} + +int +cs_resume_postprocess (xlator_t *this, call_frame_t *frame, inode_t *inode) +{ + cs_local_t *local = NULL; + gf_cs_obj_state state = -1; + cs_inode_ctx_t *ctx = NULL; + int ret = 0; + + local = frame->local; + if (!local) { + ret = -1; + goto out; + } + + __cs_inode_ctx_get (this, inode, &ctx); + + state = __cs_get_file_state (this, inode, ctx); + if (state == GF_CS_ERROR) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "status is GF_CS_ERROR." + " Aborting write"); + local->op_ret = -1; + local->op_errno = EREMOTE; + ret = -1; + goto out; + } + + if (state == GF_CS_REMOTE || state == GF_CS_DOWNLOADING) { + gf_msg_debug (this->name, 0, "status is %d", state); + ret = cs_download (frame); + if (ret == 0) { + gf_msg_debug (this->name, 0, "Winding for Final Write"); + } else { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + " download failed, unwinding writev"); + local->op_ret = -1; + local->op_errno = EREMOTE; + ret = -1; + } + } +out: + return ret; +} +int32_t +cs_fdctx_to_dict (xlator_t *this, + fd_t *fd, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_inode (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_inode_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_history (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_fd (xlator_t *this) +{ + return 0; +} + + +int32_t +cs_fd_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_fdctx (xlator_t *this, + fd_t *fd) +{ + return 0; +} + + +int32_t +cs_inodectx (xlator_t *this, + inode_t *ino) +{ + return 0; +} + + +int32_t +cs_inodectx_to_dict (xlator_t *this, + inode_t *ino, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_priv_to_dict (xlator_t *this, + dict_t *dict) +{ + return 0; +} + + +int32_t +cs_priv (xlator_t *this) +{ + return 0; +} + +int +cs_notify (xlator_t *this, int event, void *data, ...) +{ + return default_notify (this, event, data); +} + + +struct xlator_fops cs_fops = { + .stat = cs_stat, + .readdirp = cs_readdirp, + .truncate = cs_truncate, + .seek = cs_seek, + .statfs = cs_statfs, + .fallocate = cs_fallocate, + .discard = cs_discard, + .getxattr = cs_getxattr, + .writev = cs_writev, + .setxattr = cs_setxattr, + .fgetxattr = cs_fgetxattr, + .lookup = cs_lookup, + .fsetxattr = cs_fsetxattr, + .readv = cs_readv, + .ftruncate = cs_ftruncate, + .rchecksum = cs_rchecksum, + .unlink = cs_unlink, + .open = cs_open, + .fstat = cs_fstat, + .zerofill = cs_zerofill, +}; + +struct xlator_cbks cs_cbks = { +}; + +struct xlator_dumpops cs_dumpops = { + .fdctx_to_dict = cs_fdctx_to_dict, + .inode = cs_inode, + .inode_to_dict = cs_inode_to_dict, + .history = cs_history, + .fd = cs_fd, + .fd_to_dict = cs_fd_to_dict, + .fdctx = cs_fdctx, + .inodectx = cs_inodectx, + .inodectx_to_dict = cs_inodectx_to_dict, + .priv_to_dict = cs_priv_to_dict, + .priv = cs_priv, +}; + +struct volume_options cs_options[] = { + { .key = {NULL} }, +}; + +xlator_api_t xlator_api = { + .init = cs_init, + .fini = cs_fini, + .notify = cs_notify, + .reconfigure = cs_reconfigure, + .mem_acct_init = cs_mem_acct_init, + .dumpops = &cs_dumpops, + .fops = &cs_fops, + .cbks = &cs_cbks, + .options = cs_options, + .identifier = "cloudsync", +}; diff --git a/xlators/features/cloudsync/src/cloudsync.h b/xlators/features/cloudsync/src/cloudsync.h new file mode 100644 index 00000000000..bd54d760864 --- /dev/null +++ b/xlators/features/cloudsync/src/cloudsync.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com> + * This file is part of GlusterFS. + * + * This file is licensed to you under your choice of the GNU Lesser + * General Public License, version 3 or any later version (LGPLv3 or + * later), or the GNU General Public License, version 2 (GPLv2), in all + * cases as published by the Free Software Foundation. + */ + + +#ifndef __CLOUDSYNC_H__ +#define __CLOUDSYNC_H__ + +#include "glusterfs.h" +#include "xlator.h" +#include "defaults.h" +#include "syncop.h" +#include "call-stub.h" +#include "cloudsync-common.h" +#include "cloudsync-autogen-fops.h" + + +#define CS_LOCK_DOMAIN "cs.protect.file.stat" +typedef struct cs_dlstore { + off_t off; + struct iovec *vector; + int32_t count; + struct iobref *iobref; + uint32_t flags; +} cs_dlstore; + +typedef struct cs_inode_ctx { + gf_cs_obj_state state; +} cs_inode_ctx_t; + +cs_local_t * +cs_local_init (xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd, + glusterfs_fop_t fop); + +int +locate_and_execute (call_frame_t *frame); + + +int32_t +cs_resume_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, + dict_t *dict, int32_t flags, dict_t *xdata); + +int32_t +cs_inodelk_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, + int32_t op_errno, dict_t *xdata); + +size_t +cs_write_callback (void *lcurlbuf, size_t size, size_t nitems, void *frame); + +void +cs_common_cbk (call_frame_t *frame); + +gf_boolean_t +cs_is_file_remote (struct iatt *stbuf, dict_t *xattr); + +int32_t +cs_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata); +int +cs_build_loc (loc_t *loc, call_frame_t *frame); + +int +cs_blocking_inodelk_cbk (call_frame_t *lock_frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata); + +int cs_read_authinfo(xlator_t *this); + +int +__cs_inode_ctx_update (xlator_t *this, inode_t *inode, uint64_t val); + +int +cs_inode_ctx_reset (xlator_t *this, inode_t *inode); + +void +__cs_inode_ctx_get (xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx); + +gf_cs_obj_state +__cs_get_file_state (xlator_t *this, inode_t *inode, cs_inode_ctx_t *ctx); + +int +cs_inodelk_unlock (call_frame_t *main_frame); + +int +cs_resume_postprocess (xlator_t *this, call_frame_t *frame, inode_t *inode); + +int32_t +cs_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata); +int32_t +cs_resume_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xattr_req); +#endif /* __CLOUDSYNC_H__ */ + |