summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src')
-rw-r--r--xlators/cluster/dht/src/dht-common.h21
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c69
-rw-r--r--xlators/cluster/dht/src/tier.c50
3 files changed, 110 insertions, 30 deletions
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 9fc0d402d6f..22855b9425c 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -340,6 +340,12 @@ typedef enum tier_mode_ {
TIER_MODE_WM
} tier_mode_t;
+typedef enum tier_pause_state_ {
+ TIER_RUNNING = 0,
+ TIER_REQUEST_PAUSE,
+ TIER_PAUSED
+} tier_pause_state_t;
+
typedef struct gf_tier_conf {
int is_tier;
int watermark_hi;
@@ -355,11 +361,12 @@ typedef struct gf_tier_conf {
int tier_demote_frequency;
uint64_t st_last_promoted_size;
uint64_t st_last_demoted_size;
- int request_pause;
- gf_boolean_t paused;
+ tier_pause_state_t pause_state;
struct synctask *pause_synctask;
gf_timer_t *pause_timer;
pthread_mutex_t pause_mutex;
+ int promote_in_progress;
+ int demote_in_progress;
} gf_tier_conf_t;
struct gf_defrag_info_ {
@@ -995,11 +1002,17 @@ int dht_newfile_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
gf_defrag_status_get (gf_defrag_info_t *defrag, dict_t *dict);
+void
+gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state);
+
+tier_pause_state_t
+gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf);
+
int
gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag);
-void
-gf_defrag_wake_pause_tier (gf_tier_conf_t *defrag, gf_boolean_t pause);
+tier_pause_state_t
+gf_defrag_check_pause_tier (gf_tier_conf_t *defrag);
int
gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag);
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index cc323c46f0d..ced907e7937 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -826,7 +826,7 @@ __tier_migrate_data (gf_defrag_info_t *defrag, xlator_t *from, xlator_t *to, fd_
else
ret = syncop_writev (to, dst, vector, count,
offset, iobref, 0, NULL, NULL);
- if (defrag->tier_conf.request_pause) {
+ if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING) {
gf_msg ("tier", GF_LOG_INFO, 0,
DHT_MSG_TIER_PAUSED,
"Migrate file paused");
@@ -3619,23 +3619,60 @@ out:
}
void
-gf_defrag_wake_pause_tier (gf_tier_conf_t *tier_conf, gf_boolean_t pause)
+gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state)
+{
+ pthread_mutex_lock (&tier_conf->pause_mutex);
+ tier_conf->pause_state = state;
+ pthread_mutex_unlock (&tier_conf->pause_mutex);
+}
+
+tier_pause_state_t
+gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf)
+{
+ int state;
+
+ pthread_mutex_lock (&tier_conf->pause_mutex);
+ state = tier_conf->pause_state;
+ pthread_mutex_unlock (&tier_conf->pause_mutex);
+
+ return state;
+}
+
+tier_pause_state_t
+gf_defrag_check_pause_tier (gf_tier_conf_t *tier_conf)
{
int woke = 0;
+ int state = -1;
pthread_mutex_lock (&tier_conf->pause_mutex);
+
+ if (tier_conf->pause_state == TIER_RUNNING)
+ goto out;
+
+ if (tier_conf->pause_state == TIER_PAUSED)
+ goto out;
+
+ if (tier_conf->promote_in_progress ||
+ tier_conf->demote_in_progress)
+ goto out;
+
+ tier_conf->pause_state = TIER_PAUSED;
+
if (tier_conf->pause_synctask) {
- tier_conf->paused = pause;
synctask_wake (tier_conf->pause_synctask);
tier_conf->pause_synctask = 0;
woke = 1;
}
- pthread_mutex_unlock (&tier_conf->pause_mutex);
- tier_conf->request_pause = 0;
gf_msg ("tier", GF_LOG_DEBUG, 0,
DHT_MSG_TIER_PAUSED,
- "woken %d paused %d", woke, tier_conf->paused);
+ "woken %d", woke);
+out:
+ state = tier_conf->pause_state;
+
+ pthread_mutex_unlock (&tier_conf->pause_mutex);
+
+ return state;
}
void
@@ -3658,7 +3695,7 @@ gf_defrag_pause_tier_timeout (void *data)
DHT_MSG_TIER_PAUSED,
"Request pause timer timeout");
- gf_defrag_wake_pause_tier (&defrag->tier_conf, _gf_false);
+ gf_defrag_check_pause_tier (&defrag->tier_conf);
out:
return;
@@ -3676,12 +3713,16 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag)
/*
* Set flag requesting to pause tiering. Wait 'delay' seconds for
- * tiering to actually stop as indicated by the "paused" boolean,
+ * tiering to actually stop as indicated by the pause state
* before returning success or failure.
*/
- defrag->tier_conf.request_pause = 1;
+ gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE);
- if (defrag->tier_conf.paused == _gf_true)
+ /*
+ * If migration is not underway, can pause immediately.
+ */
+ gf_defrag_check_pause_tier (&defrag->tier_conf);
+ if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED)
goto out;
gf_msg (this->name, GF_LOG_DEBUG, 0,
@@ -3698,11 +3739,12 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag)
synctask_yield (defrag->tier_conf.pause_synctask);
- if (defrag->tier_conf.paused == _gf_true)
+ if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED)
goto out;
- ret = -1;
+ gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING);
+ ret = -1;
out:
gf_msg (this->name, GF_LOG_DEBUG, 0,
@@ -3719,8 +3761,7 @@ gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag)
DHT_MSG_TIER_RESUME,
"Pause end. Resume tiering");
- defrag->tier_conf.request_pause = 0;
- defrag->tier_conf.paused = _gf_false;
+ gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING);
return 0;
}
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
index eef9ae10ab6..d2b65ea6f86 100644
--- a/xlators/cluster/dht/src/tier.c
+++ b/xlators/cluster/dht/src/tier.c
@@ -200,6 +200,32 @@ exit:
return migrate;
}
+int
+tier_migrate (xlator_t *this, int is_promotion, dict_t *migrate_data,
+ loc_t *loc, gf_tier_conf_t *tier_conf)
+{
+ int ret = -1;
+
+ pthread_mutex_lock (&tier_conf->pause_mutex);
+ if (is_promotion)
+ tier_conf->promote_in_progress = 1;
+ else
+ tier_conf->demote_in_progress = 1;
+ pthread_mutex_unlock (&tier_conf->pause_mutex);
+
+ /* Data migration */
+ ret = syncop_setxattr (this, loc, migrate_data, 0,
+ NULL, NULL);
+
+ pthread_mutex_lock (&tier_conf->pause_mutex);
+ if (is_promotion)
+ tier_conf->promote_in_progress = 0;
+ else
+ tier_conf->demote_in_progress = 0;
+ pthread_mutex_unlock (&tier_conf->pause_mutex);
+
+ return ret;
+}
static int
tier_migrate_using_query_file (void *_args)
@@ -290,7 +316,8 @@ tier_migrate_using_query_file (void *_args)
dict_del (migrate_data, "from.migrator");
- if (defrag->tier_conf.request_pause) {
+ if (gf_defrag_get_pause_state (&defrag->tier_conf)
+ != TIER_RUNNING) {
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_LOG_TIER_STATUS,
"Tiering paused. "
@@ -486,7 +513,8 @@ tier_migrate_using_query_file (void *_args)
gf_uuid_copy (loc.gfid, loc.inode->gfid);
- if (defrag->tier_conf.request_pause) {
+ if (gf_defrag_get_pause_state (&defrag->tier_conf)
+ != TIER_RUNNING) {
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_LOG_TIER_STATUS,
"Tiering paused. "
@@ -495,9 +523,9 @@ tier_migrate_using_query_file (void *_args)
goto abort;
}
- /* Data migration */
- ret = syncop_setxattr (this, &loc, migrate_data, 0,
- NULL, NULL);
+ ret = tier_migrate (this, query_cbk_args->is_promotion,
+ migrate_data, &loc, &defrag->tier_conf);
+
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, -ret,
DHT_MSG_LOG_TIER_ERROR, "Failed to "
@@ -1389,8 +1417,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
goto out;
}
- if (tier_conf->request_pause)
- gf_defrag_wake_pause_tier (tier_conf, _gf_true);
+ gf_defrag_check_pause_tier (tier_conf);
sleep(1);
@@ -1414,8 +1441,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
goto out;
}
- if ((defrag->tier_conf.paused) ||
- (defrag->tier_conf.request_pause))
+ if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING)
continue;
@@ -1819,15 +1845,15 @@ tier_init (xlator_t *this)
defrag->tier_conf.mode = ret;
}
- defrag->tier_conf.request_pause = 0;
-
pthread_mutex_init (&defrag->tier_conf.pause_mutex, 0);
+ gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING);
+
ret = dict_get_str (this->options,
"tier-pause", &paused);
if (paused && strcmp (paused, "on") == 0)
- defrag->tier_conf.request_pause = 1;
+ gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE);
ret = gf_asprintf(&voldir, "%s/%s",
DEFAULT_VAR_RUN_DIRECTORY,