diff options
| -rw-r--r-- | configure.ac | 2 | ||||
| -rw-r--r-- | glusterfs.spec.in | 1 | ||||
| -rw-r--r-- | libglusterfs/src/glfs-message-id.h | 1 | ||||
| -rw-r--r-- | xlators/features/Makefile.am | 7 | ||||
| -rw-r--r-- | xlators/features/sdfs/Makefile.am | 3 | ||||
| -rw-r--r-- | xlators/features/sdfs/src/Makefile.am | 17 | ||||
| -rw-r--r-- | xlators/features/sdfs/src/sdfs-messages.h | 69 | ||||
| -rw-r--r-- | xlators/features/sdfs/src/sdfs.c | 1351 | ||||
| -rw-r--r-- | xlators/features/sdfs/src/sdfs.h | 49 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volgen.c | 26 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 10 | 
11 files changed, 1532 insertions, 4 deletions
diff --git a/configure.ac b/configure.ac index 81c56d4e68f..ed84c661797 100644 --- a/configure.ac +++ b/configure.ac @@ -152,6 +152,8 @@ AC_CONFIG_FILES([Makefile                  xlators/features/marker/src/Makefile                  xlators/features/selinux/Makefile                  xlators/features/selinux/src/Makefile +                xlators/features/sdfs/Makefile +                xlators/features/sdfs/src/Makefile                  xlators/features/read-only/Makefile                  xlators/features/read-only/src/Makefile                  xlators/features/compress/Makefile diff --git a/glusterfs.spec.in b/glusterfs.spec.in index 2dbd013c1d9..f7f8cbae6b3 100644 --- a/glusterfs.spec.in +++ b/glusterfs.spec.in @@ -1256,6 +1256,7 @@ exit 0       %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/arbiter.so       %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bit-rot.so       %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bitrot-stub.so +     %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/sdfs.so  %if ( 0%{!?_without_tiering:1} )       %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/changetimerecorder.so       %{_libdir}/libgfdb.so.* diff --git a/libglusterfs/src/glfs-message-id.h b/libglusterfs/src/glfs-message-id.h index 241913d5b45..6a7d07bef84 100644 --- a/libglusterfs/src/glfs-message-id.h +++ b/libglusterfs/src/glfs-message-id.h @@ -85,6 +85,7 @@ enum _msgid_comp {          GLFS_MSGID_COMP(NLC,              1),          GLFS_MSGID_COMP(SL,               1),          GLFS_MSGID_COMP(HAM,              1), +        GLFS_MSGID_COMP(SDFS,              1),  /* --- new segments for messages goes above this line --- */ diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am index f7791b0cc32..1e24679903d 100644 --- a/xlators/features/Makefile.am +++ b/xlators/features/Makefile.am @@ -1,6 +1,5 @@  SUBDIRS = locks quota read-only quiesce marker index barrier \ -	  arbiter compress changelog changetimerecorder \ -	  gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \ -	  trash shard bit-rot leases selinux - +          arbiter compress changelog changetimerecorder \ +          gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \ +          trash shard bit-rot leases selinux sdfs  CLEANFILES = diff --git a/xlators/features/sdfs/Makefile.am b/xlators/features/sdfs/Makefile.am new file mode 100644 index 00000000000..a985f42a877 --- /dev/null +++ b/xlators/features/sdfs/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/features/sdfs/src/Makefile.am b/xlators/features/sdfs/src/Makefile.am new file mode 100644 index 00000000000..ec9ed804b32 --- /dev/null +++ b/xlators/features/sdfs/src/Makefile.am @@ -0,0 +1,17 @@ +xlator_LTLIBRARIES = sdfs.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features + +sdfs_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS) + +sdfs_la_SOURCES = sdfs.c +sdfs_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = sdfs.h sdfs-messages.h $(top_builddir)/xlators/lib/src/libxlator.h + +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ +                                 -I$(top_srcdir)/xlators/lib/src \ +        -I$(top_srcdir)/rpc/xdr/src/ -I$(top_builddir)/rpc/xdr/src/ + +AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS) + +CLEANFILES = diff --git a/xlators/features/sdfs/src/sdfs-messages.h b/xlators/features/sdfs/src/sdfs-messages.h new file mode 100644 index 00000000000..6c7a9d90667 --- /dev/null +++ b/xlators/features/sdfs/src/sdfs-messages.h @@ -0,0 +1,69 @@ +/* + Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. + */ + +#ifndef _DFS_MESSAGES_H_ +#define _DFS_MESSAGES_H_ + +#include "glfs-message-id.h" + +/* file bit-rot-bitd-messages.h + * brief SDFS log-message IDs and their descriptions + */ + +/* NOTE: Rules for message additions + * 1) Each instance of a message is _better_ left with a unique message ID, even + *    if the message format is the same. Reasoning is that, if the message + *    format needs to change in one instance, the other instances are not + *    impacted or the new change does not change the ID of the instance being + *    modified. + * 2) Addition of a message, + *       - Should increment the GLFS_NUM_MESSAGES + *       - Append to the list of messages defined, towards the end + *       - Retain macro naming as glfs_msg_X (for redability across developers) + * NOTE: Rules for message format modifications + * 3) Check acorss the code if the message ID macro in question is reused + *    anywhere. If reused then then the modifications should ensure correctness + *    everywhere, or needs a new message ID as (1) above was not adhered to. If + *    not used anywhere, proceed with the required modification. + * NOTE: Rules for message deletion + * 4) Check (3) and if used anywhere else, then cannot be deleted. If not used + *    anywhere, then can be deleted, but will leave a hole by design, as + *    addition rules specify modification to the end of the list and not filling + *    holes. + */ + +#define GLFS_SDFS_BASE                     GLFS_MSGID_COMP_SDFS +#define GLFS_SDFS_NUM_MESSAGES             2 +#define GLFS_MSGID_END                    (GLFS_SDFS_BASE + \ +                                           GLFS_SDFS_NUM_MESSAGES + 1) +/* Messaged with message IDs */ +#define glfs_msg_start_x  GLFS_DFS_BASE, "Invalid: Start of messages" +/*------------*/ + + +#define SDFS_MSG_ENTRYLK_ERROR             (GLFS_SDFS_BASE + 1) +/*! + * @messageid + * @diagnosis + * @recommendedaction + * + */ + +#define SDFS_MSG_MKDIR_ERROR               (GLFS_SDFS_BASE + 2) +/*! + * @messageid + * @diagnosis + * @recommendedaction + * + */ +/*------------*/ + +#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages" +#endif /* !_SDFS_MESSAGES_H_ */ diff --git a/xlators/features/sdfs/src/sdfs.c b/xlators/features/sdfs/src/sdfs.c new file mode 100644 index 00000000000..3b70dce5d27 --- /dev/null +++ b/xlators/features/sdfs/src/sdfs.c @@ -0,0 +1,1351 @@ +/* +   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ +#include <libgen.h> +#include "sdfs.h" + +static int +sdfs_frame_return (call_frame_t *frame) +{ +        sdfs_local_t *local             = NULL; + +        if (!frame) +                return -1; + +        local = frame->local; + +        return GF_ATOMIC_DEC (local->call_cnt); +} + +static void +sdfs_lock_free (sdfs_entry_lock_t *entrylk) +{ +        if (entrylk == NULL) +                goto out; + +        loc_wipe (&entrylk->parent_loc); +        GF_FREE (entrylk->basename); + +out: +        return; +} + +static void +sdfs_lock_array_free (sdfs_lock_t *lock) +{ +        sdfs_entry_lock_t       *entrylk = NULL; +        int                      i       = 0; + +        if (lock == NULL) +                goto out; + +        for (i = 0; i < lock->lock_count; i++) { +                entrylk = &lock->entrylk[i]; +                sdfs_lock_free (entrylk); +        } + +out: +        return; +} + +static void +sdfs_local_cleanup (sdfs_local_t *local) +{ +        if (!local) +                return; + +        loc_wipe (&local->loc); +        loc_wipe (&local->parent_loc); + +        if (local->stub) { +                call_stub_destroy (local->stub); +                local->stub = NULL; +        } + +        sdfs_lock_array_free (local->lock); +        GF_FREE (local->lock); + +        mem_put (local); +} + +static int +sdfs_build_parent_loc (loc_t *parent, loc_t *child) +{ +        int     ret     = -1; +        char    *path   = NULL; + +        if (!child->parent) { +                goto out; +        } +        parent->inode = inode_ref (child->parent); +        path = gf_strdup (child->path); +        if (!path) { +                ret = -ENOMEM; +                goto out; +        } + +        parent->path = dirname(path); +        if (!parent->path) { +                goto out; +        } + +        gf_uuid_copy (parent->gfid, child->pargfid); +        return 0; + +out: +        GF_FREE (path); +        return ret; +} + +static sdfs_local_t * +sdfs_local_init (call_frame_t *frame, xlator_t *this) +{ +        sdfs_local_t *local = NULL; + +        local = mem_get0 (this->local_pool); +        if (!local) +                goto out; + +        frame->local = local; +out: +        return local; +} + +static int +sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame) +{ +        int           ret       = -1; +        sdfs_local_t *local     = NULL; +        client_t     *client    = NULL; + +        *new_frame = copy_frame (frame); +        if (!*new_frame) { +                goto err; +        } + +        client = frame->root->client; +        gf_client_ref (client); +        (*new_frame)->root->client = client; +        local = sdfs_local_init (*new_frame, THIS); +        if (!local) { +                goto err; +        } + +        local->main_frame = frame; + +        ret = sdfs_build_parent_loc (&local->parent_loc, loc); +        if (ret) { +                goto err; +        } + +        ret = loc_copy (&local->loc, loc); +        if (ret == -1) { +                goto err; +        } + +        ret = 0; +err: +        if (ret == -1) { +                SDFS_STACK_DESTROY (frame); +        } +        return ret; +} + +int +sdfs_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                  int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        sdfs_local_t    *local          = NULL; +        call_stub_t     *stub           = NULL; + +        local = frame->local; + +        local->op_ret = op_ret; +        local->op_errno = op_errno; + +        if (local->stub) { +                stub = local->stub; +                local->stub = NULL; +                call_resume (stub); +        } else { +                if (op_ret < 0) +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                SDFS_MSG_ENTRYLK_ERROR, +                                "Unlocking entry lock failed for %s", +                                local->loc.name); + +                SDFS_STACK_DESTROY (frame); +        } + +        return 0; +} + +int +sdfs_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                int32_t op_ret, int32_t op_errno, inode_t *inode, +                struct iatt *stbuf, struct iatt *preparent, +                struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (mkdir, local->main_frame, op_ret, op_errno, inode, +                             stbuf, preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                   mode_t mode, mode_t umask, dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; +        int              op_errno               = -1; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                op_errno = local->op_errno; +                goto err; +        } + +        STACK_WIND (frame, sdfs_mkdir_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->mkdir, loc, +                    mode, umask, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (mkdir, local->main_frame, -1, op_errno, +                             NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, +            mode_t umask, dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        int              op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_mkdir_stub (new_frame, sdfs_mkdir_helper, loc, mode, +                               umask, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (rmdir, local->main_frame, op_ret, op_errno, +                             preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_rmdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                   int flags, dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_rmdir_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->rmdir, loc, +                    flags, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (rmdir, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, +            dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        int              op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_rmdir_stub (new_frame, sdfs_rmdir_helper, loc, flags, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno, NULL, NULL, +                             NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, +                 fd_t *fd, inode_t *inode, struct iatt *stbuf, +                 struct iatt *preparent, struct iatt *postparent, +                 dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (create, local->main_frame, op_ret, op_errno, fd, +                             inode, stbuf, preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    int32_t flags, mode_t mode, mode_t umask, fd_t *fd, +                    dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_create_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->create, loc, flags, +                    mode, umask, fd, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (create, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_create (call_frame_t *frame, xlator_t *this, loc_t *loc, +             int32_t flags, mode_t mode, mode_t umask, +             fd_t *fd, dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        int              op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_create_stub (new_frame, sdfs_create_helper, loc, +                                flags, mode, umask, fd, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (create, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, struct iatt *preparent, +                 struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (unlink, local->main_frame, op_ret, op_errno, +                             preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_unlink_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    int flags, dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_unlink_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->unlink, loc, flags, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (unlink, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, +             int flags, dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        int              op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_unlink_stub (new_frame, sdfs_unlink_helper, loc, +                                flags, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL, +                             NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                  int32_t op_ret, int32_t op_errno, inode_t *inode, +                  struct iatt *stbuf, struct iatt *preparent, +                  struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode, +                             stbuf, preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_symlink_helper (call_frame_t *frame, xlator_t *this, +                     const char *linkname, loc_t *loc, mode_t umask, +                     dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_symlink_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->symlink, linkname, loc, +                    umask, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_symlink (call_frame_t *frame, xlator_t *this, const char *linkname, +              loc_t *loc, mode_t umask, dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        int              op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_symlink_stub (new_frame, sdfs_symlink_helper, linkname, loc, +                                 umask, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_common_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                         int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        sdfs_local_t    *local          = NULL; +        int              this_call_cnt  = 0; +        int              lk_index       = 0; +        sdfs_lock_t     *locks          = NULL; +        call_stub_t     *stub           = NULL; + +        local = frame->local; +        locks = local->lock; +        lk_index = (long) cookie; + +        if (op_ret < 0) { +                local->op_ret = op_ret; +                local->op_errno = op_errno; +        } else { +                locks->entrylk->locked[lk_index] = _gf_true; +        } + +        this_call_cnt = sdfs_frame_return (frame); +        if (this_call_cnt > 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "As there are more callcnt (%d) returning without WIND", +                        this_call_cnt); +                return 0; +        } + +        if (local->stub) { +                stub = local->stub; +                local->stub = NULL; +                call_resume (stub); +        } else { +                if (local->op_ret < 0) +                        gf_msg (this->name, GF_LOG_ERROR, 0, +                                SDFS_MSG_ENTRYLK_ERROR, +                                "unlocking entry lock failed "); +                SDFS_STACK_DESTROY (frame); +        } + +        return 0; +} + +int +sdfs_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +               int32_t op_ret, int32_t op_errno, +               inode_t *inode, struct iatt *stbuf, struct iatt *preparent, +               struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t             *local          = NULL; +        sdfs_lock_t              *lock           = NULL; +        int                      i              = 0; +        int lock_count = 0; + +        local = frame->local; +        lock  = local->lock; + +        STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode, +                             stbuf, preparent, postparent, xdata); + +        local->main_frame = NULL; +        lock_count = lock->lock_count; +        for (i = 0; i < lock_count; i++) { +                STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk, +                                   (void *)(long) i, +                                   FIRST_CHILD (this), +                                   FIRST_CHILD(this)->fops->entrylk, +                                   this->name, &lock->entrylk[i].parent_loc, +                                   lock->entrylk[i].basename, +                                   ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        } + +        return 0; +} + +int +sdfs_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +                  loc_t *newloc, dict_t *xdata) +{ +        sdfs_local_t            *local                  = NULL; +        sdfs_lock_t             *locks                  = NULL; +        gf_boolean_t             stack_destroy          = _gf_true; +        int                      lock_count             = 0; +        int                      i                      = 0; + +        local = frame->local; +        locks = local->lock; + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed"); +                goto err; +        } + +        STACK_WIND (frame, sdfs_link_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->link, oldloc, newloc, +                    xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        for (i = 0; i < locks->lock_count && locks->entrylk->locked[i]; i++) { +                lock_count++; +        } +        GF_ATOMIC_INIT (local->call_cnt, lock_count); + +        for (i = 0; i < lock_count; i++) { +                if (!locks->entrylk->locked[i]) { +                        lock_count++; +                        continue; +                } + +                stack_destroy = _gf_false; +                STACK_WIND (frame, sdfs_common_entrylk_cbk, +                            FIRST_CHILD (this), +                            FIRST_CHILD(this)->fops->entrylk, +                            this->name, &locks->entrylk[i].parent_loc, +                            locks->entrylk[i].basename, +                            ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        } + +        if (stack_destroy) +                SDFS_STACK_DESTROY (frame); + +        return 0; +} + +static int +sdfs_init_entry_lock (sdfs_entry_lock_t *lock, loc_t *loc) +{ +        int ret = 0; + +        ret = sdfs_build_parent_loc (&lock->parent_loc, loc); +        if (ret) +                return -1; + +        lock->basename = gf_strdup (loc->name); +        if (!lock->basename) +                return -1; + +        return 0; +} + +int +sdfs_entry_lock_cmp (const void *l1, const void *l2) +{ +        const sdfs_entry_lock_t        *r1      = l1; +        const sdfs_entry_lock_t        *r2      = l2; +        int                             ret     = 0; +        uuid_t                          gfid1   = {0}; +        uuid_t                          gfid2   = {0}; + +        loc_gfid ((loc_t *)&r1->parent_loc, gfid1); +        loc_gfid ((loc_t *)&r2->parent_loc, gfid2); +        ret = gf_uuid_compare (gfid1, gfid2); +        /*Entrylks with NULL basename are the 'smallest'*/ +        if (ret == 0) { +                if (!r1->basename) +                        return -1; +                if (!r2->basename) +                        return 1; +                ret = strcmp (r1->basename, r2->basename); +        } + +        if (ret <= 0) +                return -1; +        else +                return 1; +} + +int +sdfs_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +           loc_t *newloc, dict_t *xdata) +{ +        sdfs_local_t            *local      = NULL; +        call_frame_t            *new_frame  = NULL; +        call_stub_t             *stub       = NULL; +        sdfs_lock_t             *lock       = NULL; +        client_t                *client     = NULL; +        int                      ret        = 0; +        int                      op_errno   = 0; + +        new_frame = copy_frame (frame); +        if (!new_frame) { +                op_errno = ENOMEM; +                goto err; +        } + +        gf_client_ref (client); +        new_frame->root->client = client; +        local = sdfs_local_init (new_frame, this); +        if (!local) { +                op_errno = ENOMEM; +                goto err; +        } + +        local->main_frame = frame; + + +        lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char); +        if (!lock) +                goto err; + +        local->lock = lock; + +        ret = sdfs_init_entry_lock (&lock->entrylk[0], newloc); +        if (ret) +                goto err; + +        ++lock->lock_count; + +        local->lock = lock; +        GF_ATOMIC_INIT (local->call_cnt, lock->lock_count); + +        loc_copy (&local->loc, newloc); +        if (ret == -1) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_link_stub (new_frame, sdfs_link_helper, oldloc, +                              newloc, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        local->stub = stub; + +        STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk, +                           0, FIRST_CHILD (this), +                           FIRST_CHILD(this)->fops->entrylk, +                           this->name, &lock->entrylk[0].parent_loc, +                           lock->entrylk[0].basename, ENTRYLK_LOCK, +                           ENTRYLK_WRLCK, xdata); + +        return 0; +err: + +        STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                int32_t op_ret, int32_t op_errno, inode_t *inode, +                struct iatt *stbuf, struct iatt *preparent, +                struct iatt *postparent, dict_t *xdata) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        STACK_UNWIND_STRICT (mknod, local->main_frame, op_ret, op_errno, inode, +                             stbuf, preparent, postparent, xdata); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        return 0; +} + +int +sdfs_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                   mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata) +{ +        sdfs_local_t    *local                   = NULL; +        char             gfid[GF_UUID_BUF_SIZE]  = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_mknod_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, +                    umask, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (mknod, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, +            dev_t rdev, mode_t umask, dict_t *xdata) +{ +        sdfs_local_t            *local      = NULL; +        call_frame_t            *new_frame  = NULL; +        call_stub_t             *stub       = NULL; +        int                      op_errno   = 0; + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_mknod_stub (new_frame, sdfs_mknod_helper, loc, mode, +                               rdev, umask, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (mknod, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, struct iatt *stbuf, +                 struct iatt *preoldparent, struct iatt *postoldparent, +                 struct iatt *prenewparent, struct iatt *postnewparent, +                 dict_t *xdata) +{ +        sdfs_local_t    *local = NULL; +        sdfs_lock_t     *lock  = NULL; +        int              i     = 0; +        int              call_cnt = 0; + +        local = frame->local; +        lock = local->lock; +        GF_ATOMIC_INIT (local->call_cnt, lock->lock_count); + +        STACK_UNWIND_STRICT (rename, local->main_frame, op_ret, op_errno, stbuf, +                             preoldparent, postoldparent, prenewparent, +                             postnewparent, xdata); + +        local->main_frame = NULL; +        call_cnt = GF_ATOMIC_GET (local->call_cnt); + +        for (i = 0; i < call_cnt; i++) { +                STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk, +                                   (void *)(long) i, +                                   FIRST_CHILD (this), +                                   FIRST_CHILD(this)->fops->entrylk, +                                   this->name, &lock->entrylk[i].parent_loc, +                                   lock->entrylk[i].basename, +                                   ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        } + +        return 0; +} + +int +sdfs_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +                    loc_t *newloc, dict_t *xdata) +{ +        sdfs_local_t            *local                  = NULL; +        sdfs_lock_t             *lock                   = NULL; +        gf_boolean_t             stack_destroy          = _gf_true; +        int                      lock_count             = 0; +        int                      i                      = 0; + +        local = frame->local; +        lock = local->lock; + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed "); +               goto err; +        } + +        STACK_WIND (frame, sdfs_rename_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->rename, oldloc, newloc, +                    xdata); + +        return 0; + +err: +        STACK_UNWIND_STRICT (rename, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL, NULL, NULL); + +        local->main_frame = NULL; +        for (i = 0; i < lock->lock_count && lock->entrylk->locked[i]; i++) { +                lock_count++; +        } +        GF_ATOMIC_INIT (local->call_cnt, lock_count); + +        for (i = 0; i < lock_count; i++) { +                if (!lock->entrylk->locked[i]) { +                        lock_count++; +                        continue; +                } +                stack_destroy = _gf_false; +                STACK_WIND (frame, sdfs_common_entrylk_cbk, +                            FIRST_CHILD (this), +                            FIRST_CHILD(this)->fops->entrylk, +                            this->name, &lock->entrylk[i].parent_loc, +                            lock->entrylk[i].basename, +                            ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata); +        } + +        if (stack_destroy) +                SDFS_STACK_DESTROY (frame); + +        return 0; +} + +int +sdfs_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, +             loc_t *newloc, dict_t *xdata) +{ +        sdfs_local_t    *local      = NULL; +        sdfs_lock_t     *lock       = NULL; +        call_frame_t    *new_frame  = NULL; +        call_stub_t     *stub       = NULL; +        client_t        *client     = NULL; +        int              ret        = 0; +        int              op_errno   = -1; +        int              i          = 0; +        int              call_cnt   = 0; + +        new_frame = copy_frame (frame); +        if (!new_frame) { +                op_errno = ENOMEM; +                goto err; +        } + +        gf_client_ref (client); +        new_frame->root->client = client; +        local = sdfs_local_init (new_frame, this); +        if (!local) { +                op_errno = ENOMEM; +                goto err; +        } + +        local->main_frame = frame; + +        lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char); +        if (!lock) +                goto err; + +        local->lock = lock; + +        ret = sdfs_init_entry_lock (&lock->entrylk[0], oldloc); +        if (ret) +                goto err; +        lock->entrylk->locked[0] = _gf_false; + +        ++lock->lock_count; + +        ret = sdfs_init_entry_lock (&lock->entrylk[1], newloc); +        if (ret) +                goto err; +        lock->entrylk->locked[1] = _gf_false; + +        ++lock->lock_count; + +        qsort (lock->entrylk, lock->lock_count, sizeof (*lock->entrylk), +               sdfs_entry_lock_cmp); + +        local->lock = lock; +        GF_ATOMIC_INIT (local->call_cnt, lock->lock_count); + +        stub = fop_rename_stub (new_frame, sdfs_rename_helper, oldloc, +                                newloc, xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        local->stub = stub; +        call_cnt = GF_ATOMIC_GET (local->call_cnt); +        for (i = 0; i < call_cnt; i++) { +                STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk, +                                   (void *)(long) i, +                                   FIRST_CHILD (this), +                                   FIRST_CHILD(this)->fops->entrylk, +                                   this->name, &lock->entrylk[i].parent_loc, +                                   lock->entrylk[i].basename, +                                   ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata); +        } + +        return 0; +err: + +        STACK_UNWIND_STRICT (rename, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL, NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +sdfs_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                 int32_t op_ret, int32_t op_errno, inode_t *inode, +                 struct iatt *stbuf,  dict_t *xdata, +                 struct iatt *postparent) +{ +        sdfs_local_t *local = NULL; + +        local = frame->local; + +        if (!local->loc.parent) { +                sdfs_local_cleanup (local); +                frame->local = NULL; +                STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno, +                                     inode, stbuf, xdata, postparent); +                return 0; +        } + +        STACK_UNWIND_STRICT (lookup, local->main_frame, op_ret, op_errno, inode, +                             stbuf, xdata, postparent); + +        local->main_frame = NULL; +        STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata); +        return 0; +} + +int +sdfs_lookup_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, +                    dict_t *xdata) +{ +        sdfs_local_t    *local                  = NULL; +        char             gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        gf_uuid_unparse(loc->pargfid, gfid); + +        if (local->op_ret < 0) { +                gf_msg (this->name, GF_LOG_ERROR, 0, +                        SDFS_MSG_ENTRYLK_ERROR, +                        "Acquiring entry lock failed for directory %s " +                        "with parent gfid %s", local->loc.name, gfid); +                goto err; +        } + +        STACK_WIND (frame, sdfs_lookup_cbk, FIRST_CHILD(this), +                    FIRST_CHILD(this)->fops->lookup, loc, xdata); + +        return 0; +err: +        STACK_UNWIND_STRICT (lookup, local->main_frame, -1, local->op_errno, +                             NULL, NULL, NULL, NULL); +        local->main_frame = NULL; + +        SDFS_STACK_DESTROY (frame); +        return 0; +} + +int +sdfs_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, +             dict_t *xdata) +{ +        sdfs_local_t            *local      = NULL; +        call_frame_t            *new_frame  = NULL; +        call_stub_t             *stub       = NULL; +        int                      op_errno   = 0; + +        if (!loc->parent) { +                local = sdfs_local_init (frame, this); +                if (!local) { +                        op_errno = ENOMEM; +                        goto err; +                } + +                STACK_WIND_TAIL(frame, FIRST_CHILD(this), +                                FIRST_CHILD(this)->fops->lookup, +                                loc, xdata); +                return 0; +        } + +        if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) { +                op_errno = ENOMEM; +                goto err; +        } + +        stub = fop_lookup_stub (new_frame, sdfs_lookup_helper, loc, +                                xdata); +        if (!stub) { +                op_errno = ENOMEM; +                goto err; +        } + +        ((sdfs_local_t *)new_frame->local)->stub = stub; + +        STACK_WIND (new_frame, sdfs_entrylk_cbk, +                    FIRST_CHILD (this), +                    FIRST_CHILD(this)->fops->entrylk, +                    this->name, &local->parent_loc, local->loc.name, +                    ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata); + +        return 0; + +err: +        STACK_UNWIND_STRICT (lookup, frame, -1, op_errno, NULL, NULL, +                             NULL, NULL); + +        if (new_frame) +                SDFS_STACK_DESTROY (new_frame); + +        return 0; +} + +int +init (xlator_t *this) +{ +        int ret = -1; + +        if (!this->children || this->children->next) { +                gf_log (this->name, GF_LOG_ERROR, +                        "'dentry-fop-serializer' not configured with exactly one child"); +                goto out; +        } + +        if (!this->parents) { +                gf_log (this->name, GF_LOG_WARNING, +                        "dangling volume. check volfile "); +        } + +        this->local_pool = mem_pool_new (sdfs_local_t, 512); +        if (!this->local_pool) { +                goto out; +        } + +        ret = 0; + +out: +        return ret; +} + +int +fini (xlator_t *this) +{ +        mem_pool_destroy (this->local_pool); + +        return 0; +} + + +struct xlator_fops fops = { +        .mkdir      = sdfs_mkdir, +        .rmdir      = sdfs_rmdir, +        .create     = sdfs_create, +        .unlink     = sdfs_unlink, +        .symlink    = sdfs_symlink, +        .link       = sdfs_link, +        .mknod      = sdfs_mknod, +        .rename     = sdfs_rename, +        .lookup     = sdfs_lookup, +}; + +struct xlator_cbks cbks; + diff --git a/xlators/features/sdfs/src/sdfs.h b/xlators/features/sdfs/src/sdfs.h new file mode 100644 index 00000000000..d28257eda5e --- /dev/null +++ b/xlators/features/sdfs/src/sdfs.h @@ -0,0 +1,49 @@ +/* +   Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "xlator.h" +#include "call-stub.h" +#include "sdfs-messages.h" +#include "atomic.h" + +#define SDFS_LOCK_COUNT_MAX    2 + +typedef struct{ +        loc_t           parent_loc; +        char            *basename; +        int             locked[SDFS_LOCK_COUNT_MAX]; +} sdfs_entry_lock_t; + +typedef struct { +        sdfs_entry_lock_t entrylk[SDFS_LOCK_COUNT_MAX]; +        int               lock_count; +} sdfs_lock_t; + +struct sdfs_local { +        call_frame_t *main_frame; +        loc_t         loc; +        loc_t         parent_loc; +        call_stub_t  *stub; +        sdfs_lock_t   *lock; +        int           op_ret; +        int           op_errno; +        gf_atomic_t   call_cnt; +}; +typedef struct sdfs_local sdfs_local_t; + +#define SDFS_STACK_DESTROY(frame) do {                   \ +                sdfs_local_t *__local = NULL;            \ +                __local               = frame->local;    \ +                frame->local          = NULL;            \ +                gf_client_unref (frame->root->client);   \ +                STACK_DESTROY (frame->root);             \ +                sdfs_local_cleanup (__local);            \ +        } while (0) + diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c index 6a02da17918..235a4bd4022 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volgen.c +++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c @@ -2024,6 +2024,31 @@ out:          return ret;  } +static int +brick_graph_add_sdfs (volgen_graph_t *graph, glusterd_volinfo_t *volinfo, +                      dict_t *set_dict, glusterd_brickinfo_t *brickinfo) +{ +        xlator_t        *xl = NULL; +        int             ret = -1; + +        if (!graph || !volinfo) +                goto out; + +        if (!dict_get_str_boolean (set_dict, "features.sdfs", 0)) { +                /* update only if option is enabled */ +                ret = 0; +                goto out; +        } + +        xl = volgen_graph_add (graph, "features/sdfs", volinfo->volname); +        if (!xl) +                goto out; + +        ret = 0; +out: +        return ret; +} +  xlator_t *  add_one_peer (volgen_graph_t *graph, glusterd_brickinfo_t *peer,                char *volname, uint16_t index) @@ -2616,6 +2641,7 @@ static volgen_brick_xlator_t server_graph_table[] = {          {brick_graph_add_server, NULL},          {brick_graph_add_decompounder, "decompounder"},          {brick_graph_add_io_stats, "NULL"}, +        {brick_graph_add_sdfs, "sdfs"},          {brick_graph_add_cdc, NULL},          {brick_graph_add_quota, "quota"},          {brick_graph_add_index, "index"}, diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index acb493540da..1d4797afbeb 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -3718,6 +3718,16 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .op_version = GD_OP_VERSION_3_13_0,            .flags      = VOLOPT_FLAG_CLIENT_OPT          }, +        { .key         = "features.sdfs", +          .voltype     = "features/sdfs", +          .value       = "off", +          .option      = "!features", +          .op_version  = GD_OP_VERSION_4_0_0, +          .description = "enable/disable dentry serialization xlator in volume", +          .flags       = VOLOPT_FLAG_CLIENT_OPT | VOLOPT_FLAG_XLATOR_OPT, +          .type        = NO_DOC, +        }, +          { .key         = NULL          }  };  | 
