diff options
author | Krishnan Parthasarathi <kparthas@redhat.com> | 2013-02-11 11:53:36 +0530 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2013-02-19 18:57:47 -0800 |
commit | d0f8b8fb947048fb1017dd1d9fd1ea144fcb6dc0 (patch) | |
tree | ccbb967d0a7fc07b2e7ead03bf0eeb5aafb4a755 /xlators | |
parent | 66ec7fb345441ba6283e68f95410ce3ee9d5554e (diff) |
glusterd: Made gd_synctask_begin less 'monolithic' in terms of LOC.
Change-Id: I2dcaea56c2ca2c2c42c046ab7d2a39d586307868
BUG: 852147
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4507
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 382 |
1 files changed, 235 insertions, 147 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index 37839a9e38c..a01753f3a29 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -545,8 +545,7 @@ out: static int -glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp, - char *op_errstr) +glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) { int ret = 0; @@ -570,7 +569,7 @@ glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp, break; case GD_OP_GSYNC_SET: - ret = glusterd_gsync_use_rsp_dict (aggr, rsp, op_errstr); + ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); if (ret) goto out; break; @@ -611,118 +610,85 @@ out: } void -gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) +gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers) { - int ret = -1; - dict_t *req_dict = NULL; - dict_t *rsp_dict = NULL; - glusterd_peerinfo_t *peerinfo = NULL; - glusterd_peerinfo_t *tmp = NULL; - glusterd_conf_t *conf = NULL; - uuid_t tmp_uuid = {0,}; - glusterd_op_t op = 0; - int32_t tmp_op = 0; - gf_boolean_t local_locked = _gf_false; - char *op_errstr = NULL; - glusterd_pending_node_t *pending_node = NULL; - rpc_clnt_t *rpc = NULL; - int brick_count = 0; - struct list_head selected = {0}; - xlator_t *this = NULL; - char *hostname = NULL; + glusterd_peerinfo_t *peerinfo = NULL; - this = THIS; - GF_ASSERT (this); - conf = this->private; - GF_ASSERT (conf); - - ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to get volume " - "operation"); - goto out; - } - - op = tmp_op; - - /* Lock everything */ - ret = glusterd_lock (MY_UUID); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock"); - gf_asprintf (&op_errstr, "Another transaction is in progress. " - "Please try again after sometime."); - goto out; - } - /* successful lock in local node */ - local_locked = _gf_true; - - /* storing op globally to access in synctask code paths - * This is still acceptable, as we are performing this under - * the 'cluster' lock*/ - - glusterd_op_set_op (op); - INIT_LIST_HEAD (&conf->xaction_peers); - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { + list_for_each_entry (peerinfo, peers, uuid_list) { if (!peerinfo->connected) continue; if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED) continue; + list_add_tail (&peerinfo->op_peers_list, xact_peers); + } +} + +int +gd_lock_op_phase (struct list_head *peers, char **op_errstr) +{ + glusterd_peerinfo_t *peerinfo = NULL; + uuid_t peer_uuid = {0}; + int ret = -1; + xlator_t *this = NULL; + + this = THIS; + list_for_each_entry (peerinfo, peers, op_peers_list) { ret = gd_syncop_mgmt_lock (peerinfo->rpc, - MY_UUID, tmp_uuid); + MY_UUID, peer_uuid); if (ret) { - gf_asprintf (&op_errstr, "Another transaction could be " + gf_asprintf (op_errstr, "Another transaction could be " "in progress. Please try again after " "sometime."); gf_log (this->name, GF_LOG_ERROR, "Failed to acquire " "lock on peer %s", peerinfo->hostname); goto out; - } else { - list_add_tail (&peerinfo->op_peers_list, - &conf->xaction_peers); } } - ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD, - gd_op_list[op]); - if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD); - goto out; - } + ret = 0; +out: + return ret; +} + +int +gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr) +{ + dict_t *rsp_dict = NULL; + int ret = -1; + char *hostname = NULL; + xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + uuid_t tmp_uuid = {0}; - /* stage op */ - ret = -1; + this = THIS; rsp_dict = dict_new (); if (!rsp_dict) goto out; - ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict); + ret = glusterd_op_stage_validate (op, req_dict, op_errstr, rsp_dict); if (ret) { hostname = "localhost"; goto stage_done; } - if ((op == GD_OP_REPLACE_BRICK) || - (op == GD_OP_CLEARLOCKS_VOLUME)) { - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, - op_errstr); + if ((op == GD_OP_REPLACE_BRICK)) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate " - "response from node/brick"); + "Failed to aggregate response from node/brick"); goto out; } } dict_unref (rsp_dict); rsp_dict = NULL; - list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { + list_for_each_entry (peerinfo, peers, op_peers_list) { ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, MY_UUID, tmp_uuid, op, req_dict, &rsp_dict, - &op_errstr); + op_errstr); if (ret) { hostname = peerinfo->hostname; goto stage_done; @@ -730,12 +696,11 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) if (op == GD_OP_REPLACE_BRICK) { ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, - rsp_dict, - op_errstr); + rsp_dict); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to " - "aggregate response from node/brick"); + "Failed to aggregate response from " + " node/brick"); goto out; } } @@ -746,14 +711,122 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) stage_done: if (ret) { gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, - gd_op_list[op], hostname, (op_errstr) ? ":" : " ", - (op_errstr) ? op_errstr : " "); - if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_STAGE_FAIL, hostname); + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (*op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname); goto out; } +out: + if (rsp_dict) + dict_unref (rsp_dict); + return ret; +} - /*brick op */ +int +gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr) +{ + dict_t *rsp_dict = NULL; + int ret = -1; + char *hostname = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + xlator_t *this = NULL; + uuid_t tmp_uuid = {0}; + + this = THIS; + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; + goto out; + } + + ret = glusterd_op_commit_perform (op, req_dict, op_errstr, rsp_dict); + if (ret) { + hostname = "localhost"; + goto commit_done; + } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response " + "from node/brick"); + goto out; + } + dict_unref (rsp_dict); + rsp_dict = NULL; + + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, + MY_UUID, tmp_uuid, + op, req_dict, &rsp_dict, + op_errstr); + if (ret) { + hostname = peerinfo->hostname; + goto commit_done; + } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate " + "response from node/brick"); + goto out; + } + dict_unref (rsp_dict); + rsp_dict = NULL; + } +commit_done: + if (ret) { + gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (*op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_COMMIT_FAIL, + hostname); + goto out; + } else { + glusterd_op_modify_op_ctx (op, op_ctx); + } + +out: + if (rsp_dict) + dict_unref (rsp_dict); + return ret; +} + +int +gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret, + rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr) +{ + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_peerinfo_t *tmp = NULL; + uuid_t tmp_uuid = {0}; + + list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) { + gd_syncop_mgmt_unlock (peerinfo->rpc, + MY_UUID, tmp_uuid); + list_del_init (&peerinfo->op_peers_list); + } + + glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr); + glusterd_op_clear_op (op); + glusterd_unlock (MY_UUID); + + return 0; +} + +int +gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr) +{ + glusterd_pending_node_t *pending_node = NULL; + struct list_head selected = {0,}; + xlator_t *this = NULL; + int brick_count = 0; + int ret = -1; + rpc_clnt_t *rpc = NULL; + dict_t *rsp_dict = NULL; + + this = THIS; rsp_dict = dict_new (); if (!rsp_dict) { ret = -1; @@ -761,15 +834,20 @@ stage_done: } INIT_LIST_HEAD (&selected); - ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, - &selected, rsp_dict); + ret = glusterd_op_bricks_select (op, req_dict, op_errstr, &selected, rsp_dict); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Brick op failed. Check " + (*op_errstr)? *op_errstr: "Brick op failed. Check " "glusterd log file for more details."); goto out; } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) + goto out; + dict_unref (rsp_dict); + rsp_dict = NULL; + brick_count = 0; list_for_each_entry (pending_node, &selected, list) { rpc = glusterd_pending_node_get_rpc (pending_node); @@ -787,89 +865,99 @@ stage_done: goto out; } ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict, - op_ctx, &op_errstr); + op_ctx, op_errstr); if (ret) goto out; brick_count++; } + ret = 0; +out: + if (rsp_dict) + dict_unref (rsp_dict); gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks", brick_count); + return ret; +} - /* commit op */ - ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict); +void +gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) +{ + int ret = -1; + dict_t *req_dict = NULL; + glusterd_conf_t *conf = NULL; + glusterd_op_t op = 0; + int32_t tmp_op = 0; + char *op_errstr = NULL; + xlator_t *this = NULL; + gf_boolean_t local_locked = _gf_false; + + this = THIS; + GF_ASSERT (this); + conf = this->private; + GF_ASSERT (conf); + + ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op); if (ret) { - hostname = "localhost"; - goto commit_done; + gf_log (this->name, GF_LOG_ERROR, "Failed to get volume " + "operation"); + goto out; } - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr); + + op = tmp_op; + ret = glusterd_lock (MY_UUID); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate response " - "from node/brick"); + gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock"); + gf_asprintf (&op_errstr, "Another transaction is in progress. " + "Please try again after sometime."); goto out; } - dict_unref (rsp_dict); - rsp_dict = NULL; + local_locked = _gf_true; - list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { - ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, - MY_UUID, tmp_uuid, - op, req_dict, &rsp_dict, - &op_errstr); - if (ret) { - hostname = peerinfo->hostname; - goto commit_done; - } - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, - op_errstr); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate " - "response from node/brick"); - goto out; - } - dict_unref (rsp_dict); - rsp_dict = NULL; - } -commit_done: + /* storing op globally to access in synctask code paths + * This is still acceptable, as we are performing this under + * the 'cluster' lock*/ + glusterd_op_set_op (op); + INIT_LIST_HEAD (&conf->xaction_peers); + gd_build_peers_list (&conf->peers, &conf->xaction_peers); + + ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr); + if (ret) + goto out; + + ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx); if (ret) { - gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, - gd_op_list[op], hostname, (op_errstr) ? ":" : " ", - (op_errstr) ? op_errstr : " "); + gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD, + gd_op_list[op]); if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_COMMIT_FAIL, - hostname); + gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD); goto out; - } else { - glusterd_op_modify_op_ctx (op, op_ctx); - } + } - ret = 0; -out: - if (local_locked) { - list_for_each_entry_safe (peerinfo, tmp, &conf->xaction_peers, - op_peers_list) { - gd_syncop_mgmt_unlock (peerinfo->rpc, - MY_UUID, tmp_uuid); - list_del_init (&peerinfo->op_peers_list); - } + ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr); + if (ret) + goto out; - /* Local node should be the one to be locked first, - unlocked last to prevent races */ - glusterd_op_clear_op (op); - glusterd_unlock (MY_UUID); - } + ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr); + if (ret) + goto out; - glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr); + ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr); + if (ret) + goto out; + + ret = 0; +out: + if (local_locked) + (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, + op_ctx, op_errstr); if (req_dict) dict_unref (req_dict); - if (ret && rsp_dict) - dict_unref (rsp_dict); - if (op_errstr) GF_FREE (op_errstr); |