diff options
Diffstat (limited to 'xlators/cluster/dht/src/tier.c')
-rw-r--r-- | xlators/cluster/dht/src/tier.c | 1007 |
1 files changed, 1007 insertions, 0 deletions
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c new file mode 100644 index 00000000000..028a42f7a1a --- /dev/null +++ b/xlators/cluster/dht/src/tier.c @@ -0,0 +1,1007 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "dht-common.h" +#include "tier.h" + +/*Hard coded DB info*/ +static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3; +/*Hard coded DB info*/ + +/*Mutex for updating the data movement stats*/ +static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER; + +#define DB_QUERY_RECORD_SIZE 4096 + +static int +tier_parse_query_str (char *query_record_str, + char *gfid, char *link_buffer, ssize_t *link_size) +{ + char *token_str = NULL; + char *delimiter = "|"; + char *saveptr = NULL; + int ret = -1; + + GF_VALIDATE_OR_GOTO ("tier", query_record_str, out); + GF_VALIDATE_OR_GOTO ("tier", gfid, out); + GF_VALIDATE_OR_GOTO ("tier", link_buffer, out); + GF_VALIDATE_OR_GOTO ("tier", link_size, out); + + token_str = strtok_r (query_record_str, delimiter, &saveptr); + if (!token_str) + goto out; + + strcpy (gfid, token_str); + + + token_str = strtok_r (NULL, delimiter, &saveptr); + if (!token_str) + goto out; + + strcpy (link_buffer, token_str); + + token_str = strtok_r (NULL, delimiter, &saveptr); + if (!token_str) + goto out; + + *link_size = atoi (token_str); + + ret = 0; +out: + return ret; +} + +static int +tier_migrate_using_query_file (void *_args) +{ + int ret = -1; + char gfid_str[UUID_CANONICAL_FORM_LEN+1] = ""; + char query_record_str[4096] = ""; + query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args; + xlator_t *this = NULL; + gf_defrag_info_t *defrag = NULL; + char *token_str = NULL; + char *delimiter = "::"; + char *link_buffer = NULL; + gfdb_query_record_t *query_record = NULL; + gfdb_link_info_t *link_info = NULL; + struct iatt par_stbuf = {0,}; + struct iatt current = {0,}; + loc_t p_loc = {0,}; + loc_t loc = {0,}; + dict_t *migrate_data = NULL; + inode_t *linked_inode = NULL; + int per_file_status = 0; + int per_link_status = 0; + int total_status = 0; + FILE *queryFILE = NULL; + char *link_str = NULL; + + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out); + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); + this = query_cbk_args->this; + GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out); + GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out); + + defrag = query_cbk_args->defrag; + + queryFILE = query_cbk_args->queryFILE; + + query_record = gfdb_query_record_init(); + if (!query_record) { + goto out; + } + + query_record->_link_info_str = calloc (DB_QUERY_RECORD_SIZE, 1); + if (!query_record->_link_info_str) { + goto out; + } + link_buffer = query_record->_link_info_str; + + link_info = gfdb_link_info_init (); + + migrate_data = dict_new (); + if (!migrate_data) + goto out; + + /* Per file */ + while (fscanf (queryFILE, "%s", query_record_str) != EOF) { + + per_file_status = 0; + per_link_status = 0; + + memset (gfid_str, 0, UUID_CANONICAL_FORM_LEN+1); + memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE); + + if (tier_parse_query_str (query_record_str, gfid_str, + link_buffer, + &query_record->link_info_size)) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "failed parsing %s\n", query_record_str); + continue; + } + + uuid_parse (gfid_str, query_record->gfid); + + if (dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY)) + dict_del(migrate_data, GF_XATTR_FILE_MIGRATE_KEY); + + if (dict_get(migrate_data, "from.migrator")) + dict_del(migrate_data, "from.migrator"); + + token_str = strtok (link_buffer, delimiter); + if (token_str != NULL) { + per_file_status = + dict_set_str (migrate_data, + GF_XATTR_FILE_MIGRATE_KEY, + "force"); + if (per_file_status) { + goto per_file_out; + } + + /* Flag to suggest the xattr call is from migrator */ + per_file_status = dict_set_str (migrate_data, + "from.migrator", "yes"); + if (per_file_status) { + goto per_file_out; + } + } + per_link_status = 0; + /* Per link of file */ + while (token_str != NULL) { + + link_str = gf_strdup (token_str); + + if (!link_info) { + per_link_status = -1; + goto per_file_out; + } + + memset (link_info, 0, sizeof(gfdb_link_info_t)); + + ret = str_to_link_info (link_str, link_info); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "failed parsing %s\n", link_str); + per_link_status = -1; + goto error; + } + + uuid_copy (p_loc.gfid, link_info->pargfid); + + p_loc.inode = inode_new (defrag->root_inode->table); + if (!p_loc.inode) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "failed parsing %s\n", link_str); + per_link_status = -1; + goto error; + } + + ret = syncop_lookup (this, &p_loc, NULL, &par_stbuf, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + " ERROR in parent lookup\n"); + per_link_status = -1; + goto error; + } + linked_inode = inode_link (p_loc.inode, NULL, NULL, + &par_stbuf); + inode_unref (p_loc.inode); + p_loc.inode = linked_inode; + + uuid_copy (loc.gfid, query_record->gfid); + loc.inode = inode_new (defrag->root_inode->table); + uuid_copy (loc.pargfid, link_info->pargfid); + loc.parent = inode_ref(p_loc.inode); + + loc.name = gf_strdup (link_info->file_name); + if (!loc.name) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "ERROR in " + "memory allocation\n"); + per_link_status = -1; + goto error; + } + + loc.path = gf_strdup (link_info->file_path); + if (!loc.path) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "ERROR in " + "memory allocation\n"); + per_link_status = -1; + goto error; + } + + uuid_copy (loc.parent->gfid, link_info->pargfid); + + ret = syncop_lookup (this, &loc, NULL, ¤t, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "ERROR in " + "current lookup\n"); + per_link_status = -1; + goto error; + } + linked_inode = inode_link (loc.inode, NULL, NULL, + ¤t); + inode_unref (loc.inode); + loc.inode = linked_inode; + + gf_msg (this->name, GF_LOG_INFO, 0, + DHT_MSG_LOG_TIER_STATUS, "Tier migrate file %s", + loc.name); + + ret = syncop_setxattr (this, &loc, migrate_data, 0); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "ERROR %d in " + "current migration %s %s\n", ret, + loc.name, + loc.path); + per_link_status = -1; + goto error; + } + inode_unref (loc.inode); + inode_unref (loc.parent); + inode_unref (p_loc.inode); +error: + if (loc.name) { + GF_FREE ((char *) loc.name); + } + if (loc.path) { + GF_FREE ((char *) loc.path); + } + token_str = NULL; + token_str = strtok (NULL, delimiter); + GF_FREE (link_str); + } + per_file_status = per_link_status; +per_file_out: + if (per_file_status) { + pthread_mutex_lock (&dm_stat_mutex); + defrag->total_failures++; + pthread_mutex_unlock (&dm_stat_mutex); + } else { + pthread_mutex_lock (&dm_stat_mutex); + defrag->total_files++; + pthread_mutex_unlock (&dm_stat_mutex); + } + total_status = total_status + per_file_status; + per_link_status = 0; + per_file_status = 0; + query_record_str[0] = '\0'; + } + +out: + if (link_buffer) + free (link_buffer); + gfdb_link_info_fini (&link_info); + if (migrate_data) + dict_unref (migrate_data); + gfdb_query_record_fini (&query_record); + return total_status; +} + + +/*This is the call back function per record/file from data base*/ +static int +tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record, + void *_args) { + int ret = -1; + char gfid_str[UUID_CANONICAL_FORM_LEN] = ""; + query_cbk_args_t *query_cbk_args = _args; + + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out); + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out); + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out); + + uuid_unparse (gfdb_query_record->gfid, gfid_str); + fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str, + gfdb_query_record->_link_info_str, + gfdb_query_record->link_info_size); + + pthread_mutex_lock (&dm_stat_mutex); + query_cbk_args->defrag->num_files_lookedup++; + pthread_mutex_unlock (&dm_stat_mutex); + + ret = 0; +out: + return ret; +} + +/*This is the call back function for each brick from hot/cold bricklist + * It picks up each bricks db and queries for eligible files for migration. + * The list of eligible files are populated in appropriate query files*/ +static int +tier_process_brick_cbk (dict_t *brick_dict, char *key, data_t *value, + void *args) { + int ret = -1; + char *db_path = NULL; + query_cbk_args_t *query_cbk_args = NULL; + xlator_t *this = NULL; + gfdb_conn_node_t *conn_node = NULL; + dict_t *params_dict = NULL; + _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args; + + /*Init of all the essentials*/ + GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out); + query_cbk_args = gfdb_brick_dict_info->_query_cbk_args; + + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); + this = query_cbk_args->this; + + GF_VALIDATE_OR_GOTO (this->name, + gfdb_brick_dict_info->_query_cbk_args, out); + + GF_VALIDATE_OR_GOTO (this->name, value, out); + db_path = data_to_str(value); + + /*Preparing DB parameters before init_db i.e getting db connection*/ + params_dict = dict_new (); + if (!params_dict) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "DB Params cannot initialized!"); + goto out; + } + SET_DB_PARAM_TO_DICT(this->name, params_dict, GFDB_SQL_PARAM_DBPATH, + db_path, ret, out); + + /*Get the db connection*/ + conn_node = init_db((void *)params_dict, dht_tier_db_type); + if (!conn_node) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "FATAL: Failed initializing db operations"); + goto out; + } + + /*Query for eligible files from db*/ + query_cbk_args->queryFILE = fopen(GET_QFILE_PATH + (gfdb_brick_dict_info->_gfdb_promote), "a+"); + if (!query_cbk_args->queryFILE) { + gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, + "Failed to open query file %s:%s", + GET_QFILE_PATH + (gfdb_brick_dict_info->_gfdb_promote), + strerror(errno)); + goto out; + } + if (!gfdb_brick_dict_info->_gfdb_promote) { + if (query_cbk_args->defrag->write_freq_threshold == 0 && + query_cbk_args->defrag->read_freq_threshold == 0) { + ret = find_unchanged_for_time(conn_node, + tier_gf_query_callback, + (void *)query_cbk_args, + gfdb_brick_dict_info->time_stamp); + } else { + ret = find_unchanged_for_time_freq(conn_node, + tier_gf_query_callback, + (void *)query_cbk_args, + gfdb_brick_dict_info->time_stamp, + query_cbk_args->defrag-> + write_freq_threshold, + query_cbk_args->defrag-> + read_freq_threshold, + _gf_false); + } + } else { + if (query_cbk_args->defrag->write_freq_threshold == 0 && + query_cbk_args->defrag->read_freq_threshold == 0) { + ret = find_recently_changed_files(conn_node, + tier_gf_query_callback, + (void *)query_cbk_args, + gfdb_brick_dict_info->time_stamp); + } else { + ret = find_recently_changed_files_freq(conn_node, + tier_gf_query_callback, + (void *)query_cbk_args, + gfdb_brick_dict_info->time_stamp, + query_cbk_args->defrag-> + write_freq_threshold, + query_cbk_args->defrag-> + read_freq_threshold, + _gf_false); + } + } + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "FATAL: query from db failed"); + goto out; + } + ret = 0; +out: + if (query_cbk_args->queryFILE) { + fclose (query_cbk_args->queryFILE); + query_cbk_args->queryFILE = NULL; + } + fini_db (conn_node); + return ret; +} + +inline int +tier_build_migration_qfile (demotion_args_t *args, + query_cbk_args_t *query_cbk_args, + gf_boolean_t is_promotion) +{ + gfdb_time_t current_time; + _gfdb_brick_dict_info_t gfdb_brick_dict_info; + gfdb_time_t time_in_past; + int ret = -1; + + remove (GET_QFILE_PATH (is_promotion)); + time_in_past.tv_sec = args->freq_time; + time_in_past.tv_usec = 0; + if (gettimeofday (¤t_time, NULL) == -1) { + gf_log (args->this->name, GF_LOG_ERROR, + "Failed to get current timen"); + goto out; + } + time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec; + time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec; + gfdb_brick_dict_info.time_stamp = &time_in_past; + gfdb_brick_dict_info._gfdb_promote = is_promotion; + gfdb_brick_dict_info._query_cbk_args = query_cbk_args; + ret = dict_foreach (args->brick_list, tier_process_brick_cbk, + &gfdb_brick_dict_info); + if (ret) { + gf_log (args->this->name, GF_LOG_ERROR, + "Brick query failedn"); + goto out; + } +out: + return ret; +} + +inline int +tier_migrate_files_using_qfile (demotion_args_t *comp, + query_cbk_args_t *query_cbk_args, + char *qfile) +{ + char renamed_file[PATH_MAX] = ""; + int ret = -1; + + query_cbk_args->queryFILE = fopen (qfile, "r"); + if (!query_cbk_args->queryFILE) { + gf_log ("tier", GF_LOG_ERROR, + "Failed opening %s for migration", qfile); + goto out; + } + ret = tier_migrate_using_query_file ((void *)query_cbk_args); + fclose (query_cbk_args->queryFILE); + query_cbk_args->queryFILE = NULL; + if (ret) { + sprintf (renamed_file, "%s.err", qfile); + rename (qfile, renamed_file); + } +out: + return ret; +} + +/*Demotion Thread*/ +static void * +tier_demote (void *args) +{ + int ret = -1; + query_cbk_args_t query_cbk_args; + demotion_args_t *demotion_args = args; + + GF_VALIDATE_OR_GOTO ("tier", demotion_args, out); + GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out); + GF_VALIDATE_OR_GOTO (demotion_args->this->name, + demotion_args->brick_list, out); + GF_VALIDATE_OR_GOTO (demotion_args->this->name, + demotion_args->defrag, out); + + query_cbk_args.this = demotion_args->this; + query_cbk_args.defrag = demotion_args->defrag; + + /*Build the query file using bricklist*/ + ret = tier_build_migration_qfile(demotion_args, &query_cbk_args, + _gf_false); + if (ret) + goto out; + + /* Migrate files using the query file */ + ret = tier_migrate_files_using_qfile (args, + &query_cbk_args, DEMOTION_QFILE); + if (ret) + goto out; + +out: + demotion_args->return_value = ret; + return NULL; +} + + +/*Promotion Thread*/ +static void +*tier_promote (void *args) +{ + int ret = -1; + query_cbk_args_t query_cbk_args; + promotion_args_t *promotion_args = args; + + GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out); + GF_VALIDATE_OR_GOTO (promotion_args->this->name, + promotion_args->brick_list, out); + GF_VALIDATE_OR_GOTO (promotion_args->this->name, + promotion_args->defrag, out); + + query_cbk_args.this = promotion_args->this; + query_cbk_args.defrag = promotion_args->defrag; + + /*Build the query file using bricklist*/ + ret = tier_build_migration_qfile(promotion_args, &query_cbk_args, + _gf_true); + if (ret) + goto out; + + /* Migrate files using the query file */ + ret = tier_migrate_files_using_qfile (args, + &query_cbk_args, + PROMOTION_QFILE); + if (ret) + goto out; + +out: + promotion_args->return_value = ret; + return NULL; +} + +static void +tier_get_bricklist (xlator_t *xl, dict_t *bricklist) +{ + xlator_list_t *child = NULL; + char *rv = NULL; + char *rh = NULL; + char localhost[256] = {0}; + char *db_path = ""; + char *brickname = NULL; + char db_name[PATH_MAX] = ""; + int ret = 0; + + GF_VALIDATE_OR_GOTO ("tier", xl, out); + GF_VALIDATE_OR_GOTO ("tier", bricklist, out); + + gethostname (localhost, sizeof (localhost)); + + /* + * This function obtains remote subvolumes and filters out only + * those running on the same node as the tier daemon. + */ + if (strcmp(xl->type, "protocol/client") == 0) { + ret = dict_get_str(xl->options, "remote-host", &rh); + if (ret < 0) + goto out; + + if (gf_is_local_addr (rh)) { + + ret = dict_get_str(xl->options, "remote-subvolume", + &rv); + if (ret < 0) + goto out; + brickname = strrchr(rv, '/') + 1; + snprintf(db_name, sizeof(db_name), "%s.db", + brickname); + db_path = GF_CALLOC (PATH_MAX, 1, gf_common_mt_char); + if (!db_path) { + gf_msg ("tier", GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_STATUS, + "Failed to allocate memory for bricklist"); + goto out; + } + + sprintf(db_path, "%s/.glusterfs/%s", rv, db_name); + if (dict_add_dynstr_with_alloc(bricklist, "brick", + db_path)) + goto out; + } + } + + for (child = xl->children; child; child = child->next) { + tier_get_bricklist(child->xlator, bricklist); + } +out: + return; +} + +int +tier_start (xlator_t *this, gf_defrag_info_t *defrag) +{ + dict_t *bricklist_cold = NULL; + dict_t *bricklist_hot = NULL; + dht_conf_t *conf = NULL; + int tick = 0; + int next_demote = 0; + int next_promote = 0; + int freq_promote = 0; + int freq_demote = 0; + promotion_args_t promotion_args = { 0 }; + demotion_args_t demotion_args = { 0 }; + int ret_promotion = 0; + int ret_demotion = 0; + int ret = 0; + pthread_t promote_thread; + pthread_t demote_thread; + + conf = this->private; + + bricklist_cold = dict_new(); + if (!bricklist_cold) + return -1; + + bricklist_hot = dict_new(); + if (!bricklist_hot) + return -1; + + tier_get_bricklist (conf->subvolumes[0], bricklist_cold); + tier_get_bricklist (conf->subvolumes[1], bricklist_hot); + + freq_promote = defrag->tier_promote_frequency; + freq_demote = defrag->tier_demote_frequency; + + next_promote = defrag->tier_promote_frequency % TIMER_SECS; + next_demote = defrag->tier_demote_frequency % TIMER_SECS; + + + gf_msg (this->name, GF_LOG_INFO, 0, + DHT_MSG_LOG_TIER_STATUS, "Begin run tier promote %d demote %d", + next_promote, next_demote); + + defrag->defrag_status = GF_DEFRAG_STATUS_STARTED; + + while (1) { + + sleep(1); + + ret_promotion = -1; + ret_demotion = -1; + + if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) { + ret = 1; + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "defrag->defrag_status != " + "GF_DEFRAG_STATUS_STARTED"); + goto out; + } + + tick = (tick + 1) % TIMER_SECS; + if ((next_demote != tick) && (next_promote != tick)) + continue; + + if (next_demote >= tick) { + demotion_args.this = this; + demotion_args.brick_list = bricklist_hot; + demotion_args.defrag = defrag; + demotion_args.freq_time = freq_demote; + ret_demotion = pthread_create (&demote_thread, NULL, + &tier_demote, &demotion_args); + if (ret_demotion) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Failed starting Demotion thread!"); + } + freq_demote = defrag->tier_demote_frequency; + next_demote = (tick + freq_demote) % TIMER_SECS; + } + + if (next_promote >= tick) { + promotion_args.this = this; + promotion_args.brick_list = bricklist_cold; + promotion_args.defrag = defrag; + promotion_args.freq_time = freq_promote; + ret_promotion = pthread_create (&promote_thread, NULL, + &tier_promote, &promotion_args); + if (ret_promotion) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Failed starting Promotion thread!"); + } + freq_promote = defrag->tier_promote_frequency; + next_promote = (tick + freq_promote) % TIMER_SECS; + } + + if (ret_demotion == 0) { + pthread_join (demote_thread, NULL); + if (demotion_args.return_value) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Demotion failed!"); + } + ret_demotion = demotion_args.return_value; + } + + if (ret_promotion == 0) { + pthread_join (promote_thread, NULL); + if (promotion_args.return_value) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Promotion failed!"); + } + ret_promotion = promotion_args.return_value; + } + + /*Collect previous and current cummulative status */ + ret = ret | ret_demotion | ret_promotion; + + /*reseting promotion and demotion arguments for next iteration*/ + memset (&demotion_args, 0, sizeof(demotion_args_t)); + memset (&promotion_args, 0, sizeof(promotion_args_t)); + + } + + ret = 0; +out: + + dict_unref(bricklist_cold); + dict_unref(bricklist_hot); + + return ret; +} + +int32_t +tier_migration_needed (xlator_t *this) +{ + gf_defrag_info_t *defrag = NULL; + dht_conf_t *conf = NULL; + int ret = 0; + + conf = this->private; + + GF_VALIDATE_OR_GOTO (this->name, conf, out); + GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out); + + defrag = conf->defrag; + + if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) + ret = 1; +out: + return ret; +} + +int32_t +tier_migration_get_dst (xlator_t *this, dht_local_t *local) +{ + dht_conf_t *conf = NULL; + int32_t ret = -1; + + GF_VALIDATE_OR_GOTO("tier", this, out); + GF_VALIDATE_OR_GOTO(this->name, this->private, out); + + conf = this->private; + if (!conf) + goto out; + + if (conf->subvolumes[0] == local->cached_subvol) + local->rebalance.target_node = + conf->subvolumes[1]; + else + local->rebalance.target_node = + conf->subvolumes[0]; + + if (local->rebalance.target_node) + ret = 0; + +out: + return ret; +} + +xlator_t * +tier_search (xlator_t *this, dht_layout_t *layout, const char *name) +{ + xlator_t *subvol = NULL; + void *value; + int search_first_subvol = 0; + + GF_VALIDATE_OR_GOTO("tier", this, out); + GF_VALIDATE_OR_GOTO(this->name, layout, out); + GF_VALIDATE_OR_GOTO(this->name, name, out); + + if (!dict_get_ptr (this->options, "rule", &value) && + !strcmp(layout->list[0].xlator->name, value)) { + search_first_subvol = 1; + } + + if (search_first_subvol) + subvol = layout->list[0].xlator; + else + subvol = layout->list[1].xlator; + +out: + return subvol; +} + + +dht_methods_t tier_methods = { + .migration_get_dst_subvol = tier_migration_get_dst, + .migration_other = tier_start, + .migration_needed = tier_migration_needed, + .layout_search = tier_search, +}; + + +int +tier_init (xlator_t *this) +{ + int ret = -1; + int freq = 0; + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + + ret = dht_init(this); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "dht_init failed"); + goto out; + } + + conf = this->private; + + conf->methods = &tier_methods; + + if (conf->subvolume_cnt != 2) { + gf_msg(this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Invalid number of subvolumes %d", conf->subvolume_cnt); + goto out; + } + + /* if instatiated from client side initialization is complete. */ + if (!conf->defrag) { + ret = 0; + goto out; + } + + defrag = conf->defrag; + + GF_OPTION_INIT ("tier-promote-frequency", + defrag->tier_promote_frequency, + int32, out); + + ret = dict_get_int32 (this->options, + "tier-promote-frequency", &freq); + if (ret) { + freq = DEFAULT_PROMOTE_FREQ_SEC; + } + + defrag->tier_promote_frequency = freq; + + GF_OPTION_INIT ("tier-demote-frequency", + defrag->tier_demote_frequency, + int32, out); + + ret = dict_get_int32 (this->options, + "tier-demote-frequency", &freq); + if (ret) { + freq = DEFAULT_DEMOTE_FREQ_SEC; + } + + defrag->tier_demote_frequency = freq; + + GF_OPTION_INIT ("write-freq-threshold", + defrag->write_freq_threshold, + int32, out); + + GF_OPTION_INIT ("read-freq-threshold", + defrag->read_freq_threshold, + int32, out); + + gf_msg(this->name, GF_LOG_INFO, 0, + DHT_MSG_LOG_TIER_STATUS, + "Promote frequency set to %d demote set to %d", + defrag->tier_promote_frequency, + defrag->tier_demote_frequency); + + ret = 0; + +out: + return ret; +} + + +int +tier_reconfigure (xlator_t *this, dict_t *options) +{ + dht_conf_t *conf = NULL; + gf_defrag_info_t *defrag = NULL; + + conf = this->private; + + if (conf->defrag) { + defrag = conf->defrag; + GF_OPTION_RECONF ("tier-promote-frequency", + defrag->tier_promote_frequency, options, + int32, out); + + GF_OPTION_RECONF ("tier-demote-frequency", + defrag->tier_demote_frequency, options, + int32, out); + + GF_OPTION_RECONF ("write-freq-threshold", + defrag->write_freq_threshold, options, + int32, out); + + GF_OPTION_RECONF ("read-freq-threshold", + defrag->read_freq_threshold, options, + int32, out); + } + +out: + return dht_reconfigure (this, options); +} + +class_methods_t class_methods = { + .init = tier_init, + .fini = dht_fini, + .reconfigure = tier_reconfigure, + .notify = dht_notify +}; + + +struct xlator_fops fops = { + .lookup = dht_lookup, + .create = dht_create, + .mknod = dht_mknod, + + .stat = dht_stat, + .fstat = dht_fstat, + .truncate = dht_truncate, + .ftruncate = dht_ftruncate, + .access = dht_access, + .readlink = dht_readlink, + .setxattr = dht_setxattr, + .getxattr = dht_getxattr, + .removexattr = dht_removexattr, + .open = dht_open, + .readv = dht_readv, + .writev = dht_writev, + .flush = dht_flush, + .fsync = dht_fsync, + .statfs = dht_statfs, + .lk = dht_lk, + .opendir = dht_opendir, + .readdir = dht_readdir, + .readdirp = dht_readdirp, + .fsyncdir = dht_fsyncdir, + .symlink = dht_symlink, + .unlink = dht_unlink, + .link = dht_link, + .mkdir = dht_mkdir, + .rmdir = dht_rmdir, + .rename = dht_rename, + .inodelk = dht_inodelk, + .finodelk = dht_finodelk, + .entrylk = dht_entrylk, + .fentrylk = dht_fentrylk, + .xattrop = dht_xattrop, + .fxattrop = dht_fxattrop, + .setattr = dht_setattr, +}; + + +struct xlator_cbks cbks = { + .forget = dht_forget +}; + |