diff options
Diffstat (limited to 'xlators/cluster/dht/src')
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 192 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 11 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-helper.c | 33 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-rename.c | 2 | ||||
| -rw-r--r-- | xlators/cluster/dht/src/dht-selfheal.c | 286 | 
5 files changed, 435 insertions, 89 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index ee37d2f36af..4c93084ec82 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -55,6 +55,9 @@ int32_t dht_set_fixed_dir_stat (struct iatt *stat)  int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this); + +int  dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)  {          int              ret            = -1; @@ -4669,6 +4672,10 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,                  if (IA_ISINVAL(orig_entry->d_stat.ia_type)) {                          /*stat failed somewhere- ignore this entry*/ +                        gf_msg_debug (this->name, EINVAL, +                                      "Invalid stat, ignoring entry " +                                      "%s gfid %s", orig_entry->d_name, +                                      uuid_utoa (orig_entry->d_stat.ia_gfid));                          continue;                  } @@ -4681,7 +4688,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,                   * corresponding hashed subvolume will take care of the                   * directory entry.                   */ -                          if (readdir_optimize) {                                  if (prev->this == local->first_up_subvol)                                          goto list; @@ -5199,7 +5205,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 1);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno, @@ -5260,7 +5266,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err:          if (local && local->lock.locks) { -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (mknod, frame, -1,                                    op_errno, NULL, NULL, NULL, @@ -5367,7 +5373,7 @@ dht_mknod_do (call_frame_t *frame)                                           local->umask, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -5382,7 +5388,8 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int32_t -dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret, +                  int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -5457,7 +5464,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_mknod_finish (frame, this, -1); +        dht_mknod_finish (frame, this, -1, 0);          return 0;  } @@ -5488,7 +5495,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_mknod_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -6028,7 +6035,7 @@ out:          if (local && local->lock.locks) {                  /* store op_errno for failure case*/                  local->op_errno = op_errno; -                local->refresh_layout_unlock (frame, this, op_ret); +                local->refresh_layout_unlock (frame, this, op_ret, 1);                  if (op_ret == 0) {                          DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd, @@ -6087,7 +6094,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,          return 0;  err:          if (local && local->lock.locks) { -                local->refresh_layout_unlock (frame, this, -1); +                local->refresh_layout_unlock (frame, this, -1, 1);          } else {                  DHT_STACK_UNWIND (create, frame, -1,                                    op_errno, NULL, NULL, NULL, @@ -6253,7 +6260,7 @@ dht_create_do (call_frame_t *frame)                                           local->umask, local->fd, local->params);          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -6267,7 +6274,8 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int32_t -dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret) +dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret, +                   int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -6342,7 +6350,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_create_finish (frame, this, -1); +        dht_create_finish (frame, this, -1, 0);          return 0;  } @@ -6373,7 +6381,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_create_lock_cbk); +                                    IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -6797,8 +6805,8 @@ dht_rmdir_selfheal_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  int  dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -               int op_ret, int op_errno, struct iatt *preparent, -               struct iatt *postparent, dict_t *xdata) +                             int op_ret, int op_errno, struct iatt *preparent, +                             struct iatt *postparent, dict_t *xdata)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; @@ -6818,7 +6826,8 @@ dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->op_errno = op_errno;                          local->op_ret   = -1;                          if (conf->subvolume_cnt != 1) { -                                if (op_errno != ENOENT && op_errno != EACCES) { +                                if (op_errno != ENOENT && op_errno != EACCES +                                    && op_errno != ESTALE) {                                          local->need_selfheal = 1;                                  }                          } @@ -6842,6 +6851,7 @@ unlock:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) {                 if (local->need_selfheal) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6868,6 +6878,7 @@ unlock:                          dht_set_fixed_dir_stat (&local->preparent);                          dht_set_fixed_dir_stat (&local->postparent); +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6936,6 +6947,7 @@ unlock:          if (done) {                  if (local->need_selfheal && local->fop_succeeded) { +                        dht_rmdir_unlock (frame, this);                          local->layout =                                  dht_layout_get (this, local->loc.inode); @@ -6973,6 +6985,7 @@ unlock:                          dht_set_fixed_dir_stat (&local->preparent);                          dht_set_fixed_dir_stat (&local->postparent); +                        dht_rmdir_unlock (frame, this);                          DHT_STACK_UNWIND (rmdir, frame, local->op_ret,                                            local->op_errno, &local->preparent,                                            &local->postparent, NULL); @@ -6984,11 +6997,110 @@ unlock:  int +dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                      int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        DHT_STACK_DESTROY (frame); +        return 0; +} + + +int +dht_rmdir_unlock (call_frame_t *frame, xlator_t *this) +{ +        dht_local_t  *local      = NULL, *lock_local = NULL; +        call_frame_t *lock_frame = NULL; +        int           lock_count = 0; + +        local = frame->local; +        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); + +        if (lock_count == 0) +                goto done; + +        lock_frame = copy_frame (frame); +        if (lock_frame == NULL) +                goto done; + +        lock_local = dht_local_init (lock_frame, &local->loc, NULL, +                                     lock_frame->root->op); +        if (lock_local == NULL) +                goto done; + +        lock_local->lock.locks = local->lock.locks; +        lock_local->lock.lk_count = local->lock.lk_count; + +        local->lock.locks = NULL; +        local->lock.lk_count = 0; +        dht_unlock_inodelk (lock_frame, lock_local->lock.locks, +                            lock_local->lock.lk_count, +                            dht_rmdir_unlock_cbk); +        lock_frame = NULL; + +done: +        if (lock_frame != NULL) { +                DHT_STACK_DESTROY (lock_frame); +        } + +        return 0; +} + + +int +dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *xdata) +{ +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        int           i     = 0; + +        VALIDATE_OR_GOTO (this->private, err); + +        conf = this->private; +        local = frame->local; + +        if (op_ret < 0) { +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed rmdir for %s)", +                        local->loc.path); + +                local->op_ret = -1; +                local->op_errno = op_errno; +                goto err; +        } + +        for (i = 0; i < conf->subvolume_cnt; i++) { +                if (local->hashed_subvol && +                    (local->hashed_subvol == conf->subvolumes[i])) +                        continue; + +                STACK_WIND (frame, dht_rmdir_cbk, +                            conf->subvolumes[i], +                            conf->subvolumes[i]->fops->rmdir, +                            &local->loc, local->flags, NULL); +        } + +        return 0; + +err: +        /* No harm in calling an extra rmdir unlock */ +        dht_rmdir_unlock (frame, this); +        DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno, +                          &local->preparent, &local->postparent, NULL); + +        return 0; +} + + +int  dht_rmdir_do (call_frame_t *frame, xlator_t *this)  {          dht_local_t  *local = NULL;          dht_conf_t   *conf = NULL; -        int           i = 0; +        dht_lock_t   **lk_array = NULL; +        int           i = 0, ret = -1; +        int           count = 1;          xlator_t     *hashed_subvol = NULL;          char gfid[GF_UUID_BUF_SIZE] ={0}; @@ -7002,7 +7114,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)          local->call_cnt = conf->subvolume_cnt; -          /* first remove from non-hashed_subvol */          hashed_subvol = dht_subvol_get_hashed (this, &local->loc); @@ -7026,15 +7137,39 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)                  return 0;          } -        for (i = 0; i < conf->subvolume_cnt; i++) { -                if (hashed_subvol && -                    (hashed_subvol == conf->subvolumes[i])) -                        continue; +        count = conf->subvolume_cnt; -                STACK_WIND (frame, dht_rmdir_cbk, -                            conf->subvolumes[i], -                            conf->subvolumes[i]->fops->rmdir, -                            &local->loc, local->flags, NULL); +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) { +                local->op_ret = -1; +                local->op_errno = ENOMEM; +                goto err; +        } + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) { +                        local->op_ret = -1; +                        local->op_errno = EINVAL; +                        goto err; +                } +        } + +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_rmdir_lock_cbk); +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                local->op_ret = -1; +                local->op_errno = errno ? errno : EINVAL; +                goto err;          }          return 0; @@ -7043,6 +7178,11 @@ err:          dht_set_fixed_dir_stat (&local->preparent);          dht_set_fixed_dir_stat (&local->postparent); +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } +          DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,                            &local->preparent, &local->postparent, NULL);          return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index cd192f5baff..b63ee65acfb 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -45,7 +45,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t        *this, xlator_t *dst_node,                                      call_frame_t    *frame, int ret);  typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this, -                                         int op_ret); +                                         int op_ret, int invoke_cbk);  typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame); @@ -140,6 +140,11 @@ typedef enum {          qdstatfs_action_COMPARE,  } qdstatfs_action_t; +typedef enum { +        FAIL_ON_ANY_ERROR, +        IGNORE_ENOENT_ESTALE +} dht_reaction_type_t; +  struct dht_skip_linkto_unlink {          gf_boolean_t    handle_valid_link; @@ -270,6 +275,7 @@ struct dht_local {                  fop_inodelk_cbk_t   inodelk_cbk;                  dht_lock_t        **locks;                  int                 lk_count; +                dht_reaction_type_t reaction;                  /* whether locking failed on _any_ of the "locks" above */                  int                 op_ret; @@ -1128,7 +1134,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,   */  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk); +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk);  int32_t  dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count, diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index 8e82eef697c..8673c1fd83a 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -491,6 +491,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,          lock->xl = xl;          lock->type = type; +          lock->domain = gf_strdup (domain);          if (lock->domain == NULL) {                  dht_lock_free (lock); @@ -1976,21 +1977,41 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                            int32_t op_ret, int32_t op_errno, dict_t *xdata)  {          int          lk_index = 0; +        int          i        = 0;          dht_local_t *local    = NULL;          lk_index = (long) cookie;          local = frame->local; -          if (op_ret == 0) {                  local->lock.locks[lk_index]->locked = _gf_true;          } else { -                local->lock.op_ret = -1; -                local->lock.op_errno = op_errno; -                goto cleanup; +                switch (op_errno) { +                case ESTALE: +                case ENOENT: +                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) { +                                local->lock.op_ret = -1; +                                local->lock.op_errno = op_errno; +                                goto cleanup; +                        } +                        break; +                default: +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                        goto cleanup; +                }          }          if (lk_index == (local->lock.lk_count - 1)) { +                for (i = 0; (i < local->lock.lk_count) && +                     (!local->lock.locks[i]->locked); i++) +                        ; + +                if (i == local->lock.lk_count) { +                        local->lock.op_ret = -1; +                        local->lock.op_errno = op_errno; +                } +                  dht_inodelk_done (frame);          } else {                  dht_blocking_inodelk_rec (frame, ++lk_index); @@ -2064,7 +2085,8 @@ out:  int  dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array, -                      int lk_count, fop_inodelk_cbk_t inodelk_cbk) +                      int lk_count, dht_reaction_type_t reaction, +                      fop_inodelk_cbk_t inodelk_cbk)  {          int           ret        = -1;          call_frame_t *lock_frame = NULL; @@ -2086,6 +2108,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,          dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);          local = lock_frame->local; +        local->lock.reaction = reaction;          local->main_frame = frame;          dht_blocking_inodelk_rec (lock_frame, 0); diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c index 146faa1a257..ed07be73ea7 100644 --- a/xlators/cluster/dht/src/dht-rename.c +++ b/xlators/cluster/dht/src/dht-rename.c @@ -1314,7 +1314,7 @@ dht_rename_lock (call_frame_t *frame)          local->lock.lk_count = count;          ret = dht_blocking_inodelk (frame, lk_array, count, -                                    dht_rename_lock_cbk); +                                    FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL;                  local->lock.lk_count = 0; diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c index 00303b5cc84..bb48eb49c38 100644 --- a/xlators/cluster/dht/src/dht-selfheal.c +++ b/xlators/cluster/dht/src/dht-selfheal.c @@ -77,7 +77,8 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  }  int -dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret) +dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, +                         int invoke_cbk)  {          dht_local_t  *local      = NULL, *lock_local = NULL;          call_frame_t *lock_frame = NULL; @@ -85,7 +86,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          local = frame->local;          lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count); -          if (lock_count == 0)                  goto done; @@ -112,8 +112,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)          lock_frame = NULL;  done: -        local->selfheal.dir_cbk (frame, NULL, frame->this, ret, -                                 local->op_errno, NULL); +        if (invoke_cbk) +                local->selfheal.dir_cbk (frame, NULL, frame->this, ret, +                                         local->op_errno, NULL);          if (lock_frame != NULL) {                  DHT_STACK_DESTROY (lock_frame);          } @@ -155,13 +156,13 @@ dht_refresh_layout_done (call_frame_t *frame)                  dht_layout_unref (frame->this, heal); -                dht_selfheal_dir_finish (frame, frame->this, 0); +                dht_selfheal_dir_finish (frame, frame->this, 0, 1);          }          return 0;  err: -        dht_selfheal_dir_finish (frame, frame->this, -1); +        dht_selfheal_dir_finish (frame, frame->this, -1, 1);          return 0;  } @@ -221,8 +222,7 @@ unlock:          return 0;  err: -        local->refresh_layout_unlock (frame, this, -1); - +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -288,7 +288,7 @@ dht_refresh_layout (call_frame_t *frame)          return 0;  out: -        local->refresh_layout_unlock (frame, this, -1); +        local->refresh_layout_unlock (frame, this, -1, 1);          return 0;  } @@ -317,7 +317,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        dht_selfheal_dir_finish (frame, this, -1); +        dht_selfheal_dir_finish (frame, this, -1, 1);          return 0;  } @@ -580,7 +580,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_selfheal_layout_lock_cbk);          if (ret < 0) {                  local->lock.locks = NULL; @@ -591,13 +591,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,          return 0;  err:          if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          } @@ -650,7 +644,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);          }          return 0; @@ -884,7 +878,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);                  return 0;          } @@ -1011,7 +1005,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,                        missing_xattr, loc->path);          if (missing_xattr == 0) { -                dht_selfheal_dir_finish (frame, this, 0); +                dht_selfheal_dir_finish (frame, this, 0, 1);                  return 0;          } @@ -1079,7 +1073,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                                  dht_should_heal_layout);                  if (ret < 0) { -                        dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 1);                  }          } @@ -1110,7 +1104,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,                                                  dht_should_heal_layout);                  if (ret < 0) { -                        dht_selfheal_dir_finish (frame, this, -1); +                        dht_selfheal_dir_finish (frame, this, -1, 1);                  }                  return 0; @@ -1148,7 +1142,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_layout_t  *layout = NULL;          call_frame_t  *prev = NULL;          xlator_t      *subvol = NULL; -        int            i = 0; +        int            i = 0, ret = -1;          int            this_call_cnt = 0;          char           gfid[GF_UUID_BUF_SIZE] = {0}; @@ -1177,11 +1171,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          dht_iatt_merge (this, &local->preparent, preparent, prev->this);          dht_iatt_merge (this, &local->postparent, postparent, prev->this); +        ret = 0;  out:          this_call_cnt = dht_frame_return (frame);          if (is_last_call (this_call_cnt)) { +                dht_selfheal_dir_finish (frame, this, ret, 0);                  dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);          } @@ -1234,33 +1230,21 @@ out:  }  int -dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, -                        dht_layout_t *layout, int force) +dht_selfheal_dir_mkdir_lookup_done (call_frame_t *frame, xlator_t *this)  { -        int           missing_dirs = 0; +        dht_local_t  *local = NULL;          int           i     = 0;          int           ret   = -1; -        dht_local_t  *local = NULL; -        xlator_t     *this = NULL;          dict_t       *dict = NULL; +        dht_layout_t  *layout = NULL; +        loc_t        *loc   = NULL; -        local = frame->local; -        this = frame->this; - -        local->selfheal.force_mkdir = force ? _gf_true : _gf_false; - -        for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) -                        missing_dirs++; -        } +        VALIDATE_OR_GOTO (this->private, err); -        if (missing_dirs == 0) { -                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, -                                          0xffffffff, layout); -                return 0; -        } +        local = frame->local; +        layout = local->layout; +        loc    = &local->loc; -        local->call_cnt = missing_dirs;          if (!gf_uuid_is_null (local->gfid)) {                  dict = dict_new ();                  if (!dict) @@ -1274,6 +1258,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                                  " key = gfid-req", loc->path);          } else if (local->params) {                  /* Send the dictionary from higher layers directly */ +                  dict = dict_ref (local->params);          }          /* Set acls */ @@ -1286,7 +1271,8 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                          "dict is NULL, need to make sure gfids are same");          for (i = 0; i < layout->cnt; i++) { -                if (layout->list[i].err == ENOENT || force) { +                if (layout->list[i].err == ENOENT || +                    local->selfheal.force_mkdir) {                          gf_msg_debug (this->name, 0,                                        "Creating directory %s on subvol %s",                                        loc->path, layout->list[i].xlator->name); @@ -1305,6 +1291,202 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,                  dict_unref (dict);          return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + +int +dht_selfheal_dir_mkdir_lookup_cbk (call_frame_t *frame, void *cookie, +                                   xlator_t *this, int op_ret, int op_errno, +                                   inode_t *inode, struct iatt *stbuf, +                                   dict_t *xattr, struct iatt *postparent) +{ +        dht_local_t  *local = NULL; +        int           i     = 0; +        int           this_call_cnt = 0; +        int           missing_dirs = 0; +        dht_layout_t  *layout = NULL; +        loc_t         *loc    = NULL; + +        VALIDATE_OR_GOTO (this->private, err); + +        local = frame->local; +        layout = local->layout; +        loc = &local->loc; + +        this_call_cnt = dht_frame_return (frame); + +        LOCK (&frame->lock); +        { +                if ((op_ret < 0) && (op_errno == ENOENT || op_errno == ESTALE)) +                        local->selfheal.hole_cnt = !local->selfheal.hole_cnt ? 1 +                                                : local->selfheal.hole_cnt + 1; +        } +        UNLOCK (&frame->lock); + +        if (is_last_call (this_call_cnt)) { +                if (local->selfheal.hole_cnt == layout->cnt) { +                        gf_msg_debug (this->name, op_errno, +                                      "Lookup failed, an rmdir could have " +                                      "deleted this entry %s", loc->name); +                        local->op_errno = op_errno; +                        goto err; +                } else { +                        for (i = 0; i < layout->cnt; i++) { +                                if (layout->list[i].err == ENOENT || +                                    layout->list[i].err == ESTALE || +                                    local->selfheal.force_mkdir) +                                        missing_dirs++; +                        } + +                        if (missing_dirs == 0) { +                                dht_selfheal_dir_finish (frame, this, 0, 0); +                                dht_selfheal_dir_setattr (frame, loc, +                                                          &local->stbuf, +                                                          0xffffffff, layout); +                                return 0; +                        } + +                        local->call_cnt = missing_dirs; +                        dht_selfheal_dir_mkdir_lookup_done (frame, this); +                } +        } + +        return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + + +int +dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, +                                 xlator_t *this, int32_t op_ret, +                                 int32_t op_errno, dict_t *xdata) +{ +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        int           i     = 0; + +        VALIDATE_OR_GOTO (this->private, err); + +        conf = this->private; +        local = frame->local; + +	    local->call_cnt = conf->subvolume_cnt; + +        if (op_ret < 0) { + +                /* We get this error when the directory entry was not created +                 * on a newky attatched tier subvol. Hence proceed and do mkdir +                 * on the tier subvol. +                 */ +                if (op_errno == EINVAL) { +                        local->call_cnt = 1; +                        dht_selfheal_dir_mkdir_lookup_done (frame, this); +                        return 0; +                } + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_INODE_LK_ERROR, +                        "acquiring inodelk failed for %s", +                        local->loc.path); + +                local->op_errno = op_errno; +                goto err; +        } + +        /* After getting locks, perform lookup again to ensure that the +           directory was not deleted by a racing rmdir +        */ + +        for (i = 0; i < conf->subvolume_cnt; i++) { +                STACK_WIND (frame, dht_selfheal_dir_mkdir_lookup_cbk, +                            conf->subvolumes[i], +                            conf->subvolumes[i]->fops->lookup, +                            &local->loc, NULL); +        } + +        return 0; + +err: +        dht_selfheal_dir_finish (frame, this, -1, 1); +        return 0; +} + +int +dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc, +                        dht_layout_t *layout, int force) +{ +        int           missing_dirs = 0; +        int           i     = 0; +        int           ret   = -1; +        int           count = 1; +        dht_local_t  *local = NULL; +        dht_conf_t   *conf  = NULL; +        xlator_t     *this = NULL; +        dht_lock_t   **lk_array = NULL; + +        local = frame->local; +        this = frame->this; +        conf = this->private; + +        local->selfheal.force_mkdir = force; +        local->selfheal.hole_cnt = 0; + +        for (i = 0; i < layout->cnt; i++) { +                if (layout->list[i].err == ENOENT || force) +                        missing_dirs++; +        } + +        if (missing_dirs == 0) { +                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, +                                          0xffffffff, layout); +                return 0; +        } + +        count = conf->subvolume_cnt; + +        /* Locking on all subvols in the mkdir phase of lookup selfheal is +           is done to synchronize with rmdir/rename. +        */ +        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char); +        if (lk_array == NULL) +                goto err; + +        for (i = 0; i < count; i++) { +                lk_array[i] = dht_lock_new (frame->this, +                                            conf->subvolumes[i], +                                            &local->loc, F_WRLCK, +                                            DHT_LAYOUT_HEAL_DOMAIN); +                if (lk_array[i] == NULL) +                        goto err; +        } + +        local->lock.locks = lk_array; +        local->lock.lk_count = count; + +        ret = dht_blocking_inodelk (frame, lk_array, count, +                                    IGNORE_ENOENT_ESTALE, +                                    dht_selfheal_dir_mkdir_lock_cbk); + +        if (ret < 0) { +                local->lock.locks = NULL; +                local->lock.lk_count = 0; +                goto err; +        } + +        return 0; +err: +        if (lk_array != NULL) { +                dht_lock_array_free (lk_array, count); +                GF_FREE (lk_array); +        } + +        return -1;  }  int @@ -1878,7 +2060,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        dht_selfheal_dir_finish (frame, this, ret, 1);          return 0;  } @@ -1946,7 +2128,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,  sorry_no_fix:          /* TODO: need to put appropriate local->op_errno */ -        dht_selfheal_dir_finish (frame, this, ret); +        dht_selfheal_dir_finish (frame, this, ret, 1);          return 0; @@ -2299,7 +2481,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          local->lock.locks = lk_array;          local->lock.lk_count = count; -        ret = dht_blocking_inodelk (frame, lk_array, count, +        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,                                      dht_update_commit_hash_for_layout_resume);          if (ret < 0) {                  local->lock.locks = NULL; @@ -2310,13 +2492,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)          return 0;  err:          if (lk_array != NULL) { -                int tmp_count = 0, i = 0; - -                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) { -                        ; -                } - -                dht_lock_array_free (lk_array, tmp_count); +                dht_lock_array_free (lk_array, count);                  GF_FREE (lk_array);          }  | 
