diff options
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
| -rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 5549 |
1 files changed, 4316 insertions, 1233 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 4c2b4a035e7..8ba8082bd86 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -1,623 +1,1464 @@ /* - Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" +#include "dht-common.h" +#include <glusterfs/syscall.h> +#include <fnmatch.h> +#include <signal.h> +#include <glusterfs/events.h> +#include "glusterfs/compat-errno.h" // for ENODATA on BSD + +#define GF_DISK_SECTOR_SIZE 512 +#define DHT_REBALANCE_PID 4242 /* Change it if required */ +#define DHT_REBALANCE_BLKSIZE 1048576 /* 1 MB */ +#define MAX_MIGRATE_QUEUE_COUNT 500 +#define MIN_MIGRATE_QUEUE_COUNT 200 +#define MAX_REBAL_TYPE_SIZE 16 +#define FILE_CNT_INTERVAL 600 /* 10 mins */ +#define ESTIMATE_START_INTERVAL 600 /* 10 mins */ +#define HARDLINK_MIG_INPROGRESS -2 +#define SKIP_MIGRATION_FD_POSITIVE -3 +#ifndef MAX +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) #endif -#include "dht-common.h" +#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) \ + { \ + idx++; \ + idx %= sv_cnt; \ + } -#define GF_DISK_SECTOR_SIZE 512 -#define DHT_REBALANCE_PID 4242 /* Change it if required */ -#define DHT_REBALANCE_BLKSIZE (128 * 1024) +uint64_t g_totalfiles = 0; +uint64_t g_totalsize = 0; -static int -dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count, - int32_t size, off_t offset, struct iobref *iobref) +void +gf_defrag_free_dir_dfmeta(struct dir_dfmeta *meta, int local_subvols_cnt) { - int i = 0; - int ret = -1; - int start_idx = 0; - int tmp_offset = 0; - int write_needed = 0; - int buf_len = 0; - int size_pending = 0; - char *buf = NULL; - - /* loop through each vector */ - for (i = 0; i < count; i++) { - buf = vec[i].iov_base; - buf_len = vec[i].iov_len; - - for (start_idx = 0; (start_idx + GF_DISK_SECTOR_SIZE) <= buf_len; - start_idx += GF_DISK_SECTOR_SIZE) { - - if (mem_0filled (buf + start_idx, GF_DISK_SECTOR_SIZE) != 0) { - write_needed = 1; - continue; - } + int i = 0; + + if (meta) { + for (i = 0; i < local_subvols_cnt; i++) { + if (meta->equeue) + gf_dirent_free(&meta->equeue[i]); + if (meta->lfd && meta->lfd[i]) + fd_unref(meta->lfd[i]); + } - if (write_needed) { - ret = syncop_write (to, fd, (buf + tmp_offset), - (start_idx - tmp_offset), - (offset + tmp_offset), - iobref, 0); - /* 'path' will be logged in calling function */ - if (ret < 0) { - gf_log (THIS->name, GF_LOG_WARNING, - "failed to write (%s)", - strerror (errno)); - goto out; - } - - write_needed = 0; - } - tmp_offset = start_idx + GF_DISK_SECTOR_SIZE; - } + GF_FREE(meta->equeue); + GF_FREE(meta->head); + GF_FREE(meta->iterator); + GF_FREE(meta->offset_var); + GF_FREE(meta->fetch_entries); + GF_FREE(meta->lfd); + GF_FREE(meta); + } +} - if ((start_idx < buf_len) || write_needed) { - /* This means, last chunk is not yet written.. write it */ - ret = syncop_write (to, fd, (buf + tmp_offset), - (buf_len - tmp_offset), - (offset + tmp_offset), iobref, 0); - if (ret < 0) { - /* 'path' will be logged in calling function */ - gf_log (THIS->name, GF_LOG_WARNING, - "failed to write (%s)", - strerror (errno)); - goto out; - } - } +void +gf_defrag_free_container(struct dht_container *container) +{ + if (container) { + gf_dirent_entry_free(container->df_entry); - size_pending = (size - buf_len); - if (!size_pending) - break; + if (container->parent_loc) { + loc_wipe(container->parent_loc); } - ret = size; -out: - return ret; + GF_FREE(container->parent_loc); + GF_FREE(container); + } } -int32_t -gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs, - struct iatt *stbuf) +void +dht_set_global_defrag_error(gf_defrag_info_t *defrag, int ret) { - int32_t ret = -1; - xlator_t *cached_subvol = NULL; - xlator_t *hashed_subvol = NULL; - xlator_t *linkto_subvol = NULL; - data_t *data = NULL; - struct iatt iatt = {0,}; - int32_t op_errno = 0; - - GF_VALIDATE_OR_GOTO ("defrag", loc, out); - GF_VALIDATE_OR_GOTO ("defrag", loc->name, out); - GF_VALIDATE_OR_GOTO ("defrag", stbuf, out); - GF_VALIDATE_OR_GOTO ("defrag", this, out); - GF_VALIDATE_OR_GOTO ("defrag", xattrs, out); - - if (uuid_is_null (loc->pargfid)) { - gf_log ("", GF_LOG_ERROR, "loc->pargfid is NULL for " - "%s", loc->path); - goto out; - } + LOCK(&defrag->lock); + { + defrag->global_error = ret; + } + UNLOCK(&defrag->lock); + return; +} - if (uuid_is_null (loc->gfid)) { - gf_log ("", GF_LOG_ERROR, "loc->gfid is NULL for " - "%s", loc->path); - goto out; +static int +dht_send_rebalance_event(xlator_t *this, int cmd, gf_defrag_status_t status) +{ + int ret = -1; + char *volname = NULL; + char *tmpstr = NULL; + char *ptr = NULL; + char *suffix = "-dht"; + int len = 0; + + eventtypes_t event = EVENT_LAST; + + switch (status) { + case GF_DEFRAG_STATUS_COMPLETE: + event = EVENT_VOLUME_REBALANCE_COMPLETE; + break; + case GF_DEFRAG_STATUS_FAILED: + event = EVENT_VOLUME_REBALANCE_FAILED; + break; + case GF_DEFRAG_STATUS_STOPPED: + event = EVENT_VOLUME_REBALANCE_STOP; + break; + default: + break; + } + + /* DHT volume */ + len = strlen(this->name) - strlen(suffix); + tmpstr = gf_strdup(this->name); + if (tmpstr) { + ptr = tmpstr + len; + if (!strcmp(ptr, suffix)) { + tmpstr[len] = '\0'; + volname = tmpstr; } + } - cached_subvol = dht_subvol_get_cached (this, loc->inode); - if (!cached_subvol) { - gf_log (this->name, GF_LOG_ERROR, "Failed to get cached subvol" - " for %s on %s", loc->name, this->name); - goto out; - } + if (!volname) { + /* Better than nothing */ + volname = this->name; + } - hashed_subvol = dht_subvol_get_hashed (this, loc); - if (!hashed_subvol) { - gf_log (this->name, GF_LOG_ERROR, "Failed to get hashed subvol" - " for %s on %s", loc->name, this->name); - goto out; + if (event != EVENT_LAST) { + gf_event(event, "volume=%s", volname); + } + + GF_FREE(tmpstr); + return ret; +} + +static void +dht_strip_out_acls(dict_t *dict) +{ + if (dict) { + dict_del(dict, "trusted.SGI_ACL_FILE"); + dict_del(dict, POSIX_ACL_ACCESS_XATTR); + } +} + +/* + return values: + -1 : failure + -2 : success + +Hard link migration is carried out in three stages. + +(Say there are n hardlinks) +Stage 1: Setting the new hashed subvol information on the 1st hardlink + encountered (linkto setxattr) + +Stage 2: Creating hardlinks on new hashed subvol for the 2nd to (n-1)th + hardlink + +Stage 3: Physical migration of the data file for nth hardlink + +Why to deem "-2" as success and not "0": + + dht_migrate_file expects return value "0" from _is_file_migratable if +the file has to be migrated. + + _is_file_migratable returns zero only when it is called with the +flag "GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS". + + gf_defrag_handle_hardlink calls dht_migrate_file for physical migration +of the data file with the flag "GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS" + +Hence, gf_defrag_handle_hardlink returning "0" for success will force +"dht_migrate_file" to migrate each of the hardlink which is not intended. + +For each of the three stage mentioned above "-2" will be returned and will +be converted to "0" in dht_migrate_file. + +*/ + +int32_t +gf_defrag_handle_hardlink(xlator_t *this, loc_t *loc, int *fop_errno) +{ + int32_t ret = -1; + xlator_t *cached_subvol = NULL; + xlator_t *hashed_subvol = NULL; + xlator_t *linkto_subvol = NULL; + data_t *data = NULL; + struct iatt iatt = { + 0, + }; + int32_t op_errno = 0; + dht_conf_t *conf = NULL; + gf_loglevel_t loglevel = 0; + dict_t *link_xattr = NULL; + dict_t *dict = NULL; + dict_t *xattr_rsp = NULL; + struct iatt stbuf = { + 0, + }; + + *fop_errno = EINVAL; + + GF_VALIDATE_OR_GOTO("defrag", loc, out); + GF_VALIDATE_OR_GOTO("defrag", loc->name, out); + GF_VALIDATE_OR_GOTO("defrag", this, out); + GF_VALIDATE_OR_GOTO("defrag", this->private, out); + + conf = this->private; + + if (gf_uuid_is_null(loc->pargfid)) { + gf_msg("", GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "loc->pargfid is NULL for %s", + loc->path); + *fop_errno = EINVAL; + ret = -1; + goto out; + } + + if (gf_uuid_is_null(loc->gfid)) { + gf_msg("", GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "loc->gfid is NULL for %s", + loc->path); + *fop_errno = EINVAL; + ret = -1; + goto out; + } + + link_xattr = dict_new(); + if (!link_xattr) { + ret = -1; + *fop_errno = ENOMEM; + goto out; + } + + /* + Parallel migration can lead to migration of the hard link multiple + times which can lead to data loss. Hence, adding a fresh lookup to + decide whether migration is required or not. + + Elaborating the scenario for let say 10 hardlinks [link{1..10}]: + Let say the first hard link "link1" does the setxattr of the + new hashed subvolume info on the cached file. As there are multiple + threads working, we might have already all the links created on the + new hashed by the time we reach hardlink let say link5. Now the + number of links on hashed is equal to that of cached. Hence, file + migration will happen for link6. + + Cached Hashed + --------T link6 rwxrwxrwx link6 + + Now post above state all the link file on the cached will be zero + byte linkto files. Hence, if we still do migration for the following + files link{7..10}, we will end up migrating 0 data leading to data + loss. + Hence, a lookup can make sure whether we need to migrate the + file or not. + */ + + dict = dict_new(); + if (!dict) { + ret = -1; + *fop_errno = ENOMEM; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY, + "could not allocate memory for dict"); + goto out; + } + + ret = dict_set_int32(dict, conf->link_xattr_name, 256); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to set 'linkto' key in dict", + loc->path); + goto out; + } + + ret = syncop_lookup(this, loc, &stbuf, NULL, dict, &xattr_rsp); + if (ret) { + /*Ignore ENOENT and ESTALE as file might have been + migrated already*/ + if (-ret == ENOENT || -ret == ESTALE) { + ret = -2; + goto out; + } + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:%s lookup failed with ret = %d", loc->path, + ret); + *fop_errno = -ret; + ret = -1; + goto out; + } + + cached_subvol = dht_subvol_get_cached(this, loc->inode); + if (!cached_subvol) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "Failed to get cached subvol" + " for %s on %s", + loc->name, this->name); + *fop_errno = EINVAL; + ret = -1; + goto out; + } + + hashed_subvol = dht_subvol_get_hashed(this, loc); + if (!hashed_subvol) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "Failed to get hashed subvol" + " for %s on %s", + loc->name, this->name); + *fop_errno = EINVAL; + ret = -1; + goto out; + } + + /* Hardlink migration happens only with remove-brick. So this condition will + * be true only when the migration has happened. In case hardlinks are + * migrated for rebalance case, remove this check. Having this check here + * avoid redundant calls below*/ + if (hashed_subvol == cached_subvol) { + ret = -2; + goto out; + } + + gf_log(this->name, GF_LOG_INFO, + "Attempting to migrate hardlink %s " + "with gfid %s from %s -> %s", + loc->name, uuid_utoa(loc->gfid), cached_subvol->name, + hashed_subvol->name); + + data = dict_get(xattr_rsp, conf->link_xattr_name); + /* set linkto on cached -> hashed if not present, else link it */ + if (!data) { + ret = dict_set_str(link_xattr, conf->link_xattr_name, + hashed_subvol->name); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "Failed to set dictionary value:" + " key = %s for %s", + conf->link_xattr_name, loc->name); + *fop_errno = ENOMEM; + ret = -1; + goto out; } - gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s " - "with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid), - cached_subvol->name, hashed_subvol->name); - data = dict_get (xattrs, DHT_LINKFILE_KEY); - /* set linkto on cached -> hashed if not present, else link it */ - if (!data) { - ret = dict_set_str (xattrs, DHT_LINKFILE_KEY, - hashed_subvol->name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to set " - "linkto xattr in dict for %s", loc->name); - goto out; - } + ret = syncop_setxattr(cached_subvol, loc, link_xattr, 0, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :" + "Linkto setxattr failed %s -> %s", + cached_subvol->name, loc->name); + *fop_errno = -ret; + ret = -1; + goto out; + } - ret = syncop_setxattr (cached_subvol, loc, xattrs, 0); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Linkto setxattr " - "failed %s -> %s (%s)", cached_subvol->name, - loc->name, strerror (errno)); - goto out; - } - goto out; + gf_msg_debug(this->name, 0, + "hardlink target subvol created on %s " + ",cached %s, file %s", + hashed_subvol->name, cached_subvol->name, loc->path); + + ret = -2; + goto out; + } else { + linkto_subvol = dht_linkfile_subvol(this, NULL, NULL, xattr_rsp); + if (!linkto_subvol) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SUBVOL_ERROR, + "Failed to get " + "linkto subvol for %s", + loc->name); } else { - linkto_subvol = dht_linkfile_subvol (this, NULL, NULL, xattrs); - if (!linkto_subvol) { - gf_log (this->name, GF_LOG_ERROR, "Failed to get " - "linkto subvol for %s", loc->name); - } else { - hashed_subvol = linkto_subvol; - } - - ret = syncop_link (hashed_subvol, loc, loc); - if (ret) { - op_errno = errno; - gf_log (this->name, GF_LOG_ERROR, "link of %s -> %s" - " failed on subvol %s (%s)", loc->name, - uuid_utoa(loc->gfid), - hashed_subvol->name, strerror (op_errno)); - if (op_errno != EEXIST) - goto out; - } + hashed_subvol = linkto_subvol; } - ret = syncop_lookup (hashed_subvol, loc, NULL, &iatt, NULL, NULL); + + ret = syncop_link(hashed_subvol, loc, loc, &iatt, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed lookup %s on %s (%s)" - , loc->name, hashed_subvol->name, strerror (errno)); + op_errno = -ret; + ret = -1; + + loglevel = (op_errno == EEXIST) ? GF_LOG_DEBUG : GF_LOG_ERROR; + gf_msg(this->name, loglevel, op_errno, + DHT_MSG_MIGRATE_HARDLINK_FILE_FAILED, + "link of %s -> %s" + " failed on subvol %s", + loc->name, uuid_utoa(loc->gfid), hashed_subvol->name); + if (op_errno != EEXIST) { + *fop_errno = op_errno; goto out; + } + } else { + gf_msg_debug(this->name, 0, + "syncop_link successful for" + " hardlink %s on subvol %s, cached %s", + loc->path, hashed_subvol->name, cached_subvol->name); } + } - if (iatt.ia_nlink == stbuf->ia_nlink) { - ret = dht_migrate_file (this, loc, cached_subvol, hashed_subvol, - GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS); - if (ret) - goto out; + ret = syncop_lookup(hashed_subvol, loc, &iatt, NULL, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed :Failed lookup %s on %s ", loc->name, + hashed_subvol->name); + + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* There is a race where on the target subvol for the hardlink + * (note: hash subvol for the hardlink might differ from this), some + * other client(non-rebalance) would have created a linkto file for that + * hardlink as part of lookup. So let say there are 10 hardlinks, on the + * 5th hardlink it self the hardlinks might have migrated. Now for + * (6..10th) hardlinks the cached and target would be same as the file + * has already migrated. Hence this check is needed */ + if (cached_subvol == hashed_subvol) { + gf_msg_debug(this->name, 0, + "source %s and destination %s " + "for hardlink %s are same", + cached_subvol->name, hashed_subvol->name, loc->path); + ret = -2; + goto out; + } + + if (iatt.ia_nlink == stbuf.ia_nlink) { + ret = dht_migrate_file(this, loc, cached_subvol, hashed_subvol, + GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS, fop_errno); + if (ret) { + goto out; } - ret = 0; + } + ret = -2; out: - return ret; -} + if (link_xattr) + dict_unref(link_xattr); + if (xattr_rsp) + dict_unref(xattr_rsp); -static inline int -__is_file_migratable (xlator_t *this, loc_t *loc, - struct iatt *stbuf, dict_t *xattrs, int flags) -{ - int ret = -1; + if (dict) + dict_unref(dict); - if (IA_ISDIR (stbuf->ia_type)) { - gf_log (this->name, GF_LOG_WARNING, - "%s: migrate-file called on directory", loc->path); - ret = -1; - goto out; - } + return ret; +} - if (flags == GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS) { - ret = 0; - goto out; - } - if (stbuf->ia_nlink > 1) { - /* support for decomission */ - if (flags == GF_DHT_MIGRATE_HARDLINK) { - ret = gf_defrag_handle_hardlink (this, loc, - xattrs, stbuf); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to migrate file with link", - loc->path); - } - } else { - gf_log (this->name, GF_LOG_WARNING, - "%s: file has hardlinks", loc->path); - } - ret = ENOTSUP; - goto out; - } +static int +__check_file_has_hardlink(xlator_t *this, loc_t *loc, struct iatt *stbuf, + dict_t *xattrs, int flags, gf_defrag_info_t *defrag, + dht_conf_t *conf, int *fop_errno) +{ + int ret = 0; + if (flags == GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS) { ret = 0; - -out: return ret; + } + if (stbuf->ia_nlink > 1) { + /* support for decomission */ + if (flags == GF_DHT_MIGRATE_HARDLINK) { + synclock_lock(&conf->link_lock); + ret = gf_defrag_handle_hardlink(this, loc, fop_errno); + synclock_unlock(&conf->link_lock); + /* + Returning zero will force the file to be remigrated. + Checkout gf_defrag_handle_hardlink for more information. + */ + if (ret && ret != -2) { + gf_msg(this->name, GF_LOG_WARNING, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to migrate file with link", + loc->path); + } + } else { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migration skipped for:" + "%s: file has hardlinks", + loc->path); + *fop_errno = ENOTSUP; + ret = 1; + } + } + + return ret; } -static inline int -__dht_rebalance_create_dst_file (xlator_t *to, xlator_t *from, loc_t *loc, struct iatt *stbuf, - dict_t *dict, fd_t **dst_fd) +/* + return values + 0 : File will be migrated + -2 : File will not be migrated + (This is the return value from gf_defrag_handle_hardlink. Checkout + gf_defrag_handle_hardlink for description of "returning -2") + -1 : failure +*/ +static int +__is_file_migratable(xlator_t *this, loc_t *loc, struct iatt *stbuf, + dict_t *xattrs, int flags, gf_defrag_info_t *defrag, + dht_conf_t *conf, int *fop_errno) { - xlator_t *this = NULL; - int ret = -1; - fd_t *fd = NULL; - struct iatt new_stbuf = {0,}; - - this = THIS; + int ret = -1; + int lock_count = 0; + + if (IA_ISDIR(stbuf->ia_type)) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: migrate-file called on directory", + loc->path); + *fop_errno = EISDIR; + ret = -1; + goto out; + } - ret = dict_set_static_bin (dict, "gfid-req", stbuf->ia_gfid, 16); + if (!conf->lock_migration_enabled) { + ret = dict_get_int32(xattrs, GLUSTERFS_POSIXLK_COUNT, &lock_count); if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to set gfid in dict for create", loc->path); - goto out; + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: Unable to get lock count for file", + loc->path); + *fop_errno = EINVAL; + ret = -1; + goto out; } - ret = dict_set_str (dict, DHT_LINKFILE_KEY, from->name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to set gfid in dict for create", loc->path); - goto out; + if (lock_count) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: %s: File has locks." + " Skipping file migration", + loc->path); + *fop_errno = ENOTSUP; + ret = 1; + goto out; } + } - fd = fd_create (loc->inode, DHT_REBALANCE_PID); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, - "%s: fd create failed (destination) (%s)", - loc->path, strerror (errno)); - ret = -1; - goto out; - } + /* Check if file has hardlink*/ + ret = __check_file_has_hardlink(this, loc, stbuf, xattrs, flags, defrag, + conf, fop_errno); +out: + return ret; +} - ret = syncop_lookup (to, loc, NULL, &new_stbuf, NULL, NULL); - if (!ret) { - /* File exits in the destination, check if gfid matches */ - if (uuid_compare (stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "file %s exits in %s with different gfid", - loc->path, to->name); - fd_unref (fd); - goto out; - } +static int +__dht_rebalance_create_dst_file(xlator_t *this, xlator_t *to, xlator_t *from, + loc_t *loc, struct iatt *stbuf, fd_t **dst_fd, + int *fop_errno, int file_has_holes) +{ + int ret = -1; + int ret2 = -1; + fd_t *fd = NULL; + struct iatt new_stbuf = { + 0, + }; + struct iatt check_stbuf = { + 0, + }; + dht_conf_t *conf = NULL; + dict_t *dict = NULL; + dict_t *xdata = NULL; + + conf = this->private; + + dict = dict_new(); + if (!dict) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY, + "dictionary allocation failed for" + "path:%s", + loc->path); + goto out; + } + ret = dict_set_gfuuid(dict, "gfid-req", stbuf->ia_gfid, true); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, + "%s: failed to set dictionary value: key = gfid-req", loc->path); + goto out; + } + + ret = dict_set_str(dict, conf->link_xattr_name, from->name); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, + "%s: failed to set dictionary value: key = %s ", loc->path, + conf->link_xattr_name); + goto out; + } + + fd = fd_create(loc->inode, DHT_REBALANCE_PID); + if (!fd) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: fd create failed (destination)", loc->path); + goto out; + } + + xdata = dict_new(); + if (!xdata) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: dict_new failed)", loc->path); + goto out; + } + + ret = dict_set_int32_sizen(xdata, GF_CLEAN_WRITE_PROTECTION, 1); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, + "%s: failed to set dictionary value: key = %s ", loc->path, + GF_CLEAN_WRITE_PROTECTION); + goto out; + } + + ret = syncop_lookup(to, loc, &new_stbuf, NULL, xdata, NULL); + if (!ret) { + /* File exits in the destination, check if gfid matches */ + if (gf_uuid_compare(stbuf->ia_gfid, new_stbuf.ia_gfid) != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_GFID_MISMATCH, + "file %s exists in %s with different gfid", loc->path, + to->name); + *fop_errno = EINVAL; + ret = -1; + goto out; } - if ((ret == -1) && (errno != ENOENT)) { - /* File exists in destination, but not accessible */ - gf_log (THIS->name, GF_LOG_WARNING, - "%s: failed to lookup file (%s)", - loc->path, strerror (errno)); - goto out; + } + if ((ret < 0) && (-ret != ENOENT)) { + /* File exists in destination, but not accessible */ + gf_msg(THIS->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to lookup file", loc->path); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* Create the destination with LINKFILE mode, and linkto xattr, + if the linkfile already exists, just open the file */ + if (!ret) { + /* + * File already present, just open the file. + */ + ret = syncop_open(to, loc, O_RDWR, fd, NULL, NULL); + if (ret < 0) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to open %s on %s", loc->path, to->name); + *fop_errno = -ret; + ret = -1; + goto out; } - - /* Create the destination with LINKFILE mode, and linkto xattr, - if the linkfile already exists, it will just open the file */ - ret = syncop_create (to, loc, O_RDWR, DHT_LINKFILE_MODE, fd, - dict); + } else { + ret = syncop_create(to, loc, O_RDWR, DHT_LINKFILE_MODE, fd, &new_stbuf, + dict, NULL); if (ret < 0) { - gf_log (this->name, GF_LOG_ERROR, - "failed to create %s on %s (%s)", - loc->path, to->name, strerror (errno)); - goto out; + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to create %s on %s", loc->path, to->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + } + + fd_bind(fd); + + /*Reason of doing lookup after create again: + *In the create, there is some time-gap between opening fd at the + *server (posix_layer) and binding it in server (incrementing fd count), + *so if in that time-gap, if other process sends unlink considering it + *as a linkto file, because inode->fd count will be 0, so file will be + *unlinked at the backend. And because further operations are performed + *on fd, so though migration will be done but will end with no file + *at the backend. + */ + + ret = syncop_lookup(to, loc, &check_stbuf, NULL, NULL, NULL); + if (!ret) { + if (gf_uuid_compare(stbuf->ia_gfid, check_stbuf.ia_gfid) != 0) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_GFID_MISMATCH, + "file %s exists in %s with different gfid," + "found in lookup after create", + loc->path, to->name); + *fop_errno = EINVAL; + ret = -1; + goto out; } + } + + if (-ret == ENOENT) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: file does not exist" + "on %s", + loc->path, to->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + ret = syncop_fsetattr(to, fd, stbuf, (GF_SET_ATTR_UID | GF_SET_ATTR_GID), + NULL, NULL, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "chown failed for %s on %s", loc->path, to->name); + } + + /* No need to bother about 0 byte size files */ + if (stbuf->ia_size > 0) { + if (conf->use_fallocate && !file_has_holes) { + ret = syncop_fallocate(to, fd, 0, 0, stbuf->ia_size, NULL, NULL); + if (ret < 0) { + if (ret == -EOPNOTSUPP || ret == -EINVAL || ret == -ENOSYS) { + conf->use_fallocate = _gf_false; + } else { + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "fallocate failed for %s on %s", loc->path, + to->name); + + *fop_errno = -ret; + + /* fallocate does not release the space + * in some cases + */ + ret2 = syncop_ftruncate(to, fd, 0, NULL, NULL, NULL, NULL); + if (ret2 < 0) { + gf_msg(this->name, GF_LOG_WARNING, -ret2, + DHT_MSG_MIGRATE_FILE_FAILED, + "ftruncate failed for " + "%s on %s", + loc->path, to->name); + } + goto out; + } + } + } else { + ret = syncop_ftruncate(to, fd, stbuf->ia_size, NULL, NULL, NULL, + NULL); + if (ret < 0) { + *fop_errno = -ret; + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "ftruncate failed for %s on %s", loc->path, to->name); + } + } + } - if (dst_fd) - *dst_fd = fd; + /* success */ + ret = 0; - /* success */ - ret = 0; + if (dst_fd) + *dst_fd = fd; out: - return ret; -} + if (ret) { + if (fd) { + fd_unref(fd); + } + } + if (dict) + dict_unref(dict); -static inline int -__dht_check_free_space (xlator_t *to, xlator_t *from, loc_t *loc, - struct iatt *stbuf, int flag) -{ - struct statvfs src_statfs = {0,}; - struct statvfs dst_statfs = {0,}; - int ret = -1; - xlator_t *this = NULL; + if (xdata) + dict_unref(xdata); - this = THIS; + return ret; +} - ret = syncop_statfs (from, loc, &src_statfs); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to get statfs of %s on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; +static int +__dht_check_free_space(xlator_t *this, xlator_t *to, xlator_t *from, loc_t *loc, + struct iatt *stbuf, int flag, dht_conf_t *conf, + gf_boolean_t *target_changed, xlator_t **new_subvol, + int *fop_errno) +{ + struct statvfs src_statfs = { + 0, + }; + struct statvfs dst_statfs = { + 0, + }; + int ret = -1; + dict_t *xdata = NULL; + dht_layout_t *layout = NULL; + uint64_t src_statfs_blocks = 1; + uint64_t dst_statfs_blocks = 1; + double dst_post_availspacepercent = 0; + double src_post_availspacepercent = 0; + uint64_t file_blocks = 0; + uint64_t src_total_blocks = 0; + uint64_t dst_total_blocks = 0; + + xdata = dict_new(); + if (!xdata) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY, + "failed to allocate dictionary"); + goto out; + } + + ret = dict_set_int8(xdata, GF_INTERNAL_IGNORE_DEEM_STATFS, 1); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "Failed to set " GF_INTERNAL_IGNORE_DEEM_STATFS " in dict"); + ret = -1; + *fop_errno = ENOMEM; + goto out; + } + + ret = syncop_statfs(from, loc, &src_statfs, xdata, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to get statfs of %s on %s", loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + ret = syncop_statfs(to, loc, &dst_statfs, xdata, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to get statfs of %s on %s", loc->path, to->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + gf_msg_debug(this->name, 0, + "min_free_disk - %f , block available - %" PRId64 + ", block size - %lu", + conf->min_free_disk, dst_statfs.f_bavail, dst_statfs.f_bsize); + + dst_statfs_blocks = dst_statfs.f_bavail * + (dst_statfs.f_frsize / GF_DISK_SECTOR_SIZE); + + src_statfs_blocks = src_statfs.f_bavail * + (src_statfs.f_frsize / GF_DISK_SECTOR_SIZE); + + dst_total_blocks = dst_statfs.f_blocks * + (dst_statfs.f_frsize / GF_DISK_SECTOR_SIZE); + + src_total_blocks = src_statfs.f_blocks * + (src_statfs.f_frsize / GF_DISK_SECTOR_SIZE); + + /* if force option is given, do not check for space @ dst. + * Check only if space is avail for the file */ + if (flag != GF_DHT_MIGRATE_DATA) + goto check_avail_space; + + /* Check: + During rebalance `migrate-data` - Destination subvol experiences + a `reduction` in 'blocks' of free space, at the same time source + subvol gains certain 'blocks' of free space. A valid check is + necessary here to avoid erroneous move to destination where + the space could be scantily available. + With heterogeneous brick support, an actual space comparison could + prevent any files being migrated to newly added bricks if they are + smaller then the free space available on the existing bricks. + */ + if (!conf->use_fallocate) { + file_blocks = stbuf->ia_size + GF_DISK_SECTOR_SIZE - 1; + file_blocks /= GF_DISK_SECTOR_SIZE; + + if (file_blocks >= dst_statfs_blocks) { + dst_statfs_blocks = 0; + } else { + dst_statfs_blocks -= file_blocks; } + } + + src_post_availspacepercent = ((src_statfs_blocks + file_blocks) * 100) / + src_total_blocks; + + dst_post_availspacepercent = (dst_statfs_blocks * 100) / dst_total_blocks; + + if (dst_post_availspacepercent < src_post_availspacepercent) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "data movement of file " + "{blocks:%" PRIu64 + " name:(%s)} would result in " + "dst node (%s:%" PRIu64 + ") having lower disk " + "space than the source node (%s:%" PRIu64 + ")" + ".Skipping file.", + stbuf->ia_blocks, loc->path, to->name, dst_statfs_blocks, + from->name, src_statfs_blocks); + + /* this is not a 'failure', but we don't want to + consider this as 'success' too :-/ */ + *fop_errno = ENOSPC; + ret = 1; + goto out; + } - ret = syncop_statfs (to, loc, &dst_statfs); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to get statfs of %s on %s (%s)", - loc->path, to->name, strerror (errno)); - goto out; +check_avail_space: + if (conf->disk_unit == 'p' && dst_statfs.f_blocks) { + dst_post_availspacepercent = (dst_statfs_blocks * 100) / + dst_total_blocks; + + gf_msg_debug(this->name, 0, + "file : %s, post_availspacepercent" + " : %lf f_bavail : %" PRIu64 " min-free-disk: %lf", + loc->path, dst_post_availspacepercent, dst_statfs.f_bavail, + conf->min_free_disk); + + if (dst_post_availspacepercent < conf->min_free_disk) { + gf_msg(this->name, GF_LOG_WARNING, 0, 0, + "Write will cross min-free-disk for " + "file - %s on subvol - %s. Looking " + "for new subvol", + loc->path, to->name); + + goto find_new_subvol; + } else { + ret = 0; + goto out; } + } - /* if force option is given, do not check for space @ dst. - * Check only if space is avail for the file */ - if (flag != GF_DHT_MIGRATE_DATA) - goto check_avail_space; + if (conf->disk_unit != 'p') { + if ((dst_statfs_blocks * GF_DISK_SECTOR_SIZE) < conf->min_free_disk) { + gf_msg_debug(this->name, 0, + "file : %s, destination frsize: %lu " + "f_bavail : %" PRIu64 " min-free-disk: %lf", + loc->path, dst_statfs.f_frsize, dst_statfs.f_bavail, + conf->min_free_disk); - if (((dst_statfs.f_bavail * - dst_statfs.f_bsize) / GF_DISK_SECTOR_SIZE) < - (((src_statfs.f_bavail * src_statfs.f_bsize) / - GF_DISK_SECTOR_SIZE) - stbuf->ia_blocks)) { - gf_log (this->name, GF_LOG_WARNING, - "data movement attempted from node (%s) with" - " higher disk space to a node (%s) with " - "lesser disk space (%s)", from->name, - to->name, loc->path); + gf_msg(this->name, GF_LOG_WARNING, 0, 0, + "write will" + " cross min-free-disk for file - %s on subvol -" + " %s. looking for new subvol", + loc->path, to->name); - /* this is not a 'failure', but we don't want to - consider this as 'success' too :-/ */ - ret = 1; - goto out; - } + goto find_new_subvol; -check_avail_space: - if (((dst_statfs.f_bavail * dst_statfs.f_bsize) / - GF_DISK_SECTOR_SIZE) < stbuf->ia_blocks) { - gf_log (this->name, GF_LOG_ERROR, - "data movement attempted from node (%s) with " - "to node (%s) which does not have required free space" - " for %s", from->name, to->name, loc->path); - ret = 1; - goto out; + } else { + ret = 0; + goto out; } + } +find_new_subvol: + layout = dht_layout_get(this, loc->parent); + if (!layout) { + gf_log(this->name, GF_LOG_ERROR, "Layout is NULL"); + *fop_errno = EINVAL; + ret = -1; + goto out; + } + + *new_subvol = dht_subvol_with_free_space_inodes(this, to, from, layout, + stbuf->ia_size); + if ((!(*new_subvol)) || (*new_subvol == from)) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_SUBVOL_INSUFF_SPACE, + "Could not find any subvol" + " with space accommodating the file - %s. Consider " + "adding bricks", + loc->path); + + *target_changed = _gf_false; + *fop_errno = ENOSPC; + ret = -1; + } else { + gf_msg(this->name, GF_LOG_INFO, 0, 0, + "new target found - %s" + " for file - %s", + (*new_subvol)->name, loc->path); + *target_changed = _gf_true; ret = 0; + } + out: - return ret; + if (xdata) + dict_unref(xdata); + return ret; } -static inline int -__dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst, - uint64_t ia_size, int hole_exists) +static int +__dht_rebalance_migrate_data(xlator_t *this, gf_defrag_info_t *defrag, + xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst, + uint64_t ia_size, int hole_exists, int *fop_errno) { - int ret = 0; - int count = 0; - off_t offset = 0; - struct iovec *vector = NULL; - struct iobref *iobref = NULL; - uint64_t total = 0; - size_t read_size = 0; - - /* if file size is '0', no need to enter this loop */ - while (total < ia_size) { - read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ? - DHT_REBALANCE_BLKSIZE : (ia_size - total)); - ret = syncop_readv (from, src, read_size, - offset, 0, &vector, &count, &iobref); - if (!ret || (ret < 0)) { - break; + int ret = 0; + int count = 0; + off_t offset = 0; + off_t data_offset = 0; + off_t hole_offset = 0; + struct iovec *vector = NULL; + struct iobref *iobref = NULL; + uint64_t total = 0; + size_t read_size = 0; + size_t data_block_size = 0; + dict_t *xdata = NULL; + dht_conf_t *conf = NULL; + + conf = this->private; + + /* if file size is '0', no need to enter this loop */ + while (total < ia_size) { + /* This is a regular file - read it sequentially */ + if (!hole_exists) { + read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) + ? DHT_REBALANCE_BLKSIZE + : (ia_size - total)); + } else { + /* This is a sparse file - read only the data segments in the file + */ + + /* If the previous data block is fully copied, find the next data + * segment + * starting at the offset of the last read and written byte, */ + if (data_block_size <= 0) { + ret = syncop_seek(from, src, offset, GF_SEEK_DATA, NULL, + &data_offset); + if (ret) { + if (ret == -ENXIO) + ret = 0; /* No more data segments */ + else + *fop_errno = -ret; /* Error occurred */ + + break; } - if (hole_exists) - ret = dht_write_with_holes (to, dst, vector, count, - ret, offset, iobref); - else - ret = syncop_writev (to, dst, vector, count, - offset, iobref, 0); - if (ret < 0) { + /* If the position of the current data segment is greater than + * the position of the next hole, find the next hole in order to + * calculate the length of the new data segment */ + if (data_offset > hole_offset) { + /* Starting at the offset of the last data segment, find the + * next hole */ + ret = syncop_seek(from, src, data_offset, GF_SEEK_HOLE, + NULL, &hole_offset); + if (ret) { + /* If an error occurred here it's a real error because + * if the seek for a data segment was successful then + * necessarily another hole must exist (EOF is a hole) + */ + *fop_errno = -ret; break; - } - offset += ret; - total += ret; + } - if (vector) - GF_FREE (vector); - if (iobref) - iobref_unref (iobref); - iobref = NULL; - vector = NULL; + /* Calculate the total size of the current data block */ + data_block_size = hole_offset - data_offset; + } + } else { + /* There is still data in the current segment, move the + * data_offset to the position of the last written byte */ + data_offset = offset; + } + + /* Calculate how much data needs to be read and written. If the data + * segment's length is bigger than DHT_REBALANCE_BLKSIZE, read and + * write DHT_REBALANCE_BLKSIZE data length and the rest in the + * next iteration(s) */ + read_size = ((data_block_size > DHT_REBALANCE_BLKSIZE) + ? DHT_REBALANCE_BLKSIZE + : data_block_size); + + /* Calculate the remaining size of the data block - maybe there's no + * need to seek for data in the next iteration */ + data_block_size -= read_size; + + /* Set offset to the offset of the data segment so read and write + * will have the correct position */ + offset = data_offset; } - if (iobref) - iobref_unref (iobref); - if (vector) - GF_FREE (vector); - - if (ret >= 0) - ret = 0; - - return ret; -} + ret = syncop_readv(from, src, read_size, offset, 0, &vector, &count, + &iobref, NULL, NULL, NULL); -static inline int -__dht_rebalance_open_src_file (xlator_t *from, xlator_t *to, loc_t *loc, - struct iatt *stbuf, fd_t **src_fd) -{ - int ret = 0; - fd_t *fd = NULL; - dict_t *dict = NULL; - xlator_t *this = NULL; - struct iatt iatt = {0,}; + if (!ret || (ret < 0)) { + if (!ret) { + /* File was probably truncated*/ + ret = -1; + *fop_errno = ENOSPC; + } else { + *fop_errno = -ret; + } + break; + } - this = THIS; + if (!conf->force_migration) { + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_msg("dht", GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "insufficient memory"); + ret = -1; + *fop_errno = ENOMEM; + break; + } - fd = fd_create (loc->inode, DHT_REBALANCE_PID); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, - "%s: fd create failed (source)", loc->path); - ret = -1; - goto out; + /* Fail this write and abort rebalance if we + * detect a write from client since migration of + * this file started. This is done to avoid + * potential data corruption due to out of order + * writes from rebalance and client to the same + * region (as compared between src and dst + * files). See + * https://github.com/gluster/glusterfs/issues/308 + * for more details. + */ + ret = dict_set_int32_sizen(xdata, GF_AVOID_OVERWRITE, 1); + if (ret) { + gf_msg("dht", GF_LOG_ERROR, 0, ENOMEM, + "failed to set dict"); + ret = -1; + *fop_errno = ENOMEM; + break; + } + } } - ret = syncop_open (from, loc, O_RDWR, fd); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "failed to open file %s on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; + ret = syncop_writev(to, dst, vector, count, offset, iobref, 0, NULL, + NULL, xdata, NULL); + if (ret < 0) { + *fop_errno = -ret; + break; } + offset += ret; + total += ret; + + GF_FREE(vector); + if (iobref) + iobref_unref(iobref); + iobref = NULL; + vector = NULL; + } + if (iobref) + iobref_unref(iobref); + GF_FREE(vector); + + if (ret >= 0) + ret = 0; + else ret = -1; - dict = dict_new (); - if (!dict) - goto out; - ret = dict_set_str (dict, DHT_LINKFILE_KEY, to->name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to set xattr in dict for %s (linkto:%s)", - loc->path, to->name); - goto out; - } + if (xdata) { + dict_unref(xdata); + } - /* Once the migration starts, the source should have 'linkto' key set - to show which is the target, so other clients can work around it */ - ret = syncop_setxattr (from, loc, dict, 0); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to set xattr on %s in %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; - } + return ret; +} - /* mode should be (+S+T) to indicate migration is in progress */ - iatt.ia_prot = stbuf->ia_prot; - iatt.ia_type = stbuf->ia_type; - iatt.ia_prot.sticky = 1; - iatt.ia_prot.sgid = 1; +static int +__dht_rebalance_open_src_file(xlator_t *this, xlator_t *from, xlator_t *to, + loc_t *loc, struct iatt *stbuf, fd_t **src_fd, + gf_boolean_t *clean_src, int *fop_errno) +{ + int ret = 0; + fd_t *fd = NULL; + dict_t *dict = NULL; + struct iatt iatt = { + 0, + }; + dht_conf_t *conf = NULL; + + conf = this->private; + + *clean_src = _gf_false; + + fd = fd_create(loc->inode, DHT_REBALANCE_PID); + if (!fd) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: fd create failed (source)", loc->path); + *fop_errno = ENOMEM; + ret = -1; + goto out; + } + + ret = syncop_open(from, loc, O_RDWR, fd, NULL, NULL); + if (ret < 0) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to open file %s on %s", loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } - ret = syncop_setattr (from, loc, &iatt, GF_SET_ATTR_MODE, NULL, NULL); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "failed to set mode on %s in %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; - } + fd_bind(fd); - if (src_fd) - *src_fd = fd; + if (src_fd) + *src_fd = fd; - /* success */ - ret = 0; + ret = -1; + dict = dict_new(); + if (!dict) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: Could not allocate memory for dict", loc->path); + *fop_errno = ENOMEM; + ret = -1; + goto out; + } + + ret = dict_set_str(dict, conf->link_xattr_name, to->name); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "failed to set xattr in dict for %s (linkto:%s)", loc->path, + to->name); + *fop_errno = ENOMEM; + ret = -1; + goto out; + } + + /* Once the migration starts, the source should have 'linkto' key set + to show which is the target, so other clients can work around it */ + ret = syncop_setxattr(from, loc, dict, 0, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to set xattr on %s in %s", loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* Reset source mode/xattr if migration fails*/ + *clean_src = _gf_true; + + /* mode should be (+S+T) to indicate migration is in progress */ + iatt.ia_prot = stbuf->ia_prot; + iatt.ia_type = stbuf->ia_type; + iatt.ia_prot.sticky = 1; + iatt.ia_prot.sgid = 1; + + ret = syncop_setattr(from, loc, &iatt, GF_SET_ATTR_MODE, NULL, NULL, NULL, + NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to set mode on %s in %s", loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* success */ + ret = 0; out: - if (dict) - dict_unref (dict); + if (dict) + dict_unref(dict); - return ret; + return ret; } int -migrate_special_files (xlator_t *this, xlator_t *from, xlator_t *to, loc_t *loc, - struct iatt *buf) +migrate_special_files(xlator_t *this, xlator_t *from, xlator_t *to, loc_t *loc, + struct iatt *buf, int *fop_errno) { - int ret = -1; - dict_t *rsp_dict = NULL; - dict_t *dict = NULL; - char *link = NULL; - struct iatt stbuf = {0,}; - - dict = dict_new (); - if (!dict) - goto out; + int ret = -1; + dict_t *rsp_dict = NULL; + dict_t *dict = NULL; + char *link = NULL; + struct iatt stbuf = { + 0, + }; + dht_conf_t *conf = this->private; + + dict = dict_new(); + if (!dict) { + *fop_errno = ENOMEM; + ret = -1; + goto out; + } + ret = dict_set_int32(dict, conf->link_xattr_name, 256); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_log(this->name, GF_LOG_ERROR, + "%s: failed to set 'linkto' key in dict", loc->path); + goto out; + } + + /* check in the destination if the file is link file */ + ret = syncop_lookup(to, loc, &stbuf, NULL, dict, &rsp_dict); + if ((ret < 0) && (-ret != ENOENT)) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: lookup failed", loc->path); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* we no more require this key */ + dict_del(dict, conf->link_xattr_name); + + /* file exists in target node, only if it is 'linkfile' its valid, + otherwise, error out */ + if (!ret) { + if (!check_is_linkfile(loc->inode, &stbuf, rsp_dict, + conf->link_xattr_name)) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: file exists in destination", loc->path); + *fop_errno = EINVAL; + ret = -1; + goto out; + } - ret = dict_set_int32 (dict, DHT_LINKFILE_KEY, 256); + /* as file is linkfile, delete it */ + ret = syncop_unlink(to, loc, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to set 'linkto' key in dict", loc->path); - goto out; + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to delete the linkfile", loc->path); + *fop_errno = -ret; + ret = -1; + goto out; } + } - /* check in the destination if the file is link file */ - ret = syncop_lookup (to, loc, dict, &stbuf, &rsp_dict, NULL); - if ((ret == -1) && (errno != ENOENT)) { - gf_log (this->name, GF_LOG_WARNING, "%s: lookup failed (%s)", - loc->path, strerror (errno)); - goto out; + /* Set the gfid of the source file in dict */ + ret = dict_set_gfuuid(dict, "gfid-req", buf->ia_gfid, true); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_log(this->name, GF_LOG_ERROR, + "%s: failed to set gfid in dict for create", loc->path); + goto out; + } + + /* Create the file in target */ + if (IA_ISLNK(buf->ia_type)) { + /* Handle symlinks separately */ + ret = syncop_readlink(from, loc, &link, buf->ia_size, NULL, NULL); + if (ret < 0) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: readlink on symlink failed", loc->path); + *fop_errno = -ret; + ret = -1; + goto out; } - /* we no more require this key */ - dict_del (dict, DHT_LINKFILE_KEY); + ret = syncop_symlink(to, loc, link, 0, dict, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, "%s: creating symlink failed", + loc->path); + *fop_errno = -ret; + ret = -1; + goto out; + } - /* file exists in target node, only if it is 'linkfile' its valid, - otherwise, error out */ - if (!ret) { - if (!check_is_linkfile (loc->inode, &stbuf, rsp_dict)) { - gf_log (this->name, GF_LOG_WARNING, - "%s: file exists in destination", loc->path); - ret = -1; - goto out; - } + goto done; + } - /* as file is linkfile, delete it */ - ret = syncop_unlink (to, loc); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to delete the linkfile (%s)", - loc->path, strerror (errno)); - goto out; - } - } + ret = syncop_mknod(to, loc, st_mode_from_ia(buf->ia_prot, buf->ia_type), + makedev(ia_major(buf->ia_rdev), ia_minor(buf->ia_rdev)), + 0, dict, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: mknod failed", loc->path); + *fop_errno = -ret; + ret = -1; + goto out; + } - /* Set the gfid of the source file in dict */ - ret = dict_set_static_bin (dict, "gfid-req", buf->ia_gfid, 16); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to set gfid in dict for create", loc->path); - goto out; - } +done: + ret = syncop_setattr(to, loc, buf, + (GF_SET_ATTR_MTIME | GF_SET_ATTR_UID | + GF_SET_ATTR_GID | GF_SET_ATTR_MODE), + NULL, NULL, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to perform setattr on %s", loc->path, to->name); + *fop_errno = -ret; + } + + ret = syncop_unlink(from, loc, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: unlink failed", loc->path); + *fop_errno = -ret; + ret = -1; + } - /* Create the file in target */ - if (IA_ISLNK (buf->ia_type)) { - /* Handle symlinks separately */ - ret = syncop_readlink (from, loc, &link, buf->ia_size); - if (ret < 0) { - gf_log (this->name, GF_LOG_WARNING, - "%s: readlink on symlink failed (%s)", - loc->path, strerror (errno)); - goto out; - } +out: + GF_FREE(link); + if (dict) + dict_unref(dict); - ret = syncop_symlink (to, loc, link, dict); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: creating symlink failed (%s)", - loc->path, strerror (errno)); - goto out; - } + if (rsp_dict) + dict_unref(rsp_dict); - goto done; - } + return ret; +} - ret = syncop_mknod (to, loc, st_mode_from_ia (buf->ia_prot, - buf->ia_type), - makedev (ia_major (buf->ia_rdev), - ia_minor (buf->ia_rdev)), dict); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, "%s: mknod failed (%s)", - loc->path, strerror (errno)); - goto out; - } +static int +__dht_migration_cleanup_src_file(xlator_t *this, loc_t *loc, fd_t *fd, + xlator_t *from, ia_prot_t *src_ia_prot) +{ + int ret = -1; + dht_conf_t *conf = NULL; + struct iatt new_stbuf = { + 0, + }; + + if (!this || !fd || !from || !src_ia_prot) { + goto out; + } + + conf = this->private; + + /*Revert source mode and xattr changes*/ + ret = syncop_fstat(from, fd, &new_stbuf, NULL, NULL); + if (ret < 0) { + /* Failed to get the stat info */ + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file cleanup failed: failed to fstat " + "file %s on %s ", + loc->path, from->name); + ret = -1; + goto out; + } -done: - ret = syncop_unlink (from, loc); - if (ret) - gf_log (this->name, GF_LOG_WARNING, "%s: unlink failed (%s)", - loc->path, strerror (errno)); + /* Remove the sticky bit and sgid bit set, reset it to 0*/ + if (!src_ia_prot->sticky) + new_stbuf.ia_prot.sticky = 0; -out: - if (dict) - dict_unref (dict); + if (!src_ia_prot->sgid) + new_stbuf.ia_prot.sgid = 0; - if (rsp_dict) - dict_unref (rsp_dict); + ret = syncop_fsetattr(from, fd, &new_stbuf, + (GF_SET_ATTR_GID | GF_SET_ATTR_MODE), NULL, NULL, + NULL, NULL); - return ret; + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file cleanup failed:" + "%s: failed to perform fsetattr on %s ", + loc->path, from->name); + ret = -1; + goto out; + } + + ret = syncop_fremovexattr(from, fd, conf->link_xattr_name, 0, NULL); + if (ret) { + gf_log(this->name, GF_LOG_WARNING, + "%s: failed to remove linkto xattr on %s (%s)", loc->path, + from->name, strerror(-ret)); + ret = -1; + goto out; + } + + ret = 0; + +out: + return ret; } /* @@ -628,992 +1469,3234 @@ out: 1 : not a failure, but we can't migrate data as of now */ int -dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, - int flag) +dht_migrate_file(xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, + int flag, int *fop_errno) { - int ret = -1; - struct iatt new_stbuf = {0,}; - struct iatt stbuf = {0,}; - struct iatt empty_iatt = {0,}; - ia_prot_t src_ia_prot = {0,}; - fd_t *src_fd = NULL; - fd_t *dst_fd = NULL; - dict_t *dict = NULL; - dict_t *xattr = NULL; - dict_t *xattr_rsp = NULL; - int file_has_holes = 0; - - gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s", - loc->path, from->name, to->name); - - dict = dict_new (); - if (!dict) - goto out; + int ret = -1; + struct iatt new_stbuf = { + 0, + }; + struct iatt stbuf = { + 0, + }; + struct iatt empty_iatt = { + 0, + }; + ia_prot_t src_ia_prot = { + 0, + }; + fd_t *src_fd = NULL; + fd_t *dst_fd = NULL; + dict_t *dict = NULL; + dict_t *xattr = NULL; + dict_t *xattr_rsp = NULL; + int file_has_holes = 0; + dht_conf_t *conf = this->private; + int rcvd_enoent_from_src = 0; + struct gf_flock flock = { + 0, + }; + struct gf_flock plock = { + 0, + }; + loc_t tmp_loc = { + 0, + }; + loc_t parent_loc = { + 0, + }; + gf_boolean_t inodelk_locked = _gf_false; + gf_boolean_t entrylk_locked = _gf_false; + gf_boolean_t p_locked = _gf_false; + int lk_ret = -1; + gf_defrag_info_t *defrag = NULL; + gf_boolean_t clean_src = _gf_false; + gf_boolean_t clean_dst = _gf_false; + int log_level = GF_LOG_INFO; + gf_boolean_t delete_src_linkto = _gf_true; + lock_migration_info_t locklist; + dict_t *meta_dict = NULL; + gf_boolean_t meta_locked = _gf_false; + gf_boolean_t target_changed = _gf_false; + xlator_t *new_target = NULL; + xlator_t *old_target = NULL; + xlator_t *hashed_subvol = NULL; + fd_t *linkto_fd = NULL; + dict_t *xdata = NULL; + + if (from == to) { + gf_msg_debug(this->name, 0, + "destination and source are same. file %s" + " might have migrated already", + loc->path); + ret = 0; + goto out; + } + + gf_log(this->name, log_level, "%s: attempting to move from %s to %s", + loc->path, from->name, to->name); - ret = dict_set_int32 (dict, DHT_LINKFILE_KEY, 256); + dict = dict_new(); + if (!dict) { + ret = -1; + *fop_errno = ENOMEM; + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_NO_MEMORY, + "Could not allocate memory for dict"); + goto out; + } + ret = dict_set_int32(dict, conf->link_xattr_name, 256); + if (ret) { + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to set 'linkto' key in dict", + loc->path); + goto out; + } + + /* Do not migrate file in case lock migration is not enabled on the + * volume*/ + if (!conf->lock_migration_enabled) { + ret = dict_set_int32(dict, GLUSTERFS_POSIXLK_COUNT, sizeof(int32_t)); if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to set 'linkto' key in dict", loc->path); - goto out; + *fop_errno = ENOMEM; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: %s: failed to " + "set " GLUSTERFS_POSIXLK_COUNT " key in dict", + loc->path); + goto out; + } + } else { + gf_msg(this->name, GF_LOG_INFO, 0, 0, + "locks will be migrated" + " for file: %s", + loc->path); + } + + /* The file is locked to prevent a rename during a migration. Renames + * and migrations on the file at the same time can lead to data loss. + */ + + ret = dht_build_parent_loc(this, &parent_loc, loc, fop_errno); + if (ret < 0) { + ret = -1; + gf_msg(this->name, GF_LOG_WARNING, *fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to build parent loc, which is needed to " + "acquire entrylk to synchronize with renames on this " + "path. Skipping migration", + loc->path); + goto out; + } + + hashed_subvol = dht_subvol_get_hashed(this, loc); + if (hashed_subvol == NULL) { + ret = -1; + gf_msg(this->name, GF_LOG_WARNING, EINVAL, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: cannot find hashed subvol which is needed to " + "synchronize with renames on this path. " + "Skipping migration", + loc->path); + goto out; + } + + flock.l_type = F_WRLCK; + + tmp_loc.inode = inode_ref(loc->inode); + gf_uuid_copy(tmp_loc.gfid, loc->gfid); + tmp_loc.path = gf_strdup(loc->path); + + /* this inodelk happens with flock.owner being zero. But to synchronize + * hardlink migration we need to have different lkowner for each migration + * Filed a bug here: https://bugzilla.redhat.com/show_bug.cgi?id=1468202 to + * track the fix for this. Currently synclock takes care of synchronizing + * hardlink migration. Once this bug is fixed we can avoid taking synclock + */ + ret = syncop_inodelk(from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc, F_SETLKW, + &flock, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + ret = -1; + gf_msg(this->name, GF_LOG_WARNING, *fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "migrate file failed: " + "%s: failed to lock file on %s", + loc->path, from->name); + goto out; + } + + inodelk_locked = _gf_true; + + /* dht_rename has changed to use entrylk on hashed subvol for + * synchronization. So, rebalance too has to acquire an entrylk on + * hashed subvol. + */ + ret = syncop_entrylk(hashed_subvol, DHT_ENTRY_SYNC_DOMAIN, &parent_loc, + loc->name, ENTRYLK_LOCK, ENTRYLK_WRLCK, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + ret = -1; + gf_msg(this->name, GF_LOG_WARNING, *fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to acquire entrylk on subvol %s", loc->path, + hashed_subvol->name); + goto out; + } + + entrylk_locked = _gf_true; + + /* Phase 1 - Data migration is in progress from now on */ + ret = syncop_lookup(from, loc, &stbuf, NULL, dict, &xattr_rsp); + if (ret) { + *fop_errno = -ret; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, *fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: lookup failed on %s", + loc->path, from->name); + goto out; + } + + /* preserve source mode, so set the same to the destination */ + src_ia_prot = stbuf.ia_prot; + + /* Check if file can be migrated */ + ret = __is_file_migratable(this, loc, &stbuf, xattr_rsp, flag, defrag, conf, + fop_errno); + if (ret) { + if (ret == HARDLINK_MIG_INPROGRESS) + ret = 0; + goto out; + } + + /* Take care of the special files */ + if (!IA_ISREG(stbuf.ia_type)) { + /* Special files */ + ret = migrate_special_files(this, from, to, loc, &stbuf, fop_errno); + goto out; + } + + /* Try to preserve 'holes' while migrating data */ + if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE)) + file_has_holes = 1; + + /* create the destination, with required modes/xattr */ + ret = __dht_rebalance_create_dst_file(this, to, from, loc, &stbuf, &dst_fd, + fop_errno, file_has_holes); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, 0, + "Create dst failed" + " on - %s for file - %s", + to->name, loc->path); + goto out; + } + + clean_dst = _gf_true; + + ret = __dht_check_free_space(this, to, from, loc, &stbuf, flag, conf, + &target_changed, &new_target, fop_errno); + if (target_changed) { + /* Can't handle for hardlinks. Marking this as failure */ + if (flag == GF_DHT_MIGRATE_HARDLINK_IN_PROGRESS || stbuf.ia_nlink > 1) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SUBVOL_INSUFF_SPACE, + "Exiting migration for" + " file - %s. flag - %d, stbuf.ia_nlink - %d", + loc->path, flag, stbuf.ia_nlink); + ret = -1; + goto out; } - /* Phase 1 - Data migration is in progress from now on */ - ret = syncop_lookup (from, loc, dict, &stbuf, &xattr_rsp, NULL); + ret = syncop_ftruncate(to, dst_fd, 0, NULL, NULL, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s: lookup failed on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; + gf_log(this->name, GF_LOG_WARNING, + "%s: failed to perform truncate on %s (%s)", loc->path, + to->name, strerror(-ret)); } - /* we no more require this key */ - dict_del (dict, DHT_LINKFILE_KEY); + syncop_close(dst_fd); + dst_fd = NULL; - /* preserve source mode, so set the same to the destination */ - src_ia_prot = stbuf.ia_prot; + old_target = to; + to = new_target; - /* Check if file can be migrated */ - ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag); - if (ret) - goto out; + clean_dst = _gf_false; - /* Take care of the special files */ - if (!IA_ISREG (stbuf.ia_type)) { - /* Special files */ - ret = migrate_special_files (this, from, to, loc, &stbuf); - goto out; + /* if the file migration is successful to this new target, then + * update the xattr on the old destination to point the new + * destination. We need to do update this only post migration + * as in case of failure the linkto needs to point to the source + * subvol */ + ret = __dht_rebalance_create_dst_file( + this, to, from, loc, &stbuf, &dst_fd, fop_errno, file_has_holes); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "Create dst failed" + " on - %s for file - %s", + to->name, loc->path); + goto out; + } else { + gf_msg(this->name, GF_LOG_INFO, 0, 0, + "destination for file " + "- %s is changed to - %s", + loc->path, to->name); + clean_dst = _gf_true; } + } + + if (ret) { + goto out; + } + + /* Open the source, and also update mode/xattr */ + ret = __dht_rebalance_open_src_file(this, from, to, loc, &stbuf, &src_fd, + &clean_src, fop_errno); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: failed to open %s on %s", loc->path, + from->name); + goto out; + } + + /* TODO: move all xattr related operations to fd based operations */ + ret = syncop_listxattr(from, loc, &xattr, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + gf_msg(this->name, GF_LOG_WARNING, *fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to get xattr from %s", + loc->path, from->name); + ret = -1; + goto out; + } + + /* Copying posix acls to the linkto file messes up the permissions*/ + dht_strip_out_acls(xattr); + + /* Remove the linkto xattr as we don't want to overwrite the value + * set on the dst. + */ + dict_del(xattr, conf->link_xattr_name); + + /* We need to error out if this fails as having the wrong shard xattrs + * set on the dst could cause data corruption + */ + ret = syncop_fsetxattr(to, dst_fd, xattr, 0, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to set xattr on %s", loc->path, to->name); + ret = -1; + goto out; + } - /* create the destination, with required modes/xattr */ - ret = __dht_rebalance_create_dst_file (to, from, loc, &stbuf, - dict, &dst_fd); - if (ret) - goto out; + if (xattr_rsp) { + /* we no more require this key */ + dict_del(dict, conf->link_xattr_name); + dict_unref(xattr_rsp); + } + + ret = syncop_fstat(from, src_fd, &stbuf, dict, &xattr_rsp); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:failed to lookup %s on %s ", loc->path, + from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* Check again if file has hardlink */ + ret = __check_file_has_hardlink(this, loc, &stbuf, xattr_rsp, flag, defrag, + conf, fop_errno); + if (ret) { + if (ret == HARDLINK_MIG_INPROGRESS) + ret = 0; + goto out; + } + + ret = __dht_rebalance_migrate_data(this, defrag, from, to, src_fd, dst_fd, + stbuf.ia_size, file_has_holes, + fop_errno); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: %s: failed to migrate data", loc->path); - ret = __dht_check_free_space (to, from, loc, &stbuf, flag); - if (ret) { - goto out; + ret = -1; + goto out; + } + + /* TODO: Sync the locks */ + + xdata = dict_new(); + if (!xdata || dict_set_int8(xdata, "last-fsync", 1)) { + gf_log(this->name, GF_LOG_ERROR, + "%s: failed to set last-fsync flag on " + "%s (%s)", + loc->path, to->name, strerror(ENOMEM)); + } + + ret = syncop_fsync(to, dst_fd, 0, NULL, NULL, xdata, NULL); + if (ret) { + gf_log(this->name, GF_LOG_WARNING, "%s: failed to fsync on %s (%s)", + loc->path, to->name, strerror(-ret)); + *fop_errno = -ret; + } + + /* Phase 2 - Data-Migration Complete, Housekeeping updates pending */ + + ret = syncop_fstat(from, src_fd, &new_stbuf, NULL, NULL); + if (ret < 0) { + /* Failed to get the stat info */ + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: failed to fstat file %s on %s ", loc->path, + from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } + + /* Lock the entire source file to prevent clients from taking a + lock on it as dht_lk does not handle file migration. + + This still leaves a small window where conflicting locks can + be granted to different clients. If client1 requests a blocking + lock on the src file, it will be granted after the migrating + process releases its lock. If client2 requests a lock on the dst + data file, it will also be granted, but all FOPs will be redirected + to the dst data file. + */ + + /* Take meta lock */ + + if (conf->lock_migration_enabled) { + meta_dict = dict_new(); + if (!meta_dict) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "dict_new failed"); + + *fop_errno = ENOMEM; + ret = -1; + goto out; } - /* Open the source, and also update mode/xattr */ - ret = __dht_rebalance_open_src_file (from, to, loc, &stbuf, &src_fd); + ret = dict_set_str(meta_dict, GLUSTERFS_INTERNAL_FOP_KEY, "yes"); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "failed to open %s on %s", - loc->path, from->name); - goto out; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_DICT_SET_FAILED, + "Failed to set dictionary value: key = %s," + " path = %s", + GLUSTERFS_INTERNAL_FOP_KEY, loc->path); + *fop_errno = ENOMEM; + ret = -1; + goto out; } - ret = syncop_fstat (from, src_fd, &stbuf); + ret = dict_set_int32(meta_dict, GF_META_LOCK_KEY, 1); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "failed to lookup %s on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Trace dict_set failed"); + *fop_errno = ENOMEM; + ret = -1; + goto out; } - /* Try to preserve 'holes' while migrating data */ - if (stbuf.ia_size > (stbuf.ia_blocks * GF_DISK_SECTOR_SIZE)) - file_has_holes = 1; - - /* All I/O happens in this function */ - ret = __dht_rebalance_migrate_data (from, to, src_fd, dst_fd, - stbuf.ia_size, file_has_holes); + ret = syncop_setxattr(from, loc, meta_dict, 0, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s: failed to migrate data", - loc->path); - /* reset the destination back to 0 */ - ret = syncop_ftruncate (to, dst_fd, 0); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "%s: failed to reset target size back to 0 (%s)", - loc->path, strerror (errno)); - } + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Trace syncop_setxattr metalock failed"); - ret = -1; - goto out; + *fop_errno = -ret; + ret = -1; + goto out; + } else { + meta_locked = _gf_true; } + } - /* TODO: move all xattr related operations to fd based operations */ - ret = syncop_listxattr (from, loc, &xattr); - if (ret == -1) - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to get xattr from %s (%s)", - loc->path, from->name, strerror (errno)); + if (!conf->lock_migration_enabled) { + plock.l_type = F_WRLCK; + plock.l_start = 0; + plock.l_len = 0; + plock.l_whence = SEEK_SET; - ret = syncop_setxattr (to, loc, xattr, 0); - if (ret == -1) - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to set xattr on %s (%s)", - loc->path, to->name, strerror (errno)); + ret = syncop_lk(from, src_fd, F_SETLK, &plock, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: Failed to lock on %s", + loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto out; + } - /* TODO: Sync the locks */ + p_locked = _gf_true; - ret = syncop_fsync (to, dst_fd); - if (ret) - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to fsync on %s (%s)", - loc->path, to->name, strerror (errno)); + } else { + INIT_LIST_HEAD(&locklist.list); + ret = syncop_getactivelk(from, loc, &locklist, NULL, NULL); + if (ret == 0) { + gf_log(this->name, GF_LOG_INFO, "No active locks on:%s", loc->path); - /* Phase 2 - Data-Migration Complete, Housekeeping updates pending */ + } else if (ret > 0) { + ret = syncop_setactivelk(to, loc, &locklist, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_LOCK_MIGRATION_FAILED, "write lock failed on:%s", + loc->path); - ret = syncop_fstat (from, src_fd, &new_stbuf); - if (ret < 0) { - /* Failed to get the stat info */ - gf_log (this->name, GF_LOG_ERROR, - "failed to fstat file %s on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; + *fop_errno = -ret; + ret = -1; + goto metaunlock; + } + } else { + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_LOCK_MIGRATION_FAILED, + "getactivelk failed for file: %s", loc->path); + *fop_errno = -ret; } - - /* source would have both sticky bit and sgid bit set, reset it to 0, - and set the source permission on destination, if it was not set - prior to setting rebalance-modes in source */ - if (!src_ia_prot.sticky) - new_stbuf.ia_prot.sticky = 0; - - if (!src_ia_prot.sgid) - new_stbuf.ia_prot.sgid = 0; - - /* TODO: if the source actually had sticky bit, or sgid bit set, - we are not handling it */ - - ret = syncop_fsetattr (to, dst_fd, &new_stbuf, - (GF_SET_ATTR_UID | GF_SET_ATTR_GID | - GF_SET_ATTR_MODE), NULL, NULL); + } + + /* source would have both sticky bit and sgid bit set, reset it to 0, + and set the source permission on destination, if it was not set + prior to setting rebalance-modes in source */ + if (!src_ia_prot.sticky) + new_stbuf.ia_prot.sticky = 0; + + if (!src_ia_prot.sgid) + new_stbuf.ia_prot.sgid = 0; + + /* TODO: if the source actually had sticky bit, or sgid bit set, + we are not handling it */ + + ret = syncop_fsetattr( + to, dst_fd, &new_stbuf, + (GF_SET_ATTR_UID | GF_SET_ATTR_GID | GF_SET_ATTR_MODE), NULL, NULL, + NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to perform setattr on %s ", + loc->path, to->name); + *fop_errno = -ret; + ret = -1; + goto metaunlock; + } + + /* Because 'futimes' is not portable */ + ret = syncop_setattr(to, loc, &new_stbuf, + (GF_SET_ATTR_MTIME | GF_SET_ATTR_ATIME), NULL, NULL, + NULL, NULL); + if (ret) { + gf_log(this->name, GF_LOG_WARNING, + "%s: failed to perform setattr on %s ", loc->path, to->name); + *fop_errno = -ret; + } + + if (target_changed) { + dict_del(dict, GLUSTERFS_POSIXLK_COUNT); + ret = dict_set_str(dict, conf->link_xattr_name, to->name); if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to perform setattr on %s (%s)", - loc->path, to->name, strerror (errno)); + gf_log(this->name, GF_LOG_ERROR, + "failed to set xattr in dict for %s (linkto:%s)", loc->path, + to->name); + *fop_errno = ENOMEM; + ret = -1; + goto out; + } + + ret = syncop_setxattr(old_target, loc, dict, 0, NULL, NULL); + if (ret && -ret != ESTALE && -ret != ENOENT) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "failed to set xattr on %s in %s", loc->path, + old_target->name); + *fop_errno = -ret; + ret = -1; + goto out; + } else if (-ret == ESTALE || -ret == ENOENT) { + /* The failure ESTALE indicates that the linkto + * file on the hashed subvol might have been deleted. + * In this case will create a linkto file with new target + * as linkto xattr value*/ + linkto_fd = fd_create(loc->inode, DHT_REBALANCE_PID); + if (!linkto_fd) { + gf_msg(this->name, GF_LOG_ERROR, errno, + DHT_MSG_MIGRATE_FILE_FAILED, "%s: fd create failed", + loc->path); + *fop_errno = ENOMEM; + ret = -1; goto out; + } + ret = syncop_create(old_target, loc, O_RDWR, DHT_LINKFILE_MODE, + linkto_fd, NULL, dict, NULL); + if (ret != 0 && -ret != EEXIST && -ret != ESTALE) { + *fop_errno = -ret; + ret = -1; + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "failed to create linkto file on %s in %s", loc->path, + old_target->name); + goto out; + } else if (ret == 0) { + ret = syncop_fsetattr(old_target, linkto_fd, &stbuf, + (GF_SET_ATTR_UID | GF_SET_ATTR_GID), NULL, + NULL, NULL, NULL); + if (ret < 0) { + *fop_errno = -ret; + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "chown failed for %s on %s", loc->path, + old_target->name); + } + } + } + } + + clean_dst = _gf_false; + + /* Posix acls are not set on DHT linkto files as part of the initial + * initial xattrs set on the dst file, so these need + * to be set on the dst file after the linkto attrs are removed. + * TODO: Optimize this. + */ + if (xattr) { + dict_unref(xattr); + xattr = NULL; + } + + /* Set only the Posix ACLs this time */ + ret = syncop_getxattr(from, loc, &xattr, POSIX_ACL_ACCESS_XATTR, NULL, + NULL); + if (ret < 0) { + if ((-ret != ENODATA) && (-ret != ENOATTR)) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to get xattr from %s", + loc->path, from->name); + *fop_errno = -ret; + } + } else { + ret = syncop_setxattr(to, loc, xattr, 0, NULL, NULL); + if (ret < 0) { + /* Potential problem here where Posix ACLs will + * not be set on the target file */ + + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to set xattr on %s", + loc->path, to->name); + *fop_errno = -ret; } + } + + /* The src file is being unlinked after this so we don't need + to clean it up */ + clean_src = _gf_false; + + /* Make the source as a linkfile first before deleting it */ + empty_iatt.ia_prot.sticky = 1; + ret = syncop_fsetattr(from, src_fd, &empty_iatt, GF_SET_ATTR_MODE, NULL, + NULL, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:" + "%s: failed to perform setattr on %s ", + loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto metaunlock; + } + + /* Free up the data blocks on the source node, as the whole + file is migrated */ + ret = syncop_ftruncate(from, src_fd, 0, NULL, NULL, NULL, NULL); + if (ret) { + gf_log(this->name, GF_LOG_WARNING, + "%s: failed to perform truncate on %s (%s)", loc->path, + from->name, strerror(-ret)); + *fop_errno = -ret; + } + + /* remove the 'linkto' xattr from the destination */ + ret = syncop_fremovexattr(to, dst_fd, conf->link_xattr_name, 0, NULL); + if (ret) { + gf_log(this->name, GF_LOG_WARNING, + "%s: failed to perform removexattr on %s (%s)", loc->path, + to->name, strerror(-ret)); + *fop_errno = -ret; + } + + /* Do a stat and check the gfid before unlink */ + + /* + * Cached file changes its state from non-linkto to linkto file after + * migrating data. If lookup from any other mount-point is performed, + * converted-linkto-cached file will be treated as a stale and will be + * unlinked. But by this time, file is already migrated. So further + * failure because of ENOENT should not be treated as error + */ + + ret = syncop_stat(from, loc, &empty_iatt, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_WARNING, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to do a stat on %s", loc->path, from->name); + + if (-ret != ENOENT) { + *fop_errno = -ret; + ret = -1; + goto metaunlock; + } + + rcvd_enoent_from_src = 1; + } - /* Because 'futimes' is not portable */ - ret = syncop_setattr (to, loc, &new_stbuf, - (GF_SET_ATTR_MTIME | GF_SET_ATTR_ATIME), - NULL, NULL); + if ((gf_uuid_compare(empty_iatt.ia_gfid, loc->gfid) == 0) && + (!rcvd_enoent_from_src) && delete_src_linkto) { + /* take out the source from namespace */ + ret = syncop_unlink(from, loc, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to perform setattr on %s (%s)", - loc->path, to->name, strerror (errno)); + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to perform unlink on %s", loc->path, from->name); + *fop_errno = -ret; + ret = -1; + goto metaunlock; } + } + + ret = syncop_lookup(this, loc, NULL, NULL, NULL, NULL); + if (ret) { + gf_msg_debug(this->name, -ret, + "%s: failed to lookup the file on subvolumes", loc->path); + *fop_errno = -ret; + } + + gf_msg(this->name, log_level, 0, DHT_MSG_MIGRATE_FILE_COMPLETE, + "completed migration of %s from subvolume %s to %s", loc->path, + from->name, to->name); + + ret = 0; - /* Make the source as a linkfile first before deleting it */ - empty_iatt.ia_prot.sticky = 1; - ret = syncop_fsetattr (from, src_fd, &empty_iatt, - GF_SET_ATTR_MODE, NULL, NULL); +metaunlock: + + if (conf->lock_migration_enabled && meta_locked) { + dict_del(meta_dict, GF_META_LOCK_KEY); + + ret = dict_set_int32(meta_dict, GF_META_UNLOCK_KEY, 1); if (ret) { - gf_log (this->name, GF_LOG_WARNING, \ - "%s: failed to perform setattr on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Trace dict_set failed"); + + *fop_errno = ENOMEM; + ret = -1; + goto out; } - /* Do a stat and check the gfid before unlink */ - ret = syncop_stat (from, loc, &empty_iatt); + if (clean_dst == _gf_false) + ret = dict_set_int32(meta_dict, "status", 1); + else + ret = dict_set_int32(meta_dict, "status", 0); + if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to do a stat on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; - } + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "Trace dict_set failed"); - if (uuid_compare (empty_iatt.ia_gfid, loc->gfid) == 0) { - /* take out the source from namespace */ - ret = syncop_unlink (from, loc); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to perform unlink on %s (%s)", - loc->path, from->name, strerror (errno)); - goto out; - } + *fop_errno = ENOMEM; + ret = -1; + goto out; } - /* Free up the data blocks on the source node, as the whole - file is migrated */ - ret = syncop_ftruncate (from, src_fd, 0); + ret = syncop_setxattr(from, loc, meta_dict, 0, NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to perform truncate on %s (%s)", - loc->path, from->name, strerror (errno)); + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Trace syncop_setxattr meta unlock failed"); + + *fop_errno = -ret; + ret = -1; + goto out; } + } - /* remove the 'linkto' xattr from the destination */ - ret = syncop_fremovexattr (to, dst_fd, DHT_LINKFILE_KEY); - if (ret) { - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to perform removexattr on %s (%s)", - loc->path, to->name, strerror (errno)); +out: + if (clean_src) { + /* Revert source mode and xattr changes*/ + lk_ret = __dht_migration_cleanup_src_file(this, loc, src_fd, from, + &src_ia_prot); + if (lk_ret) { + gf_msg(this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to cleanup source file on %s", loc->path, + from->name); + } + } + + /* reset the destination back to 0 */ + if (clean_dst) { + lk_ret = syncop_ftruncate(to, dst_fd, 0, NULL, NULL, NULL, NULL); + if (lk_ret) { + gf_msg(this->name, GF_LOG_ERROR, -lk_ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: " + "%s: failed to reset target size back to 0", + loc->path); } + } - ret = syncop_lookup (this, loc, NULL, NULL, NULL, NULL); - if (ret) { - gf_log (this->name, GF_LOG_DEBUG, - "%s: failed to lookup the file on subvolumes (%s)", - loc->path, strerror (errno)); + if (inodelk_locked) { + flock.l_type = F_UNLCK; + + lk_ret = syncop_inodelk(from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc, + F_SETLK, &flock, NULL, NULL); + if (lk_ret < 0) { + gf_msg(this->name, GF_LOG_WARNING, -lk_ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to unlock file on %s", loc->path, from->name); } + } + + if (entrylk_locked) { + lk_ret = syncop_entrylk(hashed_subvol, DHT_ENTRY_SYNC_DOMAIN, + &parent_loc, loc->name, ENTRYLK_UNLOCK, + ENTRYLK_UNLOCK, NULL, NULL); + if (lk_ret < 0) { + gf_msg(this->name, GF_LOG_WARNING, -lk_ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to unlock entrylk on %s", loc->path, + hashed_subvol->name); + } + } - gf_log (this->name, GF_LOG_INFO, - "completed migration of %s from subvolume %s to %s", - loc->path, from->name, to->name); + if (p_locked) { + plock.l_type = F_UNLCK; + lk_ret = syncop_lk(from, src_fd, F_SETLK, &plock, NULL, NULL); - ret = 0; -out: - if (dict) - dict_unref (dict); + if (lk_ret < 0) { + gf_msg(this->name, GF_LOG_WARNING, -lk_ret, + DHT_MSG_MIGRATE_FILE_FAILED, + "%s: failed to unlock file on %s", loc->path, from->name); + } + } - if (xattr) - dict_unref (xattr); - if (xattr_rsp) - dict_unref (xattr_rsp); + lk_ret = syncop_removexattr(to, loc, GF_PROTECT_FROM_EXTERNAL_WRITES, NULL, + NULL); + if (lk_ret && (lk_ret != -ENODATA) && (lk_ret != -ENOATTR)) { + gf_msg(this->name, GF_LOG_WARNING, -lk_ret, 0, + "%s: removexattr failed key %s", loc->path, + GF_PROTECT_FROM_EXTERNAL_WRITES); + } - if (dst_fd) - syncop_close (dst_fd); - if (src_fd) - syncop_close (src_fd); + if (dict) + dict_unref(dict); - return ret; -} + if (xattr) + dict_unref(xattr); + if (xattr_rsp) + dict_unref(xattr_rsp); -static int -rebalance_task (void *data) -{ - int ret = -1; - dht_local_t *local = NULL; - call_frame_t *frame = NULL; + if (dst_fd) + syncop_close(dst_fd); - frame = data; + if (src_fd) + syncop_close(src_fd); + if (linkto_fd) + syncop_close(linkto_fd); - local = frame->local; + if (xdata) + dict_unref(xdata); - /* This function is 'synchrounous', hence if it returns, - we are done with the task */ - ret = dht_migrate_file (THIS, &local->loc, local->rebalance.from_subvol, - local->rebalance.target_node, local->flags); + loc_wipe(&tmp_loc); + loc_wipe(&parent_loc); - return ret; + return ret; } static int -rebalance_task_completion (int op_ret, call_frame_t *sync_frame, void *data) +rebalance_task(void *data) { - int ret = -1; - uint64_t layout_int = 0; - dht_layout_t *layout = 0; - xlator_t *this = NULL; - dht_local_t *local = NULL; - int32_t op_errno = EINVAL; - - this = THIS; - local = sync_frame->local; - - if (!op_ret) { - /* Make sure we have valid 'layout' in inode ctx - after the operation */ - ret = inode_ctx_del (local->loc.inode, this, &layout_int); - if (!ret && layout_int) { - layout = (dht_layout_t *)(long)layout_int; - dht_layout_unref (this, layout); - } + int ret = -1; + dht_local_t *local = NULL; + call_frame_t *frame = NULL; + int fop_errno = 0; - ret = dht_layout_preset (this, local->rebalance.target_node, - local->loc.inode); - if (ret) - gf_log (this->name, GF_LOG_WARNING, - "%s: failed to set inode ctx", local->loc.path); - } + frame = data; - if (op_ret == -1) { - /* Failure of migration process, mostly due to write process. - as we can't preserve the exact errno, lets say there was - no space to migrate-data - */ - op_errno = ENOSPC; - } + local = frame->local; - if (op_ret == 1) { - /* migration didn't happen, but is not a failure, let the user - understand that he doesn't have permission to migrate the - file. - */ - op_ret = -1; - op_errno = EPERM; - } + /* This function is 'synchrounous', hence if it returns, + we are done with the task */ + ret = dht_migrate_file(THIS, &local->loc, local->rebalance.from_subvol, + local->rebalance.target_node, local->flags, + &fop_errno); - DHT_STACK_UNWIND (setxattr, sync_frame, op_ret, op_errno, NULL); - return 0; + return ret; } -int -dht_start_rebalance_task (xlator_t *this, call_frame_t *frame) +static int +rebalance_task_completion(int op_ret, call_frame_t *sync_frame, void *data) { - int ret = -1; - dht_conf_t *conf = NULL; + int32_t op_errno = EINVAL; + + if (op_ret == -1) { + /* Failure of migration process, mostly due to write process. + as we can't preserve the exact errno, lets say there was + no space to migrate-data + */ + op_errno = ENOSPC; + } else if (op_ret == 1) { + /* migration didn't happen, but is not a failure, let the user + understand that he doesn't have permission to migrate the + file. + */ + op_ret = -1; + op_errno = EPERM; + } else if (op_ret != 0) { + op_errno = -op_ret; + op_ret = -1; + } + + DHT_STACK_UNWIND(setxattr, sync_frame, op_ret, op_errno, NULL); + return 0; +} - conf = this->private; +int +dht_start_rebalance_task(xlator_t *this, call_frame_t *frame) +{ + int ret = -1; - ret = synctask_new (conf->env, rebalance_task, - rebalance_task_completion, - frame, frame); - return ret; + ret = synctask_new(this->ctx->env, rebalance_task, + rebalance_task_completion, frame, frame); + return ret; } int -gf_listener_stop (void) +gf_listener_stop(xlator_t *this) { - glusterfs_ctx_t *ctx = NULL; - cmd_args_t *cmd_args = NULL; - int ret = 0; - xlator_t *this = NULL; - - ctx = glusterfs_ctx_get (); - GF_ASSERT (ctx); - cmd_args = &ctx->cmd_args; - if (cmd_args->sock_file) { - ret = unlink (cmd_args->sock_file); - if (ret && (ENOENT == errno)) { - ret = 0; - } + glusterfs_ctx_t *ctx = NULL; + cmd_args_t *cmd_args = NULL; + int ret = 0; + + ctx = this->ctx; + GF_ASSERT(ctx); + cmd_args = &ctx->cmd_args; + if (cmd_args->sock_file) { + ret = sys_unlink(cmd_args->sock_file); + if (ret && (ENOENT == errno)) { + ret = 0; } - - if (ret) { - this = THIS; - gf_log (this->name, GF_LOG_ERROR, "Failed to unlink listener " - "socket %s, error: %s", cmd_args->sock_file, - strerror (errno)); - } - return ret; + } + + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, errno, DHT_MSG_SOCKET_ERROR, + "Failed to unlink listener " + "socket %s", + cmd_args->sock_file); + } + return ret; } void -dht_build_root_inode (xlator_t *this, inode_t **inode) +dht_build_root_inode(xlator_t *this, inode_t **inode) { - inode_table_t *itable = NULL; - uuid_t root_gfid = {0, }; + inode_table_t *itable = NULL; + static uuid_t root_gfid = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}; - itable = inode_table_new (0, this); - if (!itable) - return; + itable = inode_table_new(0, this); + if (!itable) + return; - root_gfid[15] = 1; - *inode = inode_find (itable, root_gfid); + *inode = inode_find(itable, root_gfid); } void -dht_build_root_loc (inode_t *inode, loc_t *loc) +dht_build_root_loc(inode_t *inode, loc_t *loc) { - loc->path = "/"; - loc->inode = inode; - loc->inode->ia_type = IA_IFDIR; - memset (loc->gfid, 0, 16); - loc->gfid[15] = 1; + loc->path = "/"; + loc->inode = inode; + loc->inode->ia_type = IA_IFDIR; + memset(loc->gfid, 0, 16); + loc->gfid[15] = 1; } - /* return values: 1 -> error, bug ignore and continue 0 -> proceed -1 -> error, handle it */ int32_t -gf_defrag_handle_migrate_error (int32_t op_errno, gf_defrag_info_t *defrag) +gf_defrag_handle_migrate_error(int32_t op_errno, gf_defrag_info_t *defrag) { - /* if errno is not ENOSPC or ENOTCONN, we can still continue - with rebalance process */ - if ((errno != ENOSPC) || (errno != ENOTCONN)) - return 1; - - if (errno == ENOTCONN) { - /* Most probably mount point went missing (mostly due - to a brick down), say rebalance failure to user, - let him restart it if everything is fine */ - defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; - return -1; + int ret = 0; + /* if errno is not ENOTCONN, we can still continue + with rebalance process */ + if (op_errno != ENOTCONN) { + ret = 1; + goto out; + } + + if (op_errno == ENOTCONN) { + /* Most probably mount point went missing (mostly due + to a brick down), say rebalance failure to user, + let him restart it if everything is fine */ + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + ret = -1; + goto out; + } + +out: + return ret; +} + +static gf_boolean_t +gf_defrag_pattern_match(gf_defrag_info_t *defrag, char *name, uint64_t size) +{ + gf_defrag_pattern_list_t *trav = NULL; + gf_boolean_t match = _gf_false; + gf_boolean_t ret = _gf_false; + + GF_VALIDATE_OR_GOTO("dht", defrag, out); + + trav = defrag->defrag_pattern; + while (trav) { + if (!fnmatch(trav->path_pattern, name, FNM_NOESCAPE)) { + match = _gf_true; + break; } + trav = trav->next; + } - if (errno == ENOSPC) { - /* rebalance process itself failed, may be - remote brick went down, or write failed due to - disk full etc etc.. */ - defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; - return -1; + if ((match == _gf_true) && (size >= trav->size)) + ret = _gf_true; + +out: + return ret; +} + +int +dht_dfreaddirp_done(dht_dfoffset_ctx_t *offset_var, int cnt) +{ + int i; + int result = 1; + + for (i = 0; i < cnt; i++) { + if (offset_var[i].readdir_done == 0) { + result = 0; + break; } + } + return result; +} - return 0; +int static gf_defrag_ctx_subvols_init(dht_dfoffset_ctx_t *offset_var, + xlator_t *this) +{ + int i; + dht_conf_t *conf = NULL; + + conf = this->private; + + if (!conf) + return -1; + + for (i = 0; i < conf->local_subvols_cnt; i++) { + offset_var[i].this = conf->local_subvols[i]; + offset_var[i].offset = (off_t)0; + offset_var[i].readdir_done = 0; + } + + return 0; +} + +static int +dht_get_first_non_null_index(subvol_nodeuuids_info_t *entry) +{ + int i = 0; + int index = 0; + + for (i = 0; i < entry->count; i++) { + if (!gf_uuid_is_null(entry->elements[i].uuid)) { + index = i; + goto out; + } + } + + if (i == entry->count) { + index = -1; + } +out: + return index; } -/* We do a depth first traversal of directories. But before we move into - * subdirs, we complete the data migration of those directories whose layouts - * have been fixed +/* Return value + * 0 : this node does not migrate the file + * 1 : this node migrates the file + * + * Use the hash value of the gfid to determine which node will migrate files. + * Using the gfid instead of the name also ensures that the same node handles + * all hardlinks. */ -int -gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, - dict_t *migrate_data) +gf_boolean_t +gf_defrag_should_i_migrate(xlator_t *this, int local_subvol_index, uuid_t gfid) { - int ret = -1; - loc_t entry_loc = {0,}; - fd_t *fd = NULL; - gf_dirent_t entries; - gf_dirent_t *tmp = NULL; - gf_dirent_t *entry = NULL; - gf_boolean_t free_entries = _gf_false; - off_t offset = 0; - dict_t *dict = NULL; - struct iatt iatt = {0,}; - int32_t op_errno = 0; - char *uuid_str = NULL; - uuid_t node_uuid = {0,}; - int readdir_operrno = 0; - - gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", - loc->path); - fd = fd_create (loc->inode, defrag->pid); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); - goto out; + gf_boolean_t ret = _gf_false; + int i = local_subvol_index; + char *str = NULL; + uint32_t hashval = 0; + int32_t index = 0; + dht_conf_t *conf = NULL; + char buf[UUID_CANONICAL_FORM_LEN + 1] = { + 0, + }; + subvol_nodeuuids_info_t *entry = NULL; + + conf = this->private; + + /* Pure distribute. A subvol in this case + will be handled by only one node */ + + entry = &(conf->local_nodeuuids[i]); + if (entry->count == 1) { + return 1; + } + + str = uuid_utoa_r(gfid, buf); + if (dht_hash_compute(this, 0, str, &hashval) == 0) { + index = (hashval % entry->count); + if (entry->elements[index].info == REBAL_NODEUUID_MINE) { + /* Index matches this node's nodeuuid.*/ + ret = _gf_true; + goto out; } - ret = syncop_opendir (this, loc, fd); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to open dir %s", - loc->path); + /* Brick down - some other node has to migrate these files*/ + if (gf_uuid_is_null(entry->elements[index].uuid)) { + /* Fall back to the first non-null index */ + index = dht_get_first_non_null_index(entry); + + if (index == -1) { + /* None of the bricks in the subvol are up. + * CHILD_DOWN will kill the process soon */ + + return _gf_false; + } + + if (entry->elements[index].info == REBAL_NODEUUID_MINE) { + /* Index matches this node's nodeuuid.*/ + ret = _gf_true; goto out; + } } + } +out: + return ret; +} - INIT_LIST_HEAD (&entries.list); +int +gf_defrag_migrate_single_file(void *opaque) +{ + xlator_t *this = NULL; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + int ret = 0; + gf_dirent_t *entry = NULL; + struct timeval start = { + 0, + }; + loc_t entry_loc = { + 0, + }; + loc_t *loc = NULL; + struct iatt iatt = { + 0, + }; + dict_t *migrate_data = NULL; + struct timeval end = { + 0, + }; + double elapsed = { + 0, + }; + struct dht_container *rebal_entry = NULL; + inode_t *inode = NULL; + xlator_t *hashed_subvol = NULL; + xlator_t *cached_subvol = NULL; + call_frame_t *statfs_frame = NULL; + xlator_t *old_THIS = NULL; + data_t *tmp = NULL; + int fop_errno = 0; + gf_dht_migrate_data_type_t rebal_type = GF_DHT_MIGRATE_DATA; + char value[MAX_REBAL_TYPE_SIZE] = { + 0, + }; + struct iatt *iatt_ptr = NULL; + gf_boolean_t update_skippedcount = _gf_true; + int i = 0; + gf_boolean_t should_i_migrate = 0; + + rebal_entry = (struct dht_container *)opaque; + if (!rebal_entry) { + gf_log("DHT", GF_LOG_ERROR, "rebal_entry is NULL"); + ret = -1; + goto out; + } - while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL, - &entries)) != 0) { - if (ret < 0) - break; + this = rebal_entry->this; - /* Need to keep track of ENOENT errno, that means, there is no - need to send more readdirp() */ - readdir_operrno = errno; + conf = this->private; - free_entries = _gf_true; + defrag = conf->defrag; - if (list_empty (&entries.list)) - break; - list_for_each_entry_safe (entry, tmp, &entries.list, list) { - if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { - ret = 1; - goto out; - } + loc = rebal_entry->parent_loc; - offset = entry->d_off; + migrate_data = rebal_entry->migrate_data; - if (!strcmp (entry->d_name, ".") || - !strcmp (entry->d_name, "..")) - continue; + entry = rebal_entry->df_entry; + iatt_ptr = &entry->d_stat; - if (IA_ISDIR (entry->d_stat.ia_type)) - continue; + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = -1; + goto out; + } - defrag->num_files_lookedup++; + if (defrag->stats == _gf_true) { + gettimeofday(&start, NULL); + } - loc_wipe (&entry_loc); - ret =dht_build_child_loc (this, &entry_loc, loc, - entry->d_name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Child loc" - " build failed"); - goto out; - } + if (defrag->defrag_pattern && + (gf_defrag_pattern_match(defrag, entry->d_name, + entry->d_stat.ia_size) == _gf_false)) { + gf_log(this->name, GF_LOG_ERROR, "pattern_match failed"); + goto out; + } - if (uuid_is_null (entry->d_stat.ia_gfid)) { - gf_log (this->name, GF_LOG_ERROR, "%s/%s" - " gfid not present", loc->path, - entry->d_name); - continue; - } + memset(&entry_loc, 0, sizeof(entry_loc)); - uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); + ret = dht_build_child_loc(this, &entry_loc, loc, entry->d_name); + if (ret) { + LOCK(&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK(&defrag->lock); - if (uuid_is_null (loc->gfid)) { - gf_log (this->name, GF_LOG_ERROR, "%s/%s" - " gfid not present", loc->path, - entry->d_name); - continue; - } + ret = 0; - uuid_copy (entry_loc.pargfid, loc->gfid); + gf_log(this->name, GF_LOG_ERROR, "Child loc build failed"); - entry_loc.inode->ia_type = entry->d_stat.ia_type; + goto out; + } - ret = syncop_lookup (this, &entry_loc, NULL, &iatt, - NULL, NULL); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s" - " lookup failed", entry_loc.path); - continue; - } + should_i_migrate = gf_defrag_should_i_migrate( + this, rebal_entry->local_subvol_index, entry->d_stat.ia_gfid); - ret = syncop_getxattr (this, &entry_loc, &dict, - GF_XATTR_NODE_UUID_KEY); - if(ret < 0) { - gf_log (this->name, GF_LOG_ERROR, "Failed to " - "get node-uuid for %s", entry_loc.path); - continue; - } + gf_uuid_copy(entry_loc.gfid, entry->d_stat.ia_gfid); - ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY, - &uuid_str); - if(ret < 0) { - gf_log (this->name, GF_LOG_ERROR, "Failed to " - "get node-uuid from dict for %s", - entry_loc.path); - continue; - } + gf_uuid_copy(entry_loc.pargfid, loc->gfid); - if (uuid_parse (uuid_str, node_uuid)) { - gf_log (this->name, GF_LOG_ERROR, "uuid_parse " - "failed for %s", entry_loc.path); - continue; - } + ret = syncop_lookup(this, &entry_loc, &iatt, NULL, NULL, NULL); - /* if file belongs to different node, skip migration - * the other node will take responsibility of migration - */ - if (uuid_compare (node_uuid, defrag->node_uuid)) { - gf_log (this->name, GF_LOG_TRACE, "%s does not" - "belong to this node", entry_loc.path); - continue; - } + if (!should_i_migrate) { + /* this node isn't supposed to migrate the file. suppressing any + * potential error from lookup as this file is under migration by + * another node */ + if (ret) { + gf_msg_debug(this->name, -ret, + "Ignoring lookup failure: node isn't migrating %s", + entry_loc.path); + ret = 0; + } + gf_msg_debug(this->name, 0, "Don't migrate %s ", entry_loc.path); + goto out; + } + + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: %s lookup failed", entry_loc.path); + + /* Increase failure count only for remove-brick op, so that + * user is warned to check the removed-brick for any files left + * unmigrated + */ + if (conf->decommission_subvols_cnt) { + LOCK(&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK(&defrag->lock); + } - uuid_str = NULL; + ret = 0; + goto out; + } - dict_del (dict, GF_XATTR_NODE_UUID_KEY); + iatt_ptr = &iatt; + hashed_subvol = dht_subvol_get_hashed(this, &entry_loc); + if (!hashed_subvol) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_HASHED_SUBVOL_GET_FAILED, + "Failed to get hashed subvol for %s", entry_loc.path); + ret = 0; + goto out; + } - /* if distribute is present, it will honor this key. - * -1 is returned if distribute is not present or file - * doesn't have a link-file. If file has link-file, the - * path of link-file will be the value, and also that - * guarantees that file has to be mostly migrated */ + cached_subvol = dht_subvol_get_cached(this, entry_loc.inode); + if (!cached_subvol) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_CACHED_SUBVOL_GET_FAILED, + "Failed to get cached subvol for %s", entry_loc.path); - ret = syncop_getxattr (this, &entry_loc, &dict, - GF_XATTR_LINKINFO_KEY); - if (ret < 0) { - gf_log (this->name, GF_LOG_TRACE, "failed to " - "get link-to key for %s", - entry_loc.path); - continue; - } + ret = 0; + goto out; + } - ret = syncop_setxattr (this, &entry_loc, migrate_data, - 0); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "migrate-data" - " failed for %s", entry_loc.path); - defrag->total_failures +=1; + if (hashed_subvol == cached_subvol) { + ret = 0; + goto out; + } + + inode = inode_link(entry_loc.inode, entry_loc.parent, entry->d_name, &iatt); + inode_unref(entry_loc.inode); + /* use the inode returned by inode_link */ + entry_loc.inode = inode; + + old_THIS = THIS; + THIS = this; + statfs_frame = create_frame(this, this->ctx->pool); + if (!statfs_frame) { + gf_msg(this->name, GF_LOG_ERROR, DHT_MSG_NO_MEMORY, ENOMEM, + "Insufficient memory. Frame creation failed"); + ret = -1; + goto out; + } + + /* async statfs information for honoring min-free-disk */ + dht_get_du_info(statfs_frame, this, loc); + THIS = old_THIS; + + tmp = dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY); + if (tmp) { + memcpy(value, tmp->data, tmp->len); + if (strcmp(value, "force") == 0) + rebal_type = GF_DHT_MIGRATE_DATA_EVEN_IF_LINK_EXISTS; + + if (conf->decommission_in_progress) + rebal_type = GF_DHT_MIGRATE_HARDLINK; + } + + ret = dht_migrate_file(this, &entry_loc, cached_subvol, hashed_subvol, + rebal_type, &fop_errno); + if (ret == 1) { + if (fop_errno == ENOSPC) { + gf_msg_debug(this->name, 0, + "migrate-data skipped for" + " %s due to space constraints", + entry_loc.path); + + /* For remove-brick case if the source is not one of the + * removed-brick, do not mark the error as failure */ + if (conf->decommission_subvols_cnt) { + for (i = 0; i < conf->subvolume_cnt; i++) { + if (conf->decommissioned_bricks[i] == cached_subvol) { + LOCK(&defrag->lock); + { + defrag->total_failures += 1; + update_skippedcount = _gf_false; } + UNLOCK(&defrag->lock); - if (ret == -1) { - op_errno = errno; - ret = gf_defrag_handle_migrate_error (op_errno, - defrag); - - if (!ret) - gf_log (this->name, GF_LOG_DEBUG, - "migrate-data on %s failed: %s", - entry_loc.path, - strerror (op_errno)); - else if (ret == 1) - continue; - else if (ret == -1) - goto out; - } + break; + } + } + } - LOCK (&defrag->lock); - { - defrag->total_files += 1; - defrag->total_data += iatt.ia_size; - } - UNLOCK (&defrag->lock); + if (update_skippedcount) { + LOCK(&defrag->lock); + { + defrag->skipped += 1; } + UNLOCK(&defrag->lock); + + gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_MIGRATE_FILE_SKIPPED, + "File migration skipped for %s.", entry_loc.path); + } + + } else if (fop_errno == ENOTSUP) { + gf_msg_debug(this->name, 0, + "migrate-data skipped for" + " hardlink %s ", + entry_loc.path); + LOCK(&defrag->lock); + { + defrag->skipped += 1; + } + UNLOCK(&defrag->lock); + + gf_msg(this->name, GF_LOG_INFO, 0, DHT_MSG_MIGRATE_FILE_SKIPPED, + "File migration skipped for %s.", entry_loc.path); + } - gf_dirent_free (&entries); - free_entries = _gf_false; - INIT_LIST_HEAD (&entries.list); + ret = 0; + goto out; + } else if (ret < 0) { + if (fop_errno != EEXIST) { + gf_msg(this->name, GF_LOG_ERROR, fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, "migrate-data failed for %s", + entry_loc.path); + + LOCK(&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK(&defrag->lock); + } - if (readdir_operrno == ENOENT) - break; + ret = gf_defrag_handle_migrate_error(fop_errno, defrag); + + if (!ret) { + gf_msg(this->name, GF_LOG_ERROR, fop_errno, + DHT_MSG_MIGRATE_FILE_FAILED, + "migrate-data on %s failed:", entry_loc.path); + } else if (ret == 1) { + ret = 0; } - ret = 0; + + goto out; + } + + LOCK(&defrag->lock); + { + defrag->total_files += 1; + defrag->total_data += iatt.ia_size; + } + UNLOCK(&defrag->lock); + + if (defrag->stats == _gf_true) { + gettimeofday(&end, NULL); + elapsed = gf_tvdiff(&start, &end); + gf_log(this->name, GF_LOG_INFO, + "Migration of " + "file:%s size:%" PRIu64 + " bytes took %.2f" + "secs and ret: %d", + entry_loc.name, iatt.ia_size, elapsed / 1e6, ret); + } + out: - if (free_entries) - gf_dirent_free (&entries); + if (statfs_frame) { + STACK_DESTROY(statfs_frame->root); + } - loc_wipe (&entry_loc); + if (iatt_ptr) { + LOCK(&defrag->lock); + { + defrag->size_processed += iatt_ptr->ia_size; + } + UNLOCK(&defrag->lock); + } + loc_wipe(&entry_loc); - if (dict) - dict_unref(dict); + return ret; +} - if (fd) - fd_unref (fd); - return ret; +void * +gf_defrag_task(void *opaque) +{ + struct list_head *q_head = NULL; + struct dht_container *iterator = NULL; + gf_defrag_info_t *defrag = NULL; + int ret = 0; + pid_t pid = GF_CLIENT_PID_DEFRAG; + + defrag = (gf_defrag_info_t *)opaque; + if (!defrag) { + gf_msg("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL"); + goto out; + } + + syncopctx_setfspid(&pid); + + q_head = &(defrag->queue[0].list); + + /* The following while loop will dequeue one entry from the defrag->queue + under lock. We will update the defrag->global_error only when there + is an error which is critical to stop the rebalance process. The stop + message will be intimated to other migrator threads by setting the + defrag->defrag_status to GF_DEFRAG_STATUS_FAILED. + + In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is + maintained so that crawler does not starve the file migration + workers and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that + crawler does not go far ahead in filling up the queue. + */ + + while (_gf_true) { + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + pthread_cond_broadcast(&defrag->rebalance_crawler_alarm); + pthread_cond_broadcast(&defrag->parallel_migration_cond); + goto out; + } + + pthread_mutex_lock(&defrag->dfq_mutex); + { + /*Throttle down: + If the reconfigured count is less than current thread + count, then the current thread will sleep */ + + /*TODO: Need to refactor the following block to work + *under defrag->lock. For now access + * defrag->current_thread_count and rthcount under + * dfq_mutex lock */ + while (!defrag->crawl_done && (defrag->recon_thread_count < + defrag->current_thread_count)) { + defrag->current_thread_count--; + gf_msg_debug("DHT", 0, + "Thread sleeping. " + "current thread count: %d", + defrag->current_thread_count); + + pthread_cond_wait(&defrag->df_wakeup_thread, + &defrag->dfq_mutex); + + defrag->current_thread_count++; + gf_msg_debug("DHT", 0, + "Thread wokeup. " + "current thread count: %d", + defrag->current_thread_count); + } + + if (defrag->q_entry_count) { + iterator = list_entry(q_head->next, typeof(*iterator), list); + + gf_msg_debug("DHT", 0, + "picking entry " + "%s", + iterator->df_entry->d_name); + + list_del_init(&(iterator->list)); + + defrag->q_entry_count--; + + if ((defrag->q_entry_count < MIN_MIGRATE_QUEUE_COUNT) && + defrag->wakeup_crawler) { + pthread_cond_broadcast(&defrag->rebalance_crawler_alarm); + } + pthread_mutex_unlock(&defrag->dfq_mutex); + ret = gf_defrag_migrate_single_file((void *)iterator); + /*Critical errors: ENOTCONN and ENOSPACE*/ + if (ret) { + dht_set_global_defrag_error(defrag, ret); + + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + + pthread_cond_broadcast(&defrag->rebalance_crawler_alarm); + + pthread_cond_broadcast(&defrag->parallel_migration_cond); + + goto out; + } + + gf_defrag_free_container(iterator); + + continue; + } else { + /* defrag->crawl_done flag is set means crawling + file system is done and hence a list_empty when + the above flag is set indicates there are no more + entries to be added to the queue and rebalance is + finished */ + + if (!defrag->crawl_done) { + defrag->current_thread_count--; + gf_msg_debug("DHT", 0, + "Thread " + "sleeping while waiting " + "for migration entries. " + "current thread count:%d", + defrag->current_thread_count); + + pthread_cond_wait(&defrag->parallel_migration_cond, + &defrag->dfq_mutex); + } + + if (defrag->crawl_done && !defrag->q_entry_count) { + defrag->current_thread_count++; + gf_msg_debug("DHT", 0, "Exiting thread"); + + pthread_cond_broadcast(&defrag->parallel_migration_cond); + goto unlock; + } else { + defrag->current_thread_count++; + gf_msg_debug("DHT", 0, + "Thread woke up" + " as found migrating entries. " + "current thread count:%d", + defrag->current_thread_count); + + pthread_mutex_unlock(&defrag->dfq_mutex); + continue; + } + } + } + unlock: + pthread_mutex_unlock(&defrag->dfq_mutex); + break; + } +out: + return NULL; } +int static gf_defrag_get_entry(xlator_t *this, int i, + struct dht_container **container, loc_t *loc, + dht_conf_t *conf, gf_defrag_info_t *defrag, + fd_t *fd, dict_t *migrate_data, + struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req, + int *perrno) +{ + int ret = 0; + char is_linkfile = 0; + gf_dirent_t *df_entry = NULL; + struct dht_container *tmp_container = NULL; + + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = -1; + goto out; + } + + if (dir_dfmeta->offset_var[i].readdir_done == 1) { + ret = 0; + goto out; + } + + if (dir_dfmeta->fetch_entries[i] == 1) { + if (!fd) { + dir_dfmeta->fetch_entries[i] = 0; + dir_dfmeta->offset_var[i].readdir_done = 1; + ret = 0; + goto out; + } + + ret = syncop_readdirp(conf->local_subvols[i], fd, 131072, + dir_dfmeta->offset_var[i].offset, + &(dir_dfmeta->equeue[i]), xattr_req, NULL); + if (ret == 0) { + dir_dfmeta->offset_var[i].readdir_done = 1; + ret = 0; + goto out; + } + + if (ret < 0) { + gf_msg(this->name, GF_LOG_WARNING, -ret, + DHT_MSG_MIGRATE_DATA_FAILED, + "Readdirp failed. Aborting data migration for " + "directory: %s", + loc->path); + *perrno = -ret; + ret = -1; + goto out; + } + + if (list_empty(&(dir_dfmeta->equeue[i].list))) { + dir_dfmeta->offset_var[i].readdir_done = 1; + ret = 0; + goto out; + } + + dir_dfmeta->fetch_entries[i] = 0; + } + + while (1) { + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = -1; + goto out; + } + + df_entry = list_entry(dir_dfmeta->iterator[i]->next, typeof(*df_entry), + list); + + if (&df_entry->list == dir_dfmeta->head[i]) { + gf_dirent_free(&(dir_dfmeta->equeue[i])); + INIT_LIST_HEAD(&(dir_dfmeta->equeue[i].list)); + dir_dfmeta->fetch_entries[i] = 1; + dir_dfmeta->iterator[i] = dir_dfmeta->head[i]; + ret = 0; + goto out; + } + + dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next; + + dir_dfmeta->offset_var[i].offset = df_entry->d_off; + if (!strcmp(df_entry->d_name, ".") || !strcmp(df_entry->d_name, "..")) + continue; + + if (IA_ISDIR(df_entry->d_stat.ia_type)) { + defrag->size_processed += df_entry->d_stat.ia_size; + continue; + } + + defrag->num_files_lookedup++; + + if (defrag->defrag_pattern && + (gf_defrag_pattern_match(defrag, df_entry->d_name, + df_entry->d_stat.ia_size) == _gf_false)) { + defrag->size_processed += df_entry->d_stat.ia_size; + continue; + } + + is_linkfile = check_is_linkfile(NULL, &df_entry->d_stat, df_entry->dict, + conf->link_xattr_name); + + if (is_linkfile) { + /* No need to add linkto file to the queue for + migration. Only the actual data file need to + be checked for migration criteria. + */ + + gf_msg_debug(this->name, 0, + "Skipping linkfile" + " %s on subvol: %s", + df_entry->d_name, conf->local_subvols[i]->name); + continue; + } + + /*Build Container Structure */ + + tmp_container = GF_CALLOC(1, sizeof(struct dht_container), + gf_dht_mt_container_t); + if (!tmp_container) { + gf_log(this->name, GF_LOG_ERROR, + "Failed to allocate " + "memory for container"); + ret = -1; + goto out; + } + tmp_container->df_entry = gf_dirent_for_name(df_entry->d_name); + if (!tmp_container->df_entry) { + gf_log(this->name, GF_LOG_ERROR, + "Failed to allocate " + "memory for df_entry"); + ret = -1; + goto out; + } + + tmp_container->local_subvol_index = i; + + tmp_container->df_entry->d_stat = df_entry->d_stat; + + tmp_container->df_entry->d_ino = df_entry->d_ino; + + tmp_container->df_entry->d_type = df_entry->d_type; + + tmp_container->df_entry->d_len = df_entry->d_len; + + tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc), gf_dht_mt_loc_t); + if (!tmp_container->parent_loc) { + gf_log(this->name, GF_LOG_ERROR, + "Failed to allocate " + "memory for loc"); + ret = -1; + goto out; + } + + ret = loc_copy(tmp_container->parent_loc, loc); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, "loc_copy failed"); + ret = -1; + goto out; + } + + tmp_container->migrate_data = migrate_data; + + tmp_container->this = this; + + if (df_entry->dict) + tmp_container->df_entry->dict = dict_ref(df_entry->dict); + + /*Build Container Structure >> END*/ + + ret = 0; + goto out; + } + +out: + if (ret == 0) { + *container = tmp_container; + } else { + if (tmp_container) { + gf_defrag_free_container(tmp_container); + } + } + + return ret; +} int -gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, - dict_t *fix_layout, dict_t *migrate_data) +gf_defrag_process_dir(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, + dict_t *migrate_data, int *perrno) { - int ret = -1; - loc_t entry_loc = {0,}; - fd_t *fd = NULL; - gf_dirent_t entries; - gf_dirent_t *tmp = NULL; - gf_dirent_t *entry = NULL; - gf_boolean_t free_entries = _gf_false; - dict_t *dict = NULL; - off_t offset = 0; - struct iatt iatt = {0,}; - - ret = syncop_lookup (this, loc, NULL, &iatt, NULL, NULL); + int ret = -1; + dht_conf_t *conf = NULL; + gf_dirent_t entries; + dict_t *xattr_req = NULL; + struct timeval dir_start = { + 0, + }; + struct timeval end = { + 0, + }; + double elapsed = { + 0, + }; + int local_subvols_cnt = 0; + int i = 0; + int j = 0; + struct dht_container *container = NULL; + int ldfq_count = 0; + int dfc_index = 0; + int throttle_up = 0; + struct dir_dfmeta *dir_dfmeta = NULL; + xlator_t *old_THIS = NULL; + + gf_log(this->name, GF_LOG_INFO, "migrate data called on %s", loc->path); + gettimeofday(&dir_start, NULL); + + conf = this->private; + local_subvols_cnt = conf->local_subvols_cnt; + + if (!local_subvols_cnt) { + ret = 0; + goto out; + } + + old_THIS = THIS; + THIS = this; + + dir_dfmeta = GF_CALLOC(1, sizeof(*dir_dfmeta), gf_common_mt_pointer); + if (!dir_dfmeta) { + gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->lfd = GF_CALLOC(local_subvols_cnt, sizeof(fd_t *), + gf_common_mt_pointer); + if (!dir_dfmeta->lfd) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_INSUFF_MEMORY, + "for dir_dfmeta", NULL); + ret = -1; + *perrno = ENOMEM; + goto out; + } + + for (i = 0; i < local_subvols_cnt; i++) { + dir_dfmeta->lfd[i] = fd_create(loc->inode, defrag->pid); + if (!dir_dfmeta->lfd[i]) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_FD_CREATE_FAILED, + NULL); + *perrno = ENOMEM; + ret = -1; + goto out; + } + + ret = syncop_opendir(conf->local_subvols[i], loc, dir_dfmeta->lfd[i], + NULL, NULL); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Lookup failed on %s", - loc->path); + fd_unref(dir_dfmeta->lfd[i]); + dir_dfmeta->lfd[i] = NULL; + gf_smsg(this->name, GF_LOG_WARNING, 0, DHT_MSG_FAILED_TO_OPEN, + "dir: %s", loc->path, "subvol: %s", + conf->local_subvols[i]->name, NULL); + + if (conf->decommission_in_progress) { + *perrno = -ret; + ret = -1; goto out; + } + } else { + fd_bind(dir_dfmeta->lfd[i]); } + } - if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) { - ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data); - if (ret) - goto out; + dir_dfmeta->head = GF_CALLOC(local_subvols_cnt, sizeof(*(dir_dfmeta->head)), + gf_common_mt_pointer); + if (!dir_dfmeta->head) { + gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->iterator = GF_CALLOC(local_subvols_cnt, + sizeof(*(dir_dfmeta->iterator)), + gf_common_mt_pointer); + if (!dir_dfmeta->iterator) { + gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->iterator is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->equeue = GF_CALLOC(local_subvols_cnt, sizeof(entries), + gf_dht_mt_dirent_t); + if (!dir_dfmeta->equeue) { + gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->offset_var = GF_CALLOC( + local_subvols_cnt, sizeof(dht_dfoffset_ctx_t), gf_dht_mt_octx_t); + if (!dir_dfmeta->offset_var) { + gf_log(this->name, GF_LOG_ERROR, "dir_dfmeta->offset_var is NULL"); + ret = -1; + goto out; + } + + ret = gf_defrag_ctx_subvols_init(dir_dfmeta->offset_var, this); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "dht_dfoffset_ctx_t" + "initialization failed"); + ret = -1; + goto out; + } + + dir_dfmeta->fetch_entries = GF_CALLOC(local_subvols_cnt, sizeof(int), + gf_common_mt_int); + if (!dir_dfmeta->fetch_entries) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, DHT_MSG_INSUFF_MEMORY, + "for dir_dfmeta->fetch_entries", NULL); + ret = -1; + goto out; + } + + for (i = 0; i < local_subvols_cnt; i++) { + INIT_LIST_HEAD(&(dir_dfmeta->equeue[i].list)); + dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list); + dir_dfmeta->iterator[i] = dir_dfmeta->head[i]; + dir_dfmeta->fetch_entries[i] = 1; + } + + xattr_req = dict_new(); + if (!xattr_req) { + gf_log(this->name, GF_LOG_ERROR, "dict_new failed"); + ret = -1; + goto out; + } + + ret = dict_set_uint32(xattr_req, conf->link_xattr_name, 256); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "failed to set dict for " + "key: %s", + conf->link_xattr_name); + ret = -1; + goto out; + } + + /* + Job: Read entries from each local subvol and store the entries + in equeue array of linked list. Now pick one entry from the + equeue array in a round robin basis and add them to defrag Queue. + */ + + while (!dht_dfreaddirp_done(dir_dfmeta->offset_var, local_subvols_cnt)) { + pthread_mutex_lock(&defrag->dfq_mutex); + { + /*Throttle up: If reconfigured count is higher than + current thread count, wake up the sleeping threads + TODO: Need to refactor this. Instead of making the + thread sleep and wake, we should terminate and spawn + threads on-demand*/ + + if (defrag->recon_thread_count > defrag->current_thread_count) { + throttle_up = (defrag->recon_thread_count - + defrag->current_thread_count); + for (j = 0; j < throttle_up; j++) { + pthread_cond_signal(&defrag->df_wakeup_thread); + } + } + + while (defrag->q_entry_count > MAX_MIGRATE_QUEUE_COUNT) { + defrag->wakeup_crawler = 1; + pthread_cond_wait(&defrag->rebalance_crawler_alarm, + &defrag->dfq_mutex); + } + + ldfq_count = defrag->q_entry_count; + + if (defrag->wakeup_crawler) { + defrag->wakeup_crawler = 0; + } } + pthread_mutex_unlock(&defrag->dfq_mutex); - gf_log (this->name, GF_LOG_TRACE, "fix layout called on %s", loc->path); + while ( + ldfq_count <= MAX_MIGRATE_QUEUE_COUNT && + !dht_dfreaddirp_done(dir_dfmeta->offset_var, local_subvols_cnt)) { + ret = gf_defrag_get_entry(this, dfc_index, &container, loc, conf, + defrag, dir_dfmeta->lfd[dfc_index], + migrate_data, dir_dfmeta, xattr_req, + perrno); + + if (defrag->defrag_status == GF_DEFRAG_STATUS_STOPPED) { + goto out; + } + + if (ret) { + gf_log(this->name, GF_LOG_WARNING, + "Found " + "error from gf_defrag_get_entry"); - fd = fd_create (loc->inode, defrag->pid); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); ret = -1; goto out; + } + + /* Check if we got an entry, else we need to move the + index to the next subvol */ + if (!container) { + GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt); + continue; + } + + /* Q this entry in the dfq */ + pthread_mutex_lock(&defrag->dfq_mutex); + { + list_add_tail(&container->list, &(defrag->queue[0].list)); + defrag->q_entry_count++; + ldfq_count = defrag->q_entry_count; + + gf_msg_debug(this->name, 0, + "added " + "file:%s parent:%s to the queue ", + container->df_entry->d_name, + container->parent_loc->path); + + pthread_cond_signal(&defrag->parallel_migration_cond); + } + pthread_mutex_unlock(&defrag->dfq_mutex); + + GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt); } + } + + gettimeofday(&end, NULL); + elapsed = gf_tvdiff(&dir_start, &end); + gf_log(this->name, GF_LOG_INFO, + "Migration operation on dir %s took " + "%.2f secs", + loc->path, elapsed / 1e6); + ret = 0; +out: + THIS = old_THIS; + gf_defrag_free_dir_dfmeta(dir_dfmeta, local_subvols_cnt); + + if (xattr_req) + dict_unref(xattr_req); + + /* It does not matter if it errored out - this number is + * used to calculate rebalance estimated time to complete. + * No locking required as dirs are processed by a single thread. + */ + defrag->num_dirs_processed++; + return ret; +} - ret = syncop_opendir (this, loc, fd); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to open dir %s", - loc->path); - ret = -1; +int +gf_defrag_settle_hash(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, + dict_t *fix_layout) +{ + int ret; + dht_conf_t *conf = NULL; + /* + * Now we're ready to update the directory commit hash for the volume + * root, so that hash miscompares and broadcast lookups can stop. + * However, we want to skip that if fix-layout is all we did. In + * that case, we want the miscompares etc. to continue until a real + * rebalance is complete. + */ + if (defrag->cmd == GF_DEFRAG_CMD_START_LAYOUT_FIX || + defrag->cmd == GF_DEFRAG_CMD_DETACH_START) { + return 0; + } + + conf = this->private; + if (!conf) { + /*Uh oh + */ + return -1; + } + + if (conf->local_subvols_cnt == 0 || !conf->lookup_optimize) { + /* Commit hash updates are only done on local subvolumes and + * only when lookup optimization is needed (for older client + * support) + */ + return 0; + } + + ret = dict_set_uint32(fix_layout, "new-commit-hash", + defrag->new_commit_hash); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, "Failed to set new-commit-hash"); + return -1; + } + + ret = syncop_setxattr(this, loc, fix_layout, 0, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LAYOUT_FIX_FAILED, + "fix layout on %s failed", loc->path); + + if (-ret == ENOENT || -ret == ESTALE) { + /* Dir most likely is deleted */ + return 0; + } + + return -1; + } + + /* TBD: find more efficient solution than adding/deleting every time */ + dict_del(fix_layout, "new-commit-hash"); + + return 0; +} + +int +gf_defrag_fix_layout(xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, + dict_t *fix_layout, dict_t *migrate_data) +{ + int ret = -1; + loc_t entry_loc = { + 0, + }; + fd_t *fd = NULL; + gf_dirent_t entries; + gf_dirent_t *tmp = NULL; + gf_dirent_t *entry = NULL; + gf_boolean_t free_entries = _gf_false; + off_t offset = 0; + struct iatt iatt = { + 0, + }; + inode_t *linked_inode = NULL, *inode = NULL; + dht_conf_t *conf = NULL; + int perrno = 0; + + conf = this->private; + if (!conf) { + ret = -1; + goto out; + } + + ret = syncop_lookup(this, loc, &iatt, NULL, NULL, NULL); + if (ret) { + if (strcmp(loc->path, "/") == 0) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_DIR_LOOKUP_FAILED, + "lookup failed for:%s", loc->path); + + defrag->total_failures++; + ret = -1; + goto out; + } + + if (-ret == ENOENT || -ret == ESTALE) { + gf_msg(this->name, GF_LOG_INFO, -ret, DHT_MSG_DIR_LOOKUP_FAILED, + "Dir:%s renamed or removed. Skipping", loc->path); + if (conf->decommission_subvols_cnt) { + defrag->total_failures++; + } + ret = 0; + goto out; + } else { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_DIR_LOOKUP_FAILED, + "lookup failed for:%s", loc->path); + + defrag->total_failures++; + goto out; + } + } + + fd = fd_create(loc->inode, defrag->pid); + if (!fd) { + gf_log(this->name, GF_LOG_ERROR, "Failed to create fd"); + ret = -1; + goto out; + } + + ret = syncop_opendir(this, loc, fd, NULL, NULL); + if (ret) { + if (-ret == ENOENT || -ret == ESTALE) { + if (conf->decommission_subvols_cnt) { + defrag->total_failures++; + } + ret = 0; + goto out; + } + + gf_log(this->name, GF_LOG_ERROR, + "Failed to open dir %s, " + "err:%d", + loc->path, -ret); + + ret = -1; + goto out; + } + + fd_bind(fd); + INIT_LIST_HEAD(&entries.list); + + while ((ret = syncop_readdirp(this, fd, 131072, offset, &entries, NULL, + NULL)) != 0) { + if (ret < 0) { + if (-ret == ENOENT || -ret == ESTALE) { + if (conf->decommission_subvols_cnt) { + defrag->total_failures++; + } + ret = 0; goto out; + } + + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_READDIR_ERROR, + "readdirp failed for " + "path %s. Aborting fix-layout", + loc->path); + + ret = -1; + goto out; } - INIT_LIST_HEAD (&entries.list); - while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL, - &entries)) != 0) + if (list_empty(&entries.list)) + break; + + free_entries = _gf_true; + + list_for_each_entry_safe(entry, tmp, &entries.list, list) { - if ((ret < 0) || (ret && (errno == ENOENT))) - break; - free_entries = _gf_true; + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = 1; + goto out; + } - if (list_empty (&entries.list)) - break; - list_for_each_entry_safe (entry, tmp, &entries.list, list) { - if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { - ret = 1; - goto out; - } + offset = entry->d_off; - offset = entry->d_off; + if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, "..")) + continue; + if (!IA_ISDIR(entry->d_stat.ia_type)) { + continue; + } + loc_wipe(&entry_loc); - if (!strcmp (entry->d_name, ".") || - !strcmp (entry->d_name, "..")) - continue; + ret = dht_build_child_loc(this, &entry_loc, loc, entry->d_name); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "Child loc" + " build failed for entry: %s", + entry->d_name); - if (!IA_ISDIR (entry->d_stat.ia_type)) - continue; + if (conf->decommission_in_progress) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; - loc_wipe (&entry_loc); - ret =dht_build_child_loc (this, &entry_loc, loc, - entry->d_name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Child loc" - " build failed"); - goto out; - } + goto out; + } else { + continue; + } + } + + if (gf_uuid_is_null(entry->d_stat.ia_gfid)) { + gf_log(this->name, GF_LOG_ERROR, + "%s/%s" + " gfid not present", + loc->path, entry->d_name); + continue; + } + + gf_uuid_copy(entry_loc.gfid, entry->d_stat.ia_gfid); + + /*In case the gfid stored in the inode by inode_link + * and the gfid obtained in the lookup differs, then + * client3_3_lookup_cbk will return ESTALE and proper + * error will be captured + */ + + linked_inode = inode_link(entry_loc.inode, loc->inode, + entry->d_name, &entry->d_stat); + + inode = entry_loc.inode; + entry_loc.inode = linked_inode; + inode_unref(inode); + + if (gf_uuid_is_null(loc->gfid)) { + gf_log(this->name, GF_LOG_ERROR, + "%s/%s" + " gfid not present", + loc->path, entry->d_name); + continue; + } + + gf_uuid_copy(entry_loc.pargfid, loc->gfid); + + ret = syncop_lookup(this, &entry_loc, &iatt, NULL, NULL, NULL); + if (ret) { + if (-ret == ENOENT || -ret == ESTALE) { + gf_msg(this->name, GF_LOG_INFO, -ret, + DHT_MSG_DIR_LOOKUP_FAILED, + "Dir:%s renamed or removed. " + "Skipping", + loc->path); + ret = 0; + if (conf->decommission_subvols_cnt) { + defrag->total_failures++; + } + continue; + } else { + gf_msg(this->name, GF_LOG_ERROR, -ret, + DHT_MSG_DIR_LOOKUP_FAILED, "lookup failed for:%s", + entry_loc.path); - if (uuid_is_null (entry->d_stat.ia_gfid)) { - gf_log (this->name, GF_LOG_ERROR, "%s/%s" - "gfid not present", loc->path, - entry->d_name); - continue; - } + defrag->total_failures++; - entry_loc.inode->ia_type = entry->d_stat.ia_type; + if (conf->decommission_in_progress) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + ret = -1; + goto out; + } else { + continue; + } + } + } - uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); - if (uuid_is_null (loc->gfid)) { - gf_log (this->name, GF_LOG_ERROR, "%s/%s" - "gfid not present", loc->path, - entry->d_name); - continue; - } + /* A return value of 2 means, either process_dir or + * lookup of a dir failed. Hence, don't commit hash + * for the current directory*/ - uuid_copy (entry_loc.pargfid, loc->gfid); + ret = gf_defrag_fix_layout(this, defrag, &entry_loc, fix_layout, + migrate_data); - ret = syncop_lookup (this, &entry_loc, NULL, &iatt, - NULL, NULL); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s" - " lookup failed", entry_loc.path); - continue; - } + if (defrag->defrag_status == GF_DEFRAG_STATUS_STOPPED || + defrag->defrag_status == GF_DEFRAG_STATUS_FAILED) { + goto out; + } - ret = syncop_setxattr (this, &entry_loc, fix_layout, - 0); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Setxattr " - "failed for %s", entry_loc.path); - defrag->defrag_status = - GF_DEFRAG_STATUS_FAILED; - goto out; - } - ret = gf_defrag_fix_layout (this, defrag, &entry_loc, - fix_layout, migrate_data); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_LAYOUT_FIX_FAILED, + "Fix layout failed for %s", entry_loc.path); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Fix layout " - "failed for %s", entry_loc.path); - goto out; - } + defrag->total_failures++; + if (conf->decommission_in_progress) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + + goto out; + } else { + /* Let's not commit-hash if + * gf_defrag_fix_layout failed*/ + continue; } - gf_dirent_free (&entries); - free_entries = _gf_false; - INIT_LIST_HEAD (&entries.list); + } } - ret = 0; + gf_dirent_free(&entries); + free_entries = _gf_false; + INIT_LIST_HEAD(&entries.list); + } + + /* A directory layout is fixed only after its subdirs are healed to + * any newly added bricks. If the layout is fixed before subdirs are + * healed, the newly added brick will get a non-null layout. + * Any subdirs which hash to that layout will no longer show up + * in a directory listing until they are healed. + */ + + ret = syncop_setxattr(this, loc, fix_layout, 0, NULL, NULL); + + /* In case of a race where the directory is deleted just before + * layout setxattr, the errors are updated in the layout structure. + * We can use this information to make a decision whether the directory + * is deleted entirely. + */ + if (ret == 0) { + ret = dht_dir_layout_error_check(this, loc->inode); + ret = -ret; + } + + if (ret) { + if (-ret == ENOENT || -ret == ESTALE) { + gf_msg(this->name, GF_LOG_INFO, -ret, DHT_MSG_LAYOUT_FIX_FAILED, + "Setxattr failed. Dir %s " + "renamed or removed", + loc->path); + if (conf->decommission_subvols_cnt) { + defrag->total_failures++; + } + ret = 0; + goto out; + } else { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_LAYOUT_FIX_FAILED, + "Setxattr failed for %s", loc->path); + + defrag->total_failures++; + + if (conf->decommission_in_progress) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + ret = -1; + goto out; + } + } + } + + if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) { + ret = gf_defrag_process_dir(this, defrag, loc, migrate_data, &perrno); + + if (ret) { + if (perrno == ENOENT || perrno == ESTALE) { + ret = 0; + goto out; + } else { + defrag->total_failures++; + + gf_msg(this->name, GF_LOG_ERROR, 0, + DHT_MSG_DEFRAG_PROCESS_DIR_FAILED, + "gf_defrag_process_dir failed for " + "directory: %s", + loc->path); + + if (conf->decommission_in_progress) { + goto out; + } + } + } + } + + gf_msg_trace(this->name, 0, "fix layout called on %s", loc->path); + + if (gf_defrag_settle_hash(this, defrag, loc, fix_layout) != 0) { + defrag->total_failures++; + + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_SETTLE_HASH_FAILED, + "Settle hash failed for %s", loc->path); + + ret = -1; + + if (conf->decommission_in_progress) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + goto out; + } + } + + ret = 0; out: - if (free_entries) - gf_dirent_free (&entries); + if (free_entries) + gf_dirent_free(&entries); - loc_wipe (&entry_loc); + loc_wipe(&entry_loc); - if (dict) - dict_unref(dict); + if (fd) + fd_unref(fd); - if (fd) - fd_unref (fd); + return ret; +} +int +dht_init_local_subvols_and_nodeuuids(xlator_t *this, dht_conf_t *conf, + loc_t *loc) +{ + dict_t *dict = NULL; + uuid_t *uuid_ptr = NULL; + int ret = -1; + int i = 0; + int j = 0; + + /* Find local subvolumes */ + ret = syncop_getxattr(this, loc, &dict, GF_REBAL_FIND_LOCAL_SUBVOL, NULL, + NULL); + if (ret && (ret != -ENODATA)) { + gf_msg(this->name, GF_LOG_ERROR, -ret, 0, + "local " + "subvolume determination failed with error: %d", + -ret); + ret = -1; + goto out; + } + + if (!ret) + goto out; + + ret = syncop_getxattr(this, loc, &dict, GF_REBAL_OLD_FIND_LOCAL_SUBVOL, + NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, 0, + "local " + "subvolume determination failed with error: %d", + -ret); + ret = -1; + goto out; + } + ret = 0; + +out: + if (ret) { return ret; + } + + for (i = 0; i < conf->local_subvols_cnt; i++) { + gf_msg(this->name, GF_LOG_INFO, 0, 0, + "local subvol: " + "%s", + conf->local_subvols[i]->name); + + for (j = 0; j < conf->local_nodeuuids[i].count; j++) { + uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid); + gf_msg(this->name, GF_LOG_INFO, 0, 0, "node uuid : %s", + uuid_utoa(*uuid_ptr)); + } + } + return ret; } +/* Functions for the rebalance estimates feature */ -int -gf_defrag_start_crawl (void *data) +uint64_t +gf_defrag_subvol_file_size(xlator_t *this, loc_t *root_loc) { - xlator_t *this = NULL; - dht_conf_t *conf = NULL; - gf_defrag_info_t *defrag = NULL; - int ret = -1; - loc_t loc = {0,}; - struct iatt iatt = {0,}; - struct iatt parent = {0,}; - dict_t *fix_layout = NULL; - dict_t *migrate_data = NULL; - - this = data; - if (!this) - goto out; + int ret = -1; + struct statvfs buf = { + 0, + }; + + ret = syncop_statfs(this, root_loc, &buf, NULL, NULL); + if (ret) { + /* Aargh! */ + return 0; + } + return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize); +} - conf = this->private; - if (!conf) - goto out; +uint64_t +gf_defrag_total_file_size(xlator_t *this, loc_t *root_loc) +{ + dht_conf_t *conf = NULL; + int i = 0; + uint64_t size_files = 0; + uint64_t total_size = 0; - defrag = conf->defrag; - if (!defrag) - goto out; - dht_build_root_inode (this, &defrag->root_inode); - if (!defrag->root_inode) - goto out; + conf = this->private; + if (!conf) { + return 0; + } + + for (i = 0; i < conf->local_subvols_cnt; i++) { + size_files = gf_defrag_subvol_file_size(conf->local_subvols[i], + root_loc); + total_size += size_files; + gf_msg(this->name, GF_LOG_INFO, 0, 0, + "local subvol: %s," + "cnt = %" PRIu64, + conf->local_subvols[i]->name, size_files); + } + + gf_msg(this->name, GF_LOG_INFO, 0, 0, "Total size files = %" PRIu64, + total_size); + + return total_size; +} + +static void * +dht_file_counter_thread(void *args) +{ + gf_defrag_info_t *defrag = NULL; + loc_t root_loc = { + 0, + }; + struct timespec time_to_wait = { + 0, + }; + uint64_t tmp_size = 0; + + if (!args) + return NULL; - dht_build_root_loc (defrag->root_inode, &loc); + defrag = (gf_defrag_info_t *)args; + dht_build_root_loc(defrag->root_inode, &root_loc); - /* fix-layout on '/' first */ + while (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED) { + timespec_now(&time_to_wait); + time_to_wait.tv_sec += 600; - ret = syncop_lookup (this, &loc, NULL, &iatt, NULL, &parent); + pthread_mutex_lock(&defrag->fc_mutex); + pthread_cond_timedwait(&defrag->fc_wakeup_cond, &defrag->fc_mutex, + &time_to_wait); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "look up on / failed"); - goto out; + pthread_mutex_unlock(&defrag->fc_mutex); + + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) + break; + + tmp_size = gf_defrag_total_file_size(defrag->this, &root_loc); + + gf_log("dht", GF_LOG_INFO, "tmp data size =%" PRIu64, tmp_size); + + if (!tmp_size) { + gf_msg("dht", GF_LOG_ERROR, 0, 0, + "Failed to get " + "the total data size. Unable to estimate " + "time to complete rebalance."); + } else { + g_totalsize = tmp_size; + gf_msg_debug("dht", 0, "total data size =%" PRIu64, g_totalsize); } + } - fix_layout = dict_new (); - if (!fix_layout) { - ret = -1; - goto out; + return NULL; +} + +int +gf_defrag_estimates_cleanup(xlator_t *this, gf_defrag_info_t *defrag, + pthread_t filecnt_thread) +{ + int ret = -1; + + /* Wake up the filecounter thread. + * By now the defrag status will no longer be + * GF_DEFRAG_STATUS_STARTED so the thread will exit the loop. + */ + pthread_mutex_lock(&defrag->fc_mutex); + { + pthread_cond_broadcast(&defrag->fc_wakeup_cond); + } + pthread_mutex_unlock(&defrag->fc_mutex); + + ret = pthread_join(filecnt_thread, NULL); + if (ret) { + gf_msg("dht", GF_LOG_ERROR, ret, 0, + "file_counter_thread: pthread_join failed."); + ret = -1; + } + return ret; +} + +int +gf_defrag_estimates_init(xlator_t *this, loc_t *loc, pthread_t *filecnt_thread) +{ + int ret = -1; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + + conf = this->private; + defrag = conf->defrag; + + g_totalsize = gf_defrag_total_file_size(this, loc); + if (!g_totalsize) { + gf_msg(this->name, GF_LOG_ERROR, 0, 0, + "Failed to get " + "the total data size. Unable to estimate " + "time to complete rebalance."); + goto out; + } + + ret = gf_thread_create(filecnt_thread, NULL, dht_file_counter_thread, + (void *)defrag, "dhtfcnt"); + + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, ret, 0, + "Failed to " + "create the file counter thread "); + ret = -1; + goto out; + } + ret = 0; +out: + return ret; +} + +/* Init and cleanup functions for parallel file migration*/ +int +gf_defrag_parallel_migration_init(xlator_t *this, gf_defrag_info_t *defrag, + pthread_t **tid_array, int *thread_index) +{ + int ret = -1; + int thread_spawn_count = 0; + int index = 0; + pthread_t *tid = NULL; + + if (!defrag) + goto out; + + /* Initialize global entry queue */ + defrag->queue = GF_CALLOC(1, sizeof(struct dht_container), + gf_dht_mt_container_t); + + if (!defrag->queue) { + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, 0, + "Failed to initialise migration queue"); + ret = -1; + goto out; + } + + INIT_LIST_HEAD(&(defrag->queue[0].list)); + + thread_spawn_count = MAX(MAX_REBAL_THREADS, 4); + + gf_msg_debug(this->name, 0, "thread_spawn_count: %d", thread_spawn_count); + + tid = GF_CALLOC(thread_spawn_count, sizeof(pthread_t), + gf_common_mt_pthread_t); + if (!tid) { + gf_msg(this->name, GF_LOG_ERROR, ENOMEM, 0, + "Failed to create migration threads"); + ret = -1; + goto out; + } + defrag->current_thread_count = thread_spawn_count; + + /*Spawn Threads Here*/ + while (index < thread_spawn_count) { + ret = gf_thread_create(&(tid[index]), NULL, gf_defrag_task, + (void *)defrag, "dhtmig%d", (index + 1) & 0x3ff); + if (ret != 0) { + gf_msg("DHT", GF_LOG_ERROR, ret, 0, "Thread[%d] creation failed. ", + index); + ret = -1; + goto out; + } else { + gf_log("DHT", GF_LOG_INFO, + "Thread[%d] " + "creation successful", + index); } + index++; + } + + ret = 0; +out: + *thread_index = index; + *tid_array = tid; + + return ret; +} + +int +gf_defrag_parallel_migration_cleanup(gf_defrag_info_t *defrag, + pthread_t *tid_array, int thread_index) +{ + int ret = -1; + int i = 0; + + if (!defrag) + goto out; + + /* Wake up all migration threads */ + pthread_mutex_lock(&defrag->dfq_mutex); + { + defrag->crawl_done = 1; + + pthread_cond_broadcast(&defrag->parallel_migration_cond); + pthread_cond_broadcast(&defrag->df_wakeup_thread); + } + pthread_mutex_unlock(&defrag->dfq_mutex); + + /*Wait for all the threads to complete their task*/ + for (i = 0; i < thread_index; i++) { + pthread_join(tid_array[i], NULL); + } + + GF_FREE(tid_array); + + /* Cleanup the migration queue */ + if (defrag->queue) { + gf_dirent_free(defrag->queue[0].df_entry); + INIT_LIST_HEAD(&(defrag->queue[0].list)); + } + + GF_FREE(defrag->queue); + + ret = 0; +out: + return ret; +} + +int +gf_defrag_start_crawl(void *data) +{ + xlator_t *this = NULL; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + dict_t *fix_layout = NULL; + dict_t *migrate_data = NULL; + dict_t *status = NULL; + glusterfs_ctx_t *ctx = NULL; + call_frame_t *statfs_frame = NULL; + xlator_t *old_THIS = NULL; + int ret = -1; + loc_t loc = { + 0, + }; + struct iatt iatt = { + 0, + }; + struct iatt parent = { + 0, + }; + int thread_index = 0; + pthread_t *tid = NULL; + pthread_t filecnt_thread; + gf_boolean_t fc_thread_started = _gf_false; + + this = data; + if (!this) + goto exit; + + ctx = this->ctx; + if (!ctx) + goto exit; + + conf = this->private; + if (!conf) + goto exit; + + defrag = conf->defrag; + if (!defrag) + goto exit; + + defrag->start_time = gf_time(); + + dht_build_root_inode(this, &defrag->root_inode); + if (!defrag->root_inode) + goto out; + + dht_build_root_loc(defrag->root_inode, &loc); + + /* fix-layout on '/' first */ + + ret = syncop_lookup(this, &loc, &iatt, &parent, NULL, NULL); + + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_REBALANCE_START_FAILED, + "Failed to start rebalance: look up on / failed"); + ret = -1; + goto out; + } + + old_THIS = THIS; + THIS = this; + + statfs_frame = create_frame(this, this->ctx->pool); + if (!statfs_frame) { + gf_msg(this->name, GF_LOG_ERROR, DHT_MSG_NO_MEMORY, ENOMEM, + "Insufficient memory. Frame creation failed"); + ret = -1; + goto out; + } + + /* async statfs update for honoring min-free-disk */ + dht_get_du_info(statfs_frame, this, &loc); + THIS = old_THIS; + + fix_layout = dict_new(); + if (!fix_layout) { + ret = -1; + goto out; + } + + /* + * Unfortunately, we can't do special xattrs (like fix.layout) and + * real ones in the same call currently, and changing it seems + * riskier than just doing two calls. + */ + + gf_log(this->name, GF_LOG_INFO, "%s using commit hash %u", __func__, + conf->vol_commit_hash); + + ret = dict_set_uint32(fix_layout, conf->commithash_xattr_name, + conf->vol_commit_hash); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, "Failed to set %s", + conf->commithash_xattr_name); + defrag->total_failures++; + ret = -1; + goto out; + } + + ret = syncop_setxattr(this, &loc, fix_layout, 0, NULL, NULL); + if (ret) { + gf_log(this->name, GF_LOG_ERROR, + "Failed to set commit hash on %s. " + "Rebalance cannot proceed.", + loc.path); + defrag->total_failures++; + ret = -1; + goto out; + } + + /* We now return to our regularly scheduled program. */ + + ret = dict_set_str(fix_layout, GF_XATTR_FIX_LAYOUT_KEY, "yes"); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED, + "Failed to start rebalance:" + "Failed to set dictionary value: key = %s", + GF_XATTR_FIX_LAYOUT_KEY); + defrag->total_failures++; + ret = -1; + goto out; + } + + defrag->new_commit_hash = conf->vol_commit_hash; - ret = dict_set_str (fix_layout, GF_XATTR_FIX_LAYOUT_KEY, "yes"); + ret = syncop_setxattr(this, &loc, fix_layout, 0, NULL, NULL); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, -ret, DHT_MSG_REBALANCE_FAILED, + "fix layout on %s failed", loc.path); + defrag->total_failures++; + ret = -1; + goto out; + } + + if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) { + /* We need to migrate files */ + + migrate_data = dict_new(); + if (!migrate_data) { + defrag->total_failures++; + ret = -1; + goto out; + } + ret = dict_set_str( + migrate_data, GF_XATTR_FILE_MIGRATE_KEY, + (defrag->cmd == GF_DEFRAG_CMD_START_FORCE) ? "force" : "non-force"); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to set dict str"); - goto out; + defrag->total_failures++; + ret = -1; + goto out; } - ret = syncop_setxattr (this, &loc, fix_layout, 0); + ret = dht_init_local_subvols_and_nodeuuids(this, conf, &loc); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "fix layout on %s failed", - loc.path); - goto out; + ret = -1; + goto out; } - if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) { - migrate_data = dict_new (); - if (!migrate_data) { - ret = -1; - goto out; - } - if (defrag->cmd == GF_DEFRAG_CMD_START_FORCE) - ret = dict_set_str (migrate_data, - "distribute.migrate-data", "force"); - else - ret = dict_set_str (migrate_data, - "distribute.migrate-data", - "non-force"); - if (ret) - goto out; + /* Initialise the structures required for parallel migration */ + ret = gf_defrag_parallel_migration_init(this, defrag, &tid, + &thread_index); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, 0, "Aborting rebalance."); + goto out; } - ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout, - migrate_data); -out: - LOCK (&defrag->lock); - { - gf_defrag_status_get (defrag, NULL); - defrag->is_exiting = 1; + ret = gf_defrag_estimates_init(this, &loc, &filecnt_thread); + if (ret) { + /* Not a fatal error. Allow the rebalance to proceed*/ + ret = 0; + } else { + fc_thread_started = _gf_true; } - UNLOCK (&defrag->lock); + } - if (defrag) - GF_FREE (defrag); + ret = gf_defrag_fix_layout(this, defrag, &loc, fix_layout, migrate_data); + if (ret) { + defrag->total_failures++; + ret = -1; + goto out; + } - return ret; -} + if (gf_defrag_settle_hash(this, defrag, &loc, fix_layout) != 0) { + defrag->total_failures++; + ret = -1; + goto out; + } + gf_log("DHT", GF_LOG_INFO, "crawling file-system completed"); +out: + + /* We are here means crawling the entire file system is done + or something failed. Set defrag->crawl_done flag to intimate + the migrator threads to exhaust the defrag->queue and terminate*/ + + if (ret) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + } + + gf_defrag_parallel_migration_cleanup(defrag, tid, thread_index); + + if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) && + (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) { + defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE; + } + + if (fc_thread_started) { + gf_defrag_estimates_cleanup(this, defrag, filecnt_thread); + } + + dht_send_rebalance_event(this, defrag->cmd, defrag->defrag_status); + + status = dict_new(); + LOCK(&defrag->lock); + { + gf_defrag_status_get(conf, status); + if (ctx && ctx->notify) + ctx->notify(GF_EN_DEFRAG_STATUS, status); + if (status) + dict_unref(status); + defrag->is_exiting = 1; + } + UNLOCK(&defrag->lock); + + GF_FREE(defrag); + conf->defrag = NULL; + + if (migrate_data) + dict_unref(migrate_data); + + if (statfs_frame) { + STACK_DESTROY(statfs_frame->root); + } +exit: + return ret; +} static int -gf_defrag_done (int ret, call_frame_t *sync_frame, void *data) +gf_defrag_done(int ret, call_frame_t *sync_frame, void *data) { - gf_listener_stop(); + gf_listener_stop(sync_frame->this); - GF_FREE (data); - STACK_DESTROY (sync_frame->root); - kill (getpid(), SIGTERM); - return 0; + STACK_DESTROY(sync_frame->root); + kill(getpid(), SIGTERM); + return 0; } void * -gf_defrag_start (void *data) +gf_defrag_start(void *data) { - int ret = -1; - call_frame_t *frame = NULL; - dht_conf_t *conf = NULL; - gf_defrag_info_t *defrag = NULL; - xlator_t *this = NULL; - - this = data; - conf = this->private; - if (!conf) - goto out; + int ret = -1; + call_frame_t *frame = NULL; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + xlator_t *this = NULL; + xlator_t *old_THIS = NULL; - defrag = conf->defrag; - if (!defrag) - goto out; + this = data; + conf = this->private; + if (!conf) + goto out; - frame = create_frame (this, this->ctx->pool); - if (!frame) - goto out; + defrag = conf->defrag; + if (!defrag) + goto out; + + frame = create_frame(this, this->ctx->pool); + if (!frame) + goto out; + + frame->root->pid = GF_CLIENT_PID_DEFRAG; - defrag->pid = frame->root->pid; + defrag->pid = frame->root->pid; - defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; + defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; - ret = synctask_new (this->ctx->env, gf_defrag_start_crawl, - gf_defrag_done, frame, this); + old_THIS = THIS; + THIS = this; + ret = synctask_new(this->ctx->env, gf_defrag_start_crawl, gf_defrag_done, + frame, this); - if (ret) - gf_log (this->name, GF_LOG_ERROR, "Could not create" - " task for rebalance"); + if (ret) + gf_msg(this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED, + "Could not create task for rebalance"); + THIS = old_THIS; out: - return NULL; + return NULL; } -int -gf_defrag_status_get (gf_defrag_info_t *defrag, dict_t *dict) +uint64_t +gf_defrag_get_estimates_based_on_size(dht_conf_t *conf) { - int ret = 0; - uint64_t files = 0; - uint64_t size = 0; - uint64_t lookup = 0; - uint64_t failures = 0; + gf_defrag_info_t *defrag = NULL; + double rate_processed = 0; + uint64_t total_processed = 0; + uint64_t tmp_count = 0; + uint64_t time_to_complete = 0; + double elapsed = 0; - if (!defrag) - goto out; + defrag = conf->defrag; - ret = 0; - if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED) - goto out; + if (!g_totalsize) + goto out; - files = defrag->total_files; - size = defrag->total_data; - lookup = defrag->num_files_lookedup; - failures = defrag->total_failures; + elapsed = gf_time() - defrag->start_time; - if (!dict) - goto log; + /* Don't calculate the estimates for the first 10 minutes. + * It is unlikely to be accurate and estimates are not required + * if the process finishes in less than 10 mins. + */ - ret = dict_set_uint64 (dict, "files", files); - if (ret) - gf_log (THIS->name, GF_LOG_WARNING, - "failed to set file count"); + if (elapsed < ESTIMATE_START_INTERVAL) { + gf_msg(THIS->name, GF_LOG_INFO, 0, 0, + "Rebalance estimates will not be available for the " + "first %d seconds.", + ESTIMATE_START_INTERVAL); - ret = dict_set_uint64 (dict, "size", size); - if (ret) - gf_log (THIS->name, GF_LOG_WARNING, - "failed to set size of xfer"); + goto out; + } - ret = dict_set_uint64 (dict, "lookups", lookup); - if (ret) - gf_log (THIS->name, GF_LOG_WARNING, - "failed to set lookedup file count"); + total_processed = defrag->size_processed; - ret = dict_set_int32 (dict, "status", defrag->defrag_status); - if (ret) - gf_log (THIS->name, GF_LOG_WARNING, - "failed to set status"); + /* rate at which files processed */ + rate_processed = (total_processed) / elapsed; - ret = dict_set_uint64 (dict, "failures", failures); -log: - gf_log (THIS->name, GF_LOG_INFO, "Files migrated: %"PRIu64", size: %" - PRIu64", lookups: %"PRIu64", failures: %"PRIu64, files, size, - lookup, failures); + tmp_count = g_totalsize; + if (rate_processed) { + time_to_complete = (tmp_count) / rate_processed; + + } else { + gf_msg(THIS->name, GF_LOG_ERROR, 0, 0, + "Unable to calculate estimated time for rebalance"); + } + + gf_log(THIS->name, GF_LOG_INFO, + "TIME: (size) total_processed=%" PRIu64 " tmp_cnt = %" PRIu64 + "," + "rate_processed=%f, elapsed = %f", + total_processed, tmp_count, rate_processed, elapsed); out: - return 0; + return time_to_complete; } int -gf_defrag_stop (gf_defrag_info_t *defrag, dict_t *output) +gf_defrag_status_get(dht_conf_t *conf, dict_t *dict) { - /* TODO: set a variable 'stop_defrag' here, it should be checked - in defrag loop */ - int ret = -1; - GF_ASSERT (defrag); + int ret = 0; + uint64_t files = 0; + uint64_t size = 0; + uint64_t lookup = 0; + uint64_t failures = 0; + uint64_t skipped = 0; + char *status = ""; + double elapsed = 0; + uint64_t time_to_complete = 0; + uint64_t time_left = 0; + gf_defrag_info_t *defrag = conf->defrag; + + if (!defrag) + goto out; + + ret = 0; + if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED) + goto out; + + files = defrag->total_files; + size = defrag->total_data; + lookup = defrag->num_files_lookedup; + failures = defrag->total_failures; + skipped = defrag->skipped; + + elapsed = gf_time() - defrag->start_time; + + /* The rebalance is still in progress */ + + if (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED) { + time_to_complete = gf_defrag_get_estimates_based_on_size(conf); + + if (time_to_complete && (time_to_complete > elapsed)) + time_left = time_to_complete - elapsed; + + gf_log(THIS->name, GF_LOG_INFO, + "TIME: Estimated total time to complete (size)= %" PRIu64 + " seconds, seconds left = %" PRIu64 "", + time_to_complete, time_left); + } + + if (!dict) + goto log; + + ret = dict_set_uint64(dict, "files", files); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set file count"); + + ret = dict_set_uint64(dict, "size", size); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set size of xfer"); + + ret = dict_set_uint64(dict, "lookups", lookup); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set lookedup file count"); + + ret = dict_set_int32(dict, "status", defrag->defrag_status); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set status"); + + ret = dict_set_double(dict, "run-time", elapsed); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set run-time"); + + ret = dict_set_uint64(dict, "failures", failures); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set failure count"); + + ret = dict_set_uint64(dict, "skipped", skipped); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set skipped file count"); + + ret = dict_set_uint64(dict, "time-left", time_left); + if (ret) + gf_log(THIS->name, GF_LOG_WARNING, "failed to set time-left"); - if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED) { - goto out; - } +log: + switch (defrag->defrag_status) { + case GF_DEFRAG_STATUS_NOT_STARTED: + status = "not started"; + break; + case GF_DEFRAG_STATUS_STARTED: + status = "in progress"; + break; + case GF_DEFRAG_STATUS_STOPPED: + status = "stopped"; + break; + case GF_DEFRAG_STATUS_COMPLETE: + status = "completed"; + break; + case GF_DEFRAG_STATUS_FAILED: + status = "failed"; + break; + default: + break; + } + + gf_msg(THIS->name, GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STATUS, + "Rebalance is %s. Time taken is %.2f secs", status, elapsed); + gf_msg(THIS->name, GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STATUS, + "Files migrated: %" PRIu64 ", size: %" PRIu64 ", lookups: %" PRIu64 + ", failures: %" PRIu64 + ", skipped: " + "%" PRIu64, + files, size, lookup, failures, skipped); +out: + return 0; +} + +int +gf_defrag_stop(dht_conf_t *conf, gf_defrag_status_t status, dict_t *output) +{ + /* TODO: set a variable 'stop_defrag' here, it should be checked + in defrag loop */ + int ret = -1; + gf_defrag_info_t *defrag = conf->defrag; - defrag->defrag_status = GF_DEFRAG_STATUS_STOPPED; + GF_ASSERT(defrag); - if (output) - gf_defrag_status_get (defrag, output); - ret = 0; + if (defrag->defrag_status == GF_DEFRAG_STATUS_NOT_STARTED) { + goto out; + } + + gf_msg("", GF_LOG_INFO, 0, DHT_MSG_REBALANCE_STOPPED, + "Received stop command on rebalance"); + defrag->defrag_status = status; + + if (output) + gf_defrag_status_get(conf, output); + ret = 0; out: - gf_log ("glusterd", GF_LOG_DEBUG, "Returning %d", ret); - return ret; + gf_msg_debug("", 0, "Returning %d", ret); + return ret; } |
