path: root/xlators/mgmt/glusterd/src/glusterd-syncop.c
author     Krishnan Parthasarathi <kparthas@redhat.com>   2013-02-11 11:53:36 +0530
committer  Anand Avati <avati@redhat.com>                 2013-02-19 18:57:47 -0800
commit     d0f8b8fb947048fb1017dd1d9fd1ea144fcb6dc0 (patch)
tree       ccbb967d0a7fc07b2e7ead03bf0eeb5aafb4a755  /xlators/mgmt/glusterd/src/glusterd-syncop.c
parent     66ec7fb345441ba6283e68f95410ce3ee9d5554e (diff)
glusterd: Made gd_synctask_begin less 'monolithic' in terms of LOC.
Change-Id: I2dcaea56c2ca2c2c42c046ab7d2a39d586307868
BUG: 852147
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4507
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
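
For quick orientation before the diff: this patch splits the old monolithic gd_sync_task_begin() into per-phase helpers — gd_build_peers_list(), gd_lock_op_phase(), gd_stage_op_phase(), gd_brick_op_phase(), gd_commit_op_phase() and gd_unlock_op_phase() — and the main function now just chains them. The outline below is a condensed reading of the new gd_sync_task_begin() as it appears in this patch, with the per-call error handling, logging, and cleanup trimmed; treat it as a sketch of the control flow, not the complete function.

void
gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t *req)
{
        /* ... variable setup, read GD_SYNC_OPCODE_KEY from op_ctx ... */

        /* 1. Take the local cluster lock, then record the op globally */
        ret = glusterd_lock (MY_UUID);
        local_locked = _gf_true;
        glusterd_op_set_op (op);

        /* 2. Collect connected, befriended peers into conf->xaction_peers */
        INIT_LIST_HEAD (&conf->xaction_peers);
        gd_build_peers_list (&conf->peers, &conf->xaction_peers);

        /* 3. Lock phase: acquire the cluster lock on every transaction peer */
        ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr);

        /* 4. Build the request payload, then stage the op locally and on peers */
        ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
        ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
                                 &op_errstr);

        /* 5. Brick op phase, then commit locally and on peers */
        ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr);
        ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
                                  &op_errstr);

out:
        /* 6. Unlock peers, reply to the CLI, clear the op, drop the local lock */
        if (local_locked)
                (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
                                           op_ctx, op_errstr);
        /* ... unref req_dict, free op_errstr ... */
}
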
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
-rw-r--r--  xlators/mgmt/glusterd/src/glusterd-syncop.c  382
1 file changed, 235 insertions(+), 147 deletions(-)
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c
index 37839a9e3..a01753f3a 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.c
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c
@@ -545,8 +545,7 @@ out:
static int
-glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp,
- char *op_errstr)
+glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
{
int ret = 0;
@@ -570,7 +569,7 @@ glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp,
break;
case GD_OP_GSYNC_SET:
- ret = glusterd_gsync_use_rsp_dict (aggr, rsp, op_errstr);
+ ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL);
if (ret)
goto out;
break;
@@ -611,118 +610,85 @@ out:
}
void
-gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
+gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)
{
- int ret = -1;
- dict_t *req_dict = NULL;
- dict_t *rsp_dict = NULL;
- glusterd_peerinfo_t *peerinfo = NULL;
- glusterd_peerinfo_t *tmp = NULL;
- glusterd_conf_t *conf = NULL;
- uuid_t tmp_uuid = {0,};
- glusterd_op_t op = 0;
- int32_t tmp_op = 0;
- gf_boolean_t local_locked = _gf_false;
- char *op_errstr = NULL;
- glusterd_pending_node_t *pending_node = NULL;
- rpc_clnt_t *rpc = NULL;
- int brick_count = 0;
- struct list_head selected = {0};
- xlator_t *this = NULL;
- char *hostname = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
- this = THIS;
- GF_ASSERT (this);
- conf = this->private;
- GF_ASSERT (conf);
-
- ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to get volume "
- "operation");
- goto out;
- }
-
- op = tmp_op;
-
- /* Lock everything */
- ret = glusterd_lock (MY_UUID);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock");
- gf_asprintf (&op_errstr, "Another transaction is in progress. "
- "Please try again after sometime.");
- goto out;
- }
- /* successful lock in local node */
- local_locked = _gf_true;
-
- /* storing op globally to access in synctask code paths
- * This is still acceptable, as we are performing this under
- * the 'cluster' lock*/
-
- glusterd_op_set_op (op);
- INIT_LIST_HEAD (&conf->xaction_peers);
- list_for_each_entry (peerinfo, &conf->peers, uuid_list) {
+ list_for_each_entry (peerinfo, peers, uuid_list) {
if (!peerinfo->connected)
continue;
if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED)
continue;
+ list_add_tail (&peerinfo->op_peers_list, xact_peers);
+ }
+}
+
+int
+gd_lock_op_phase (struct list_head *peers, char **op_errstr)
+{
+ glusterd_peerinfo_t *peerinfo = NULL;
+ uuid_t peer_uuid = {0};
+ int ret = -1;
+ xlator_t *this = NULL;
+
+ this = THIS;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
ret = gd_syncop_mgmt_lock (peerinfo->rpc,
- MY_UUID, tmp_uuid);
+ MY_UUID, peer_uuid);
if (ret) {
- gf_asprintf (&op_errstr, "Another transaction could be "
+ gf_asprintf (op_errstr, "Another transaction could be "
"in progress. Please try again after "
"sometime.");
gf_log (this->name, GF_LOG_ERROR, "Failed to acquire "
"lock on peer %s", peerinfo->hostname);
goto out;
- } else {
- list_add_tail (&peerinfo->op_peers_list,
- &conf->xaction_peers);
}
}
- ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD,
- gd_op_list[op]);
- if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD);
- goto out;
- }
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ dict_t *req_dict, char **op_errstr)
+{
+ dict_t *rsp_dict = NULL;
+ int ret = -1;
+ char *hostname = NULL;
+ xlator_t *this = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ uuid_t tmp_uuid = {0};
- /* stage op */
- ret = -1;
+ this = THIS;
rsp_dict = dict_new ();
if (!rsp_dict)
goto out;
- ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict);
+ ret = glusterd_op_stage_validate (op, req_dict, op_errstr, rsp_dict);
if (ret) {
hostname = "localhost";
goto stage_done;
}
- if ((op == GD_OP_REPLACE_BRICK) ||
- (op == GD_OP_CLEARLOCKS_VOLUME)) {
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
- op_errstr);
+ if ((op == GD_OP_REPLACE_BRICK)) {
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate "
- "response from node/brick");
+ "Failed to aggregate response from node/brick");
goto out;
}
}
dict_unref (rsp_dict);
rsp_dict = NULL;
- list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
ret = gd_syncop_mgmt_stage_op (peerinfo->rpc,
MY_UUID, tmp_uuid,
op, req_dict, &rsp_dict,
- &op_errstr);
+ op_errstr);
if (ret) {
hostname = peerinfo->hostname;
goto stage_done;
@@ -730,12 +696,11 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
if (op == GD_OP_REPLACE_BRICK) {
ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,
- rsp_dict,
- op_errstr);
+ rsp_dict);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to "
- "aggregate response from node/brick");
+ "Failed to aggregate response from "
+ " node/brick");
goto out;
}
}
@@ -746,14 +711,122 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
stage_done:
if (ret) {
gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL,
- gd_op_list[op], hostname, (op_errstr) ? ":" : " ",
- (op_errstr) ? op_errstr : " ");
- if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_STAGE_FAIL, hostname);
+ gd_op_list[op], hostname, (*op_errstr) ? ":" : " ",
+ (*op_errstr) ? *op_errstr : " ");
+ if (*op_errstr == NULL)
+ gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname);
goto out;
}
+out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+ return ret;
+}
- /*brick op */
+int
+gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ dict_t *req_dict, char **op_errstr)
+{
+ dict_t *rsp_dict = NULL;
+ int ret = -1;
+ char *hostname = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ xlator_t *this = NULL;
+ uuid_t tmp_uuid = {0};
+
+ this = THIS;
+ rsp_dict = dict_new ();
+ if (!rsp_dict) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = glusterd_op_commit_perform (op, req_dict, op_errstr, rsp_dict);
+ if (ret) {
+ hostname = "localhost";
+ goto commit_done;
+ }
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response "
+ "from node/brick");
+ goto out;
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
+
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ ret = gd_syncop_mgmt_commit_op (peerinfo->rpc,
+ MY_UUID, tmp_uuid,
+ op, req_dict, &rsp_dict,
+ op_errstr);
+ if (ret) {
+ hostname = peerinfo->hostname;
+ goto commit_done;
+ }
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate "
+ "response from node/brick");
+ goto out;
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
+ }
+commit_done:
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL,
+ gd_op_list[op], hostname, (*op_errstr) ? ":" : " ",
+ (*op_errstr) ? *op_errstr : " ");
+ if (*op_errstr == NULL)
+ gf_asprintf (op_errstr, OPERRSTR_COMMIT_FAIL,
+ hostname);
+ goto out;
+ } else {
+ glusterd_op_modify_op_ctx (op, op_ctx);
+ }
+
+out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+ return ret;
+}
+
+int
+gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret,
+ rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr)
+{
+ glusterd_peerinfo_t *peerinfo = NULL;
+ glusterd_peerinfo_t *tmp = NULL;
+ uuid_t tmp_uuid = {0};
+
+ list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) {
+ gd_syncop_mgmt_unlock (peerinfo->rpc,
+ MY_UUID, tmp_uuid);
+ list_del_init (&peerinfo->op_peers_list);
+ }
+
+ glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr);
+ glusterd_op_clear_op (op);
+ glusterd_unlock (MY_UUID);
+
+ return 0;
+}
+
+int
+gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr)
+{
+ glusterd_pending_node_t *pending_node = NULL;
+ struct list_head selected = {0,};
+ xlator_t *this = NULL;
+ int brick_count = 0;
+ int ret = -1;
+ rpc_clnt_t *rpc = NULL;
+ dict_t *rsp_dict = NULL;
+
+ this = THIS;
rsp_dict = dict_new ();
if (!rsp_dict) {
ret = -1;
@@ -761,15 +834,20 @@ stage_done:
}
INIT_LIST_HEAD (&selected);
- ret = glusterd_op_bricks_select (op, req_dict, &op_errstr,
- &selected, rsp_dict);
+ ret = glusterd_op_bricks_select (op, req_dict, op_errstr, &selected, rsp_dict);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Brick op failed. Check "
+ (*op_errstr)? *op_errstr: "Brick op failed. Check "
"glusterd log file for more details.");
goto out;
}
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+ if (ret)
+ goto out;
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
+
brick_count = 0;
list_for_each_entry (pending_node, &selected, list) {
rpc = glusterd_pending_node_get_rpc (pending_node);
@@ -787,89 +865,99 @@ stage_done:
goto out;
}
ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict,
- op_ctx, &op_errstr);
+ op_ctx, op_errstr);
if (ret)
goto out;
brick_count++;
}
+ ret = 0;
+out:
+ if (rsp_dict)
+ dict_unref (rsp_dict);
gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks",
brick_count);
+ return ret;
+}
- /* commit op */
- ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict);
+void
+gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
+{
+ int ret = -1;
+ dict_t *req_dict = NULL;
+ glusterd_conf_t *conf = NULL;
+ glusterd_op_t op = 0;
+ int32_t tmp_op = 0;
+ char *op_errstr = NULL;
+ xlator_t *this = NULL;
+ gf_boolean_t local_locked = _gf_false;
+
+ this = THIS;
+ GF_ASSERT (this);
+ conf = this->private;
+ GF_ASSERT (conf);
+
+ ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
if (ret) {
- hostname = "localhost";
- goto commit_done;
+ gf_log (this->name, GF_LOG_ERROR, "Failed to get volume "
+ "operation");
+ goto out;
}
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr);
+
+ op = tmp_op;
+ ret = glusterd_lock (MY_UUID);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate response "
- "from node/brick");
+ gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock");
+ gf_asprintf (&op_errstr, "Another transaction is in progress. "
+ "Please try again after sometime.");
goto out;
}
- dict_unref (rsp_dict);
- rsp_dict = NULL;
+ local_locked = _gf_true;
- list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
- ret = gd_syncop_mgmt_commit_op (peerinfo->rpc,
- MY_UUID, tmp_uuid,
- op, req_dict, &rsp_dict,
- &op_errstr);
- if (ret) {
- hostname = peerinfo->hostname;
- goto commit_done;
- }
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
- op_errstr);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- (op_errstr)? op_errstr: "Failed to aggregate "
- "response from node/brick");
- goto out;
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
- }
-commit_done:
+ /* storing op globally to access in synctask code paths
+ * This is still acceptable, as we are performing this under
+ * the 'cluster' lock*/
+ glusterd_op_set_op (op);
+ INIT_LIST_HEAD (&conf->xaction_peers);
+ gd_build_peers_list (&conf->peers, &conf->xaction_peers);
+
+ ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr);
+ if (ret)
+ goto out;
+
+ ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL,
- gd_op_list[op], hostname, (op_errstr) ? ":" : " ",
- (op_errstr) ? op_errstr : " ");
+ gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD,
+ gd_op_list[op]);
if (op_errstr == NULL)
- gf_asprintf (&op_errstr, OPERRSTR_COMMIT_FAIL,
- hostname);
+ gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD);
goto out;
- } else {
- glusterd_op_modify_op_ctx (op, op_ctx);
- }
+ }
- ret = 0;
-out:
- if (local_locked) {
- list_for_each_entry_safe (peerinfo, tmp, &conf->xaction_peers,
- op_peers_list) {
- gd_syncop_mgmt_unlock (peerinfo->rpc,
- MY_UUID, tmp_uuid);
- list_del_init (&peerinfo->op_peers_list);
- }
+ ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
+ &op_errstr);
+ if (ret)
+ goto out;
- /* Local node should be the one to be locked first,
- unlocked last to prevent races */
- glusterd_op_clear_op (op);
- glusterd_unlock (MY_UUID);
- }
+ ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr);
+ if (ret)
+ goto out;
- glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr);
+ ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
+ &op_errstr);
+ if (ret)
+ goto out;
+
+ ret = 0;
+out:
+ if (local_locked)
+ (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
+ op_ctx, op_errstr);
if (req_dict)
dict_unref (req_dict);
- if (ret && rsp_dict)
- dict_unref (rsp_dict);
-
if (op_errstr)
GF_FREE (op_errstr);