diff options
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
-rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 68 |
1 files changed, 66 insertions, 2 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index e97e3916fd7..0a29ba30d3d 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -19,14 +19,17 @@ #include <signal.h> #include <fnmatch.h> #include <signal.h> +#include <sys/sysinfo.h> #define GF_DISK_SECTOR_SIZE 512 #define DHT_REBALANCE_PID 4242 /* Change it if required */ #define DHT_REBALANCE_BLKSIZE (128 * 1024) -#define MAX_MIGRATOR_THREAD_COUNT 20 +#define MAX_MIGRATOR_THREAD_COUNT 40 #define MAX_MIGRATE_QUEUE_COUNT 500 #define MIN_MIGRATE_QUEUE_COUNT 200 +#define MAX(a, b) (((a) > (b))?(a):(b)) + #define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \ idx++; \ idx %= sv_cnt; \ @@ -1724,6 +1727,36 @@ gf_defrag_task (void *opaque) pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle down: + If the reconfigured count is less than current thread + count, then the current thread will sleep */ + + /*TODO: Need to refactor the following block to work + *under defrag->lock. For now access + * defrag->current_thread_count and rthcount under + * dfq_mutex lock */ + while (!defrag->crawl_done && + (defrag->recon_thread_count < + defrag->current_thread_count)) { + defrag->current_thread_count--; + gf_log ("DHT", GF_LOG_INFO, + "Thread sleeping. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + + pthread_cond_wait ( + &defrag->df_wakeup_thread, + &defrag->dfq_mutex); + + defrag->current_thread_count++; + + gf_log ("DHT", GF_LOG_INFO, + "Thread wokeup. " + "defrag->current_thread_count: %d", + defrag->current_thread_count); + } + if (defrag->q_entry_count) { iterator = list_entry (q_head->next, typeof(*iterator), list); @@ -2067,9 +2100,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, double elapsed = {0,}; int local_subvols_cnt = 0; int i = 0; + int j = 0; struct dht_container *container = NULL; int ldfq_count = 0; int dfc_index = 0; + int throttle_up = 0; struct dir_dfmeta *dir_dfmeta = NULL; gf_log (this->name, GF_LOG_INFO, "migrate data called on %s", @@ -2197,6 +2232,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc, pthread_mutex_lock (&defrag->dfq_mutex); { + + /*Throttle up: If reconfigured count is higher than + current thread count, wake up the sleeping threads + TODO: Need to refactor this. Instead of making the + thread sleep and wake, we should terminate and spawn + threads on-demand*/ + + if (defrag->recon_thread_count > + defrag->current_thread_count) { + throttle_up = + (defrag->recon_thread_count - + defrag->current_thread_count); + for (j = 0; j < throttle_up; j++) { + pthread_cond_signal ( + &defrag->df_wakeup_thread); + } + + } + while (defrag->q_entry_count > MAX_MIGRATE_QUEUE_COUNT) { defrag->wakeup_crawler = 1; @@ -2473,6 +2527,7 @@ gf_defrag_start_crawl (void *data) int i = 0; int thread_index = 0; int err = 0; + int thread_spawn_count = 0; pthread_t tid[MAX_MIGRATOR_THREAD_COUNT]; this = data; @@ -2585,8 +2640,15 @@ gf_defrag_start_crawl (void *data) INIT_LIST_HEAD (&(defrag->queue[0].list)); + thread_spawn_count = MAX ((get_nprocs() - 4), 4); + + gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d", + thread_spawn_count); + + defrag->current_thread_count = thread_spawn_count; + /*Spawn Threads Here*/ - while (thread_index < MAX_MIGRATOR_THREAD_COUNT) { + while (thread_index < thread_spawn_count) { err = pthread_create(&(tid[thread_index]), NULL, &gf_defrag_task, (void *)defrag); if (err != 0) { @@ -2651,6 +2713,8 @@ out: pthread_cond_broadcast ( &defrag->parallel_migration_cond); + pthread_cond_broadcast ( + &defrag->df_wakeup_thread); } pthread_mutex_unlock (&defrag->dfq_mutex); |