diff options
| -rw-r--r-- | xlators/cluster/dht/src/dht-common.c | 248 | 
1 files changed, 203 insertions, 45 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index ba082e28956..e3af5a7fe8b 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -41,6 +41,11 @@ dht_setxattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame,                 int ret); +int +dht_rmdir_readdirp_do (call_frame_t *readdirp_frame, xlator_t *this); + + +  /* Sets the blocks and size values to fixed values. This is to be called   * only for dirs. The caller is responsible for checking the type   */ @@ -8211,8 +8216,8 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this          dht_local_t    *local = NULL;          xlator_t       *prev = NULL;          xlator_t       *src = NULL; -        call_frame_t   *main_frame = NULL; -        dht_local_t    *main_local = NULL; +        call_frame_t   *readdirp_frame = NULL; +        dht_local_t    *readdirp_local = NULL;          int             this_call_cnt = 0;          char gfid[GF_UUID_BUF_SIZE] ={0}; @@ -8221,8 +8226,9 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this          prev   = cookie;          src    = prev; -        main_frame = local->main_frame; -        main_local = main_frame->local; + +        readdirp_frame = local->main_frame; +        readdirp_local = readdirp_frame->local;          gf_uuid_unparse(local->loc.gfid, gfid); @@ -8231,16 +8237,18 @@ dht_rmdir_linkfile_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this                                "Unlinked linkfile %s on %s, gfid = %s",                                local->loc.path, src->name, gfid);          } else { -                main_local->op_ret   = -1; -                main_local->op_errno = op_errno; +                if (op_errno != ENOENT) { +                        readdirp_local->op_ret   = -1; +                        readdirp_local->op_errno = op_errno; +                }                  gf_msg_debug (this->name, op_errno,                                "Unlink of %s on %s failed. (gfid = %s)",                                local->loc.path, src->name, gfid);          } -        this_call_cnt = dht_frame_return (main_frame); +        this_call_cnt = dht_frame_return (readdirp_frame);          if (is_last_call (this_call_cnt)) -                dht_rmdir_do (main_frame, this); +                dht_rmdir_readdirp_do (readdirp_frame, this);          DHT_STACK_DESTROY (frame);          return 0; @@ -8255,8 +8263,8 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_local_t    *local = NULL;          xlator_t       *prev = NULL;          xlator_t       *src = NULL; -        call_frame_t   *main_frame = NULL; -        dht_local_t    *main_local = NULL; +        call_frame_t   *readdirp_frame = NULL; +        dht_local_t    *readdirp_local = NULL;          int             this_call_cnt = 0;          dht_conf_t     *conf = this->private;          char               gfid[GF_UUID_BUF_SIZE] = {0}; @@ -8265,17 +8273,24 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          prev  = cookie;          src   = prev; -        main_frame = local->main_frame; -        main_local = main_frame->local; +gf_log ("dht", GF_LOG_INFO, "dht_rmdir_lookup_cbk %s", local->loc.path); +        readdirp_frame = local->main_frame; +        readdirp_local = readdirp_frame->local; -        if (op_ret != 0) +        if (op_ret != 0) { + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_NOT_LINK_FILE_ERROR, +                        "lookup failed for %s on %s  (type=0%o)", +                        local->loc.path, src->name, stbuf->ia_type);                  goto err; +        }          if (!check_is_linkfile (inode, stbuf, xattr, conf->link_xattr_name)) { -                main_local->op_ret  = -1; -                main_local->op_errno = ENOTEMPTY; +                readdirp_local->op_ret  = -1; +                readdirp_local->op_errno = ENOTEMPTY; -                 gf_uuid_unparse(local->loc.gfid, gfid); +                gf_uuid_unparse(local->loc.gfid, gfid);                  gf_msg (this->name, GF_LOG_WARNING, 0,                          DHT_MSG_NOT_LINK_FILE_ERROR, @@ -8289,9 +8304,9 @@ dht_rmdir_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        this_call_cnt = dht_frame_return (main_frame); +        this_call_cnt = dht_frame_return (readdirp_frame);          if (is_last_call (this_call_cnt)) -                dht_rmdir_do (main_frame, this); +                dht_rmdir_readdirp_do (readdirp_frame, this);          DHT_STACK_DESTROY (frame);          return 0; @@ -8304,24 +8319,30 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                               struct iatt *stbuf, dict_t *xattr,                               struct iatt *parent)  { -        dht_local_t    *local         = NULL; -        xlator_t       *src           = NULL; -        call_frame_t   *main_frame    = NULL; -        dht_local_t    *main_local    = NULL; -        int             this_call_cnt = 0; -        dht_conf_t     *conf          = this->private; -        dict_t         *xattrs        = NULL; -        int             ret           = 0; +        dht_local_t    *local             = NULL; +        xlator_t       *src               = NULL; +        call_frame_t   *readdirp_frame    = NULL; +        dht_local_t    *readdirp_local    = NULL; +        int             this_call_cnt     = 0; +        dht_conf_t     *conf              = this->private; +        dict_t         *xattrs            = NULL; +        int             ret               = 0;          local = frame->local;          src   = local->hashed_subvol; -        main_frame = local->main_frame; -        main_local = main_frame->local; + +        /* main_frame here is the readdirp_frame */ + +        readdirp_frame = local->main_frame; +        readdirp_local = readdirp_frame->local; + +        gf_msg_debug (this->name, 0, "returning for %s ", +                      local->loc.path);          if (op_ret == 0) { -                main_local->op_ret  = -1; -                main_local->op_errno = ENOTEMPTY; +                readdirp_local->op_ret  = -1; +                readdirp_local->op_errno = ENOTEMPTY;                  gf_msg (this->name, GF_LOG_WARNING, 0,                          DHT_MSG_SUBVOL_ERROR, @@ -8329,8 +8350,13 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->loc.path, src->name);                  goto err;          } else if (op_errno != ENOENT) { -                main_local->op_ret  = -1; -                main_local->op_errno = op_errno; +                readdirp_local->op_ret  = -1; +                readdirp_local->op_errno = op_errno; + +                gf_msg (this->name, GF_LOG_WARNING, op_errno, +                        DHT_MSG_SUBVOL_ERROR, +                        "%s not found on cached subvol %s", +                        local->loc.path, src->name);                  goto err;          } @@ -8351,7 +8377,6 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          dict_unref (xattrs);                  goto err;          } -          STACK_WIND_COOKIE (frame, dht_rmdir_lookup_cbk, src, src,                             src->fops->lookup, &local->loc, xattrs);          if (xattrs) @@ -8360,9 +8385,16 @@ dht_rmdir_cached_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          return 0;  err: -        this_call_cnt = dht_frame_return (main_frame); +        this_call_cnt = dht_frame_return (readdirp_frame); + +        /* Once all the lookups/unlinks etc have returned, proceed to wind +         * readdirp on the subvol again until no entries are returned. +         * This is required if there are more entries than can be returned +         * in a single readdirp call. +         */ +          if (is_last_call (this_call_cnt)) -                dht_rmdir_do (main_frame, this); +                dht_rmdir_readdirp_do (readdirp_frame, this);          DHT_STACK_DESTROY (frame);          return 0; @@ -8457,12 +8489,13 @@ dht_rmdir_is_subvol_empty (call_frame_t *frame, xlator_t *this,                  gf_uuid_unparse(lookup_local->loc.gfid, gfid); -                gf_msg_trace (this->name, 0, +                gf_msg_debug (this->name, 0,                                "looking up %s on subvolume %s, gfid = %s",                                lookup_local->loc.path, src->name, gfid);                  LOCK (&frame->lock);                  { +                        /* Increment the call count for the readdir frame */                          local->call_cnt++;                  }                  UNLOCK (&frame->lock); @@ -8470,15 +8503,27 @@ dht_rmdir_is_subvol_empty (call_frame_t *frame, xlator_t *this,                  subvol = dht_linkfile_subvol (this, NULL, &trav->d_stat,                                                trav->dict);                  if (!subvol) { +                          gf_msg (this->name, GF_LOG_INFO, 0,                                  DHT_MSG_INVALID_LINKFILE,                                  "Linkfile does not have link subvolume. "                                  "path = %s, gfid = %s",                                  lookup_local->loc.path, gfid); + +                        gf_msg_debug (this->name, 0, +                                     "looking up %s on subvolume %s, gfid = %s", +                                      lookup_local->loc.path, src->name, gfid); +                          STACK_WIND_COOKIE (lookup_frame, dht_rmdir_lookup_cbk,                                             src, src, src->fops->lookup,                                             &lookup_local->loc, xattrs);                  } else { +                        gf_msg_debug (this->name, 0, +                                      "Looking up linkfile target %s on " +                                      " subvolume %s, gfid = %s", +                                      lookup_local->loc.path, subvol->name, +                                      gfid); +                          STACK_WIND (lookup_frame, dht_rmdir_cached_lookup_cbk,                                      subvol, subvol->fops->lookup,                                      &lookup_local->loc, xattrs); @@ -8500,17 +8545,56 @@ err:  } + +/* + * No more entries on this subvol. Proceed to the actual rmdir operation. + */ + +void +dht_rmdir_readdirp_done (call_frame_t *readdirp_frame, xlator_t *this) +{ + +        call_frame_t      *main_frame    = NULL; +        dht_local_t       *main_local         = NULL; +        dht_local_t       *local         = NULL; +        int                this_call_cnt = 0; + + +        local = readdirp_frame->local; +        main_frame = local->main_frame; +        main_local = main_frame->local; + +        /* At least one readdirp failed. +         * This is a bit hit or miss - if readdirp failed on more than +         * one subvol, we don't know which error is returned. +         */ +        if (local->op_ret == -1) { +                main_local->op_ret = local->op_ret; +                main_local->op_errno = local->op_errno; +        } + +        this_call_cnt = dht_frame_return (main_frame); + +        if (is_last_call (this_call_cnt)) +                dht_rmdir_do (main_frame, this); + + +        DHT_STACK_DESTROY (readdirp_frame); +} + + +  int  dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          int op_ret, int op_errno, gf_dirent_t *entries,                          dict_t *xdata)  {          dht_local_t  *local = NULL; -        int           this_call_cnt = -1;          xlator_t     *prev = NULL;          xlator_t     *src = NULL;          int           ret = 0; +          local = frame->local;          prev  = cookie;          src   = prev; @@ -8526,7 +8610,7 @@ dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                        local->loc.path, op_ret);                          local->op_ret = -1;                          local->op_errno = ENOTEMPTY; -                        break; +                        goto done;                  default:                          /* @ret number of linkfiles are getting unlinked */                          gf_msg_trace (this->name, 0, @@ -8535,17 +8619,54 @@ dht_rmdir_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                        local->loc.path, ret);                          break;                  } +          } -        this_call_cnt = dht_frame_return (frame); -        if (is_last_call (this_call_cnt)) { -                dht_rmdir_do (frame, this); +        if (ret) { +                return 0;          } +done: +        /* readdirp failed or no linkto files were found on this subvol */ + +        dht_rmdir_readdirp_done (frame, this);          return 0;  } +/* Keep sending readdirp on the subvol until it returns no more entries + * It is possible that not all entries will fit in a single readdirp in which + * case the rmdir will keep failing with ENOTEMPTY + */ + +int +dht_rmdir_readdirp_do (call_frame_t *readdirp_frame, xlator_t *this) +{ +        dht_local_t *local  = NULL; + +        local = readdirp_frame->local; + +        if (local->op_ret == -1) { +                /* there is no point doing another readdirp on this +                 * subvol . */ +                dht_rmdir_readdirp_done (readdirp_frame, this); +                return 0; +        } + +        gf_msg_debug ("this->name", 0, "Calling dht_rmdir_readdirp_do for %p", +                      readdirp_frame); + +        STACK_WIND_COOKIE (readdirp_frame, dht_rmdir_readdirp_cbk, +                           local->hashed_subvol, +                           local->hashed_subvol, +                           local->hashed_subvol->fops->readdirp, +                           local->fd, 4096, 0, local->xattr); + + +        return 0; + +} +  int  dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, @@ -8554,11 +8675,13 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          dht_local_t  *local         = NULL;          int           this_call_cnt = -1;          xlator_t     *prev          = NULL; -        dict_t       *dict          = NULL;          int           ret           = 0;          dht_conf_t   *conf          = this->private; +        dict_t       *dict          = NULL;          int           i             = 0; -        char               gfid[GF_UUID_BUF_SIZE] = {0}; +        char          gfid[GF_UUID_BUF_SIZE] = {0}; +        dht_local_t  *readdirp_local = NULL; +        call_frame_t *readdirp_frame = NULL;          local = frame->local;          prev  = cookie; @@ -8586,6 +8709,7 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  goto err;          fd_bind (fd); +          dict = dict_new ();          if (!dict) {                  local->op_ret = -1; @@ -8601,16 +8725,50 @@ dht_rmdir_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                          local->loc.path, conf->link_xattr_name);          local->call_cnt = conf->subvolume_cnt; + + +        /* Create a separate frame per subvol as we might need +         * to resend readdirp multiple times to get all the +         * entries. +         */ +          for (i = 0; i < conf->subvolume_cnt; i++) { -                STACK_WIND_COOKIE (frame, dht_rmdir_readdirp_cbk, + +                readdirp_frame = copy_frame (frame); + +                if (!readdirp_frame) { +                        local->call_cnt--; +                        continue; +                } + +                readdirp_local = dht_local_init (readdirp_frame, &local->loc, +                                                 local->fd, 0); + +                if (!readdirp_local) { +                        DHT_STACK_DESTROY (readdirp_frame); +                        local->call_cnt--; +                        continue; +                } +                readdirp_local->main_frame = frame; +                readdirp_local->op_ret = 0; +                readdirp_local->xattr = dict_ref (dict); +                /* overload this field to save the subvol info */ +                readdirp_local->hashed_subvol = conf->subvolumes[i]; + +                STACK_WIND_COOKIE (readdirp_frame, dht_rmdir_readdirp_cbk,                                     conf->subvolumes[i], conf->subvolumes[i],                                     conf->subvolumes[i]->fops->readdirp, -                                   local->fd, 4096, 0, dict); +                                   readdirp_local->fd, 4096, 0, +                                   readdirp_local->xattr);          }          if (dict)                  dict_unref (dict); +        /* Could not wind readdirp to any subvol */ +        if (!local->call_cnt) +                goto err; +          return 0;  err:  | 
