From 5702ff3012f6b97f6b497b5c2e89e8700caf8bc1 Mon Sep 17 00:00:00 2001 From: N Balachandran Date: Wed, 6 Jun 2018 12:57:50 +0530 Subject: cluster/dht: Refactor rebalance code Created init and cleanup functions for certain functionality in order to improve readability. Removed unused code. Change-Id: Ia6a2f4ab64923b6ea8e10487227fb5621eec1488 updates: bz#1586363 Signed-off-by: N Balachandran --- xlators/cluster/dht/src/dht-rebalance.c | 562 ++++++++++++++------------------ 1 file changed, 253 insertions(+), 309 deletions(-) (limited to 'xlators/cluster/dht') diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 0f836522499..3b986be97e0 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -4369,26 +4369,74 @@ gf_tier_wait_fix_lookup (gf_defrag_info_t *defrag) { } /******************Tier background Fix layout functions END********************/ - -uint64_t -gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc) +int +dht_init_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf, + loc_t *loc) { - int ret = -1; - struct statvfs buf = {0,}; - if (!this) - return 0; + dict_t *dict = NULL; + gf_defrag_info_t *defrag = NULL; + uuid_t *uuid_ptr = NULL; + int ret = -1; + int i = 0; + int j = 0; - ret = syncop_statfs (this, root_loc, &buf, NULL, NULL); + defrag = conf->defrag; + + if (defrag->cmd != GF_DEFRAG_CMD_START_TIER) { + /* Find local subvolumes */ + ret = syncop_getxattr (this, loc, &dict, + GF_REBAL_FIND_LOCAL_SUBVOL, + NULL, NULL); + if (ret && (ret != -ENODATA)) { + gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local " + "subvolume determination failed with error: %d", + -ret); + ret = -1; + goto out; + } + + if (!ret) + goto out; + } + + ret = syncop_getxattr (this, loc, &dict, + GF_REBAL_OLD_FIND_LOCAL_SUBVOL, + NULL, NULL); if (ret) { - /* Aargh! */ - return 0; + gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local " + "subvolume determination failed with error: %d", + -ret); + ret = -1; + goto out; } - return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize); + ret = 0; + +out: + if (ret) { + return ret; + } + + for (i = 0 ; i < conf->local_subvols_cnt; i++) { + gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: " + "%s", conf->local_subvols[i]->name); + + for (j = 0; j < conf->local_nodeuuids[i].count; j++) { + uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid); + gf_msg (this->name, GF_LOG_INFO, 0, 0, + "node uuid : %s", + uuid_utoa(*uuid_ptr)); + } + } + + return ret; } + +/* Functions for the rebalance estimates feature */ + uint64_t -gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc) +gf_defrag_subvol_file_size (xlator_t *this, loc_t *root_loc) { int ret = -1; struct statvfs buf = {0,}; @@ -4401,10 +4449,9 @@ gf_defrag_subvol_file_cnt (xlator_t *this, loc_t *root_loc) /* Aargh! */ return 0; } - return (buf.f_files - buf.f_ffree); + return ((buf.f_blocks - buf.f_bfree) * buf.f_frsize); } - uint64_t gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc) { @@ -4420,7 +4467,7 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc) for (i = 0 ; i < conf->local_subvols_cnt; i++) { size_files = gf_defrag_subvol_file_size (conf->local_subvols[i], - root_loc); + root_loc); total_size += size_files; gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s," "cnt = %"PRIu64, conf->local_subvols[i]->name, @@ -4434,88 +4481,6 @@ gf_defrag_total_file_size (xlator_t *this, loc_t *root_loc) } -uint64_t -gf_defrag_total_file_cnt (xlator_t *this, loc_t *root_loc) -{ - dht_conf_t *conf = NULL; - int i = 0; - uint64_t num_files = 0; - uint64_t total_entries = 0; - - conf = this->private; - if (!conf) { - return 0; - } - - for (i = 0 ; i < conf->local_subvols_cnt; i++) { - num_files = gf_defrag_subvol_file_cnt (conf->local_subvols[i], - root_loc); - total_entries += num_files; - gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvol: %s," - "cnt = %"PRIu64, conf->local_subvols[i]->name, - num_files); - } - - /* FIXFIXFIX: halve the number of files to negate .glusterfs contents - We need a better way to figure this out */ - - total_entries = total_entries/2; - if (total_entries > 20000) - total_entries += 10000; - - gf_msg (this->name, GF_LOG_INFO, 0, 0, - "Total number of files = %"PRIu64, total_entries); - - return total_entries; -} - - - -int -dht_get_local_subvols_and_nodeuuids (xlator_t *this, dht_conf_t *conf, - loc_t *loc) -{ - - dict_t *dict = NULL; - gf_defrag_info_t *defrag = NULL; - int ret = -1; - - defrag = conf->defrag; - - if (defrag->cmd != GF_DEFRAG_CMD_START_TIER) { - /* Find local subvolumes */ - ret = syncop_getxattr (this, loc, &dict, - GF_REBAL_FIND_LOCAL_SUBVOL, - NULL, NULL); - if (ret && (ret != -ENODATA)) { - - gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local " - "subvolume determination failed with error: %d", - -ret); - ret = -1; - goto out; - } - - if (!ret) - goto out; - } - - ret = syncop_getxattr (this, loc, &dict, - GF_REBAL_OLD_FIND_LOCAL_SUBVOL, - NULL, NULL); - if (ret) { - - gf_msg (this->name, GF_LOG_ERROR, -ret, 0, "local " - "subvolume determination failed with error: %d", - -ret); - ret = -1; - goto out; - } - ret = 0; -out: - return ret; -} - static void* dht_file_counter_thread (void *args) { @@ -4572,6 +4537,176 @@ dht_file_counter_thread (void *args) return NULL; } +int +gf_defrag_estimates_cleanup (xlator_t *this, gf_defrag_info_t *defrag, + pthread_t filecnt_thread) +{ + int ret = -1; + + /* Wake up the filecounter thread. + * By now the defrag status will no longer be + * GF_DEFRAG_STATUS_STARTED so the thread will exit the loop. + */ + pthread_mutex_lock (&defrag->fc_mutex); + { + pthread_cond_broadcast (&defrag->fc_wakeup_cond); + } + pthread_mutex_unlock (&defrag->fc_mutex); + + ret = pthread_join (filecnt_thread, NULL); + if (ret) { + gf_msg ("dht", GF_LOG_ERROR, ret, 0, + "file_counter_thread: pthread_join failed."); + ret = -1; + } + return ret; +} + + +int +gf_defrag_estimates_init (xlator_t *this, loc_t *loc, + pthread_t *filecnt_thread) +{ + int ret = -1; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + + conf = this->private; + defrag = conf->defrag; + + g_totalsize = gf_defrag_total_file_size (this, loc); + if (!g_totalsize) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get " + "the total data size. Unable to estimate " + "time to complete rebalance."); + goto out; + } + + ret = gf_thread_create (filecnt_thread, NULL, + &dht_file_counter_thread, + (void *)defrag, "dhtfcnt"); + + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to " + "create the file counter thread "); + ret = -1; + } + ret = 0; +out: + return ret; +} + + +/* Init and cleanup functions for parallel file migration*/ +int +gf_defrag_parallel_migration_init (xlator_t *this, gf_defrag_info_t *defrag, + pthread_t **tid_array, int *thread_index) +{ + int ret = -1; + int thread_spawn_count = 0; + int index = 0; + pthread_t *tid = NULL; + char thread_name[GF_THREAD_NAMEMAX] = {0,}; + + if (!defrag) + goto out; + + /* Initialize global entry queue */ + defrag->queue = GF_CALLOC (1, sizeof (struct dht_container), + gf_dht_mt_container_t); + + if (!defrag->queue) { + gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0, + "Failed to initialise migration queue"); + ret = -1; + goto out; + } + + INIT_LIST_HEAD (&(defrag->queue[0].list)); + + thread_spawn_count = MAX (MAX_REBAL_THREADS, 4); + + gf_msg_debug (this->name, 0, "thread_spawn_count: %d", + thread_spawn_count); + + tid = GF_CALLOC (thread_spawn_count, sizeof (pthread_t), + gf_common_mt_pthread_t); + if (!tid) { + gf_msg (this->name, GF_LOG_ERROR, ENOMEM, 0, + "Failed to create migration threads"); + ret = -1; + goto out; + } + defrag->current_thread_count = thread_spawn_count; + + /*Spawn Threads Here*/ + while (index < thread_spawn_count) { + snprintf (thread_name, sizeof(thread_name), + "%s%d", "dhtmig", index + 1); + ret = gf_thread_create (&(tid[index]), NULL, + &gf_defrag_task, (void *)defrag, + thread_name); + if (ret != 0) { + gf_msg ("DHT", GF_LOG_ERROR, ret, 0, + "Thread[%d] creation failed. ", + index); + ret = -1; + goto out; + } else { + gf_log ("DHT", GF_LOG_INFO, "Thread[%d] " + "creation successful", index); + } + index++; + } + + ret = 0; +out: + *thread_index = index; + *tid_array = tid; + + return ret; +} + +int +gf_defrag_parallel_migration_cleanup (gf_defrag_info_t *defrag, + pthread_t *tid_array, int thread_index) +{ + int ret = -1; + int i = 0; + + if (!defrag) + goto out; + + /* Wake up all migration threads */ + pthread_mutex_lock (&defrag->dfq_mutex); + { + defrag->crawl_done = 1; + + pthread_cond_broadcast (&defrag->parallel_migration_cond); + pthread_cond_broadcast (&defrag->df_wakeup_thread); + } + pthread_mutex_unlock (&defrag->dfq_mutex); + + /*Wait for all the threads to complete their task*/ + for (i = 0; i < thread_index; i++) { + pthread_join (tid_array[i], NULL); + } + + GF_FREE (tid_array); + + /* Cleanup the migration queue */ + if (defrag->queue) { + gf_dirent_free (defrag->queue[0].df_entry); + INIT_LIST_HEAD (&(defrag->queue[0].list)); + } + + GF_FREE (defrag->queue); + + ret = 0; +out: + return ret; +} + int @@ -4580,28 +4715,22 @@ gf_defrag_start_crawl (void *data) xlator_t *this = NULL; dht_conf_t *conf = NULL; gf_defrag_info_t *defrag = NULL; - int ret = -1; - loc_t loc = {0,}; - struct iatt iatt = {0,}; - struct iatt parent = {0,}; dict_t *fix_layout = NULL; dict_t *migrate_data = NULL; dict_t *status = NULL; glusterfs_ctx_t *ctx = NULL; dht_methods_t *methods = NULL; - int i = 0; + call_frame_t *statfs_frame = NULL; + xlator_t *old_THIS = NULL; + int ret = -1; + loc_t loc = {0,}; + struct iatt iatt = {0,}; + struct iatt parent = {0,}; int thread_index = 0; - int err = 0; - int thread_spawn_count = 0; pthread_t *tid = NULL; - char thread_name[GF_THREAD_NAMEMAX] = {0,}; pthread_t filecnt_thread; gf_boolean_t is_tier_detach = _gf_false; - call_frame_t *statfs_frame = NULL; - xlator_t *old_THIS = NULL; - int j = 0; gf_boolean_t fc_thread_started = _gf_false; - uuid_t *uuid_ptr = NULL; this = data; if (!this) @@ -4717,6 +4846,8 @@ gf_defrag_start_crawl (void *data) } if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) { + /* We need to migrate files */ + migrate_data = dict_new (); if (!migrate_data) { defrag->total_failures++; @@ -4732,102 +4863,32 @@ gf_defrag_start_crawl (void *data) goto out; } - ret = dht_get_local_subvols_and_nodeuuids (this, conf, &loc); + ret = dht_init_local_subvols_and_nodeuuids (this, conf, &loc); if (ret) { ret = -1; goto out; } - for (i = 0 ; i < conf->local_subvols_cnt; i++) { - gf_msg (this->name, GF_LOG_INFO, 0, 0, "local subvols " - "are %s", conf->local_subvols[i]->name); - - for (j = 0; j < conf->local_nodeuuids[i].count; j++) { - uuid_ptr = &(conf->local_nodeuuids[i].elements[j].uuid); - gf_msg (this->name, GF_LOG_INFO, 0, 0, - "node uuids are %s", - uuid_utoa(*uuid_ptr)); - } - } - - g_totalsize = gf_defrag_total_file_size (this, &loc); - if (!g_totalsize) { - gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get " - "the total data size. Unable to estimate " - "time to complete rebalance."); - } - - g_totalfiles = gf_defrag_total_file_cnt (this, &loc); - if (!g_totalfiles) { - gf_msg (this->name, GF_LOG_ERROR, 0, 0, "Failed to get " - "the total number of files. Unable to estimate " - "time to complete rebalance."); + /* Initialise the structures required for parallel migration */ + ret = gf_defrag_parallel_migration_init (this, defrag, &tid, + &thread_index); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, 0, + "Aborting rebalance."); + goto out; } - ret = gf_thread_create (&filecnt_thread, NULL, - &dht_file_counter_thread, - (void *)defrag, "dhtfcnt"); - + ret = gf_defrag_estimates_init (this, &loc, &filecnt_thread); if (ret) { - gf_msg (this->name, GF_LOG_ERROR, ret, 0, "Failed to " - "create the file counter thread "); + /* Not a fatal error. Allow the rebalance to proceed*/ ret = 0; } else { fc_thread_started = _gf_true; } - - /* Initialize global entry queue */ - defrag->queue = GF_CALLOC (1, sizeof (struct dht_container), - gf_dht_mt_container_t); - - if (!defrag->queue) { - gf_log (this->name, GF_LOG_ERROR, "No memory for " - "queue"); - ret = -1; - goto out; - } - - INIT_LIST_HEAD (&(defrag->queue[0].list)); - - thread_spawn_count = MAX (MAX_REBAL_THREADS, 4); - - gf_msg_debug (this->name, 0, "thread_spawn_count: %d", - thread_spawn_count); - - tid = GF_CALLOC (thread_spawn_count, sizeof (pthread_t), - gf_common_mt_pthread_t); - if (!tid) { - gf_log (this->name, GF_LOG_ERROR, "Insufficient memory " - "for tid"); - ret = -1; - goto out; - } - - defrag->current_thread_count = thread_spawn_count; - - /*Spawn Threads Here*/ - while (thread_index < thread_spawn_count) { - snprintf (thread_name, sizeof(thread_name), - "%s%d", "dhtdf", thread_index + 1); - err = gf_thread_create (&(tid[thread_index]), NULL, - &gf_defrag_task, (void *)defrag, - thread_name); - if (err != 0) { - gf_log ("DHT", GF_LOG_ERROR, - "Thread[%d] creation failed. " - "Aborting Rebalance", - thread_index); - ret = -1; - goto out; - } else { - gf_log ("DHT", GF_LOG_INFO, "Thread[%d] " - "creation successful", thread_index); - } - thread_index++; - } } + if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) { /* Fix layout for attach tier */ ret = gf_tier_start_fix_layout (this, &loc, defrag, fix_layout); @@ -4882,23 +4943,6 @@ out: defrag->defrag_status = GF_DEFRAG_STATUS_FAILED; } - pthread_mutex_lock (&defrag->dfq_mutex); - { - defrag->crawl_done = 1; - - pthread_cond_broadcast ( - &defrag->parallel_migration_cond); - pthread_cond_broadcast ( - &defrag->df_wakeup_thread); - } - pthread_mutex_unlock (&defrag->dfq_mutex); - - /*Wait for all the threads to complete their task*/ - for (i = 0; i < thread_index; i++) { - pthread_join (tid[i], NULL); - } - - GF_FREE (tid); if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) { /* Wait for the tier fixlayout to @@ -4913,10 +4957,7 @@ out: gf_tier_clear_fix_layout (this, &loc, defrag); } - if (defrag->queue) { - gf_dirent_free (defrag->queue[0].df_entry); - INIT_LIST_HEAD (&(defrag->queue[0].list)); - } + gf_defrag_parallel_migration_cleanup (defrag, tid, thread_index); if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) && (defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) { @@ -4924,13 +4965,7 @@ out: } if (fc_thread_started) { - pthread_mutex_lock (&defrag->fc_mutex); - { - pthread_cond_broadcast (&defrag->fc_wakeup_cond); - } - pthread_mutex_unlock (&defrag->fc_mutex); - - pthread_join (filecnt_thread, NULL); + gf_defrag_estimates_cleanup (this, defrag, filecnt_thread); } dht_send_rebalance_event (this, defrag->cmd, defrag->defrag_status); @@ -4947,7 +4982,6 @@ out: } UNLOCK (&defrag->lock); - GF_FREE (defrag->queue); GF_FREE (defrag); conf->defrag = NULL; @@ -5075,84 +5109,6 @@ out: } - -uint64_t -gf_defrag_get_estimates (dht_conf_t *conf) -{ - gf_defrag_info_t *defrag = NULL; - loc_t loc = {0,}; - double rate_lookedup = 0; - uint64_t dirs_processed = 0; - uint64_t files_processed = 0; - uint64_t total_processed = 0; - uint64_t tmp_count = 0; - uint64_t time_to_complete = 0; - struct timeval now = {0,}; - double elapsed = 0; - - - defrag = conf->defrag; - - if (!g_totalfiles) - goto out; - - gettimeofday (&now, NULL); - elapsed = now.tv_sec - defrag->start_time.tv_sec; - - /* I tried locking before accessing num_files_lookedup and - * num_dirs_processed but the status function - * never seemed to get the lock, causing the status cli to - * hang. - */ - - dirs_processed = defrag->num_dirs_processed; - files_processed = defrag->num_files_lookedup; - - total_processed = files_processed + dirs_processed; - - if (total_processed > g_totalfiles) { - /* lookup the number of files again - * The problem here is that not all the newly added files - * might need to be processed. So this need not work - * in some cases - */ - dht_build_root_loc (defrag->root_inode, &loc); - g_totalfiles = gf_defrag_total_file_cnt (defrag->this, &loc); - if (!g_totalfiles) - goto out; - } - - /* rate at which files looked up */ - rate_lookedup = (total_processed)/elapsed; - - /* We initially sum up dirs across all local subvols because we get the - * file count from the inodes on each subvol. - * The same directories will be counted for each subvol but - * we want them to be counted once. - */ - - tmp_count = g_totalfiles - - (dirs_processed * (conf->local_subvols_cnt - 1)); - - if (rate_lookedup) { - time_to_complete = (tmp_count)/rate_lookedup; - - } else { - - gf_msg (THIS->name, GF_LOG_ERROR, 0, 0, - "Unable to calculate estimated time for rebalance"); - } - - gf_log (THIS->name, GF_LOG_INFO, - "TIME: (count) total_processed=%"PRIu64" tmp_cnt = %"PRIu64"," - "rate_lookedup=%f", total_processed, tmp_count, - rate_lookedup); - -out: - return time_to_complete; -} - - int gf_defrag_status_get (dht_conf_t *conf, dict_t *dict) { @@ -5196,18 +5152,6 @@ gf_defrag_status_get (dht_conf_t *conf, dict_t *dict) if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) && (defrag->defrag_status == GF_DEFRAG_STATUS_STARTED)) { -/* - time_to_complete = gf_defrag_get_estimates (conf); - - if (time_to_complete && (time_to_complete > elapsed)) - time_left = time_to_complete - elapsed; - - gf_log (THIS->name, GF_LOG_INFO, - "TIME: Estimated total time to complete based on" - " count = %"PRIu64 " seconds, seconds left = %"PRIu64"", - time_to_complete, time_left); -*/ - time_to_complete = gf_defrag_get_estimates_based_on_size (conf); if (time_to_complete && (time_to_complete > elapsed)) -- cgit