summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/glusterfs.h1
-rw-r--r--libglusterfs/src/mem-types.h2
-rw-r--r--xlators/cluster/dht/src/dht-common.c130
-rw-r--r--xlators/cluster/dht/src/dht-common.h47
-rw-r--r--xlators/cluster/dht/src/dht-helper.c23
-rw-r--r--xlators/cluster/dht/src/dht-mem-types.h3
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c1140
-rw-r--r--xlators/cluster/dht/src/dht-shared.c26
8 files changed, 1124 insertions, 248 deletions
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index e9b54535e66..5ca09151ae8 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -83,6 +83,7 @@
#define GF_XATTR_CLRLK_CMD "glusterfs.clrlk"
#define GF_XATTR_PATHINFO_KEY "trusted.glusterfs.pathinfo"
#define GF_XATTR_NODE_UUID_KEY "trusted.glusterfs.node-uuid"
+#define GF_REBAL_FIND_LOCAL_SUBVOL "glusterfs.find-local-subvol"
#define GF_XATTR_VOL_ID_KEY "trusted.glusterfs.volume-id"
#define GF_XATTR_LOCKINFO_KEY "trusted.glusterfs.lockinfo"
#define GF_XATTR_GET_REAL_FILENAME_KEY "glusterfs.get_real_filename:"
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index fc06d52239b..a77998e8a63 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -150,6 +150,8 @@ enum gf_common_mem_types_ {
gf_common_mt_nfs_exports = 131,
gf_common_mt_gf_brick_spec_t = 132,
gf_common_mt_gf_timer_entry_t = 133,
+ gf_common_mt_int = 134,
+ gf_common_mt_pointer = 135,
gf_common_mt_end
};
#endif
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 307265fb56a..31270c8b8bc 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -2534,6 +2534,87 @@ dht_vgetxattr_fill_and_set (dht_local_t *local, dict_t **dict, xlator_t *this,
out:
return ret;
}
+int
+dht_find_local_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xattr,
+ dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ call_frame_t *prev = NULL;
+ int this_call_cnt = 0;
+ int ret = 0;
+ char *uuid_str = NULL;
+ uuid_t node_uuid = {0,};
+
+
+ VALIDATE_OR_GOTO (frame, out);
+ VALIDATE_OR_GOTO (frame->local, out);
+
+ local = frame->local;
+ prev = cookie;
+ conf = this->private;
+
+ LOCK (&frame->lock);
+ {
+ this_call_cnt = --local->call_cnt;
+ if (op_ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "getxattr err (%s) for dir",
+ strerror (op_errno));
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+ goto unlock;
+ }
+
+ ret = dict_get_str (xattr, local->xsel, &uuid_str);
+
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "get %s", local->xsel);
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto unlock;
+ }
+
+ if (gf_uuid_parse (uuid_str, node_uuid)) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to parse uuid"
+ " failed for %s", prev->this->name);
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto unlock;
+ }
+
+ if (gf_uuid_compare (node_uuid, conf->defrag->node_uuid)) {
+ gf_log (this->name, GF_LOG_DEBUG, "subvol %s does not"
+ "belong to this node", prev->this->name);
+ } else {
+ conf->local_subvols[(conf->local_subvols_cnt)++]
+ = prev->this;
+ gf_log (this->name, GF_LOG_DEBUG, "subvol %s belongs to"
+ " this node", prev->this->name);
+ }
+ }
+
+ local->op_ret = 0;
+ unlock:
+ UNLOCK (&frame->lock);
+
+ if (!is_last_call (this_call_cnt))
+ goto out;
+
+ if (local->op_ret == -1) {
+ goto unwind;
+ }
+
+ DHT_STACK_UNWIND (getxattr, frame, 0, 0, NULL, NULL);
+ goto out;
+
+ unwind:
+ DHT_STACK_UNWIND (getxattr, frame, -1, local->op_errno, NULL, NULL);
+ out:
+ return 0;
+}
int
dht_vgetxattr_dir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2892,7 +2973,8 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
int op_errno = -1;
int i = 0;
int cnt = 0;
-
+ char *node_uuid_key = NULL;
+ int ret = -1;
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
@@ -2933,6 +3015,28 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
return 0;
}
+ if (key && DHT_IS_DIR(layout) &&
+ (!strcmp (key, GF_REBAL_FIND_LOCAL_SUBVOL))) {
+ ret = gf_asprintf
+ (&node_uuid_key, "%s", GF_XATTR_NODE_UUID_KEY);
+ if (ret == -1 || !node_uuid_key) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to copy key");
+ op_errno = ENOMEM;
+ goto err;
+ }
+ (void) strncpy (local->xsel, node_uuid_key, 256);
+ cnt = local->call_cnt = conf->subvolume_cnt;
+ for (i = 0; i < cnt; i++) {
+ STACK_WIND (frame, dht_find_local_subvol_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->getxattr,
+ loc, node_uuid_key, xdata);
+ }
+ if (node_uuid_key)
+ GF_FREE (node_uuid_key);
+ return 0;
+ }
+
/* for file use cached subvolume (obviously!): see if {}
* below
* for directory:
@@ -2942,6 +3046,7 @@ dht_getxattr (call_frame_t *frame, xlator_t *this,
* NOTE: Don't trust inode here, as that may not be valid
* (until inode_link() happens)
*/
+
if (key && DHT_IS_DIR(layout) &&
(XATTR_IS_PATHINFO (key)
|| (strcmp (key, GF_XATTR_NODE_UUID_KEY) == 0))) {
@@ -3831,13 +3936,24 @@ dht_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
goto err;
}
- local->call_cnt = conf->subvolume_cnt;
+ if (!(conf->local_subvols_cnt) || !conf->defrag) {
+ local->call_cnt = conf->subvolume_cnt;
- for (i = 0; i < conf->subvolume_cnt; i++) {
- STACK_WIND (frame, dht_fd_cbk,
- conf->subvolumes[i],
- conf->subvolumes[i]->fops->opendir,
- loc, fd, xdata);
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ STACK_WIND (frame, dht_fd_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->opendir,
+ loc, fd, xdata);
+
+ }
+ } else {
+ local->call_cnt = conf->local_subvols_cnt;
+ for (i = 0; i < conf->local_subvols_cnt; i++) {
+ STACK_WIND (frame, dht_fd_cbk,
+ conf->local_subvols[i],
+ conf->local_subvols[i]->fops->opendir,
+ loc, fd, xdata);
+ }
}
return 0;
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 43f7c0264f5..d5bb6fc61d4 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -293,6 +293,20 @@ struct gf_defrag_pattern_list {
gf_defrag_pattern_list_t *next;
};
+struct dht_container {
+ union {
+ struct list_head list;
+ struct {
+ struct _gf_dirent_t *next;
+ struct _gf_dirent_t *prev;
+ };
+ };
+ gf_dirent_t *df_entry;
+ xlator_t *this;
+ loc_t *parent_loc;
+ dict_t *migrate_data;
+};
+
struct gf_defrag_info_ {
uint64_t total_files;
uint64_t total_data;
@@ -320,6 +334,19 @@ struct gf_defrag_info_ {
uint64_t total_files_demoted;
int write_freq_threshold;
int read_freq_threshold;
+
+ pthread_cond_t parallel_migration_cond;
+ pthread_mutex_t dfq_mutex;
+ pthread_cond_t rebalance_crawler_alarm;
+ int32_t q_entry_count;
+ int32_t global_error;
+ struct dht_container *queue;
+ int32_t crawl_done;
+ int32_t abort;
+ int32_t wakeup_crawler;
+
+ /* Hard link handle requirement */
+ synclock_t link_lock;
};
typedef struct gf_defrag_info_ gf_defrag_info_t;
@@ -397,9 +424,19 @@ struct dht_conf {
dht_methods_t *methods;
struct mem_pool *lock_pool;
+
+ /*local subvol storage for rebalance*/
+ xlator_t **local_subvols;
+ int32_t local_subvols_cnt;
};
typedef struct dht_conf dht_conf_t;
+struct dht_dfoffset_ctx {
+ xlator_t *this;
+ off_t offset;
+ int32_t readdir_done;
+};
+typedef struct dht_dfoffset_ctx dht_dfoffset_ctx_t;
struct dht_disk_layout {
uint32_t cnt;
@@ -423,6 +460,14 @@ typedef enum {
GF_DHT_WEIGHTED_DISTRIBUTION
} dht_distribution_type_t;
+struct dir_dfmeta {
+ gf_dirent_t *equeue;
+ dht_dfoffset_ctx_t *offset_var;
+ struct list_head **head;
+ struct list_head **iterator;
+ int *fetch_entries;
+};
+
#define ENTRY_MISSING(op_ret, op_errno) (op_ret == -1 && op_errno == ENOENT)
#define is_revalidate(loc) (dht_inode_ctx_layout_get (loc->inode, this, NULL) == 0)
@@ -608,6 +653,8 @@ int dht_start_rebalance_task (xlator_t *this, call_frame_t *frame);
int dht_rebalance_in_progress_check (xlator_t *this, call_frame_t *frame);
int dht_rebalance_complete_check (xlator_t *this, call_frame_t *frame);
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf);
/* FOPS */
int32_t dht_lookup (call_frame_t *frame,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index cab66017b84..b5114b620ce 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -731,7 +731,28 @@ err:
return -1;
}
+int
+dht_init_local_subvolumes (xlator_t *this, dht_conf_t *conf)
+{
+ xlator_list_t *subvols = NULL;
+ int cnt = 0;
+ if (!conf)
+ return -1;
+
+ for (subvols = this->children; subvols; subvols = subvols->next)
+ cnt++;
+
+ conf->local_subvols = GF_CALLOC (cnt, sizeof (xlator_t *),
+ gf_dht_mt_xlator_t);
+ if (!conf->local_subvols) {
+ return -1;
+ }
+
+ conf->local_subvols_cnt = 0;
+
+ return 0;
+}
int
dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
@@ -752,6 +773,8 @@ dht_init_subvolumes (xlator_t *this, dht_conf_t *conf)
}
conf->subvolume_cnt = cnt;
+ conf->local_subvols_cnt = 0;
+
dht_set_subvol_range(this);
cnt = 0;
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h
index e893eb48fd8..46028e6d9e0 100644
--- a/xlators/cluster/dht/src/dht-mem-types.h
+++ b/xlators/cluster/dht/src/dht-mem-types.h
@@ -30,6 +30,9 @@ enum gf_dht_mem_types_ {
gf_defrag_info_mt,
gf_dht_mt_inode_ctx_t,
gf_dht_mt_ctx_stat_time_t,
+ gf_dht_mt_dirent_t,
+ gf_dht_mt_container_t,
+ gf_dht_mt_octx_t,
gf_dht_mt_end
};
#endif
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 206628208f5..52f91946240 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -23,7 +23,25 @@
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
#define DHT_REBALANCE_BLKSIZE (128 * 1024)
+#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATE_QUEUE_COUNT 500
+#define MIN_MIGRATE_QUEUE_COUNT 200
+#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \
+ idx++; \
+ idx %= sv_cnt; \
+ }
+
+void
+dht_set_global_defrag_error (gf_defrag_info_t *defrag, int ret)
+{
+ LOCK (&defrag->lock);
+ {
+ defrag->global_error = ret;
+ }
+ UNLOCK (&defrag->lock);
+ return;
+}
static int
dht_write_with_holes (xlator_t *to, fd_t *fd, struct iovec *vec, int count,
int32_t size, off_t offset, struct iobref *iobref)
@@ -178,6 +196,47 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
goto out;
}
+ /*
+ Parallel migration can lead to migration of the hard link multiple
+ times which can lead to data loss. Hence, adding a fresh lookup to
+ decide whether migration is required or not.
+
+ Elaborating the scenario for let say 10 hardlinks [link{1..10}]:
+ Let say the first hard link "link1" does the setxattr of the
+ new hashed subvolume info on the cached file. As there are multiple
+ threads working, we might have already all the links created on the
+ new hashed by the time we reach hardlink let say link5. Now the
+ number of links on hashed is equal to that of cached. Hence, file
+ migration will happen for link6.
+
+ Cached Hashed
+ --------T link6 rwxrwxrwx link6
+
+ Now post above state all the link file on the cached will be zero
+ byte linkto files. Hence, if we still do migration for the following
+ files link{7..10}, we will end up migrating 0 data leading to data
+ loss.
+ Hence, a lookup can make sure whether we need to migrate the
+ file or not.
+ */
+
+ ret = syncop_lookup (this, loc, NULL, NULL,
+ NULL, NULL);
+ if (ret) {
+ /*Ignore ENOENT and ESTALE as file might have been
+ migrated already*/
+ if (-ret == ENOENT || -ret == ESTALE) {
+ ret = -2;
+ goto out;
+ }
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed with ret = %d",
+ loc->path, ret);
+ ret = -1;
+ goto out;
+ }
+
cached_subvol = dht_subvol_get_cached (this, loc->inode);
if (!cached_subvol) {
gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -198,6 +257,11 @@ gf_defrag_handle_hardlink (xlator_t *this, loc_t *loc, dict_t *xattrs,
goto out;
}
+ if (hashed_subvol == cached_subvol) {
+ ret = -2;
+ goto out;
+ }
+
gf_log (this->name, GF_LOG_INFO, "Attempting to migrate hardlink %s "
"with gfid %s from %s -> %s", loc->name, uuid_utoa (loc->gfid),
cached_subvol->name, hashed_subvol->name);
@@ -288,7 +352,8 @@ out:
*/
static inline int
__is_file_migratable (xlator_t *this, loc_t *loc,
- struct iatt *stbuf, dict_t *xattrs, int flags)
+ struct iatt *stbuf, dict_t *xattrs, int flags,
+ gf_defrag_info_t *defrag)
{
int ret = -1;
@@ -308,13 +373,14 @@ __is_file_migratable (xlator_t *this, loc_t *loc,
if (stbuf->ia_nlink > 1) {
/* support for decomission */
if (flags == GF_DHT_MIGRATE_HARDLINK) {
- ret = gf_defrag_handle_hardlink (this, loc,
- xattrs, stbuf);
-
- /*
- Returning zero will force the file to be remigrated.
- Checkout gf_defrag_handle_hardlink for more information.
- */
+ synclock_lock (&defrag->link_lock);
+ ret = gf_defrag_handle_hardlink
+ (this, loc, xattrs, stbuf);
+ synclock_unlock (&defrag->link_lock);
+ /*
+ Returning zero will force the file to be remigrated.
+ Checkout gf_defrag_handle_hardlink for more information.
+ */
if (ret && ret != -2) {
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_MIGRATE_FILE_FAILED,
@@ -610,6 +676,7 @@ __dht_rebalance_migrate_data (xlator_t *from, xlator_t *to, fd_t *src, fd_t *dst
while (total < ia_size) {
read_size = (((ia_size - total) > DHT_REBALANCE_BLKSIZE) ?
DHT_REBALANCE_BLKSIZE : (ia_size - total));
+
ret = syncop_readv (from, src, read_size,
offset, 0, &vector, &count, &iobref, NULL,
NULL);
@@ -904,6 +971,11 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
loc_t tmp_loc = {0, };
gf_boolean_t locked = _gf_false;
int lk_ret = -1;
+ gf_defrag_info_t *defrag = NULL;
+
+ defrag = conf->defrag;
+ if (!defrag)
+ goto out;
gf_log (this->name, GF_LOG_INFO, "%s: attempting to move from %s to %s",
loc->path, from->name, to->name);
@@ -960,7 +1032,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
src_ia_prot = stbuf.ia_prot;
/* Check if file can be migrated */
- ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag);
+ ret = __is_file_migratable (this, loc, &stbuf, xattr_rsp, flag, defrag);
if (ret) {
if (ret == -2)
ret = 0;
@@ -1295,6 +1367,7 @@ dht_start_rebalance_task (xlator_t *this, call_frame_t *frame)
{
int ret = -1;
+ frame->root->pid = GF_CLIENT_PID_DEFRAG;
ret = synctask_new (this->ctx->env, rebalance_task,
rebalance_task_completion,
frame, frame);
@@ -1406,61 +1479,341 @@ gf_defrag_pattern_match (gf_defrag_info_t *defrag, char *name, uint64_t size)
return ret;
}
-/* We do a depth first traversal of directories. But before we move into
- * subdirs, we complete the data migration of those directories whose layouts
- * have been fixed
- */
+int dht_dfreaddirp_done (dht_dfoffset_ctx_t *offset_var, int cnt) {
+
+ int i;
+ int result = 1;
+
+ for (i = 0; i < cnt; i++) {
+ if (offset_var[i].readdir_done == 0) {
+ result = 0;
+ break;
+ }
+ }
+ return result;
+}
+
+int static
+gf_defrag_ctx_subvols_init (dht_dfoffset_ctx_t *offset_var, xlator_t *this) {
+
+ int i;
+ dht_conf_t *conf = NULL;
+
+ conf = this->private;
+
+ if (!conf)
+ return -1;
+
+ for (i = 0; i < conf->local_subvols_cnt; i++) {
+ offset_var[i].this = conf->local_subvols[i];
+ offset_var[i].offset = (off_t) 0;
+ offset_var[i].readdir_done = 0;
+ }
+
+ return 0;
+}
int
-gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
- dict_t *migrate_data)
+gf_defrag_migrate_single_file (void *opaque)
{
- int ret = -1;
- loc_t entry_loc = {0,};
- fd_t *fd = NULL;
- gf_dirent_t entries;
- gf_dirent_t *tmp = NULL;
+ xlator_t *this = NULL;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = 0;
gf_dirent_t *entry = NULL;
- gf_boolean_t free_entries = _gf_false;
- off_t offset = 0;
- dict_t *dict = NULL;
+ struct timeval start = {0,};
+ loc_t entry_loc = {0,};
+ loc_t *loc = NULL;
struct iatt iatt = {0,};
+ dict_t *migrate_data = NULL;
int32_t op_errno = 0;
- char *uuid_str = NULL;
- uuid_t node_uuid = {0,};
- struct timeval dir_start = {0,};
struct timeval end = {0,};
double elapsed = {0,};
- struct timeval start = {0,};
- int loglevel = GF_LOG_TRACE;
+ struct dht_container *rebal_entry = NULL;
- gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
- loc->path);
- gettimeofday (&dir_start, NULL);
+ rebal_entry = (struct dht_container *)opaque;
+ if (!rebal_entry) {
+ gf_log (this->name, GF_LOG_ERROR, "rebal_entry is NULL");
+ ret = -1;
+ goto out;
+ }
- fd = fd_create (loc->inode, defrag->pid);
- if (!fd) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ this = rebal_entry->this;
+
+ conf = this->private;
+
+ defrag = conf->defrag;
+
+ loc = rebal_entry->parent_loc;
+
+ migrate_data = rebal_entry->migrate_data;
+
+ entry = rebal_entry->df_entry;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = -1;
goto out;
}
- ret = syncop_opendir (this, loc, fd, NULL, NULL);
+ if (defrag->stats == _gf_true) {
+ gettimeofday (&start, NULL);
+ }
+
+ if (defrag->defrag_pattern &&
+ (gf_defrag_pattern_match (defrag, entry->d_name,
+ entry->d_stat.ia_size) == _gf_false)) {
+ gf_log (this->name, GF_LOG_ERROR, "pattern_match failed");
+ goto out;
+ }
+
+ memset (&entry_loc, 0, sizeof (entry_loc));
+
+ ret = dht_build_child_loc (this, &entry_loc, loc, entry->d_name);
+ if (ret) {
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
+
+ ret = 0;
+
+ gf_log (this->name, GF_LOG_ERROR, "Child loc build failed");
+
+ goto out;
+ }
+
+ gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+
+ gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+
+ entry_loc.inode->ia_type = entry->d_stat.ia_type;
+
+ ret = syncop_lookup (this, &entry_loc, &iatt, NULL, NULL, NULL);
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_DATA_FAILED,
- "Migrate data failed: Failed to open dir %s",
- loc->path);
- ret = -1;
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed: %s lookup failed",
+ entry_loc.name);
+ ret = 0;
goto out;
}
- INIT_LIST_HEAD (&entries.list);
+ ret = syncop_setxattr (this, &entry_loc, migrate_data, 0, NULL, NULL);
+ if (ret < 0) {
+ op_errno = -ret;
+ /* errno is overloaded. See
+ * rebalance_task_completion () */
+ if (op_errno == ENOSPC) {
+ gf_msg_debug (this->name, 0, "migrate-data skipped for"
+ " %s due to space constraints",
+ entry_loc.path);
+ LOCK (&defrag->lock);
+ {
+ defrag->skipped += 1;
+ }
+ UNLOCK (&defrag->lock);
+ } else if (op_errno != EEXIST) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data failed for %s", entry_loc.path);
- while ((ret = syncop_readdirp (this, fd, 131072, offset, &entries,
- NULL, NULL)) != 0) {
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
- if (ret < 0) {
+ }
+
+ ret = gf_defrag_handle_migrate_error (op_errno, defrag);
+
+ if (!ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data on %s failed: %s", entry_loc.path,
+ strerror (op_errno));
+ } else if (ret == 1) {
+ ret = 0;
+ goto out;
+ } else if (ret == -1) {
+ goto out;
+ }
+ } else if (ret > 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "migrate-data failed for %s", entry_loc.path);
+ ret = 0;
+ LOCK (&defrag->lock);
+ {
+ defrag->total_failures += 1;
+ }
+ UNLOCK (&defrag->lock);
+ }
+
+ LOCK (&defrag->lock);
+ {
+ defrag->total_files += 1;
+ defrag->total_data += iatt.ia_size;
+ }
+ UNLOCK (&defrag->lock);
+
+ if (defrag->stats == _gf_true) {
+ gettimeofday (&end, NULL);
+ elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
+ (end.tv_usec - start.tv_usec);
+ gf_log (this->name, GF_LOG_INFO, "Migration of "
+ "file:%s size:%"PRIu64" bytes took %.2f"
+ "secs and ret: %d", entry_loc.name,
+ iatt.ia_size, elapsed/1e6, ret);
+ }
+
+out:
+ loc_wipe (&entry_loc);
+
+ return ret;
+
+}
+
+void *
+gf_defrag_task (void *opaque)
+{
+ struct list_head *q_head = NULL;
+ struct dht_container *iterator = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = 0;
+ gf_boolean_t true = _gf_true;
+
+
+ defrag = (gf_defrag_info_t *)opaque;
+ if (!defrag) {
+ gf_msg ("dht", GF_LOG_ERROR, 0, 0, "defrag is NULL");
+ goto out;
+ }
+
+ q_head = &(defrag->queue[0].list);
+
+ /* The following while loop will dequeue one entry from the defrag->queue
+ under lock. We will update the defrag->global_error only when there
+ is an error which is critical to stop the rebalance process. The stop
+ message will be intimated to other migrator threads by setting the
+ defrag->defrag_status to GF_DEFRAG_STATUS_FAILED.
+
+ In defrag->queue, a low watermark (MIN_MIGRATE_QUEUE_COUNT) is
+ maintained so that crawler does not starve the file migration
+ workers and a high watermark (MAX_MIGRATE_QUEUE_COUNT) so that
+ crawler does not go far ahead in filling up the queue.
+ */
+ while (true) {
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ goto out;
+ }
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ if (defrag->q_entry_count) {
+ iterator = list_entry (q_head->next,
+ typeof(*iterator), list);
+
+ gf_log ("DHT", GF_LOG_DEBUG, "picking entry "
+ "%s", iterator->df_entry->d_name);
+
+ list_del_init (&(iterator->list));
+
+ defrag->q_entry_count--;
+
+ if ((defrag->q_entry_count <
+ MIN_MIGRATE_QUEUE_COUNT) &&
+ defrag->wakeup_crawler) {
+ pthread_cond_broadcast (
+ &defrag->rebalance_crawler_alarm);
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+ ret = gf_defrag_migrate_single_file
+ ((void *)iterator);
+
+ /*Critical errors: ENOTCONN and ENOSPACE*/
+ if (ret) {
+ dht_set_global_defrag_error
+ (defrag, ret);
+
+ defrag->defrag_status =
+ GF_DEFRAG_STATUS_FAILED;
+ goto out;
+ }
+
+ gf_dirent_free (iterator->df_entry);
+ GF_FREE (iterator);
+ continue;
+ } else {
+
+ /* defrag->crawl_done flag is set means crawling
+ file system is done and hence a list_empty when
+ the above flag is set indicates there are no more
+ entries to be added to the queue and rebalance is
+ finished */
+
+ if (!defrag->crawl_done) {
+ pthread_cond_wait (
+ &defrag->parallel_migration_cond,
+ &defrag->dfq_mutex);
+ }
+
+ if (defrag->crawl_done &&
+ !defrag->q_entry_count) {
+ pthread_cond_broadcast (
+ &defrag->parallel_migration_cond);
+ goto unlock;
+ } else {
+ pthread_mutex_unlock
+ (&defrag->dfq_mutex);
+ continue;
+ }
+ }
+
+ }
+unlock:
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+ break;
+ }
+out:
+ return NULL;
+}
+
+int static
+gf_defrag_get_entry (xlator_t *this, int i, struct dht_container **container,
+ loc_t *loc, dht_conf_t *conf, gf_defrag_info_t *defrag,
+ fd_t *fd, dict_t *migrate_data,
+ struct dir_dfmeta *dir_dfmeta, dict_t *xattr_req)
+{
+ int ret = -1;
+ char is_linkfile = 0;
+ gf_dirent_t *df_entry = NULL;
+ loc_t entry_loc = {0,};
+ dict_t *xattr_rsp = NULL;
+ struct iatt iatt = {0,};
+ struct dht_container *tmp_container = NULL;
+ xlator_t *hashed_subvol = NULL;
+ xlator_t *cached_subvol = NULL;
+
+ if (dir_dfmeta->offset_var[i].readdir_done == 1) {
+ ret = 0;
+ goto out;
+ }
+ if (dir_dfmeta->fetch_entries[i] == 1) {
+ ret = syncop_readdirp (conf->local_subvols[i], fd, 131072,
+ dir_dfmeta->offset_var[i].offset,
+ &(dir_dfmeta->equeue[i]),
+ NULL, NULL);
+ if (ret == 0) {
+ dir_dfmeta->offset_var[i].readdir_done = 1;
+ ret = 0;
+ goto out;
+ }
+
+ if (ret < 0) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_MIGRATE_DATA_FAILED,
"%s: Migrate data failed: Readdir returned"
@@ -1470,213 +1823,426 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
goto out;
}
- if (list_empty (&entries.list))
- break;
+ if (list_empty (&(dir_dfmeta->equeue[i].list))) {
+ dir_dfmeta->offset_var[i].readdir_done = 1;
+ ret = 0;
+ goto out;
+ }
- free_entries = _gf_true;
+ dir_dfmeta->fetch_entries[i] = 0;
+ }
- list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+ while (1) {
- if (dict) {
- dict_unref (dict);
- dict = NULL;
- }
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = -1;
+ goto out;
+ }
- if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
- ret = 1;
- goto out;
- }
+ df_entry = list_entry (dir_dfmeta->iterator[i]->next,
+ typeof (*df_entry), list);
- offset = entry->d_off;
+ if (&df_entry->list == dir_dfmeta->head[i]) {
+ gf_dirent_free (&(dir_dfmeta->equeue[i]));
+ INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+ dir_dfmeta->fetch_entries[i] = 1;
+ dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+ ret = 0;
+ goto out;
+ }
- if (!strcmp (entry->d_name, ".") ||
- !strcmp (entry->d_name, ".."))
- continue;
+ dir_dfmeta->iterator[i] = dir_dfmeta->iterator[i]->next;
- if (IA_ISDIR (entry->d_stat.ia_type))
- continue;
+ dir_dfmeta->offset_var[i].offset = df_entry->d_off;
+ if (!strcmp (df_entry->d_name, ".") ||
+ !strcmp (df_entry->d_name, ".."))
+ continue;
- defrag->num_files_lookedup++;
- if (defrag->stats == _gf_true) {
- gettimeofday (&start, NULL);
- }
+ if (IA_ISDIR (df_entry->d_stat.ia_type))
+ continue;
- if (defrag->defrag_pattern &&
- (gf_defrag_pattern_match (defrag, entry->d_name,
- entry->d_stat.ia_size)
- == _gf_false)) {
- continue;
- }
+ defrag->num_files_lookedup++;
- loc_wipe (&entry_loc);
- ret =dht_build_child_loc (this, &entry_loc, loc,
- entry->d_name);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Child loc"
- " build failed");
- goto out;
- }
+ if (defrag->defrag_pattern &&
+ (gf_defrag_pattern_match (defrag, df_entry->d_name,
+ df_entry->d_stat.ia_size)
+ == _gf_false)) {
+ continue;
+ }
- if (gf_uuid_is_null (entry->d_stat.ia_gfid)) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_GFID_NULL,
- "%s/%s gfid not present", loc->path,
- entry->d_name);
- continue;
- }
+ loc_wipe (&entry_loc);
+ ret = dht_build_child_loc (this, &entry_loc, loc,
+ df_entry->d_name);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Child loc"
+ " build failed");
+ ret = -1;
+ goto out;
+ }
- gf_uuid_copy (entry_loc.gfid, entry->d_stat.ia_gfid);
+ if (gf_uuid_is_null (df_entry->d_stat.ia_gfid)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_GFID_NULL,
+ "%s/%s gfid not present", loc->path,
+ df_entry->d_name);
+ continue;
+ }
- if (gf_uuid_is_null (loc->gfid)) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_GFID_NULL,
- "%s/%s gfid not present", loc->path,
- entry->d_name);
- continue;
- }
+ gf_uuid_copy (entry_loc.gfid, df_entry->d_stat.ia_gfid);
- gf_uuid_copy (entry_loc.pargfid, loc->gfid);
+ if (gf_uuid_is_null (loc->gfid)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_GFID_NULL,
+ "%s/%s gfid not present", loc->path,
+ df_entry->d_name);
+ continue;
+ }
- entry_loc.inode->ia_type = entry->d_stat.ia_type;
+ gf_uuid_copy (entry_loc.pargfid, loc->gfid);
- ret = syncop_lookup (this, &entry_loc, &iatt, NULL,
- NULL, NULL);
- if (ret) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "Migrate file failed:%s lookup failed",
- entry_loc.path);
- ret = -1;
- continue;
- }
+ entry_loc.inode->ia_type = df_entry->d_stat.ia_type;
+ ret = syncop_lookup (conf->local_subvols[i], &entry_loc,
+ &iatt, NULL, xattr_req, &xattr_rsp);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed",
+ entry_loc.path);
+ continue;
+ }
- ret = syncop_getxattr (this, &entry_loc, &dict,
- GF_XATTR_NODE_UUID_KEY, NULL,
- NULL);
- if(ret < 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "Migrate file failed:"
- "Failed to get node-uuid for %s",
- entry_loc.path);
- ret = -1;
- continue;
- }
- ret = dict_get_str (dict, GF_XATTR_NODE_UUID_KEY,
- &uuid_str);
- if(ret < 0) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to "
- "get node-uuid from dict for %s",
- entry_loc.path);
- ret = -1;
- continue;
- }
+ is_linkfile = check_is_linkfile (NULL, &iatt, xattr_rsp,
+ conf->link_xattr_name);
- if (gf_uuid_parse (uuid_str, node_uuid)) {
- gf_log (this->name, GF_LOG_ERROR, "gf_uuid_parse "
- "failed for %s", entry_loc.path);
- continue;
- }
+ if (is_linkfile) {
+ /* No need to add linkto file to the queue for
+ migration. Only the actual data file need to
+ be checked for migration criteria.
+ */
+ gf_log (this->name, GF_LOG_INFO, "linkfile."
+ " Hence skip for file: %s", entry_loc.path);
+ continue;
+ }
- /* if file belongs to different node, skip migration
- * the other node will take responsibility of migration
- */
- if (gf_uuid_compare (node_uuid, defrag->node_uuid)) {
- gf_msg_trace (this->name, 0, "%s does not"
- "belong to this node",
- entry_loc.path);
- continue;
- }
- uuid_str = NULL;
+ ret = syncop_lookup (this, &entry_loc, NULL, NULL,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_FILE_FAILED,
+ "Migrate file failed:%s lookup failed",
+ entry_loc.path);
+ continue;
+ }
- /* if distribute is present, it will honor this key.
- * -1, ENODATA is returned if distribute is not present
- * or file doesn't have a link-file. If file has
- * link-file, the path of link-file will be the value,
- * and also that guarantees that file has to be mostly
- * migrated */
+ /* if distribute is present, it will honor this key.
+ * -1, ENODATA is returned if distribute is not present
+ * or file doesn't have a link-file. If file has
+ * link-file, the path of link-file will be the value,
+ * and also that guarantees that file has to be mostly
+ * migrated */
- ret = syncop_getxattr (this, &entry_loc, NULL,
- GF_XATTR_LINKINFO_KEY, NULL,
- NULL);
- if (ret < 0) {
- if (-ret != ENODATA) {
- loglevel = GF_LOG_ERROR;
- defrag->total_failures += 1;
- } else {
- loglevel = GF_LOG_TRACE;
- }
- gf_log (this->name, loglevel, "%s: failed to "
- "get "GF_XATTR_LINKINFO_KEY" key - %s",
- entry_loc.path, strerror (-ret));
- ret = -1;
- continue;
+ hashed_subvol = dht_subvol_get_hashed (this, &entry_loc);
+ if (!hashed_subvol) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_HASHED_SUBVOL_GET_FAILED,
+ "Failed to get hashed subvol for %s",
+ loc->path);
+ continue;
+ }
+
+ cached_subvol = dht_subvol_get_cached (this, entry_loc.inode);
+ if (!cached_subvol) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_CACHED_SUBVOL_GET_FAILED,
+ "Failed to get cached subvol for %s",
+ loc->path);
+
+ continue;
+ }
+
+ if (hashed_subvol == cached_subvol) {
+ continue;
+ }
+
+ /*Build Container Structure */
+
+ tmp_container = GF_CALLOC (1, sizeof(struct dht_container),
+ gf_dht_mt_container_t);
+ if (!tmp_container) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for container");
+ ret = -1;
+ goto out;
+ }
+ tmp_container->df_entry = gf_dirent_for_name (df_entry->d_name);
+ if (!tmp_container->df_entry) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for df_entry");
+ ret = -1;
+ goto out;
+ }
+
+ tmp_container->df_entry->d_stat = df_entry->d_stat;
+
+ tmp_container->df_entry->d_ino = df_entry->d_ino;
+
+ tmp_container->df_entry->d_type = df_entry->d_type;
+
+ tmp_container->df_entry->d_len = df_entry->d_len;
+
+ tmp_container->parent_loc = GF_CALLOC(1, sizeof(*loc),
+ gf_dht_mt_loc_t);
+ if (!tmp_container->parent_loc) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to allocate "
+ "memory for loc");
+ ret = -1;
+ goto out;
+ }
+
+
+ ret = loc_copy (tmp_container->parent_loc, loc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "loc_copy failed");
+ ret = -1;
+ goto out;
+ }
+
+ tmp_container->migrate_data = migrate_data;
+
+ tmp_container->this = this;
+
+ if (df_entry->dict)
+ tmp_container->df_entry->dict =
+ dict_ref (df_entry->dict);
+
+ /*Build Container Structue >> END*/
+
+ ret = 0;
+ goto out;
+
+ }
+
+out:
+ if (ret == 0) {
+ *container = tmp_container;
+ } else {
+ GF_FREE (tmp_container->parent_loc);
+ GF_FREE (tmp_container);
+ }
+
+ if (xattr_rsp)
+ dict_unref (xattr_rsp);
+ return ret;
+}
+
+int
+gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
+ dict_t *migrate_data)
+{
+ int ret = -1;
+ fd_t *fd = NULL;
+ dht_conf_t *conf = NULL;
+ gf_dirent_t entries;
+ dict_t *dict = NULL;
+ dict_t *xattr_req = NULL;
+ struct timeval dir_start = {0,};
+ struct timeval end = {0,};
+ double elapsed = {0,};
+ int local_subvols_cnt = 0;
+ int i = 0;
+ struct dht_container *container = NULL;
+ int ldfq_count = 0;
+ int dfc_index = 0;
+ struct dir_dfmeta *dir_dfmeta = NULL;
+
+ gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
+ loc->path);
+ gettimeofday (&dir_start, NULL);
+
+ conf = this->private;
+ local_subvols_cnt = conf->local_subvols_cnt;
+
+ if (!local_subvols_cnt) {
+ ret = 0;
+ goto out;
+ }
+
+ fd = fd_create (loc->inode, defrag->pid);
+ if (!fd) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncop_opendir (this, loc, fd, NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_MIGRATE_DATA_FAILED,
+ "Migrate data failed: Failed to open dir %s",
+ loc->path);
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta = GF_CALLOC (1, sizeof (*dir_dfmeta),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta is NULL");
+ ret = -1;
+ goto out;
+ }
+
+
+ dir_dfmeta->head = GF_CALLOC (local_subvols_cnt,
+ sizeof (*(dir_dfmeta->head)),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta->head) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->head is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->iterator = GF_CALLOC (local_subvols_cnt,
+ sizeof (*(dir_dfmeta->iterator)),
+ gf_common_mt_pointer);
+ if (!dir_dfmeta->iterator) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->iterator is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->equeue = GF_CALLOC (local_subvols_cnt, sizeof (entries),
+ gf_dht_mt_dirent_t);
+ if (!dir_dfmeta->equeue) {
+ gf_log (this->name, GF_LOG_ERROR, "dir_dfmeta->equeue is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->offset_var = GF_CALLOC (local_subvols_cnt,
+ sizeof (dht_dfoffset_ctx_t),
+ gf_dht_mt_octx_t);
+ if (!dir_dfmeta->offset_var) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->offset_var is NULL");
+ ret = -1;
+ goto out;
+ }
+ ret = gf_defrag_ctx_subvols_init (dir_dfmeta->offset_var, this);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "dht_dfoffset_ctx_t"
+ "initialization failed");
+ ret = -1;
+ goto out;
+ }
+
+ dir_dfmeta->fetch_entries = GF_CALLOC (local_subvols_cnt,
+ sizeof (int), gf_common_mt_int);
+ if (!dir_dfmeta->fetch_entries) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dir_dfmeta->fetch_entries is NULL");
+ ret = -1;
+ goto out;
+ }
+
+ for (i = 0; i < local_subvols_cnt ; i++) {
+ INIT_LIST_HEAD (&(dir_dfmeta->equeue[i].list));
+ dir_dfmeta->head[i] = &(dir_dfmeta->equeue[i].list);
+ dir_dfmeta->iterator[i] = dir_dfmeta->head[i];
+ dir_dfmeta->fetch_entries[i] = 1;
+ }
+
+ xattr_req = dict_new ();
+ if (!xattr_req) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_uint32 (xattr_req,
+ conf->link_xattr_name, 256);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to set dict for "
+ "key: %s", conf->link_xattr_name);
+ ret = -1;
+ goto out;
+ }
+
+ /*
+ Job: Read entries from each local subvol and store the entries
+ in equeue array of linked list. Now pick one entry from the
+ equeue array in a round robin basis and add them to defrag Queue.
+ */
+
+ while (!dht_dfreaddirp_done(dir_dfmeta->offset_var,
+ local_subvols_cnt)) {
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ while (defrag->q_entry_count >
+ MAX_MIGRATE_QUEUE_COUNT) {
+ defrag->wakeup_crawler = 1;
+ pthread_cond_wait (
+ &defrag->rebalance_crawler_alarm,
+ &defrag->dfq_mutex);
}
- ret = syncop_setxattr (this, &entry_loc, migrate_data,
- 0, NULL, NULL);
- if (ret < 0) {
- op_errno = -ret;
- /* errno is overloaded. See
- * rebalance_task_completion () */
- if (op_errno == ENOSPC) {
- gf_msg_debug (this->name, 0,
- "migrate-data skipped for"
- " %s due to space "
- "constraints",
- entry_loc.path);
- defrag->skipped +=1;
- } else{
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "migrate-data failed for %s",
- entry_loc.path);
- defrag->total_failures +=1;
- }
+ ldfq_count = defrag->q_entry_count;
- ret = gf_defrag_handle_migrate_error (op_errno,
- defrag);
+ if (defrag->wakeup_crawler) {
+ defrag->wakeup_crawler = 0;
+ }
- if (!ret)
- gf_msg_debug (this->name, 0,
- "migrate-data on %s "
- "failed: %s",
- entry_loc.path,
- strerror (op_errno));
- else if (ret == 1)
- continue;
- else if (ret == -1)
- goto out;
- } else if (ret > 0) {
- gf_msg (this->name, GF_LOG_ERROR, 0,
- DHT_MSG_MIGRATE_FILE_FAILED,
- "migrate-data failed for %s",
- entry_loc.path);
- defrag->total_failures +=1;
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+
+ while (ldfq_count <= MAX_MIGRATE_QUEUE_COUNT &&
+ !dht_dfreaddirp_done(dir_dfmeta->offset_var,
+ local_subvols_cnt)) {
+
+ ret = gf_defrag_get_entry (this, dfc_index, &container,
+ loc, conf, defrag, fd,
+ migrate_data, dir_dfmeta,
+ xattr_req);
+ if (ret) {
+ gf_log ("DHT", GF_LOG_INFO, "Found critical "
+ "error from gf_defrag_get_entry");
+ ret = -1;
+ goto out;
}
- LOCK (&defrag->lock);
- {
- defrag->total_files += 1;
- defrag->total_data += iatt.ia_size;
+ /* Check if we got an entry, else we need to move the
+ index to the next subvol */
+ if (!container) {
+ GF_CRAWL_INDEX_MOVE(dfc_index,
+ local_subvols_cnt);
+ continue;
}
- UNLOCK (&defrag->lock);
- if (defrag->stats == _gf_true) {
- gettimeofday (&end, NULL);
- elapsed = (end.tv_sec - start.tv_sec) * 1e6 +
- (end.tv_usec - start.tv_usec);
- gf_log (this->name, GF_LOG_INFO, "Migration of "
- "file:%s size:%"PRIu64" bytes took %.2f"
- "secs", entry_loc.path, iatt.ia_size,
- elapsed/1e6);
+
+ /* Q this entry in the dfq */
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ list_add_tail (&container->list,
+ &(defrag->queue[0].list));
+ defrag->q_entry_count++;
+ ldfq_count = defrag->q_entry_count;
+
+ gf_log (this->name, GF_LOG_DEBUG, "added "
+ "file:%s parent:%s to the queue ",
+ container->df_entry->d_name,
+ container->parent_loc->path);
+
+ pthread_cond_signal (
+ &defrag->parallel_migration_cond);
}
- }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
- gf_dirent_free (&entries);
- free_entries = _gf_false;
- INIT_LIST_HEAD (&entries.list);
+ GF_CRAWL_INDEX_MOVE(dfc_index, local_subvols_cnt);
+ }
}
gettimeofday (&end, NULL);
@@ -1686,20 +2252,25 @@ gf_defrag_migrate_data (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
"%.2f secs", loc->path, elapsed/1e6);
ret = 0;
out:
- if (free_entries)
- gf_dirent_free (&entries);
- loc_wipe (&entry_loc);
+ GF_FREE (dir_dfmeta->head);
+ GF_FREE (dir_dfmeta->equeue);
+ GF_FREE (dir_dfmeta->iterator);
+ GF_FREE (dir_dfmeta->offset_var);
+ GF_FREE (dir_dfmeta->fetch_entries);
+ GF_FREE (dir_dfmeta);
if (dict)
dict_unref(dict);
+ if (xattr_req)
+ dict_unref(xattr_req);
+
if (fd)
fd_unref (fd);
return ret;
}
-
int
gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
dict_t *fix_layout, dict_t *migrate_data)
@@ -1725,7 +2296,7 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
(defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
- ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data);
+ ret = gf_defrag_process_dir (this, defrag, loc, migrate_data);
if (ret)
goto out;
}
@@ -1877,18 +2448,24 @@ out:
int
gf_defrag_start_crawl (void *data)
{
- xlator_t *this = NULL;
- dht_conf_t *conf = NULL;
- gf_defrag_info_t *defrag = NULL;
- int ret = -1;
- loc_t loc = {0,};
- struct iatt iatt = {0,};
- struct iatt parent = {0,};
- dict_t *fix_layout = NULL;
- dict_t *migrate_data = NULL;
- dict_t *status = NULL;
- glusterfs_ctx_t *ctx = NULL;
- dht_methods_t *methods = NULL;
+ xlator_t *this = NULL;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ int ret = -1;
+ loc_t loc = {0,};
+ struct iatt iatt = {0,};
+ struct iatt parent = {0,};
+ dict_t *fix_layout = NULL;
+ dict_t *migrate_data = NULL;
+ dict_t *status = NULL;
+ dict_t *dict = NULL;
+ glusterfs_ctx_t *ctx = NULL;
+ dht_methods_t *methods = NULL;
+ int i = 0;
+ int thread_index = 0;
+ int err = 0;
+ int thread_status = 0;
+ pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];
this = data;
if (!this)
@@ -1938,7 +2515,7 @@ gf_defrag_start_crawl (void *data)
"Failed to start rebalance:"
"Failed to set dictionary value: key = %s",
GF_XATTR_FIX_LAYOUT_KEY);
-
+ ret = -1;
goto out;
}
@@ -1970,6 +2547,53 @@ gf_defrag_start_crawl (void *data)
"non-force");
if (ret)
goto out;
+
+ /* Find local subvolumes */
+ ret = syncop_getxattr (this, &loc, &dict,
+ GF_REBAL_FIND_LOCAL_SUBVOL,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, 0, "local "
+ "subvolume determination failed with error: %d",
+ -ret);
+ ret = -1;
+ goto out;
+ }
+
+ for (i = 0 ; i < conf->local_subvols_cnt; i++) {
+ gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols "
+ "are %s", conf->local_subvols[i]->name);
+ }
+
+ /* Initialize global entry queue */
+ defrag->queue = GF_CALLOC (1, sizeof (struct dht_container),
+ gf_dht_mt_container_t);
+
+ if (!defrag->queue) {
+ gf_log (this->name, GF_LOG_INFO, "No memory for queue");
+ ret = -1;
+ goto out;
+ }
+
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+
+ /*Spawn Threads Here*/
+ while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+ err = pthread_create(&(tid[thread_index]), NULL,
+ &gf_defrag_task, (void *)defrag);
+ if (err != 0) {
+ gf_log ("DHT", GF_LOG_ERROR,
+ "Thread[%d] creation failed. "
+ "Aborting Rebalance",
+ thread_index);
+ ret = -1;
+ goto out;
+ } else {
+ gf_log ("DHT", GF_LOG_INFO, "Thread[%d] "
+ "creation successful", thread_index);
+ }
+ thread_index++;
+ }
}
ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout,
@@ -2003,13 +2627,40 @@ gf_defrag_start_crawl (void *data)
migrate_data);
}
}
+ gf_log ("DHT", GF_LOG_INFO, "crawling file-system completed");
+out:
+ /* We are here means crawling the entire file system is done
+ or something failed. Set defrag->crawl_done flag to intimate
+ the migrator threads to exhaust the defrag->queue and terminate*/
+
+ if (ret) {
+ defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
+ }
+
+ pthread_mutex_lock (&defrag->dfq_mutex);
+ {
+ defrag->crawl_done = 1;
+
+ pthread_cond_broadcast (
+ &defrag->parallel_migration_cond);
+ }
+ pthread_mutex_unlock (&defrag->dfq_mutex);
+
+ /*Wait for all the threads to complete their task*/
+ for (i = 0; i < thread_index; i++) {
+ thread_status = pthread_join (tid[i], NULL);
+ }
+
+ if (defrag->queue) {
+ gf_dirent_free (defrag->queue[0].df_entry);
+ INIT_LIST_HEAD (&(defrag->queue[0].list));
+ }
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
}
-out:
LOCK (&defrag->lock);
{
status = dict_new ();
@@ -2022,10 +2673,13 @@ out:
}
UNLOCK (&defrag->lock);
- if (defrag) {
- GF_FREE (defrag);
- conf->defrag = NULL;
- }
+ GF_FREE (defrag->queue);
+
+ GF_FREE (defrag);
+ conf->defrag = NULL;
+
+ if (dict)
+ dict_unref(dict);
return ret;
}
@@ -2049,6 +2703,7 @@ gf_defrag_start (void *data)
dht_conf_t *conf = NULL;
gf_defrag_info_t *defrag = NULL;
xlator_t *this = NULL;
+ xlator_t *old_THIS = NULL;
this = data;
conf = this->private;
@@ -2074,6 +2729,8 @@ gf_defrag_start (void *data)
defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+ old_THIS = THIS;
+ THIS = this;
ret = synctask_new (this->ctx->env, gf_defrag_start_crawl,
gf_defrag_done, frame, this);
@@ -2081,6 +2738,7 @@ gf_defrag_start (void *data)
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_REBALANCE_START_FAILED,
"Could not create task for rebalance");
+ THIS = old_THIS;
out:
return NULL;
}
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index 042adf1714b..3b22a2a8486 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -589,6 +589,23 @@ dht_init (xlator_t *this)
defrag->cmd = cmd;
defrag->stats = _gf_false;
+
+ defrag->queue = NULL;
+
+ defrag->crawl_done = 0;
+
+ defrag->global_error = 0;
+
+ defrag->q_entry_count = 0;
+
+ defrag->wakeup_crawler = 0;
+
+ synclock_init (&defrag->link_lock, SYNC_LOCK_DEFAULT);
+ pthread_mutex_init (&defrag->dfq_mutex, 0);
+ pthread_cond_init (&defrag->parallel_migration_cond, 0);
+ pthread_cond_init (&defrag->rebalance_crawler_alarm, 0);
+ defrag->global_error = 0;
+
}
conf->search_unhashed = GF_DHT_LOOKUP_UNHASHED_ON;
@@ -651,6 +668,15 @@ dht_init (xlator_t *this)
goto err;
}
+ if (cmd) {
+ ret = dht_init_local_subvolumes (this, conf);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dht_init_local_subvolumes failed");
+ goto err;
+ }
+ }
+
if (dict_get_str (this->options, "decommissioned-bricks", &temp_str) == 0) {
ret = dht_parse_decommissioned_bricks (this, conf, temp_str);
if (ret == -1)