diff options
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
-rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 326 |
1 files changed, 221 insertions, 105 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 60008f2bd64..7b04e8a2dc8 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -83,15 +83,6 @@ dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count, break; } - /* do it regardless of all the above cases as we had to 'write' the - given number of bytes */ - ret = syncop_ftruncate (to, fd, offset + size); - if (ret) { - gf_log (THIS->name, GF_LOG_WARNING, - "failed to perform truncate on %s", to->name); - goto out; - } - ret = size; out: return ret; @@ -102,8 +93,7 @@ static inline int __is_file_migratable (xlator_t *this, loc_t *loc, dict_t *rsp_dict, struct iatt *stbuf) { - int ret = -1; - int open_fd_count = 0; + int ret = -1; if (!IA_ISREG (stbuf->ia_type)) { gf_log (this->name, GF_LOG_WARNING, @@ -121,14 +111,6 @@ __is_file_migratable (xlator_t *this, loc_t *loc, dict_t *rsp_dict, goto out; } - ret = dict_get_int32 (rsp_dict, GLUSTERFS_OPEN_FD_COUNT, &open_fd_count); - if (!ret && (open_fd_count > 0)) { - /* TODO: support migration of files with open fds */ - gf_log (this->name, GF_LOG_WARNING, - "%s: file has open fds, not attempting migration", - loc->path); - goto out; - } ret = 0; out: @@ -137,11 +119,10 @@ out: static inline int __dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf, - dict_t *dict, fd_t **dst_fd, int *need_unlink) + dict_t *dict, fd_t **dst_fd) { xlator_t *this = NULL; int ret = -1; - mode_t mode = 0; fd_t *fd = NULL; struct iatt new_stbuf = {0,}; @@ -154,6 +135,13 @@ __dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf, goto out; } + ret = dict_set_str (dict, DHT_LINKFILE_KEY, to->name); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "%s: failed to set gfid in dict for create", loc->path); + goto out; + } + fd = fd_create (loc->inode, DHT_REBALANCE_PID); if (!fd) { gf_log (this->name, GF_LOG_ERROR, @@ -163,40 +151,27 @@ __dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf, } ret = syncop_lookup (to, loc, NULL, &new_stbuf, NULL, NULL); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, "failed to lookup %s on %s", - loc->path, to->name); - - mode = st_mode_from_ia (stbuf->ia_prot, stbuf->ia_type); - ret = syncop_create (to, loc, O_WRONLY, mode, fd, dict); - if (ret < 0) { + if (!ret) { + /* File exits in the destination, check if gfid matches */ + if (uuid_compare (stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) { gf_log (this->name, GF_LOG_ERROR, - "failed to create %s on %s", loc->path, to->name); + "file %s exits in %s with different gfid", + loc->path, to->name); + fd_unref (fd); goto out; } - - *need_unlink = 1; - goto done; - } - - /* File exits in the destination, just do the open if gfid matches */ - if (uuid_compare (stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "file %s exits in %s with different gfid", - loc->path, to->name); - fd_unref (fd); - goto out; } - ret = syncop_open (to, loc, O_WRONLY, fd); + /* Create the destination with LINKFILE mode, and linkto xattr, + if the linkfile already exists, it will just open the file */ + ret = syncop_create (to, loc, O_RDWR, DHT_LINKFILE_MODE, fd, + dict); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, - "failed to open file %s on %s", - loc->path, to->name); - fd_unref (fd); + "failed to create %s on %s", loc->path, to->name); goto out; } -done: + if (dst_fd) *dst_fd = fd; @@ -254,16 +229,21 @@ out: static inline int __dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst, - int hole_exists) + uint64_t ia_size, int hole_exists) { - int ret = -1; + int ret = 0; int count = 0; off_t offset = 0; struct iovec *vector = NULL; struct iobref *iobref = NULL; - - while (1) { - ret = syncop_readv (from, src, DHT_REBALANCE_BLKSIZE, + uint64_t total = 0; + size_t read_size = 0; + + /* if file size is '0', no need to enter this loop */ + while (total < ia_size) { + read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ? + DHT_REBALANCE_BLKSIZE : (ia_size - total)); + ret = syncop_readv (from, src, read_size, offset, &vector, &count, &iobref); if (!ret || (ret < 0)) { break; @@ -279,6 +259,7 @@ __dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst, break; } offset += ret; + total += ret; if (vector) GF_FREE (vector); @@ -298,6 +279,84 @@ __dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst, return ret; } + +static inline int +__dht_rebalance_open_src_file (xlator_t *from, xlator_t *to, loc_t *loc, + struct iatt *stbuf, fd_t **src_fd) +{ + int ret = 0; + fd_t *fd = NULL; + dict_t *dict = NULL; + xlator_t *this = NULL; + struct iatt iatt = {0,}; + + this = THIS; + + fd = fd_create (loc->inode, DHT_REBALANCE_PID); + if (!fd) { + gf_log (this->name, GF_LOG_ERROR, + "%s: fd create failed (source)", loc->path); + ret = -1; + goto out; + } + + ret = syncop_open (from, loc, O_RDWR, fd); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "failed to open file %s on %s", + loc->path, from->name); + goto out; + } + + ret = -1; + dict = dict_new (); + if (!dict) + goto out; + + ret = dict_set_str (dict, DHT_LINKFILE_KEY, to->name); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set xattr in dict for %s (linkto:%s)", + loc->path, to->name); + goto out; + } + + /* Once the migration starts, the source should have 'linkto' key set + to show which is the target, so other clients can work around it */ + ret = syncop_setxattr (from, loc, dict, 0); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set xattr on %s in %s", + loc->path, from->name); + goto out; + } + + /* mode should be (+S+T) to indicate migration is in progress */ + iatt.ia_prot = stbuf->ia_prot; + iatt.ia_type = stbuf->ia_type; + iatt.ia_prot.sticky = 1; + iatt.ia_prot.sgid = 1; + + ret = syncop_setattr (from, loc, &iatt, GF_SET_ATTR_MODE, NULL, NULL); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set mode on %s in %s", + loc->path, from->name); + goto out; + } + + if (src_fd) + *src_fd = fd; + + /* success */ + ret = 0; +out: + if (dict) + dict_unref (dict); + + return ret; +} + int dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, int flag) @@ -305,13 +364,13 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, int ret = -1; struct iatt new_stbuf = {0,}; struct iatt stbuf = {0,}; + struct iatt empty_iatt = {0,}; fd_t *src_fd = NULL; fd_t *dst_fd = NULL; dict_t *dict = NULL; dict_t *xattr = NULL; dict_t *rsp_dict = NULL; int file_has_holes = 0; - int need_unlink = 0; gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s", loc->path, from->name, to->name); @@ -326,6 +385,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, "%s: failed to set fd-count key in dict, may attempt " "migration of file which has open fds", loc->path); + /* Phase 1 - Data migration is in progress from now on */ ret = syncop_lookup (from, loc, dict, &stbuf, &rsp_dict, NULL); if (ret) { gf_log (this->name, GF_LOG_ERROR, "failed to lookup %s on %s", @@ -338,9 +398,8 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, if (ret) goto out; - /* create the destination */ - ret = __dht_rebalance_create_dst_file (to, loc, &stbuf, dict, &dst_fd, - &need_unlink); + /* create the destination, with required modes/xattr */ + ret = __dht_rebalance_create_dst_file (to, loc, &stbuf, dict, &dst_fd); if (ret) goto out; @@ -351,81 +410,142 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, goto out; } - /* Try to preserve 'holes' while migrating data */ - if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE)) - file_has_holes = 1; - - src_fd = fd_create (loc->inode, DHT_REBALANCE_PID); - if (!src_fd) { - gf_log (this->name, GF_LOG_ERROR, - "%s: fd create failed (source)", loc->path); - ret = -1; + /* Open the source, and also update mode/xattr */ + ret = __dht_rebalance_open_src_file (from, to, loc, &stbuf, &src_fd); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to open %s on %s", + loc->path, from->name); goto out; } - ret = syncop_open (from, loc, O_RDONLY, src_fd); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "failed to open file %s on %s", + ret = syncop_fstat (from, src_fd, &stbuf); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to lookup %s on %s", loc->path, from->name); goto out; } + /* Try to preserve 'holes' while migrating data */ + if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE)) + file_has_holes = 1; + /* All I/O happens in this function */ ret = __dht_rebalane_migrate_data (from, to, src_fd, dst_fd, - file_has_holes); + stbuf.ia_size, file_has_holes); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s: failed to migrate data", loc->path); goto out; } - ret = syncop_lookup (from, loc, NULL, &new_stbuf, NULL, NULL); + /* TODO: move all xattr related operations to fd based operations */ + ret = syncop_listxattr (from, loc, &xattr); + if (ret == -1) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to get xattr from %s", loc->path, from->name); + + ret = syncop_setxattr (to, loc, xattr, 0); + if (ret == -1) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to set xattr on %s", loc->path, to->name); + + /* TODO: Sync the locks */ + + ret = syncop_fsync (to, dst_fd); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to fsync on %s", loc->path, to->name); + + + /* Phase 2 - Data-Migration Complete, Housekeeping updates pending */ + + ret = syncop_fstat (from, src_fd, &new_stbuf); if (ret < 0) { /* Failed to get the stat info */ gf_log (this->name, GF_LOG_ERROR, - "failed to lookup file %s on %s", + "failed to fstat file %s on %s", loc->path, from->name); - need_unlink = 0; goto out; } - /* No need to rebalance, if there is some - activity on source file */ - if (new_stbuf.ia_mtime != stbuf.ia_mtime) { + /* source would have both sticky bit and sgid bit set, reset it to 0, + and set the source permission on destination */ + new_stbuf.ia_prot.sticky = 0; + new_stbuf.ia_prot.sgid = 0; + + /* TODO: if the source actually had sticky bit, or sgid bit set, + we are not handling it */ + + ret = syncop_fsetattr (to, dst_fd, &new_stbuf, + (GF_SET_ATTR_UID | GF_SET_ATTR_GID | + GF_SET_ATTR_MODE), NULL, NULL); + if (ret) { gf_log (this->name, GF_LOG_WARNING, - "%s: ignoring destination file as source has " - "undergone some changes while migration was happening", - loc->path); - ret = -1; - goto out; + "%s: failed to perform setattr on %s", + loc->path, to->name); } + /* Because 'futimes' is not portable */ ret = syncop_setattr (to, loc, &new_stbuf, - (GF_SET_ATTR_UID | GF_SET_ATTR_GID | - GF_SET_ATTR_MODE | GF_SET_ATTR_ATIME | - GF_SET_ATTR_MTIME), NULL, NULL); + (GF_SET_ATTR_MTIME | GF_SET_ATTR_ATIME), + NULL, NULL); if (ret) { gf_log (this->name, GF_LOG_WARNING, "%s: failed to perform setattr on %s", loc->path, to->name); } - ret = syncop_listxattr (from, loc, &xattr); - if (ret == -1) + /* Make the source as a linkfile first before deleting it */ + empty_iatt.ia_prot.sticky = 1; + ret = syncop_fsetattr (from, src_fd, &empty_iatt, + GF_SET_ATTR_MODE, NULL, NULL); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, \ + "%s: failed to perform setattr on %s", + loc->path, from->name); + } + + /* Do a stat and check the gfid before unlink */ + ret = syncop_stat (from, loc, &empty_iatt); + if (ret) { gf_log (this->name, GF_LOG_WARNING, - "%s: failed to get xattr from %s", loc->path, from->name); + "%s: failed to do a stat on %s", + loc->path, from->name); + } - ret = syncop_setxattr (to, loc, xattr, 0); - if (ret == -1) + if (uuid_compare (empty_iatt.ia_gfid, loc->inode->gfid) == 0) { + /* take out the source from namespace */ + ret = syncop_unlink (from, loc); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to perform unlink on %s", + loc->path, from->name); + } + } + + /* Free up the data blocks on the source node, as the whole + file is migrated */ + ret = syncop_ftruncate (from, src_fd, 0); + if (ret) { gf_log (this->name, GF_LOG_WARNING, - "%s: failed to set xattr on %s", loc->path, to->name); + "%s: failed to perform truncate on %s", + loc->path, from->name); + } - /* rebalance complete */ - syncop_close (dst_fd); - syncop_close (src_fd); - syncop_unlink (from, loc); - need_unlink = 0; + /* remove the 'linkto' xattr from the destination */ + ret = syncop_removexattr (to, loc, DHT_LINKFILE_KEY); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to perform removexattr on %s", + loc->path, to->name); + } + + ret = syncop_lookup (this, loc, NULL, NULL, NULL, NULL); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "%s: failed to lookup the file on subvolumes", + loc->path); + } gf_log (this->name, GF_LOG_INFO, "completed migration of %s from subvolume %s to %s", @@ -436,14 +556,10 @@ out: if (dict) dict_unref (dict); - if (ret) { - if (dst_fd) - syncop_close (dst_fd); - if (src_fd) - syncop_close (src_fd); - if (need_unlink) - syncop_unlink (to, loc); - } + if (dst_fd) + syncop_close (dst_fd); + if (src_fd) + syncop_close (src_fd); return ret; } @@ -461,8 +577,8 @@ rebalance_task (void *data) /* This function is 'synchrounous', hence if it returns, we are done with the task */ - ret = dht_migrate_file (THIS, &local->loc, local->from_subvol, - local->to_subvol, local->flags); + ret = dht_migrate_file (THIS, &local->loc, local->rebalance.from_subvol, + local->rebalance.target_node, local->flags); return ret; } @@ -488,7 +604,7 @@ rebalance_task_completion (int op_ret, call_frame_t *sync_frame, void *data) dht_layout_unref (this, layout); } - ret = dht_layout_preset (this, local->to_subvol, + ret = dht_layout_preset (this, local->rebalance.target_node, local->loc.inode); if (ret) gf_log (this->name, GF_LOG_WARNING, |