summaryrefslogtreecommitdiffstats
path: root/xlators/mgmt/glusterd/src/glusterd-syncop.c
diff options
context:
space:
mode:
authorKrishnan Parthasarathi <kparthas@redhat.com>2013-02-20 14:44:23 +0530
committerVijay Bellur <vbellur@redhat.com>2013-02-26 09:06:12 -0800
commit5e6dfce0b0d55d96b5bdad6a693fdb2826c20b92 (patch)
tree8521f9b3a8e5603b3c3c76efa4ca07210884acba /xlators/mgmt/glusterd/src/glusterd-syncop.c
parentdc43e7dd9934925f8cb96762c33be23ccb63528a (diff)
glusterd: Increasing throughput of synctask based mgmt ops.
Change-Id: Ibd963f78707b157fc4c9729aa87206cfd5ecfe81 BUG: 913662 Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com> Reviewed-on: http://review.gluster.org/4570 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Amar Tumballi <amarts@redhat.com> Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-syncop.c699
1 files changed, 374 insertions, 325 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c
index a01753f3a..b943d8e94 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.c
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c
@@ -8,7 +8,6 @@
cases as published by the Free Software Foundation.
*/
/* rpc related syncops */
-#include "syncop.h"
#include "rpc-clnt.h"
#include "protocol-common.h"
#include "xdr-generic.h"
@@ -19,6 +18,57 @@
#include "glusterd-op-sm.h"
#include "glusterd-utils.h"
+static void
+gd_collate_errors (struct syncargs *args, int op_ret, int op_errno,
+ char *op_errstr)
+{
+ if (args->op_ret)
+ return;
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+ if (op_ret && op_errstr && strcmp (op_errstr, ""))
+ args->errstr = gf_strdup (op_errstr);
+}
+
+static void
+gd_syncargs_init (struct syncargs *args, dict_t *op_ctx)
+{
+ args->dict = op_ctx;
+ pthread_mutex_init (&args->lock_dict, NULL);
+}
+
+static void
+gd_stage_op_req_free (gd1_mgmt_stage_op_req *req)
+{
+ if (!req)
+ return;
+
+ GF_FREE (req->buf.buf_val);
+ GF_FREE (req);
+}
+
+static void
+gd_commit_op_req_free (gd1_mgmt_commit_op_req *req)
+{
+ if (!req)
+ return;
+
+ GF_FREE (req->buf.buf_val);
+ GF_FREE (req);
+}
+
+static void
+gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
+{
+ if (!req)
+ return;
+
+ if (strcmp (req->name, "") != 0)
+ GF_FREE (req->name);
+ GF_FREE (req->input.input_val);
+ GF_FREE (req);
+}
+
int
gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,
void *cookie, rpc_clnt_prog_t *prog,
@@ -82,23 +132,84 @@ out:
extern struct rpc_clnt_program gd_mgmt_prog;
extern struct rpc_clnt_program gd_brick_prog;
+static int
+glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
+{
+ int ret = 0;
+
+ switch (op) {
+ case GD_OP_REPLACE_BRICK:
+ ret = glusterd_rb_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_SYNC_VOLUME:
+ ret = glusterd_sync_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_PROFILE_VOLUME:
+ ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_GSYNC_SET:
+ ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_STATUS_VOLUME:
+ ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_REBALANCE:
+ case GD_OP_DEFRAG_BRICK_VOLUME:
+ ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_HEAL_VOLUME:
+ ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+
+ break;
+
+ case GD_OP_QUOTA:
+ case GD_OP_CLEARLOCKS_VOLUME:
+ ret = glusterd_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+
+ break;
+
+ default:
+ break;
+ }
+out:
+ return ret;
+}
+
int32_t
gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_cluster_lock_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
+ int ret = -1;
+ struct syncargs *args = NULL;
+ gd1_mgmt_cluster_lock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
- frame = myframe;
- args = frame->local;
+ frame = myframe;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -106,64 +217,49 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
-
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
out:
STACK_DESTROY (frame->root);
-
- __wake (args);
-
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid)
+gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid)
{
- struct syncargs args = {0, };
+ int ret = -1;
gd1_mgmt_cluster_lock_req req = {{0},};
uuid_copy (req.uuid, my_uuid);
-
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
-
- GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK,
- xdr_gd1_mgmt_cluster_lock_req);
-
- if (!args.op_ret)
- uuid_copy (recv_uuid, args.uuid);
-
- errno = args.op_errno;
- return args.op_ret;
-
+ ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_CLUSTER_LOCK,
+ gd_syncop_mgmt_lock_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
+ if (ret)
+ synctask_barrier_wake(args);
+ return ret;
}
int32_t
gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_cluster_unlock_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
+ int ret = -1;
+ struct syncargs *args = NULL;
+ gd1_mgmt_cluster_unlock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -171,154 +267,143 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
-
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
out:
STACK_DESTROY (frame->root);
-
- __wake (args);
-
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid)
+gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid)
{
- struct syncargs args = {0, };
- gd1_mgmt_cluster_unlock_req req = {{0},};
+ int ret = -1;
+ gd1_mgmt_cluster_unlock_req req = {{0},};
uuid_copy (req.uuid, my_uuid);
-
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
-
- GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK,
- xdr_gd1_mgmt_cluster_unlock_req);
-
- if (!args.op_ret)
- uuid_copy (recv_uuid, args.uuid);
-
- errno = args.op_errno;
- return args.op_ret;
-
+ ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_CLUSTER_UNLOCK,
+ gd_syncop_mgmt_unlock_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
+ if (ret)
+ synctask_barrier_wake(args);
+ return ret;
}
int32_t
gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_stage_op_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
-
+ int ret = -1;
+ gd1_mgmt_stage_op_rsp rsp = {{0},};
+ struct syncargs *args = NULL;
+ xlator_t *this = NULL;
+ dict_t *rsp_dict = NULL;
+ call_frame_t *frame = NULL;
+
+ this = THIS;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
+ args->op_ret = -1;
args->op_errno = ENOTCONN;
goto out;
}
ret = xdr_to_generic (*iov, &rsp,
(xdrproc_t)xdr_gd1_mgmt_stage_op_rsp);
- if (ret < 0) {
+ if (ret < 0)
goto out;
- }
if (rsp.dict.dict_len) {
/* Unserialize the dictionary */
- args->dict = dict_new ();
+ rsp_dict = dict_new ();
ret = dict_unserialize (rsp.dict.dict_val,
rsp.dict.dict_len,
- &args->dict);
+ &rsp_dict);
if (ret < 0) {
GF_FREE (rsp.dict.dict_val);
goto out;
} else {
- args->dict->extra_stdfree = rsp.dict.dict_val;
+ rsp_dict->extra_stdfree = rsp.dict.dict_val;
}
}
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
-
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
uuid_copy (args->uuid, rsp.uuid);
-
- args->errstr = gf_strdup (rsp.op_errstr);
+ if (rsp.op == GD_OP_REPLACE_BRICK) {
+ pthread_mutex_lock (&args->lock_dict);
+ {
+ ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
+ rsp_dict);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response from "
+ " node/brick");
+ }
+ pthread_mutex_unlock (&args->lock_dict);
+ }
out:
- STACK_DESTROY (frame->root);
-
- __wake (args);
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid,
- int op, dict_t *dict_out, dict_t **dict_in,
- char **errstr)
+gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid, int op,
+ dict_t *dict_out, dict_t *op_ctx)
{
- struct syncargs args = {0, };
- gd1_mgmt_stage_op_req req = {{0},};
- int ret = 0;
+ gd1_mgmt_stage_op_req *req = NULL;
+ int ret = -1;
- uuid_copy (req.uuid, my_uuid);
- req.op = op;
+ req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t);
+ if (!req)
+ goto out;
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
+ uuid_copy (req->uuid, my_uuid);
+ req->op = op;
ret = dict_allocate_and_serialize (dict_out,
- &req.buf.buf_val, &req.buf.buf_len);
+ &req->buf.buf_val, &req->buf.buf_len);
if (ret)
goto out;
- GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP,
- xdr_gd1_mgmt_stage_op_req);
-
- if (args.errstr && errstr)
- *errstr = args.errstr;
- else GF_FREE (args.errstr);
-
- if (args.dict && dict_in)
- *dict_in = args.dict;
- else if (args.dict)
- dict_unref (args.dict);
-
- uuid_copy (recv_uuid, args.uuid);
+ ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_STAGE_OP,
+ gd_syncop_stage_op_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_stage_op_req);
out:
- errno = args.op_errno;
- return args.op_ret;
+ gd_stage_op_req_free (req);
+ if (ret)
+ synctask_barrier_wake(args);
+
+ return ret;
}
-/*TODO: Need to add syncop for brick ops*/
int32_t
gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
struct syncargs *args = NULL;
gd1_mgmt_brick_op_rsp rsp = {0,};
- int ret = -1;
+ int ret = -1;
call_frame_t *frame = NULL;
frame = myframe;
@@ -362,25 +447,13 @@ out:
if (strcmp (rsp.op_errstr, "") != 0)
free (rsp.op_errstr);
free (rsp.output.output_val);
- STACK_DESTROY (frame->root);
+ STACK_DESTROY (frame->root);
__wake (args);
return 0;
}
-static void
-gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
-{
- if (!req)
- return;
-
- if (strcmp (req->name, "") != 0)
- GF_FREE (req->name);
- GF_FREE (req->input.input_val);
- GF_FREE (req);
-}
-
int
gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
int op, dict_t *dict_out, dict_t *op_ctx,
@@ -447,19 +520,18 @@ int32_t
gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- struct syncargs *args = NULL;
- gd1_mgmt_commit_op_rsp rsp = {{0},};
- int ret = -1;
- call_frame_t *frame = NULL;
-
+ int ret = -1;
+ gd1_mgmt_commit_op_rsp rsp = {{0},};
+ struct syncargs *args = NULL;
+ xlator_t *this = NULL;
+ dict_t *rsp_dict = NULL;
+ call_frame_t *frame = NULL;
+
+ this = THIS;
frame = myframe;
- args = frame->local;
+ args = frame->local;
frame->local = NULL;
- /* initialize */
- args->op_ret = -1;
- args->op_errno = EINVAL;
-
if (-1 == req->rpc_status) {
args->op_errno = ENOTCONN;
goto out;
@@ -473,146 +545,80 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
if (rsp.dict.dict_len) {
/* Unserialize the dictionary */
- args->dict = dict_new ();
+ rsp_dict = dict_new ();
ret = dict_unserialize (rsp.dict.dict_val,
rsp.dict.dict_len,
- &args->dict);
+ &rsp_dict);
if (ret < 0) {
GF_FREE (rsp.dict.dict_val);
goto out;
} else {
- args->dict->extra_stdfree = rsp.dict.dict_val;
+ rsp_dict->extra_stdfree = rsp.dict.dict_val;
}
}
- args->op_ret = rsp.op_ret;
- args->op_errno = rsp.op_errno;
-
+ gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
uuid_copy (args->uuid, rsp.uuid);
-
- args->errstr = gf_strdup (rsp.op_errstr);
-
+ pthread_mutex_lock (&args->lock_dict);
+ {
+ ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
+ rsp_dict);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response from "
+ " node/brick");
+ }
+ pthread_mutex_unlock (&args->lock_dict);
out:
- STACK_DESTROY (frame->root);
+ if (rsp_dict)
+ dict_unref (rsp_dict);
- __wake (args);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
return 0;
}
int
-gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid,
- int op, dict_t *dict_out, dict_t **dict_in,
- char **errstr)
+gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args,
+ uuid_t my_uuid, uuid_t recv_uuid,
+ int op, dict_t *dict_out, dict_t *op_ctx)
{
- struct syncargs args = {0, };
- gd1_mgmt_commit_op_req req = {{0},};
- int ret = 0;
+ gd1_mgmt_commit_op_req *req = NULL;
+ int ret = -1;
- uuid_copy (req.uuid, my_uuid);
- req.op = op;
+ req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t);
+ if (!req)
+ goto out;
- args.op_ret = -1;
- args.op_errno = ENOTCONN;
+ uuid_copy (req->uuid, my_uuid);
+ req->op = op;
ret = dict_allocate_and_serialize (dict_out,
- &req.buf.buf_val, &req.buf.buf_len);
+ &req->buf.buf_val, &req->buf.buf_len);
if (ret)
goto out;
- GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk,
- &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP,
- xdr_gd1_mgmt_commit_op_req);
-
- if (args.errstr && errstr)
- *errstr = args.errstr;
- else GF_FREE (args.errstr);
-
- if (args.dict && dict_in)
- *dict_in = args.dict;
- else if (args.dict)
- dict_unref (args.dict);
-
- uuid_copy (recv_uuid, args.uuid);
-
+ ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ GLUSTERD_MGMT_COMMIT_OP ,
+ gd_syncop_commit_op_cbk,
+ (xdrproc_t) xdr_gd1_mgmt_commit_op_req);
out:
- errno = args.op_errno;
- return args.op_ret;
-
-}
-
-
-static int
-glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
-{
- int ret = 0;
-
- switch (op) {
- case GD_OP_REPLACE_BRICK:
- ret = glusterd_rb_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_SYNC_VOLUME:
- ret = glusterd_sync_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_PROFILE_VOLUME:
- ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_GSYNC_SET:
- ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL);
- if (ret)
- goto out;
- break;
-
- case GD_OP_STATUS_VOLUME:
- ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_REBALANCE:
- case GD_OP_DEFRAG_BRICK_VOLUME:
- ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
- break;
-
- case GD_OP_HEAL_VOLUME:
- ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
-
- break;
-
- case GD_OP_QUOTA:
- case GD_OP_CLEARLOCKS_VOLUME:
- ret = glusterd_use_rsp_dict (aggr, rsp);
- if (ret)
- goto out;
-
- break;
+ gd_commit_op_req_free (req);
+ if (ret)
+ synctask_barrier_wake(args);
- default:
- break;
- }
-out:
return ret;
}
-void
+
+int
gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)
{
glusterd_peerinfo_t *peerinfo = NULL;
+ int npeers = 0;
list_for_each_entry (peerinfo, peers, uuid_list) {
if (!peerinfo->connected)
@@ -621,29 +627,42 @@ gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers)
continue;
list_add_tail (&peerinfo->op_peers_list, xact_peers);
+ npeers++;
}
+ return npeers;
}
int
-gd_lock_op_phase (struct list_head *peers, char **op_errstr)
+gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
+ char **op_errstr, int npeers)
{
- glusterd_peerinfo_t *peerinfo = NULL;
- uuid_t peer_uuid = {0};
int ret = -1;
+ int peer_cnt = 0;
+ uuid_t peer_uuid = {0};
xlator_t *this = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ struct syncargs args = {0};
+
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
this = THIS;
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
list_for_each_entry (peerinfo, peers, op_peers_list) {
- ret = gd_syncop_mgmt_lock (peerinfo->rpc,
- MY_UUID, peer_uuid);
- if (ret) {
- gf_asprintf (op_errstr, "Another transaction could be "
- "in progress. Please try again after "
- "sometime.");
- gf_log (this->name, GF_LOG_ERROR, "Failed to acquire "
- "lock on peer %s", peerinfo->hostname);
- goto out;
- }
+ gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (ret) {
+ gf_asprintf (op_errstr, "Another transaction could be "
+ "in progress. Please try again after "
+ "sometime.");
+ gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock");
+ goto out;
}
ret = 0;
@@ -653,14 +672,17 @@ out:
int
gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
- dict_t *req_dict, char **op_errstr)
+ dict_t *req_dict, char **op_errstr, int npeers)
{
- dict_t *rsp_dict = NULL;
int ret = -1;
+ int peer_cnt = 0;
+ dict_t *rsp_dict = NULL;
char *hostname = NULL;
xlator_t *this = NULL;
glusterd_peerinfo_t *peerinfo = NULL;
uuid_t tmp_uuid = {0};
+ char *errstr = NULL;
+ struct syncargs args = {0};
this = THIS;
rsp_dict = dict_new ();
@@ -684,30 +706,6 @@ gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
dict_unref (rsp_dict);
rsp_dict = NULL;
- list_for_each_entry (peerinfo, peers, op_peers_list) {
- ret = gd_syncop_mgmt_stage_op (peerinfo->rpc,
- MY_UUID, tmp_uuid,
- op, req_dict, &rsp_dict,
- op_errstr);
- if (ret) {
- hostname = peerinfo->hostname;
- goto stage_done;
- }
-
- if (op == GD_OP_REPLACE_BRICK) {
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,
- rsp_dict);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- "Failed to aggregate response from "
- " node/brick");
- goto out;
- }
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
- }
-
stage_done:
if (ret) {
gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL,
@@ -717,6 +715,26 @@ stage_done:
gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname);
goto out;
}
+
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
+
+ gd_syncargs_init (&args, op_ctx);
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args,
+ MY_UUID, tmp_uuid,
+ op, req_dict, op_ctx);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+ *op_errstr = gf_strdup (errstr);
+
out:
if (rsp_dict)
dict_unref (rsp_dict);
@@ -725,14 +743,17 @@ out:
int
gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
- dict_t *req_dict, char **op_errstr)
+ dict_t *req_dict, char **op_errstr, int npeers)
{
dict_t *rsp_dict = NULL;
+ int peer_cnt = -1;
int ret = -1;
char *hostname = NULL;
glusterd_peerinfo_t *peerinfo = NULL;
xlator_t *this = NULL;
uuid_t tmp_uuid = {0};
+ char *errstr = NULL;
+ struct syncargs args = {0};
this = THIS;
rsp_dict = dict_new ();
@@ -756,25 +777,6 @@ gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
dict_unref (rsp_dict);
rsp_dict = NULL;
- list_for_each_entry (peerinfo, peers, op_peers_list) {
- ret = gd_syncop_mgmt_commit_op (peerinfo->rpc,
- MY_UUID, tmp_uuid,
- op, req_dict, &rsp_dict,
- op_errstr);
- if (ret) {
- hostname = peerinfo->hostname;
- goto commit_done;
- }
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- "Failed to aggregate "
- "response from node/brick");
- goto out;
- }
- dict_unref (rsp_dict);
- rsp_dict = NULL;
- }
commit_done:
if (ret) {
gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL,
@@ -788,6 +790,24 @@ commit_done:
glusterd_op_modify_op_ctx (op, op_ctx);
}
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
+ gd_syncargs_init (&args, op_ctx);
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
+ list_for_each_entry (peerinfo, peers, op_peers_list) {
+ ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args,
+ MY_UUID, tmp_uuid,
+ op, req_dict, op_ctx);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+ *op_errstr = gf_strdup (errstr);
+
out:
if (rsp_dict)
dict_unref (rsp_dict);
@@ -795,20 +815,40 @@ out:
}
int
-gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret,
- rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr)
+gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret,
+ rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr,
+ int npeers)
{
- glusterd_peerinfo_t *peerinfo = NULL;
- glusterd_peerinfo_t *tmp = NULL;
- uuid_t tmp_uuid = {0};
+ glusterd_peerinfo_t *peerinfo = NULL;
+ glusterd_peerinfo_t *tmp = NULL;
+ uuid_t tmp_uuid = {0};
+ int peer_cnt = 0;
+ int ret = -1;
+ xlator_t *this = NULL;
+ struct syncargs args = {0};
+
+ if (!npeers) {
+ ret = 0;
+ goto out;
+ }
+ this = THIS;
+ synctask_barrier_init((&args));
+ peer_cnt = 0;
list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) {
- gd_syncop_mgmt_unlock (peerinfo->rpc,
- MY_UUID, tmp_uuid);
+ gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid);
list_del_init (&peerinfo->op_peers_list);
+ peer_cnt++;
+ }
+ synctask_barrier_wait((&args), peer_cnt);
+ ret = args.op_ret;
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to unlock "
+ "on some peer(s)");
}
- glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr);
+out:
+ glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr);
glusterd_op_clear_op (op);
glusterd_unlock (MY_UUID);
@@ -816,6 +856,17 @@ gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret,
}
int
+gd_get_brick_count (struct list_head *bricks)
+{
+ glusterd_pending_node_t *pending_node = NULL;
+ int npeers = 0;
+ list_for_each_entry (pending_node, bricks, list) {
+ npeers++;
+ }
+ return npeers;
+}
+
+int
gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr)
{
glusterd_pending_node_t *pending_node = NULL;
@@ -885,13 +936,13 @@ void
gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
{
int ret = -1;
+ int npeers = 0;
dict_t *req_dict = NULL;
glusterd_conf_t *conf = NULL;
glusterd_op_t op = 0;
int32_t tmp_op = 0;
char *op_errstr = NULL;
xlator_t *this = NULL;
- gf_boolean_t local_locked = _gf_false;
this = THIS;
GF_ASSERT (this);
@@ -913,16 +964,15 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
"Please try again after sometime.");
goto out;
}
- local_locked = _gf_true;
/* storing op globally to access in synctask code paths
* This is still acceptable, as we are performing this under
* the 'cluster' lock*/
glusterd_op_set_op (op);
INIT_LIST_HEAD (&conf->xaction_peers);
- gd_build_peers_list (&conf->peers, &conf->xaction_peers);
+ npeers = gd_build_peers_list (&conf->peers, &conf->xaction_peers);
- ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr);
+ ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers);
if (ret)
goto out;
@@ -936,7 +986,7 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
}
ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
- &op_errstr);
+ &op_errstr, npeers);
if (ret)
goto out;
@@ -945,15 +995,14 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
goto out;
ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict,
- &op_errstr);
+ &op_errstr, npeers);
if (ret)
goto out;
ret = 0;
out:
- if (local_locked)
- (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
- op_ctx, op_errstr);
+ (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
+ op_ctx, op_errstr, npeers);
if (req_dict)
dict_unref (req_dict);