diff options
author | Krishnan Parthasarathi <kparthas@redhat.com> | 2013-02-20 14:44:23 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2013-02-26 09:06:12 -0800 |
commit | 5e6dfce0b0d55d96b5bdad6a693fdb2826c20b92 (patch) | |
tree | 8521f9b3a8e5603b3c3c76efa4ca07210884acba | |
parent | dc43e7dd9934925f8cb96762c33be23ccb63528a (diff) |
glusterd: Increasing throughput of synctask based mgmt ops.
Change-Id: Ibd963f78707b157fc4c9729aa87206cfd5ecfe81
BUG: 913662
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4570
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
-rw-r--r-- | libglusterfs/src/syncop.h | 1 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-mem-types.h | 3 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 699 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.h | 22 |
4 files changed, 387 insertions, 338 deletions
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index ba0440cd..aec07f36 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -114,6 +114,7 @@ struct syncargs { uuid_t uuid; char *errstr; dict_t *dict; + pthread_mutex_t lock_dict; /* do not touch */ struct synctask *task; diff --git a/xlators/mgmt/glusterd/src/glusterd-mem-types.h b/xlators/mgmt/glusterd/src/glusterd-mem-types.h index a13236da..98216e28 100644 --- a/xlators/mgmt/glusterd/src/glusterd-mem-types.h +++ b/xlators/mgmt/glusterd/src/glusterd-mem-types.h @@ -65,7 +65,8 @@ typedef enum gf_gld_mem_types_ { gf_gld_mt_charptr = gf_common_mt_end + 49, gf_gld_mt_hooks_stub_t = gf_common_mt_end + 50, gf_gld_mt_hooks_priv_t = gf_common_mt_end + 51, - gf_gld_mt_end = gf_common_mt_end + 52, + gf_gld_mt_mop_commit_req_t = gf_common_mt_end + 52, + gf_gld_mt_end = gf_common_mt_end + 53, } gf_gld_mem_types_t; #endif diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index a01753f3..b943d8e9 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -8,7 +8,6 @@ cases as published by the Free Software Foundation. */ /* rpc related syncops */ -#include "syncop.h" #include "rpc-clnt.h" #include "protocol-common.h" #include "xdr-generic.h" @@ -19,6 +18,57 @@ #include "glusterd-op-sm.h" #include "glusterd-utils.h" +static void +gd_collate_errors (struct syncargs *args, int op_ret, int op_errno, + char *op_errstr) +{ + if (args->op_ret) + return; + args->op_ret = op_ret; + args->op_errno = op_errno; + if (op_ret && op_errstr && strcmp (op_errstr, "")) + args->errstr = gf_strdup (op_errstr); +} + +static void +gd_syncargs_init (struct syncargs *args, dict_t *op_ctx) +{ + args->dict = op_ctx; + pthread_mutex_init (&args->lock_dict, NULL); +} + +static void +gd_stage_op_req_free (gd1_mgmt_stage_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_commit_op_req_free (gd1_mgmt_commit_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ + if (!req) + return; + + if (strcmp (req->name, "") != 0) + GF_FREE (req->name); + GF_FREE (req->input.input_val); + GF_FREE (req); +} + int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, void *cookie, rpc_clnt_prog_t *prog, @@ -82,23 +132,84 @@ out: extern struct rpc_clnt_program gd_mgmt_prog; extern struct rpc_clnt_program gd_brick_prog; +static int +glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) +{ + int ret = 0; + + switch (op) { + case GD_OP_REPLACE_BRICK: + ret = glusterd_rb_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_SYNC_VOLUME: + ret = glusterd_sync_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_PROFILE_VOLUME: + ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_GSYNC_SET: + ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); + if (ret) + goto out; + break; + + case GD_OP_STATUS_VOLUME: + ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_REBALANCE: + case GD_OP_DEFRAG_BRICK_VOLUME: + ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_HEAL_VOLUME: + ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + case GD_OP_QUOTA: + case GD_OP_CLEARLOCKS_VOLUME: + ret = glusterd_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + default: + break; + } +out: + return ret; +} + int32_t gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_lock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + int ret = -1; + struct syncargs *args = NULL; + gd1_mgmt_cluster_lock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; - frame = myframe; - args = frame->local; + frame = myframe; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -106,64 +217,49 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } - - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL); uuid_copy (args->uuid, rsp.uuid); out: STACK_DESTROY (frame->root); - - __wake (args); - + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) { - struct syncargs args = {0, }; + int ret = -1; gd1_mgmt_cluster_lock_req req = {{0},}; uuid_copy (req.uuid, my_uuid); - - args.op_ret = -1; - args.op_errno = ENOTCONN; - - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK, - xdr_gd1_mgmt_cluster_lock_req); - - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); - - errno = args.op_errno; - return args.op_ret; - + ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_LOCK, + gd_syncop_mgmt_lock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + if (ret) + synctask_barrier_wake(args); + return ret; } int32_t gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + int ret = -1; + struct syncargs *args = NULL; + gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -171,154 +267,143 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } - - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL); uuid_copy (args->uuid, rsp.uuid); out: STACK_DESTROY (frame->root); - - __wake (args); - + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) { - struct syncargs args = {0, }; - gd1_mgmt_cluster_unlock_req req = {{0},}; + int ret = -1; + gd1_mgmt_cluster_unlock_req req = {{0},}; uuid_copy (req.uuid, my_uuid); - - args.op_ret = -1; - args.op_errno = ENOTCONN; - - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK, - xdr_gd1_mgmt_cluster_unlock_req); - - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); - - errno = args.op_errno; - return args.op_ret; - + ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_UNLOCK, + gd_syncop_mgmt_unlock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + if (ret) + synctask_barrier_wake(args); + return ret; } int32_t gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_stage_op_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; - + int ret = -1; + gd1_mgmt_stage_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { + args->op_ret = -1; args->op_errno = ENOTCONN; goto out; } ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } if (rsp.dict.dict_len) { /* Unserialize the dictionary */ - args->dict = dict_new (); + rsp_dict = dict_new (); ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, - &args->dict); + &rsp_dict); if (ret < 0) { GF_FREE (rsp.dict.dict_val); goto out; } else { - args->dict->extra_stdfree = rsp.dict.dict_val; + rsp_dict->extra_stdfree = rsp.dict.dict_val; } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; - + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr); uuid_copy (args->uuid, rsp.uuid); - - args->errstr = gf_strdup (rsp.op_errstr); + if (rsp.op == GD_OP_REPLACE_BRICK) { + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); + } out: - STACK_DESTROY (frame->root); - - __wake (args); + if (rsp_dict) + dict_unref (rsp_dict); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, - char **errstr) +gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_stage_op_req req = {{0},}; - int ret = 0; + gd1_mgmt_stage_op_req *req = NULL; + int ret = -1; - uuid_copy (req.uuid, my_uuid); - req.op = op; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t); + if (!req) + goto out; - args.op_ret = -1; - args.op_errno = ENOTCONN; + uuid_copy (req->uuid, my_uuid); + req->op = op; ret = dict_allocate_and_serialize (dict_out, - &req.buf.buf_val, &req.buf.buf_len); + &req->buf.buf_val, &req->buf.buf_len); if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP, - xdr_gd1_mgmt_stage_op_req); - - if (args.errstr && errstr) - *errstr = args.errstr; - else GF_FREE (args.errstr); - - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); - - uuid_copy (recv_uuid, args.uuid); + ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_STAGE_OP, + gd_syncop_stage_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_stage_op_req); out: - errno = args.op_errno; - return args.op_ret; + gd_stage_op_req_free (req); + if (ret) + synctask_barrier_wake(args); + + return ret; } -/*TODO: Need to add syncop for brick ops*/ int32_t gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_brick_op_rsp rsp = {0,}; - int ret = -1; + int ret = -1; call_frame_t *frame = NULL; frame = myframe; @@ -362,25 +447,13 @@ out: if (strcmp (rsp.op_errstr, "") != 0) free (rsp.op_errstr); free (rsp.output.output_val); - STACK_DESTROY (frame->root); + STACK_DESTROY (frame->root); __wake (args); return 0; } -static void -gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) -{ - if (!req) - return; - - if (strcmp (req->name, "") != 0) - GF_FREE (req->name); - GF_FREE (req->input.input_val); - GF_FREE (req); -} - int gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode, int op, dict_t *dict_out, dict_t *op_ctx, @@ -447,19 +520,18 @@ int32_t gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_commit_op_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; - + int ret = -1; + gd1_mgmt_commit_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -473,146 +545,80 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, if (rsp.dict.dict_len) { /* Unserialize the dictionary */ - args->dict = dict_new (); + rsp_dict = dict_new (); ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, - &args->dict); + &rsp_dict); if (ret < 0) { GF_FREE (rsp.dict.dict_val); goto out; } else { - args->dict->extra_stdfree = rsp.dict.dict_val; + rsp_dict->extra_stdfree = rsp.dict.dict_val; } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; - + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr); uuid_copy (args->uuid, rsp.uuid); - - args->errstr = gf_strdup (rsp.op_errstr); - + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); out: - STACK_DESTROY (frame->root); + if (rsp_dict) + dict_unref (rsp_dict); - __wake (args); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, - char **errstr) +gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, + int op, dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_commit_op_req req = {{0},}; - int ret = 0; + gd1_mgmt_commit_op_req *req = NULL; + int ret = -1; - uuid_copy (req.uuid, my_uuid); - req.op = op; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t); + if (!req) + goto out; - args.op_ret = -1; - args.op_errno = ENOTCONN; + uuid_copy (req->uuid, my_uuid); + req->op = op; ret = dict_allocate_and_serialize (dict_out, - &req.buf.buf_val, &req.buf.buf_len); + &req->buf.buf_val, &req->buf.buf_len); if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP, - xdr_gd1_mgmt_commit_op_req); - - if (args.errstr && errstr) - *errstr = args.errstr; - else GF_FREE (args.errstr); - - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); - - uuid_copy (recv_uuid, args.uuid); - + ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_COMMIT_OP , + gd_syncop_commit_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_commit_op_req); out: - errno = args.op_errno; - return args.op_ret; - -} - - -static int -glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) -{ - int ret = 0; - - switch (op) { - case GD_OP_REPLACE_BRICK: - ret = glusterd_rb_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_SYNC_VOLUME: - ret = glusterd_sync_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_PROFILE_VOLUME: - ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_GSYNC_SET: - ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); - if (ret) - goto out; - break; - - case GD_OP_STATUS_VOLUME: - ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_REBALANCE: - case GD_OP_DEFRAG_BRICK_VOLUME: - ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_HEAL_VOLUME: - ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - - break; - - case GD_OP_QUOTA: - case GD_OP_CLEARLOCKS_VOLUME: - ret = glusterd_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - - break; + gd_commit_op_req_free (req); + if (ret) + synctask_barrier_wake(args); - default: - break; - } -out: return ret; } -void + +int gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers) { glusterd_peerinfo_t *peerinfo = NULL; + int npeers = 0; list_for_each_entry (peerinfo, peers, uuid_list) { if (!peerinfo->connected) @@ -621,29 +627,42 @@ gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers) continue; list_add_tail (&peerinfo->op_peers_list, xact_peers); + npeers++; } + return npeers; } int -gd_lock_op_phase (struct list_head *peers, char **op_errstr) +gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + char **op_errstr, int npeers) { - glusterd_peerinfo_t *peerinfo = NULL; - uuid_t peer_uuid = {0}; int ret = -1; + int peer_cnt = 0; + uuid_t peer_uuid = {0}; xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + struct syncargs args = {0}; + + if (!npeers) { + ret = 0; + goto out; + } this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; list_for_each_entry (peerinfo, peers, op_peers_list) { - ret = gd_syncop_mgmt_lock (peerinfo->rpc, - MY_UUID, peer_uuid); - if (ret) { - gf_asprintf (op_errstr, "Another transaction could be " - "in progress. Please try again after " - "sometime."); - gf_log (this->name, GF_LOG_ERROR, "Failed to acquire " - "lock on peer %s", peerinfo->hostname); - goto out; - } + gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (ret) { + gf_asprintf (op_errstr, "Another transaction could be " + "in progress. Please try again after " + "sometime."); + gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock"); + goto out; } ret = 0; @@ -653,14 +672,17 @@ out: int gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, - dict_t *req_dict, char **op_errstr) + dict_t *req_dict, char **op_errstr, int npeers) { - dict_t *rsp_dict = NULL; int ret = -1; + int peer_cnt = 0; + dict_t *rsp_dict = NULL; char *hostname = NULL; xlator_t *this = NULL; glusterd_peerinfo_t *peerinfo = NULL; uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; this = THIS; rsp_dict = dict_new (); @@ -684,30 +706,6 @@ gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, dict_unref (rsp_dict); rsp_dict = NULL; - list_for_each_entry (peerinfo, peers, op_peers_list) { - ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, - MY_UUID, tmp_uuid, - op, req_dict, &rsp_dict, - op_errstr); - if (ret) { - hostname = peerinfo->hostname; - goto stage_done; - } - - if (op == GD_OP_REPLACE_BRICK) { - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, - rsp_dict); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - "Failed to aggregate response from " - " node/brick"); - goto out; - } - } - dict_unref (rsp_dict); - rsp_dict = NULL; - } - stage_done: if (ret) { gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, @@ -717,6 +715,26 @@ stage_done: gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname); goto out; } + + if (!npeers) { + ret = 0; + goto out; + } + + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); + out: if (rsp_dict) dict_unref (rsp_dict); @@ -725,14 +743,17 @@ out: int gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, - dict_t *req_dict, char **op_errstr) + dict_t *req_dict, char **op_errstr, int npeers) { dict_t *rsp_dict = NULL; + int peer_cnt = -1; int ret = -1; char *hostname = NULL; glusterd_peerinfo_t *peerinfo = NULL; xlator_t *this = NULL; uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; this = THIS; rsp_dict = dict_new (); @@ -756,25 +777,6 @@ gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, dict_unref (rsp_dict); rsp_dict = NULL; - list_for_each_entry (peerinfo, peers, op_peers_list) { - ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, - MY_UUID, tmp_uuid, - op, req_dict, &rsp_dict, - op_errstr); - if (ret) { - hostname = peerinfo->hostname; - goto commit_done; - } - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - "Failed to aggregate " - "response from node/brick"); - goto out; - } - dict_unref (rsp_dict); - rsp_dict = NULL; - } commit_done: if (ret) { gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, @@ -788,6 +790,24 @@ commit_done: glusterd_op_modify_op_ctx (op, op_ctx); } + if (!npeers) { + ret = 0; + goto out; + } + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); + out: if (rsp_dict) dict_unref (rsp_dict); @@ -795,20 +815,40 @@ out: } int -gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret, - rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr) +gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret, + rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr, + int npeers) { - glusterd_peerinfo_t *peerinfo = NULL; - glusterd_peerinfo_t *tmp = NULL; - uuid_t tmp_uuid = {0}; + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_peerinfo_t *tmp = NULL; + uuid_t tmp_uuid = {0}; + int peer_cnt = 0; + int ret = -1; + xlator_t *this = NULL; + struct syncargs args = {0}; + + if (!npeers) { + ret = 0; + goto out; + } + this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) { - gd_syncop_mgmt_unlock (peerinfo->rpc, - MY_UUID, tmp_uuid); + gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid); list_del_init (&peerinfo->op_peers_list); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "Failed to unlock " + "on some peer(s)"); } - glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr); +out: + glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr); glusterd_op_clear_op (op); glusterd_unlock (MY_UUID); @@ -816,6 +856,17 @@ gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int ret, } int +gd_get_brick_count (struct list_head *bricks) +{ + glusterd_pending_node_t *pending_node = NULL; + int npeers = 0; + list_for_each_entry (pending_node, bricks, list) { + npeers++; + } + return npeers; +} + +int gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr) { glusterd_pending_node_t *pending_node = NULL; @@ -885,13 +936,13 @@ void gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) { int ret = -1; + int npeers = 0; dict_t *req_dict = NULL; glusterd_conf_t *conf = NULL; glusterd_op_t op = 0; int32_t tmp_op = 0; char *op_errstr = NULL; xlator_t *this = NULL; - gf_boolean_t local_locked = _gf_false; this = THIS; GF_ASSERT (this); @@ -913,16 +964,15 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) "Please try again after sometime."); goto out; } - local_locked = _gf_true; /* storing op globally to access in synctask code paths * This is still acceptable, as we are performing this under * the 'cluster' lock*/ glusterd_op_set_op (op); INIT_LIST_HEAD (&conf->xaction_peers); - gd_build_peers_list (&conf->peers, &conf->xaction_peers); + npeers = gd_build_peers_list (&conf->peers, &conf->xaction_peers); - ret = gd_lock_op_phase (&conf->xaction_peers, &op_errstr); + ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers); if (ret) goto out; @@ -936,7 +986,7 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) } ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, - &op_errstr); + &op_errstr, npeers); if (ret) goto out; @@ -945,15 +995,14 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) goto out; ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, - &op_errstr); + &op_errstr, npeers); if (ret) goto out; ret = 0; out: - if (local_locked) - (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, - op_ctx, op_errstr); + (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, + op_ctx, op_errstr, npeers); if (req_dict) dict_unref (req_dict); diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.h b/xlators/mgmt/glusterd/src/glusterd-syncop.h index 268f3bf2..658ed4e2 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.h +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.h @@ -12,7 +12,6 @@ #include "syncop.h" - #define GD_SYNC_OPCODE_KEY "sync-mgmt-operation" /* gd_syncop_* */ @@ -36,15 +35,14 @@ int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, xdrproc_t xdrproc); -int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid); -int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid); -int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid, int op, dict_t *dict_out, - dict_t **dict_in, char **errstr); -int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid, int op, dict_t *dict_out, - dict_t **dict_in, char **errstr); - +int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx); +int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx); #endif /* __RPC_SYNCOP_H */ |