diff options
| -rw-r--r-- | libglusterfs/src/glusterfs.h | 1 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/Makefile.am | 4 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 293 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 103 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 662 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.h | 19 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-lock.c | 1383 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-lock.h | 94 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-messages.h | 23 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 358 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-selfheal.c | 99 | 
11 files changed, 1898 insertions, 1141 deletions
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index ce0dde22d4a..4d5ca839cd3 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -234,6 +234,7 @@  #define GF_MAX_AUX_GROUPS   65535  #define GF_UUID_BUF_SIZE 50 +#define GF_UUID_BNAME_BUF_SIZE (320) /* (64 + 256) */  #define GF_REBALANCE_TID_KEY     "rebalance-id"  #define GF_REMOVE_BRICK_TID_KEY  "remove-brick-id" diff --git a/xlators/cluster/dht/src/Makefile.am b/xlators/cluster/dht/src/Makefile.am index 19ec002f0fd..525a214c24a 100644 --- a/xlators/cluster/dht/src/Makefile.am +++ b/xlators/cluster/dht/src/Makefile.am @@ -10,7 +10,7 @@ xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster  dht_common_source = dht-layout.c dht-helper.c dht-linkfile.c dht-rebalance.c \  	dht-selfheal.c dht-rename.c dht-hashfn.c dht-diskusage.c \  	dht-common.c dht-inode-write.c dht-inode-read.c dht-shared.c \ -	$(top_builddir)/xlators/lib/src/libxlator.c +	dht-lock.c $(top_builddir)/xlators/lib/src/libxlator.c  dht_la_SOURCES = $(dht_common_source) dht.c @@ -35,7 +35,7 @@ tier_la_LDFLAGS = -module -avoid-version -export-symbols \  tier_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la  noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h \ -	dht-helper.h tier-common.h tier.h \ +	dht-lock.h tier-common.h tier.h \  	$(top_builddir)/xlators/lib/src/libxlator.h  AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 836a009c362..f7b3ffd5aae 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -15,6 +15,7 @@  #include "xlator.h"  #include "libxlator.h"  #include "dht-common.h" +#include "dht-lock.h"  #include "defaults.h"  #include "byte-order.h"  #include "glusterfs-acl.h" @@ -5527,7 +5528,7 @@ out:          dht_set_fixed_dir_stat (postparent);          dht_set_fixed_dir_stat (preparent); -        if (local 
&& local->lock.locks) { +        if (local && local->lock[0].layout.parent_layout.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno;                  local->refresh_layout_unlock (frame, this, op_ret, 1); @@ -5590,7 +5591,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err: -        if (local && local->lock.locks) { +        if (local && local->lock[0].layout.parent_layout.locks) {                  local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (mknod, frame, -1, @@ -5720,7 +5721,8 @@ dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret,          int           lock_count = 0;          local = frame->local; -        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); +        lock_count = dht_lock_count (local->lock[0].layout.parent_layout.locks, +                                     local->lock[0].layout.parent_layout.lk_count);          if (lock_count == 0)                  goto done; @@ -5735,14 +5737,15 @@ dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret,                  goto done;          } -        lock_local->lock.locks = local->lock.locks; -        lock_local->lock.lk_count = local->lock.lk_count; +        lock_local->lock[0].layout.parent_layout.locks = local->lock[0].layout.parent_layout.locks; +        lock_local->lock[0].layout.parent_layout.lk_count = local->lock[0].layout.parent_layout.lk_count; -        local->lock.locks = NULL; -        local->lock.lk_count = 0; +        local->lock[0].layout.parent_layout.locks = NULL; +        local->lock[0].layout.parent_layout.lk_count = 0; -        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, -                            lock_local->lock.lk_count, +        dht_unlock_inodelk (lock_frame, +                            lock_local->lock[0].layout.parent_layout.locks, +                            
lock_local->lock[0].layout.parent_layout.lk_count,                              dht_mknod_unlock_cbk);          lock_frame = NULL; @@ -5804,26 +5807,26 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)          local = frame->local; -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_pointer);          if (lk_array == NULL)                  goto err;          lk_array[0] = dht_lock_new (frame->this, subvol, &local->loc, F_RDLCK, -                                    DHT_LAYOUT_HEAL_DOMAIN); +                                    DHT_LAYOUT_HEAL_DOMAIN, NULL);          if (lk_array[0] == NULL)                  goto err; -        local->lock.locks = lk_array; -        local->lock.lk_count = count; +        local->lock[0].layout.parent_layout.locks = lk_array; +        local->lock[0].layout.parent_layout.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count,                                      IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +                local->lock[0].layout.parent_layout.locks = NULL; +                local->lock[0].layout.parent_layout.lk_count = 0;                  goto err;          } @@ -5917,81 +5920,8 @@ dht_handle_parent_layout_change (xlator_t *this, call_stub_t *stub)  }  int32_t -dht_unlock_parent_layout_during_entry_fop_done (call_frame_t *frame, -                                                void *cookie, -                                                xlator_t *this, -                                                int32_t op_ret, -                                                int32_t op_errno, -                                                dict_t *xdata) -{ -        dht_local_t *local                   = NULL; -        char          gfid[GF_UUID_BUF_SIZE] = {0}; - -        local = frame->local; -   
     gf_uuid_unparse (local->lock.locks[0]->loc.inode->gfid, gfid); - -        if (op_ret < 0) { -                gf_msg (this->name, GF_LOG_WARNING, op_errno, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "unlock failed on gfid: %s, stale lock might be left " -                        "in DHT_LAYOUT_HEAL_DOMAIN", gfid); -        } - -        DHT_STACK_DESTROY (frame); -        return 0; -} - -int32_t -dht_unlock_parent_layout_during_entry_fop (call_frame_t *frame) -{ -        dht_local_t  *local                   = NULL, *lock_local = NULL; -        call_frame_t *lock_frame              = NULL; -        char          pgfid[GF_UUID_BUF_SIZE] = {0}; - -        local = frame->local; - -        gf_uuid_unparse (local->loc.parent->gfid, pgfid); - -        lock_frame = copy_frame (frame); -        if (lock_frame == NULL) { -                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "mkdir (%s/%s) (path: %s): " -                        "copy frame failed", pgfid, local->loc.name, -                        local->loc.path); -                goto done; -        } - -        lock_local = mem_get0 (THIS->local_pool); -        if (lock_local == NULL) { -                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "mkdir (%s/%s) (path: %s): " -                        "local creation failed", pgfid, local->loc.name, -                        local->loc.path); -                goto done; -        } - -        lock_frame->local = lock_local; - -        lock_local->lock.locks = local->lock.locks; -        lock_local->lock.lk_count = local->lock.lk_count; - -        local->lock.locks = NULL; -        local->lock.lk_count = 0; - -        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, -                            lock_local->lock.lk_count, -                            
dht_unlock_parent_layout_during_entry_fop_done); - -done: -        return 0; -} - -int32_t -dht_guard_parent_layout_during_entry_fop_cbk (call_frame_t *frame, void *cookie, -                                              xlator_t *this, int32_t op_ret, -                                              int32_t op_errno, dict_t *xdata) +dht_call_mkdir_stub (call_frame_t *frame, void *cookie, xlator_t *this, +                     int32_t op_ret, int32_t op_errno, dict_t *xdata)  {          dht_local_t *local = NULL;          call_stub_t *stub  = NULL; @@ -6013,16 +5943,14 @@ dht_guard_parent_layout_during_entry_fop_cbk (call_frame_t *frame, void *cookie,  }  int32_t -dht_guard_parent_layout_during_entry_fop (xlator_t *subvol, call_stub_t *stub) +dht_guard_parent_layout_and_namespace (xlator_t *subvol, call_stub_t *stub)  {          dht_local_t   *local                  = NULL; -        int            count                  = 1,    ret = -1; -        dht_lock_t   **lk_array               = NULL; +        int            ret                    = -1;          loc_t         *loc                    = NULL;          xlator_t      *hashed_subvol          = NULL, *this = NULL;;          call_frame_t  *frame                  = NULL;          char          pgfid[GF_UUID_BUF_SIZE] = {0}; -        loc_t          parent                 = {0, };          int32_t       *parent_disk_layout     = NULL;          dht_layout_t  *parent_layout          = NULL;          dht_conf_t    *conf                   = NULL; @@ -6118,67 +6046,16 @@ dht_guard_parent_layout_during_entry_fop (xlator_t *subvol, call_stub_t *stub)          }          parent_disk_layout = NULL; +        local->hashed_subvol = hashed_subvol; -        parent.inode = inode_ref (loc->parent); -        gf_uuid_copy (parent.gfid, loc->parent->gfid); - -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); - -        if (lk_array == NULL) { -                local->op_errno = ENOMEM; - -                gf_msg 
(this->name, GF_LOG_WARNING, local->op_errno, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "%s (%s/%s) (path: %s): " -                        "calloc failure", -                        gf_fop_list[stub->fop], pgfid, loc->name, loc->path); - -                goto err; -        } - -        lk_array[0] = dht_lock_new (frame->this, hashed_subvol, &parent, -                                    F_RDLCK, DHT_LAYOUT_HEAL_DOMAIN); - -        if (lk_array[0] == NULL) { -                local->op_errno = ENOMEM; -                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "%s (%s/%s) (path: %s): " -                        "lock allocation failed", -                        gf_fop_list[stub->fop], pgfid, loc->name, loc->path); - -                goto err; -        } - -        local->lock.locks = lk_array; -        local->lock.lk_count = count; - -        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR, -                                    dht_guard_parent_layout_during_entry_fop_cbk); - -        if (ret < 0) { -                local->op_errno = EIO; -                local->lock.locks = NULL; -                local->lock.lk_count = 0; -                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, -                        DHT_MSG_PARENT_LAYOUT_CHANGED, -                        "%s (%s/%s) (path: %s): " -                        "dht_blocking_inodelk failed", -                        gf_fop_list[stub->fop], pgfid, loc->name, loc->path); - +        local->current = &local->lock[0]; +        ret = dht_protect_namespace (frame, loc, hashed_subvol, +                                     &local->current->ns, dht_call_mkdir_stub); +        if (ret < 0)                  goto err; -        } - -        loc_wipe (&parent);          return 0;  err: -        if (lk_array != NULL) { -                dht_lock_array_free (lk_array, count); -   
             GF_FREE (lk_array); -        } - -        loc_wipe (&parent);          if (parent_disk_layout != NULL)                  GF_FREE (parent_disk_layout); @@ -6271,7 +6148,7 @@ dht_mknod (call_frame_t *frame, xlator_t *this,                          if (ret) {                                  gf_msg (this->name, GF_LOG_ERROR, ENOMEM, -                                        DHT_MSG_NO_MEMORY, +                                        DHT_MSG_LOC_FAILED,                                          "parent loc build failed");                                  goto err;                          } @@ -6708,7 +6585,7 @@ out:          dht_set_fixed_dir_stat (preparent);          dht_set_fixed_dir_stat (postparent); -        if (local && local->lock.locks) { +        if (local && local->lock[0].layout.parent_layout.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno;                  local->refresh_layout_unlock (frame, this, op_ret, 1); @@ -6769,7 +6646,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err: -        if (local && local->lock.locks) { +        if (local && local->lock[0].layout.parent_layout.locks) {                  local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (create, frame, -1, @@ -6958,7 +6835,8 @@ dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret,          int           lock_count = 0;          local = frame->local; -        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); +        lock_count = dht_lock_count (local->lock[0].layout.parent_layout.locks, +                                     local->lock[0].layout.parent_layout.lk_count);          if (lock_count == 0)                  goto done; @@ -6973,14 +6851,15 @@ dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret,                  goto done;          } -        lock_local->lock.locks = 
local->lock.locks; -        lock_local->lock.lk_count = local->lock.lk_count; +        lock_local->lock[0].layout.parent_layout.locks = local->lock[0].layout.parent_layout.locks; +        lock_local->lock[0].layout.parent_layout.lk_count = local->lock[0].layout.parent_layout.lk_count; -        local->lock.locks = NULL; -        local->lock.lk_count = 0; +        local->lock[0].layout.parent_layout.locks = NULL; +	local->lock[0].layout.parent_layout.lk_count = 0; -        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, -                            lock_local->lock.lk_count, +        dht_unlock_inodelk (lock_frame, +                            lock_local->lock[0].layout.parent_layout.locks, +                            lock_local->lock[0].layout.parent_layout.lk_count,                              dht_create_unlock_cbk);          lock_frame = NULL; @@ -7042,26 +6921,26 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)          local = frame->local; -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_pointer);          if (lk_array == NULL)                  goto err;          lk_array[0] = dht_lock_new (frame->this, subvol, &local->loc, F_RDLCK, -                                    DHT_LAYOUT_HEAL_DOMAIN); +                                    DHT_LAYOUT_HEAL_DOMAIN, NULL);          if (lk_array[0] == NULL)                  goto err; -        local->lock.locks = lk_array; -        local->lock.lk_count = count; +        local->lock[0].layout.parent_layout.locks = lk_array; +        local->lock[0].layout.parent_layout.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count,                                      IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +                local->lock[0].layout.parent_layout.locks = NULL; +          
      local->lock[0].layout.parent_layout.lk_count = 0;                  goto err;          } @@ -7172,7 +7051,7 @@ dht_create (call_frame_t *frame, xlator_t *this,                          if (ret) {                                  gf_msg (this->name, GF_LOG_ERROR, ENOMEM, -                                        DHT_MSG_NO_MEMORY, +                                        DHT_MSG_LOC_FAILED,                                          "parent loc build failed");                                  goto err;                          } @@ -7305,6 +7184,8 @@ unlock:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { +                /*Unlock entrylk and inodelk once mkdir is done on all subvols*/ +                dht_unlock_namespace (frame, &local->lock[0]);                  FRAME_SU_DO (frame, dht_local_t);                  dht_selfheal_new_directory (frame, dht_mkdir_selfheal_cbk,                                              layout); @@ -7433,7 +7314,7 @@ dht_mkdir_helper (call_frame_t *frame, xlator_t *this,          return 0;  err: -        dht_unlock_parent_layout_during_entry_fop (frame); +        dht_unlock_namespace (frame, &local->lock[0]);          op_errno = local ? 
local->op_errno : op_errno;          DHT_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, NULL, @@ -7508,7 +7389,6 @@ dht_mkdir_hashed_cbk (call_frame_t *frame, void *cookie,                  goto err;          } -        dht_unlock_parent_layout_during_entry_fop (frame);          dict_del (local->params, GF_PREOP_PARENT_KEY);          dict_del (local->params, conf->xattr_name); @@ -7538,6 +7418,8 @@ dht_mkdir_hashed_cbk (call_frame_t *frame, void *cookie,          if (gf_uuid_is_null (local->loc.gfid))                  gf_uuid_copy (local->loc.gfid, stbuf->ia_gfid);          if (local->call_cnt == 0) { +                /*Unlock namespace lock once mkdir is done on all subvols*/ +                dht_unlock_namespace (frame, &local->lock[0]);                  FRAME_SU_DO (frame, dht_local_t);                  dht_selfheal_directory (frame, dht_mkdir_selfheal_cbk,                                          &local->loc, layout); @@ -7554,8 +7436,9 @@ dht_mkdir_hashed_cbk (call_frame_t *frame, void *cookie,          }          return 0;  err: -        if (local->op_ret != 0) -                dht_unlock_parent_layout_during_entry_fop (frame); +        if (local->op_ret != 0) { +                dht_unlock_namespace (frame, &local->lock[0]); +        }          DHT_STACK_UNWIND (mkdir, frame, -1, op_errno, NULL, NULL, NULL,                            NULL, NULL); @@ -7686,7 +7569,7 @@ dht_mkdir (call_frame_t *frame, xlator_t *this,                  goto err;          } -        ret = dht_guard_parent_layout_during_entry_fop (this, stub); +        ret = dht_guard_parent_layout_and_namespace (this, stub);          if (ret < 0) {                  gf_msg (this->name, GF_LOG_WARNING, 0,                          DHT_MSG_PARENT_LAYOUT_CHANGED, @@ -8019,7 +7902,13 @@ dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)          int           lock_count = 0;          local = frame->local; -        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); + +      
  /* Unlock entrylk */ +        dht_unlock_entrylk_wrapper (frame, &local->lock[0].ns.directory_ns); + +        /* Unlock inodelk */ +        lock_count = dht_lock_count (local->lock[0].ns.parent_layout.locks, +                                     local->lock[0].ns.parent_layout.lk_count);          if (lock_count == 0)                  goto done; @@ -8033,13 +7922,14 @@ dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)          if (lock_local == NULL)                  goto done; -        lock_local->lock.locks = local->lock.locks; -        lock_local->lock.lk_count = local->lock.lk_count; +        lock_local->lock[0].ns.parent_layout.locks = local->lock[0].ns.parent_layout.locks; +        lock_local->lock[0].ns.parent_layout.lk_count = local->lock[0].ns.parent_layout.lk_count; -        local->lock.locks = NULL; -        local->lock.lk_count = 0; -        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, -                            lock_local->lock.lk_count, +        local->lock[0].ns.parent_layout.locks = NULL; +        local->lock[0].ns.parent_layout.lk_count = 0; +        dht_unlock_inodelk (lock_frame, +                            lock_local->lock[0].ns.parent_layout.locks, +                            lock_local->lock[0].ns.parent_layout.lk_count,                              dht_rmdir_unlock_cbk);          lock_frame = NULL; @@ -8068,7 +7958,7 @@ dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (op_ret < 0) {                  gf_msg (this->name, GF_LOG_WARNING, op_errno,                          DHT_MSG_INODE_LK_ERROR, -                        "acquiring inodelk failed rmdir for %s)", +                        "acquiring entrylk after inodelk failed rmdir for %s)",                          local->loc.path);                  local->op_ret = -1; @@ -8090,8 +7980,6 @@ dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        /* No harm in calling an extra rmdir unlock */ - 
       dht_rmdir_unlock (frame, this);          DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,                            &local->preparent, &local->postparent, NULL); @@ -8104,9 +7992,7 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; -        dht_lock_t   **lk_array = NULL; -        int           i = 0, ret = -1; -        int           count = 1; +        int           ret = -1;          xlator_t     *hashed_subvol = NULL;          char gfid[GF_UUID_BUF_SIZE] ={0}; @@ -8143,36 +8029,10 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)                  return 0;          } -        count = conf->subvolume_cnt; - -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); -        if (lk_array == NULL) { -                local->op_ret = -1; -                local->op_errno = ENOMEM; -                goto err; -        } - -        for (i = 0; i < count; i++) { -                lk_array[i] = dht_lock_new (frame->this, -                                            conf->subvolumes[i], -                                            &local->loc, F_WRLCK, -                                            DHT_LAYOUT_HEAL_DOMAIN); -                if (lk_array[i] == NULL) { -                        local->op_ret = -1; -                        local->op_errno = EINVAL; -                        goto err; -                } -        } - -        local->lock.locks = lk_array; -        local->lock.lk_count = count; - -        ret = dht_blocking_inodelk (frame, lk_array, count, -                                    IGNORE_ENOENT_ESTALE, -                                    dht_rmdir_lock_cbk); +        local->current = &local->lock[0]; +        ret = dht_protect_namespace (frame, &local->loc, local->hashed_subvol, +                                     &local->current->ns, dht_rmdir_lock_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -          
      local->lock.lk_count = 0;                  local->op_ret = -1;                  local->op_errno = errno ? errno : EINVAL;                  goto err; @@ -8184,11 +8044,6 @@ err:          dht_set_fixed_dir_stat (&local->preparent);          dht_set_fixed_dir_stat (&local->postparent); -        if (lk_array != NULL) { -                dht_lock_array_free (lk_array, count); -                GF_FREE (lk_array); -        } -          DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,                            &local->preparent, &local->postparent, NULL);          return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 0e082e35c57..21433b6c8b7 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -30,7 +30,10 @@  #define GF_DHT_LOOKUP_UNHASHED_AUTO     2  #define DHT_PATHINFO_HEADER             "DISTRIBUTE:"  #define DHT_FILE_MIGRATE_DOMAIN         "dht.file.migrate" +/* Layout synchronization */  #define DHT_LAYOUT_HEAL_DOMAIN          "dht.layout.heal" +/* Namespace synchronization */ +#define DHT_ENTRY_SYNC_DOMAIN           "dht.entry.sync"  #define TIERING_MIGRATION_KEY           "tiering.migration"  #define DHT_LAYOUT_HASH_INVALID         1 @@ -113,6 +116,11 @@ typedef enum {          DHT_HASH_TYPE_DM_USER,  } dht_hashfn_type_t; +typedef enum { +        DHT_INODELK, +        DHT_ENTRYLK, +} dht_lock_type_t; +  /* rebalance related */  struct dht_rebalance_ {          xlator_t            *from_subvol; @@ -166,10 +174,52 @@ typedef struct {          char         *domain;  /* Only locks within a single domain                                  * contend with each other                                  */ +        char         *basename; /* Required for entrylk */          gf_lkowner_t  lk_owner;          gf_boolean_t  locked;  } dht_lock_t; +/* The lock structure represents inodelk. 
*/ +typedef struct { +        fop_inodelk_cbk_t   inodelk_cbk; +        dht_lock_t        **locks; +        int                 lk_count; +        dht_reaction_type_t reaction; + +        /* whether locking failed on _any_ of the "locks" above */ +        int                 op_ret; +        int                 op_errno; +} dht_ilock_wrap_t; + +/* The lock structure represents entrylk. */ +typedef struct { +        fop_entrylk_cbk_t   entrylk_cbk; +        dht_lock_t        **locks; +        int                 lk_count; +        dht_reaction_type_t reaction; + +        /* whether locking failed on _any_ of the "locks" above */ +        int                 op_ret; +        int                 op_errno; +} dht_elock_wrap_t; + +/* The first member of dht_dir_transaction_t should be of type dht_ilock_wrap_t. + * Otherwise it can result in subtle memory corruption issues as in most of the + * places we use lock[0].layout.my_layout or lock[0].layout.parent_layout and + * lock[0].ns.parent_layout (like in dht_local_wipe). 
+ */ +typedef union { +        union { +                 dht_ilock_wrap_t my_layout; +                 dht_ilock_wrap_t parent_layout; +        } layout; +        struct dht_namespace { +                dht_ilock_wrap_t parent_layout; +                dht_elock_wrap_t directory_ns; +                fop_entrylk_cbk_t ns_cbk; +        } ns; +} dht_dir_transaction_t; +  typedef  int (*dht_selfheal_layout_t)(call_frame_t *frame, loc_t *loc,                               dht_layout_t *layout); @@ -288,16 +338,7 @@ struct dht_local {          struct dht_skip_linkto_unlink  skip_unlink; -        struct { -                fop_inodelk_cbk_t   inodelk_cbk; -                dht_lock_t        **locks; -                int                 lk_count; -                dht_reaction_type_t reaction; - -                /* whether locking failed on _any_ of the "locks" above */ -                int                 op_ret; -                int                 op_errno; -        } lock; +        dht_dir_transaction_t lock[2], *current;          short           lock_type; @@ -1187,47 +1228,6 @@ dht_lookup_everywhere_done (call_frame_t *frame, xlator_t *this);  int  dht_fill_dict_to_avoid_unlink_of_migrating_file (dict_t *dict); - -/* Acquire non-blocking inodelk on a list of xlators. - * - * @lk_array: array of lock requests lock on. - * - * @lk_count: number of locks in @lk_array - * - * @inodelk_cbk: will be called after inodelk replies are received - * - * @retval: -1 if stack_winding inodelk fails. 0 otherwise. - *          inodelk_cbk is called with appropriate error on errors. - *          On failure to acquire lock on all members of list, successful - *          locks are unlocked before invoking cbk. - */ - -int -dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                         int lk_count, fop_inodelk_cbk_t inodelk_cbk); - -/* same as dht_nonblocking_inodelk, but issues sequential blocking locks on - * @lk_array directly. 
locks are issued on some order which remains same - * for a list of xlators (irrespective of order of xlators within list). - */ -int -dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, dht_reaction_type_t reaction, -                      fop_inodelk_cbk_t inodelk_cbk); - -int32_t -dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, -                    fop_inodelk_cbk_t inodelk_cbk); - -dht_lock_t * -dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type, -              const char *domain); -void -dht_lock_array_free (dht_lock_t **lk_array, int count); - -int32_t -dht_lock_count (dht_lock_t **lk_array, int lk_count); -  int  dht_layout_sort (dht_layout_t *layout); @@ -1291,5 +1291,4 @@ getChoices (const char *value);  int  dht_aggregate_split_brain_xattr (dict_t *dst, char *key, data_t *value); -  #endif/* _DHT_H */ diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index 0a2abfb697b..6f08f557730 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -12,8 +12,7 @@  #include "glusterfs.h"  #include "xlator.h"  #include "dht-common.h" -#include "dht-helper.h" - +#include "dht-lock.h"  static void  dht_free_fd_ctx (dht_fd_ctx_t *fd_ctx) @@ -400,170 +399,11 @@ dht_deitransform (xlator_t *this, uint64_t y, xlator_t **subvol_p)          return 0;  } -char * -dht_lock_asprintf (dht_lock_t *lock) -{ -        char *lk_buf                = NULL; -        char gfid[GF_UUID_BUF_SIZE] = {0, }; - -        if (lock == NULL) -                goto out; - -        uuid_utoa_r (lock->loc.gfid, gfid); - -        gf_asprintf (&lk_buf, "%s:%s", lock->xl->name, gfid); - -out: -        return lk_buf; -} - -void -dht_log_lk_array (char *name, gf_loglevel_t log_level, dht_lock_t **lk_array, -                  int count) -{ -        int   i      = 0; -        char *lk_buf = NULL; - -        if ((lk_array == NULL) || (count 
== 0)) -                goto out; - -        for (i = 0; i < count; i++) { -                lk_buf = dht_lock_asprintf (lk_array[i]); -                gf_msg (name, log_level, 0, DHT_MSG_LK_ARRAY_INFO, -                        "%d. %s", i, lk_buf); -                GF_FREE (lk_buf); -        } - -out: -        return; -} - -void -dht_lock_stack_destroy (call_frame_t *lock_frame) -{ -        dht_local_t *local = NULL; - -        local = lock_frame->local; - -        local->lock.locks = NULL; -        local->lock.lk_count = 0; - -        DHT_STACK_DESTROY (lock_frame); -        return; -} - -void -dht_lock_free (dht_lock_t *lock) -{ -        if (lock == NULL) -                goto out; - -        loc_wipe (&lock->loc); -        GF_FREE (lock->domain); -        mem_put (lock); - -out: -        return; -} - -void -dht_lock_array_free (dht_lock_t **lk_array, int count) -{ -        int            i       = 0; -        dht_lock_t    *lock    = NULL; - -        if (lk_array == NULL) -                goto out; - -        for (i = 0; i < count; i++) { -                lock = lk_array[i]; -                lk_array[i] = NULL; -                dht_lock_free (lock); -        } - -out: -        return; -} - -dht_lock_t * -dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type, -              const char *domain) -{ -        dht_conf_t *conf = NULL; -        dht_lock_t *lock = NULL; - -        conf = this->private; - -        lock = mem_get0 (conf->lock_pool); -        if (lock == NULL) -                goto out; - -        lock->xl = xl; -        lock->type = type; - -        lock->domain = gf_strdup (domain); -        if (lock->domain == NULL) { -                dht_lock_free (lock); -                lock = NULL; -                goto out; -        } - -        /* Fill only inode and gfid. -           posix and protocol/server give preference to pargfid/basename over -           gfid/inode for resolution if all the three parameters of loc_t are -           present. 
I want to avoid the following hypothetical situation: - -           1. rebalance did a lookup on a dentry and got a gfid. -           2. rebalance acquires lock on loc_t which was filled with gfid and -              path (pargfid/bname) from step 1. -           3. somebody deleted and recreated the same file -           4. rename on the same path acquires lock on loc_t which now points -              to a different inode (and hence gets the lock). -           5. rebalance continues to migrate file (note that not all fops done -              by rebalance during migration are inode/gfid based Eg., unlink) -           6. rename continues. -        */ -        lock->loc.inode = inode_ref (loc->inode); -        loc_gfid (loc, lock->loc.gfid); - -out: -        return lock; -} - -int -dht_local_lock_init (call_frame_t *frame, dht_lock_t **lk_array, -                     int lk_count, fop_inodelk_cbk_t inodelk_cbk) -{ -        int          ret   = -1; -        dht_local_t *local = NULL; - -        local = frame->local; - -        if (local == NULL) { -                local = dht_local_init (frame, NULL, NULL, 0); -        } - -        if (local == NULL) { -                goto out; -        } - -        local->lock.inodelk_cbk = inodelk_cbk; -        local->lock.locks = lk_array; -        local->lock.lk_count = lk_count; - -        ret = dht_lock_order_requests (local->lock.locks, -                                       local->lock.lk_count); -        if (ret < 0) -                goto out; - -        ret = 0; -out: -        return ret; -} -  void  dht_local_wipe (xlator_t *this, dht_local_t *local)  { +        int i = 0; +          if (!local)                  return; @@ -612,8 +452,16 @@ dht_local_wipe (xlator_t *this, dht_local_t *local)                  local->selfheal.refreshed_layout = NULL;          } -        dht_lock_array_free (local->lock.locks, local->lock.lk_count); -        GF_FREE (local->lock.locks); +        for (i = 0; i < 2; i++) { +                
dht_lock_array_free (local->lock[i].ns.parent_layout.locks, +                                     local->lock[i].ns.parent_layout.lk_count); + +                GF_FREE (local->lock[i].ns.parent_layout.locks); + +                dht_lock_array_free (local->lock[i].ns.directory_ns.locks, +                                     local->lock[i].ns.directory_ns.lk_count); +                GF_FREE (local->lock[i].ns.directory_ns.locks); +        }          GF_FREE (local->key); @@ -657,6 +505,7 @@ dht_local_init (call_frame_t *frame, loc_t *loc, fd_t *fd, glusterfs_fop_t fop)                          goto out;                  inode = loc->inode; +                local->hashed_subvol = dht_subvol_get_hashed (frame->this, loc);          }          if (fd) { @@ -1727,22 +1576,6 @@ out:          return ret;  } -void -dht_set_lkowner (dht_lock_t **lk_array, int count, gf_lkowner_t *lkowner) -{ -        int i = 0; - -        if (!lk_array || !lkowner) -                goto out; - -        for (i = 0; i < count; i++) { -                lk_array[i]->lk_owner = *lkowner; -        } - -out: -        return; -} -  int  dht_subvol_status (dht_conf_t *conf, xlator_t *subvol)  { @@ -1756,473 +1589,6 @@ dht_subvol_status (dht_conf_t *conf, xlator_t *subvol)          return 0;  } -void -dht_inodelk_done (call_frame_t *lock_frame) -{ -        fop_inodelk_cbk_t  inodelk_cbk = NULL; -        call_frame_t      *main_frame  = NULL; -        dht_local_t       *local       = NULL; - -        local = lock_frame->local; -        main_frame = local->main_frame; - -        local->lock.locks = NULL; -        local->lock.lk_count = 0; - -        inodelk_cbk = local->lock.inodelk_cbk; -        local->lock.inodelk_cbk = NULL; - -        inodelk_cbk (main_frame, NULL, main_frame->this, local->lock.op_ret, -                     local->lock.op_errno, NULL); - -        dht_lock_stack_destroy (lock_frame); -        return; -} - -int -dht_inodelk_cleanup_cbk (call_frame_t *frame, void *cookie, -               
          xlator_t *this, int32_t op_ret, int32_t op_errno, -                         dict_t *xdata) -{ -        dht_inodelk_done (frame); -        return 0; -} - -int32_t -dht_lock_count (dht_lock_t **lk_array, int lk_count) -{ -        int i = 0, locked = 0; - -        if ((lk_array == NULL) || (lk_count == 0)) -                goto out; - -        for (i = 0; i < lk_count; i++) { -                if (lk_array[i]->locked) -                        locked++; -        } -out: -        return locked; -} - -void -dht_inodelk_cleanup (call_frame_t *lock_frame) -{ -        dht_lock_t  **lk_array = NULL; -        int           lk_count = 0, lk_acquired = 0; -        dht_local_t  *local    = NULL; - -        local = lock_frame->local; - -        lk_array = local->lock.locks; -        lk_count = local->lock.lk_count; - -        lk_acquired = dht_lock_count (lk_array, lk_count); -        if (lk_acquired != 0) { -                dht_unlock_inodelk (lock_frame, lk_array, lk_count, -                                    dht_inodelk_cleanup_cbk); -        } else { -                dht_inodelk_done (lock_frame); -        } - -        return; -} - -int32_t -dht_unlock_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                        int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ -        dht_local_t *local                  = NULL; -        int          lk_index               = 0, call_cnt = 0; -        char         gfid[GF_UUID_BUF_SIZE] = {0}; - -        lk_index = (long) cookie; - -        local = frame->local; -        if (op_ret < 0) { -                uuid_utoa_r (local->lock.locks[lk_index]->loc.gfid, -                             gfid); - -                gf_msg (this->name, GF_LOG_WARNING, op_errno, -                        DHT_MSG_UNLOCKING_FAILED, -                        "unlocking failed on %s:%s", -                        local->lock.locks[lk_index]->xl->name, -                        gfid); -        } else { -                
local->lock.locks[lk_index]->locked = 0; -        } - -        call_cnt = dht_frame_return (frame); -        if (is_last_call (call_cnt)) { -                dht_inodelk_done (frame); -        } - -        return 0; -} - -call_frame_t * -dht_lock_frame (call_frame_t *parent_frame) -{ -        call_frame_t *lock_frame = NULL; - -        lock_frame = copy_frame (parent_frame); -        if (lock_frame == NULL) -                goto out; - -        set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root); - -out: -        return lock_frame; -} - -int32_t -dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, -                    fop_inodelk_cbk_t inodelk_cbk) -{ -        dht_local_t     *local      = NULL; -        struct gf_flock  flock      = {0,}; -        int              ret        = -1 , i = 0; -        call_frame_t    *lock_frame = NULL; -        int              call_cnt   = 0; - -        GF_VALIDATE_OR_GOTO ("dht-locks", frame, done); -        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, done); -        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, done); - -        call_cnt = dht_lock_count (lk_array, lk_count); -        if (call_cnt == 0) { -                ret = 0; -                goto done; -        } - -        lock_frame = dht_lock_frame (frame); -        if (lock_frame == NULL) { -                gf_msg (frame->this->name, GF_LOG_WARNING, 0, -                        DHT_MSG_UNLOCKING_FAILED, -                        "cannot allocate a frame, not unlocking following " -                        "locks:"); - -                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, -                                  lk_count); -                goto done; -        } - -        ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk); -        if (ret < 0) { -                gf_msg (frame->this->name, GF_LOG_WARNING, 0, -                        DHT_MSG_UNLOCKING_FAILED, -                   
     "storing locks in local failed, not unlocking " -                        "following locks:"); - -                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, -                                  lk_count); - -                goto done; -        } - -        local = lock_frame->local; -        local->main_frame = frame; -        local->call_cnt = call_cnt; - -        flock.l_type = F_UNLCK; - -        for (i = 0; i < local->lock.lk_count; i++) { -                if (!local->lock.locks[i]->locked) -                        continue; - -                lock_frame->root->lk_owner = local->lock.locks[i]->lk_owner; -                STACK_WIND_COOKIE (lock_frame, dht_unlock_inodelk_cbk, -                                   (void *)(long)i, -                                   local->lock.locks[i]->xl, -                                   local->lock.locks[i]->xl->fops->inodelk, -                                   local->lock.locks[i]->domain, -                                   &local->lock.locks[i]->loc, F_SETLK, -                                   &flock, NULL); -                if (!--call_cnt) -                        break; -        } - -        return 0; - -done: -        if (lock_frame) -                dht_lock_stack_destroy (lock_frame); - -        /* no locks acquired, invoke inodelk_cbk */ -        if (ret == 0) -                inodelk_cbk (frame, NULL, frame->this, 0, 0, NULL); - -        return ret; -} - -int32_t -dht_nonblocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                             int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ -        dht_local_t *local                   = NULL; -        int          lk_index               = 0, call_cnt = 0; -        char          gfid[GF_UUID_BUF_SIZE] = {0}; - -        local = frame->local; -        lk_index = (long) cookie; - -        if (op_ret == -1) { -                local->lock.op_ret = -1; -                local->lock.op_errno = op_errno; - -           
     if (local && local->lock.locks[lk_index]) { -                        uuid_utoa_r (local->lock.locks[lk_index]->loc.inode->gfid, -                                     gfid); - -                        gf_msg_debug (this->name, op_errno, -                                      "inodelk failed on gfid: %s " -                                      "subvolume: %s", gfid, -                                      local->lock.locks[lk_index]->xl->name); -                } - -                goto out; -        } - -        local->lock.locks[lk_index]->locked = _gf_true; - -out: -        call_cnt = dht_frame_return (frame); -        if (is_last_call (call_cnt)) { -                if (local->lock.op_ret < 0) { -                        dht_inodelk_cleanup (frame); -                        return 0; -                } - -                dht_inodelk_done (frame); -        } - -        return 0; -} - -int -dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                         int lk_count, fop_inodelk_cbk_t inodelk_cbk) -{ -        struct gf_flock  flock      = {0,}; -        int              i          = 0, ret = 0; -        dht_local_t     *local      = NULL; -        call_frame_t    *lock_frame = NULL; - -        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); -        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out); -        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out); - -        lock_frame = dht_lock_frame (frame); -        if (lock_frame == NULL) -                goto out; - -        ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk); -        if (ret < 0) { -                goto out; -        } - -        dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner); - -        local = lock_frame->local; -        local->main_frame = frame; - -        local->call_cnt = lk_count; - -        for (i = 0; i < lk_count; i++) { -                flock.l_type = local->lock.locks[i]->type; - -                
STACK_WIND_COOKIE (lock_frame, dht_nonblocking_inodelk_cbk, -                                   (void *) (long) i, -                                   local->lock.locks[i]->xl, -                                   local->lock.locks[i]->xl->fops->inodelk, -                                   local->lock.locks[i]->domain, -                                   &local->lock.locks[i]->loc, F_SETLK, -                                   &flock, NULL); -        } - -        return 0; - -out: -        if (lock_frame) -                dht_lock_stack_destroy (lock_frame); - -        return -1; -} - -int32_t -dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                          int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ -        int          lk_index                   = 0; -        int          i                          = 0; -        dht_local_t *local                      = NULL; -        char         gfid[GF_UUID_BUF_SIZE]     = {0,}; - -        lk_index = (long) cookie; - -        local = frame->local; -        if (op_ret == 0) { -                local->lock.locks[lk_index]->locked = _gf_true; -        } else { -                switch (op_errno) { -                case ESTALE: -                case ENOENT: -                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) { -                                gf_uuid_unparse (local->lock.locks[lk_index]->loc.gfid, gfid); -                                local->lock.op_ret = -1; -                                local->lock.op_errno = op_errno; -                                gf_msg (this->name, GF_LOG_ERROR, op_errno, -                                        DHT_MSG_INODELK_FAILED, -                                        "inodelk failed on subvol %s. 
gfid:%s", -                                        local->lock.locks[lk_index]->xl->name, -                                        gfid); -                                goto cleanup; -                        } -                        break; -                default: -                        gf_uuid_unparse (local->lock.locks[lk_index]->loc.gfid, gfid); -                        local->lock.op_ret = -1; -                        local->lock.op_errno = op_errno; -                        gf_msg (this->name, GF_LOG_ERROR, op_errno, -                                DHT_MSG_INODELK_FAILED, -                                "inodelk failed on subvol %s, gfid:%s", -                                local->lock.locks[lk_index]->xl->name, gfid); -                        goto cleanup; -                } -        } - -        if (lk_index == (local->lock.lk_count - 1)) { -                for (i = 0; (i < local->lock.lk_count) && -                     (!local->lock.locks[i]->locked); i++) -                        ; - -                if (i == local->lock.lk_count) { -                        local->lock.op_ret = -1; -                        local->lock.op_errno = op_errno; -                } - -                dht_inodelk_done (frame); -        } else { -                dht_blocking_inodelk_rec (frame, ++lk_index); -        } - -        return 0; - -cleanup: -        dht_inodelk_cleanup (frame); - -        return 0; -} - -void -dht_blocking_inodelk_rec (call_frame_t *frame, int i) -{ -        dht_local_t     *local = NULL; -        struct gf_flock  flock = {0,}; - -        local = frame->local; - -        flock.l_type = local->lock.locks[i]->type; - -        STACK_WIND_COOKIE (frame, dht_blocking_inodelk_cbk, -                           (void *) (long) i, -                           local->lock.locks[i]->xl, -                           local->lock.locks[i]->xl->fops->inodelk, -                           local->lock.locks[i]->domain, -                           
&local->lock.locks[i]->loc, F_SETLKW, &flock, NULL); - -        return; -} - -int -dht_lock_request_cmp (const void *val1, const void *val2) -{ -        dht_lock_t *lock1 = NULL; -        dht_lock_t *lock2 = NULL; -        int         ret   = 0; - -        lock1 = *(dht_lock_t **)val1; -        lock2 = *(dht_lock_t **)val2; - -        GF_VALIDATE_OR_GOTO ("dht-locks", lock1, out); -        GF_VALIDATE_OR_GOTO ("dht-locks", lock2, out); - -        ret = strcmp (lock1->xl->name, lock2->xl->name); - -        if (ret == 0) { -                ret = gf_uuid_compare (lock1->loc.gfid, lock2->loc.gfid); -        } - -out: -        return ret; -} - -int -dht_lock_order_requests (dht_lock_t **locks, int count) -{ -        int        ret     = -1; - -        if (!locks || !count) -                goto out; - -        qsort (locks, count, sizeof (*locks), dht_lock_request_cmp); -        ret = 0; - -out: -        return ret; -} - -int -dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, dht_reaction_type_t reaction, -                      fop_inodelk_cbk_t inodelk_cbk) -{ -        int           ret                       = -1; -        call_frame_t *lock_frame                = NULL; -        dht_local_t  *local                     = NULL; -        dht_local_t  *tmp_local                 = NULL; -        char          gfid[GF_UUID_BUF_SIZE]    = {0,}; - -        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); -        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out); -        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out); - -        tmp_local = frame->local; - -        lock_frame = dht_lock_frame (frame); -        if (lock_frame == NULL) { -                gf_uuid_unparse (tmp_local->loc.gfid, gfid); -                gf_msg ("dht", GF_LOG_ERROR, ENOMEM, -                        DHT_MSG_LOCK_FRAME_FAILED, -                        "memory allocation failed for lock_frame. 
gfid:%s" -                        " path:%s", gfid, tmp_local->loc.path); -                goto out; -        } - -        ret = dht_local_lock_init (lock_frame, lk_array, lk_count, inodelk_cbk); -        if (ret < 0) { -                gf_uuid_unparse (tmp_local->loc.gfid, gfid); -                gf_msg ("dht", GF_LOG_ERROR, ENOMEM, -                        DHT_MSG_LOCAL_LOCK_INIT_FAILED, -                        "dht_local_lock_init failed, gfid: %s path:%s", gfid, -                        tmp_local->loc.path); -                goto out; -        } - -        dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner); - -        local = lock_frame->local; -        local->lock.reaction = reaction; -        local->main_frame = frame; - -        dht_blocking_inodelk_rec (lock_frame, 0); - -        return 0; -out: -        if (lock_frame) -                dht_lock_stack_destroy (lock_frame); - -        return -1; -}  inode_t*  dht_heal_path (xlator_t *this, char *path, inode_table_t *itable)  { diff --git a/xlators/cluster/dht/src/dht-helper.h b/xlators/cluster/dht/src/dht-helper.h deleted file mode 100644 index e3ab9c4d93b..00000000000 --- a/xlators/cluster/dht/src/dht-helper.h +++ /dev/null @@ -1,19 +0,0 @@ -/* -  Copyright (c) 2008-2014 Red Hat, Inc. <http://www.redhat.com> -  This file is part of GlusterFS. - -  This file is licensed to you under your choice of the GNU Lesser -  General Public License, version 3 or any later version (LGPLv3 or -  later), or the GNU General Public License, version 2 (GPLv2), in all -  cases as published by the Free Software Foundation. 
-*/ -#ifndef _DHT_HELPER_H -#define _DHT_HELPER_H - -int -dht_lock_order_requests (dht_lock_t **lk_array, int count); - -void -dht_blocking_inodelk_rec (call_frame_t *frame, int i); - -#endif diff --git a/xlators/cluster/dht/src/dht-lock.c b/xlators/cluster/dht/src/dht-lock.c new file mode 100644 index 00000000000..0a198a17db4 --- /dev/null +++ b/xlators/cluster/dht/src/dht-lock.c @@ -0,0 +1,1383 @@ +/* +  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#include "dht-lock.h" + +static char * +dht_lock_asprintf (dht_lock_t *lock) +{ +        char *lk_buf                = NULL; +        char gfid[GF_UUID_BUF_SIZE] = {0, }; + +        if (lock == NULL) +                goto out; + +        uuid_utoa_r (lock->loc.gfid, gfid); + +        gf_asprintf (&lk_buf, "%s:%s", lock->xl->name, gfid); + +out: +        return lk_buf; +} + +static void +dht_log_lk_array (char *name, gf_loglevel_t log_level, dht_lock_t **lk_array, +                  int count) +{ +        int   i      = 0; +        char *lk_buf = NULL; + +        if ((lk_array == NULL) || (count == 0)) +                goto out; + +        for (i = 0; i < count; i++) { +                lk_buf = dht_lock_asprintf (lk_array[i]); +                if (!lk_buf) +                        goto out; + +                gf_msg (name, log_level, 0, DHT_MSG_LK_ARRAY_INFO, +                        "%d. 
%s", i, lk_buf); +                GF_FREE (lk_buf); +        } + +out: +        return; +} + +static void +dht_lock_stack_destroy (call_frame_t *lock_frame, dht_lock_type_t lk) +{ +        dht_local_t *local = NULL; + +        local = lock_frame->local; + +        if (lk == DHT_INODELK) { +                local->lock[0].layout.my_layout.locks = NULL; +                local->lock[0].layout.my_layout.lk_count = 0; +        } else { +                local->lock[0].ns.directory_ns.locks = NULL; +                local->lock[0].ns.directory_ns.lk_count = 0; +        } + +        DHT_STACK_DESTROY (lock_frame); +        return; +} + +static void +dht_lock_free (dht_lock_t *lock) +{ +        if (lock == NULL) +                goto out; + +        loc_wipe (&lock->loc); +        GF_FREE (lock->domain); +        GF_FREE (lock->basename); +        mem_put (lock); + +out: +        return; +} + +static void +dht_set_lkowner (dht_lock_t **lk_array, int count, gf_lkowner_t *lkowner) +{ +        int i = 0; + +        if (!lk_array || !lkowner) +                goto out; + +        for (i = 0; i < count; i++) { +                lk_array[i]->lk_owner = *lkowner; +        } + +out: +        return; +} + +static int +dht_lock_request_cmp (const void *val1, const void *val2) +{ +        dht_lock_t *lock1 = NULL; +        dht_lock_t *lock2 = NULL; +        int         ret   = -1; + +        lock1 = *(dht_lock_t **)val1; +        lock2 = *(dht_lock_t **)val2; + +        GF_VALIDATE_OR_GOTO ("dht-locks", lock1, out); +        GF_VALIDATE_OR_GOTO ("dht-locks", lock2, out); + +        ret = strcmp (lock1->xl->name, lock2->xl->name); + +        if (ret == 0) { +                ret = gf_uuid_compare (lock1->loc.gfid, lock2->loc.gfid); +        } + +out: +        return ret; +} + +static int +dht_lock_order_requests (dht_lock_t **locks, int count) +{ +        int        ret     = -1; + +        if (!locks || !count) +                goto out; + +        qsort (locks, count, sizeof (*locks), 
dht_lock_request_cmp); +        ret = 0; + +out: +        return ret; +} + +void +dht_lock_array_free (dht_lock_t **lk_array, int count) +{ +        int            i       = 0; +        dht_lock_t    *lock    = NULL; + +        if (lk_array == NULL) +                goto out; + +        for (i = 0; i < count; i++) { +                lock = lk_array[i]; +                lk_array[i] = NULL; +                dht_lock_free (lock); +        } + +out: +        return; +} + +int32_t +dht_lock_count (dht_lock_t **lk_array, int lk_count) +{ +        int i = 0, locked = 0; + +        if ((lk_array == NULL) || (lk_count == 0)) +                goto out; + +        for (i = 0; i < lk_count; i++) { +                if (lk_array[i]->locked) +                        locked++; +        } +out: +        return locked; +} + +static call_frame_t * +dht_lock_frame (call_frame_t *parent_frame) +{ +        call_frame_t *lock_frame = NULL; + +        lock_frame = copy_frame (parent_frame); +        if (lock_frame == NULL) +                goto out; + +        set_lk_owner_from_ptr (&lock_frame->root->lk_owner, parent_frame->root); + +out: +        return lock_frame; +} + +dht_lock_t * +dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type, +              const char *domain, const char *basename) +{ +        dht_conf_t *conf = NULL; +        dht_lock_t *lock = NULL; + +        conf = this->private; + +        lock = mem_get0 (conf->lock_pool); +        if (lock == NULL) +                goto out; + +        lock->xl = xl; +        lock->type = type; + +        lock->domain = gf_strdup (domain); +        if (lock->domain == NULL) { +                dht_lock_free (lock); +                lock = NULL; +                goto out; +        } + +        if (basename) { +                lock->basename = gf_strdup (basename); +                if (lock->basename == NULL) { +                        dht_lock_free (lock); +                        lock = NULL; +                        goto 
out; +                } +        } + +        /* Fill only inode and gfid. +           posix and protocol/server give preference to pargfid/basename over +           gfid/inode for resolution if all the three parameters of loc_t are +           present. I want to avoid the following hypothetical situation: + +           1. rebalance did a lookup on a dentry and got a gfid. +           2. rebalance acquires lock on loc_t which was filled with gfid and +              path (pargfid/bname) from step 1. +           3. somebody deleted and recreated the same file +           4. rename on the same path acquires lock on loc_t which now points +              to a different inode (and hence gets the lock). +           5. rebalance continues to migrate file (note that not all fops done +              by rebalance during migration are inode/gfid based Eg., unlink) +           6. rename continues. +        */ +        lock->loc.inode = inode_ref (loc->inode); +        loc_gfid (loc, lock->loc.gfid); + +out: +        return lock; +} + +static int +dht_local_entrylk_init (call_frame_t *frame, dht_lock_t **lk_array, +                     int lk_count, fop_entrylk_cbk_t entrylk_cbk) +{ +        int          ret   = -1; +        dht_local_t *local = NULL; + +        local = frame->local; + +        if (local == NULL) { +                local = dht_local_init (frame, NULL, NULL, 0); +        } + +        if (local == NULL) { +                goto out; +        } + +        local->lock[0].ns.directory_ns.entrylk_cbk = entrylk_cbk; +        local->lock[0].ns.directory_ns.locks = lk_array; +        local->lock[0].ns.directory_ns.lk_count = lk_count; + +        ret = dht_lock_order_requests (local->lock[0].ns.directory_ns.locks, +                                       local->lock[0].ns.directory_ns.lk_count); +        if (ret < 0) +                goto out; + +        ret = 0; +out: +        return ret; +} + +static void +dht_entrylk_done (call_frame_t *lock_frame) +{ +        
fop_entrylk_cbk_t  entrylk_cbk = NULL; +        call_frame_t      *main_frame  = NULL; +        dht_local_t       *local       = NULL; + +        local = lock_frame->local; +        main_frame = local->main_frame; + +        local->lock[0].ns.directory_ns.locks = NULL; +        local->lock[0].ns.directory_ns.lk_count = 0; + +        entrylk_cbk = local->lock[0].ns.directory_ns.entrylk_cbk; +        local->lock[0].ns.directory_ns.entrylk_cbk = NULL; + +        entrylk_cbk (main_frame, NULL, main_frame->this, +                     local->lock[0].ns.directory_ns.op_ret, +                     local->lock[0].ns.directory_ns.op_errno, NULL); + +        dht_lock_stack_destroy (lock_frame, DHT_ENTRYLK); +        return; +} + +static int32_t +dht_unlock_entrylk_done (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                   = NULL; +        char          gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; +        gf_uuid_unparse (local->lock[0].ns.directory_ns.locks[0]->loc.inode->gfid, gfid); + +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "unlock failed on gfid: %s, stale lock might be left " +                        "in DHT_LAYOUT_HEAL_DOMAIN", gfid); +        } + +        DHT_STACK_DESTROY (frame); +        return 0; +} + +static int32_t +dht_unlock_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                  = NULL; +        int          lk_index               = 0, call_cnt = 0; +        char         gfid[GF_UUID_BUF_SIZE] = {0}; + +        lk_index = (long) cookie; + +        local = frame->local; + +        uuid_utoa_r (local->lock[0].ns.directory_ns.locks[lk_index]->loc.gfid, gfid); + +        
if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_UNLOCKING_FAILED, +                        "unlocking failed on %s:%s", +                        local->lock[0].ns.directory_ns.locks[lk_index]->xl->name, +                        gfid); +        } else { +                local->lock[0].ns.directory_ns.locks[lk_index]->locked = 0; +        } + +        call_cnt = dht_frame_return (frame); +        if (is_last_call (call_cnt)) { +                dht_entrylk_done (frame); +        } + +        return 0; +} + +static int32_t +dht_unlock_entrylk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, +                     fop_entrylk_cbk_t entrylk_cbk) +{ +        dht_local_t     *local      = NULL; +        int              ret        = -1 , i = 0; +        call_frame_t    *lock_frame = NULL; +        int              call_cnt   = 0; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, done); +        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, done); +        GF_VALIDATE_OR_GOTO (frame->this->name, entrylk_cbk, done); + +        call_cnt = dht_lock_count (lk_array, lk_count); +        if (call_cnt == 0) { +                ret = 0; +                goto done; +        } + +        lock_frame = dht_lock_frame (frame); +        if (lock_frame == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, 0, +                        DHT_MSG_UNLOCKING_FAILED, +                        "cannot allocate a frame, not unlocking following " +                        "entrylks:"); + +                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, +                                  lk_count); +                goto done; +        } + +        ret = dht_local_entrylk_init (lock_frame, lk_array, lk_count, +                                      entrylk_cbk); +        if (ret < 0) { +                gf_msg (frame->this->name, GF_LOG_WARNING, 0, +                        
DHT_MSG_UNLOCKING_FAILED, +                        "storing locks in local failed, not unlocking " +                        "following entrylks:"); + +                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, +                                  lk_count); + +                goto done; +        } + +        local = lock_frame->local; +        local->main_frame = frame; +        local->call_cnt = call_cnt; + +        for (i = 0; i < local->lock[0].ns.directory_ns.lk_count; i++) { +                if (!local->lock[0].ns.directory_ns.locks[i]->locked) +                        continue; + +                lock_frame->root->lk_owner = local->lock[0].ns.directory_ns.locks[i]->lk_owner; +                STACK_WIND_COOKIE (lock_frame, dht_unlock_entrylk_cbk, +                                   (void *)(long)i, +                                   local->lock[0].ns.directory_ns.locks[i]->xl, +                                   local->lock[0].ns.directory_ns.locks[i]->xl->fops->entrylk, +                                   local->lock[0].ns.directory_ns.locks[i]->domain, +                                   &local->lock[0].ns.directory_ns.locks[i]->loc, +                                   local->lock[0].ns.directory_ns.locks[i]->basename, +                                   ENTRYLK_UNLOCK, ENTRYLK_WRLCK, NULL); +                if (!--call_cnt) +                        break; +        } + +        return 0; + +done: +        if (lock_frame) +                dht_lock_stack_destroy (lock_frame, DHT_ENTRYLK); + +        /* no locks acquired, invoke entrylk_cbk */ +        if (ret == 0) +                entrylk_cbk (frame, NULL, frame->this, 0, 0, NULL); + +        return ret; +} + +int32_t +dht_unlock_entrylk_wrapper (call_frame_t *frame, dht_elock_wrap_t *entrylk) +{ +        dht_local_t  *local                   = NULL, *lock_local = NULL; +        call_frame_t *lock_frame              = NULL; +        char          pgfid[GF_UUID_BUF_SIZE] = {0}; +        int  
         ret                     = 0; + +        local = frame->local; + +        if (!entrylk || !entrylk->locks) +                goto out; + +        gf_uuid_unparse (local->loc.parent->gfid, pgfid); + +        lock_frame = copy_frame (frame); +        if (lock_frame == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "mkdir (%s/%s) (path: %s): " +                        "copy frame failed", pgfid, local->loc.name, +                        local->loc.path); +                goto done; +        } + +        lock_local = mem_get0 (THIS->local_pool); +        if (lock_local == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "mkdir (%s/%s) (path: %s): " +                        "local creation failed", pgfid, local->loc.name, +                        local->loc.path); +                goto done; +        } + +        lock_frame->local = lock_local; + +        lock_local->lock[0].ns.directory_ns.locks = entrylk->locks; +        lock_local->lock[0].ns.directory_ns.lk_count = entrylk->lk_count; +        entrylk->locks = NULL; +        entrylk->lk_count = 0; + +        ret = dht_unlock_entrylk (lock_frame, +                                  lock_local->lock[0].ns.directory_ns.locks, +                                  lock_local->lock[0].ns.directory_ns.lk_count, +                                  dht_unlock_entrylk_done); +        if (ret) +                goto done; + +        lock_frame = NULL; + +done: +        if (lock_frame != NULL) { +                DHT_STACK_DESTROY (lock_frame); +        } + +out: +        return 0; +} + +static int +dht_entrylk_cleanup_cbk (call_frame_t *frame, void *cookie, +                         xlator_t *this, int32_t op_ret, int32_t op_errno, +                         dict_t *xdata) +{ +        dht_entrylk_done (frame); +   
     return 0; +} +/* Undo a partially completed entrylk attempt. If dht_lock_count() reports a non-zero count for the frame's directory_ns lock array (presumably the number of entries still marked locked), the locks are released through dht_unlock_entrylk() and the operation is finished from dht_entrylk_cleanup_cbk; otherwise dht_entrylk_done() is called right away. */ +static void +dht_entrylk_cleanup (call_frame_t *lock_frame) +{ +        dht_lock_t  **lk_array = NULL; +        int           lk_count = 0, lk_acquired = 0; +        dht_local_t  *local    = NULL; + +        local = lock_frame->local; + +        lk_array = local->lock[0].ns.directory_ns.locks; +        lk_count = local->lock[0].ns.directory_ns.lk_count; + +        lk_acquired = dht_lock_count (lk_array, lk_count); +        if (lk_acquired != 0) { +                dht_unlock_entrylk (lock_frame, lk_array, lk_count, +                                     dht_entrylk_cleanup_cbk); +        } else { +                dht_entrylk_done (lock_frame); +        } + +        return; +} + +/* Callback for a single blocking entrylk request; cookie carries the index of the lock inside directory_ns.locks. A successful reply marks that lock as locked. ESTALE and ENOENT are tolerated only when the stored reaction is IGNORE_ENOENT_ESTALE; every other failure records op_ret/op_errno and jumps to cleanup. After the reply for the last index the operation completes through dht_entrylk_done() (reported as failed when no lock in the array is held); for any earlier index the next lock is requested with dht_blocking_entrylk_rec(). */ +static int32_t +dht_blocking_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        int          lk_index = 0; +        int          i        = 0; +        dht_local_t *local    = NULL; + +        lk_index = (long) cookie; + +        local = frame->local; +        if (op_ret == 0) { +                local->lock[0].ns.directory_ns.locks[lk_index]->locked = _gf_true; +        } else { +                switch (op_errno) { +                case ESTALE: +                case ENOENT: +                        if (local->lock[0].ns.directory_ns.reaction != IGNORE_ENOENT_ESTALE) { +                                local->lock[0].ns.directory_ns.op_ret = -1; +                                local->lock[0].ns.directory_ns.op_errno = op_errno; +                                goto cleanup; +                        } +                        break; +                default: +                        local->lock[0].ns.directory_ns.op_ret = -1; +                        local->lock[0].ns.directory_ns.op_errno = op_errno; +                        goto cleanup; +                } +        } +        /* Reply for the last lock in the array: the sequence of requests is complete. */ +        if (lk_index == (local->lock[0].ns.directory_ns.lk_count - 1)) { +                for (i = 0; 
(i < local->lock[0].ns.directory_ns.lk_count) && +                     (!local->lock[0].ns.directory_ns.locks[i]->locked); i++) +                        ; +                /* The scan ran off the end of the array: no lock is held, so report failure. */ +                if (i == local->lock[0].ns.directory_ns.lk_count) { +                        local->lock[0].ns.directory_ns.op_ret = -1; +                        local->lock[0].ns.directory_ns.op_errno = op_errno; +                } + +                dht_entrylk_done (frame); +        } else { +                dht_blocking_entrylk_rec (frame, ++lk_index); +        } + +        return 0; + +cleanup: +        dht_entrylk_cleanup (frame); + +        return 0; +} +/* Wind one entrylk request (ENTRYLK_LOCK, ENTRYLK_WRLCK) for lock i of the frame's directory_ns array on that lock's subvolume. The index is passed as the cookie so dht_blocking_entrylk_cbk can tell which lock the reply belongs to. */ +void +dht_blocking_entrylk_rec (call_frame_t *frame, int i) +{ +        dht_local_t     *local = NULL; + +        local = frame->local; + +        STACK_WIND_COOKIE (frame, dht_blocking_entrylk_cbk, +                           (void *) (long) i, +                           local->lock[0].ns.directory_ns.locks[i]->xl, +                           local->lock[0].ns.directory_ns.locks[i]->xl->fops->entrylk, +                           local->lock[0].ns.directory_ns.locks[i]->domain, +                           &local->lock[0].ns.directory_ns.locks[i]->loc, +                           local->lock[0].ns.directory_ns.locks[i]->basename, +                           ENTRYLK_LOCK, ENTRYLK_WRLCK, NULL); + +        return; +} +/* Acquire the entrylks in lk_array one after another on a separate lock frame obtained from dht_lock_frame(), starting with index 0. lk_array, lk_count and entrylk_cbk are handed to dht_local_entrylk_init(); reaction is stored for dht_blocking_entrylk_cbk, which uses it to decide how ESTALE/ENOENT replies are treated. Returns 0 once the first request has been wound and -1 when validation or setup fails (the lock frame, if created, is destroyed). */ +int +dht_blocking_entrylk (call_frame_t *frame, dht_lock_t **lk_array, +                      int lk_count, dht_reaction_type_t reaction, +                      fop_entrylk_cbk_t entrylk_cbk) +{ +        int           ret        = -1; +        call_frame_t *lock_frame = NULL; +        dht_local_t  *local      = NULL; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, entrylk_cbk, out); + +        lock_frame = dht_lock_frame (frame); +        if (lock_frame == NULL) +                goto out; + +        ret = 
dht_local_entrylk_init (lock_frame, lk_array, lk_count, +                                      entrylk_cbk); +        if (ret < 0) { +                goto out; +        } + +        dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner); + +        local = lock_frame->local; +        local->lock[0].ns.directory_ns.reaction = reaction; +        local->main_frame = frame; + +        dht_blocking_entrylk_rec (lock_frame, 0); + +        return 0; +out: +        if (lock_frame) +                dht_lock_stack_destroy (lock_frame, DHT_ENTRYLK); + +        return -1; +} +/* Record lk_array, lk_count and inodelk_cbk in the my_layout slot of the frame's local, creating the local with dht_local_init() when the frame has none, and then pass the array to dht_lock_order_requests(). Returns 0 on success and a negative value when the local cannot be created or the ordering call fails. */ +static int +dht_local_inodelk_init (call_frame_t *frame, dht_lock_t **lk_array, +                     int lk_count, fop_inodelk_cbk_t inodelk_cbk) +{ +        int          ret   = -1; +        dht_local_t *local = NULL; + +        local = frame->local; + +        if (local == NULL) { +                local = dht_local_init (frame, NULL, NULL, 0); +        } + +        if (local == NULL) { +                goto out; +        } + +        local->lock[0].layout.my_layout.inodelk_cbk = inodelk_cbk; +        local->lock[0].layout.my_layout.locks = lk_array; +        local->lock[0].layout.my_layout.lk_count = lk_count; + +        ret = dht_lock_order_requests (local->lock[0].layout.my_layout.locks, +                                       local->lock[0].layout.my_layout.lk_count); +        if (ret < 0) +                goto out; + +        ret = 0; +out: +        return ret; +} +/* Complete an inodelk operation running on lock_frame: detach the lock array and the saved callback from the lock frame's local, invoke that callback on main_frame with the op_ret/op_errno recorded in my_layout, and finally destroy the lock frame. */ +static void +dht_inodelk_done (call_frame_t *lock_frame) +{ +        fop_inodelk_cbk_t  inodelk_cbk = NULL; +        call_frame_t      *main_frame  = NULL; +        dht_local_t       *local       = NULL; + +        local = lock_frame->local; +        main_frame = local->main_frame; +        /* Only the references are cleared; the lock array itself is not freed here. */ +        local->lock[0].layout.my_layout.locks = NULL; +        local->lock[0].layout.my_layout.lk_count = 0; + +        inodelk_cbk = local->lock[0].layout.my_layout.inodelk_cbk; +        local->lock[0].layout.my_layout.inodelk_cbk = NULL; + 
+        inodelk_cbk (main_frame, NULL, main_frame->this, +                     local->lock[0].layout.my_layout.op_ret, +                     local->lock[0].layout.my_layout.op_errno, NULL); + +        dht_lock_stack_destroy (lock_frame, DHT_INODELK); +        return; +} +/* Callback for one inodelk unlock request; cookie is the index of the lock in my_layout.locks. A failed unlock is logged as a warning and leaves the lock's locked flag untouched; a successful one clears the flag. When dht_frame_return() indicates the last outstanding reply, dht_inodelk_done() finishes the operation. */ +static int32_t +dht_unlock_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                        int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                  = NULL; +        int          lk_index               = 0, call_cnt = 0; +        char         gfid[GF_UUID_BUF_SIZE] = {0}; + +        lk_index = (long) cookie; + +        local = frame->local; +        if (op_ret < 0) { +                uuid_utoa_r (local->lock[0].layout.my_layout.locks[lk_index]->loc.gfid, +                             gfid); + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_UNLOCKING_FAILED, +                        "unlocking failed on %s:%s", +                        local->lock[0].layout.my_layout.locks[lk_index]->xl->name, +                        gfid); +        } else { +                local->lock[0].layout.my_layout.locks[lk_index]->locked = 0; +        } + +        call_cnt = dht_frame_return (frame); +        if (is_last_call (call_cnt)) { +                dht_inodelk_done (frame); +        } + +        return 0; +} +/* Completion callback that dht_unlock_inodelk_wrapper hands to dht_unlock_inodelk(): formats the gfid of the first lock's inode, logs a warning when the unlock failed, and destroys the frame. */ +static int32_t +dht_unlock_inodelk_done (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                   = NULL; +        char          gfid[GF_UUID_BUF_SIZE] = {0}; +        /* NOTE(review): locks[0] and its loc.inode are dereferenced below without a NULL check - confirm callers guarantee both. */ +        local = frame->local; +        gf_uuid_unparse (local->lock[0].layout.my_layout.locks[0]->loc.inode->gfid, gfid); + +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "unlock failed on gfid: 
%s, stale lock might be left " +                        "in DHT_LAYOUT_HEAL_DOMAIN", gfid); +        } + +        DHT_STACK_DESTROY (frame); +        return 0; +} + +int32_t +dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, +                     fop_inodelk_cbk_t inodelk_cbk) +{ +        dht_local_t     *local      = NULL; +        struct gf_flock  flock      = {0,}; +        int              ret        = -1 , i = 0; +        call_frame_t    *lock_frame = NULL; +        int              call_cnt   = 0; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, done); +        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, done); +        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, done); + +        call_cnt = dht_lock_count (lk_array, lk_count); +        if (call_cnt == 0) { +                ret = 0; +                goto done; +        } + +        lock_frame = dht_lock_frame (frame); +        if (lock_frame == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, 0, +                        DHT_MSG_UNLOCKING_FAILED, +                        "cannot allocate a frame, not unlocking following " +                        "locks:"); + +                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, +                                  lk_count); +                goto done; +        } + +        ret = dht_local_inodelk_init (lock_frame, lk_array, lk_count, +                                      inodelk_cbk); +        if (ret < 0) { +                gf_msg (frame->this->name, GF_LOG_WARNING, 0, +                        DHT_MSG_UNLOCKING_FAILED, +                        "storing locks in local failed, not unlocking " +                        "following locks:"); + +                dht_log_lk_array (frame->this->name, GF_LOG_WARNING, lk_array, +                                  lk_count); + +                goto done; +        } + +        local = lock_frame->local; +        local->main_frame = frame; +        
local->call_cnt = call_cnt; + +        flock.l_type = F_UNLCK; + +        for (i = 0; i < local->lock[0].layout.my_layout.lk_count; i++) { +                if (!local->lock[0].layout.my_layout.locks[i]->locked) +                        continue; + +                lock_frame->root->lk_owner = local->lock[0].layout.my_layout.locks[i]->lk_owner; +                STACK_WIND_COOKIE (lock_frame, dht_unlock_inodelk_cbk, +                                   (void *)(long)i, +                                   local->lock[0].layout.my_layout.locks[i]->xl, +                                   local->lock[0].layout.my_layout.locks[i]->xl->fops->inodelk, +                                   local->lock[0].layout.my_layout.locks[i]->domain, +                                   &local->lock[0].layout.my_layout.locks[i]->loc, F_SETLK, +                                   &flock, NULL); +                if (!--call_cnt) +                        break; +        } + +        return 0; + +done: +        if (lock_frame) +                dht_lock_stack_destroy (lock_frame, DHT_INODELK); + +        /* no locks acquired, invoke inodelk_cbk */ +        if (ret == 0) +                inodelk_cbk (frame, NULL, frame->this, 0, 0, NULL); + +        return ret; +} + +int32_t +dht_unlock_inodelk_wrapper (call_frame_t *frame, dht_ilock_wrap_t *inodelk) +{ +        dht_local_t  *local                   = NULL, *lock_local = NULL; +        call_frame_t *lock_frame              = NULL; +        char          pgfid[GF_UUID_BUF_SIZE] = {0}; +        int           ret                     = 0; + +        local = frame->local; + +        if (!inodelk || !inodelk->locks) +                goto out; + +        gf_uuid_unparse (local->loc.parent->gfid, pgfid); + +        lock_frame = copy_frame (frame); +        if (lock_frame == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "mkdir (%s/%s) (path: %s): 
" +                        "copy frame failed", pgfid, local->loc.name, +                        local->loc.path); +                goto done; +        } + +        lock_local = mem_get0 (THIS->local_pool); +        if (lock_local == NULL) { +                gf_msg (frame->this->name, GF_LOG_WARNING, ENOMEM, +                        DHT_MSG_PARENT_LAYOUT_CHANGED, +                        "mkdir (%s/%s) (path: %s): " +                        "local creation failed", pgfid, local->loc.name, +                        local->loc.path); +                goto done; +        } + +        lock_frame->local = lock_local; + +        lock_local->lock[0].layout.my_layout.locks = inodelk->locks; +        lock_local->lock[0].layout.my_layout.lk_count = inodelk->lk_count; +        inodelk->locks = NULL; +        inodelk->lk_count = 0; + +        ret = dht_unlock_inodelk (lock_frame, +                                  lock_local->lock[0].layout.my_layout.locks, +                                  lock_local->lock[0].layout.my_layout.lk_count, +                                  dht_unlock_inodelk_done); + +        if (ret) +                goto done; + +        lock_frame = NULL; + +done: +        if (lock_frame != NULL) { +                DHT_STACK_DESTROY (lock_frame); +        } +out: +        return 0; +} + +static int +dht_inodelk_cleanup_cbk (call_frame_t *frame, void *cookie, +                         xlator_t *this, int32_t op_ret, int32_t op_errno, +                         dict_t *xdata) +{ +        dht_inodelk_done (frame); +        return 0; +} + +static void +dht_inodelk_cleanup (call_frame_t *lock_frame) +{ +        dht_lock_t  **lk_array = NULL; +        int           lk_count = 0, lk_acquired = 0; +        dht_local_t  *local    = NULL; + +        local = lock_frame->local; + +        lk_array = local->lock[0].layout.my_layout.locks; +        lk_count = local->lock[0].layout.my_layout.lk_count; + +        lk_acquired = dht_lock_count (lk_array, lk_count); +        if 
(lk_acquired != 0) { +                dht_unlock_inodelk (lock_frame, lk_array, lk_count, +                                     dht_inodelk_cleanup_cbk); +        } else { +                dht_inodelk_done (lock_frame); +        } + +        return; +} +/* Callback for one non-blocking inodelk request; cookie is the index of the lock in my_layout.locks. A failed request records op_ret/op_errno in my_layout and is logged at debug level; a successful one marks the lock as locked. When dht_frame_return() indicates the last outstanding reply, a recorded failure leads to dht_inodelk_cleanup(), otherwise dht_inodelk_done() completes the operation. */ +static int32_t +dht_nonblocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                             int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                   = NULL; +        int          lk_index               = 0, call_cnt = 0; +        char          gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; +        lk_index = (long) cookie; + +        if (op_ret == -1) { +                local->lock[0].layout.my_layout.op_ret = -1; +                local->lock[0].layout.my_layout.op_errno = op_errno; +                /* NOTE(review): local has already been dereferenced just above, so the "local &&" test below cannot protect against a NULL local. */ +                if (local && local->lock[0].layout.my_layout.locks[lk_index]) { +                        uuid_utoa_r (local->lock[0].layout.my_layout.locks[lk_index]->loc.inode->gfid, +                                     gfid); + +                        gf_msg_debug (this->name, op_errno, +                                      "inodelk failed on gfid: %s " +                                      "subvolume: %s", gfid, +                                      local->lock[0].layout.my_layout.locks[lk_index]->xl->name); +                } + +                goto out; +        } + +        local->lock[0].layout.my_layout.locks[lk_index]->locked = _gf_true; + +out: +        call_cnt = dht_frame_return (frame); +        if (is_last_call (call_cnt)) { +                if (local->lock[0].layout.my_layout.op_ret < 0) { +                        dht_inodelk_cleanup (frame); +                        return 0; +                } + +                dht_inodelk_done (frame); +        } + +        return 0; +} +/* Request every inodelk in lk_array with F_SETLK on a separate lock frame: all lk_count requests are wound from a single loop and their replies are collected by dht_nonblocking_inodelk_cbk. Returns 0 after the requests have been wound and -1 when validation or setup fails (the lock frame, if created, is destroyed). */ +int +dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, +                         int lk_count, fop_inodelk_cbk_t inodelk_cbk) +{ 
+        struct gf_flock  flock      = {0,}; +        int              i          = 0, ret = 0; +        dht_local_t     *local      = NULL; +        call_frame_t    *lock_frame = NULL; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out); + +        lock_frame = dht_lock_frame (frame); +        if (lock_frame == NULL) +                goto out; + +        ret = dht_local_inodelk_init (lock_frame, lk_array, lk_count, +                                      inodelk_cbk); +        if (ret < 0) { +                goto out; +        } + +        dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner); + +        local = lock_frame->local; +        local->main_frame = frame; + +        local->call_cnt = lk_count; + +        for (i = 0; i < lk_count; i++) { +                flock.l_type = local->lock[0].layout.my_layout.locks[i]->type; + +                STACK_WIND_COOKIE (lock_frame, dht_nonblocking_inodelk_cbk, +                                   (void *) (long) i, +                                   local->lock[0].layout.my_layout.locks[i]->xl, +                                   local->lock[0].layout.my_layout.locks[i]->xl->fops->inodelk, +                                   local->lock[0].layout.my_layout.locks[i]->domain, +                                   &local->lock[0].layout.my_layout.locks[i]->loc, +                                   F_SETLK, +                                   &flock, NULL); +        } + +        return 0; + +out: +        if (lock_frame) +                dht_lock_stack_destroy (lock_frame, DHT_INODELK); + +        return -1; +} + +static int32_t +dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                          int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        int          lk_index                   = 0; +        int          i                        
  = 0; +        dht_local_t *local                      = NULL; +        char         gfid[GF_UUID_BUF_SIZE]     = {0,}; + +        lk_index = (long) cookie; + +        local = frame->local; +        if (op_ret == 0) { +                local->lock[0].layout.my_layout.locks[lk_index]->locked = _gf_true; +        } else { +                switch (op_errno) { +                case ESTALE: +                case ENOENT: +                        if (local->lock[0].layout.my_layout.reaction != IGNORE_ENOENT_ESTALE) { +                                gf_uuid_unparse (local->lock[0].layout.my_layout.locks[lk_index]->loc.gfid, gfid); +                                local->lock[0].layout.my_layout.op_ret = -1; +                                local->lock[0].layout.my_layout.op_errno = op_errno; +                                gf_msg (this->name, GF_LOG_ERROR, op_errno, +                                        DHT_MSG_INODELK_FAILED, +                                        "inodelk failed on subvol %s. 
gfid:%s", +                                        local->lock[0].layout.my_layout.locks[lk_index]->xl->name, +                                        gfid); +                                goto cleanup; +                        } +                        break; +                default: +                        gf_uuid_unparse (local->lock[0].layout.my_layout.locks[lk_index]->loc.gfid, gfid); +                        local->lock[0].layout.my_layout.op_ret = -1; +                        local->lock[0].layout.my_layout.op_errno = op_errno; +                        gf_msg (this->name, GF_LOG_ERROR, op_errno, +                                DHT_MSG_INODELK_FAILED, +                                "inodelk failed on subvol %s, gfid:%s", +                                local->lock[0].layout.my_layout.locks[lk_index]->xl->name, gfid); +                        goto cleanup; +                } +        } + +        if (lk_index == (local->lock[0].layout.my_layout.lk_count - 1)) { +                for (i = 0; (i < local->lock[0].layout.my_layout.lk_count) && +                     (!local->lock[0].layout.my_layout.locks[i]->locked); i++) +                        ; + +                if (i == local->lock[0].layout.my_layout.lk_count) { +                        local->lock[0].layout.my_layout.op_ret = -1; +                        local->lock[0].layout.my_layout.op_errno = op_errno; +                } + +                dht_inodelk_done (frame); +        } else { +                dht_blocking_inodelk_rec (frame, ++lk_index); +        } + +        return 0; + +cleanup: +        dht_inodelk_cleanup (frame); + +        return 0; +} + +void +dht_blocking_inodelk_rec (call_frame_t *frame, int i) +{ +        dht_local_t     *local = NULL; +        struct gf_flock  flock = {0,}; + +        local = frame->local; + +        flock.l_type = local->lock[0].layout.my_layout.locks[i]->type; + +        STACK_WIND_COOKIE (frame, dht_blocking_inodelk_cbk, +                           (void *) 
(long) i, +                           local->lock[0].layout.my_layout.locks[i]->xl, +                           local->lock[0].layout.my_layout.locks[i]->xl->fops->inodelk, +                           local->lock[0].layout.my_layout.locks[i]->domain, +                           &local->lock[0].layout.my_layout.locks[i]->loc, +                           F_SETLKW, +                           &flock, NULL); + +        return; +} + +int +dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk) +{ +        int           ret                       = -1; +        call_frame_t *lock_frame                = NULL; +        dht_local_t  *local                     = NULL; +        dht_local_t  *tmp_local                 = NULL; +        char          gfid[GF_UUID_BUF_SIZE]    = {0,}; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, lk_array, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, inodelk_cbk, out); + +        tmp_local = frame->local; + +        lock_frame = dht_lock_frame (frame); +        if (lock_frame == NULL) { +                gf_uuid_unparse (tmp_local->loc.gfid, gfid); +                gf_msg ("dht", GF_LOG_ERROR, ENOMEM, +                        DHT_MSG_LOCK_FRAME_FAILED, +                        "memory allocation failed for lock_frame. 
gfid:%s" +                        " path:%s", gfid, tmp_local->loc.path); +                goto out; +        } + +        ret = dht_local_inodelk_init (lock_frame, lk_array, lk_count, +                                      inodelk_cbk); +        if (ret < 0) { +                gf_uuid_unparse (tmp_local->loc.gfid, gfid); +                gf_msg ("dht", GF_LOG_ERROR, ENOMEM, +                        DHT_MSG_LOCAL_LOCK_INIT_FAILED, +                        "dht_local_lock_init failed, gfid: %s path:%s", gfid, +                        tmp_local->loc.path); +                goto out; +        } + +        dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner); + +        local = lock_frame->local; +        local->lock[0].layout.my_layout.reaction = reaction; +        local->main_frame = frame; + +        dht_blocking_inodelk_rec (lock_frame, 0); + +        return 0; +out: +        if (lock_frame) +                dht_lock_stack_destroy (lock_frame, DHT_INODELK); + +        return -1; +} +/* Release both namespace locks recorded in lock: the entrylk set (ns.directory_ns) first, then the parent-layout inodelk set (ns.parent_layout). Returns without unlocking anything when frame or lock fails validation. */ +void +dht_unlock_namespace (call_frame_t *frame, dht_dir_transaction_t *lock) +{ +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, lock, out); + +        dht_unlock_entrylk_wrapper (frame, &lock->ns.directory_ns); +        dht_unlock_inodelk_wrapper (frame, &lock->ns.parent_layout); + +out: +        return; +} +/* Entrylk completion callback passed to dht_blocking_entrylk() by dht_blocking_entrylk_after_inodelk: when the entrylk step failed, the parent-layout inodelk is released first; in every case the stored ns_cbk is then invoked with the unchanged results. */ +static int32_t +dht_protect_namespace_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                           int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local    = NULL; + +        local = frame->local; +        if (op_ret != 0) +                dht_unlock_inodelk_wrapper (frame, +                                            &local->current->ns.parent_layout); + +        local->current->ns.ns_cbk (frame, cookie, this, op_ret, op_errno, +                                   xdata); +        return 0; +} +/* Inodelk completion callback that starts the entrylk step of namespace protection. If the inodelk failed, or dht_blocking_entrylk() cannot be started, the error is recorded in local, the parent-layout inodelk is released and ns_cbk is called with that error. */ +int32_t +dht_blocking_entrylk_after_inodelk (call_frame_t *frame, 
void *cookie, +                                    xlator_t *this, int32_t op_ret, +                                    int32_t op_errno, dict_t *xdata) +{ +        dht_local_t           *local                   = NULL; +        int                    ret                     = -1; +        loc_t                 *loc                     = NULL; +        dht_lock_t           **lk_array                = NULL; +        char                   pgfid[GF_UUID_BUF_SIZE] = {0}; +        int                    count                   = 0; +        dht_elock_wrap_t      *entrylk                 = NULL; + +        local = frame->local; +        entrylk = &local->current->ns.directory_ns; + +        if (op_ret < 0) { +                local->op_ret = -1; +                local->op_errno = op_errno; +                goto err; +        } + +        loc = &entrylk->locks[0]->loc; +        gf_uuid_unparse (loc->gfid, pgfid); + +        local->op_ret = 0; +        lk_array = entrylk->locks; +        count = entrylk->lk_count; + +        ret = dht_blocking_entrylk (frame, lk_array, count, FAIL_ON_ANY_ERROR, +                                    dht_protect_namespace_cbk); + +        if (ret < 0) { +                local->op_ret = -1; +                local->op_errno = EIO; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_ENTRYLK_ERROR, +                        "%s (%s/%s): " +                        "dht_blocking_entrylk failed after taking inodelk", +                        gf_fop_list[local->fop], pgfid, +                        entrylk->locks[0]->basename); +                goto err; +        } + +        return 0; + +err: +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +                entrylk->locks = NULL; +                entrylk->lk_count = 0; +        } + +        /* Unlock inodelk. 
No harm calling unlock twice */ +        dht_unlock_inodelk_wrapper (frame, &local->current->ns.parent_layout); +        /* Call ns_cbk. It will take care of unwinding */ +        local->current->ns.ns_cbk (frame, NULL, this, local->op_ret, +                                   local->op_errno, NULL); +        return 0; +} + +/* Given the loc and the subvol, this routine takes the inodelk on + * the parent inode and entrylk on (parent, loc->name). This routine + * is specific as it supports only one subvol on which it takes inodelk + * and then entrylk serially. + */ +int +dht_protect_namespace (call_frame_t *frame, loc_t *loc, +                       xlator_t *subvol, +                       struct dht_namespace *ns, +                       fop_entrylk_cbk_t ns_cbk) +{ +        dht_ilock_wrap_t  *inodelk                 = NULL; +        dht_elock_wrap_t  *entrylk                 = NULL; +        dht_lock_t       **lk_array                = NULL; +        dht_local_t       *local                   = NULL; +        xlator_t          *this                    = NULL; +        loc_t              parent                  = {0,}; +        int                ret                     = -1; +        char               pgfid[GF_UUID_BUF_SIZE] = {0}; +        int32_t            op_errno                = 0; +        int                count                   = 1; + +        GF_VALIDATE_OR_GOTO ("dht-locks", frame, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, loc, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, loc->parent, out); +        GF_VALIDATE_OR_GOTO (frame->this->name, subvol, out); + +        local = frame->local; +        this = frame->this; + +        inodelk = &ns->parent_layout; +        entrylk = &ns->directory_ns; + +        /* Initialize entrylk_cbk and parent loc */ +        ns->ns_cbk = ns_cbk; + +        ret = dht_build_parent_loc (this, &parent, loc, &op_errno); +        if (ret) { +                gf_msg (this->name, GF_LOG_ERROR, op_errno, +  
                      DHT_MSG_LOC_FAILED, "gfid:%s (name:%s) (path: %s): " +                        "parent loc build failed", loc->gfid, loc->name, +                         loc->path); +                goto out; +        } +        gf_uuid_unparse (parent.gfid, pgfid); + +        /* Alloc inodelk */ +        inodelk->locks = GF_CALLOC (count, sizeof (*lk_array), +                                    gf_common_mt_pointer); +        if (inodelk->locks == NULL) { +                local->op_errno = ENOMEM; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_NO_MEMORY, +                        "%s (%s/%s) (path: %s): " +                        "calloc failure", +                        gf_fop_list[local->fop], pgfid, loc->name, loc->path); +                goto out; +        } + +        inodelk->locks[0] = dht_lock_new (this, subvol, &parent, F_RDLCK, +                                        DHT_LAYOUT_HEAL_DOMAIN, NULL); +        if (inodelk->locks[0] == NULL) { +                local->op_errno = ENOMEM; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_NO_MEMORY, +                        "%s (%s/%s) (path: %s): " +                        "inodelk: lock allocation failed", +                        gf_fop_list[local->fop], pgfid, loc->name, loc->path); +                goto err; +        } +        inodelk->lk_count = count; + +        /* Allock entrylk */ +        entrylk->locks = GF_CALLOC (count, sizeof (*lk_array), +                                    gf_common_mt_pointer); +        if (entrylk->locks == NULL) { +                local->op_errno = ENOMEM; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_NO_MEMORY, +                        "%s (%s/%s) (path: %s): " +                        "entrylk: calloc failure", +                        gf_fop_list[local->fop], pgfid, loc->name, loc->path); + +              
  goto err; +        } + +        entrylk->locks[0] = dht_lock_new (this, subvol, &parent, F_WRLCK, +                                          DHT_ENTRY_SYNC_DOMAIN, loc->name); +        if (entrylk->locks[0] == NULL) { +                local->op_errno = ENOMEM; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_NO_MEMORY, +                        "%s (%s/%s) (path: %s): " +                        "entrylk: lock allocation failed", +                        gf_fop_list[local->fop], pgfid, loc->name, loc->path); + +                goto err; +        } +        entrylk->lk_count = count; + +        /* Take read inodelk on parent. If it is successful, take write entrylk +         * on name in cbk. +         */ +        lk_array = inodelk->locks; +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR, +                                    dht_blocking_entrylk_after_inodelk); +        if (ret < 0) { +                local->op_errno = EIO; +                gf_msg (this->name, GF_LOG_WARNING, local->op_errno, +                        DHT_MSG_INODELK_ERROR, +                        "%s (%s/%s) (path: %s): " +                        "dht_blocking_inodelk failed", +                        gf_fop_list[local->fop], pgfid, loc->name, loc->path); +                goto err; +        } + +        loc_wipe (&parent); + +        return 0; +err: +        if (entrylk->locks != NULL) { +                dht_lock_array_free (entrylk->locks, count); +                GF_FREE (entrylk->locks); +                entrylk->locks = NULL; +                entrylk->lk_count = 0; +        } + +        if (inodelk->locks != NULL) { +                dht_lock_array_free (inodelk->locks, count); +                GF_FREE (inodelk->locks); +                inodelk->locks = NULL; +                inodelk->lk_count = 0; +        } + +        loc_wipe (&parent); +out: +        return -1; +} diff --git 
a/xlators/cluster/dht/src/dht-lock.h b/xlators/cluster/dht/src/dht-lock.h new file mode 100644 index 00000000000..0557858041e --- /dev/null +++ b/xlators/cluster/dht/src/dht-lock.h @@ -0,0 +1,94 @@ +/* +  Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com> +  This file is part of GlusterFS. + +  This file is licensed to you under your choice of the GNU Lesser +  General Public License, version 3 or any later version (LGPLv3 or +  later), or the GNU General Public License, version 2 (GPLv2), in all +  cases as published by the Free Software Foundation. +*/ + +#ifndef _DHT_LOCK_H +#define _DHT_LOCK_H + +#include "xlator.h" +#include "dht-common.h" + +void +dht_lock_array_free (dht_lock_t **lk_array, int count); + +int32_t +dht_lock_count (dht_lock_t **lk_array, int lk_count); + +dht_lock_t * +dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type, +              const char *domain, const char *basename); + +int32_t +dht_unlock_entrylk_wrapper (call_frame_t *, dht_elock_wrap_t *); + +void +dht_blocking_entrylk_rec (call_frame_t *frame, int i); + +int +dht_blocking_entrylk (call_frame_t *frame, dht_lock_t **lk_array, +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t entrylk_cbk); + +int32_t +dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, +                     fop_inodelk_cbk_t inodelk_cbk); + +int32_t +dht_unlock_inodelk_wrapper (call_frame_t *, dht_ilock_wrap_t *); + +/* Acquire non-blocking inodelk on a list of xlators. + * + * @lk_array: array of lock requests lock on. + * + * @lk_count: number of locks in @lk_array + * + * @inodelk_cbk: will be called after inodelk replies are received + * + * @retval: -1 if stack_winding inodelk fails. 0 otherwise. + *          inodelk_cbk is called with appropriate error on errors. + *          On failure to acquire lock on all members of list, successful + *          locks are unlocked before invoking cbk. 
+ */ + +int +dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, +                         int lk_count, fop_inodelk_cbk_t inodelk_cbk); + +void +dht_blocking_inodelk_rec (call_frame_t *frame, int i); + +/* same as dht_nonblocking_inodelk, but issues sequential blocking locks on + * @lk_array directly. locks are issued on some order which remains same + * for a list of xlators (irrespective of order of xlators within list). + */ + +int +dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk); + +int32_t +dht_blocking_entrylk_after_inodelk (call_frame_t *frame, void *cookie, +                                    xlator_t *this, int32_t op_ret, +                                    int32_t op_errno, dict_t *xdata); + +int32_t +dht_blocking_entrylk_after_inodelk_rename (call_frame_t *frame, void *cookie, +                                           xlator_t *this, int32_t op_ret, +                                           int32_t op_errno, dict_t *xdata); + +void +dht_unlock_namespace (call_frame_t *, dht_dir_transaction_t *); + +int +dht_protect_namespace (call_frame_t *frame, loc_t *loc, xlator_t *subvol, +                       struct dht_namespace *ns, +                       fop_entrylk_cbk_t ns_cbk); + +#endif   /* _DHT_LOCK_H */ diff --git a/xlators/cluster/dht/src/dht-messages.h b/xlators/cluster/dht/src/dht-messages.h index 18859a81912..6c8430b4920 100644 --- a/xlators/cluster/dht/src/dht-messages.h +++ b/xlators/cluster/dht/src/dht-messages.h @@ -40,7 +40,7 @@   */  #define GLFS_DHT_BASE                   GLFS_MSGID_COMP_DHT -#define GLFS_DHT_NUM_MESSAGES           121 +#define GLFS_DHT_NUM_MESSAGES           124  #define GLFS_MSGID_END          (GLFS_DHT_BASE + GLFS_DHT_NUM_MESSAGES + 1)  /* Messages with message IDs */ @@ -1106,5 +1106,26 @@   */  #define DHT_MSG_LOCAL_LOCK_INIT_FAILED          (GLFS_DHT_BASE + 
121) +/* + * @messageid 109122 + * @diagnosis + * @recommendedaction None + */ +#define DHT_MSG_ENTRYLK_ERROR          (GLFS_DHT_BASE + 122) + +/* + * @messageid 109123 + * @diagnosis + * @recommendedaction None + */ +#define DHT_MSG_INODELK_ERROR          (GLFS_DHT_BASE + 123) + +/* + * @messageid 109124 + * @diagnosis + * @recommendedaction None + */ +#define DHT_MSG_LOC_FAILED             (GLFS_DHT_BASE + 124) +  #define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"  #endif /* _DHT_MESSAGES_H_ */ diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index 53c61f8a714..c24e6ea7aca 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -14,11 +14,104 @@  #include "glusterfs.h"  #include "xlator.h"  #include "dht-common.h" +#include "dht-lock.h"  #include "defaults.h"  int dht_rename_unlock (call_frame_t *frame, xlator_t *this);  int +dht_rename_unlock_cbk (call_frame_t *frame, void *cookie, +                       xlator_t *this, int32_t op_ret, int32_t op_errno, +                       dict_t *xdata) +{ +        dht_local_t *local = NULL; + +        local = frame->local; + +        dht_set_fixed_dir_stat (&local->preoldparent); +        dht_set_fixed_dir_stat (&local->postoldparent); +        dht_set_fixed_dir_stat (&local->preparent); +        dht_set_fixed_dir_stat (&local->postparent); + +        if (IA_ISREG (local->stbuf.ia_type)) +                DHT_STRIP_PHASE1_FLAGS (&local->stbuf); + +        DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno, +                          &local->stbuf, &local->preoldparent, +                          &local->postoldparent, &local->preparent, +                          &local->postparent, local->xattr); +        return 0; +} + +static void +dht_rename_unlock_src (call_frame_t *frame, xlator_t *this) +{ +        dht_local_t *local                      = NULL; + +        local = frame->local; +        dht_unlock_namespace 
(frame, &local->lock[0]); +        return; +} + +static void +dht_rename_unlock_dst (call_frame_t *frame, xlator_t *this) +{ +        dht_local_t *local                      = NULL; +        int          op_ret                     = -1; +        char         src_gfid[GF_UUID_BUF_SIZE] = {0}; +        char         dst_gfid[GF_UUID_BUF_SIZE] = {0}; + +        local = frame->local; + +        /* Unlock entrylk */ +        dht_unlock_entrylk_wrapper (frame, &local->lock[1].ns.directory_ns); + +        /* Unlock inodelk */ +        op_ret = dht_unlock_inodelk (frame, +                                     local->lock[1].ns.parent_layout.locks, +                                     local->lock[1].ns.parent_layout.lk_count, +                                     dht_rename_unlock_cbk); +        if (op_ret < 0) { +                uuid_utoa_r (local->loc.inode->gfid, src_gfid); + +                if (local->loc2.inode) +                        uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); + +                if (IA_ISREG (local->stbuf.ia_type)) +                        gf_msg (this->name, GF_LOG_WARNING, 0, +                                DHT_MSG_UNLOCKING_FAILED, +                                "winding unlock inodelk failed " +                                "rename (%s:%s:%s %s:%s:%s), " +                                "stale locks left on bricks", +                                local->loc.path, src_gfid, +                                local->src_cached->name, +                                local->loc2.path, dst_gfid, +                                local->dst_cached ? 
+                                local->dst_cached->name : NULL); +                else +                        gf_msg (this->name, GF_LOG_WARNING, 0, +                                DHT_MSG_UNLOCKING_FAILED, +                                "winding unlock inodelk failed " +                                "rename (%s:%s %s:%s), " +                                "stale locks left on bricks", +                                local->loc.path, src_gfid, +                                local->loc2.path, dst_gfid); + +                dht_rename_unlock_cbk (frame, NULL, this, 0, 0, NULL); +        } + +        return; +} + +static int +dht_rename_dir_unlock (call_frame_t *frame, xlator_t *this) +{ + +        dht_rename_unlock_src (frame, this); +        dht_rename_unlock_dst (frame, this); +        return 0; +} +int  dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                      int32_t op_ret, int32_t op_errno, struct iatt *stbuf,                      struct iatt *preoldparent, struct iatt *postoldparent, @@ -39,7 +132,6 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          subvol_cnt = dht_subvol_cnt (this, prev);          local->ret_cache[subvol_cnt] = op_ret; -          if (op_ret == -1) {                  gf_uuid_unparse(local->loc.inode->gfid, gfid); @@ -64,7 +156,6 @@ dht_rename_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_iatt_merge (this, &local->preparent, prenewparent, prev);          dht_iatt_merge (this, &local->postparent, postnewparent, prev); -  unwind:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { @@ -109,7 +200,7 @@ unwind:                  WIPE (&local->preparent);                  WIPE (&local->postparent); -                dht_rename_unlock (frame, this); +                dht_rename_dir_unlock (frame, this);          }          return 0; @@ -185,7 +276,7 @@ unwind:          WIPE (&local->preparent);          WIPE 
(&local->postparent); -        dht_rename_unlock (frame, this); +        dht_rename_dir_unlock (frame, this);          return 0;  } @@ -209,7 +300,7 @@ dht_rename_dir_do (call_frame_t *frame, xlator_t *this)          return 0;  err: -        dht_rename_unlock (frame, this); +        dht_rename_dir_unlock (frame, this);          return 0;  } @@ -283,9 +374,8 @@ err:          return 0;  } -  int -dht_rename_dir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +dht_rename_dir_lock2_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                           int32_t op_ret, int32_t op_errno, dict_t *xdata)  {          dht_local_t *local                      = NULL; @@ -305,7 +395,7 @@ dht_rename_dir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  gf_msg (this->name, GF_LOG_WARNING, op_errno,                          DHT_MSG_INODE_LK_ERROR, -                        "acquiring inodelk failed " +                        "acquiring entrylk after inodelk failed"                          "rename (%s:%s:%s %s:%s:%s)",                          local->loc.path, src_gfid, local->src_cached->name,                          local->loc2.path, dst_gfid, @@ -341,22 +431,109 @@ dht_rename_dir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  err:          /* No harm in calling an extra unlock */ -        dht_rename_unlock (frame, this); +        dht_rename_dir_unlock (frame, this);          return 0;  }  int +dht_rename_dir_lock1_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                             int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t *local                      = NULL; +        char         src_gfid[GF_UUID_BUF_SIZE] = {0}; +        char         dst_gfid[GF_UUID_BUF_SIZE] = {0}; +        int          ret                        = 0; +        loc_t       *loc                        = NULL; +        xlator_t    *subvol                     = NULL; + +        local = frame->local; 
+ +        if (op_ret < 0) { +                uuid_utoa_r (local->loc.inode->gfid, src_gfid); + +                if (local->loc2.inode) +                        uuid_utoa_r (local->loc2.inode->gfid, dst_gfid); + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring entrylk after inodelk failed" +                        "rename (%s:%s:%s %s:%s:%s)", +                        local->loc.path, src_gfid, local->src_cached->name, +                        local->loc2.path, dst_gfid, +                        local->dst_cached ? local->dst_cached->name : NULL); + +                local->op_ret = -1; +                local->op_errno = op_errno; +                goto err; +        } + +        if (local->current == &local->lock[0]) { +                loc = &local->loc2; +                subvol = local->dst_hashed; +                local->current = &local->lock[1]; +        } else { +                loc = &local->loc; +                subvol = local->src_hashed; +                local->current = &local->lock[0]; +        } +        ret = dht_protect_namespace (frame, loc, subvol, &local->current->ns, +                                     dht_rename_dir_lock2_cbk); +        if (ret < 0) { +                op_errno = EINVAL; +                goto err; +        } + +        return 0; +err: +        /* No harm in calling an extra unlock */ +        dht_rename_dir_unlock (frame, this); +        return 0; +} + +static void +dht_order_rename_lock (call_frame_t *frame, loc_t **loc, xlator_t **subvol) +{ +        dht_local_t        *local                       = NULL; +        char                src[GF_UUID_BNAME_BUF_SIZE] = {0}; +        char                dst[GF_UUID_BNAME_BUF_SIZE] = {0}; + +        local = frame->local; + +        if (local->loc.pargfid) +                uuid_utoa_r (local->loc.pargfid, src); +        else if (local->loc.parent) +                uuid_utoa_r 
(local->loc.parent->gfid, src); + +        strcat (src, local->loc.name); + +        if (local->loc2.pargfid) +                uuid_utoa_r (local->loc2.pargfid, dst); +        else if (local->loc2.parent) +                uuid_utoa_r (local->loc2.parent->gfid, dst); + +        strcat (dst, local->loc2.name); + +        if (strcmp(src, dst) > 0) { +                local->current = &local->lock[1]; +                *loc = &local->loc2; +                *subvol = local->dst_hashed; +        } else { +                local->current = &local->lock[0]; +                *loc = &local->loc; +                *subvol = local->src_hashed; +        } + +        return; +} + +int  dht_rename_dir (call_frame_t *frame, xlator_t *this)  {          dht_conf_t    *conf         = NULL;          dht_local_t   *local        = NULL; -        dht_lock_t   **lk_array     = NULL; -        dht_layout_t  *dst_layout   = NULL; -        xlator_t      *first_subvol = NULL; -        loc_t          parent_loc   = {0, }; -        int            count        = 1; +        loc_t         *loc          = NULL; +        xlator_t      *subvol       = NULL;          int            i            = 0; -        int            j            = 0;          int            ret          = 0;          int            op_errno     = -1; @@ -371,21 +548,7 @@ dht_rename_dir (call_frame_t *frame, xlator_t *this)                  goto err;          } -        /* We must take a lock on all the subvols with src gfid. -         * Along with this if dst exists we must take lock on -         * any one subvol with dst gfid. 
-         */ -        count = local->call_cnt = conf->subvolume_cnt; -        if (local->loc2.inode) { -                dst_layout = dht_layout_get (this, local->loc2.inode); -                if (dst_layout) -                        ++count; -        } else if (gf_uuid_compare (local->loc.parent->gfid, -                                    local->loc2.parent->gfid)) { -                dst_layout = dht_layout_get (this, local->loc2.parent); -                if (dst_layout) -                        ++count; -        } +        local->call_cnt = conf->subvolume_cnt;          for (i = 0; i < conf->subvolume_cnt; i++) {                  if (!conf->subvolume_status[i]) { @@ -398,89 +561,29 @@ dht_rename_dir (call_frame_t *frame, xlator_t *this)                  }          } -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); -        if (lk_array == NULL) { -                op_errno = ENOMEM; -                goto err; -        } + +        /* Locks on src and dst needs to ordered which otherwise might cause +         * deadlocks when rename (src, dst) and rename (dst, src) is done from +         * two different clients +         */ +        dht_order_rename_lock (frame, &loc, &subvol);          /* Rename must take locks on src to avoid lookup selfheal from           * recreating src on those subvols where the rename was successful. -         * Rename must take locks on all subvols with src because selfheal -         * in entry creation phase may not have acquired lock on all subvols. 
-        */ -        for (i = 0; i < local->call_cnt; i++) { -                lk_array[i] = dht_lock_new (frame->this, -                                            conf->subvolumes[i], -                                            &local->loc, F_WRLCK, -                                            DHT_LAYOUT_HEAL_DOMAIN); -                if (lk_array[i] == NULL) { -                        op_errno = ENOMEM; -                        goto err; -                } -        } - -        /* If the dst exists, we are going to replace dst layout range with -         * that of src. This will lead to anomalies in dst layout until the -         * rename completes. To avoid a lookup selfheal to change dst layout -         * during this interval we take a lock on one subvol of dst. +         * The locks can't be issued parallel as two different clients might +         * attempt same rename command and be in dead lock.           */ -        for (j = 0; dst_layout && (j < dst_layout->cnt) && -                        (dst_layout->list[j].err == 0); j++) { - -                first_subvol = dst_layout->list[j].xlator; -                if (local->loc2.inode) { -                        lk_array[i] = dht_lock_new (frame->this, first_subvol, -                                                    &local->loc2, F_WRLCK, -                                                    DHT_LAYOUT_HEAL_DOMAIN); -                } else { -                        ret = dht_build_parent_loc (this, &parent_loc, -                                                    &local->loc2, &op_errno); -                        if (ret) { -                                gf_msg (this->name, GF_LOG_ERROR, ENOMEM, -                                        DHT_MSG_NO_MEMORY, -                                        "parent loc build failed"); -                                goto err; -                        } - -                        lk_array[i] = dht_lock_new (frame->this, first_subvol, -                                     
               &parent_loc, F_WRLCK, -                                                    DHT_LAYOUT_HEAL_DOMAIN); -                } - -                if (lk_array[i] == NULL) { -                        op_errno = ENOMEM; -                        goto err; -                } -                break; -        } - -        if (!lk_array[i]) -                --count; - -        local->lock.locks = lk_array; -        local->lock.lk_count = count; - -        ret = dht_blocking_inodelk (frame, lk_array, count, -                                    IGNORE_ENOENT_ESTALE, -                                    dht_rename_dir_lock_cbk); +        ret = dht_protect_namespace (frame, loc, subvol, +                                     &local->current->ns, +                                     dht_rename_dir_lock1_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0;                  op_errno = EINVAL;                  goto err;          } -        loc_wipe (&parent_loc);          return 0;  err: -        if (lk_array != NULL) { -                dht_lock_array_free (lk_array, count); -                GF_FREE (lk_array); -        } - -        loc_wipe (&parent_loc);          op_errno = (op_errno == -1) ? 
errno : op_errno;          DHT_STACK_UNWIND (rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,                            NULL, NULL); @@ -581,29 +684,6 @@ dht_rename_track_for_changelog (xlator_t *this, dict_t *xattr,                  }                                                            \          } while (0) -int -dht_rename_unlock_cbk (call_frame_t *frame, void *cookie, -                       xlator_t *this, int32_t op_ret, int32_t op_errno, -                       dict_t *xdata) -{ -        dht_local_t *local = NULL; - -        local = frame->local; - -        dht_set_fixed_dir_stat (&local->preoldparent); -        dht_set_fixed_dir_stat (&local->postoldparent); -        dht_set_fixed_dir_stat (&local->preparent); -        dht_set_fixed_dir_stat (&local->postparent); - -        if (IA_ISREG (local->stbuf.ia_type)) -                DHT_STRIP_PHASE1_FLAGS (&local->stbuf); - -        DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno, -                          &local->stbuf, &local->preoldparent, -                          &local->postoldparent, &local->preparent, -                          &local->postparent, local->xattr); -        return 0; -}  int  dht_rename_unlock (call_frame_t *frame, xlator_t *this) @@ -614,8 +694,9 @@ dht_rename_unlock (call_frame_t *frame, xlator_t *this)          char         dst_gfid[GF_UUID_BUF_SIZE] = {0};          local = frame->local; -        op_ret = dht_unlock_inodelk (frame, local->lock.locks, -                                     local->lock.lk_count, +        op_ret = dht_unlock_inodelk (frame, +                                     local->lock[0].layout.parent_layout.locks, +                                     local->lock[0].layout.parent_layout.lk_count,                                       dht_rename_unlock_cbk);          if (op_ret < 0) {                  uuid_utoa_r (local->loc.inode->gfid, src_gfid); @@ -1446,13 +1527,14 @@ dht_rename_lock_cbk (call_frame_t *frame, void *cookie, xlator_t 
*this,                  goto done;          } -        local->call_cnt = local->lock.lk_count; +        local->call_cnt = local->lock[0].layout.parent_layout.lk_count; -        for (i = 0; i < local->lock.lk_count; i++) { +        for (i = 0; i < local->lock[0].layout.parent_layout.lk_count; i++) {                  STACK_WIND (frame, dht_rename_lookup_cbk, -                            local->lock.locks[i]->xl, -                            local->lock.locks[i]->xl->fops->lookup, -                            &local->lock.locks[i]->loc, xattr_req); +                            local->lock[0].layout.parent_layout.locks[i]->xl, +                            local->lock[0].layout.parent_layout.locks[i]->xl->fops->lookup, +                            &local->lock[0].layout.parent_layout.locks[i]->loc, +                            xattr_req);          }          dict_unref (xattr_req); @@ -1482,31 +1564,31 @@ dht_rename_lock (call_frame_t *frame)          if (local->dst_cached)                  count++; -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_pointer);          if (lk_array == NULL)                  goto err;          lk_array[0] = dht_lock_new (frame->this, local->src_cached, &local->loc, -                                    F_WRLCK, DHT_FILE_MIGRATE_DOMAIN); +                                    F_WRLCK, DHT_FILE_MIGRATE_DOMAIN, NULL);          if (lk_array[0] == NULL)                  goto err;          if (local->dst_cached) {                  lk_array[1] = dht_lock_new (frame->this, local->dst_cached,                                              &local->loc2, F_WRLCK, -                                            DHT_FILE_MIGRATE_DOMAIN); +                                            DHT_FILE_MIGRATE_DOMAIN, NULL);                  if (lk_array[1] == NULL)                          goto err;          } -        local->lock.locks = lk_array; -        
local->lock.lk_count = count; +        local->lock[0].layout.parent_layout.locks = lk_array; +        local->lock[0].layout.parent_layout.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count,                                      FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +                local->lock[0].layout.parent_layout.locks = NULL; +                local->lock[0].layout.parent_layout.lk_count = 0;                  goto err;          } diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c index 0838a627521..de9d30c047f 100644 --- a/xlators/cluster/dht/src/dht-selfheal.c +++ b/xlators/cluster/dht/src/dht-selfheal.c @@ -13,6 +13,7 @@  #include "xlator.h"  #include "dht-common.h"  #include "dht-messages.h" +#include "dht-lock.h"  #include "glusterfs-acl.h"  #define DHT_SET_LAYOUT_RANGE(layout,i,srt,chunk,path)    do {           \ @@ -85,7 +86,13 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret,          int           lock_count = 0;          local = frame->local; -        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); + +        /* Unlock entrylk */ +        dht_unlock_entrylk_wrapper (frame, &local->lock[0].ns.directory_ns); + +        /* Unlock inodelk */ +        lock_count = dht_lock_count (local->lock[0].ns.parent_layout.locks, +                                     local->lock[0].ns.parent_layout.lk_count);          if (lock_count == 0)                  goto done; @@ -100,14 +107,15 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret,                  goto done;          } -        lock_local->lock.locks = local->lock.locks; -        lock_local->lock.lk_count = local->lock.lk_count; +        lock_local->lock[0].ns.parent_layout.locks = local->lock[0].ns.parent_layout.locks; +        
lock_local->lock[0].ns.parent_layout.lk_count = local->lock[0].ns.parent_layout.lk_count; -        local->lock.locks = NULL; -        local->lock.lk_count = 0; +        local->lock[0].ns.parent_layout.locks = NULL; +        local->lock[0].ns.parent_layout.lk_count = 0; -        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, -                            lock_local->lock.lk_count, +        dht_unlock_inodelk (lock_frame, +                            lock_local->lock[0].ns.parent_layout.locks, +                            lock_local->lock[0].ns.parent_layout.lk_count,                              dht_selfheal_unlock_cbk);          lock_frame = NULL; @@ -579,7 +587,8 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,                          lk_array[i] = dht_lock_new (frame->this,                                                      conf->subvolumes[i],                                                      &local->loc, F_WRLCK, -                                                    DHT_LAYOUT_HEAL_DOMAIN); +                                                    DHT_LAYOUT_HEAL_DOMAIN, +                                                    NULL);                          if (lk_array[i] == NULL) {                                  gf_uuid_unparse (local->stbuf.ia_gfid, gfid);                                  gf_msg (THIS->name, GF_LOG_ERROR, ENOMEM, @@ -604,7 +613,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,                  lk_array[0] = dht_lock_new (frame->this, local->hashed_subvol,                                              &local->loc, F_WRLCK, -                                            DHT_LAYOUT_HEAL_DOMAIN); +                                            DHT_LAYOUT_HEAL_DOMAIN, NULL);                  if (lk_array[0] == NULL) {                          gf_uuid_unparse (local->stbuf.ia_gfid, gfid);                          gf_msg (THIS->name, GF_LOG_ERROR, ENOMEM, @@ -615,14 +624,14 @@ dht_selfheal_layout_lock 
(call_frame_t *frame, dht_layout_t *layout,                  }          } -        local->lock.locks = lk_array; -        local->lock.lk_count = count; +        local->lock[0].layout.my_layout.locks = lk_array; +        local->lock[0].layout.my_layout.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_selfheal_layout_lock_cbk);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +                local->lock[0].layout.my_layout.locks = NULL; +                local->lock[0].layout.my_layout.lk_count = 0;                  goto err;          } @@ -1454,8 +1463,8 @@ dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie,                  }                  gf_msg (this->name, GF_LOG_WARNING, op_errno, -                        DHT_MSG_INODE_LK_ERROR, -                        "acquiring inodelk failed for %s", +                        DHT_MSG_ENTRYLK_ERROR, +                        "acquiring entrylk after inodelk failed for %s",                          local->loc.path);                  local->op_errno = op_errno; @@ -1487,15 +1496,9 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,          int           missing_dirs = 0;          int           i     = 0;          int           ret   = -1; -        int           count = 1;          dht_local_t  *local = NULL; -        dht_conf_t   *conf  = NULL; -        xlator_t     *this = NULL; -        dht_lock_t   **lk_array = NULL;          local = frame->local; -        this = frame->this; -        conf = this->private;          local->selfheal.force_mkdir = force;          local->selfheal.hole_cnt = 0; @@ -1511,44 +1514,16 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                  return 0;          } -        count = conf->subvolume_cnt; - -        /* Locking on all subvols in the mkdir phase of lookup selfheal is -           is done to 
synchronize with rmdir/rename. -        */ -        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); -        if (lk_array == NULL) -                goto err; - -        for (i = 0; i < count; i++) { -                lk_array[i] = dht_lock_new (frame->this, -                                            conf->subvolumes[i], -                                            &local->loc, F_WRLCK, -                                            DHT_LAYOUT_HEAL_DOMAIN); -                if (lk_array[i] == NULL) -                        goto err; -        } - -        local->lock.locks = lk_array; -        local->lock.lk_count = count; - -        ret = dht_blocking_inodelk (frame, lk_array, count, -                                    IGNORE_ENOENT_ESTALE, -                                    dht_selfheal_dir_mkdir_lock_cbk); +        local->current = &local->lock[0]; +        ret = dht_protect_namespace (frame, loc, local->hashed_subvol, +                                     &local->current->ns, +                                     dht_selfheal_dir_mkdir_lock_cbk); -        if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +        if (ret < 0)                  goto err; -        }          return 0;  err: -        if (lk_array != NULL) { -                dht_lock_array_free (lk_array, count); -                GF_FREE (lk_array); -        } -          return -1;  } @@ -2379,9 +2354,9 @@ dht_update_commit_hash_for_layout_unlock (call_frame_t *frame, xlator_t *this)          local = frame->local; -        ret = dht_unlock_inodelk (frame, local->lock.locks, -                                  local->lock.lk_count, -                                  dht_update_commit_hash_for_layout_done); +        ret = dht_unlock_inodelk (frame, local->lock[0].layout.my_layout.locks, +                                   local->lock[0].layout.my_layout.lk_count, +                                   
dht_update_commit_hash_for_layout_done);          if (ret < 0) {                  /* preserve oldest error, just ... */                  if (!local->op_ret) { @@ -2614,19 +2589,19 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)                  lk_array[i] = dht_lock_new (frame->this,                                              conf->local_subvols[i],                                              &local->loc, F_WRLCK, -                                            DHT_LAYOUT_HEAL_DOMAIN); +                                            DHT_LAYOUT_HEAL_DOMAIN, NULL);                  if (lk_array[i] == NULL)                          goto err;          } -        local->lock.locks = lk_array; -        local->lock.lk_count = count; +        local->lock[0].layout.my_layout.locks = lk_array; +        local->lock[0].layout.my_layout.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_update_commit_hash_for_layout_resume);          if (ret < 0) { -                local->lock.locks = NULL; -                local->lock.lk_count = 0; +                local->lock[0].layout.my_layout.locks = NULL; +                local->lock[0].layout.my_layout.lk_count = 0;                  goto err;          }  | 
