-rw-r--r--  libglusterfs/src/glusterfs.h            |    1
-rw-r--r--  libglusterfs/src/mem-types.h            |    2
-rw-r--r--  xlators/cluster/dht/src/dht-common.c    |  130
-rw-r--r--  xlators/cluster/dht/src/dht-common.h    |   47
-rw-r--r--  xlators/cluster/dht/src/dht-helper.c    |   23
-rw-r--r--  xlators/cluster/dht/src/dht-mem-types.h |    3
-rw-r--r--  xlators/cluster/dht/src/dht-rebalance.c | 1140
-rw-r--r--  xlators/cluster/dht/src/dht-shared.c    |   26
8 files changed, 1124 insertions(+), 248 deletions(-)
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index e9b54535e66..5ca09151ae8 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -83,6 +83,7 @@
 #define GF_XATTR_CLRLK_CMD "glusterfs.clrlk"
 #define GF_XATTR_PATHINFO_KEY "trusted.glusterfs.pathinfo"
 #define GF_XATTR_NODE_UUID_KEY "trusted.glusterfs.node-uuid"
+#define GF_REBAL_FIND_LOCAL_SUBVOL "glusterfs.find-local-subvol"
 #define GF_XATTR_VOL_ID_KEY "trusted.glusterfs.volume-id"
 #define GF_XATTR_LOCKINFO_KEY "trusted.glusterfs.lockinfo"
 #define GF_XATTR_GET_REAL_FILENAME_KEY "glusterfs.get_real_filename:"
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index fc06d52239b..a77998e8a63 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -150,6 +150,8 @@ enum gf_common_mem_types_ {
         gf_common_mt_nfs_exports = 131,
         gf_common_mt_gf_brick_spec_t = 132,
         gf_common_mt_gf_timer_entry_t = 133,
+        gf_common_mt_int = 134,
+        gf_common_mt_pointer = 135,
         gf_common_mt_end
 };
 #endif
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 307265fb56a..31270c8b8bc 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -2534,6 +2534,87 @@ dht_vgetxattr_fill_and_set (dht_local_t *local, dict_t **dict, xlator_t *this,
 out:
         return ret;
 }
+
+int
+dht_find_local_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                           int op_ret, int op_errno, dict_t *xattr,
+                           dict_t *xdata)
+{
+        dht_local_t  *local = NULL;
+        dht_conf_t   *conf = NULL;
+        call_frame_t *prev = NULL;
+        int           this_call_cnt = 0;
+        int           ret = 0;
+        char         *uuid_str = NULL;
+        uuid_t        node_uuid = {0,};
+
+        VALIDATE_OR_GOTO (frame, out);
+        VALIDATE_OR_GOTO (frame->local, out);
+
+        local = frame->local;
+        prev = cookie;
+        conf = this->private;
+
+        LOCK (&frame->lock);
+        {
+                this_call_cnt = --local->call_cnt;
+                if (op_ret < 0) {
+                        gf_log (this->name, GF_LOG_ERROR,
+                                "getxattr err (%s) for dir",
+                                strerror (op_errno));
+                        local->op_ret = -1;
+                        local->op_errno = op_errno;
+                        goto unlock;
+                }
+
+                ret = dict_get_str (xattr, local->xsel, &uuid_str);
+                if (ret < 0) {
+                        gf_log (this->name, GF_LOG_ERROR, "Failed to "
+                                "get %s", local->xsel);
+                        local->op_ret = -1;
+                        local->op_errno = EINVAL;
+                        goto unlock;
+                }
+
+                if (gf_uuid_parse (uuid_str, node_uuid)) {
+                        gf_log (this->name, GF_LOG_ERROR, "Failed to parse "
+                                "uuid for %s", prev->this->name);
+                        local->op_ret = -1;
+                        local->op_errno = EINVAL;
+                        goto unlock;
+                }
+
+                if (gf_uuid_compare (node_uuid, conf->defrag->node_uuid)) {
+                        gf_log (this->name, GF_LOG_DEBUG, "subvol %s does not "
+                                "belong to this node", prev->this->name);
+                } else {
+                        conf->local_subvols[(conf->local_subvols_cnt)++]
+                                = prev->this;
+                        gf_log (this->name, GF_LOG_DEBUG, "subvol %s belongs to"
+                                " this node", prev->this->name);
+                }
+        }
+
+        local->op_ret = 0;
+unlock:
+        UNLOCK (&frame->lock);
+
+        if (!is_last_call (this_call_cnt))
+                goto out;
+
+        if (local->op_ret == -1) {
+                goto unwind;
+        }
+
+        DHT_STACK_UNWIND (getxattr, frame, 0, 0, NULL, NULL);
+        goto out;
+
+unwind:
+        DHT_STACK_UNWIND (getxattr, frame, -1, local->op_errno, NULL, NULL);
+out:
+        return 0;
+}
 
 int
 dht_vgetxattr_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2892,7 +2973,8 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
         int          op_errno = -1;
         int          i = 0;
         int          cnt = 0;
-
+        char        *node_uuid_key = NULL;
+        int          ret = -1;
         VALIDATE_OR_GOTO (frame, err);
         VALIDATE_OR_GOTO (this, err);
         VALIDATE_OR_GOTO (loc, err);
@@ -2933,6 +3015,28 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
                 return 0;
         }
 
+        if (key && DHT_IS_DIR(layout) &&
+            (!strcmp (key, GF_REBAL_FIND_LOCAL_SUBVOL))) {
+                ret = gf_asprintf
+                        (&node_uuid_key, "%s", GF_XATTR_NODE_UUID_KEY);
+                if (ret == -1 || !node_uuid_key) {
+                        gf_log (this->name, GF_LOG_ERROR, "Failed to copy key");
+                        op_errno = ENOMEM;
+                        goto err;
+                }
+                (void) strncpy (local->xsel, node_uuid_key, 256);
+                cnt = local->call_cnt = conf->subvolume_cnt;
+                for (i = 0; i < cnt; i++) {
+                        STACK_WIND (frame, dht_find_local_subvol_cbk,
+                                    conf->subvolumes[i],
+                                    conf->subvolumes[i]->fops->getxattr,
+                                    loc, node_uuid_key, xdata);
+                }
+                if (node_uuid_key)
+                        GF_FREE (node_uuid_key);
+                return 0;
+        }
+
         /* for file use cached subvolume (obviously!): see if {}
          * below
          * for directory:
@@ -2942,6 +3046,7 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
          * NOTE: Don't trust inode here, as that may not be valid
          *       (until inode_link() happens)
          */
+
         if (key && DHT_IS_DIR(layout) &&
             (XATTR_IS_PATHINFO (key)
              || (strcmp (key, GF_XATTR_NODE_UUID_KEY) == 0))) {
@@ -3831,13 +3936,24 @@ dht_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
                 goto err;
         }
 
-        local->call_cnt = conf->subvolume_cnt;
+        if (!(conf->local_subvols_cnt) || !conf->defrag) {
+                local->call_cnt = conf->subvolume_cnt;
 
-        for (i = 0; i < conf->subvolume_cnt; i++) {
-                STACK_WIND (frame, dht_fd_cbk,
-                            conf->subvolumes[i],
-                            conf->subvolumes[i]->fops->opendir,
-                            loc, fd, xdata);
+                for (i = 0; i < conf->subvolume_cnt; i++) {
+                        STACK_WIND (frame, dht_fd_cbk,
+                                    conf->subvolumes[i],
+                                    conf->subvolumes[i]->fops->opendir,
+                                    loc, fd, xdata);
+
+                }
+        } else {
+                local->call_cnt = conf->local_subvols_cnt;
+                for (i = 0; i < conf->local_subvols_cnt; i++) {
+                        STACK_WIND (frame, dht_fd_cbk,
+                                    conf->local_subvols[i],
+                                    conf->local_subvols[i]->fops->opendir,
+                                    loc, fd, xdata);
+                }
         }
 
         return 0;
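The new dht_find_local_subvol_cbk above follows DHT's usual fan-out idiom: wind the getxattr to every subvolume, decrement a call counter under the frame lock as each reply arrives, and unwind exactly once on the last reply. The standalone sketch below models that idiom with plain pthreads; all names and the thread harness are illustrative, none of it is GlusterFS API.

```c
/* Minimal sketch of the "fan out, act on last reply" pattern used by
 * dht_find_local_subvol_cbk. Hypothetical names; pthreads stand in for
 * STACK_WIND/callbacks. */
#include <pthread.h>
#include <stdio.h>

#define NSUBVOLS 4

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int call_cnt = NSUBVOLS;     /* one outstanding reply per subvolume */

static void reply_cbk (int subvol_id)
{
        int last = 0;

        pthread_mutex_lock (&lock);
        {
                printf ("reply from subvol %d\n", subvol_id);
                last = (--call_cnt == 0);   /* the is_last_call () check */
        }
        pthread_mutex_unlock (&lock);

        if (last)   /* only the final reply "unwinds" to the caller */
                printf ("all %d subvolumes answered; unwinding\n", NSUBVOLS);
}

static void *subvol_thread (void *arg)
{
        reply_cbk ((int)(long)arg);
        return NULL;
}

int main (void)
{
        pthread_t tid[NSUBVOLS];
        int i;

        for (i = 0; i < NSUBVOLS; i++)   /* the "STACK_WIND" fan-out */
                pthread_create (&tid[i], NULL, subvol_thread, (void *)(long)i);
        for (i = 0; i < NSUBVOLS; i++)
                pthread_join (tid[i], NULL);
        return 0;
}
```

Note that in the real callback the counter decrement and the local_subvols[] append both happen under the frame lock, which is why concurrent replies cannot corrupt local_subvols_cnt.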
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 43f7c0264f5..d5bb6fc61d4 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -293,6 +293,20 @@ struct gf_defrag_pattern_list {
         gf_defrag_pattern_list_t *next;
 };
 
+struct dht_container {
+        union {
+                struct list_head list;
+                struct {
+                        struct _gf_dirent_t *next;
+                        struct _gf_dirent_t *prev;
+                };
+        };
+        gf_dirent_t *df_entry;
+        xlator_t    *this;
+        loc_t       *parent_loc;
+        dict_t      *migrate_data;
+};
+
 struct gf_defrag_info_ {
         uint64_t total_files;
         uint64_t total_data;
@@ -320,6 +334,19 @@ struct gf_defrag_info_ {
         uint64_t total_files_demoted;
         int      write_freq_threshold;
         int      read_freq_threshold;
+
+        pthread_cond_t        parallel_migration_cond;
+        pthread_mutex_t       dfq_mutex;
+        pthread_cond_t        rebalance_crawler_alarm;
+        int32_t               q_entry_count;
+        int32_t               global_error;
+        struct dht_container *queue;
+        int32_t               crawl_done;
+        int32_t               abort;
+        int32_t               wakeup_crawler;
+
+        /* lock to serialize hard-link migration */
+        synclock_t            link_lock;
 };
 
 typedef struct gf_defrag_info_ gf_defrag_info_t;
@@ -397,9 +424,19 @@ struct dht_conf {
         dht_methods_t  *methods;
 
         struct mem_pool *lock_pool;
+
+        /* local subvol storage for rebalance */
+        xlator_t       **local_subvols;
+        int32_t          local_subvols_cnt;
 };
 typedef struct dht_conf dht_conf_t;
 
+struct dht_dfoffset_ctx {
+        xlator_t *this;
+        off_t     offset;
+        int32_t   readdir_done;
+};
+typedef struct dht_dfoffset_ctx dht_dfoffset_ctx_t;
 
 struct dht_disk_layout {
         uint32_t cnt;
@@ -423,6 +460,14 @@ typedef enum {
         GF_DHT_WEIGHTED_DISTRIBUTION
 } dht_distribution_type_t;
 
+struct dir_dfmeta {
+        gf_dirent_t        *equeue;
+        dht_dfoffset_ctx_t *offset_var;
+        struct list_head  **head;
+        struct list_head  **iterator;
+        int                *fetch_entries;
+};
+
 #define ENTRY_MISSING(op_ret, op_errno) (op_ret == -1 && op_errno == ENOENT)
 
 #define is_revalidate(loc) (dht_inode_ctx_layout_get (loc->inode, this, NULL) == 0)
@@ -608,6 +653,8 @@ int dht_start_rebalance_task (xlator_t *this, call_frame_t *frame);
 int dht_rebalance_in_progress_check (xlator_t *this, call_frame_t *frame);
 int dht_rebalance_complete_check (xlator_t *this, call_frame_t *frame);
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf);
 
 /* FOPS */
 int32_t dht_lookup (call_frame_t *frame,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index cab66017b84..b5114b620ce 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -731,7 +731,28 @@ err:
         return -1;
 }
 
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf)
+{
+        xlator_list_t *subvols = NULL;
+        int            cnt = 0;
+
+        if (!conf)
+                return -1;
+
+        for (subvols = this->children; subvols; subvols = subvols->next)
+                cnt++;
+
+        conf->local_subvols = GF_CALLOC (cnt, sizeof (xlator_t *),
+                                         gf_dht_mt_xlator_t);
+        if (!conf->local_subvols) {
+                return -1;
+        }
+
+        conf->local_subvols_cnt = 0;
+
+        return 0;
+}
 
 int
 dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
@@ -752,6 +773,8 @@ dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
         }
         conf->subvolume_cnt = cnt;
 
+        conf->local_subvols_cnt = 0;
+
         dht_set_subvol_range(this);
 
         cnt = 0;
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h
index e893eb48fd8..46028e6d9e0 100644
--- a/xlators/cluster/dht/src/dht-mem-types.h
+++ b/xlators/cluster/dht/src/dht-mem-types.h
@@ -30,6 +30,9 @@ enum gf_dht_mem_types_ {
         gf_defrag_info_mt,
         gf_dht_mt_inode_ctx_t,
         gf_dht_mt_ctx_stat_time_t,
+        gf_dht_mt_dirent_t,
+        gf_dht_mt_container_t,
+        gf_dht_mt_octx_t,
         gf_dht_mt_end
 };
 #endif
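The dht-rebalance.c changes that follow lean on a small round-robin helper, GF_CRAWL_INDEX_MOVE, so the crawler interleaves reads across all local subvolumes instead of draining one at a time. A minimal demo of the macro's wrap-around behaviour; the macro body is copied from the patch, while the harness and subvolume count are hypothetical:

```c
#include <stdio.h>

/* Macro as added by this patch: advance the crawl index and wrap it
 * back to 0 once it reaches the subvolume count. */
#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt)  {     \
                idx++;                          \
                idx %= sv_cnt;                  \
        }

int main (void)
{
        int idx = 0;
        int sv_cnt = 3;   /* pretend there are 3 local subvolumes */
        int step;

        /* One directory entry is pulled from each subvolume in turn:
         * 0 -> 1 -> 2 -> 0 -> 1 -> ... */
        for (step = 0; step < 7; step++) {
                printf ("step %d reads from local subvol %d\n", step, idx);
                GF_CRAWL_INDEX_MOVE (idx, sv_cnt);
        }
        return 0;
}
```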
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 206628208f5..52f91946240 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -23,7 +23,25 @@
 #define GF_DISK_SECTOR_SIZE      512
 #define DHT_REBALANCE_PID        4242 /* Change it if required */
 #define DHT_REBALANCE_BLKSIZE    (128 * 1024)
+#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATE_QUEUE_COUNT   500
+#define MIN_MIGRATE_QUEUE_COUNT   200
+
+#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt)  {     \
+                idx++;                          \
+                idx %= sv_cnt;                  \
+        }
+
+void
+dht_set_global_defrag_error (gf_defrag_info_t *defrag, int ret)
+{
+        LOCK (&defrag->lock);
+        {
+                defrag->global_error = ret;
+        }
+        UNLOCK (&defrag->lock);
+        return;
+}
 
 static int
 dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count,
                       int32_t size, off_t offset, struct iobref *iobref)
@@ -178,6 +196,47 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
                 goto out;
         }
 
+        /*
+         Parallel migration can migrate the same hard link more than
+         once, which can lead to data loss. Hence, do a fresh lookup to
+         decide whether migration is required at all.
+
+         Consider a scenario with 10 hard links [link{1..10}]:
+         Say the first hard link "link1" does the setxattr of the
+         new hashed subvolume info on the cached file. As multiple
+         threads are working, all the links may already have been
+         created on the new hashed subvolume by the time we reach,
+         say, link5. Now the number of links on hashed is equal to
+         that of cached. Hence, file migration will happen for link6.
+
+              Cached                    Hashed
+         --------T link6           rwxrwxrwx   link6
+
+         After the above state, all the link files on the cached
+         subvolume will be zero-byte linkto files. Hence, if we still
+         migrate the remaining files link{7..10}, we will end up
+         migrating 0 bytes, leading to data loss.
+         Hence, a lookup can make sure whether we need to migrate the
+         file or not.
+        */
+
+        ret = syncop_lookup (this, loc, NULL, NULL,
+                             NULL, NULL);
+        if (ret) {
+                /* Ignore ENOENT and ESTALE as the file might have been
+                   migrated already */
+                if (-ret == ENOENT || -ret == ESTALE) {
+                        ret = -2;
+                        goto out;
+                }
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        DHT_MSG_MIGRATE_FILE_FAILED,
+                        "Migrate file failed:%s lookup failed with ret = %d",
+                        loc->path, ret);
+                ret = -1;
+                goto out;
+        }
+
         cached_subvol = dht_subvol_get_cached (this, loc->inode);
         if (!cached_subvol) {
                 gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -198,6 +257,11 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
                 goto out;
         }
 
+        if (hashed_subvol == cached_subvol) {
+                ret = -2;
+                goto out;
+        }
+
         gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s "
                 "with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid),
                 cached_subvol->name, hashed_subvol->name);
@@ -288,7 +352,8 @@ out:
  */
 static inline int
 __is_file_migratable (xlator_t *this, loc_t *loc,
-                      struct iatt *stbuf, dict_t *xattrs, int flags)
+                      struct iatt *stbuf, dict_t *xattrs, int flags,
+                      gf_defrag_info_t *defrag)
 {
         int ret = -1;
 
@@ -308,13 +373,14 @@ __is_file_migratable (xlator_t *this, loc_t *loc,
         if (stbuf->ia_nlink > 1) {
                 /* support for decomission */
                 if (flags == GF_DHT_MIGRATE_HARDLINK) {
-                        ret = gf_defrag_handle_hardlink (this, loc,
-                                                         xattrs, stbuf);
-
-                        /*
-                        Returning zero will force the file to be remigrated.
-                        Checkout gf_defrag_handle_hardlink for more information.
-                        */
+                        synclock_lock (&defrag->link_lock);
+                        ret = gf_defrag_handle_hardlink
+                                (this, loc, xattrs, stbuf);
+                        synclock_unlock (&defrag->link_lock);
+                        /*
+                        Returning zero will force the file to be remigrated.
+                        Checkout gf_defrag_handle_hardlink for more information.
+                        */
                         if (ret && ret != -2) {
                                 gf_msg (this->name, GF_LOG_WARNING, 0,
                                         DHT_MSG_MIGRATE_FILE_FAILED,
@@ -610,6 +676,7 @@ __dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst
         while (total < ia_size) {
                 read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ?
                              DHT_REBALANCE_BLKSIZE : (ia_size - total));
+
                 ret = syncop_readv (from, src, read_size,
                                     offset, 0, &vector, &count,
                                     &iobref, NULL, NULL);
@@ -904,6 +971,11 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
         loc_t            tmp_loc = {0, };
         gf_boolean_t     locked = _gf_false;
         int              lk_ret = -1;
+        gf_defrag_info_t *defrag = NULL;
+
+        defrag = conf->defrag;
+        if (!defrag)
+                goto out;
 
         gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s",
                 loc->path, from->name, to->name);
@@ -960,7 +1032,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
         src_ia_prot = stbuf.ia_prot;
 
         /* Check if file can be migrated */
-        ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag);
+        ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag, defrag);
         if (ret) {
                 if (ret == -2)
                         ret = 0;
@@ -1295,6 +1367,7 @@ dht_start_rebalance_task (xlator_t *this, call_frame_t *frame)
 {
         int ret = -1;
 
+        frame->root->pid = GF_CLIENT_PID_DEFRAG;
         ret = synctask_new (this->ctx->env, rebalance_task,
                             rebalance_task_completion,
                             frame, frame);
@@ -1406,61 +1479,341 @@ gf_defrag_pattern_match (gf_defrag_info_t *defrag, char *name, uint64_t size)
         return ret;
 }
 
-/* We do a depth first traversal of directories. But before we move into
- * subdirs, we complete the data migration of those directories whose layouts
- * have been fixed
- */
+int
+dht_dfreaddirp_done (dht_dfoffset_ctx_t *offset_var, int cnt)
+{
+        int i;
+        int result = 1;
+
+        for (i = 0; i < cnt; i++) {
+                if (offset_var[i].readdir_done == 0) {
+                        result = 0;
+                        break;
+                }
+        }
+        return result;
+}
+
+static int
+gf_defrag_ctx_subvols_init (dht_dfoffset_ctx_t *offset_var, xlator_t *this)
+{
+        int         i;
+        dht_conf_t *conf = NULL;
+
+        conf = this->private;
+
+        if (!conf)
+                return -1;
+
+        for (i = 0; i < conf->local_subvols_cnt; i++) {
+                offset_var[i].this = conf->local_subvols[i];
+                offset_var[i].offset = (off_t) 0;
+                offset_var[i].readdir_done = 0;
+        }
+
+        return 0;
+}
 
 int
-gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
-                        dict_t *migrate_data)
+gf_defrag_migrate_single_file (void *opaque)
 {
-        int                      ret = -1;
-        loc_t                    entry_loc = {0,};
-        fd_t                    *fd = NULL;
-        gf_dirent_t              entries;
-        gf_dirent_t             *tmp = NULL;
+        xlator_t             *this = NULL;
+        dht_conf_t           *conf = NULL;
+        gf_defrag_info_t     *defrag = NULL;
+        int                   ret = 0;
         gf_dirent_t             *entry = NULL;
-        gf_boolean_t             free_entries = _gf_false;
-        off_t                    offset = 0;
-        dict_t                  *dict = NULL;
+        struct timeval        start = {0,};
+        loc_t                 entry_loc = {0,};
+        loc_t                *loc = NULL;
         struct iatt              iatt = {0,};
+        dict_t               *migrate_data = NULL;
         int32_t                  op_errno = 0;
-        char                    *uuid_str = NULL;
-        uuid_t                   node_uuid = {0,};
-        struct timeval           dir_start = {0,};
         struct timeval           end = {0,};
         double                   elapsed = {0,};
-        struct timeval           start = {0,};
-        int                      loglevel = GF_LOG_TRACE;
+        struct dht_container *rebal_entry = NULL;
 
-        gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
-                loc->path);
-        gettimeofday (&dir_start, NULL);
+        rebal_entry = (struct dht_container *)opaque;
+        if (!rebal_entry) {
+                gf_log ("DHT", GF_LOG_ERROR, "rebal_entry is NULL");
+                ret = -1;
+                goto out;
+        }
 
-        fd = fd_create (loc->inode, defrag->pid);
-        if (!fd) {
-                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+        this = rebal_entry->this;
+
+        conf = this->private;
+
+        defrag = conf->defrag;
+
+        loc = rebal_entry->parent_loc;
+
+        migrate_data = rebal_entry->migrate_data;
+
+        entry = rebal_entry->df_entry;
+
+        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+                ret = -1;
                 goto out;
         }
 
-        ret = syncop_opendir (this, loc, fd, NULL, NULL);
+        if (defrag->stats == _gf_true) {
+                gettimeofday (&start, NULL);
+        }
+
+        if (defrag->defrag_pattern &&
+            (gf_defrag_pattern_match (defrag, entry->d_name,
+                                      entry->d_stat.ia_size) == _gf_false)) {
+                gf_log (this->name, GF_LOG_ERROR, "pattern_match failed");
+                goto out;
+        }
+
+        memset (&entry_loc, 0, sizeof (entry_loc));
+
+        ret = dht_build_child_loc (this, &entry_loc, loc, entry->d_name);
+        if (ret) {
+                LOCK (&defrag->lock);
+                {
+                        defrag->total_failures += 1;
+                }
+                UNLOCK (&defrag->lock);
+
+                ret = 0;
+
+                gf_log (this->name, GF_LOG_ERROR, "Child loc build failed");
+
+                goto out;
+        }
+
+        gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+
+        gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+
+        entry_loc.inode->ia_type = entry->d_stat.ia_type;
+
+        ret = syncop_lookup (this, &entry_loc, &iatt, NULL, NULL, NULL);
         if (ret) {
                 gf_msg (this->name, GF_LOG_ERROR, 0,
-                        DHT_MSG_MIGRATE_DATA_FAILED,
-                        "Migrate data failed: Failed to open dir %s",
-                        loc->path);
-                ret = -1;
+                        DHT_MSG_MIGRATE_FILE_FAILED,
+                        "Migrate file failed: %s lookup failed",
+                        entry_loc.name);
+                ret = 0;
                 goto out;
         }
 
-        INIT_LIST_HEAD (&entries.list);
+        ret = syncop_setxattr (this, &entry_loc, migrate_data, 0, NULL, NULL);
+        if (ret < 0) {
+                op_errno = -ret;
+                /* errno is overloaded. See
+                 * rebalance_task_completion () */
+                if (op_errno == ENOSPC) {
+                        gf_msg_debug (this->name, 0, "migrate-data skipped for"
+                                      " %s due to space constraints",
+                                      entry_loc.path);
+                        LOCK (&defrag->lock);
+                        {
+                                defrag->skipped += 1;
+                        }
+                        UNLOCK (&defrag->lock);
+                } else if (op_errno != EEXIST) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                DHT_MSG_MIGRATE_FILE_FAILED,
+                                "migrate-data failed for %s", entry_loc.path);
 
-        while ((ret = syncop_readdirp (this, fd, 131072, offset, &entries,
-                                       NULL, NULL)) != 0) {
+                        LOCK (&defrag->lock);
+                        {
+                                defrag->total_failures += 1;
+                        }
+                        UNLOCK (&defrag->lock);
 
-                if (ret < 0) {
+                }
+
+                ret = gf_defrag_handle_migrate_error (op_errno, defrag);
+
+                if (!ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                DHT_MSG_MIGRATE_FILE_FAILED,
+                                "migrate-data on %s failed: %s", entry_loc.path,
+                                strerror (op_errno));
+                } else if (ret == 1) {
+                        ret = 0;
+                        goto out;
+                } else if (ret == -1) {
+                        goto out;
+                }
+        } else if (ret > 0) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        DHT_MSG_MIGRATE_FILE_FAILED,
+                        "migrate-data failed for %s", entry_loc.path);
+                ret = 0;
+                LOCK (&defrag->lock);
+                {
+                        defrag->total_failures += 1;
+                }
+                UNLOCK (&defrag->lock);
+        }
+
+        LOCK (&defrag->lock);
+        {
+                defrag->total_files += 1;
+                defrag->total_data += iatt.ia_size;
+        }
+        UNLOCK (&defrag->lock);
+
+        if (defrag->stats == _gf_true) {
+                gettimeofday (&end, NULL);
+                elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
+                          (end.tv_usec - start.tv_usec);
+                gf_log (this->name, GF_LOG_INFO, "Migration of "
+                        "file:%s size:%"PRIu64" bytes took %.2f "
+                        "secs and ret: %d", entry_loc.name,
+                        iatt.ia_size, elapsed/1e6, ret);
+        }
+
+out:
+        loc_wipe (&entry_loc);
+
+        return ret;
+
+}
+
+void *
+gf_defrag_task (void *opaque)
+{
+        struct list_head     *q_head = NULL;
+        struct dht_container *iterator = NULL;
+        gf_defrag_info_t     *defrag = NULL;
+        int                   ret = 0;
+
+        defrag = (gf_defrag_info_t *)opaque;
+        if (!defrag) {
+                gf_msg ("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL");
+                goto out;
+        }
+
+        q_head = &(defrag->queue[0].list);
+
+        /* The following while loop will dequeue one entry from
+           defrag->queue under lock. We update defrag->global_error only
+           on an error that is critical enough to stop the rebalance
+           process; the stop is signalled to the other migrator threads
+           by setting defrag->defrag_status to GF_DEFRAG_STATUS_FAILED.
+
+           In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is
+           maintained so that the crawler does not starve the file
+           migration workers, and a high watermark
+           (MAX_MIGRATE_QUEUE_COUNT) so that the crawler does not run far
+           ahead in filling up the queue.
+        */
+        while (1) {
+
+                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+                        goto out;
+                }
+
+                pthread_mutex_lock (&defrag->dfq_mutex);
+                {
+                        if (defrag->q_entry_count) {
+                                iterator = list_entry (q_head->next,
+                                                       typeof(*iterator), list);
+
+                                gf_log ("DHT", GF_LOG_DEBUG, "picking entry "
+                                        "%s", iterator->df_entry->d_name);
+
+                                list_del_init (&(iterator->list));
+
+                                defrag->q_entry_count--;
+
+                                if ((defrag->q_entry_count <
+                                     MIN_MIGRATE_QUEUE_COUNT) &&
+                                    defrag->wakeup_crawler) {
+                                        pthread_cond_broadcast (
+                                               &defrag->rebalance_crawler_alarm);
+                                }
+                                pthread_mutex_unlock (&defrag->dfq_mutex);
+
+                                ret = gf_defrag_migrate_single_file
+                                              ((void *)iterator);
+
+                                /* Critical errors: ENOTCONN and ENOSPC */
+                                if (ret) {
+                                        dht_set_global_defrag_error
+                                                 (defrag, ret);
+
+                                        defrag->defrag_status =
+                                                       GF_DEFRAG_STATUS_FAILED;
+                                        goto out;
+                                }
+
+                                gf_dirent_free (iterator->df_entry);
+                                GF_FREE (iterator);
+                                continue;
+                        } else {
+
+                                /* defrag->crawl_done being set means that
+                                   crawling the file system is done; an
+                                   empty queue while that flag is set means
+                                   there are no more entries to be added to
+                                   the queue and rebalance is finished. */
+
+                                if (!defrag->crawl_done) {
+                                        pthread_cond_wait (
+                                               &defrag->parallel_migration_cond,
+                                               &defrag->dfq_mutex);
+                                }
+
+                                if (defrag->crawl_done &&
+                                    !defrag->q_entry_count) {
+                                        pthread_cond_broadcast (
+                                              &defrag->parallel_migration_cond);
+                                        goto unlock;
+                                } else {
+                                        pthread_mutex_unlock
+                                                 (&defrag->dfq_mutex);
+                                        continue;
+                                }
+                        }
+
+                }
+unlock:
+                pthread_mutex_unlock (&defrag->dfq_mutex);
+                break;
+        }
+out:
+        return NULL;
+}
+
+static int
+gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container,
+                     loc_t *loc, dht_conf_t *conf, gf_defrag_info_t *defrag,
+                     fd_t *fd, dict_t *migrate_data,
+                     struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req)
+{
+        int                   ret = -1;
+        char                  is_linkfile = 0;
+        gf_dirent_t          *df_entry = NULL;
+        loc_t                 entry_loc = {0,};
+        dict_t               *xattr_rsp = NULL;
+        struct iatt           iatt = {0,};
+        struct dht_container *tmp_container = NULL;
+        xlator_t             *hashed_subvol = NULL;
+        xlator_t             *cached_subvol = NULL;
+
+        if (dir_dfmeta->offset_var[i].readdir_done == 1) {
+                ret = 0;
+                goto out;
+        }
+
+        if (dir_dfmeta->fetch_entries[i] == 1) {
+                ret = syncop_readdirp (conf->local_subvols[i], fd, 131072,
+                                       dir_dfmeta->offset_var[i].offset,
+                                       &(dir_dfmeta->equeue[i]),
+                                       NULL, NULL);
+                if (ret == 0) {
+                        dir_dfmeta->offset_var[i].readdir_done = 1;
+                        ret = 0;
+                        goto out;
+                }
+
+                if (ret < 0) {
                         gf_msg (this->name, GF_LOG_ERROR, 0,
                                 DHT_MSG_MIGRATE_DATA_FAILED,
                                 "%s: Migrate data failed: Readdir returned"
@@ -1470,213 +1823,426 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
                         goto out;
                 }
 
-                if (list_empty (&entries.list))
-                        break;
+                if (list_empty (&(dir_dfmeta->equeue[i].list))) {
+                        dir_dfmeta->offset_var[i].readdir_done = 1;
+                        ret = 0;
+                        goto out;
+                }
 
-                free_entries = _gf_true;
+                dir_dfmeta->fetch_entries[i] = 0;
+        }
 
-                list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+        while (1) {
 
-                        if (dict) {
-                                dict_unref (dict);
-                                dict = NULL;
-                        }
 
-                        if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
-                                ret = 1;
-                                goto out;
-                        }
+                if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+                        ret = -1;
+                        goto out;
+                }
 
-                        offset = entry->d_off;
+                df_entry = list_entry (dir_dfmeta->iterator[i]->next,
+                                       typeof (*df_entry), list);
 
-                        if (!strcmp (entry->d_name, ".") ||
-                            !strcmp (entry->d_name, ".."))
-                                continue;
+                if (&df_entry->list == dir_dfmeta->head[i]) {
+                        gf_dirent_free (&(dir_dfmeta->equeue[i]));
+                        INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+                        dir_dfmeta->fetch_entries[i] = 1;
+                        dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+                        ret = 0;
+                        goto out;
+                }
 
-                        if (IA_ISDIR (entry->d_stat.ia_type))
-                                continue;
+                dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next;
 
-                        defrag->num_files_lookedup++;
+                dir_dfmeta->offset_var[i].offset = df_entry->d_off;
+                if (!strcmp (df_entry->d_name, ".") ||
-                        if (defrag->stats == _gf_true) {
-                                gettimeofday (&start, NULL);
-                        }
+                    !strcmp (df_entry->d_name, ".."))
+                        continue;
 
-                        if (defrag->defrag_pattern &&
-                            (gf_defrag_pattern_match (defrag, entry->d_name,
-                                                      entry->d_stat.ia_size)
-                             == _gf_false)) {
-                                continue;
-                        }
+                if (IA_ISDIR (df_entry->d_stat.ia_type))
+                        continue;
 
-                        loc_wipe (&entry_loc);
-                        ret =dht_build_child_loc (this, &entry_loc, loc,
-                                                  entry->d_name);
-                        if (ret) {
-                                gf_log (this->name, GF_LOG_ERROR, "Child loc"
-                                        " build failed");
-                                goto out;
-                        }
+                defrag->num_files_lookedup++;
 
-                        if (gf_uuid_is_null (entry->d_stat.ia_gfid)) {
-                                gf_msg (this->name, GF_LOG_ERROR, 0,
-                                        DHT_MSG_GFID_NULL,
-                                        "%s/%s gfid not present", loc->path,
-                                        entry->d_name);
-                                continue;
-                        }
+                if (defrag->defrag_pattern &&
+                    (gf_defrag_pattern_match (defrag, df_entry->d_name,
+                                              df_entry->d_stat.ia_size)
+                     == _gf_false)) {
+                        continue;
+                }
 
-                        gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+                loc_wipe (&entry_loc);
+                ret = dht_build_child_loc (this, &entry_loc, loc,
+                                           df_entry->d_name);
+                if (ret) {
+                        gf_log (this->name, GF_LOG_ERROR, "Child loc"
+                                " build failed");
+                        ret = -1;
+                        goto out;
+                }
 
-                        if (gf_uuid_is_null (loc->gfid)) {
-                                gf_msg (this->name, GF_LOG_ERROR, 0,
-                                        DHT_MSG_GFID_NULL,
-                                        "%s/%s gfid not present", loc->path,
-                                        entry->d_name);
-                                continue;
-                        }
+                if (gf_uuid_is_null (df_entry->d_stat.ia_gfid)) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                DHT_MSG_GFID_NULL,
+                                "%s/%s gfid not present", loc->path,
+                                df_entry->d_name);
+                        continue;
+                }
 
-                        gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+                gf_uuid_copy (entry_loc.gfid, df_entry->d_stat.ia_gfid);
 
-                        entry_loc.inode->ia_type = entry->d_stat.ia_type;
+                if (gf_uuid_is_null (loc->gfid)) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                DHT_MSG_GFID_NULL,
+                                "%s/%s gfid not present", loc->path,
+                                df_entry->d_name);
+                        continue;
+                }
 
-                        ret = syncop_lookup (this, &entry_loc, &iatt, NULL,
-                                             NULL, NULL);
-                        if (ret) {
-                                gf_msg (this->name, GF_LOG_ERROR, 0,
-                                        DHT_MSG_MIGRATE_FILE_FAILED,
-                                        "Migrate file failed:%s lookup failed",
-                                        entry_loc.path);
-                                ret = -1;
-                                continue;
-                        }
+                gf_uuid_copy (entry_loc.pargfid, loc->gfid);
 
-                        ret = syncop_getxattr (this, &entry_loc, &dict,
-                                               GF_XATTR_NODE_UUID_KEY, NULL,
-                                               NULL);
-                        if (ret < 0) {
-                                gf_msg (this->name, GF_LOG_ERROR, 0,
-                                        DHT_MSG_MIGRATE_FILE_FAILED,
-                                        "Migrate file failed:"
-                                        "Failed to get node-uuid for %s",
-                                        entry_loc.path);
-                                ret = -1;
-                                continue;
-                        }
+                entry_loc.inode->ia_type = df_entry->d_stat.ia_type;
+                ret = syncop_lookup (conf->local_subvols[i], &entry_loc,
+                                     &iatt, NULL, xattr_req, &xattr_rsp);
+                if (ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0,
+                                DHT_MSG_MIGRATE_FILE_FAILED,
+                                "Migrate file failed:%s lookup failed",
+                                entry_loc.path);
+                        continue;
+                }
 
-                        ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY,
-                                            &uuid_str);
-                        if (ret < 0) {
GF_LOG_ERROR, "Failed to " - "get node-uuid from dict for %s", - entry_loc.path); - ret = -1; - continue; - } + is_linkfile = check_is_linkfile (NULL, &iatt, xattr_rsp, + conf->link_xattr_name); - if (gf_uuid_parse (uuid_str, node_uuid)) { - gf_log (this->name, GF_LOG_ERROR, "gf_uuid_parse " - "failed for %s", entry_loc.path); - continue; - } + if (is_linkfile) { + /* No need to add linkto file to the queue for + migration. Only the actual data file need to + be checked for migration criteria. + */ + gf_log (this->name, GF_LOG_INFO, "linkfile." + " Hence skip for file: %s", entry_loc.path); + continue; + } - /* if file belongs to different node, skip migration - * the other node will take responsibility of migration - */ - if (gf_uuid_compare (node_uuid, defrag->node_uuid)) { - gf_msg_trace (this->name, 0, "%s does not" - "belong to this node", - entry_loc.path); - continue; - } - uuid_str = NULL; + ret = syncop_lookup (this, &entry_loc, NULL, NULL, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:%s lookup failed", + entry_loc.path); + continue; + } - /* if distribute is present, it will honor this key. - * -1, ENODATA is returned if distribute is not present - * or file doesn't have a link-file. If file has - * link-file, the path of link-file will be the value, - * and also that guarantees that file has to be mostly - * migrated */ + /* if distribute is present, it will honor this key. + * -1, ENODATA is returned if distribute is not present + * or file doesn't have a link-file. If file has + * link-file, the path of link-file will be the value, + * and also that guarantees that file has to be mostly + * migrated */ - ret = syncop_getxattr (this, &entry_loc, NULL, - GF_XATTR_LINKINFO_KEY, NULL, - NULL); - if (ret < 0) { - if (-ret != ENODATA) { - loglevel = GF_LOG_ERROR; - defrag->total_failures += 1; - } else { - loglevel = GF_LOG_TRACE; - } - gf_log (this->name, loglevel, "%s: failed to " - "get "GF_XATTR_LINKINFO_KEY" key - %s", - entry_loc.path, strerror (-ret)); - ret = -1; - continue; + hashed_subvol = dht_subvol_get_hashed (this, &entry_loc); + if (!hashed_subvol) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_HASHED_SUBVOL_GET_FAILED, + "Failed to get hashed subvol for %s", + loc->path); + continue; + } + + cached_subvol = dht_subvol_get_cached (this, entry_loc.inode); + if (!cached_subvol) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_CACHED_SUBVOL_GET_FAILED, + "Failed to get cached subvol for %s", + loc->path); + + continue; + } + + if (hashed_subvol == cached_subvol) { + continue; + } + + /*Build Container Structure */ + + tmp_container = GF_CALLOC (1, sizeof(struct dht_container), + gf_dht_mt_container_t); + if (!tmp_container) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for container"); + ret = -1; + goto out; + } + tmp_container->df_entry = gf_dirent_for_name (df_entry->d_name); + if (!tmp_container->df_entry) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for df_entry"); + ret = -1; + goto out; + } + + tmp_container->df_entry->d_stat = df_entry->d_stat; + + tmp_container->df_entry->d_ino = df_entry->d_ino; + + tmp_container->df_entry->d_type = df_entry->d_type; + + tmp_container->df_entry->d_len = df_entry->d_len; + + tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc), + gf_dht_mt_loc_t); + if (!tmp_container->parent_loc) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for loc"); + ret = -1; + goto out; + } + + + ret = 
+                if (ret) {
+                        gf_log (this->name, GF_LOG_ERROR,
+                                "loc_copy failed");
+                        ret = -1;
+                        goto out;
+                }
+
+                tmp_container->migrate_data = migrate_data;
+
+                tmp_container->this = this;
+
+                if (df_entry->dict)
+                        tmp_container->df_entry->dict =
+                                dict_ref (df_entry->dict);
+
+                /* Build Container Structure >> END */
+
+                ret = 0;
+                goto out;
+
+        }
+
+out:
+        if (ret == 0) {
+                *container = tmp_container;
+        } else if (tmp_container) {
+                GF_FREE (tmp_container->parent_loc);
+                GF_FREE (tmp_container);
+        }
+
+        if (xattr_rsp)
+                dict_unref (xattr_rsp);
+        return ret;
+}
+
+int
+gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
+                       dict_t *migrate_data)
+{
+        int                   ret = -1;
+        fd_t                 *fd = NULL;
+        dht_conf_t           *conf = NULL;
+        gf_dirent_t           entries;
+        dict_t               *dict = NULL;
+        dict_t               *xattr_req = NULL;
+        struct timeval        dir_start = {0,};
+        struct timeval        end = {0,};
+        double                elapsed = {0,};
+        int                   local_subvols_cnt = 0;
+        int                   i = 0;
+        struct dht_container *container = NULL;
+        int                   ldfq_count = 0;
+        int                   dfc_index = 0;
+        struct dir_dfmeta    *dir_dfmeta = NULL;
+
+        gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
+                loc->path);
+        gettimeofday (&dir_start, NULL);
+
+        conf = this->private;
+        local_subvols_cnt = conf->local_subvols_cnt;
+
+        if (!local_subvols_cnt) {
+                ret = 0;
+                goto out;
+        }
+
+        fd = fd_create (loc->inode, defrag->pid);
+        if (!fd) {
+                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+                ret = -1;
+                goto out;
+        }
+
+        ret = syncop_opendir (this, loc, fd, NULL, NULL);
+        if (ret) {
+                gf_msg (this->name, GF_LOG_ERROR, 0,
+                        DHT_MSG_MIGRATE_DATA_FAILED,
+                        "Migrate data failed: Failed to open dir %s",
+                        loc->path);
+                ret = -1;
+                goto out;
+        }
+
+        dir_dfmeta = GF_CALLOC (1, sizeof (*dir_dfmeta),
+                                gf_common_mt_pointer);
+        if (!dir_dfmeta) {
+                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta is NULL");
+                ret = -1;
+                goto out;
+        }
+
+
+        dir_dfmeta->head = GF_CALLOC (local_subvols_cnt,
+                                      sizeof (*(dir_dfmeta->head)),
+                                      gf_common_mt_pointer);
+        if (!dir_dfmeta->head) {
+                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL");
+                ret = -1;
+                goto out;
+        }
+
+        dir_dfmeta->iterator = GF_CALLOC (local_subvols_cnt,
+                                          sizeof (*(dir_dfmeta->iterator)),
+                                          gf_common_mt_pointer);
+        if (!dir_dfmeta->iterator) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "dir_dfmeta->iterator is NULL");
+                ret = -1;
+                goto out;
+        }
+
+        dir_dfmeta->equeue = GF_CALLOC (local_subvols_cnt, sizeof (entries),
+                                        gf_dht_mt_dirent_t);
+        if (!dir_dfmeta->equeue) {
+                gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL");
+                ret = -1;
+                goto out;
+        }
+
+        dir_dfmeta->offset_var = GF_CALLOC (local_subvols_cnt,
+                                            sizeof (dht_dfoffset_ctx_t),
+                                            gf_dht_mt_octx_t);
+        if (!dir_dfmeta->offset_var) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "dir_dfmeta->offset_var is NULL");
+                ret = -1;
+                goto out;
+        }
+        ret = gf_defrag_ctx_subvols_init (dir_dfmeta->offset_var, this);
+        if (ret) {
+                gf_log (this->name, GF_LOG_ERROR, "dht_dfoffset_ctx_t "
+                        "initialization failed");
+                ret = -1;
+                goto out;
+        }
+
+        dir_dfmeta->fetch_entries = GF_CALLOC (local_subvols_cnt,
+                                               sizeof (int), gf_common_mt_int);
+        if (!dir_dfmeta->fetch_entries) {
+                gf_log (this->name, GF_LOG_ERROR,
+                        "dir_dfmeta->fetch_entries is NULL");
+                ret = -1;
+                goto out;
+        }
+
+        for (i = 0; i < local_subvols_cnt ; i++) {
+                INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+                dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list);
+                dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+                dir_dfmeta->fetch_entries[i] = 1;
+        }
+
+        xattr_req = dict_new ();
+        if (!xattr_req) {
gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); + ret = -1; + goto out; + } + + ret = dict_set_uint32 (xattr_req, + conf->link_xattr_name, 256); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to set dict for " + "key: %s", conf->link_xattr_name); + ret = -1; + goto out; + } + + /* + Job: Read entries from each local subvol and store the entries + in equeue array of linked list. Now pick one entry from the + equeue array in a round robin basis and add them to defrag Queue. + */ + + while (!dht_dfreaddirp_done(dir_dfmeta->offset_var, + local_subvols_cnt)) { + + pthread_mutex_lock (&defrag->dfq_mutex); + { + while (defrag->q_entry_count > + MAX_MIGRATE_QUEUE_COUNT) { + defrag->wakeup_crawler = 1; + pthread_cond_wait ( + &defrag->rebalance_crawler_alarm, + &defrag->dfq_mutex); } - ret = syncop_setxattr (this, &entry_loc, migrate_data, - 0, NULL, NULL); - if (ret < 0) { - op_errno = -ret; - /* errno is overloaded. See - * rebalance_task_completion () */ - if (op_errno == ENOSPC) { - gf_msg_debug (this->name, 0, - "migrate-data skipped for" - " %s due to space " - "constraints", - entry_loc.path); - defrag->skipped +=1; - } else{ - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "migrate-data failed for %s", - entry_loc.path); - defrag->total_failures +=1; - } + ldfq_count = defrag->q_entry_count; - ret = gf_defrag_handle_migrate_error (op_errno, - defrag); + if (defrag->wakeup_crawler) { + defrag->wakeup_crawler = 0; + } - if (!ret) - gf_msg_debug (this->name, 0, - "migrate-data on %s " - "failed: %s", - entry_loc.path, - strerror (op_errno)); - else if (ret == 1) - continue; - else if (ret == -1) - goto out; - } else if (ret > 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "migrate-data failed for %s", - entry_loc.path); - defrag->total_failures +=1; + } + pthread_mutex_unlock (&defrag->dfq_mutex); + + while (ldfq_count <= MAX_MIGRATE_QUEUE_COUNT && + !dht_dfreaddirp_done(dir_dfmeta->offset_var, + local_subvols_cnt)) { + + ret = gf_defrag_get_entry (this, dfc_index, &container, + loc, conf, defrag, fd, + migrate_data, dir_dfmeta, + xattr_req); + if (ret) { + gf_log ("DHT", GF_LOG_INFO, "Found critical " + "error from gf_defrag_get_entry"); + ret = -1; + goto out; } - LOCK (&defrag->lock); - { - defrag->total_files += 1; - defrag->total_data += iatt.ia_size; + /* Check if we got an entry, else we need to move the + index to the next subvol */ + if (!container) { + GF_CRAWL_INDEX_MOVE(dfc_index, + local_subvols_cnt); + continue; } - UNLOCK (&defrag->lock); - if (defrag->stats == _gf_true) { - gettimeofday (&end, NULL); - elapsed = (end.tv_sec - start.tv_sec) * 1e6 + - (end.tv_usec - start.tv_usec); - gf_log (this->name, GF_LOG_INFO, "Migration of " - "file:%s size:%"PRIu64" bytes took %.2f" - "secs", entry_loc.path, iatt.ia_size, - elapsed/1e6); + + /* Q this entry in the dfq */ + pthread_mutex_lock (&defrag->dfq_mutex); + { + list_add_tail (&container->list, + &(defrag->queue[0].list)); + defrag->q_entry_count++; + ldfq_count = defrag->q_entry_count; + + gf_log (this->name, GF_LOG_DEBUG, "added " + "file:%s parent:%s to the queue ", + container->df_entry->d_name, + container->parent_loc->path); + + pthread_cond_signal ( + &defrag->parallel_migration_cond); } - } + pthread_mutex_unlock (&defrag->dfq_mutex); - gf_dirent_free (&entries); - free_entries = _gf_false; - INIT_LIST_HEAD (&entries.list); + GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt); + } } gettimeofday (&end, NULL); @@ -1686,20 +2252,25 @@ gf_defrag_migrate_data 
                 "%.2f secs", loc->path, elapsed/1e6);
         ret = 0;
 out:
-        if (free_entries)
-                gf_dirent_free (&entries);
 
-        loc_wipe (&entry_loc);
+        if (dir_dfmeta) {
+                GF_FREE (dir_dfmeta->head);
+                GF_FREE (dir_dfmeta->equeue);
+                GF_FREE (dir_dfmeta->iterator);
+                GF_FREE (dir_dfmeta->offset_var);
+                GF_FREE (dir_dfmeta->fetch_entries);
+                GF_FREE (dir_dfmeta);
+        }
 
         if (dict)
                 dict_unref(dict);
 
+        if (xattr_req)
+                dict_unref(xattr_req);
+
         if (fd)
                 fd_unref (fd);
         return ret;
 }
-
 int
 gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
                       dict_t *fix_layout, dict_t *migrate_data)
@@ -1725,7 +2296,7 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
         if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
             (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
-                ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data);
+                ret = gf_defrag_process_dir (this, defrag, loc, migrate_data);
                 if (ret)
                         goto out;
         }
@@ -1877,18 +2448,24 @@ out:
 int
 gf_defrag_start_crawl (void *data)
 {
-        xlator_t                *this = NULL;
-        dht_conf_t              *conf = NULL;
-        gf_defrag_info_t        *defrag = NULL;
-        int                      ret = -1;
-        loc_t                    loc = {0,};
-        struct iatt              iatt = {0,};
-        struct iatt              parent = {0,};
-        dict_t                  *fix_layout = NULL;
-        dict_t                  *migrate_data = NULL;
-        dict_t                  *status = NULL;
-        glusterfs_ctx_t         *ctx = NULL;
-        dht_methods_t           *methods = NULL;
+        xlator_t         *this = NULL;
+        dht_conf_t       *conf = NULL;
+        gf_defrag_info_t *defrag = NULL;
+        int               ret = -1;
+        loc_t             loc = {0,};
+        struct iatt       iatt = {0,};
+        struct iatt       parent = {0,};
+        dict_t           *fix_layout = NULL;
+        dict_t           *migrate_data = NULL;
+        dict_t           *status = NULL;
+        dict_t           *dict = NULL;
+        glusterfs_ctx_t  *ctx = NULL;
+        dht_methods_t    *methods = NULL;
+        int               i = 0;
+        int               thread_index = 0;
+        int               err = 0;
+        int               thread_status = 0;
+        pthread_t         tid[MAX_MIGRATOR_THREAD_COUNT];
 
         this = data;
         if (!this)
@@ -1938,7 +2515,7 @@ gf_defrag_start_crawl (void *data)
                         "Failed to start rebalance:"
                         "Failed to set dictionary value: key = %s",
                         GF_XATTR_FIX_LAYOUT_KEY);
-
+                ret = -1;
                 goto out;
         }
 
@@ -1970,6 +2547,53 @@ gf_defrag_start_crawl (void *data)
                                            "non-force");
                 if (ret)
                         goto out;
+
+                /* Find local subvolumes */
+                ret = syncop_getxattr (this, &loc, &dict,
+                                       GF_REBAL_FIND_LOCAL_SUBVOL,
+                                       NULL, NULL);
+                if (ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local "
+                                "subvolume determination failed with error: %d",
+                                -ret);
+                        ret = -1;
+                        goto out;
+                }
+
+                for (i = 0 ; i < conf->local_subvols_cnt; i++) {
+                        gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols "
+                                "are %s", conf->local_subvols[i]->name);
+                }
+
+                /* Initialize global entry queue */
+                defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
+                                           gf_dht_mt_container_t);
+
+                if (!defrag->queue) {
+                        gf_log (this->name, GF_LOG_ERROR,
+                                "No memory for queue");
+                        ret = -1;
+                        goto out;
+                }
+
+                INIT_LIST_HEAD (&(defrag->queue[0].list));
+
+                /* Spawn migrator threads here */
+                while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+                        err = pthread_create(&(tid[thread_index]), NULL,
+                                             &gf_defrag_task, (void *)defrag);
+                        if (err != 0) {
+                                gf_log ("DHT", GF_LOG_ERROR,
+                                        "Thread[%d] creation failed. "
                                         "Aborting Rebalance",
                                         thread_index);
+                                ret = -1;
+                                goto out;
+                        } else {
+                                gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
                                        "creation successful", thread_index);
+                        }
+                        thread_index++;
+                }
         }
 
         ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout,
@@ -2003,13 +2627,40 @@ gf_defrag_start_crawl (void *data)
                                                     migrate_data);
                 }
         }
+
+        gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed");
+out:
+        /* Being here means crawling the entire file system is done,
+           or something failed. Set the defrag->crawl_done flag to tell
+           the migrator threads to drain defrag->queue and terminate. */
+
+        if (ret) {
+                defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
+        }
+
+        pthread_mutex_lock (&defrag->dfq_mutex);
+        {
+                defrag->crawl_done = 1;
+
+                pthread_cond_broadcast (
+                        &defrag->parallel_migration_cond);
+        }
+        pthread_mutex_unlock (&defrag->dfq_mutex);
+
+        /* Wait for all the threads to complete their task */
+        for (i = 0; i < thread_index; i++) {
+                thread_status = pthread_join (tid[i], NULL);
+        }
+
+        if (defrag->queue) {
+                gf_dirent_free (defrag->queue[0].df_entry);
+                INIT_LIST_HEAD (&(defrag->queue[0].list));
+        }
 
         if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
             (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
                 defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
         }
 
-out:
         LOCK (&defrag->lock);
         {
                 status = dict_new ();
@@ -2022,10 +2673,13 @@ out:
         }
         UNLOCK (&defrag->lock);
 
-        if (defrag) {
-                GF_FREE (defrag);
-                conf->defrag = NULL;
-        }
+        GF_FREE (defrag->queue);
+
+        GF_FREE (defrag);
+        conf->defrag = NULL;
+
+        if (dict)
+                dict_unref(dict);
 
         return ret;
 }
@@ -2049,6 +2703,7 @@ gf_defrag_start (void *data)
         dht_conf_t       *conf = NULL;
         gf_defrag_info_t *defrag = NULL;
         xlator_t         *this = NULL;
+        xlator_t         *old_THIS = NULL;
 
         this = data;
         conf = this->private;
@@ -2074,6 +2729,8 @@ gf_defrag_start (void *data)
 
         defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
 
+        old_THIS = THIS;
+        THIS = this;
         ret = synctask_new (this->ctx->env, gf_defrag_start_crawl,
                             gf_defrag_done, frame, this);
 
@@ -2081,6 +2738,7 @@ gf_defrag_start (void *data)
                 gf_msg (this->name, GF_LOG_ERROR, 0,
                         DHT_MSG_REBALANCE_START_FAILED,
                         "Could not create task for rebalance");
+        THIS = old_THIS;
 out:
         return NULL;
 }
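The heart of this patch is the bounded producer-consumer handshake between gf_defrag_process_dir (the crawler) and the gf_defrag_task migrator threads: the crawler enqueues dht_container entries and sleeps on rebalance_crawler_alarm once the backlog exceeds MAX_MIGRATE_QUEUE_COUNT, and a migrator wakes it again when the backlog drops below MIN_MIGRATE_QUEUE_COUNT. The self-contained model below compresses that design into plain pthreads; counters stand in for real file entries, the small watermarks stand in for the patch's 500/200 limits, and every name is local to the sketch.

```c
/* Standalone model of the crawler/migrator queue with low/high
 * watermarks. Not GlusterFS code; an illustration of the scheme. */
#include <pthread.h>
#include <stdio.h>

#define HIGH_WM  8     /* stands in for MAX_MIGRATE_QUEUE_COUNT (500) */
#define LOW_WM   3     /* stands in for MIN_MIGRATE_QUEUE_COUNT (200) */
#define NFILES   32
#define NWORKERS 4

static pthread_mutex_t dfq_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  migration_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  crawler_alarm  = PTHREAD_COND_INITIALIZER;
static int q_entry_count, crawl_done, wakeup_crawler;

static void *migrator (void *arg)
{
        (void)arg;
        for (;;) {
                pthread_mutex_lock (&dfq_mutex);
                while (!q_entry_count && !crawl_done)
                        pthread_cond_wait (&migration_cond, &dfq_mutex);
                if (!q_entry_count && crawl_done) {       /* queue drained */
                        pthread_mutex_unlock (&dfq_mutex);
                        return NULL;
                }
                q_entry_count--;                          /* dequeue one */
                if (q_entry_count < LOW_WM && wakeup_crawler)
                        pthread_cond_broadcast (&crawler_alarm);
                pthread_mutex_unlock (&dfq_mutex);
                /* ... migrate one file outside the lock ... */
        }
}

int main (void)
{
        pthread_t tid[NWORKERS];
        int i, produced = 0;

        for (i = 0; i < NWORKERS; i++)
                pthread_create (&tid[i], NULL, migrator, NULL);

        while (produced < NFILES) {                       /* the crawler */
                pthread_mutex_lock (&dfq_mutex);
                while (q_entry_count > HIGH_WM) {         /* back off */
                        wakeup_crawler = 1;
                        pthread_cond_wait (&crawler_alarm, &dfq_mutex);
                }
                wakeup_crawler = 0;
                q_entry_count++;                          /* enqueue one */
                produced++;
                pthread_cond_signal (&migration_cond);
                pthread_mutex_unlock (&dfq_mutex);
        }

        pthread_mutex_lock (&dfq_mutex);                  /* shutdown */
        crawl_done = 1;
        pthread_cond_broadcast (&migration_cond);
        pthread_mutex_unlock (&dfq_mutex);
        for (i = 0; i < NWORKERS; i++)
                pthread_join (tid[i], NULL);
        printf ("migrated %d entries\n", NFILES);
        return 0;
}
```

The two watermarks keep the crawler roughly one batch ahead of the workers: it never wastes memory racing far ahead, and the workers never idle waiting for a single slow readdirp pass.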
" + "Aborting Rebalance", + thread_index); + ret = -1; + goto out; + } else { + gf_log ("DHT", GF_LOG_INFO, "Thread[%d] " + "creation successful", thread_index); + } + thread_index++; + } } ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout, @@ -2003,13 +2627,40 @@ gf_defrag_start_crawl (void *data) migrate_data); } } + gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed"); +out: + /* We are here means crawling the entire file system is done + or something failed. Set defrag->crawl_done flag to intimate + the migrator threads to exhaust the defrag->queue and terminate*/ + + if (ret) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + } + + pthread_mutex_lock (&defrag->dfq_mutex); + { + defrag->crawl_done = 1; + + pthread_cond_broadcast ( + &defrag->parallel_migration_cond); + } + pthread_mutex_unlock (&defrag->dfq_mutex); + + /*Wait for all the threads to complete their task*/ + for (i = 0; i < thread_index; i++) { + thread_status = pthread_join (tid[i], NULL); + } + + if (defrag->queue) { + gf_dirent_free (defrag->queue[0].df_entry); + INIT_LIST_HEAD (&(defrag->queue[0].list)); + } if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) && (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) { defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE; } -out: LOCK (&defrag->lock); { status = dict_new (); @@ -2022,10 +2673,13 @@ out: } UNLOCK (&defrag->lock); - if (defrag) { - GF_FREE (defrag); - conf->defrag = NULL; - } + GF_FREE (defrag->queue); + + GF_FREE (defrag); + conf->defrag = NULL; + + if (dict) + dict_unref(dict); return ret; } @@ -2049,6 +2703,7 @@ gf_defrag_start (void *data) dht_conf_t *conf = NULL; gf_defrag_info_t *defrag = NULL; xlator_t *this = NULL; + xlator_t *old_THIS = NULL; this = data; conf = this->private; @@ -2074,6 +2729,8 @@ gf_defrag_start (void *data) defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; + old_THIS = THIS; + THIS = this; ret = synctask_new (this->ctx->env, gf_defrag_start_crawl, gf_defrag_done, frame, this); @@ -2081,6 +2738,7 @@ gf_defrag_start (void *data) gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED, "Could not create task for rebalance"); + THIS = old_THIS; out: return NULL; } diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index 042adf1714b..3b22a2a8486 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -589,6 +589,23 @@ dht_init (xlator_t *this) defrag->cmd = cmd; defrag->stats = _gf_false; + + defrag->queue = NULL; + + defrag->crawl_done = 0; + + defrag->global_error = 0; + + defrag->q_entry_count = 0; + + defrag->wakeup_crawler = 0; + + synclock_init (&defrag->link_lock, SYNC_LOCK_DEFAULT); + pthread_mutex_init (&defrag->dfq_mutex, 0); + pthread_cond_init (&defrag->parallel_migration_cond, 0); + pthread_cond_init (&defrag->rebalance_crawler_alarm, 0); + defrag->global_error = 0; + } conf->search_unhashed = GF_DHT_LOOKUP_UNHASHED_ON; @@ -651,6 +668,15 @@ dht_init (xlator_t *this) goto err; } + if (cmd) { + ret = dht_init_local_subvolumes (this, conf); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "dht_init_local_subvolumes failed"); + goto err; + } + } + if (dict_get_str (this->options, "decommissioned-bricks", &temp_str) == 0) { ret = dht_parse_decommissioned_bricks (this, conf, temp_str); if (ret == -1) |