diff options
Diffstat (limited to 'xlators/cluster')
-rw-r--r-- | xlators/cluster/dht/src/tier.c | 340 | ||||
-rw-r--r-- | xlators/cluster/dht/src/tier.h | 6 |
2 files changed, 152 insertions, 194 deletions
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c index 010e43c4107..51642e48970 100644 --- a/xlators/cluster/dht/src/tier.c +++ b/xlators/cluster/dht/src/tier.c @@ -32,49 +32,6 @@ static gfdb_methods_t gfdb_methods; static int -tier_parse_query_str (char *query_record_str, - char *gfid, char *link_buffer, ssize_t *link_size) -{ - char *token_str = NULL; - char *delimiter = "|"; - char *saveptr = NULL; - int ret = -1; - - GF_VALIDATE_OR_GOTO ("tier", query_record_str, out); - GF_VALIDATE_OR_GOTO ("tier", gfid, out); - GF_VALIDATE_OR_GOTO ("tier", link_buffer, out); - GF_VALIDATE_OR_GOTO ("tier", link_size, out); - - token_str = strtok_r (query_record_str, delimiter, &saveptr); - if (!token_str) - goto out; - - strcpy (gfid, token_str); - - - token_str = strtok_r (NULL, delimiter, &saveptr); - if (!token_str) - goto out; - - strcpy (link_buffer, token_str); - - token_str = strtok_r (NULL, delimiter, &saveptr); - if (!token_str) - goto out; - - *link_size = atoi (token_str); - - ret = 0; -out: - return ret; -} - -/* - * return 0 if the same node. - * return 1 if not the same node, but no errors. - * return -1 if errors.xs - */ -static int tier_check_same_node (xlator_t *this, loc_t *loc, gf_defrag_info_t *defrag) { int ret = -1; @@ -239,14 +196,9 @@ static int tier_migrate_using_query_file (void *_args) { int ret = -1; - char gfid_str[UUID_CANONICAL_FORM_LEN+1] = ""; - char query_record_str[4096] = ""; query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args; xlator_t *this = NULL; gf_defrag_info_t *defrag = NULL; - char *token_str = NULL; - char *delimiter = "::"; - char *link_buffer = NULL; gfdb_query_record_t *query_record = NULL; gfdb_link_info_t *link_info = NULL; struct iatt par_stbuf = {0,}; @@ -254,6 +206,9 @@ tier_migrate_using_query_file (void *_args) loc_t p_loc = {0,}; loc_t loc = {0,}; dict_t *migrate_data = NULL; + dict_t *xdata_request = NULL; + dict_t *xdata_response = NULL; + char *parent_path = NULL; inode_t *linked_inode = NULL; /* * per_file_status and per_link_status @@ -264,8 +219,7 @@ tier_migrate_using_query_file (void *_args) int per_file_status = 0; int per_link_status = 0; int total_status = 0; - FILE *queryFILE = NULL; - char *link_str = NULL; + int query_fd = 0; xlator_t *src_subvol = NULL; dht_conf_t *conf = NULL; uint64_t total_migrated_bytes = 0; @@ -275,75 +229,72 @@ tier_migrate_using_query_file (void *_args) GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); this = query_cbk_args->this; GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out); - GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out); + GF_VALIDATE_OR_GOTO (this->name, (query_cbk_args->query_fd > 0), out); GF_VALIDATE_OR_GOTO (this->name, this->private, out); conf = this->private; defrag = query_cbk_args->defrag; - queryFILE = query_cbk_args->queryFILE; + query_fd = query_cbk_args->query_fd; + + migrate_data = dict_new (); + if (!migrate_data) + goto out; - query_record = gfdb_query_record_init (); - if (!query_record) { + xdata_request = dict_new (); + if (!xdata_request) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "Call to gfdb_query_record_init() failed."); + "Failed to create xdata_request dict"); goto out; } - - query_record->_link_info_str = GF_CALLOC (1, DB_QUERY_RECORD_SIZE, - gf_common_mt_char); - if (!query_record->_link_info_str) { + ret = dict_set_int32 (xdata_request, + GET_ANCESTRY_PATH_KEY, 42); + if (ret) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "Allocating query record link info string failed."); + "Failed to set value to dict : key %s \n", + GET_ANCESTRY_PATH_KEY); goto out; } - link_buffer = query_record->_link_info_str; - link_info = gfdb_link_info_init (); - - migrate_data = dict_new (); - if (!migrate_data) - goto out; /* Per file */ - while (fgets (query_record_str, sizeof (query_record_str), queryFILE)) { + while ((ret = gfdb_methods.gfdb_read_query_record + (query_fd, &query_record)) != 0) { + + if (ret < 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Failed to fetch query record " + "from query file"); + goto out; + } per_file_status = 0; per_link_status = 0; + dict_del (migrate_data, GF_XATTR_FILE_MIGRATE_KEY); + + dict_del (migrate_data, "from.migrator"); + if (defrag->tier_conf.request_pause) { gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS, - "Tiering paused. Exiting tier_migrate_using_query_file"); + "Tiering paused. " + "Exiting tier_migrate_using_query_file"); break; } - memset (gfid_str, 0, sizeof (gfid_str)); - memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE); - - if (tier_parse_query_str (query_record_str, gfid_str, - link_buffer, - &query_record->link_info_size)) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, - "failed parsing %s\n", query_record_str); + if (!tier_do_migration (this, query_cbk_args->is_promotion)) { + gfdb_methods.gfdb_query_record_free (query_record); + query_record = NULL; continue; } - if (!tier_do_migration (this, query_cbk_args->is_promotion)) - continue; - - gf_uuid_parse (gfid_str, query_record->gfid); - - dict_del (migrate_data, GF_XATTR_FILE_MIGRATE_KEY); - - dict_del (migrate_data, "from.migrator"); - token_str = strtok (link_buffer, delimiter); - if (token_str != NULL) { + if (!list_empty (&query_record->link_list)) { per_file_status = dict_set_str (migrate_data, GF_XATTR_FILE_MIGRATE_KEY, @@ -379,48 +330,29 @@ tier_migrate_using_query_file (void *_args) } per_link_status = 0; - /* Per link of file */ - while (token_str != NULL) { - - if (defrag->tier_conf.request_pause) { - gf_msg (this->name, GF_LOG_INFO, 0, - DHT_MSG_LOG_TIER_STATUS, - "Tiering paused. " - "Exiting tier_migrate_using_query_file"); - goto abort; - } - - link_str = gf_strdup (token_str); - - if (!link_info) { - per_link_status = -1; - goto per_file_out; - } - - memset (link_info, 0, sizeof(gfdb_link_info_t)); - - ret = str_to_link_info (link_str, link_info); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, - "failed parsing %s\n", link_str); - per_link_status = -1; - goto abort; - } + /* For now we only support single link migration. And we will + * ignore other hard links in the link info list of query record + * TODO: Multiple hard links migration */ + if (!list_empty (&query_record->link_list)) { + link_info = list_first_entry + (&query_record->link_list, + gfdb_link_info_t, list); + } + if (link_info != NULL) { + /* Lookup for parent and get the path of parent */ gf_uuid_copy (p_loc.gfid, link_info->pargfid); - p_loc.inode = inode_new (defrag->root_inode->table); if (!p_loc.inode) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "failed parsing %s\n", link_str); + "Failed to create reference to inode"); per_link_status = -1; goto abort; } ret = syncop_lookup (this, &p_loc, &par_stbuf, NULL, - NULL, NULL); + xdata_request, &xdata_response); if (ret) { gf_msg (this->name, GF_LOG_ERROR, -ret, DHT_MSG_LOG_TIER_ERROR, @@ -428,42 +360,60 @@ tier_migrate_using_query_file (void *_args) per_link_status = -1; goto abort; } + ret = dict_get_str (xdata_response, + GET_ANCESTRY_PATH_KEY, + &parent_path); + if (ret || !parent_path) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "Failed to get parent path\n"); + per_link_status = -1; + goto abort; + } + linked_inode = inode_link (p_loc.inode, NULL, NULL, &par_stbuf); inode_unref (p_loc.inode); p_loc.inode = linked_inode; + + + + /* Preparing File Inode */ gf_uuid_copy (loc.gfid, query_record->gfid); loc.inode = inode_new (defrag->root_inode->table); gf_uuid_copy (loc.pargfid, link_info->pargfid); loc.parent = inode_ref (p_loc.inode); + /* Get filename and Construct file path */ loc.name = gf_strdup (link_info->file_name); if (!loc.name) { gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, "ERROR in " - "memory allocation\n"); + DHT_MSG_LOG_TIER_ERROR, "Memory " + "allocation failed.\n"); per_link_status = -1; goto abort; } - - loc.path = gf_strdup (link_info->file_path); - if (!loc.path) { + ret = gf_asprintf((char **)&(loc.path), "%s/%s", + parent_path, loc.name); + if (ret < 0) { gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, "ERROR in " - "memory allocation\n"); + DHT_MSG_LOG_TIER_ERROR, "Failed to " + "construct file path for %s %s\n", + parent_path, loc.name); per_link_status = -1; goto abort; } gf_uuid_copy (loc.parent->gfid, link_info->pargfid); + /* lookup file inode */ ret = syncop_lookup (this, &loc, ¤t, NULL, NULL, NULL); if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, "ERROR in " - "current lookup\n"); + gf_msg (this->name, GF_LOG_ERROR, -ret, + DHT_MSG_LOG_TIER_ERROR, "Failed to do " + "lookup on file %s\n", loc.name); per_link_status = -1; goto abort; } @@ -472,6 +422,7 @@ tier_migrate_using_query_file (void *_args) inode_unref (loc.inode); loc.inode = linked_inode; + /* * Do not promote/demote if file already is where it * should be. It means another brick moved the file @@ -507,7 +458,7 @@ tier_migrate_using_query_file (void *_args) goto abort; } ret = 0; - /* By setting per_linl_status to 1 we are + /* By setting per_link_status to 1 we are * ignoring this status and will not be counting * this file for migration */ per_link_status = 1; @@ -520,18 +471,18 @@ tier_migrate_using_query_file (void *_args) gf_msg (this->name, GF_LOG_INFO, 0, DHT_MSG_LOG_TIER_STATUS, "Tiering paused. " - "Exiting tier_migrate_using_query_file"); + "Exiting " + "tier_migrate_using_query_file"); goto abort; } + /* Data migration */ ret = syncop_setxattr (this, &loc, migrate_data, 0, NULL, NULL); if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - DHT_MSG_LOG_TIER_ERROR, "ERROR %d in " - "current migration %s %s\n", ret, - loc.name, - loc.path); + gf_msg (this->name, GF_LOG_ERROR, -ret, + DHT_MSG_LOG_TIER_ERROR, "Failed to " + "migrate %s \n", loc.name); per_link_status = -1; goto abort; } @@ -560,6 +511,11 @@ tier_migrate_using_query_file (void *_args) defrag->tier_conf.blocks_total; pthread_mutex_unlock (&dm_stat_mutex); } +abort: + GF_FREE ((char *) loc.name); + loc.name = NULL; + loc_wipe (&loc); + loc_wipe (&p_loc); if ((++total_files > defrag->tier_conf.max_migrate_files) || (total_migrated_bytes > defrag->tier_conf.max_migrate_bytes)) { @@ -571,15 +527,6 @@ tier_migrate_using_query_file (void *_args) total_files); goto out; } - -abort: - loc_wipe(&loc); - loc_wipe(&p_loc); - - token_str = NULL; - token_str = strtok (NULL, delimiter); - GF_FREE (link_str); - } per_file_status = per_link_status; per_file_out: @@ -597,36 +544,45 @@ per_file_out: total_status = total_status + per_file_status; per_link_status = 0; per_file_status = 0; - query_record_str[0] = '\0'; + + gfdb_methods.gfdb_query_record_free (query_record); + query_record = NULL; } out: - if (link_buffer) - GF_FREE (link_buffer); - gfdb_link_info_fini (&link_info); + if (xdata_request) { + dict_unref (xdata_request); + } + if (migrate_data) dict_unref (migrate_data); - gfdb_query_record_fini (&query_record); + + + gfdb_methods.gfdb_query_record_free (query_record); + query_record = NULL; + return total_status; } -/*This is the call back function per record/file from data base*/ +/* This is the call back function per record/file from data base */ static int tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record, void *_args) { int ret = -1; - char gfid_str[UUID_CANONICAL_FORM_LEN+1] = ""; query_cbk_args_t *query_cbk_args = _args; GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out); GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out); - GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out); + GF_VALIDATE_OR_GOTO ("tier", (query_cbk_args->query_fd > 0), out); - gf_uuid_unparse (gfdb_query_record->gfid, gfid_str); - fprintf (query_cbk_args->queryFILE, "%s|%s|%zd\n", gfid_str, - gfdb_query_record->_link_info_str, - gfdb_query_record->link_info_size); + ret = gfdb_methods.gfdb_write_query_record (query_cbk_args->query_fd, + gfdb_query_record); + if (ret) { + gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, + "Failed writing query record to query file"); + goto out; + } pthread_mutex_lock (&dm_stat_mutex); query_cbk_args->defrag->num_files_lookedup++; @@ -640,7 +596,7 @@ out: -/*Create query file in tier process*/ +/* Create query file in tier process */ static int tier_process_self_query (tier_brick_list_t *local_brick, void *args) { @@ -651,17 +607,17 @@ tier_process_self_query (tier_brick_list_t *local_brick, void *args) gfdb_conn_node_t *conn_node = NULL; dict_t *params_dict = NULL; dict_t *ctr_ipc_dict = NULL; - _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args; + gfdb_brick_info_t *gfdb_brick_info = args; /*Init of all the essentials*/ - GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out); - query_cbk_args = gfdb_brick_dict_info->_query_cbk_args; + GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_info , out); + query_cbk_args = gfdb_brick_info->_query_cbk_args; GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); this = query_cbk_args->this; GF_VALIDATE_OR_GOTO (this->name, - gfdb_brick_dict_info->_query_cbk_args, out); + gfdb_brick_info->_query_cbk_args, out); GF_VALIDATE_OR_GOTO (this->name, local_brick, out); @@ -692,31 +648,33 @@ tier_process_self_query (tier_brick_list_t *local_brick, void *args) goto out; } - /*Query for eligible files from db*/ - query_cbk_args->queryFILE = fopen ( - GET_QFILE_PATH (gfdb_brick_dict_info->_gfdb_promote), "a+"); - if (!query_cbk_args->queryFILE) { + /* Query for eligible files from db */ + query_cbk_args->query_fd = open (GET_QFILE_PATH + (gfdb_brick_info->_gfdb_promote), + O_WRONLY | O_CREAT | O_APPEND, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (query_cbk_args->query_fd < 0) { gf_msg (this->name, GF_LOG_ERROR, errno, DHT_MSG_LOG_TIER_ERROR, "Failed to open query file %s", GET_QFILE_PATH - (gfdb_brick_dict_info->_gfdb_promote)); + (gfdb_brick_info->_gfdb_promote)); goto out; } - if (!gfdb_brick_dict_info->_gfdb_promote) { + if (!gfdb_brick_info->_gfdb_promote) { if (query_cbk_args->defrag->write_freq_threshold == 0 && query_cbk_args->defrag->read_freq_threshold == 0) { ret = gfdb_methods.find_unchanged_for_time ( conn_node, tier_gf_query_callback, (void *)query_cbk_args, - gfdb_brick_dict_info->time_stamp); + gfdb_brick_info->time_stamp); } else { ret = gfdb_methods.find_unchanged_for_time_freq ( conn_node, tier_gf_query_callback, (void *)query_cbk_args, - gfdb_brick_dict_info->time_stamp, + gfdb_brick_info->time_stamp, query_cbk_args->defrag-> write_freq_threshold, query_cbk_args->defrag-> @@ -730,13 +688,13 @@ tier_process_self_query (tier_brick_list_t *local_brick, void *args) conn_node, tier_gf_query_callback, (void *)query_cbk_args, - gfdb_brick_dict_info->time_stamp); + gfdb_brick_info->time_stamp); } else { ret = gfdb_methods.find_recently_changed_files_freq ( conn_node, tier_gf_query_callback, (void *)query_cbk_args, - gfdb_brick_dict_info->time_stamp, + gfdb_brick_info->time_stamp, query_cbk_args->defrag-> write_freq_threshold, query_cbk_args->defrag->read_freq_threshold, @@ -785,9 +743,9 @@ out: ctr_ipc_dict = NULL; } - if (query_cbk_args && query_cbk_args->queryFILE) { - fclose (query_cbk_args->queryFILE); - query_cbk_args->queryFILE = NULL; + if (query_cbk_args && query_cbk_args->query_fd >= 0) { + close (query_cbk_args->query_fd); + query_cbk_args->query_fd = -1; } gfdb_methods.fini_db (conn_node); return ret; @@ -806,19 +764,19 @@ tier_process_ctr_query (tier_brick_list_t *local_brick, void *args) xlator_t *this = NULL; dict_t *ctr_ipc_in_dict = NULL; dict_t *ctr_ipc_out_dict = NULL; - _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args; + gfdb_brick_info_t *gfdb_brick_info = args; gfdb_ipc_ctr_params_t *ipc_ctr_params = NULL; int count = 0; /*Init of all the essentials*/ - GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out); - query_cbk_args = gfdb_brick_dict_info->_query_cbk_args; + GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_info , out); + query_cbk_args = gfdb_brick_info->_query_cbk_args; GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); this = query_cbk_args->this; GF_VALIDATE_OR_GOTO (this->name, - gfdb_brick_dict_info->_query_cbk_args, out); + gfdb_brick_info->_query_cbk_args, out); GF_VALIDATE_OR_GOTO (this->name, local_brick, out); @@ -843,13 +801,13 @@ tier_process_ctr_query (tier_brick_list_t *local_brick, void *args) } /* set all the query params*/ - ipc_ctr_params->is_promote = gfdb_brick_dict_info->_gfdb_promote; + ipc_ctr_params->is_promote = gfdb_brick_info->_gfdb_promote; ipc_ctr_params->write_freq_threshold = query_cbk_args-> defrag->write_freq_threshold; ipc_ctr_params->read_freq_threshold = query_cbk_args-> defrag->read_freq_threshold; memcpy (&ipc_ctr_params->time_stamp, - gfdb_brick_dict_info->time_stamp, + gfdb_brick_info->time_stamp, sizeof (gfdb_time_t)); SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict, @@ -1038,7 +996,7 @@ tier_build_migration_qfile (demotion_args_t *args, gf_boolean_t is_promotion) { gfdb_time_t current_time; - _gfdb_brick_dict_info_t gfdb_brick_dict_info; + gfdb_brick_info_t gfdb_brick_info; gfdb_time_t time_in_past; int ret = -1; tier_brick_list_t *local_brick = NULL; @@ -1068,13 +1026,13 @@ tier_build_migration_qfile (demotion_args_t *args, } time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec; time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec; - gfdb_brick_dict_info.time_stamp = &time_in_past; - gfdb_brick_dict_info._gfdb_promote = is_promotion; - gfdb_brick_dict_info._query_cbk_args = query_cbk_args; + gfdb_brick_info.time_stamp = &time_in_past; + gfdb_brick_info._gfdb_promote = is_promotion; + gfdb_brick_info._query_cbk_args = query_cbk_args; list_for_each_entry (local_brick, args->brick_list, list) { ret = tier_process_brick (local_brick, - &gfdb_brick_dict_info); + &gfdb_brick_info); if (ret) { gf_msg (args->this->name, GF_LOG_ERROR, 0, DHT_MSG_BRICK_QUERY_FAILED, @@ -1095,16 +1053,16 @@ tier_migrate_files_using_qfile (demotion_args_t *comp, char renamed_file[PATH_MAX] = ""; int ret = -1; - query_cbk_args->queryFILE = fopen (qfile, "r"); - if (!query_cbk_args->queryFILE) { - gf_msg ("tier", GF_LOG_ERROR, 0, + query_cbk_args->query_fd = open (qfile, O_RDONLY); + if (query_cbk_args->query_fd < 0) { + gf_msg ("tier", GF_LOG_ERROR, errno, DHT_MSG_FOPEN_FAILED, - "Failed opening %s for migration", qfile); + "Failed to open %s for migration", qfile); goto out; } ret = tier_migrate_using_query_file ((void *)query_cbk_args); - fclose (query_cbk_args->queryFILE); - query_cbk_args->queryFILE = NULL; + close (query_cbk_args->query_fd); + query_cbk_args->query_fd = -1; if (ret) { snprintf (renamed_file, sizeof renamed_file, "%s.err", qfile); sys_rename (qfile, renamed_file); diff --git a/xlators/cluster/dht/src/tier.h b/xlators/cluster/dht/src/tier.h index 18ca3269f8b..0f8107924ea 100644 --- a/xlators/cluster/dht/src/tier.h +++ b/xlators/cluster/dht/src/tier.h @@ -39,18 +39,18 @@ typedef struct _query_cbk_args { xlator_t *this; gf_defrag_info_t *defrag; - FILE *queryFILE; + int query_fd; int is_promotion; } query_cbk_args_t; int gf_run_tier(xlator_t *this, gf_defrag_info_t *defrag); -typedef struct _gfdb_brick_dict_info { +typedef struct gfdb_brick_info { gfdb_time_t *time_stamp; gf_boolean_t _gfdb_promote; query_cbk_args_t *_query_cbk_args; -} _gfdb_brick_dict_info_t; +} gfdb_brick_info_t; typedef struct brick_list { xlator_t *xlator; |