diff options
author | Joseph Fernandes <josferna@redhat.com> | 2016-01-26 17:47:08 +0530 |
---|---|---|
committer | Dan Lambright <dlambrig@redhat.com> | 2016-02-03 10:04:40 -0800 |
commit | 11202e6c726f79ddf0e461338d7dce158733122e (patch) | |
tree | df1381c8783ba8f5507f8311db4b9febf9536d10 /xlators | |
parent | 545f4ed2c7195a21210e6a055c27c1b7a115e18c (diff) |
tier/gfdb : Round-Robin read of query files
1. Each brick on a host will get a separate query file.
2. While reading query record from these query files we
read them in a Round-Robin manner.
3. When an error occurs during migration we rename it to
query file with an time stamp and .err extension for
better debugging.
Change-Id: I27c4285d24fd695d2d5cbd9fd7db3879d277ecc8
BUG: 1302772
Signed-off-by: Joseph Fernandes <josferna@redhat.com>
Reviewed-on: http://review.gluster.org/13293
Smoke: Gluster Build System <jenkins@build.gluster.com>
Tested-by: N Balachandran <nbalacha@redhat.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Dan Lambright <dlambrig@redhat.com>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators')
-rw-r--r-- | xlators/cluster/dht/src/dht-mem-types.h | 1 | ||||
-rw-r--r-- | xlators/cluster/dht/src/tier.c | 380 | ||||
-rw-r--r-- | xlators/cluster/dht/src/tier.h | 26 |
3 files changed, 326 insertions, 81 deletions
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h index 27ebf9dc501..5de5d1838ad 100644 --- a/xlators/cluster/dht/src/dht-mem-types.h +++ b/xlators/cluster/dht/src/dht-mem-types.h @@ -37,6 +37,7 @@ enum gf_dht_mem_types_ { gf_tier_mt_bricklist_t, gf_tier_mt_ipc_ctr_params_t, gf_dht_mt_fd_ctx_t, + gf_tier_mt_qfile_array_t, gf_dht_mt_end }; #endif diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c index 7be5c4a2b81..8a756b88c27 100644 --- a/xlators/cluster/dht/src/tier.c +++ b/xlators/cluster/dht/src/tier.c @@ -22,7 +22,9 @@ static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3; /*Mutex for updating the data movement stats*/ static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER; +/* Stores the path location of promotion query files */ static char *promotion_qfile; +/* Stores the path location of demotion query files */ static char *demotion_qfile; static void *libhandle; @@ -30,6 +32,162 @@ static gfdb_methods_t gfdb_methods; #define DB_QUERY_RECORD_SIZE 4096 +/* + * Closes all the fds and frees the qfile_array + * */ +static void +qfile_array_free (tier_qfile_array_t *qfile_array) +{ + ssize_t i = 0; + + if (qfile_array) { + if (qfile_array->fd_array) { + for (i = 0; i < qfile_array->array_size; i++) { + if (qfile_array->fd_array[i] != -1) { + sys_close (qfile_array->fd_array[i]); + } + } + } + GF_FREE (qfile_array->fd_array); + } + GF_FREE (qfile_array); +} + + +/* Create a new query file list with given size */ +static tier_qfile_array_t * +qfile_array_new (ssize_t array_size) +{ + int ret = -1; + tier_qfile_array_t *qfile_array = NULL; + ssize_t i = 0; + + GF_VALIDATE_OR_GOTO ("tier", (array_size > 0), out); + + qfile_array = GF_CALLOC (1, sizeof (tier_qfile_array_t), + gf_tier_mt_qfile_array_t); + if (!qfile_array) { + gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, + "Failed to allocate memory for tier_qfile_array_t"); + goto out; + } + + qfile_array->fd_array = GF_CALLOC (array_size, sizeof (int), + gf_dht_mt_int32_t); + if (!qfile_array->fd_array) { + gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, + "Failed to allocate memory for " + "tier_qfile_array_t->fd_array"); + goto out; + } + + /* Init all the fds to -1 */ + for (i = 0; i < array_size; i++) { + qfile_array->fd_array[i] = -1; + } + + qfile_array->array_size = array_size; + qfile_array->next_index = 0; + + /* Set exhausted count to list size as the list is empty */ + qfile_array->exhausted_count = qfile_array->array_size; + + ret = 0; +out: + if (ret) { + qfile_array_free (qfile_array); + qfile_array = NULL; + } + return qfile_array; +} + + +/* Checks if the query file list is empty or totally exhausted. */ +static gf_boolean_t +is_qfile_array_empty (tier_qfile_array_t *qfile_array) +{ + return (qfile_array->exhausted_count == qfile_array->array_size) ? + _gf_true : _gf_false; +} + + +/* Shifts the next_fd pointer to the next available fd in the list */ +static void +shift_next_index (tier_qfile_array_t *qfile_array) +{ + int qfile_fd = 0; + int spin_count = 0; + + if (is_qfile_array_empty (qfile_array)) { + return; + } + + do { + /* change next_index in a rotional manner */ + (qfile_array->next_index == (qfile_array->array_size - 1)) ? + qfile_array->next_index = 0 : qfile_array->next_index++; + + qfile_fd = (qfile_array->fd_array[qfile_array->next_index]); + + spin_count++; + + } while ((qfile_fd == -1) && (spin_count < qfile_array->array_size)); + +} + +/* + * This is a non-thread safe function to read query records + * from a list of query files in a Round-Robin manner. + * As in when the query files get exhuasted they are closed. + * Returns: + * 0 if all the query records in all the query files of the list are + * exhausted. + * > 0 if a query record is successfully read. Indicates the size of the query + * record read. + * < 0 if there was failure + * */ +static int +read_query_record_list (tier_qfile_array_t *qfile_array, + gfdb_query_record_t **query_record) +{ + int ret = -1; + int qfile_fd = 0; + + GF_VALIDATE_OR_GOTO ("tier", qfile_array, out); + GF_VALIDATE_OR_GOTO ("tier", qfile_array->fd_array, out); + + do { + if (is_qfile_array_empty (qfile_array)) { + ret = 0; + break; + } + + qfile_fd = qfile_array->fd_array[qfile_array->next_index]; + ret = gfdb_methods.gfdb_read_query_record + (qfile_fd, query_record); + if (ret <= 0) { + /*The qfile_fd has reached EOF or + * there was an error. + * 1. Close the exhausted fd + * 2. increment the exhausted count + * 3. shift next_qfile to next qfile + **/ + sys_close (qfile_fd); + qfile_array->fd_array[qfile_array->next_index] = -1; + qfile_array->exhausted_count++; + /* shift next_qfile to next qfile */ + shift_next_index (qfile_array); + continue; + } else { + /* shift next_qfile to next qfile */ + shift_next_index (qfile_array); + break; + } + } while (1); +out: + return ret; +} + /* Check and update the watermark every WM_INTERVAL seconds */ #define WM_INTERVAL 5 @@ -126,8 +284,10 @@ tier_check_watermark (xlator_t *this, loc_t *root_loc) goto exit; } - /* Find how much free space is on the hot subvolume. Then see if that value */ - /* is less than or greater than user defined watermarks. Stash results in */ + /* Find how much free space is on the hot subvolume. + * Then see if that value */ + /* is less than or greater than user defined watermarks. + * Stash results in */ /* the tier_conf data structure. */ ret = syncop_statfs (conf->subvolumes[1], root_loc, &statfs, @@ -255,7 +415,7 @@ static int tier_migrate_using_query_file (void *_args) { int ret = -1; - query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args; + query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args; xlator_t *this = NULL; gf_defrag_info_t *defrag = NULL; gfdb_query_record_t *query_record = NULL; @@ -278,7 +438,6 @@ tier_migrate_using_query_file (void *_args) int per_file_status = 0; int per_link_status = 0; int total_status = 0; - int query_fd = 0; xlator_t *src_subvol = NULL; dht_conf_t *conf = NULL; uint64_t total_migrated_bytes = 0; @@ -293,15 +452,13 @@ tier_migrate_using_query_file (void *_args) GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); this = query_cbk_args->this; GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out); - GF_VALIDATE_OR_GOTO (this->name, (query_cbk_args->query_fd > 0), out); + GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->qfile_array, out); GF_VALIDATE_OR_GOTO (this->name, this->private, out); conf = this->private; defrag = query_cbk_args->defrag; - query_fd = query_cbk_args->query_fd; - migrate_data = dict_new (); if (!migrate_data) goto out; @@ -333,8 +490,8 @@ tier_migrate_using_query_file (void *_args) } /* Per file */ - while ((ret = gfdb_methods.gfdb_read_query_record - (query_fd, &query_record)) != 0) { + while ((ret = read_query_record_list (query_cbk_args->qfile_array, + &query_record)) != 0) { if (ret < 0) { gf_msg (this->name, GF_LOG_ERROR, 0, @@ -773,16 +930,14 @@ tier_process_self_query (tier_brick_list_t *local_brick, void *args) } /* Query for eligible files from db */ - query_cbk_args->query_fd = open (GET_QFILE_PATH - (gfdb_brick_info->_gfdb_promote), + query_cbk_args->query_fd = open (local_brick->qfile_path, O_WRONLY | O_CREAT | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (query_cbk_args->query_fd < 0) { gf_msg (this->name, GF_LOG_ERROR, errno, DHT_MSG_LOG_TIER_ERROR, "Failed to open query file %s", - GET_QFILE_PATH - (gfdb_brick_info->_gfdb_promote)); + local_brick->qfile_path); goto out; } if (!gfdb_brick_info->_gfdb_promote) { @@ -872,6 +1027,7 @@ out: query_cbk_args->query_fd = -1; } gfdb_methods.fini_db (conn_node); + return ret; } @@ -938,9 +1094,10 @@ tier_process_ctr_query (tier_brick_list_t *local_brick, void *args) GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_QUERY_OPS, ret, out); + SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QFILE_PATH, - GET_QFILE_PATH(ipc_ctr_params->is_promote), + local_brick->qfile_path, ret, out); ret = dict_set_bin (ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS, @@ -1003,7 +1160,7 @@ out: -/*This is the call back function for each brick from hot/cold bricklist +/* This is the call back function for each brick from hot/cold bricklist * It picks up each bricks db and queries for eligible files for migration. * The list of eligible files are populated in appropriate query files*/ static int @@ -1122,20 +1279,7 @@ tier_build_migration_qfile (demotion_args_t *args, gfdb_time_t time_in_past; int ret = -1; tier_brick_list_t *local_brick = NULL; - - /* - * The first time this function is called, query file will - * not exist on a given instance of running the migration daemon. - * The remove call is optimistic and it is legal if it fails. - */ - - ret = remove (GET_QFILE_PATH (is_promotion)); - if (ret == -1) { - gf_msg_trace (args->this->name, errno, - "Failed to remove %s", - GET_QFILE_PATH (is_promotion)); - } - + int i = 0; time_in_past.tv_sec = args->freq_time; time_in_past.tv_usec = 0; @@ -1160,6 +1304,21 @@ tier_build_migration_qfile (demotion_args_t *args, gfdb_brick_info._query_cbk_args = query_cbk_args; list_for_each_entry (local_brick, args->brick_list, list) { + + /* Construct query file path for this brick + * i.e + * /var/run/gluster/xlator_name/ + * {promote/demote}-brickname-indexinbricklist + * So that no two query files will have same path even + * bricks have the same name + * */ + snprintf (local_brick->qfile_path, PATH_MAX , "%s-%s-%d", + GET_QFILE_PATH (gfdb_brick_info._gfdb_promote), + local_brick->brick_name, i); + + /* Delete any old query files for this brick */ + sys_unlink (local_brick->qfile_path); + ret = tier_process_brick (local_brick, &gfdb_brick_info); if (ret) { @@ -1168,6 +1327,7 @@ tier_build_migration_qfile (demotion_args_t *args, "Brick %s query failed\n", local_brick->brick_db_path); } + i++; } ret = 0; out: @@ -1176,30 +1336,80 @@ out: static int tier_migrate_files_using_qfile (demotion_args_t *comp, - query_cbk_args_t *query_cbk_args, - char *qfile) + query_cbk_args_t *query_cbk_args) { - char renamed_file[PATH_MAX] = ""; - int ret = -1; - - query_cbk_args->query_fd = open (qfile, O_RDONLY); - if (query_cbk_args->query_fd < 0) { - gf_msg ("tier", GF_LOG_ERROR, errno, - DHT_MSG_FOPEN_FAILED, - "Failed to open %s for migration", qfile); + int ret = -1; + tier_brick_list_t *local_brick = NULL; + tier_brick_list_t *temp = NULL; + char query_file_path_err[PATH_MAX] = ""; + struct tm tm = {0}; + gfdb_time_t current_time = {0}; + char time_str[256] = {0}; + char time_format[20] = "%Y-%m-%d-%H-%M-%S"; + ssize_t qfile_array_size = 0; + int count = 0; + int temp_fd = 0; + + /* Time format for error query files */ + gettimeofday (¤t_time, NULL); + gmtime_r (¤t_time.tv_sec, &tm); + strftime (time_str, 256, time_format, &tm); + + /* Build the qfile list */ + list_for_each_entry_safe (local_brick, temp, comp->brick_list, list) { + qfile_array_size++; + } + query_cbk_args->qfile_array = qfile_array_new (qfile_array_size); + if (!query_cbk_args->qfile_array) { + gf_msg ("tier", GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "Failed to create new " + "qfile_array"); goto out; } + + /*Open all qfiles*/ + count = 0; + query_cbk_args->qfile_array->exhausted_count = 0; + list_for_each_entry_safe (local_brick, temp, comp->brick_list, list) { + temp_fd = query_cbk_args->qfile_array->fd_array[count]; + temp_fd = open (local_brick->qfile_path, O_RDONLY, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (temp_fd < 0) { + gf_msg ("tier", GF_LOG_ERROR, errno, + DHT_MSG_LOG_TIER_ERROR, "Failed to open " + "%s to the query file", + local_brick->qfile_path); + query_cbk_args->qfile_array->exhausted_count++; + } + query_cbk_args->qfile_array->fd_array[count] = temp_fd; + count++; + } + + /* Migrate files using query file list */ ret = tier_migrate_using_query_file ((void *)query_cbk_args); - sys_close (query_cbk_args->query_fd); - query_cbk_args->query_fd = -1; +out: + qfile_array_free (query_cbk_args->qfile_array); + + /* If there is an error rename all the query files to .err files + * with a timestamp for better debugging */ if (ret) { - snprintf (renamed_file, sizeof renamed_file, "%s.err", qfile); - sys_rename (qfile, renamed_file); + list_for_each_entry_safe (local_brick, temp, comp->brick_list, + list) { + /* rename error qfile*/ + snprintf (query_file_path_err, PATH_MAX, "%s-%s.err", + local_brick->qfile_path, time_str); + sys_rename (local_brick->qfile_path, + query_file_path_err); + } } -out: + + query_cbk_args->qfile_array = NULL; + return ret; } + + /*Demotion Thread*/ static void * tier_demote (void *args) @@ -1229,7 +1439,7 @@ tier_demote (void *args) /* Migrate files using the query file */ ret = tier_migrate_files_using_qfile (args, - &query_cbk_args, demotion_qfile); + &query_cbk_args); if (ret) goto out; @@ -1266,9 +1476,7 @@ static void goto out; /* Migrate files using the query file */ - ret = tier_migrate_files_using_qfile (args, - &query_cbk_args, - promotion_qfile); + ret = tier_migrate_files_using_qfile (args, &query_cbk_args); if (ret) goto out; @@ -1280,14 +1488,14 @@ out: static int tier_get_bricklist (xlator_t *xl, struct list_head *local_bricklist_head) { - xlator_list_t *child = NULL; - char *rv = NULL; - char *rh = NULL; - char localhost[256] = {0}; - char *brickname = NULL; - char db_name[PATH_MAX] = ""; - int ret = 0; - tier_brick_list_t *local_brick = NULL; + xlator_list_t *child = NULL; + char *rv = NULL; + char *rh = NULL; + char localhost[256] = {0}; + char *brickname = NULL; + char db_name[PATH_MAX] = ""; + int ret = 0; + tier_brick_list_t *local_brick = NULL; GF_VALIDATE_OR_GOTO ("tier", xl, out); GF_VALIDATE_OR_GOTO ("tier", local_bricklist_head, out); @@ -1325,18 +1533,22 @@ tier_get_bricklist (xlator_t *xl, struct list_head *local_bricklist_head) if (!local_brick->brick_db_path) { gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_STATUS, - "Faile. to allocate memory for bricklist"); + "Failed to allocate memory for" + " bricklist."); goto out; } - snprintf(local_brick->brick_db_path, PATH_MAX, "%s/%s/%s", rv, - GF_HIDDEN_PATH, - db_name); + snprintf(local_brick->brick_db_path, + PATH_MAX, "%s/%s/%s", rv, + GF_HIDDEN_PATH, db_name); local_brick->xlator = xl; + snprintf (local_brick->brick_name, + NAME_MAX, "%s", brickname); + list_add_tail (&(local_brick->list), - local_bricklist_head); + local_bricklist_head); ret = 0; goto out; @@ -1420,6 +1632,34 @@ clear_bricklist (struct list_head *brick_list) } +static void +set_brick_list_qpath (struct list_head *brick_list, gf_boolean_t is_cold) +{ + + tier_brick_list_t *local_brick = NULL; + int i = 0; + + GF_VALIDATE_OR_GOTO ("tier", brick_list, out); + + list_for_each_entry (local_brick, brick_list, list) { + + /* Construct query file path for this brick + * i.e + * /var/run/gluster/xlator_name/ + * {promote/demote}-brickname-indexinbricklist + * So that no two query files will have same path even + * bricks have the same name + * */ + snprintf (local_brick->qfile_path, PATH_MAX , "%s-%s-%d", + GET_QFILE_PATH (is_cold), + local_brick->brick_name, i); + i++; + } +out: + return; +} + + int tier_start (xlator_t *this, gf_defrag_info_t *defrag) { @@ -1453,7 +1693,9 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) INIT_LIST_HEAD ((&bricklist_cold)); tier_get_bricklist (conf->subvolumes[0], &bricklist_cold); + set_brick_list_qpath (&bricklist_cold, _gf_true); tier_get_bricklist (conf->subvolumes[1], &bricklist_hot); + set_brick_list_qpath (&bricklist_hot, _gf_false); is_hot_list_empty = list_empty(&bricklist_hot); is_cold_list_empty = list_empty(&bricklist_cold); @@ -1948,27 +2190,20 @@ tier_init (xlator_t *this) GF_FREE(voldir); - ret = gf_asprintf (&promotion_qfile, "%s/%s/%s-%s", + ret = gf_asprintf (&promotion_qfile, "%s/%s/promote", DEFAULT_VAR_RUN_DIRECTORY, - this->name, - PROMOTION_QFILE, this->name); if (ret < 0) goto out; - ret = gf_asprintf (&demotion_qfile, "%s/%s/%s-%s", + ret = gf_asprintf (&demotion_qfile, "%s/%s/demote", DEFAULT_VAR_RUN_DIRECTORY, - this->name, - DEMOTION_QFILE, this->name); if (ret < 0) { GF_FREE (promotion_qfile); goto out; } - sys_unlink(promotion_qfile); - sys_unlink(demotion_qfile); - gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS, "Promote/demote frequency %d/%d " @@ -1978,11 +2213,6 @@ tier_init (xlator_t *this) defrag->write_freq_threshold, defrag->read_freq_threshold); - gf_msg (this->name, GF_LOG_INFO, 0, - DHT_MSG_LOG_TIER_STATUS, - "Promote file %s demote file %s", - promotion_qfile, demotion_qfile); - ret = 0; out: diff --git a/xlators/cluster/dht/src/tier.h b/xlators/cluster/dht/src/tier.h index 92e2fda6e5c..41c5a318de4 100644 --- a/xlators/cluster/dht/src/tier.h +++ b/xlators/cluster/dht/src/tier.h @@ -39,25 +39,39 @@ #define GET_QFILE_PATH(is_promotion)\ (is_promotion) ? promotion_qfile : demotion_qfile +typedef struct tier_qfile_array { + int *fd_array; + ssize_t array_size; + ssize_t next_index; + /* Indicate the number of exhuasted FDs*/ + ssize_t exhausted_count; +} tier_qfile_array_t; + + typedef struct _query_cbk_args { - xlator_t *this; - gf_defrag_info_t *defrag; - int query_fd; - int is_promotion; + xlator_t *this; + gf_defrag_info_t *defrag; + /* This is write */ + int query_fd; + int is_promotion; + /* This is for read */ + tier_qfile_array_t *qfile_array; } query_cbk_args_t; int gf_run_tier(xlator_t *this, gf_defrag_info_t *defrag); typedef struct gfdb_brick_info { - gfdb_time_t *time_stamp; + gfdb_time_t *time_stamp; gf_boolean_t _gfdb_promote; - query_cbk_args_t *_query_cbk_args; + query_cbk_args_t *_query_cbk_args; } gfdb_brick_info_t; typedef struct brick_list { xlator_t *xlator; char *brick_db_path; + char brick_name[NAME_MAX]; + char qfile_path[PATH_MAX]; struct list_head list; } tier_brick_list_t; |