diff options
Diffstat (limited to 'xlators/cluster')
-rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 8 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 68 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-shared.c | 61 |
3 files changed, 133 insertions, 4 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index baf244d0564..3e2d5d725e9 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -346,6 +346,13 @@ struct gf_defrag_info_ { int32_t abort; int32_t wakeup_crawler; + /*Throttle params*/ + /*stands for reconfigured thread count*/ + int32_t recon_thread_count; + /*stands for current running thread count*/ + int32_t current_thread_count; + pthread_cond_t df_wakeup_thread; + /* Hard link handle requirement */ synclock_t link_lock; }; @@ -421,6 +428,7 @@ struct dht_conf { /* Support size-weighted rebalancing (heterogeneous bricks). */ gf_boolean_t do_weighting; gf_boolean_t randomize_by_gfid; + char *dthrottle; dht_methods_t *methods; diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index e97e3916fd7..0a29ba30d3d 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -19,14 +19,17 @@ #include <signal.h> #include <fnmatch.h> #include <signal.h> +#include <sys/sysinfo.h> #define GF_DISK_SECTOR_SIZE 512 #define DHT_REBALANCE_PID 4242 /* Change it if required */ #define DHT_REBALANCE_BLKSIZE (128 * 1024) -#define MAX_MIGRATOR_THREAD_COUNT 20 +#define MAX_MIGRATOR_THREAD_COUNT 40 #define MAX_MIGRATE_QUEUE_COUNT 500 #define MIN_MIGRATE_QUEUE_COUNT 200 +#define MAX(a, b) (((a) > (b))?(a):(b)) + #define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \ idx++; \ idx %= sv_cnt; \ @@ -1724,6 +1727,36 @@ gf_defrag_task (void *opaque) pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle down: + If the reconfigured count is less than current thread + count, then the current thread will sleep */ + + /*TODO: Need to refactor the following block to work + *under defrag->lock. For now access + * defrag->current_thread_count and rthcount under + * dfq_mutex lock */ + while (!defrag->crawl_done && + (defrag->recon_thread_count < + defrag->current_thread_count)) { + defrag->current_thread_count--; + gf_log ("DHT", GF_LOG_INFO, + "Thread sleeping. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + + pthread_cond_wait ( + &defrag->df_wakeup_thread, + &defrag->dfq_mutex); + + defrag->current_thread_count++; + + gf_log ("DHT", GF_LOG_INFO, + "Thread wokeup. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + } + if (defrag->q_entry_count) { iterator = list_entry (q_head->next, typeof(*iterator), list); @@ -2067,9 +2100,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, double elapsed = {0,}; int local_subvols_cnt = 0; int i = 0; + int j = 0; struct dht_container *container = NULL; int ldfq_count = 0; int dfc_index = 0; + int throttle_up = 0; struct dir_dfmeta *dir_dfmeta = NULL; gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", @@ -2197,6 +2232,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle up: If reconfigured count is higher than + current thread count, wake up the sleeping threads + TODO: Need to refactor this. Instead of making the + thread sleep and wake, we should terminate and spawn + threads on-demand*/ + + if (defrag->recon_thread_count > + defrag->current_thread_count) { + throttle_up = + (defrag->recon_thread_count - + defrag->current_thread_count); + for (j = 0; j < throttle_up; j++) { + pthread_cond_signal ( + &defrag->df_wakeup_thread); + } + + } + while (defrag->q_entry_count > MAX_MIGRATE_QUEUE_COUNT) { defrag->wakeup_crawler = 1; @@ -2473,6 +2527,7 @@ gf_defrag_start_crawl (void *data) int i = 0; int thread_index = 0; int err = 0; + int thread_spawn_count = 0; pthread_t tid[MAX_MIGRATOR_THREAD_COUNT]; this = data; @@ -2585,8 +2640,15 @@ gf_defrag_start_crawl (void *data) INIT_LIST_HEAD (&(defrag->queue[0].list)); + thread_spawn_count = MAX ((get_nprocs() - 4), 4); + + gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d", + thread_spawn_count); + + defrag->current_thread_count = thread_spawn_count; + /*Spawn Threads Here*/ - while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { + while (thread_index < thread_spawn_count) { err = pthread_create(&(tid[thread_index]), NULL, &gf_defrag_task, (void *)defrag); if (err != 0) { @@ -2651,6 +2713,8 @@ out: pthread_cond_broadcast ( &defrag->parallel_migration_cond); + pthread_cond_broadcast ( + &defrag->df_wakeup_thread); } pthread_mutex_unlock (&defrag->dfq_mutex); diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c index 3b22a2a8486..0ab81c37890 100644 --- a/xlators/cluster/dht/src/dht-shared.c +++ b/xlators/cluster/dht/src/dht-shared.c @@ -15,11 +15,32 @@ #endif /* TODO: add NS locking */ - +#include <sys/sysinfo.h> #include "statedump.h" #include "dht-common.h" #include "dht-messages.h" +#define MAX(a, b) (((a) > (b))?(a):(b)) + +#define GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf) { \ + \ + pthread_mutex_lock (&conf->defrag->dfq_mutex); \ + \ + if (!strcasecmp (conf->dthrottle, "lazy")) \ + conf->defrag->recon_thread_count = 1; \ + \ + throttle_count = MAX ((get_nprocs() - 4), 4); \ + \ + if (!strcasecmp (conf->dthrottle, "normal")) \ + conf->defrag->recon_thread_count = \ + (throttle_count / 2); \ + \ + if (!strcasecmp (conf->dthrottle, "aggressive")) \ + conf->defrag->recon_thread_count = \ + throttle_count; \ + \ + pthread_mutex_unlock (&conf->defrag->dfq_mutex); \ + } \ /* TODO: - use volumename in xattr instead of "dht" @@ -374,6 +395,7 @@ dht_reconfigure (xlator_t *this, dict_t *options) char *temp_str = NULL; gf_boolean_t search_unhashed; int ret = -1; + int throttle_count = 0; GF_VALIDATE_OR_GOTO ("dht", this, out); GF_VALIDATE_OR_GOTO ("dht", options, out); @@ -426,6 +448,16 @@ dht_reconfigure (xlator_t *this, dict_t *options) conf->randomize_by_gfid, options, bool, out); + GF_OPTION_RECONF ("rebal-throttle", conf->dthrottle, options, + str, out); + + if (conf->defrag) { + GF_DECIDE_DEFRAG_THROTTLE_COUNT (throttle_count, conf); + gf_log ("DHT", GF_LOG_INFO, "conf->dthrottle: %s, " + "conf->defrag->recon_thread_count: %d", + conf->dthrottle, conf->defrag->recon_thread_count); + } + if (conf->defrag) { GF_OPTION_RECONF ("rebalance-stats", conf->defrag->stats, options, bool, out); @@ -534,7 +566,7 @@ dht_init (xlator_t *this) gf_defrag_info_t *defrag = NULL; int cmd = 0; char *node_uuid = NULL; - + int throttle_count = 0; GF_VALIDATE_OR_GOTO ("dht", this, err); @@ -604,6 +636,8 @@ dht_init (xlator_t *this) pthread_mutex_init (&defrag->dfq_mutex, 0); pthread_cond_init (&defrag->parallel_migration_cond, 0); pthread_cond_init (&defrag->rebalance_crawler_alarm, 0); + pthread_cond_init (&defrag->df_wakeup_thread, 0); + defrag->global_error = 0; } @@ -710,6 +744,17 @@ dht_init (xlator_t *this) GF_OPTION_INIT ("randomize-hash-range-by-gfid", conf->randomize_by_gfid, bool, err); + if (defrag) { + GF_OPTION_INIT ("rebal-throttle", + conf->dthrottle, str, err); + + GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf); + + gf_log ("DHT", GF_LOG_DEBUG, "conf->dthrottle: %s, " + "conf->defrag->recon_thread_count: %d", + conf->dthrottle, conf->defrag->recon_thread_count); + } + GF_OPTION_INIT ("xattr-name", conf->xattr_name, str, err); gf_asprintf (&conf->link_xattr_name, "%s."DHT_LINKFILE_STR, conf->xattr_name); @@ -922,5 +967,17 @@ struct volume_options options[] = { "subvolume to which it hashes" }, + { .key = {"rebal-throttle"}, + .type = GF_OPTION_TYPE_STR, + .default_value = "normal", + .description = " Sets the maximum number of parallel file migrations " + "allowed on a node during the rebalance operation. The" + " default value is normal and allows a max of " + "[($(processing units) - 4) / 2), 2] files to be " + "migrated at a time. Lazy will allow only one file to " + "be migrated at a time and aggressive will allow " + "max of [($(processing units) - 4) / 2), 4]" + }, + { .key = {NULL} }, }; |