summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-syncop.c270
1 files changed, 242 insertions, 28 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c
index 2b1c88c3cc3..a8f58e0255d 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.c
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c
@@ -80,6 +80,7 @@ out:
/* Defined in glusterd-rpc-ops.c */
extern struct rpc_clnt_program gd_mgmt_prog;
+extern struct rpc_clnt_program gd_brick_prog;
int32_t
gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
@@ -312,6 +313,137 @@ out:
/*TODO: Need to add syncop for brick ops*/
int32_t
+gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ struct syncargs *args = NULL;
+ gd1_mgmt_brick_op_rsp rsp = {0,};
+ int ret = -1;
+ call_frame_t *frame = NULL;
+
+ frame = myframe;
+ args = frame->local;
+ frame->local = NULL;
+
+ /* initialize */
+ args->op_ret = -1;
+ args->op_errno = EINVAL;
+
+ if (-1 == req->rpc_status) {
+ args->op_errno = ENOTCONN;
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp,
+ (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp);
+ if (ret < 0)
+ goto out;
+
+ if (rsp.output.output_len) {
+ args->dict = dict_new ();
+ if (!args->dict) {
+ ret = -1;
+ args->op_errno = ENOMEM;
+ goto out;
+ }
+
+ ret = dict_unserialize (rsp.output.output_val,
+ rsp.output.output_len,
+ &args->dict);
+ if (ret < 0)
+ goto out;
+ }
+
+ args->op_ret = rsp.op_ret;
+ args->op_errno = rsp.op_errno;
+ args->errstr = gf_strdup (rsp.op_errstr);
+
+out:
+ if (strcmp (rsp.op_errstr, "") != 0)
+ free (rsp.op_errstr);
+ free (rsp.output.output_val);
+ STACK_DESTROY (frame->root);
+
+ __wake (args);
+
+ return 0;
+}
+
+static void
+gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
+{
+ if (!req)
+ return;
+
+ if (strcmp (req->name, "") != 0)
+ GF_FREE (req->name);
+ GF_FREE (req->input.input_val);
+ GF_FREE (req);
+}
+
+int
+gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
+ int op, dict_t *dict_out, dict_t *op_ctx,
+ char **errstr)
+{
+ struct syncargs args = {0, };
+ gd1_mgmt_brick_op_req *req = NULL;
+ int ret = 0;
+ xlator_t *this = NULL;
+
+ this = THIS;
+ args.op_ret = -1;
+ args.op_errno = ENOTCONN;
+
+ if ((pnode->type == GD_NODE_NFS) ||
+ ((pnode->type == GD_NODE_SHD) &&
+ (op == GD_OP_STATUS_VOLUME))) {
+ ret = glusterd_node_op_build_payload
+ (op, &req, dict_out);
+
+ } else {
+ ret = glusterd_brick_op_build_payload
+ (op, pnode->node, &req, dict_out);
+
+ }
+
+ if (ret)
+ goto out;
+
+ GD_SYNCOP (rpc, (&args), gd_syncop_brick_op_cbk,
+ req, &gd_brick_prog, req->op,
+ xdr_gd1_mgmt_brick_op_req);
+
+ if (args.errstr && errstr)
+ *errstr = args.errstr;
+ else
+ GF_FREE (args.errstr);
+
+ if (GD_OP_STATUS_VOLUME == op) {
+ ret = dict_set_int32 (args.dict, "index", pnode->index);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error setting index on brick status"
+ " rsp dict");
+ args.op_ret = -1;
+ goto out;
+ }
+ }
+ if (args.op_ret == 0)
+ glusterd_handle_node_rsp (dict_out, pnode->node, op,
+ args.dict, op_ctx, errstr,
+ pnode->type);
+
+out:
+ errno = args.op_errno;
+ if (args.dict)
+ dict_unref (args.dict);
+ gd_brick_op_req_free (req);
+ return args.op_ret;
+
+}
+
+int32_t
gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
@@ -416,7 +548,7 @@ static int
glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp,
char *op_errstr)
{
- int ret = -1;
+ int ret = 0;
switch (op) {
case GD_OP_REPLACE_BRICK:
@@ -473,19 +605,23 @@ out:
void
gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
{
- int ret = -1;
- dict_t *req_dict = NULL;
- dict_t *rsp_dict = NULL;
- glusterd_peerinfo_t *peerinfo = NULL;
- glusterd_peerinfo_t *tmp = NULL;
- glusterd_conf_t *conf = NULL;
- uuid_t tmp_uuid = {0,};
- glusterd_op_t op = 0;
- int32_t tmp_op = 0;
- gf_boolean_t local_locked = _gf_false;
- char *op_errstr = NULL;
- xlator_t *this = NULL;
- char *hostname = NULL;
+ int ret = -1;
+ dict_t *req_dict = NULL;
+ dict_t *rsp_dict = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ glusterd_peerinfo_t *tmp = NULL;
+ glusterd_conf_t *conf = NULL;
+ uuid_t tmp_uuid = {0,};
+ glusterd_op_t op = 0;
+ int32_t tmp_op = 0;
+ gf_boolean_t local_locked = _gf_false;
+ char *op_errstr = NULL;
+ glusterd_pending_node_t *pending_node = NULL;
+ rpc_clnt_t *rpc = NULL;
+ int brick_count = 0;
+ struct list_head selected = {0};
+ xlator_t *this = NULL;
+ char *hostname = NULL;
this = THIS;
GF_ASSERT (this);
@@ -501,11 +637,6 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
op = tmp_op;
- ret = -1;
- rsp_dict = dict_new ();
- if (!rsp_dict)
- goto out;
-
/* Lock everything */
ret = glusterd_lock (MY_UUID);
if (ret) {
@@ -554,12 +685,30 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
}
/* stage op */
+ ret = -1;
+ rsp_dict = dict_new ();
+ if (!rsp_dict)
+ goto out;
+
ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict);
if (ret) {
hostname = "localhost";
goto stage_done;
}
+ if (op == GD_OP_REPLACE_BRICK) {
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
+ op_errstr);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (op_errstr)? op_errstr: "Failed to aggregate "
+ "response from node/brick");
+ goto out;
+ }
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
+
list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
ret = gd_syncop_mgmt_stage_op (peerinfo->rpc,
MY_UUID, tmp_uuid,
@@ -570,13 +719,19 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
goto stage_done;
}
- if (op == GD_OP_REPLACE_BRICK)
- (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx,
+ if (op == GD_OP_REPLACE_BRICK) {
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,
rsp_dict,
op_errstr);
-
- if (rsp_dict)
- dict_unref (rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (op_errstr)? op_errstr: "Failed to "
+ "aggregate response from node/brick");
+ goto out;
+ }
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
}
stage_done:
@@ -589,12 +744,64 @@ stage_done:
goto out;
}
+ /*brick op */
+ INIT_LIST_HEAD (&selected);
+ ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, &selected);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (op_errstr)? op_errstr: "Brick op failed. Check "
+ "glusterd log file for more details.");
+ goto out;
+ }
+
+ brick_count = 0;
+ list_for_each_entry (pending_node, &selected, list) {
+ rpc = glusterd_pending_node_get_rpc (pending_node);
+ if (!rpc) {
+ if (pending_node->type == GD_NODE_REBALANCE) {
+ ret = 0;
+ glusterd_defrag_volume_node_rsp (req_dict,
+ NULL, op_ctx);
+ goto out;
+ }
+
+ ret = -1;
+ gf_log (this->name, GF_LOG_ERROR, "Brick Op failed "
+ "due to rpc failure.");
+ goto out;
+ }
+ ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict,
+ op_ctx, &op_errstr);
+ if (ret)
+ goto out;
+
+ brick_count++;
+ }
+
+ gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks",
+ brick_count);
+
/* commit op */
+ rsp_dict = dict_new ();
+ if (!rsp_dict) {
+ ret = -1;
+ goto out;
+ }
+
ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict);
if (ret) {
hostname = "localhost";
goto commit_done;
}
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (op_errstr)? op_errstr: "Failed to aggregate response "
+ "from node/brick");
+ goto out;
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) {
ret = gd_syncop_mgmt_commit_op (peerinfo->rpc,
@@ -605,10 +812,16 @@ stage_done:
hostname = peerinfo->hostname;
goto commit_done;
}
- (void) glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
- op_errstr);
- if (rsp_dict)
- dict_unref (rsp_dict);
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict,
+ op_errstr);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ (op_errstr)? op_errstr: "Failed to aggregate "
+ "response from node/brick");
+ goto out;
+ }
+ dict_unref (rsp_dict);
+ rsp_dict = NULL;
}
commit_done:
if (ret) {
@@ -665,6 +878,7 @@ glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op,
}
gd_sync_task_begin (dict, req);
+ ret = 0;
out:
if (dict)
dict_unref (dict);