summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-rebalance.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-rebalance.c')
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c68
1 files changed, 66 insertions, 2 deletions
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index e97e3916fd7..0a29ba30d3d 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -19,14 +19,17 @@
#include <signal.h>
#include <fnmatch.h>
#include <signal.h>
+#include <sys/sysinfo.h>
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
#define DHT_REBALANCE_BLKSIZE (128 * 1024)
-#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATOR_THREAD_COUNT 40
#define MAX_MIGRATE_QUEUE_COUNT 500
#define MIN_MIGRATE_QUEUE_COUNT 200
+#define MAX(a, b) (((a) > (b))?(a):(b))
+
#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \
idx++; \
idx %= sv_cnt; \
@@ -1724,6 +1727,36 @@ gf_defrag_task (void *opaque)
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle down:
+ If the reconfigured count is less than current thread
+ count, then the current thread will sleep */
+
+ /*TODO: Need to refactor the following block to work
+ *under defrag->lock. For now access
+ * defrag->current_thread_count and rthcount under
+ * dfq_mutex lock */
+ while (!defrag->crawl_done &&
+ (defrag->recon_thread_count <
+ defrag->current_thread_count)) {
+ defrag->current_thread_count--;
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread sleeping. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+
+ pthread_cond_wait (
+ &defrag->df_wakeup_thread,
+ &defrag->dfq_mutex);
+
+ defrag->current_thread_count++;
+
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread wokeup. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+ }
+
if (defrag->q_entry_count) {
iterator = list_entry (q_head->next,
typeof(*iterator), list);
@@ -2067,9 +2100,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
double elapsed = {0,};
int local_subvols_cnt = 0;
int i = 0;
+ int j = 0;
struct dht_container *container = NULL;
int ldfq_count = 0;
int dfc_index = 0;
+ int throttle_up = 0;
struct dir_dfmeta *dir_dfmeta = NULL;
gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
@@ -2197,6 +2232,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle up: If reconfigured count is higher than
+ current thread count, wake up the sleeping threads
+ TODO: Need to refactor this. Instead of making the
+ thread sleep and wake, we should terminate and spawn
+ threads on-demand*/
+
+ if (defrag->recon_thread_count >
+ defrag->current_thread_count) {
+ throttle_up =
+ (defrag->recon_thread_count -
+ defrag->current_thread_count);
+ for (j = 0; j < throttle_up; j++) {
+ pthread_cond_signal (
+ &defrag->df_wakeup_thread);
+ }
+
+ }
+
while (defrag->q_entry_count >
MAX_MIGRATE_QUEUE_COUNT) {
defrag->wakeup_crawler = 1;
@@ -2473,6 +2527,7 @@ gf_defrag_start_crawl (void *data)
int i = 0;
int thread_index = 0;
int err = 0;
+ int thread_spawn_count = 0;
pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];
this = data;
@@ -2585,8 +2640,15 @@ gf_defrag_start_crawl (void *data)
INIT_LIST_HEAD (&(defrag->queue[0].list));
+ thread_spawn_count = MAX ((get_nprocs() - 4), 4);
+
+ gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d",
+ thread_spawn_count);
+
+ defrag->current_thread_count = thread_spawn_count;
+
/*Spawn Threads Here*/
- while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+ while (thread_index < thread_spawn_count) {
err = pthread_create(&(tid[thread_index]), NULL,
&gf_defrag_task, (void *)defrag);
if (err != 0) {
@@ -2651,6 +2713,8 @@ out:
pthread_cond_broadcast (
&defrag->parallel_migration_cond);
+ pthread_cond_broadcast (
+ &defrag->df_wakeup_thread);
}
pthread_mutex_unlock (&defrag->dfq_mutex);