summaryrefslogtreecommitdiffstats
path: root/xlators/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster')
-rw-r--r--xlators/cluster/dht/src/dht-common.h8
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c68
-rw-r--r--xlators/cluster/dht/src/dht-shared.c61
3 files changed, 133 insertions, 4 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index baf244d0564..3e2d5d725e9 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -346,6 +346,13 @@ struct gf_defrag_info_ {
int32_t abort;
int32_t wakeup_crawler;
+ /*Throttle params*/
+ /*stands for reconfigured thread count*/
+ int32_t recon_thread_count;
+ /*stands for current running thread count*/
+ int32_t current_thread_count;
+ pthread_cond_t df_wakeup_thread;
+
/* Hard link handle requirement */
synclock_t link_lock;
};
@@ -421,6 +428,7 @@ struct dht_conf {
/* Support size-weighted rebalancing (heterogeneous bricks). */
gf_boolean_t do_weighting;
gf_boolean_t randomize_by_gfid;
+ char *dthrottle;
dht_methods_t *methods;
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index e97e3916fd7..0a29ba30d3d 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -19,14 +19,17 @@
#include <signal.h>
#include <fnmatch.h>
#include <signal.h>
+#include <sys/sysinfo.h>
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
#define DHT_REBALANCE_BLKSIZE (128 * 1024)
-#define MAX_MIGRATOR_THREAD_COUNT 20
+#define MAX_MIGRATOR_THREAD_COUNT 40
#define MAX_MIGRATE_QUEUE_COUNT 500
#define MIN_MIGRATE_QUEUE_COUNT 200
+#define MAX(a, b) (((a) > (b))?(a):(b))
+
#define GF_CRAWL_INDEX_MOVE(idx, sv_cnt) { \
idx++; \
idx %= sv_cnt; \
@@ -1724,6 +1727,36 @@ gf_defrag_task (void *opaque)
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle down:
+ If the reconfigured count is less than current thread
+ count, then the current thread will sleep */
+
+ /*TODO: Need to refactor the following block to work
+ *under defrag->lock. For now access
+ * defrag->current_thread_count and rthcount under
+ * dfq_mutex lock */
+ while (!defrag->crawl_done &&
+ (defrag->recon_thread_count <
+ defrag->current_thread_count)) {
+ defrag->current_thread_count--;
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread sleeping. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+
+ pthread_cond_wait (
+ &defrag->df_wakeup_thread,
+ &defrag->dfq_mutex);
+
+ defrag->current_thread_count++;
+
+ gf_log ("DHT", GF_LOG_INFO,
+ "Thread wokeup. "
+ "defrag->current_thread_count: %d",
+ defrag->current_thread_count);
+ }
+
if (defrag->q_entry_count) {
iterator = list_entry (q_head->next,
typeof(*iterator), list);
@@ -2067,9 +2100,11 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
double elapsed = {0,};
int local_subvols_cnt = 0;
int i = 0;
+ int j = 0;
struct dht_container *container = NULL;
int ldfq_count = 0;
int dfc_index = 0;
+ int throttle_up = 0;
struct dir_dfmeta *dir_dfmeta = NULL;
gf_log (this->name, GF_LOG_INFO, "migrate data called on %s",
@@ -2197,6 +2232,25 @@ gf_defrag_process_dir (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
pthread_mutex_lock (&defrag->dfq_mutex);
{
+
+ /*Throttle up: If reconfigured count is higher than
+ current thread count, wake up the sleeping threads
+ TODO: Need to refactor this. Instead of making the
+ thread sleep and wake, we should terminate and spawn
+ threads on-demand*/
+
+ if (defrag->recon_thread_count >
+ defrag->current_thread_count) {
+ throttle_up =
+ (defrag->recon_thread_count -
+ defrag->current_thread_count);
+ for (j = 0; j < throttle_up; j++) {
+ pthread_cond_signal (
+ &defrag->df_wakeup_thread);
+ }
+
+ }
+
while (defrag->q_entry_count >
MAX_MIGRATE_QUEUE_COUNT) {
defrag->wakeup_crawler = 1;
@@ -2473,6 +2527,7 @@ gf_defrag_start_crawl (void *data)
int i = 0;
int thread_index = 0;
int err = 0;
+ int thread_spawn_count = 0;
pthread_t tid[MAX_MIGRATOR_THREAD_COUNT];
this = data;
@@ -2585,8 +2640,15 @@ gf_defrag_start_crawl (void *data)
INIT_LIST_HEAD (&(defrag->queue[0].list));
+ thread_spawn_count = MAX ((get_nprocs() - 4), 4);
+
+ gf_log (this->name, GF_LOG_DEBUG, "thread_spawn_count: %d",
+ thread_spawn_count);
+
+ defrag->current_thread_count = thread_spawn_count;
+
/*Spawn Threads Here*/
- while (thread_index < MAX_MIGRATOR_THREAD_COUNT) {
+ while (thread_index < thread_spawn_count) {
err = pthread_create(&(tid[thread_index]), NULL,
&gf_defrag_task, (void *)defrag);
if (err != 0) {
@@ -2651,6 +2713,8 @@ out:
pthread_cond_broadcast (
&defrag->parallel_migration_cond);
+ pthread_cond_broadcast (
+ &defrag->df_wakeup_thread);
}
pthread_mutex_unlock (&defrag->dfq_mutex);
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index 3b22a2a8486..0ab81c37890 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -15,11 +15,32 @@
#endif
/* TODO: add NS locking */
-
+#include <sys/sysinfo.h>
#include "statedump.h"
#include "dht-common.h"
#include "dht-messages.h"
+#define MAX(a, b) (((a) > (b))?(a):(b))
+
+#define GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf) { \
+ \
+ pthread_mutex_lock (&conf->defrag->dfq_mutex); \
+ \
+ if (!strcasecmp (conf->dthrottle, "lazy")) \
+ conf->defrag->recon_thread_count = 1; \
+ \
+ throttle_count = MAX ((get_nprocs() - 4), 4); \
+ \
+ if (!strcasecmp (conf->dthrottle, "normal")) \
+ conf->defrag->recon_thread_count = \
+ (throttle_count / 2); \
+ \
+ if (!strcasecmp (conf->dthrottle, "aggressive")) \
+ conf->defrag->recon_thread_count = \
+ throttle_count; \
+ \
+ pthread_mutex_unlock (&conf->defrag->dfq_mutex); \
+ } \
/* TODO:
- use volumename in xattr instead of "dht"
@@ -374,6 +395,7 @@ dht_reconfigure (xlator_t *this, dict_t *options)
char *temp_str = NULL;
gf_boolean_t search_unhashed;
int ret = -1;
+ int throttle_count = 0;
GF_VALIDATE_OR_GOTO ("dht", this, out);
GF_VALIDATE_OR_GOTO ("dht", options, out);
@@ -426,6 +448,16 @@ dht_reconfigure (xlator_t *this, dict_t *options)
conf->randomize_by_gfid,
options, bool, out);
+ GF_OPTION_RECONF ("rebal-throttle", conf->dthrottle, options,
+ str, out);
+
+ if (conf->defrag) {
+ GF_DECIDE_DEFRAG_THROTTLE_COUNT (throttle_count, conf);
+ gf_log ("DHT", GF_LOG_INFO, "conf->dthrottle: %s, "
+ "conf->defrag->recon_thread_count: %d",
+ conf->dthrottle, conf->defrag->recon_thread_count);
+ }
+
if (conf->defrag) {
GF_OPTION_RECONF ("rebalance-stats", conf->defrag->stats,
options, bool, out);
@@ -534,7 +566,7 @@ dht_init (xlator_t *this)
gf_defrag_info_t *defrag = NULL;
int cmd = 0;
char *node_uuid = NULL;
-
+ int throttle_count = 0;
GF_VALIDATE_OR_GOTO ("dht", this, err);
@@ -604,6 +636,8 @@ dht_init (xlator_t *this)
pthread_mutex_init (&defrag->dfq_mutex, 0);
pthread_cond_init (&defrag->parallel_migration_cond, 0);
pthread_cond_init (&defrag->rebalance_crawler_alarm, 0);
+ pthread_cond_init (&defrag->df_wakeup_thread, 0);
+
defrag->global_error = 0;
}
@@ -710,6 +744,17 @@ dht_init (xlator_t *this)
GF_OPTION_INIT ("randomize-hash-range-by-gfid",
conf->randomize_by_gfid, bool, err);
+ if (defrag) {
+ GF_OPTION_INIT ("rebal-throttle",
+ conf->dthrottle, str, err);
+
+ GF_DECIDE_DEFRAG_THROTTLE_COUNT(throttle_count, conf);
+
+ gf_log ("DHT", GF_LOG_DEBUG, "conf->dthrottle: %s, "
+ "conf->defrag->recon_thread_count: %d",
+ conf->dthrottle, conf->defrag->recon_thread_count);
+ }
+
GF_OPTION_INIT ("xattr-name", conf->xattr_name, str, err);
gf_asprintf (&conf->link_xattr_name, "%s."DHT_LINKFILE_STR,
conf->xattr_name);
@@ -922,5 +967,17 @@ struct volume_options options[] = {
"subvolume to which it hashes"
},
+ { .key = {"rebal-throttle"},
+ .type = GF_OPTION_TYPE_STR,
+ .default_value = "normal",
+ .description = " Sets the maximum number of parallel file migrations "
+ "allowed on a node during the rebalance operation. The"
+ " default value is normal and allows a max of "
+ "[($(processing units) - 4) / 2), 2] files to be "
+ "migrated at a time. Lazy will allow only one file to "
+ "be migrated at a time and aggressive will allow "
+ "max of [($(processing units) - 4) / 2), 4]"
+ },
+
{ .key = {NULL} },
};