author    Susant Palai <spalai@redhat.com>               2015-04-12 15:55:02 +0530
committer Shyamsundar Ranganathan <srangana@redhat.com>  2015-04-29 06:48:00 -0700
commit    b3a966c241b5d5b8117f06a4c744c18b6a59bb18 (patch)
tree      48772651727bc7f77905b8d4bc02d6c522838e24 /xlators/cluster/dht/src
parent    9f7557a50584dd71e7d84cedf16d4937dc821f42 (diff)
rebalance: Introducing local crawl and parallel migration
This patch addresses two parts of the proposed design:
1. Rebalance multiple files in parallel
2. Crawl only bricks that belong to the current node
Brief design explanation for the above two points:
1. Rebalance multiple files in parallel:
-------------------------------------
The existing rebalance engine is single-threaded. This patch
introduces multiple migrator threads that run in parallel with the
crawler, converting file migration into a "producer-consumer"
framework, where:
    Producer: the crawler
    Consumer: the migrator threads
Crawler: The crawler remains the main thread. Its job is now limited
to fixing the layout of each directory and adding the files eligible
for migration to a global queue, distributing entries in a round-robin
manner across subvolumes so that all disk resources are used
efficiently. The crawler is therefore never blocked by the migration
process.
Consumer: The migrator threads monitor the global queue. Whenever an
entry is added to the queue, a thread dequeues it and migrates the
file. Currently 20 migrator threads are spawned at the beginning of
the rebalance process, so multiple files are migrated in parallel.
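
To make the hand-off concrete, below is a minimal, self-contained
sketch of this producer-consumer engine. It is illustrative only: the
macro values, dfq_mutex and the two condition variables mirror the
patch, but the queue layout, the migrator() body and the main() driver
are simplified stand-ins for the real defrag->queue / dht_container
machinery and dht_migrate_file.

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define NTHREADS         20   /* MAX_MIGRATOR_THREAD_COUNT */
    #define HIGH_WATERMARK  500   /* MAX_MIGRATE_QUEUE_COUNT   */
    #define LOW_WATERMARK   200   /* MIN_MIGRATE_QUEUE_COUNT   */

    struct entry {
            char          name[64];
            struct entry *next;
    };

    static struct entry   *head, *tail;  /* defrag->queue          */
    static int             q_count;      /* defrag->q_entry_count  */
    static int             crawl_done;   /* defrag->crawl_done     */
    static pthread_mutex_t dfq_mutex      = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  migration_cond = PTHREAD_COND_INITIALIZER;
    static pthread_cond_t  crawler_alarm  = PTHREAD_COND_INITIALIZER;

    /* Consumer: one of the migrator threads. */
    static void *
    migrator (void *arg)
    {
            struct entry *e;

            (void) arg;
            for (;;) {
                    e = NULL;
                    pthread_mutex_lock (&dfq_mutex);
                    while (q_count == 0 && !crawl_done)
                            pthread_cond_wait (&migration_cond, &dfq_mutex);
                    if (q_count > 0) {
                            e = head;
                            head = e->next;
                            if (!head)
                                    tail = NULL;
                            /* Below the low watermark: wake the crawler. */
                            if (--q_count < LOW_WATERMARK)
                                    pthread_cond_broadcast (&crawler_alarm);
                    }
                    pthread_mutex_unlock (&dfq_mutex);

                    if (!e)
                            return NULL;  /* crawl done, queue drained */
                    printf ("migrating %s\n", e->name);  /* placeholder */
                    free (e);
            }
    }

    /* Producer: the crawler queues one eligible file, sleeping above
     * the high watermark so it never runs far ahead of the migrators. */
    static void
    enqueue (const char *name)
    {
            struct entry *e = calloc (1, sizeof (*e));

            snprintf (e->name, sizeof (e->name), "%s", name);
            pthread_mutex_lock (&dfq_mutex);
            while (q_count > HIGH_WATERMARK)
                    pthread_cond_wait (&crawler_alarm, &dfq_mutex);
            if (tail)
                    tail->next = e;
            else
                    head = e;
            tail = e;
            q_count++;
            pthread_cond_signal (&migration_cond);
            pthread_mutex_unlock (&dfq_mutex);
    }

    int
    main (void)
    {
            pthread_t tid[NTHREADS];
            char      name[64];
            int       i;

            for (i = 0; i < NTHREADS; i++)
                    pthread_create (&tid[i], NULL, migrator, NULL);

            for (i = 0; i < 1000; i++) {           /* the "crawl" */
                    snprintf (name, sizeof (name), "file-%d", i);
                    enqueue (name);
            }

            pthread_mutex_lock (&dfq_mutex);       /* crawl complete */
            crawl_done = 1;
            pthread_cond_broadcast (&migration_cond);
            pthread_mutex_unlock (&dfq_mutex);

            for (i = 0; i < NTHREADS; i++)
                    pthread_join (tid[i], NULL);
            return 0;
    }

The two watermarks decouple the roles: migrator threads sleep only on
an empty queue, and the crawler sleeps only once it is more than
MAX_MIGRATE_QUEUE_COUNT entries ahead, resuming when the consumers
drain the queue below MIN_MIGRATE_QUEUE_COUNT.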
2. Crawl only bricks that belong to the current node:
--------------------------------------------------
Since a rebalance process is spawned per node, it migrates only the
files that belong to its own node, for the sake of load balancing.
However, it also used to read directory entries from the whole
cluster, which is unnecessary because those readdir calls hit bricks
on other nodes.
New Design:
As part of the new design, the rebalance process determines the
subvols that are local to its node by checking the node-uuid of the
root directory before the crawl starts. readdir therefore no longer
hits the whole cluster, since the process already knows its local
subvols, and the per-file node-uuid request can be avoided as well.
This makes the rebalance process more scalable.
Change-Id: I73ed6ff807adea15086eabbb8d9883e88571ebc1
BUG: 1171954
Signed-off-by: Susant Palai <spalai@redhat.com>
Reviewed-on: http://review.gluster.org/9657
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: N Balachandran <nbalacha@redhat.com>
Reviewed-by: Shyamsundar Ranganathan <srangana@redhat.com>
Diffstat (limited to 'xlators/cluster/dht/src')
-rw-r--r--  xlators/cluster/dht/src/dht-common.c      130
-rw-r--r--  xlators/cluster/dht/src/dht-common.h        47
-rw-r--r--  xlators/cluster/dht/src/dht-helper.c        23
-rw-r--r--  xlators/cluster/dht/src/dht-mem-types.h      3
-rw-r--r--  xlators/cluster/dht/src/dht-rebalance.c   1140
-rw-r--r--  xlators/cluster/dht/src/dht-shared.c        26
6 files changed, 1121 insertions(+), 248 deletions(-)
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c index 307265fb56a..31270c8b8bc 100644 --- a/xlators/cluster/dht/src/dht-common.c +++ b/xlators/cluster/dht/src/dht-common.c @@ -2534,6 +2534,87 @@ dht_vgetxattr_fill_and_set (dht_local_t *local, dict_t **dict, xlator_t *this, out: return ret; } +int +dht_find_local_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, dict_t *xattr, + dict_t *xdata) +{ + dht_local_t *local = NULL; + dht_conf_t *conf = NULL; + call_frame_t *prev = NULL; + int this_call_cnt = 0; + int ret = 0; + char *uuid_str = NULL; + uuid_t node_uuid = {0,}; + + + VALIDATE_OR_GOTO (frame, out); + VALIDATE_OR_GOTO (frame->local, out); + + local = frame->local; + prev = cookie; + conf = this->private; + + LOCK (&frame->lock); + { + this_call_cnt = --local->call_cnt; + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "getxattr err (%s) for dir", + strerror (op_errno)); + local->op_ret = -1; + local->op_errno = op_errno; + goto unlock; + } + + ret = dict_get_str (xattr, local->xsel, &uuid_str); + + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "Failed to " + "get %s", local->xsel); + local->op_ret = -1; + local->op_errno = EINVAL; + goto unlock; + } + + if (gf_uuid_parse (uuid_str, node_uuid)) { + gf_log (this->name, GF_LOG_ERROR, "Failed to parse uuid" + " failed for %s", prev->this->name); + local->op_ret = -1; + local->op_errno = EINVAL; + goto unlock; + } + + if (gf_uuid_compare (node_uuid, conf->defrag->node_uuid)) { + gf_log (this->name, GF_LOG_DEBUG, "subvol %s does not" + "belong to this node", prev->this->name); + } else { + conf->local_subvols[(conf->local_subvols_cnt)++] + = prev->this; + gf_log (this->name, GF_LOG_DEBUG, "subvol %s belongs to" + " this node", prev->this->name); + } + } + + local->op_ret = 0; + unlock: + UNLOCK (&frame->lock); + + if (!is_last_call (this_call_cnt)) + goto out; + + if (local->op_ret == -1) { + goto unwind; + } + + DHT_STACK_UNWIND (getxattr, frame, 0, 0, NULL, NULL); + goto out; + + unwind: + DHT_STACK_UNWIND (getxattr, frame, -1, local->op_errno, NULL, NULL); + out: + return 0; +} int dht_vgetxattr_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this, @@ -2892,7 +2973,8 @@ dht_getxattr (call_frame_t *frame, xlator_t *this, int op_errno = -1; int i = 0; int cnt = 0; - + char *node_uuid_key = NULL; + int ret = -1; VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); VALIDATE_OR_GOTO (loc, err); @@ -2933,6 +3015,28 @@ dht_getxattr (call_frame_t *frame, xlator_t *this, return 0; } + if (key && DHT_IS_DIR(layout) && + (!strcmp (key, GF_REBAL_FIND_LOCAL_SUBVOL))) { + ret = gf_asprintf + (&node_uuid_key, "%s", GF_XATTR_NODE_UUID_KEY); + if (ret == -1 || !node_uuid_key) { + gf_log (this->name, GF_LOG_ERROR, "Failed to copy key"); + op_errno = ENOMEM; + goto err; + } + (void) strncpy (local->xsel, node_uuid_key, 256); + cnt = local->call_cnt = conf->subvolume_cnt; + for (i = 0; i < cnt; i++) { + STACK_WIND (frame, dht_find_local_subvol_cbk, + conf->subvolumes[i], + conf->subvolumes[i]->fops->getxattr, + loc, node_uuid_key, xdata); + } + if (node_uuid_key) + GF_FREE (node_uuid_key); + return 0; + } + /* for file use cached subvolume (obviously!): see if {} * below * for directory: @@ -2942,6 +3046,7 @@ dht_getxattr (call_frame_t *frame, xlator_t *this, * NOTE: Don't trust inode here, as that may not be valid * (until inode_link() happens) */ + if (key && DHT_IS_DIR(layout) && (XATTR_IS_PATHINFO (key) || (strcmp (key, GF_XATTR_NODE_UUID_KEY) 
== 0))) { @@ -3831,13 +3936,24 @@ dht_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd, goto err; } - local->call_cnt = conf->subvolume_cnt; + if (!(conf->local_subvols_cnt) || !conf->defrag) { + local->call_cnt = conf->subvolume_cnt; - for (i = 0; i < conf->subvolume_cnt; i++) { - STACK_WIND (frame, dht_fd_cbk, - conf->subvolumes[i], - conf->subvolumes[i]->fops->opendir, - loc, fd, xdata); + for (i = 0; i < conf->subvolume_cnt; i++) { + STACK_WIND (frame, dht_fd_cbk, + conf->subvolumes[i], + conf->subvolumes[i]->fops->opendir, + loc, fd, xdata); + + } + } else { + local->call_cnt = conf->local_subvols_cnt; + for (i = 0; i < conf->local_subvols_cnt; i++) { + STACK_WIND (frame, dht_fd_cbk, + conf->local_subvols[i], + conf->local_subvols[i]->fops->opendir, + loc, fd, xdata); + } } return 0; diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 43f7c0264f5..d5bb6fc61d4 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -293,6 +293,20 @@ struct gf_defrag_pattern_list { gf_defrag_pattern_list_t *next; }; +struct dht_container { + union { + struct list_head list; + struct { + struct _gf_dirent_t *next; + struct _gf_dirent_t *prev; + }; + }; + gf_dirent_t *df_entry; + xlator_t *this; + loc_t *parent_loc; + dict_t *migrate_data; +}; + struct gf_defrag_info_ { uint64_t total_files; uint64_t total_data; @@ -320,6 +334,19 @@ struct gf_defrag_info_ { uint64_t total_files_demoted; int write_freq_threshold; int read_freq_threshold; + + pthread_cond_t parallel_migration_cond; + pthread_mutex_t dfq_mutex; + pthread_cond_t rebalance_crawler_alarm; + int32_t q_entry_count; + int32_t global_error; + struct dht_container *queue; + int32_t crawl_done; + int32_t abort; + int32_t wakeup_crawler; + + /* Hard link handle requirement */ + synclock_t link_lock; }; typedef struct gf_defrag_info_ gf_defrag_info_t; @@ -397,9 +424,19 @@ struct dht_conf { dht_methods_t *methods; struct mem_pool *lock_pool; + + /*local subvol storage for rebalance*/ + xlator_t **local_subvols; + int32_t local_subvols_cnt; }; typedef struct dht_conf dht_conf_t; +struct dht_dfoffset_ctx { + xlator_t *this; + off_t offset; + int32_t readdir_done; +}; +typedef struct dht_dfoffset_ctx dht_dfoffset_ctx_t; struct dht_disk_layout { uint32_t cnt; @@ -423,6 +460,14 @@ typedef enum { GF_DHT_WEIGHTED_DISTRIBUTION } dht_distribution_type_t; +struct dir_dfmeta { + gf_dirent_t *equeue; + dht_dfoffset_ctx_t *offset_var; + struct list_head **head; + struct list_head **iterator; + int *fetch_entries; +}; + #define ENTRY_MISSING(op_ret, op_errno) (op_ret == -1 && op_errno == ENOENT) #define is_revalidate(loc) (dht_inode_ctx_layout_get (loc->inode, this, NULL) == 0) @@ -608,6 +653,8 @@ int dht_start_rebalance_task (xlator_t *this, call_frame_t *frame); int dht_rebalance_in_progress_check (xlator_t *this, call_frame_t *frame); int dht_rebalance_complete_check (xlator_t *this, call_frame_t *frame); +int +dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf); /* FOPS */ int32_t dht_lookup (call_frame_t *frame, diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c index cab66017b84..b5114b620ce 100644 --- a/xlators/cluster/dht/src/dht-helper.c +++ b/xlators/cluster/dht/src/dht-helper.c @@ -731,7 +731,28 @@ err: return -1; } +int +dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf) +{ + xlator_list_t *subvols = NULL; + int cnt = 0; + if (!conf) + return -1; + + for (subvols = this->children; subvols; subvols = 
subvols->next) + cnt++; + + conf->local_subvols = GF_CALLOC (cnt, sizeof (xlator_t *), + gf_dht_mt_xlator_t); + if (!conf->local_subvols) { + return -1; + } + + conf->local_subvols_cnt = 0; + + return 0; +} int dht_init_subvolumes (xlator_t *this, dht_conf_t *conf) @@ -752,6 +773,8 @@ dht_init_subvolumes (xlator_t *this, dht_conf_t *conf) } conf->subvolume_cnt = cnt; + conf->local_subvols_cnt = 0; + dht_set_subvol_range(this); cnt = 0; diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h index e893eb48fd8..46028e6d9e0 100644 --- a/xlators/cluster/dht/src/dht-mem-types.h +++ b/xlators/cluster/dht/src/dht-mem-types.h @@ -30,6 +30,9 @@ enum gf_dht_mem_types_ { gf_defrag_info_mt, gf_dht_mt_inode_ctx_t, gf_dht_mt_ctx_stat_time_t, + gf_dht_mt_dirent_t, + gf_dht_mt_container_t, + gf_dht_mt_octx_t, gf_dht_mt_end }; #endif diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 206628208f5..52f91946240 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -23,7 +23,25 @@ #define GF_DISK_SECTOR_SIZE 512 #define DHT_REBALANCE_PID 4242 /* Change it if required */ #define DHT_REBALANCE_BLKSIZE (128 * 1024) +#define MAX_MIGRATOR_THREAD_COUNT 20 +#define MAX_MIGRATE_QUEUE_COUNT 500 +#define MIN_MIGRATE_QUEUE_COUNT 200 +#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \ + idx++; \ + idx %= sv_cnt; \ + } + +void +dht_set_global_defrag_error (gf_defrag_info_t *defrag, int ret) +{ + LOCK (&defrag->lock); + { + defrag->global_error = ret; + } + UNLOCK (&defrag->lock); + return; +} static int dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count, int32_t size, off_t offset, struct iobref *iobref) @@ -178,6 +196,47 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs, goto out; } + /* + Parallel migration can lead to migration of the hard link multiple + times which can lead to data loss. Hence, adding a fresh lookup to + decide whether migration is required or not. + + Elaborating the scenario for let say 10 hardlinks [link{1..10}]: + Let say the first hard link "link1" does the setxattr of the + new hashed subvolume info on the cached file. As there are multiple + threads working, we might have already all the links created on the + new hashed by the time we reach hardlink let say link5. Now the + number of links on hashed is equal to that of cached. Hence, file + migration will happen for link6. + + Cached Hashed + --------T link6 rwxrwxrwx link6 + + Now post above state all the link file on the cached will be zero + byte linkto files. Hence, if we still do migration for the following + files link{7..10}, we will end up migrating 0 data leading to data + loss. + Hence, a lookup can make sure whether we need to migrate the + file or not. 
+ */ + + ret = syncop_lookup (this, loc, NULL, NULL, + NULL, NULL); + if (ret) { + /*Ignore ENOENT and ESTALE as file might have been + migrated already*/ + if (-ret == ENOENT || -ret == ESTALE) { + ret = -2; + goto out; + } + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:%s lookup failed with ret = %d", + loc->path, ret); + ret = -1; + goto out; + } + cached_subvol = dht_subvol_get_cached (this, loc->inode); if (!cached_subvol) { gf_msg (this->name, GF_LOG_ERROR, 0, @@ -198,6 +257,11 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs, goto out; } + if (hashed_subvol == cached_subvol) { + ret = -2; + goto out; + } + gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s " "with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid), cached_subvol->name, hashed_subvol->name); @@ -288,7 +352,8 @@ out: */ static inline int __is_file_migratable (xlator_t *this, loc_t *loc, - struct iatt *stbuf, dict_t *xattrs, int flags) + struct iatt *stbuf, dict_t *xattrs, int flags, + gf_defrag_info_t *defrag) { int ret = -1; @@ -308,13 +373,14 @@ __is_file_migratable (xlator_t *this, loc_t *loc, if (stbuf->ia_nlink > 1) { /* support for decomission */ if (flags == GF_DHT_MIGRATE_HARDLINK) { - ret = gf_defrag_handle_hardlink (this, loc, - xattrs, stbuf); - - /* - Returning zero will force the file to be remigrated. - Checkout gf_defrag_handle_hardlink for more information. - */ + synclock_lock (&defrag->link_lock); + ret = gf_defrag_handle_hardlink + (this, loc, xattrs, stbuf); + synclock_unlock (&defrag->link_lock); + /* + Returning zero will force the file to be remigrated. + Checkout gf_defrag_handle_hardlink for more information. + */ if (ret && ret != -2) { gf_msg (this->name, GF_LOG_WARNING, 0, DHT_MSG_MIGRATE_FILE_FAILED, @@ -610,6 +676,7 @@ __dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst while (total < ia_size) { read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ? DHT_REBALANCE_BLKSIZE : (ia_size - total)); + ret = syncop_readv (from, src, read_size, offset, 0, &vector, &count, &iobref, NULL, NULL); @@ -904,6 +971,11 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, loc_t tmp_loc = {0, }; gf_boolean_t locked = _gf_false; int lk_ret = -1; + gf_defrag_info_t *defrag = NULL; + + defrag = conf->defrag; + if (!defrag) + goto out; gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s", loc->path, from->name, to->name); @@ -960,7 +1032,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to, src_ia_prot = stbuf.ia_prot; /* Check if file can be migrated */ - ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag); + ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag, defrag); if (ret) { if (ret == -2) ret = 0; @@ -1295,6 +1367,7 @@ dht_start_rebalance_task (xlator_t *this, call_frame_t *frame) { int ret = -1; + frame->root->pid = GF_CLIENT_PID_DEFRAG; ret = synctask_new (this->ctx->env, rebalance_task, rebalance_task_completion, frame, frame); @@ -1406,61 +1479,341 @@ gf_defrag_pattern_match (gf_defrag_info_t *defrag, char *name, uint64_t size) return ret; } -/* We do a depth first traversal of directories. 
But before we move into - * subdirs, we complete the data migration of those directories whose layouts - * have been fixed - */ +int dht_dfreaddirp_done (dht_dfoffset_ctx_t *offset_var, int cnt) { + + int i; + int result = 1; + + for (i = 0; i < cnt; i++) { + if (offset_var[i].readdir_done == 0) { + result = 0; + break; + } + } + return result; +} + +int static +gf_defrag_ctx_subvols_init (dht_dfoffset_ctx_t *offset_var, xlator_t *this) { + + int i; + dht_conf_t *conf = NULL; + + conf = this->private; + + if (!conf) + return -1; + + for (i = 0; i < conf->local_subvols_cnt; i++) { + offset_var[i].this = conf->local_subvols[i]; + offset_var[i].offset = (off_t) 0; + offset_var[i].readdir_done = 0; + } + + return 0; +} int -gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, - dict_t *migrate_data) +gf_defrag_migrate_single_file (void *opaque) { - int ret = -1; - loc_t entry_loc = {0,}; - fd_t *fd = NULL; - gf_dirent_t entries; - gf_dirent_t *tmp = NULL; + xlator_t *this = NULL; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + int ret = 0; gf_dirent_t *entry = NULL; - gf_boolean_t free_entries = _gf_false; - off_t offset = 0; - dict_t *dict = NULL; + struct timeval start = {0,}; + loc_t entry_loc = {0,}; + loc_t *loc = NULL; struct iatt iatt = {0,}; + dict_t *migrate_data = NULL; int32_t op_errno = 0; - char *uuid_str = NULL; - uuid_t node_uuid = {0,}; - struct timeval dir_start = {0,}; struct timeval end = {0,}; double elapsed = {0,}; - struct timeval start = {0,}; - int loglevel = GF_LOG_TRACE; + struct dht_container *rebal_entry = NULL; - gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", - loc->path); - gettimeofday (&dir_start, NULL); + rebal_entry = (struct dht_container *)opaque; + if (!rebal_entry) { + gf_log (this->name, GF_LOG_ERROR, "rebal_entry is NULL"); + ret = -1; + goto out; + } - fd = fd_create (loc->inode, defrag->pid); - if (!fd) { - gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); + this = rebal_entry->this; + + conf = this->private; + + defrag = conf->defrag; + + loc = rebal_entry->parent_loc; + + migrate_data = rebal_entry->migrate_data; + + entry = rebal_entry->df_entry; + + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = -1; goto out; } - ret = syncop_opendir (this, loc, fd, NULL, NULL); + if (defrag->stats == _gf_true) { + gettimeofday (&start, NULL); + } + + if (defrag->defrag_pattern && + (gf_defrag_pattern_match (defrag, entry->d_name, + entry->d_stat.ia_size) == _gf_false)) { + gf_log (this->name, GF_LOG_ERROR, "pattern_match failed"); + goto out; + } + + memset (&entry_loc, 0, sizeof (entry_loc)); + + ret = dht_build_child_loc (this, &entry_loc, loc, entry->d_name); + if (ret) { + LOCK (&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK (&defrag->lock); + + ret = 0; + + gf_log (this->name, GF_LOG_ERROR, "Child loc build failed"); + + goto out; + } + + gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); + + gf_uuid_copy (entry_loc.pargfid, loc->gfid); + + entry_loc.inode->ia_type = entry->d_stat.ia_type; + + ret = syncop_lookup (this, &entry_loc, &iatt, NULL, NULL, NULL); if (ret) { gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_DATA_FAILED, - "Migrate data failed: Failed to open dir %s", - loc->path); - ret = -1; + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed: %s lookup failed", + entry_loc.name); + ret = 0; goto out; } - INIT_LIST_HEAD (&entries.list); + ret = syncop_setxattr (this, &entry_loc, migrate_data, 0, NULL, NULL); + if (ret < 0) { + op_errno = 
-ret; + /* errno is overloaded. See + * rebalance_task_completion () */ + if (op_errno == ENOSPC) { + gf_msg_debug (this->name, 0, "migrate-data skipped for" + " %s due to space constraints", + entry_loc.path); + LOCK (&defrag->lock); + { + defrag->skipped += 1; + } + UNLOCK (&defrag->lock); + } else if (op_errno != EEXIST) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "migrate-data failed for %s", entry_loc.path); - while ((ret = syncop_readdirp (this, fd, 131072, offset, &entries, - NULL, NULL)) != 0) { + LOCK (&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK (&defrag->lock); - if (ret < 0) { + } + + ret = gf_defrag_handle_migrate_error (op_errno, defrag); + + if (!ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "migrate-data on %s failed: %s", entry_loc.path, + strerror (op_errno)); + } else if (ret == 1) { + ret = 0; + goto out; + } else if (ret == -1) { + goto out; + } + } else if (ret > 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "migrate-data failed for %s", entry_loc.path); + ret = 0; + LOCK (&defrag->lock); + { + defrag->total_failures += 1; + } + UNLOCK (&defrag->lock); + } + + LOCK (&defrag->lock); + { + defrag->total_files += 1; + defrag->total_data += iatt.ia_size; + } + UNLOCK (&defrag->lock); + + if (defrag->stats == _gf_true) { + gettimeofday (&end, NULL); + elapsed = (end.tv_sec - start.tv_sec) * 1e6 + + (end.tv_usec - start.tv_usec); + gf_log (this->name, GF_LOG_INFO, "Migration of " + "file:%s size:%"PRIu64" bytes took %.2f" + "secs and ret: %d", entry_loc.name, + iatt.ia_size, elapsed/1e6, ret); + } + +out: + loc_wipe (&entry_loc); + + return ret; + +} + +void * +gf_defrag_task (void *opaque) +{ + struct list_head *q_head = NULL; + struct dht_container *iterator = NULL; + gf_defrag_info_t *defrag = NULL; + int ret = 0; + gf_boolean_t true = _gf_true; + + + defrag = (gf_defrag_info_t *)opaque; + if (!defrag) { + gf_msg ("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL"); + goto out; + } + + q_head = &(defrag->queue[0].list); + + /* The following while loop will dequeue one entry from the defrag->queue + under lock. We will update the defrag->global_error only when there + is an error which is critical to stop the rebalance process. The stop + message will be intimated to other migrator threads by setting the + defrag->defrag_status to GF_DEFRAG_STATUS_FAILED. + + In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is + maintained so that crawler does not starve the file migration + workers and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that + crawler does not go far ahead in filling up the queue. 
+ */ + while (true) { + + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + goto out; + } + + pthread_mutex_lock (&defrag->dfq_mutex); + { + if (defrag->q_entry_count) { + iterator = list_entry (q_head->next, + typeof(*iterator), list); + + gf_log ("DHT", GF_LOG_DEBUG, "picking entry " + "%s", iterator->df_entry->d_name); + + list_del_init (&(iterator->list)); + + defrag->q_entry_count--; + + if ((defrag->q_entry_count < + MIN_MIGRATE_QUEUE_COUNT) && + defrag->wakeup_crawler) { + pthread_cond_broadcast ( + &defrag->rebalance_crawler_alarm); + } + pthread_mutex_unlock (&defrag->dfq_mutex); + ret = gf_defrag_migrate_single_file + ((void *)iterator); + + /*Critical errors: ENOTCONN and ENOSPACE*/ + if (ret) { + dht_set_global_defrag_error + (defrag, ret); + + defrag->defrag_status = + GF_DEFRAG_STATUS_FAILED; + goto out; + } + + gf_dirent_free (iterator->df_entry); + GF_FREE (iterator); + continue; + } else { + + /* defrag->crawl_done flag is set means crawling + file system is done and hence a list_empty when + the above flag is set indicates there are no more + entries to be added to the queue and rebalance is + finished */ + + if (!defrag->crawl_done) { + pthread_cond_wait ( + &defrag->parallel_migration_cond, + &defrag->dfq_mutex); + } + + if (defrag->crawl_done && + !defrag->q_entry_count) { + pthread_cond_broadcast ( + &defrag->parallel_migration_cond); + goto unlock; + } else { + pthread_mutex_unlock + (&defrag->dfq_mutex); + continue; + } + } + + } +unlock: + pthread_mutex_unlock (&defrag->dfq_mutex); + break; + } +out: + return NULL; +} + +int static +gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container, + loc_t *loc, dht_conf_t *conf, gf_defrag_info_t *defrag, + fd_t *fd, dict_t *migrate_data, + struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req) +{ + int ret = -1; + char is_linkfile = 0; + gf_dirent_t *df_entry = NULL; + loc_t entry_loc = {0,}; + dict_t *xattr_rsp = NULL; + struct iatt iatt = {0,}; + struct dht_container *tmp_container = NULL; + xlator_t *hashed_subvol = NULL; + xlator_t *cached_subvol = NULL; + + if (dir_dfmeta->offset_var[i].readdir_done == 1) { + ret = 0; + goto out; + } + if (dir_dfmeta->fetch_entries[i] == 1) { + ret = syncop_readdirp (conf->local_subvols[i], fd, 131072, + dir_dfmeta->offset_var[i].offset, + &(dir_dfmeta->equeue[i]), + NULL, NULL); + if (ret == 0) { + dir_dfmeta->offset_var[i].readdir_done = 1; + ret = 0; + goto out; + } + + if (ret < 0) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_MIGRATE_DATA_FAILED, "%s: Migrate data failed: Readdir returned" @@ -1470,213 +1823,426 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, goto out; } - if (list_empty (&entries.list)) - break; + if (list_empty (&(dir_dfmeta->equeue[i].list))) { + dir_dfmeta->offset_var[i].readdir_done = 1; + ret = 0; + goto out; + } - free_entries = _gf_true; + dir_dfmeta->fetch_entries[i] = 0; + } - list_for_each_entry_safe (entry, tmp, &entries.list, list) { + while (1) { - if (dict) { - dict_unref (dict); - dict = NULL; - } + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = -1; + goto out; + } - if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { - ret = 1; - goto out; - } + df_entry = list_entry (dir_dfmeta->iterator[i]->next, + typeof (*df_entry), list); - offset = entry->d_off; + if (&df_entry->list == dir_dfmeta->head[i]) { + gf_dirent_free (&(dir_dfmeta->equeue[i])); + INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list)); + dir_dfmeta->fetch_entries[i] = 1; + dir_dfmeta->iterator[i] = 
dir_dfmeta->head[i]; + ret = 0; + goto out; + } - if (!strcmp (entry->d_name, ".") || - !strcmp (entry->d_name, "..")) - continue; + dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next; - if (IA_ISDIR (entry->d_stat.ia_type)) - continue; + dir_dfmeta->offset_var[i].offset = df_entry->d_off; + if (!strcmp (df_entry->d_name, ".") || + !strcmp (df_entry->d_name, "..")) + continue; - defrag->num_files_lookedup++; - if (defrag->stats == _gf_true) { - gettimeofday (&start, NULL); - } + if (IA_ISDIR (df_entry->d_stat.ia_type)) + continue; - if (defrag->defrag_pattern && - (gf_defrag_pattern_match (defrag, entry->d_name, - entry->d_stat.ia_size) - == _gf_false)) { - continue; - } + defrag->num_files_lookedup++; - loc_wipe (&entry_loc); - ret =dht_build_child_loc (this, &entry_loc, loc, - entry->d_name); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Child loc" - " build failed"); - goto out; - } + if (defrag->defrag_pattern && + (gf_defrag_pattern_match (defrag, df_entry->d_name, + df_entry->d_stat.ia_size) + == _gf_false)) { + continue; + } - if (gf_uuid_is_null (entry->d_stat.ia_gfid)) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_GFID_NULL, - "%s/%s gfid not present", loc->path, - entry->d_name); - continue; - } + loc_wipe (&entry_loc); + ret = dht_build_child_loc (this, &entry_loc, loc, + df_entry->d_name); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "Child loc" + " build failed"); + ret = -1; + goto out; + } - gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid); + if (gf_uuid_is_null (df_entry->d_stat.ia_gfid)) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_GFID_NULL, + "%s/%s gfid not present", loc->path, + df_entry->d_name); + continue; + } - if (gf_uuid_is_null (loc->gfid)) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_GFID_NULL, - "%s/%s gfid not present", loc->path, - entry->d_name); - continue; - } + gf_uuid_copy (entry_loc.gfid, df_entry->d_stat.ia_gfid); - gf_uuid_copy (entry_loc.pargfid, loc->gfid); + if (gf_uuid_is_null (loc->gfid)) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_GFID_NULL, + "%s/%s gfid not present", loc->path, + df_entry->d_name); + continue; + } - entry_loc.inode->ia_type = entry->d_stat.ia_type; + gf_uuid_copy (entry_loc.pargfid, loc->gfid); - ret = syncop_lookup (this, &entry_loc, &iatt, NULL, - NULL, NULL); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "Migrate file failed:%s lookup failed", - entry_loc.path); - ret = -1; - continue; - } + entry_loc.inode->ia_type = df_entry->d_stat.ia_type; + ret = syncop_lookup (conf->local_subvols[i], &entry_loc, + &iatt, NULL, xattr_req, &xattr_rsp); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:%s lookup failed", + entry_loc.path); + continue; + } - ret = syncop_getxattr (this, &entry_loc, &dict, - GF_XATTR_NODE_UUID_KEY, NULL, - NULL); - if(ret < 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "Migrate file failed:" - "Failed to get node-uuid for %s", - entry_loc.path); - ret = -1; - continue; - } - ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY, - &uuid_str); - if(ret < 0) { - gf_log (this->name, GF_LOG_ERROR, "Failed to " - "get node-uuid from dict for %s", - entry_loc.path); - ret = -1; - continue; - } + is_linkfile = check_is_linkfile (NULL, &iatt, xattr_rsp, + conf->link_xattr_name); - if (gf_uuid_parse (uuid_str, node_uuid)) { - gf_log (this->name, GF_LOG_ERROR, "gf_uuid_parse " - "failed for %s", entry_loc.path); - continue; - } + if (is_linkfile) { + /* No 
need to add linkto file to the queue for + migration. Only the actual data file need to + be checked for migration criteria. + */ + gf_log (this->name, GF_LOG_INFO, "linkfile." + " Hence skip for file: %s", entry_loc.path); + continue; + } - /* if file belongs to different node, skip migration - * the other node will take responsibility of migration - */ - if (gf_uuid_compare (node_uuid, defrag->node_uuid)) { - gf_msg_trace (this->name, 0, "%s does not" - "belong to this node", - entry_loc.path); - continue; - } - uuid_str = NULL; + ret = syncop_lookup (this, &entry_loc, NULL, NULL, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_FILE_FAILED, + "Migrate file failed:%s lookup failed", + entry_loc.path); + continue; + } - /* if distribute is present, it will honor this key. - * -1, ENODATA is returned if distribute is not present - * or file doesn't have a link-file. If file has - * link-file, the path of link-file will be the value, - * and also that guarantees that file has to be mostly - * migrated */ + /* if distribute is present, it will honor this key. + * -1, ENODATA is returned if distribute is not present + * or file doesn't have a link-file. If file has + * link-file, the path of link-file will be the value, + * and also that guarantees that file has to be mostly + * migrated */ - ret = syncop_getxattr (this, &entry_loc, NULL, - GF_XATTR_LINKINFO_KEY, NULL, - NULL); - if (ret < 0) { - if (-ret != ENODATA) { - loglevel = GF_LOG_ERROR; - defrag->total_failures += 1; - } else { - loglevel = GF_LOG_TRACE; - } - gf_log (this->name, loglevel, "%s: failed to " - "get "GF_XATTR_LINKINFO_KEY" key - %s", - entry_loc.path, strerror (-ret)); - ret = -1; - continue; + hashed_subvol = dht_subvol_get_hashed (this, &entry_loc); + if (!hashed_subvol) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_HASHED_SUBVOL_GET_FAILED, + "Failed to get hashed subvol for %s", + loc->path); + continue; + } + + cached_subvol = dht_subvol_get_cached (this, entry_loc.inode); + if (!cached_subvol) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_CACHED_SUBVOL_GET_FAILED, + "Failed to get cached subvol for %s", + loc->path); + + continue; + } + + if (hashed_subvol == cached_subvol) { + continue; + } + + /*Build Container Structure */ + + tmp_container = GF_CALLOC (1, sizeof(struct dht_container), + gf_dht_mt_container_t); + if (!tmp_container) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for container"); + ret = -1; + goto out; + } + tmp_container->df_entry = gf_dirent_for_name (df_entry->d_name); + if (!tmp_container->df_entry) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for df_entry"); + ret = -1; + goto out; + } + + tmp_container->df_entry->d_stat = df_entry->d_stat; + + tmp_container->df_entry->d_ino = df_entry->d_ino; + + tmp_container->df_entry->d_type = df_entry->d_type; + + tmp_container->df_entry->d_len = df_entry->d_len; + + tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc), + gf_dht_mt_loc_t); + if (!tmp_container->parent_loc) { + gf_log (this->name, GF_LOG_ERROR, "Failed to allocate " + "memory for loc"); + ret = -1; + goto out; + } + + + ret = loc_copy (tmp_container->parent_loc, loc); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "loc_copy failed"); + ret = -1; + goto out; + } + + tmp_container->migrate_data = migrate_data; + + tmp_container->this = this; + + if (df_entry->dict) + tmp_container->df_entry->dict = + dict_ref (df_entry->dict); + + /*Build Container Structue >> END*/ + + ret = 0; + goto out; 
+ + } + +out: + if (ret == 0) { + *container = tmp_container; + } else { + GF_FREE (tmp_container->parent_loc); + GF_FREE (tmp_container); + } + + if (xattr_rsp) + dict_unref (xattr_rsp); + return ret; +} + +int +gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, + dict_t *migrate_data) +{ + int ret = -1; + fd_t *fd = NULL; + dht_conf_t *conf = NULL; + gf_dirent_t entries; + dict_t *dict = NULL; + dict_t *xattr_req = NULL; + struct timeval dir_start = {0,}; + struct timeval end = {0,}; + double elapsed = {0,}; + int local_subvols_cnt = 0; + int i = 0; + struct dht_container *container = NULL; + int ldfq_count = 0; + int dfc_index = 0; + struct dir_dfmeta *dir_dfmeta = NULL; + + gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", + loc->path); + gettimeofday (&dir_start, NULL); + + conf = this->private; + local_subvols_cnt = conf->local_subvols_cnt; + + if (!local_subvols_cnt) { + ret = 0; + goto out; + } + + fd = fd_create (loc->inode, defrag->pid); + if (!fd) { + gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); + ret = -1; + goto out; + } + + ret = syncop_opendir (this, loc, fd, NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_MIGRATE_DATA_FAILED, + "Migrate data failed: Failed to open dir %s", + loc->path); + ret = -1; + goto out; + } + + dir_dfmeta = GF_CALLOC (1, sizeof (*dir_dfmeta), + gf_common_mt_pointer); + if (!dir_dfmeta) { + gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta is NULL"); + ret = -1; + goto out; + } + + + dir_dfmeta->head = GF_CALLOC (local_subvols_cnt, + sizeof (*(dir_dfmeta->head)), + gf_common_mt_pointer); + if (!dir_dfmeta->head) { + gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->iterator = GF_CALLOC (local_subvols_cnt, + sizeof (*(dir_dfmeta->iterator)), + gf_common_mt_pointer); + if (!dir_dfmeta->iterator) { + gf_log (this->name, GF_LOG_ERROR, + "dir_dfmeta->iterator is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->equeue = GF_CALLOC (local_subvols_cnt, sizeof (entries), + gf_dht_mt_dirent_t); + if (!dir_dfmeta->equeue) { + gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL"); + ret = -1; + goto out; + } + + dir_dfmeta->offset_var = GF_CALLOC (local_subvols_cnt, + sizeof (dht_dfoffset_ctx_t), + gf_dht_mt_octx_t); + if (!dir_dfmeta->offset_var) { + gf_log (this->name, GF_LOG_ERROR, + "dir_dfmeta->offset_var is NULL"); + ret = -1; + goto out; + } + ret = gf_defrag_ctx_subvols_init (dir_dfmeta->offset_var, this); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "dht_dfoffset_ctx_t" + "initialization failed"); + ret = -1; + goto out; + } + + dir_dfmeta->fetch_entries = GF_CALLOC (local_subvols_cnt, + sizeof (int), gf_common_mt_int); + if (!dir_dfmeta->fetch_entries) { + gf_log (this->name, GF_LOG_ERROR, + "dir_dfmeta->fetch_entries is NULL"); + ret = -1; + goto out; + } + + for (i = 0; i < local_subvols_cnt ; i++) { + INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list)); + dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list); + dir_dfmeta->iterator[i] = dir_dfmeta->head[i]; + dir_dfmeta->fetch_entries[i] = 1; + } + + xattr_req = dict_new (); + if (!xattr_req) { + gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); + ret = -1; + goto out; + } + + ret = dict_set_uint32 (xattr_req, + conf->link_xattr_name, 256); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to set dict for " + "key: %s", conf->link_xattr_name); + ret = -1; + goto out; + } + + /* + Job: Read entries from each local subvol and store the entries + in 
equeue array of linked list. Now pick one entry from the + equeue array in a round robin basis and add them to defrag Queue. + */ + + while (!dht_dfreaddirp_done(dir_dfmeta->offset_var, + local_subvols_cnt)) { + + pthread_mutex_lock (&defrag->dfq_mutex); + { + while (defrag->q_entry_count > + MAX_MIGRATE_QUEUE_COUNT) { + defrag->wakeup_crawler = 1; + pthread_cond_wait ( + &defrag->rebalance_crawler_alarm, + &defrag->dfq_mutex); } - ret = syncop_setxattr (this, &entry_loc, migrate_data, - 0, NULL, NULL); - if (ret < 0) { - op_errno = -ret; - /* errno is overloaded. See - * rebalance_task_completion () */ - if (op_errno == ENOSPC) { - gf_msg_debug (this->name, 0, - "migrate-data skipped for" - " %s due to space " - "constraints", - entry_loc.path); - defrag->skipped +=1; - } else{ - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "migrate-data failed for %s", - entry_loc.path); - defrag->total_failures +=1; - } + ldfq_count = defrag->q_entry_count; - ret = gf_defrag_handle_migrate_error (op_errno, - defrag); + if (defrag->wakeup_crawler) { + defrag->wakeup_crawler = 0; + } - if (!ret) - gf_msg_debug (this->name, 0, - "migrate-data on %s " - "failed: %s", - entry_loc.path, - strerror (op_errno)); - else if (ret == 1) - continue; - else if (ret == -1) - goto out; - } else if (ret > 0) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_MIGRATE_FILE_FAILED, - "migrate-data failed for %s", - entry_loc.path); - defrag->total_failures +=1; + } + pthread_mutex_unlock (&defrag->dfq_mutex); + + while (ldfq_count <= MAX_MIGRATE_QUEUE_COUNT && + !dht_dfreaddirp_done(dir_dfmeta->offset_var, + local_subvols_cnt)) { + + ret = gf_defrag_get_entry (this, dfc_index, &container, + loc, conf, defrag, fd, + migrate_data, dir_dfmeta, + xattr_req); + if (ret) { + gf_log ("DHT", GF_LOG_INFO, "Found critical " + "error from gf_defrag_get_entry"); + ret = -1; + goto out; } - LOCK (&defrag->lock); - { - defrag->total_files += 1; - defrag->total_data += iatt.ia_size; + /* Check if we got an entry, else we need to move the + index to the next subvol */ + if (!container) { + GF_CRAWL_INDEX_MOVE(dfc_index, + local_subvols_cnt); + continue; } - UNLOCK (&defrag->lock); - if (defrag->stats == _gf_true) { - gettimeofday (&end, NULL); - elapsed = (end.tv_sec - start.tv_sec) * 1e6 + - (end.tv_usec - start.tv_usec); - gf_log (this->name, GF_LOG_INFO, "Migration of " - "file:%s size:%"PRIu64" bytes took %.2f" - "secs", entry_loc.path, iatt.ia_size, - elapsed/1e6); + + /* Q this entry in the dfq */ + pthread_mutex_lock (&defrag->dfq_mutex); + { + list_add_tail (&container->list, + &(defrag->queue[0].list)); + defrag->q_entry_count++; + ldfq_count = defrag->q_entry_count; + + gf_log (this->name, GF_LOG_DEBUG, "added " + "file:%s parent:%s to the queue ", + container->df_entry->d_name, + container->parent_loc->path); + + pthread_cond_signal ( + &defrag->parallel_migration_cond); } - } + pthread_mutex_unlock (&defrag->dfq_mutex); - gf_dirent_free (&entries); - free_entries = _gf_false; - INIT_LIST_HEAD (&entries.list); + GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt); + } } gettimeofday (&end, NULL); @@ -1686,20 +2252,25 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, "%.2f secs", loc->path, elapsed/1e6); ret = 0; out: - if (free_entries) - gf_dirent_free (&entries); - loc_wipe (&entry_loc); + GF_FREE (dir_dfmeta->head); + GF_FREE (dir_dfmeta->equeue); + GF_FREE (dir_dfmeta->iterator); + GF_FREE (dir_dfmeta->offset_var); + GF_FREE (dir_dfmeta->fetch_entries); + GF_FREE 
(dir_dfmeta); if (dict) dict_unref(dict); + if (xattr_req) + dict_unref(xattr_req); + if (fd) fd_unref (fd); return ret; } - int gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, dict_t *fix_layout, dict_t *migrate_data) @@ -1725,7 +2296,7 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) && (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) { - ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data); + ret = gf_defrag_process_dir (this, defrag, loc, migrate_data); if (ret) goto out; } @@ -1877,18 +2448,24 @@ out: int gf_defrag_start_crawl (void *data) { - xlator_t *this = NULL; - dht_conf_t *conf = NULL; - gf_defrag_info_t *defrag = NULL; - int ret = -1; - loc_t loc = {0,}; - struct iatt iatt = {0,}; - struct iatt parent = {0,}; - dict_t *fix_layout = NULL; - dict_t *migrate_data = NULL; - dict_t *status = NULL; - glusterfs_ctx_t *ctx = NULL; - dht_methods_t *methods = NULL; + xlator_t *this = NULL; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + int ret = -1; + loc_t loc = {0,}; + struct iatt iatt = {0,}; + struct iatt parent = {0,}; + dict_t *fix_layout = NULL; + dict_t *migrate_data = NULL; + dict_t *status = NULL; + dict_t *dict = NULL; + glusterfs_ctx_t *ctx = NULL; + dht_methods_t *methods = NULL; + int i = 0; + int thread_index = 0; + int err = 0; + int thread_status = 0; + pthread_t tid[MAX_MIGRATOR_THREAD_COUNT]; this = data; if (!this) @@ -1938,7 +2515,7 @@ gf_defrag_start_crawl (void *data) "Failed to start rebalance:" "Failed to set dictionary value: key = %s", GF_XATTR_FIX_LAYOUT_KEY); - + ret = -1; goto out; } @@ -1970,6 +2547,53 @@ gf_defrag_start_crawl (void *data) "non-force"); if (ret) goto out; + + /* Find local subvolumes */ + ret = syncop_getxattr (this, &loc, &dict, + GF_REBAL_FIND_LOCAL_SUBVOL, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local " + "subvolume determination failed with error: %d", + -ret); + ret = -1; + goto out; + } + + for (i = 0 ; i < conf->local_subvols_cnt; i++) { + gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols " + "are %s", conf->local_subvols[i]->name); + } + + /* Initialize global entry queue */ + defrag->queue = GF_CALLOC (1, sizeof (struct dht_container), + gf_dht_mt_container_t); + + if (!defrag->queue) { + gf_log (this->name, GF_LOG_INFO, "No memory for queue"); + ret = -1; + goto out; + } + + INIT_LIST_HEAD (&(defrag->queue[0].list)); + + /*Spawn Threads Here*/ + while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { + err = pthread_create(&(tid[thread_index]), NULL, + &gf_defrag_task, (void *)defrag); + if (err != 0) { + gf_log ("DHT", GF_LOG_ERROR, + "Thread[%d] creation failed. " + "Aborting Rebalance", + thread_index); + ret = -1; + goto out; + } else { + gf_log ("DHT", GF_LOG_INFO, "Thread[%d] " + "creation successful", thread_index); + } + thread_index++; + } } ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout, @@ -2003,13 +2627,40 @@ gf_defrag_start_crawl (void *data) migrate_data); } } + gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed"); +out: + /* We are here means crawling the entire file system is done + or something failed. 
Set defrag->crawl_done flag to intimate + the migrator threads to exhaust the defrag->queue and terminate*/ + + if (ret) { + defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; + } + + pthread_mutex_lock (&defrag->dfq_mutex); + { + defrag->crawl_done = 1; + + pthread_cond_broadcast ( + &defrag->parallel_migration_cond); + } + pthread_mutex_unlock (&defrag->dfq_mutex); + + /*Wait for all the threads to complete their task*/ + for (i = 0; i < thread_index; i++) { + thread_status = pthread_join (tid[i], NULL); + } + + if (defrag->queue) { + gf_dirent_free (defrag->queue[0].df_entry); + INIT_LIST_HEAD (&(defrag->queue[0].list)); + } if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) && (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) { defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE; } -out: LOCK (&defrag->lock); { status = dict_new (); @@ -2022,10 +2673,13 @@ out: } UNLOCK (&defrag->lock); - if (defrag) { - GF_FREE (defrag); - conf->defrag = NULL; - } + GF_FREE (defrag->queue); + + GF_FREE (defrag); + conf->defrag = NULL; + + if (dict) + dict_unref(dict); return ret; } @@ -2049,6 +2703,7 @@ gf_defrag_start (void *data) dht_conf_t *conf = NULL; gf_defrag_info_t *defrag = NULL; xlator_t *this = NULL; + xlator_t *old_THIS = NULL; this = data; conf = this->private; @@ -2074,6 +2729,8 @@ gf_defrag_start (void *data) defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; + old_THIS = THIS; + THIS = this; ret = synctask_new (this->ctx->env, gf_defrag_start_crawl, gf_defrag_done, frame, this); @@ -2081,6 +2738,7 @@ gf_defrag_start (void *data) gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_REBALANCE_START_FAILED, "Could not create task for rebalance"); + THIS = old_THIS; out: return NULL; } diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index 042adf1714b..3b22a2a8486 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -589,6 +589,23 @@ dht_init (xlator_t *this) defrag->cmd = cmd; defrag->stats = _gf_false; + + defrag->queue = NULL; + + defrag->crawl_done = 0; + + defrag->global_error = 0; + + defrag->q_entry_count = 0; + + defrag->wakeup_crawler = 0; + + synclock_init (&defrag->link_lock, SYNC_LOCK_DEFAULT); + pthread_mutex_init (&defrag->dfq_mutex, 0); + pthread_cond_init (&defrag->parallel_migration_cond, 0); + pthread_cond_init (&defrag->rebalance_crawler_alarm, 0); + defrag->global_error = 0; + } conf->search_unhashed = GF_DHT_LOOKUP_UNHASHED_ON; @@ -651,6 +668,15 @@ dht_init (xlator_t *this) goto err; } + if (cmd) { + ret = dht_init_local_subvolumes (this, conf); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "dht_init_local_subvolumes failed"); + goto err; + } + } + if (dict_get_str (this->options, "decommissioned-bricks", &temp_str) == 0) { ret = dht_parse_decommissioned_bricks (this, conf, temp_str); if (ret == -1) |