diff options
author | Joseph Fernandes <josferna@redhat.com> | 2015-09-18 19:57:54 +0530 |
---|---|---|
committer | Dan Lambright <dlambrig@redhat.com> | 2015-10-08 12:00:31 -0700 |
commit | 58d1a9be562630bd1ed8af3e496ca05e087adece (patch) | |
tree | 5feb32ff9e6ad3c38d87759589426471a4044671 /xlators/cluster | |
parent | 81d4aa18b3a1cdad0e0e8df43fe4c8c141c06618 (diff) |
tier/ctr: Solution for db locks for tier migrator and CTR using sqlite version less than 3.7, i.e. RHEL 6.7
Problem: On RHEL 6.7, we have sqlite version 3.6.2, which doesn't support
WAL journaling mode, as this journaling mode is only available in sqlite 3.7 and above.
As a result, we cannot have two processes concurrently accessing sqlite without
running into db locks! WAL is also needed for performance on the CTR side.
Solution: Use the CTR db connection for doing queries when WAL mode is
absent, i.e. the tier migrator will send sync_op ipc calls to CTR, which in turn will
do the query and create/update the query file suggested by the tier migrator.
Pending: This solution will stop the db locks, but performance is still an issue for CTR.
We are developing an in-Memory Transaction Log (iMeTaL) which will help boost CTR
performance by doing in-memory updates on the IO path and later flushing the updates to
the db in a batch/segment flush.
Change-Id: Ie3149643ded159234b5cc6aa6cf93b9022c2f124
BUG: 1240577
Signed-off-by: Joseph Fernandes <josferna@redhat.com>
Signed-off-by: Dan Lambright <dlambrig@redhat.com>
Signed-off-by: Joseph Fernandes <josferna@redhat.com>
Reviewed-on: http://review.gluster.org/12191
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Luis Pabon <lpabon@redhat.com>
Diffstat (limited to 'xlators/cluster')
-rw-r--r-- | xlators/cluster/dht/src/dht-mem-types.h | 1 | ||||
-rw-r--r-- | xlators/cluster/dht/src/dht-messages.h | 11 | ||||
-rw-r--r-- | xlators/cluster/dht/src/tier.c | 301 |
3 files changed, 297 insertions, 16 deletions
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h index e3a38ed7e03..a90b5710745 100644 --- a/xlators/cluster/dht/src/dht-mem-types.h +++ b/xlators/cluster/dht/src/dht-mem-types.h @@ -35,6 +35,7 @@ enum gf_dht_mem_types_ { gf_dht_mt_octx_t, gf_dht_mt_miginfo_t, gf_tier_mt_bricklist_t, + gf_tier_mt_ipc_ctr_params_t, gf_dht_mt_end }; #endif diff --git a/xlators/cluster/dht/src/dht-messages.h b/xlators/cluster/dht/src/dht-messages.h index 80b35557408..61631e682f8 100644 --- a/xlators/cluster/dht/src/dht-messages.h +++ b/xlators/cluster/dht/src/dht-messages.h @@ -40,7 +40,7 @@ */ #define GLFS_DHT_BASE GLFS_MSGID_COMP_DHT -#define GLFS_DHT_NUM_MESSAGES 106 +#define GLFS_DHT_NUM_MESSAGES 107 #define GLFS_MSGID_END (GLFS_DHT_BASE + GLFS_DHT_NUM_MESSAGES + 1) /* Messages with message IDs */ @@ -993,5 +993,14 @@ */ #define DHT_MSG_HAS_MIGINFO (GLFS_DHT_BASE + 106) + +/* + * @messageid 109107 + * @diagnosis + * @recommendedaction None + */ + +#define DHT_MSG_LOG_IPC_TIER_ERROR (GLFS_DHT_BASE + 107) + #define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages" #endif /* _DHT_MESSAGES_H_ */ diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c index aa968b6f352..ff01862bed9 100644 --- a/xlators/cluster/dht/src/tier.c +++ b/xlators/cluster/dht/src/tier.c @@ -430,17 +430,20 @@ out: return ret; } -/*This is the call back function for each brick from hot/cold bricklist - * It picks up each bricks db and queries for eligible files for migration. 
- * The list of eligible files are populated in appropriate query files*/ + + + +/*Create query file in tier process*/ static int -tier_process_brick_cbk (brick_list_t *local_brick, void *args) { +tier_process_self_query (brick_list_t *local_brick, void *args) +{ int ret = -1; char *db_path = NULL; - query_cbk_args_t *query_cbk_args = NULL; + query_cbk_args_t *query_cbk_args = NULL; xlator_t *this = NULL; - gfdb_conn_node_t *conn_node = NULL; + gfdb_conn_node_t *conn_node = NULL; dict_t *params_dict = NULL; + dict_t *ctr_ipc_dict = NULL; _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args; /*Init of all the essentials*/ @@ -458,6 +461,7 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) { GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out); GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out); + db_path = local_brick->brick_db_path; /*Preparing DB parameters before init_db i.e getting db connection*/ @@ -465,7 +469,7 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) { if (!params_dict) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "DB Params cannot initialized!"); + "DB Params cannot initialized"); goto out; } SET_DB_PARAM_TO_DICT(this->name, params_dict, @@ -540,16 +544,40 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) { } /*Clear the heat on the DB entries*/ - ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, NULL, NULL); + /*Preparing ctr_ipc_dict*/ + ctr_ipc_dict = dict_new (); + if (!ctr_ipc_dict) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "ctr_ipc_dict cannot initialized"); + goto out; + } + + SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict, + GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_CLEAR_OPS, + ret, out); + + ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict, + NULL); if (ret) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, "Failed clearing the heat " - "on db %s", local_brick->brick_db_path); + "on db %s error 
%d", local_brick->brick_db_path, ret); goto out; } ret = 0; out: + if (params_dict) { + dict_unref (params_dict); + params_dict = NULL; + } + + if (ctr_ipc_dict) { + dict_unref (ctr_ipc_dict); + ctr_ipc_dict = NULL; + } + if (query_cbk_args && query_cbk_args->queryFILE) { fclose (query_cbk_args->queryFILE); query_cbk_args->queryFILE = NULL; @@ -558,6 +586,245 @@ out: return ret; } + + + + +/*Ask CTR to create the query file*/ +static int +tier_process_ctr_query (brick_list_t *local_brick, void *args) +{ + int ret = -1; + query_cbk_args_t *query_cbk_args = NULL; + xlator_t *this = NULL; + dict_t *ctr_ipc_in_dict = NULL; + dict_t *ctr_ipc_out_dict = NULL; + _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args; + gfdb_ipc_ctr_params_t *ipc_ctr_params = NULL; + int count = 0; + + /*Init of all the essentials*/ + GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out); + query_cbk_args = gfdb_brick_dict_info->_query_cbk_args; + + GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out); + this = query_cbk_args->this; + + GF_VALIDATE_OR_GOTO (this->name, + gfdb_brick_dict_info->_query_cbk_args, out); + + GF_VALIDATE_OR_GOTO (this->name, local_brick, out); + + GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out); + + GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out); + + + /*Preparing ctr_ipc_in_dict*/ + ctr_ipc_in_dict = dict_new (); + if (!ctr_ipc_in_dict) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "ctr_ipc_in_dict cannot initialized"); + goto out; + } + + ipc_ctr_params = GF_CALLOC (1, sizeof (gfdb_ipc_ctr_params_t), + gf_tier_mt_ipc_ctr_params_t); + if (!ipc_ctr_params) { + goto out; + } + + /* set all the query params*/ + ipc_ctr_params->is_promote = gfdb_brick_dict_info->_gfdb_promote; + ipc_ctr_params->write_freq_threshold = query_cbk_args-> + defrag->write_freq_threshold; + ipc_ctr_params->read_freq_threshold = query_cbk_args-> + defrag->read_freq_threshold; + memcpy (&ipc_ctr_params->time_stamp, + 
gfdb_brick_dict_info->time_stamp, + sizeof (gfdb_time_t)); + + SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict, + GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_QUERY_OPS, + ret, out); + + SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict, + GFDB_IPC_CTR_GET_QFILE_PATH, + GET_QFILE_PATH(ipc_ctr_params->is_promote), + ret, out); + + ret = dict_set_bin (ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS, + ipc_ctr_params, sizeof (*ipc_ctr_params)); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED, + "Failed setting %s to params dictionary", + GFDB_IPC_CTR_GET_QUERY_PARAMS); + goto out; + } + + ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, + ctr_ipc_in_dict, &ctr_ipc_out_dict); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_IPC_TIER_ERROR, "Failed query on %s ret %d", + local_brick->brick_db_path, ret); + goto out; + } + + ret = dict_get_int32(ctr_ipc_out_dict, GFDB_IPC_CTR_RET_QUERY_COUNT, + &count); + if (ret) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "Failed getting count " + "of records on %s", + local_brick->brick_db_path); + goto out; + } + + if (count < 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "Failed query on %s", + local_brick->brick_db_path); + ret = -1; + goto out; + } + + pthread_mutex_lock (&dm_stat_mutex); + query_cbk_args->defrag->num_files_lookedup = count; + pthread_mutex_unlock (&dm_stat_mutex); + + ret = 0; +out: + + if (ctr_ipc_in_dict) { + dict_unref(ctr_ipc_in_dict); + ctr_ipc_in_dict = NULL; + } + + if (ctr_ipc_out_dict) { + dict_unref(ctr_ipc_out_dict); + ctr_ipc_out_dict = NULL; + ipc_ctr_params = NULL; + } + + GF_FREE (ipc_ctr_params); + + return ret; +} + + + + +/*This is the call back function for each brick from hot/cold bricklist + * It picks up each bricks db and queries for eligible files for migration. 
+ * The list of eligible files are populated in appropriate query files*/ +static int +tier_process_brick (brick_list_t *local_brick, void *args) { + int ret = -1; + dict_t *ctr_ipc_in_dict = NULL; + dict_t *ctr_ipc_out_dict = NULL; + char *strval = NULL; + + GF_VALIDATE_OR_GOTO ("tier", local_brick, out); + + GF_VALIDATE_OR_GOTO ("tier", local_brick->xlator, out); + + if (dht_tier_db_type == GFDB_SQLITE3) { + + /*Preparing ctr_ipc_in_dict*/ + ctr_ipc_in_dict = dict_new (); + if (!ctr_ipc_in_dict) { + gf_msg ("tier", GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, + "ctr_ipc_in_dict cannot initialized"); + goto out; + } + + ret = dict_set_str (ctr_ipc_in_dict, GFDB_IPC_CTR_KEY, + GFDB_IPC_CTR_GET_DB_PARAM_OPS); + if (ret) { + gf_msg ("tier", GF_LOG_ERROR, 0,\ + LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\ + "to params dictionary", GFDB_IPC_CTR_KEY);\ + goto out; + } + + ret = dict_set_str (ctr_ipc_in_dict, + GFDB_IPC_CTR_GET_DB_PARAM_OPS, ""); + if (ret) { + gf_msg ("tier", GF_LOG_ERROR, 0,\ + LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\ + "to params dictionary", + GFDB_IPC_CTR_GET_DB_PARAM_OPS);\ + goto out; + } + + ret = dict_set_str (ctr_ipc_in_dict, + GFDB_IPC_CTR_GET_DB_KEY, "journal_mode"); + if (ret) { + gf_msg ("tier", GF_LOG_ERROR, 0,\ + LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\ + "to params dictionary", + GFDB_IPC_CTR_GET_DB_KEY);\ + goto out; + } + + + + ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, + ctr_ipc_in_dict, &ctr_ipc_out_dict); + if (ret || ctr_ipc_out_dict == NULL) { + gf_msg ("tier", GF_LOG_ERROR, 0, + DHT_MSG_LOG_TIER_ERROR, "Failed getting" + "journal_mode of sql db %s", + local_brick->brick_db_path); + goto out; + } + + ret = dict_get_str (ctr_ipc_out_dict, "journal_mode", &strval); + if (ret) { + gf_msg ("tier", GF_LOG_ERROR, 0,\ + LG_MSG_GET_PARAM_FAILED, "Failed getting %s "\ + "to params dictionary", + "journal_mode");\ + goto out; + } + + if (strval && (strncmp(strval, "wal", strlen ("wal")) == 0)) { + ret = 
tier_process_self_query (local_brick, args); + if (ret) { + goto out; + } + } else { + ret = tier_process_ctr_query (local_brick, args); + if (ret) { + goto out; + } + } + ret = 0; + + } else { + ret = tier_process_self_query (local_brick, args); + if (ret) { + goto out; + } + } + + ret = 0; +out: + if (ctr_ipc_in_dict) + dict_unref (ctr_ipc_in_dict); + + if (ctr_ipc_out_dict) + dict_unref (ctr_ipc_out_dict); + + return ret; +} + + + + static int tier_build_migration_qfile (demotion_args_t *args, query_cbk_args_t *query_cbk_args, @@ -600,8 +867,8 @@ tier_build_migration_qfile (demotion_args_t *args, gfdb_brick_dict_info._query_cbk_args = query_cbk_args; list_for_each_entry (local_brick, args->brick_list, list) { - ret = tier_process_brick_cbk (local_brick, - &gfdb_brick_dict_info); + ret = tier_process_brick (local_brick, + &gfdb_brick_dict_info); if (ret) { gf_msg (args->this->name, GF_LOG_ERROR, 0, DHT_MSG_BRICK_QUERY_FAILED, @@ -655,6 +922,8 @@ tier_demote (void *args) GF_VALIDATE_OR_GOTO (demotion_args->this->name, demotion_args->defrag, out); + THIS = demotion_args->this; + query_cbk_args.this = demotion_args->this; query_cbk_args.defrag = demotion_args->defrag; query_cbk_args.is_promotion = 0; @@ -691,6 +960,8 @@ static void GF_VALIDATE_OR_GOTO (promotion_args->this->name, promotion_args->defrag, out); + THIS = promotion_args->this; + query_cbk_args.this = promotion_args->this; query_cbk_args.defrag = promotion_args->defrag; query_cbk_args.is_promotion = 1; @@ -935,7 +1206,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, "Failed starting Demotion " - "thread!"); + "thread"); } } @@ -951,7 +1222,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, "Failed starting Promotion " - "thread!"); + "thread"); } } @@ -960,7 +1231,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) if (demotion_args.return_value) { gf_msg 
(this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "Demotion failed!"); + "Demotion failed"); } ret_demotion = demotion_args.return_value; } @@ -970,7 +1241,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag) if (promotion_args.return_value) { gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR, - "Promotion failed!"); + "Promotion failed"); } ret_promotion = promotion_args.return_value; } |