diff options
| author | Sakshi <sabansal@redhat.com> | 2015-07-16 14:31:03 +0530 | 
|---|---|---|
| committer | Raghavendra G <rgowdapp@redhat.com> | 2016-04-06 03:08:06 -0700 | 
| commit | b9c37234e0933c836e1bfdb72607b592ea5080c4 (patch) | |
| tree | d2a2f859d4ef179ef28d3a772af1553306e2765b | |
| parent | 5eabe98861d265abf4d82a783db0568e958ecdcd (diff) | |
dht: lock on subvols to prevent lookup vs rmdir race
While an rmdir has completed on some non-hashed subvols and is
still proceeding to the others, a lookup selfheal can recreate
the same directory on the subvols where the rmdir has already
succeeded. The subsequent deletion of the parent directory
then fails with ENOTEMPTY.
To fix this, take a blocking inodelk on the subvols before
starting the rmdir. Selfheal must also take a blocking inodelk
before creating the entry.
Backport of http://review.gluster.org/13528
> Change-Id: I168a195c35ac1230ba7124d3b0ca157755b3df96
> BUG: 1245065
> Signed-off-by: Sakshi <sabansal@redhat.com>
> Reviewed-on: http://review.gluster.org/13528
> CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
> Smoke: Gluster Build System <jenkins@build.gluster.com>
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
> Tested-by: Raghavendra G <rgowdapp@redhat.com>
Change-Id: I168a195c35ac1230ba7124d3b0ca157755b3df96
BUG: 1257894
Signed-off-by: Sakshi <sabansal@redhat.com>
Reviewed-on: http://review.gluster.org/13915
Smoke: Gluster Build System <jenkins@build.gluster.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 192 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 11 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 33 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 2 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-selfheal.c | 285 | 
5 files changed, 435 insertions, 88 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 7755eba9887..36244e7eaac 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -60,6 +60,9 @@ int32_t dht_set_fixed_dir_stat (struct iatt *stat)  int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this); + +int  dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)  {          int              ret            = -1; @@ -4673,6 +4676,10 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,                  if (IA_ISINVAL(orig_entry->d_stat.ia_type)) {                          /*stat failed somewhere- ignore this entry*/ +                        gf_msg_debug (this->name, EINVAL, +                                      "Invalid stat, ignoring entry " +                                      "%s gfid %s", orig_entry->d_name, +                                      uuid_utoa (orig_entry->d_stat.ia_gfid));                          continue;                  } @@ -4685,7 +4692,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,                   * corresponding hashed subvolume will take care of the                   * directory entry.                   
*/ -                          if (readdir_optimize) {                                  if (prev->this == local->first_up_subvol)                                          goto list; @@ -5203,7 +5209,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 1);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno, @@ -5261,7 +5267,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err:          if (local && local->lock.locks) { -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (mknod, frame, -1,                                    op_errno, NULL, NULL, NULL, @@ -5369,7 +5375,7 @@ dht_mknod_do (call_frame_t *frame)                                           local->umask, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -5384,7 +5390,8 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int32_t -dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret, +                  int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -5459,7 +5466,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_mknod_finish (frame, this, -1); +        dht_mknod_finish (frame, this, -1, 0);          return 0;  } @@ -5490,7 +5497,7 @@ dht_mknod_lock (call_frame_t *frame, 
xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_mknod_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -6030,7 +6037,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 1);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd, @@ -6089,7 +6096,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,  err:          if (local && local->lock.locks) { -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (create, frame, -1,                                    op_errno, NULL, NULL, NULL, @@ -6256,7 +6263,7 @@ dht_create_do (call_frame_t *frame)                                           local->umask, local->fd, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -6270,7 +6277,8 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int32_t -dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret, +                   int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -6345,7 +6353,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_create_finish 
(frame, this, -1); +        dht_create_finish (frame, this, -1, 0);          return 0;  } @@ -6376,7 +6384,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_create_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -6800,8 +6808,8 @@ dht_rmdir_selfheal_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  int  dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -               int op_ret, int op_errno, struct iatt *preparent, -               struct iatt *postparent, dict_t *xdata) +                             int op_ret, int op_errno, struct iatt *preparent, +                             struct iatt *postparent, dict_t *xdata)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; @@ -6821,7 +6829,8 @@ dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->op_errno = op_errno;                          local->op_ret   = -1;                          if (conf->subvolume_cnt != 1) { -                                if (op_errno != ENOENT && op_errno != EACCES) { +                                if (op_errno != ENOENT && op_errno != EACCES +                                    && op_errno != ESTALE) {                                          local->need_selfheal = 1;                                  }                          } @@ -6845,6 +6854,7 @@ unlock:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) {                 if (local->need_selfheal) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6871,6 +6881,7 @@ unlock:              
            dht_set_fixed_dir_stat (&local->preparent);                          dht_set_fixed_dir_stat (&local->postparent); +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6939,6 +6950,7 @@ unlock:          if (done) {                  if (local->need_selfheal && local->fop_succeeded) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6976,6 +6988,7 @@ unlock:                          dht_set_fixed_dir_stat (&local->preparent);                          dht_set_fixed_dir_stat (&local->postparent); +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6987,11 +7000,110 @@ unlock:  int +dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        DHT_STACK_DESTROY (frame); +        return 0; +} + + +int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this) +{ +        dht_local_t  *local      = NULL, *lock_local = NULL; +        call_frame_t *lock_frame = NULL; +        int           lock_count = 0; + +        local = frame->local; +        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); + +        if (lock_count == 0) +                goto done; + +        lock_frame = copy_frame (frame); +        if (lock_frame == NULL) +                goto done; + +        lock_local = dht_local_init (lock_frame, &local->loc, NULL, +                                     
lock_frame->root->op); +        if (lock_local == NULL) +                goto done; + +        lock_local->lock.locks = local->lock.locks; +        lock_local->lock.lk_count = local->lock.lk_count; + +        local->lock.locks = NULL; +        local->lock.lk_count = 0; +        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, +                            lock_local->lock.lk_count, +                            dht_rmdir_unlock_cbk); +        lock_frame = NULL; + +done: +        if (lock_frame != NULL) { +                DHT_STACK_DESTROY (lock_frame); +        } + +        return 0; +} + + +int +dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        int           i     = 0; + +        VALIDATE_OR_GOTO (this->private, err); + +        conf = this->private; +        local = frame->local; + +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed rmdir for %s)", +                        local->loc.path); + +                local->op_ret = -1; +                local->op_errno = op_errno; +                goto err; +        } + +        for (i = 0; i < conf->subvolume_cnt; i++) { +                if (local->hashed_subvol && +                    (local->hashed_subvol == conf->subvolumes[i])) +                        continue; + +                STACK_WIND (frame, dht_rmdir_cbk, +                            conf->subvolumes[i], +                            conf->subvolumes[i]->fops->rmdir, +                            &local->loc, local->flags, NULL); +        } + +        return 0; + +err: +        /* No harm in calling an extra rmdir unlock */ +        dht_rmdir_unlock (frame, this); +        DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno, +         
                 &local->preparent, &local->postparent, NULL); + +        return 0; +} + + +int  dht_rmdir_do (call_frame_t *frame, xlator_t *this)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; -        int           i = 0; +        dht_lock_t   **lk_array = NULL; +        int           i = 0, ret = -1; +        int           count = 1;          xlator_t     *hashed_subvol = NULL;          char gfid[GF_UUID_BUF_SIZE] ={0}; @@ -7005,7 +7117,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)          local->call_cnt = conf->subvolume_cnt; -          /* first remove from non-hashed_subvol */          hashed_subvol = dht_subvol_get_hashed (this, &local->loc); @@ -7029,15 +7140,39 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)                  return 0;          } -        for (i = 0; i < conf->subvolume_cnt; i++) { -                if (hashed_subvol && -                    (hashed_subvol == conf->subvolumes[i])) -                        continue; +        count = conf->subvolume_cnt; -                STACK_WIND (frame, dht_rmdir_cbk, -                            conf->subvolumes[i], -                            conf->subvolumes[i]->fops->rmdir, -                            &local->loc, local->flags, NULL); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) { +                local->op_ret = -1; +                local->op_errno = ENOMEM; +                goto err; +        } + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) { +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        goto err; +                } +        } 
+ +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_rmdir_lock_cbk); +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                local->op_ret = -1; +                local->op_errno = errno ? errno : EINVAL; +                goto err;          }          return 0; @@ -7046,6 +7181,11 @@ err:          dht_set_fixed_dir_stat (&local->preparent);          dht_set_fixed_dir_stat (&local->postparent); +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } +          DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,                            &local->preparent, &local->postparent, NULL);          return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index edfb80566c0..d06224c6b68 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -50,7 +50,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t        *this, xlator_t *dst_node,                                      call_frame_t    *frame, int ret);  typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this, -                                         int op_ret); +                                         int op_ret, int invoke_cbk);  typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame); @@ -145,6 +145,11 @@ typedef enum {          qdstatfs_action_COMPARE,  } qdstatfs_action_t; +typedef enum { +        FAIL_ON_ANY_ERROR, +        IGNORE_ENOENT_ESTALE +} dht_reaction_type_t; +  struct dht_skip_linkto_unlink {          gf_boolean_t    handle_valid_link; @@ -275,6 +280,7 @@ struct dht_local {                  fop_inodelk_cbk_t   inodelk_cbk;                  
dht_lock_t        **locks;                  int                 lk_count; +                dht_reaction_type_t reaction;                  /* whether locking failed on _any_ of the "locks" above */                  int                 op_ret; @@ -1132,7 +1138,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,   */  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk); +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk);  int32_t  dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index df31cdbb047..881db81c262 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -496,6 +496,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,          lock->xl = xl;          lock->type = type; +          lock->domain = gf_strdup (domain);          if (lock->domain == NULL) {                  dht_lock_free (lock); @@ -1978,21 +1979,41 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                            int32_t op_ret, int32_t op_errno, dict_t *xdata)  {          int          lk_index = 0; +        int          i        = 0;          dht_local_t *local    = NULL;          lk_index = (long) cookie;          local = frame->local; -          if (op_ret == 0) {                  local->lock.locks[lk_index]->locked = _gf_true;          } else { -                local->lock.op_ret = -1; -                local->lock.op_errno = op_errno; -                goto cleanup; +                switch (op_errno) { +                case ESTALE: +                case ENOENT: +                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) { +                                local->lock.op_ret = -1; +          
                      local->lock.op_errno = op_errno; +                                goto cleanup; +                        } +                        break; +                default: +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                        goto cleanup; +                }          }          if (lk_index == (local->lock.lk_count - 1)) { +                for (i = 0; (i < local->lock.lk_count) && +                     (!local->lock.locks[i]->locked); i++) +                        ; + +                if (i == local->lock.lk_count) { +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                } +                  dht_inodelk_done (frame);          } else {                  dht_blocking_inodelk_rec (frame, ++lk_index); @@ -2066,7 +2087,8 @@ out:  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk) +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk)  {          int           ret        = -1;          call_frame_t *lock_frame = NULL; @@ -2088,6 +2110,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,          dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);          local = lock_frame->local; +        local->lock.reaction = reaction;          local->main_frame = frame;          dht_blocking_inodelk_rec (lock_frame, 0); diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index 79b87069d2b..3b636c529a2 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -1320,7 +1320,7 @@ dht_rename_lock (call_frame_t *frame)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    
dht_rename_lock_cbk); +                                    FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL;                  local->lock.lk_count = 0; diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c index fd553030212..307116ae618 100644 --- a/xlators/cluster/dht/src/dht-selfheal.c +++ b/xlators/cluster/dht/src/dht-selfheal.c @@ -82,7 +82,8 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int -dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret) +dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, +                         int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -90,7 +91,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          local = frame->local;          lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); -          if (lock_count == 0)                  goto done; @@ -117,8 +117,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          lock_frame = NULL;  done: -        local->selfheal.dir_cbk (frame, NULL, frame->this, ret, -                                 local->op_errno, NULL); +        if (invoke_cbk) +                local->selfheal.dir_cbk (frame, NULL, frame->this, ret, +                                         local->op_errno, NULL);          if (lock_frame != NULL) {                  DHT_STACK_DESTROY (lock_frame);          } @@ -160,13 +161,13 @@ dht_refresh_layout_done (call_frame_t *frame)                  dht_layout_unref (frame->this, heal); -                dht_selfheal_dir_finish (frame, frame->this, 0); +                dht_selfheal_dir_finish (frame, frame->this, 0, 1);          }          return 0;  err: -        dht_selfheal_dir_finish (frame, frame->this, -1); +        dht_selfheal_dir_finish (frame, 
frame->this, -1, 1);          return 0;  } @@ -226,8 +227,7 @@ unlock:          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); - +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -293,7 +293,7 @@ dht_refresh_layout (call_frame_t *frame)          return 0;  out: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -322,7 +322,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_selfheal_dir_finish (frame, this, -1); +        dht_selfheal_dir_finish (frame, this, -1, 1);          return 0;  } @@ -583,7 +583,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_selfheal_layout_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -594,13 +594,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          return 0;  err:          if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          } @@ -653,7 +647,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);          }          return 0; @@ -886,7 
+880,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);                  return 0;          } @@ -1013,7 +1007,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);                  return 0;          } @@ -1081,7 +1075,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                                  dht_should_heal_layout);                  if (ret < 0) { -                        dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 1);                  }          } @@ -1112,7 +1106,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,                                                  dht_should_heal_layout);                  if (ret < 0) { -                        dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 1);                  }                  return 0; @@ -1150,7 +1144,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_layout_t  *layout = NULL;          call_frame_t  *prev = NULL;          xlator_t      *subvol = NULL; -        int            i = 0; +        int            i = 0, ret = -1;          int            this_call_cnt = 0;          char           gfid[GF_UUID_BUF_SIZE] = {0}; @@ -1182,11 +1176,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_iatt_merge (this, &local->stbuf, stbuf, prev->this); 
         dht_iatt_merge (this, &local->preparent, preparent, prev->this);          dht_iatt_merge (this, &local->postparent, postparent, prev->this); +        ret = 0;  out:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { +                dht_selfheal_dir_finish (frame, this, ret, 0);                  dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);          } @@ -1239,32 +1235,21 @@ out:  }  int -dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, -                        dht_layout_t *layout, int force) +dht_selfheal_dir_mkdir_lookup_done (call_frame_t *frame, xlator_t *this)  { -        int           missing_dirs = 0; +        dht_local_t  *local = NULL;          int           i     = 0;          int           ret   = -1; -        dht_local_t  *local = NULL; -        xlator_t     *this = NULL;          dict_t       *dict = NULL; +        dht_layout_t  *layout = NULL; +        loc_t        *loc   = NULL; -        local = frame->local; -        this = frame->this; - -        local->selfheal.force_mkdir = force ? 
_gf_true : _gf_false; - -        for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) -                        missing_dirs++; -        } +        VALIDATE_OR_GOTO (this->private, err); -        if (missing_dirs == 0) { -                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, 0xffffffff, layout); -                return 0; -        } +        local = frame->local; +        layout = local->layout; +        loc    = &local->loc; -        local->call_cnt = missing_dirs;          if (!gf_uuid_is_null (local->gfid)) {                  dict = dict_new ();                  if (!dict) @@ -1278,6 +1263,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                                  " key = gfid-req", loc->path);          } else if (local->params) {                  /* Send the dictionary from higher layers directly */ +                  dict = dict_ref (local->params);          }          /* Set acls */ @@ -1290,7 +1276,8 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                          "dict is NULL, need to make sure gfids are same");          for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) { +                if (layout->list[i].err == ENOENT || +                    local->selfheal.force_mkdir) {                          gf_msg_debug (this->name, 0,                                        "Creating directory %s on subvol %s",                                        loc->path, layout->list[i].xlator->name); @@ -1309,6 +1296,202 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                  dict_unref (dict);          return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + +int +dht_selfheal_dir_mkdir_lookup_cbk (call_frame_t *frame, void *cookie, +                                   xlator_t *this, int op_ret, int op_errno, +                                   inode_t *inode, struct 
iatt *stbuf, +                                   dict_t *xattr, struct iatt *postparent) +{ +        dht_local_t  *local = NULL; +        int           i     = 0; +        int           this_call_cnt = 0; +        int           missing_dirs = 0; +        dht_layout_t  *layout = NULL; +        loc_t         *loc    = NULL; + +        VALIDATE_OR_GOTO (this->private, err); + +        local = frame->local; +        layout = local->layout; +        loc = &local->loc; + +        this_call_cnt = dht_frame_return (frame); + +        LOCK (&frame->lock); +        { +                if ((op_ret < 0) && (op_errno == ENOENT || op_errno == ESTALE)) +                        local->selfheal.hole_cnt = !local->selfheal.hole_cnt ? 1 +                                                : local->selfheal.hole_cnt + 1; +        } +        UNLOCK (&frame->lock); + +        if (is_last_call (this_call_cnt)) { +                if (local->selfheal.hole_cnt == layout->cnt) { +                        gf_msg_debug (this->name, op_errno, +                                      "Lookup failed, an rmdir could have " +                                      "deleted this entry %s", loc->name); +                        local->op_errno = op_errno; +                        goto err; +                } else { +                        for (i = 0; i < layout->cnt; i++) { +                                if (layout->list[i].err == ENOENT || +                                    layout->list[i].err == ESTALE || +                                    local->selfheal.force_mkdir) +                                        missing_dirs++; +                        } + +                        if (missing_dirs == 0) { +                                dht_selfheal_dir_finish (frame, this, 0, 0); +                                dht_selfheal_dir_setattr (frame, loc, +                                                          &local->stbuf, +                                                          0xffffffff, layout); +   
                             return 0; +                        } + +                        local->call_cnt = missing_dirs; +                        dht_selfheal_dir_mkdir_lookup_done (frame, this); +                } +        } + +        return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + + +int +dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, +                                 xlator_t *this, int32_t op_ret, +                                 int32_t op_errno, dict_t *xdata) +{ +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        int           i     = 0; + +        VALIDATE_OR_GOTO (this->private, err); + +        conf = this->private; +        local = frame->local; + +	    local->call_cnt = conf->subvolume_cnt; + +        if (op_ret < 0) { + +                /* We get this error when the directory entry was not created +                 * on a newly attached tier subvol. Hence proceed and do mkdir +                 * on the tier subvol. 
+                 */ +                if (op_errno == EINVAL) { +                        local->call_cnt = 1; +                        dht_selfheal_dir_mkdir_lookup_done (frame, this); +                        return 0; +                } + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed for %s", +                        local->loc.path); + +                local->op_errno = op_errno; +                goto err; +        } + +        /* After getting locks, perform lookup again to ensure that the +           directory was not deleted by a racing rmdir +        */ + +        for (i = 0; i < conf->subvolume_cnt; i++) { +                STACK_WIND (frame, dht_selfheal_dir_mkdir_lookup_cbk, +                            conf->subvolumes[i], +                            conf->subvolumes[i]->fops->lookup, +                            &local->loc, NULL); +        } + +        return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + +int +dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, +                        dht_layout_t *layout, int force) +{ +        int           missing_dirs = 0; +        int           i     = 0; +        int           ret   = -1; +        int           count = 1; +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        xlator_t     *this = NULL; +        dht_lock_t   **lk_array = NULL; + +        local = frame->local; +        this = frame->this; +        conf = this->private; + +        local->selfheal.force_mkdir = force; +        local->selfheal.hole_cnt = 0; + +        for (i = 0; i < layout->cnt; i++) { +                if (layout->list[i].err == ENOENT || force) +                        missing_dirs++; +        } + +        if (missing_dirs == 0) { +                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, +                                          
0xffffffff, layout); +                return 0; +        } + +        count = conf->subvolume_cnt; + +        /* Locking on all subvols in the mkdir phase of lookup selfheal +           is done to synchronize with rmdir/rename. +        */ +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) +                goto err; + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) +                        goto err; +        } + +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_selfheal_dir_mkdir_lock_cbk); + +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                goto err; +        } + +        return 0; +err: +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } + +        return -1;  }  int @@ -1882,7 +2065,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        dht_selfheal_dir_finish (frame, this, ret, 1);          return 0;  } @@ -1950,7 +2133,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        dht_selfheal_dir_finish (frame, this, ret, 1);          return 0; @@ -2301,7 
+2484,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_update_commit_hash_for_layout_resume);          if (ret < 0) {                  local->lock.locks = NULL; @@ -2312,13 +2495,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          return 0;  err:          if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          }  | 
