summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rebalance.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c326
1 files changed, 221 insertions, 105 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 60008f2bd..7b04e8a2d 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -83,15 +83,6 @@ dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count,
break;
}
- /* do it regardless of all the above cases as we had to 'write' the
- given number of bytes */
- ret = syncop_ftruncate (to, fd, offset + size);
- if (ret) {
- gf_log (THIS->name, GF_LOG_WARNING,
- "failed to perform truncate on %s", to->name);
- goto out;
- }
-
ret = size;
out:
return ret;
@@ -102,8 +93,7 @@ static inline int
__is_file_migratable (xlator_t *this, loc_t *loc, dict_t *rsp_dict,
struct iatt *stbuf)
{
- int ret = -1;
- int open_fd_count = 0;
+ int ret = -1;
if (!IA_ISREG (stbuf->ia_type)) {
gf_log (this->name, GF_LOG_WARNING,
@@ -121,14 +111,6 @@ __is_file_migratable (xlator_t *this, loc_t *loc, dict_t *rsp_dict,
goto out;
}
- ret = dict_get_int32 (rsp_dict, GLUSTERFS_OPEN_FD_COUNT, &open_fd_count);
- if (!ret && (open_fd_count > 0)) {
- /* TODO: support migration of files with open fds */
- gf_log (this->name, GF_LOG_WARNING,
- "%s: file has open fds, not attempting migration",
- loc->path);
- goto out;
- }
ret = 0;
out:
@@ -137,11 +119,10 @@ out:
static inline int
__dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf,
- dict_t *dict, fd_t **dst_fd, int *need_unlink)
+ dict_t *dict, fd_t **dst_fd)
{
xlator_t *this = NULL;
int ret = -1;
- mode_t mode = 0;
fd_t *fd = NULL;
struct iatt new_stbuf = {0,};
@@ -154,6 +135,13 @@ __dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf,
goto out;
}
+ ret = dict_set_str (dict, DHT_LINKFILE_KEY, to->name);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: failed to set gfid in dict for create", loc->path);
+ goto out;
+ }
+
fd = fd_create (loc->inode, DHT_REBALANCE_PID);
if (!fd) {
gf_log (this->name, GF_LOG_ERROR,
@@ -163,40 +151,27 @@ __dht_rebalance_create_dst_file (xlator_t *to, loc_t *loc, struct iatt *stbuf,
}
ret = syncop_lookup (to, loc, NULL, &new_stbuf, NULL, NULL);
- if (ret) {
- gf_log (this->name, GF_LOG_DEBUG, "failed to lookup %s on %s",
- loc->path, to->name);
-
- mode = st_mode_from_ia (stbuf->ia_prot, stbuf->ia_type);
- ret = syncop_create (to, loc, O_WRONLY, mode, fd, dict);
- if (ret < 0) {
+ if (!ret) {
+ /* File exits in the destination, check if gfid matches */
+ if (uuid_compare (stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) {
gf_log (this->name, GF_LOG_ERROR,
- "failed to create %s on %s", loc->path, to->name);
+ "file %s exits in %s with different gfid",
+ loc->path, to->name);
+ fd_unref (fd);
goto out;
}
-
- *need_unlink = 1;
- goto done;
- }
-
- /* File exits in the destination, just do the open if gfid matches */
- if (uuid_compare (stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) {
- gf_log (this->name, GF_LOG_ERROR,
- "file %s exits in %s with different gfid",
- loc->path, to->name);
- fd_unref (fd);
- goto out;
}
- ret = syncop_open (to, loc, O_WRONLY, fd);
+ /* Create the destination with LINKFILE mode, and linkto xattr,
+ if the linkfile already exists, it will just open the file */
+ ret = syncop_create (to, loc, O_RDWR, DHT_LINKFILE_MODE, fd,
+ dict);
if (ret < 0) {
gf_log (this->name, GF_LOG_ERROR,
- "failed to open file %s on %s",
- loc->path, to->name);
- fd_unref (fd);
+ "failed to create %s on %s", loc->path, to->name);
goto out;
}
-done:
+
if (dst_fd)
*dst_fd = fd;
@@ -254,16 +229,21 @@ out:
static inline int
__dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst,
- int hole_exists)
+ uint64_t ia_size, int hole_exists)
{
- int ret = -1;
+ int ret = 0;
int count = 0;
off_t offset = 0;
struct iovec *vector = NULL;
struct iobref *iobref = NULL;
-
- while (1) {
- ret = syncop_readv (from, src, DHT_REBALANCE_BLKSIZE,
+ uint64_t total = 0;
+ size_t read_size = 0;
+
+ /* if file size is '0', no need to enter this loop */
+ while (total < ia_size) {
+ read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ?
+ DHT_REBALANCE_BLKSIZE : (ia_size - total));
+ ret = syncop_readv (from, src, read_size,
offset, &vector, &count, &iobref);
if (!ret || (ret < 0)) {
break;
@@ -279,6 +259,7 @@ __dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst,
break;
}
offset += ret;
+ total += ret;
if (vector)
GF_FREE (vector);
@@ -298,6 +279,84 @@ __dht_rebalane_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst,
return ret;
}
+
+static inline int
+__dht_rebalance_open_src_file (xlator_t *from, xlator_t *to, loc_t *loc,
+ struct iatt *stbuf, fd_t **src_fd)
+{
+ int ret = 0;
+ fd_t *fd = NULL;
+ dict_t *dict = NULL;
+ xlator_t *this = NULL;
+ struct iatt iatt = {0,};
+
+ this = THIS;
+
+ fd = fd_create (loc->inode, DHT_REBALANCE_PID);
+ if (!fd) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "%s: fd create failed (source)", loc->path);
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncop_open (from, loc, O_RDWR, fd);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to open file %s on %s",
+ loc->path, from->name);
+ goto out;
+ }
+
+ ret = -1;
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ ret = dict_set_str (dict, DHT_LINKFILE_KEY, to->name);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set xattr in dict for %s (linkto:%s)",
+ loc->path, to->name);
+ goto out;
+ }
+
+ /* Once the migration starts, the source should have 'linkto' key set
+ to show which is the target, so other clients can work around it */
+ ret = syncop_setxattr (from, loc, dict, 0);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set xattr on %s in %s",
+ loc->path, from->name);
+ goto out;
+ }
+
+ /* mode should be (+S+T) to indicate migration is in progress */
+ iatt.ia_prot = stbuf->ia_prot;
+ iatt.ia_type = stbuf->ia_type;
+ iatt.ia_prot.sticky = 1;
+ iatt.ia_prot.sgid = 1;
+
+ ret = syncop_setattr (from, loc, &iatt, GF_SET_ATTR_MODE, NULL, NULL);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to set mode on %s in %s",
+ loc->path, from->name);
+ goto out;
+ }
+
+ if (src_fd)
+ *src_fd = fd;
+
+ /* success */
+ ret = 0;
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
int
dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
int flag)
@@ -305,13 +364,13 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
int ret = -1;
struct iatt new_stbuf = {0,};
struct iatt stbuf = {0,};
+ struct iatt empty_iatt = {0,};
fd_t *src_fd = NULL;
fd_t *dst_fd = NULL;
dict_t *dict = NULL;
dict_t *xattr = NULL;
dict_t *rsp_dict = NULL;
int file_has_holes = 0;
- int need_unlink = 0;
gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s",
loc->path, from->name, to->name);
@@ -326,6 +385,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
"%s: failed to set fd-count key in dict, may attempt "
"migration of file which has open fds", loc->path);
+ /* Phase 1 - Data migration is in progress from now on */
ret = syncop_lookup (from, loc, dict, &stbuf, &rsp_dict, NULL);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "failed to lookup %s on %s",
@@ -338,9 +398,8 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
if (ret)
goto out;
- /* create the destination */
- ret = __dht_rebalance_create_dst_file (to, loc, &stbuf, dict, &dst_fd,
- &need_unlink);
+ /* create the destination, with required modes/xattr */
+ ret = __dht_rebalance_create_dst_file (to, loc, &stbuf, dict, &dst_fd);
if (ret)
goto out;
@@ -351,81 +410,142 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
goto out;
}
- /* Try to preserve 'holes' while migrating data */
- if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE))
- file_has_holes = 1;
-
- src_fd = fd_create (loc->inode, DHT_REBALANCE_PID);
- if (!src_fd) {
- gf_log (this->name, GF_LOG_ERROR,
- "%s: fd create failed (source)", loc->path);
- ret = -1;
+ /* Open the source, and also update mode/xattr */
+ ret = __dht_rebalance_open_src_file (from, to, loc, &stbuf, &src_fd);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to open %s on %s",
+ loc->path, from->name);
goto out;
}
- ret = syncop_open (from, loc, O_RDONLY, src_fd);
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "failed to open file %s on %s",
+ ret = syncop_fstat (from, src_fd, &stbuf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to lookup %s on %s",
loc->path, from->name);
goto out;
}
+ /* Try to preserve 'holes' while migrating data */
+ if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE))
+ file_has_holes = 1;
+
/* All I/O happens in this function */
ret = __dht_rebalane_migrate_data (from, to, src_fd, dst_fd,
- file_has_holes);
+ stbuf.ia_size, file_has_holes);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s: failed to migrate data",
loc->path);
goto out;
}
- ret = syncop_lookup (from, loc, NULL, &new_stbuf, NULL, NULL);
+ /* TODO: move all xattr related operations to fd based operations */
+ ret = syncop_listxattr (from, loc, &xattr);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to get xattr from %s", loc->path, from->name);
+
+ ret = syncop_setxattr (to, loc, xattr, 0);
+ if (ret == -1)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to set xattr on %s", loc->path, to->name);
+
+ /* TODO: Sync the locks */
+
+ ret = syncop_fsync (to, dst_fd);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to fsync on %s", loc->path, to->name);
+
+
+ /* Phase 2 - Data-Migration Complete, Housekeeping updates pending */
+
+ ret = syncop_fstat (from, src_fd, &new_stbuf);
if (ret < 0) {
/* Failed to get the stat info */
gf_log (this->name, GF_LOG_ERROR,
- "failed to lookup file %s on %s",
+ "failed to fstat file %s on %s",
loc->path, from->name);
- need_unlink = 0;
goto out;
}
- /* No need to rebalance, if there is some
- activity on source file */
- if (new_stbuf.ia_mtime != stbuf.ia_mtime) {
+ /* source would have both sticky bit and sgid bit set, reset it to 0,
+ and set the source permission on destination */
+ new_stbuf.ia_prot.sticky = 0;
+ new_stbuf.ia_prot.sgid = 0;
+
+ /* TODO: if the source actually had sticky bit, or sgid bit set,
+ we are not handling it */
+
+ ret = syncop_fsetattr (to, dst_fd, &new_stbuf,
+ (GF_SET_ATTR_UID | GF_SET_ATTR_GID |
+ GF_SET_ATTR_MODE), NULL, NULL);
+ if (ret) {
gf_log (this->name, GF_LOG_WARNING,
- "%s: ignoring destination file as source has "
- "undergone some changes while migration was happening",
- loc->path);
- ret = -1;
- goto out;
+ "%s: failed to perform setattr on %s",
+ loc->path, to->name);
}
+ /* Because 'futimes' is not portable */
ret = syncop_setattr (to, loc, &new_stbuf,
- (GF_SET_ATTR_UID | GF_SET_ATTR_GID |
- GF_SET_ATTR_MODE | GF_SET_ATTR_ATIME |
- GF_SET_ATTR_MTIME), NULL, NULL);
+ (GF_SET_ATTR_MTIME | GF_SET_ATTR_ATIME),
+ NULL, NULL);
if (ret) {
gf_log (this->name, GF_LOG_WARNING,
"%s: failed to perform setattr on %s",
loc->path, to->name);
}
- ret = syncop_listxattr (from, loc, &xattr);
- if (ret == -1)
+ /* Make the source as a linkfile first before deleting it */
+ empty_iatt.ia_prot.sticky = 1;
+ ret = syncop_fsetattr (from, src_fd, &empty_iatt,
+ GF_SET_ATTR_MODE, NULL, NULL);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING, \
+ "%s: failed to perform setattr on %s",
+ loc->path, from->name);
+ }
+
+ /* Do a stat and check the gfid before unlink */
+ ret = syncop_stat (from, loc, &empty_iatt);
+ if (ret) {
gf_log (this->name, GF_LOG_WARNING,
- "%s: failed to get xattr from %s", loc->path, from->name);
+ "%s: failed to do a stat on %s",
+ loc->path, from->name);
+ }
- ret = syncop_setxattr (to, loc, xattr, 0);
- if (ret == -1)
+ if (uuid_compare (empty_iatt.ia_gfid, loc->inode->gfid) == 0) {
+ /* take out the source from namespace */
+ ret = syncop_unlink (from, loc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to perform unlink on %s",
+ loc->path, from->name);
+ }
+ }
+
+ /* Free up the data blocks on the source node, as the whole
+ file is migrated */
+ ret = syncop_ftruncate (from, src_fd, 0);
+ if (ret) {
gf_log (this->name, GF_LOG_WARNING,
- "%s: failed to set xattr on %s", loc->path, to->name);
+ "%s: failed to perform truncate on %s",
+ loc->path, from->name);
+ }
- /* rebalance complete */
- syncop_close (dst_fd);
- syncop_close (src_fd);
- syncop_unlink (from, loc);
- need_unlink = 0;
+ /* remove the 'linkto' xattr from the destination */
+ ret = syncop_removexattr (to, loc, DHT_LINKFILE_KEY);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to perform removexattr on %s",
+ loc->path, to->name);
+ }
+
+ ret = syncop_lookup (this, loc, NULL, NULL, NULL, NULL);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "%s: failed to lookup the file on subvolumes",
+ loc->path);
+ }
gf_log (this->name, GF_LOG_INFO,
"completed migration of %s from subvolume %s to %s",
@@ -436,14 +556,10 @@ out:
if (dict)
dict_unref (dict);
- if (ret) {
- if (dst_fd)
- syncop_close (dst_fd);
- if (src_fd)
- syncop_close (src_fd);
- if (need_unlink)
- syncop_unlink (to, loc);
- }
+ if (dst_fd)
+ syncop_close (dst_fd);
+ if (src_fd)
+ syncop_close (src_fd);
return ret;
}
@@ -461,8 +577,8 @@ rebalance_task (void *data)
/* This function is 'synchrounous', hence if it returns,
we are done with the task */
- ret = dht_migrate_file (THIS, &local->loc, local->from_subvol,
- local->to_subvol, local->flags);
+ ret = dht_migrate_file (THIS, &local->loc, local->rebalance.from_subvol,
+ local->rebalance.target_node, local->flags);
return ret;
}
@@ -488,7 +604,7 @@ rebalance_task_completion (int op_ret, call_frame_t *sync_frame, void *data)
dht_layout_unref (this, layout);
}
- ret = dht_layout_preset (this, local->to_subvol,
+ ret = dht_layout_preset (this, local->rebalance.target_node,
local->loc.inode);
if (ret)
gf_log (this->name, GF_LOG_WARNING,