diff options
| author | Sakshi <sabansal@redhat.com> | 2015-07-16 14:31:03 +0530 | 
|---|---|---|
| committer | Raghavendra G <rgowdapp@redhat.com> | 2015-08-27 06:25:34 -0700 | 
| commit | 9e51aa646fdc5840b6fa9b12b35c5cc2af274c3c (patch) | |
| tree | a7ff968e51340b9e590805bc721fd8ae43c4b3ad | |
| parent | cf3d6f14ae031ba2f5269cea6dbf80e60d00cce5 (diff) | |
dht : lock on subvols to prevent lookup vs rmdir race
There is a possibility that, while an rmdir has completed on
some non-hashed subvols and is proceeding to others, a lookup
selfheal can recreate the same directory on those subvols
for which the rmdir had succeeded. The fix is to take a
blocking inodelk on the subvols before starting rmdir.
Since selfheal requires lock on all subvols, if an rmdir
is in progress, acquiring locks will fail and vice versa.
Change-Id: I841a44758c3b88f5e04d1cb73ad36e0cac9fdabb
BUG: 1245065
Signed-off-by: Sakshi <sabansal@redhat.com>
Reviewed-on: http://review.gluster.org/11725
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 180 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 14 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 38 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 2 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-selfheal.c | 181 | 
5 files changed, 331 insertions, 84 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 1f67d660d15..9de1550f53b 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -35,6 +35,10 @@ int  dht_setxattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame);  int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this); + + +int  dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)  {          int              ret            = -1; @@ -4524,7 +4528,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,                   * corresponding hashed subvolume will take care of the                   * directory entry.                   */ -                          if (readdir_optimize) {                                  if (prev->this == local->first_up_subvol)                                          goto list; @@ -5009,7 +5012,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 0);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno, @@ -5054,7 +5057,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err:          if (local->lock.locks) -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 0);          return 0;  } @@ -5159,7 +5162,7 @@ dht_mknod_do (call_frame_t *frame)                                           local->umask, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 0);          return 0;  } @@ -5174,7 +5177,7 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void 
*cookie, xlator_t *this,  }  int32_t -dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -5249,7 +5252,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_mknod_finish (frame, this, -1); +        dht_mknod_finish (frame, this, -1, 0);          return 0;  } @@ -5280,7 +5283,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_mknod_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -5808,7 +5811,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 0);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd, @@ -5849,7 +5852,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err:          if (local->lock.locks) -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 0);          return 0;  } @@ -6013,7 +6016,7 @@ dht_create_do (call_frame_t *frame)                                           local->umask, local->fd, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 0);          return 0;  } @@ -6027,7 +6030,7 @@ dht_create_unlock_cbk 
(call_frame_t *frame, void *cookie, xlator_t *this,  }  int32_t -dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -6102,7 +6105,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_create_finish (frame, this, -1); +        dht_create_finish (frame, this, -1, 0);          return 0;  } @@ -6133,7 +6136,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_create_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -6593,6 +6596,7 @@ unlock:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) {                 if (local->need_selfheal) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6616,6 +6620,7 @@ unlock:                                                             1);                          } +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6684,6 +6689,7 @@ unlock:          if (done) {                  if (local->need_selfheal && local->fop_succeeded) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6718,6 +6724,7 
@@ unlock:                          } +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6729,11 +6736,110 @@ unlock:  int +dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        DHT_STACK_DESTROY (frame); +        return 0; +} + + +int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this) +{ +        dht_local_t  *local      = NULL, *lock_local = NULL; +        call_frame_t *lock_frame = NULL; +        int           lock_count = 0; + +        local = frame->local; +        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); + +        if (lock_count == 0) +                goto done; + +        lock_frame = copy_frame (frame); +        if (lock_frame == NULL) +                goto done; + +        lock_local = dht_local_init (lock_frame, &local->loc, NULL, +                                     lock_frame->root->op); +        if (lock_local == NULL) +                goto done; + +        lock_local->lock.locks = local->lock.locks; +        lock_local->lock.lk_count = local->lock.lk_count; + +        local->lock.locks = NULL; +        local->lock.lk_count = 0; +        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, +                            lock_local->lock.lk_count, +                            dht_rmdir_unlock_cbk); +        lock_frame = NULL; + +done: +        if (lock_frame != NULL) { +                DHT_STACK_DESTROY (lock_frame); +        } + +        return 0; +} + + +int +dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        int  
         i     = 0; + +        VALIDATE_OR_GOTO (this->private, err); + +        conf = this->private; +        local = frame->local; + +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed rmdir for %s)", +                        local->loc.path); + +                local->op_ret = -1; +                local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno; +                goto err; +        } + +        for (i = 0; i < conf->subvolume_cnt; i++) { +                if (local->hashed_subvol && +                    (local->hashed_subvol == conf->subvolumes[i])) +                        continue; + +                STACK_WIND (frame, dht_rmdir_cbk, +                            conf->subvolumes[i], +                            conf->subvolumes[i]->fops->rmdir, +                            &local->loc, local->flags, NULL); +        } + +        return 0; + +err: +        /* No harm in calling an extra rmdir unlock */ +        dht_rmdir_unlock (frame, this); +        DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno, +                          &local->preparent, &local->postparent, NULL); + +        return 0; +} + + +int  dht_rmdir_do (call_frame_t *frame, xlator_t *this)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; -        int           i = 0; +        dht_lock_t   **lk_array = NULL; +        int           i = 0, ret = -1; +        int           count = 1;          xlator_t     *hashed_subvol = NULL;          char gfid[GF_UUID_BUF_SIZE] ={0}; @@ -6747,7 +6853,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)          local->call_cnt = conf->subvolume_cnt; -          /* first remove from non-hashed_subvol */          hashed_subvol = dht_subvol_get_hashed (this, &local->loc); @@ -6771,20 +6876,49 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)                  return 0;          
} -        for (i = 0; i < conf->subvolume_cnt; i++) { -                if (hashed_subvol && -                    (hashed_subvol == conf->subvolumes[i])) -                        continue; +        count = conf->subvolume_cnt; -                STACK_WIND (frame, dht_rmdir_cbk, -                            conf->subvolumes[i], -                            conf->subvolumes[i]->fops->rmdir, -                            &local->loc, local->flags, NULL); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) { +                local->op_ret = -1; +                local->op_errno = ENOMEM; +                goto err; +        } + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) { +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        goto err; +                } +        } + +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_rmdir_lock_cbk); +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                local->op_ret = -1; +                local->op_errno = errno ? 
errno : EINVAL; +                goto err;          }          return 0;  err: +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } +          DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,                            &local->preparent, &local->postparent, NULL);          return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 6bf8ba1c406..53eb34a9e90 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -40,7 +40,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t        *this, xlator_t *dst_node,                                      call_frame_t    *frame);  typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this, -                                         int op_ret); +                                         int op_ret, int invoke_cbk);  typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame); @@ -131,6 +131,11 @@ typedef enum {          qdstatfs_action_COMPARE,  } qdstatfs_action_t; +typedef enum { +        FAIL_ON_ANY_ERROR, +        IGNORE_ENOENT_ESTALE +} dht_reaction_type_t; +  struct dht_skip_linkto_unlink {          gf_boolean_t    handle_valid_link; @@ -261,6 +266,7 @@ struct dht_local {                  fop_inodelk_cbk_t   inodelk_cbk;                  dht_lock_t        **locks;                  int                 lk_count; +                dht_reaction_type_t reaction;                  /* whether locking failed on _any_ of the "locks" above */                  int                 op_ret; @@ -1042,7 +1048,8 @@ dht_fill_dict_to_avoid_unlink_of_migrating_file (dict_t *dict);  int  dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                         int lk_count, fop_inodelk_cbk_t inodelk_cbk); +                         int lk_count, dht_reaction_type_t reaction, +                         fop_inodelk_cbk_t 
inodelk_cbk);  /* same as dht_nonblocking_inodelk, but issues sequential blocking locks on   * @lk_array directly. locks are issued on some order which remains same @@ -1050,7 +1057,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,   */  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk); +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk);  int32_t  dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index c242cb2f2fe..7107a085762 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -342,6 +342,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,          lock->xl = xl;          lock->type = type; +          lock->domain = gf_strdup (domain);          if (lock->domain == NULL) {                  dht_lock_free (lock); @@ -1679,7 +1680,8 @@ out:  int  dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                         int lk_count, fop_inodelk_cbk_t inodelk_cbk) +                         int lk_count, dht_reaction_type_t reaction, +                         fop_inodelk_cbk_t inodelk_cbk)  {          struct gf_flock  flock      = {0,};          int              i          = 0, ret = 0; @@ -1702,6 +1704,7 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,          dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);          local = lock_frame->local; +        local->lock.reaction = reaction;          local->main_frame = frame;          local->call_cnt = lk_count; @@ -1732,21 +1735,42 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                            int32_t op_ret, int32_t op_errno, dict_t *xdata)  {          int    
      lk_index = 0; +        int          i        = 0;          dht_local_t *local    = NULL;          lk_index = (long) cookie;          local = frame->local; -          if (op_ret == 0) {                  local->lock.locks[lk_index]->locked = _gf_true;          } else { -                local->lock.op_ret = -1; -                local->lock.op_errno = op_errno; -                goto cleanup; +                switch (op_errno) { +                case ESTALE: +                case ENOENT: +                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) { +                                local->lock.op_ret = -1; +                                local->lock.op_errno = op_errno; +                                goto cleanup; +                        } +                        break; +                default: +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                        goto cleanup; +                }          }          if (lk_index == (local->lock.lk_count - 1)) { +                for (i = 0; (i < local->lock.lk_count) && +                     (!local->lock.locks[i]->locked); i++) { +                        ; +                } + +                if (i == local->lock.lk_count) { +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                } +                  dht_inodelk_done (frame);          } else {                  dht_blocking_inodelk_rec (frame, ++lk_index); @@ -1820,7 +1844,8 @@ out:  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk) +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk)  {          int           ret        = -1;          call_frame_t *lock_frame = NULL; @@ -1842,6 +1867,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,         
 dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);          local = lock_frame->local; +        local->lock.reaction = reaction;          local->main_frame = frame;          dht_blocking_inodelk_rec (lock_frame, 0); diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index bde0ce83439..311dbf2d495 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -1303,7 +1303,7 @@ dht_rename_lock (call_frame_t *frame)          local->lock.lk_count = count;          ret = dht_nonblocking_inodelk (frame, lk_array, count, -                                       dht_rename_lock_cbk); +                                       FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL;                  local->lock.lk_count = 0; diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c index 42ab822a701..0e84a38082f 100644 --- a/xlators/cluster/dht/src/dht-selfheal.c +++ b/xlators/cluster/dht/src/dht-selfheal.c @@ -77,7 +77,7 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int -dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret) +dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -85,7 +85,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          local = frame->local;          lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); -          if (lock_count == 0)                  goto done; @@ -112,8 +111,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          lock_frame = NULL;  done: -        local->selfheal.dir_cbk (frame, NULL, frame->this, ret, -                                 local->op_errno, NULL); +        if (!invoke_cbk) +            
    local->selfheal.dir_cbk (frame, NULL, frame->this, ret, +                                         local->op_errno, NULL);          if (lock_frame != NULL) {                  DHT_STACK_DESTROY (lock_frame);          } @@ -155,13 +155,13 @@ dht_refresh_layout_done (call_frame_t *frame)                  dht_layout_unref (frame->this, heal); -                dht_selfheal_dir_finish (frame, frame->this, 0); +                dht_selfheal_dir_finish (frame, frame->this, 0, 0);          }          return 0;  err: -        dht_selfheal_dir_finish (frame, frame->this, -1); +        dht_selfheal_dir_finish (frame, frame->this, -1, 0);          return 0;  } @@ -219,8 +219,9 @@ unlock:          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 0); +        dht_selfheal_dir_finish (frame, this, -1, 0);          return 0;  } @@ -286,7 +287,8 @@ dht_refresh_layout (call_frame_t *frame)          return 0;  out: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 0); +        dht_selfheal_dir_finish (frame, this, -1, 0);          return 0;  } @@ -314,7 +316,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_selfheal_dir_finish (frame, this, -1); +        dht_selfheal_dir_finish (frame, this, -1, 0);          return 0;  } @@ -575,7 +577,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_selfheal_layout_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -586,13 +588,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          return 0;  err:   
       if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          } @@ -631,7 +627,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 0);          }          return 0; @@ -827,7 +823,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 0);                  return 0;          } @@ -954,7 +950,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 0);                  return 0;          } @@ -1022,7 +1018,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                                  dht_should_heal_layout);                  if (ret < 0) { -                        dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 0);                  }          } @@ -1053,7 +1049,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,                                                  dht_should_heal_layout);                  if (ret < 0) { -                 
       dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 0);                  }                  return 0; @@ -1091,7 +1087,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_layout_t  *layout = NULL;          call_frame_t  *prev = NULL;          xlator_t      *subvol = NULL; -        int            i = 0; +        int            i = 0, ret = -1;          int            this_call_cnt = 0;          char           gfid[GF_UUID_BUF_SIZE] = {0}; @@ -1110,7 +1106,6 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          if (op_ret) { -                  gf_uuid_unparse(local->loc.gfid, gfid);                  gf_msg (this->name, ((op_errno == EEXIST) ? GF_LOG_DEBUG :                                       GF_LOG_WARNING), @@ -1121,11 +1116,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          dht_iatt_merge (this, &local->preparent, preparent, prev->this);          dht_iatt_merge (this, &local->postparent, postparent, prev->this); +        ret = 0;  out:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { +                dht_selfheal_dir_finish (frame, this, ret, -1);                  dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);          } @@ -1178,32 +1175,33 @@ out:  }  int -dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, -                        dht_layout_t *layout, int force) +dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                                 int32_t op_ret, int32_t op_errno, dict_t *xdata)  { -        int           missing_dirs = 0; +        dht_local_t  *local = NULL;          int           i     = 0;          int           ret   = -1; -        dht_local_t  *local = NULL; -        xlator_t     *this = NULL;          dict_t       
*dict = NULL; +        dht_layout_t  *layout = NULL; +        loc_t        *loc   = NULL; -        local = frame->local; -        this = frame->this; +        VALIDATE_OR_GOTO (this->private, err); -        local->selfheal.force_mkdir = force ? _gf_true : _gf_false; +        local = frame->local; +        layout = local->layout; +        loc    = &local->loc; -        for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) -                        missing_dirs++; -        } +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed for %s", +                        loc->path); -        if (missing_dirs == 0) { -                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, 0xffffffff, layout); -                return 0; +                local->op_ret = -1; +                local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno; +                goto err;          } -        local->call_cnt = missing_dirs;          if (!gf_uuid_is_null (local->gfid)) {                  dict = dict_new ();                  if (!dict) @@ -1217,6 +1215,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                                  " key = gfid-req", loc->path);          } else if (local->params) {                  /* Send the dictionary from higher layers directly */ +                  dict = dict_ref (local->params);          }          /* Set acls */ @@ -1228,8 +1227,18 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                          DHT_MSG_DICT_SET_FAILED,                          "dict is NULL, need to make sure gfids are same"); + +        /* We don't have to do a lookup here again: +            1) Parallel rmdir would had removed the directory and locking would +               have anyway failed with an ESTALE on all subvols. 
Hence selfheal +               will never create the directory. +            2) Parallel lookup creating directory does not have to be mutually +               exclusive for the mkdir phase of lookup selfheal. +        */ +          for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) { +                if (layout->list[i].err == ENOENT || +                    local->selfheal.force_mkdir) {                          gf_msg_debug (this->name, 0,                                        "Creating directory %s on subvol %s",                                        loc->path, layout->list[i].xlator->name); @@ -1248,6 +1257,82 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                  dict_unref (dict);          return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 0); +        return 0; +} + +int +dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, +                        dht_layout_t *layout, int force) +{ +        int           missing_dirs = 0; +        int           i     = 0; +        int           ret   = -1; +        int           count = 1; +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        xlator_t     *this = NULL; +        dht_lock_t   **lk_array = NULL; + +        local = frame->local; +        this = frame->this; +        conf = this->private; + +        local->selfheal.force_mkdir = force ? 
_gf_true : _gf_false; + +        for (i = 0; i < layout->cnt; i++) { +                if (layout->list[i].err == ENOENT || force) +                        missing_dirs++; +        } + +        if (missing_dirs == 0) { +                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, +                                          0xffffffff, layout); +                return 0; +        } + +        local->call_cnt = missing_dirs; +        count = conf->subvolume_cnt; + +        /* Locking on all subvols in the mkdir phase of lookup selfheal is +           is done to synchronize with rmdir/rename. +        */ +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) +                goto err; + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) +                        goto err; +        } + +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_selfheal_dir_mkdir_lock_cbk); + +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                goto err; +        } + +        return 0; +err: +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } + +        return -1;  }  int @@ -1819,7 +1904,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        
dht_selfheal_dir_finish (frame, this, ret, 0);          return 0;  } @@ -1887,7 +1972,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        dht_selfheal_dir_finish (frame, this, ret, 0);          return 0; @@ -2240,7 +2325,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_update_commit_hash_for_layout_resume);          if (ret < 0) {                  local->lock.locks = NULL; @@ -2251,13 +2336,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          return 0;  err:          if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          }  | 
