diff options
-rwxr-xr-x | tests/basic/tier/fops-during-migration-pause.t | 7 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-common.h | 21 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-rebalance.c | 69 | ||||
-rw-r--r-- | xlators/cluster/dht/src/tier.c | 153 |
4 files changed, 171 insertions, 79 deletions
diff --git a/tests/basic/tier/fops-during-migration-pause.t b/tests/basic/tier/fops-during-migration-pause.t index f50d666ef27..20719c8510a 100755 --- a/tests/basic/tier/fops-during-migration-pause.t +++ b/tests/basic/tier/fops-during-migration-pause.t @@ -5,8 +5,8 @@ . $(dirname $0)/../../tier.rc NUM_BRICKS=3 -DEMOTE_FREQ=10 -PROMOTE_FREQ=10 +DEMOTE_FREQ=30 +PROMOTE_FREQ=30 TEST_STR="Testing write and truncate fops on tier migration" @@ -59,8 +59,9 @@ TEST mkdir $M0/dir1 # Create a large file (800MB), so that rebalance takes time # The file will be created on the hot tier + sleep_until_mid_cycle $DEMOTE_FREQ -dd if=/dev/zero of=$M0/dir1/FILE1 bs=256k count=5120 +dd if=/dev/zero of=$M0/dir1/FILE1 bs=256k count=4096 # Get the path of the file on the hot tier HPATH=`find $B0/hot/ -name FILE1` diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h index 0f06a4a6670..5fa97a41881 100644 --- a/xlators/cluster/dht/src/dht-common.h +++ b/xlators/cluster/dht/src/dht-common.h @@ -345,6 +345,12 @@ typedef enum tier_mode_ { TIER_MODE_WM } tier_mode_t; +typedef enum tier_pause_state_ { + TIER_RUNNING = 0, + TIER_REQUEST_PAUSE, + TIER_PAUSED +} tier_pause_state_t; + typedef struct gf_tier_conf { int is_tier; int watermark_hi; @@ -360,11 +366,12 @@ typedef struct gf_tier_conf { int tier_demote_frequency; uint64_t st_last_promoted_size; uint64_t st_last_demoted_size; - int request_pause; - gf_boolean_t paused; + tier_pause_state_t pause_state; struct synctask *pause_synctask; gf_timer_t *pause_timer; pthread_mutex_t pause_mutex; + int promote_in_progress; + int demote_in_progress; } gf_tier_conf_t; struct gf_defrag_info_ { @@ -1000,11 +1007,17 @@ int dht_newfile_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int gf_defrag_status_get (gf_defrag_info_t *defrag, dict_t *dict); +void +gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state); + +tier_pause_state_t +gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf); + int gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag); -void -gf_defrag_wake_pause_tier (gf_tier_conf_t *defrag, gf_boolean_t pause); +tier_pause_state_t +gf_defrag_check_pause_tier (gf_tier_conf_t *defrag); int gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag); diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c index 3698b340fef..e28bb76be66 100644 --- a/xlators/cluster/dht/src/dht-rebalance.c +++ b/xlators/cluster/dht/src/dht-rebalance.c @@ -833,7 +833,7 @@ __tier_migrate_data (gf_defrag_info_t *defrag, xlator_t *from, xlator_t *to, fd_ else ret = syncop_writev (to, dst, vector, count, offset, iobref, 0, NULL, NULL); - if (defrag->tier_conf.request_pause) { + if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING) { gf_msg ("tier", GF_LOG_INFO, 0, DHT_MSG_TIER_PAUSED, "Migrate file paused"); @@ -3507,23 +3507,60 @@ out: } void -gf_defrag_wake_pause_tier (gf_tier_conf_t *tier_conf, gf_boolean_t pause) +gf_defrag_set_pause_state (gf_tier_conf_t *tier_conf, tier_pause_state_t state) +{ + pthread_mutex_lock (&tier_conf->pause_mutex); + tier_conf->pause_state = state; + pthread_mutex_unlock (&tier_conf->pause_mutex); +} + +tier_pause_state_t +gf_defrag_get_pause_state (gf_tier_conf_t *tier_conf) +{ + int state; + + pthread_mutex_lock (&tier_conf->pause_mutex); + state = tier_conf->pause_state; + pthread_mutex_unlock (&tier_conf->pause_mutex); + + return state; +} + +tier_pause_state_t +gf_defrag_check_pause_tier (gf_tier_conf_t *tier_conf) { int woke = 0; + int state = -1; pthread_mutex_lock (&tier_conf->pause_mutex); + + if (tier_conf->pause_state == TIER_RUNNING) + goto out; + + if (tier_conf->pause_state == TIER_PAUSED) + goto out; + + if (tier_conf->promote_in_progress || + tier_conf->demote_in_progress) + goto out; + + tier_conf->pause_state = TIER_PAUSED; + if (tier_conf->pause_synctask) { - tier_conf->paused = pause; synctask_wake (tier_conf->pause_synctask); tier_conf->pause_synctask = 0; woke = 1; } - pthread_mutex_unlock (&tier_conf->pause_mutex); - tier_conf->request_pause = 0; gf_msg ("tier", GF_LOG_DEBUG, 0, DHT_MSG_TIER_PAUSED, - "woken %d paused %d", woke, tier_conf->paused); + "woken %d", woke); +out: + state = tier_conf->pause_state; + + pthread_mutex_unlock (&tier_conf->pause_mutex); + + return state; } void @@ -3546,7 +3583,7 @@ gf_defrag_pause_tier_timeout (void *data) DHT_MSG_TIER_PAUSED, "Request pause timer timeout"); - gf_defrag_wake_pause_tier (&defrag->tier_conf, _gf_false); + gf_defrag_check_pause_tier (&defrag->tier_conf); out: return; @@ -3564,12 +3601,16 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag) /* * Set flag requesting to pause tiering. Wait 'delay' seconds for - * tiering to actually stop as indicated by the "paused" boolean, + * tiering to actually stop as indicated by the pause state * before returning success or failure. */ - defrag->tier_conf.request_pause = 1; + gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE); - if (defrag->tier_conf.paused == _gf_true) + /* + * If migration is not underway, can pause immediately. + */ + gf_defrag_check_pause_tier (&defrag->tier_conf); + if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED) goto out; gf_msg (this->name, GF_LOG_DEBUG, 0, @@ -3586,11 +3627,12 @@ gf_defrag_pause_tier (xlator_t *this, gf_defrag_info_t *defrag) synctask_yield (defrag->tier_conf.pause_synctask); - if (defrag->tier_conf.paused == _gf_true) + if (gf_defrag_get_pause_state (&defrag->tier_conf) == TIER_PAUSED) goto out; - ret = -1; + gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING); + ret = -1; out: gf_msg (this->name, GF_LOG_DEBUG, 0, @@ -3607,8 +3649,7 @@ gf_defrag_resume_tier (xlator_t *this, gf_defrag_info_t *defrag) DHT_MSG_TIER_RESUME, "Pause end. Resume tiering"); - defrag->tier_conf.request_pause = 0; - defrag->tier_conf.paused = _gf_false; + gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING); return 0; } diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c index 21410dd30dc..8353cdafb60 100644 --- a/xlators/cluster/dht/src/tier.c +++ b/xlators/cluster/dht/src/tier.c @@ -241,51 +241,6 @@ out: } int -tier_do_migration (xlator_t *this, int promote) -{ - gf_defrag_info_t *defrag = NULL; - dht_conf_t *conf = NULL; - long rand = 0; - int migrate = 0; - gf_tier_conf_t *tier_conf = NULL; - - conf = this->private; - if (!conf) - goto exit; - - defrag = conf->defrag; - if (!defrag) - goto exit; - - if (defrag->tier_conf.mode != TIER_MODE_WM) { - migrate = 1; - goto exit; - } - - tier_conf = &defrag->tier_conf; - - switch (tier_conf->watermark_last) { - case TIER_WM_LOW: - migrate = promote ? 1 : 0; - break; - case TIER_WM_HI: - migrate = promote ? 0 : 1; - break; - case TIER_WM_MID: - rand = random() % 100; - if (promote) { - migrate = (rand > tier_conf->percent_full); - } else { - migrate = (rand <= tier_conf->percent_full); - } - break; - } - -exit: - return migrate; -} - -int tier_check_watermark (xlator_t *this, loc_t *root_loc) { tier_watermark_op_t wm = TIER_WM_NONE; @@ -377,6 +332,85 @@ exit: return ret; } +int +tier_do_migration (xlator_t *this, int promote, loc_t *root_loc) +{ + gf_defrag_info_t *defrag = NULL; + dht_conf_t *conf = NULL; + long rand = 0; + int migrate = 0; + gf_tier_conf_t *tier_conf = NULL; + + conf = this->private; + if (!conf) + goto exit; + + defrag = conf->defrag; + if (!defrag) + goto exit; + + if (defrag->tier_conf.mode != TIER_MODE_WM) { + migrate = 1; + goto exit; + } + + if (tier_check_watermark (this, root_loc) != 0) { + gf_msg (this->name, GF_LOG_CRITICAL, errno, + DHT_MSG_LOG_TIER_ERROR, + "Failed to get watermark"); + goto exit; + } + + tier_conf = &defrag->tier_conf; + + switch (tier_conf->watermark_last) { + case TIER_WM_LOW: + migrate = promote ? 1 : 0; + break; + case TIER_WM_HI: + migrate = promote ? 0 : 1; + break; + case TIER_WM_MID: + rand = random() % 100; + if (promote) { + migrate = (rand > tier_conf->percent_full); + } else { + migrate = (rand <= tier_conf->percent_full); + } + break; + } + +exit: + return migrate; +} + +int +tier_migrate (xlator_t *this, int is_promotion, dict_t *migrate_data, + loc_t *loc, gf_tier_conf_t *tier_conf) +{ + int ret = -1; + + pthread_mutex_lock (&tier_conf->pause_mutex); + if (is_promotion) + tier_conf->promote_in_progress = 1; + else + tier_conf->demote_in_progress = 1; + pthread_mutex_unlock (&tier_conf->pause_mutex); + + /* Data migration */ + ret = syncop_setxattr (this, loc, migrate_data, 0, + NULL, NULL); + + pthread_mutex_lock (&tier_conf->pause_mutex); + if (is_promotion) + tier_conf->promote_in_progress = 0; + else + tier_conf->demote_in_progress = 0; + pthread_mutex_unlock (&tier_conf->pause_mutex); + + return ret; +} + static int tier_migrate_using_query_file (void *_args) { @@ -408,6 +442,7 @@ tier_migrate_using_query_file (void *_args) dht_conf_t *conf = NULL; uint64_t total_migrated_bytes = 0; int total_files = 0; + loc_t root_loc = { 0 }; GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out); GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); @@ -420,6 +455,8 @@ tier_migrate_using_query_file (void *_args) defrag = query_cbk_args->defrag; + dht_build_root_loc (defrag->root_inode, &root_loc); + migrate_data = dict_new (); if (!migrate_data) goto out; @@ -461,7 +498,8 @@ tier_migrate_using_query_file (void *_args) dict_del (migrate_data, "from.migrator"); - if (defrag->tier_conf.request_pause) { + if (gf_defrag_get_pause_state (&defrag->tier_conf) + != TIER_RUNNING) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS, "Tiering paused. " @@ -469,7 +507,7 @@ tier_migrate_using_query_file (void *_args) break; } - if (!tier_do_migration (this, query_cbk_args->is_promotion)) { + if (!tier_do_migration (this, query_cbk_args->is_promotion, &root_loc)) { gfdb_methods.gfdb_query_record_free (query_record); query_record = NULL; continue; @@ -653,7 +691,8 @@ tier_migrate_using_query_file (void *_args) gf_uuid_copy (loc.gfid, loc.inode->gfid); - if (defrag->tier_conf.request_pause) { + if (gf_defrag_get_pause_state (&defrag->tier_conf) + != TIER_RUNNING) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS, "Tiering paused. " @@ -662,9 +701,9 @@ tier_migrate_using_query_file (void *_args) goto abort; } - /* Data migration */ - ret = syncop_setxattr (this, &loc, migrate_data, 0, - NULL, NULL); + ret = tier_migrate (this, query_cbk_args->is_promotion, + migrate_data, &loc, &defrag->tier_conf); + if (ret) { gf_msg (this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_ERROR, "Failed to " @@ -1639,8 +1678,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) goto out; } - if (tier_conf->request_pause) - gf_defrag_wake_pause_tier (tier_conf, _gf_true); + gf_defrag_check_pause_tier (tier_conf); sleep(1); @@ -1664,8 +1702,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) goto out; } - if ((defrag->tier_conf.paused) || - (defrag->tier_conf.request_pause)) + if (gf_defrag_get_pause_state (&defrag->tier_conf) != TIER_RUNNING) continue; @@ -2069,15 +2106,15 @@ tier_init (xlator_t *this) defrag->tier_conf.mode = ret; } - defrag->tier_conf.request_pause = 0; - pthread_mutex_init (&defrag->tier_conf.pause_mutex, 0); + gf_defrag_set_pause_state (&defrag->tier_conf, TIER_RUNNING); + ret = dict_get_str (this->options, "tier-pause", &paused); if (paused && strcmp (paused, "on") == 0) - defrag->tier_conf.request_pause = 1; + gf_defrag_set_pause_state (&defrag->tier_conf, TIER_REQUEST_PAUSE); ret = gf_asprintf(&voldir, "%s/%s", DEFAULT_VAR_RUN_DIRECTORY, |