diff options
author | Krishnan Parthasarathi <kparthas@redhat.com> | 2013-02-20 14:44:23 +0530 |
---|---|---|
committer | Anand Avati <avati@redhat.com> | 2013-03-06 22:06:30 -0800 |
commit | 8224bc6111b3bf5a710b6e5315b39b85904f3fe1 (patch) | |
tree | 02f2ca47808d8249ed3cdbf76b1cd46e5e2e2d4a /xlators/mgmt/glusterd | |
parent | e020c861579790cc0e1c183d9d5d518f43951712 (diff) |
glusterd: Increasing throughput of synctask based mgmt ops.
Change-Id: Ibd963f78707b157fc4c9729aa87206cfd5ecfe81
BUG: 913662
Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com>
Reviewed-on: http://review.gluster.org/4638
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/mgmt/glusterd')
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-mem-types.h | 3 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 963 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.h | 22 |
3 files changed, 563 insertions, 425 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-mem-types.h b/xlators/mgmt/glusterd/src/glusterd-mem-types.h index a13236da1fe..98216e28ab0 100644 --- a/xlators/mgmt/glusterd/src/glusterd-mem-types.h +++ b/xlators/mgmt/glusterd/src/glusterd-mem-types.h @@ -65,7 +65,8 @@ typedef enum gf_gld_mem_types_ { gf_gld_mt_charptr = gf_common_mt_end + 49, gf_gld_mt_hooks_stub_t = gf_common_mt_end + 50, gf_gld_mt_hooks_priv_t = gf_common_mt_end + 51, - gf_gld_mt_end = gf_common_mt_end + 52, + gf_gld_mt_mop_commit_req_t = gf_common_mt_end + 52, + gf_gld_mt_end = gf_common_mt_end + 53, } gf_gld_mem_types_t; #endif diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index 370f454df5c..1deb15d5e2f 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -8,7 +8,6 @@ cases as published by the Free Software Foundation. */ /* rpc related syncops */ -#include "syncop.h" #include "rpc-clnt.h" #include "protocol-common.h" #include "xdr-generic.h" @@ -19,6 +18,57 @@ #include "glusterd-op-sm.h" #include "glusterd-utils.h" +static void +gd_collate_errors (struct syncargs *args, int op_ret, int op_errno, + char *op_errstr) +{ + if (args->op_ret) + return; + args->op_ret = op_ret; + args->op_errno = op_errno; + if (op_ret && op_errstr && strcmp (op_errstr, "")) + args->errstr = gf_strdup (op_errstr); +} + +static void +gd_syncargs_init (struct syncargs *args, dict_t *op_ctx) +{ + args->dict = op_ctx; + pthread_mutex_init (&args->lock_dict, NULL); +} + +static void +gd_stage_op_req_free (gd1_mgmt_stage_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_commit_op_req_free (gd1_mgmt_commit_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ + if (!req) + return; + + if (strcmp (req->name, "") != 0) + GF_FREE (req->name); + GF_FREE (req->input.input_val); + GF_FREE (req); +} + int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, void *cookie, rpc_clnt_prog_t *prog, @@ -82,23 +132,83 @@ out: extern struct rpc_clnt_program gd_mgmt_prog; extern struct rpc_clnt_program gd_brick_prog; +static int +glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) +{ + int ret = 0; + + switch (op) { + case GD_OP_REPLACE_BRICK: + ret = glusterd_rb_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_SYNC_VOLUME: + ret = glusterd_sync_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_PROFILE_VOLUME: + ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_GSYNC_SET: + ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); + if (ret) + goto out; + break; + + case GD_OP_STATUS_VOLUME: + ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_REBALANCE: + case GD_OP_DEFRAG_BRICK_VOLUME: + ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_HEAL_VOLUME: + ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + case GD_OP_CLEARLOCKS_VOLUME: + ret = glusterd_volume_clearlocks_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + default: + break; + } +out: + return ret; +} + int32_t gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_lock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + int ret = -1; + struct syncargs *args = NULL; + gd1_mgmt_cluster_lock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; - frame = myframe; - args = frame->local; + frame = myframe; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -106,64 +216,49 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } - - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL); uuid_copy (args->uuid, rsp.uuid); out: STACK_DESTROY (frame->root); - - __wake (args); - + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) { - struct syncargs args = {0, }; + int ret = -1; gd1_mgmt_cluster_lock_req req = {{0},}; uuid_copy (req.uuid, my_uuid); - - args.op_ret = -1; - args.op_errno = ENOTCONN; - - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK, - xdr_gd1_mgmt_cluster_lock_req); - - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); - - errno = args.op_errno; - return args.op_ret; - + ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_LOCK, + gd_syncop_mgmt_lock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + if (ret) + synctask_barrier_wake(args); + return ret; } int32_t gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + int ret = -1; + struct syncargs *args = NULL; + gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -171,154 +266,143 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } - - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL); uuid_copy (args->uuid, rsp.uuid); out: STACK_DESTROY (frame->root); - - __wake (args); - + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) { - struct syncargs args = {0, }; - gd1_mgmt_cluster_unlock_req req = {{0},}; + int ret = -1; + gd1_mgmt_cluster_unlock_req req = {{0},}; uuid_copy (req.uuid, my_uuid); - - args.op_ret = -1; - args.op_errno = ENOTCONN; - - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK, - xdr_gd1_mgmt_cluster_unlock_req); - - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); - - errno = args.op_errno; - return args.op_ret; - + ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_UNLOCK, + gd_syncop_mgmt_unlock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + if (ret) + synctask_barrier_wake(args); + return ret; } int32_t gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_stage_op_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; - + int ret = -1; + gd1_mgmt_stage_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { + args->op_ret = -1; args->op_errno = ENOTCONN; goto out; } ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp); - if (ret < 0) { + if (ret < 0) goto out; - } if (rsp.dict.dict_len) { /* Unserialize the dictionary */ - args->dict = dict_new (); + rsp_dict = dict_new (); ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, - &args->dict); + &rsp_dict); if (ret < 0) { GF_FREE (rsp.dict.dict_val); goto out; } else { - args->dict->extra_stdfree = rsp.dict.dict_val; + rsp_dict->extra_stdfree = rsp.dict.dict_val; } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; - + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr); uuid_copy (args->uuid, rsp.uuid); - - args->errstr = gf_strdup (rsp.op_errstr); + if (rsp.op == GD_OP_REPLACE_BRICK) { + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); + } out: - STACK_DESTROY (frame->root); - - __wake (args); + if (rsp_dict) + dict_unref (rsp_dict); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, - char **errstr) +gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_stage_op_req req = {{0},}; - int ret = 0; + gd1_mgmt_stage_op_req *req = NULL; + int ret = -1; - uuid_copy (req.uuid, my_uuid); - req.op = op; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t); + if (!req) + goto out; - args.op_ret = -1; - args.op_errno = ENOTCONN; + uuid_copy (req->uuid, my_uuid); + req->op = op; ret = dict_allocate_and_serialize (dict_out, - &req.buf.buf_val, &req.buf.buf_len); + &req->buf.buf_val, &req->buf.buf_len); if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP, - xdr_gd1_mgmt_stage_op_req); - - if (args.errstr && errstr) - *errstr = args.errstr; - else GF_FREE (args.errstr); - - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); - - uuid_copy (recv_uuid, args.uuid); + ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_STAGE_OP, + gd_syncop_stage_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_stage_op_req); out: - errno = args.op_errno; - return args.op_ret; + gd_stage_op_req_free (req); + if (ret) + synctask_barrier_wake(args); + + return ret; } -/*TODO: Need to add syncop for brick ops*/ int32_t gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_brick_op_rsp rsp = {0,}; - int ret = -1; + int ret = -1; call_frame_t *frame = NULL; frame = myframe; @@ -362,25 +446,13 @@ out: if (strcmp (rsp.op_errstr, "") != 0) free (rsp.op_errstr); free (rsp.output.output_val); - STACK_DESTROY (frame->root); + STACK_DESTROY (frame->root); __wake (args); return 0; } -static void -gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) -{ - if (!req) - return; - - if (strcmp (req->name, "") != 0) - GF_FREE (req->name); - GF_FREE (req->input.input_val); - GF_FREE (req); -} - int gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode, int op, dict_t *dict_out, dict_t *op_ctx, @@ -447,19 +519,18 @@ int32_t gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_commit_op_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; - + int ret = -1; + gd1_mgmt_commit_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { args->op_errno = ENOTCONN; goto out; @@ -473,295 +544,361 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, if (rsp.dict.dict_len) { /* Unserialize the dictionary */ - args->dict = dict_new (); + rsp_dict = dict_new (); ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, - &args->dict); + &rsp_dict); if (ret < 0) { GF_FREE (rsp.dict.dict_val); goto out; } else { - args->dict->extra_stdfree = rsp.dict.dict_val; + rsp_dict->extra_stdfree = rsp.dict.dict_val; } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; - + gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr); uuid_copy (args->uuid, rsp.uuid); - - args->errstr = gf_strdup (rsp.op_errstr); - + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); out: - STACK_DESTROY (frame->root); + if (rsp_dict) + dict_unref (rsp_dict); - __wake (args); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } int -gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, - char **errstr) +gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, + int op, dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_commit_op_req req = {{0},}; - int ret = 0; + gd1_mgmt_commit_op_req *req = NULL; + int ret = -1; - uuid_copy (req.uuid, my_uuid); - req.op = op; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t); + if (!req) + goto out; - args.op_ret = -1; - args.op_errno = ENOTCONN; + uuid_copy (req->uuid, my_uuid); + req->op = op; ret = dict_allocate_and_serialize (dict_out, - &req.buf.buf_val, &req.buf.buf_len); + &req->buf.buf_val, &req->buf.buf_len); if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP, - xdr_gd1_mgmt_commit_op_req); - - if (args.errstr && errstr) - *errstr = args.errstr; - else GF_FREE (args.errstr); - - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); - - uuid_copy (recv_uuid, args.uuid); - + ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog, + GLUSTERD_MGMT_COMMIT_OP , + gd_syncop_commit_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_commit_op_req); out: - errno = args.op_errno; - return args.op_ret; + gd_commit_op_req_free (req); + if (ret) + synctask_barrier_wake(args); + return ret; } - -static int -glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp, - char *op_errstr) +int +gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers) { - int ret = 0; + glusterd_peerinfo_t *peerinfo = NULL; + int npeers = 0; - switch (op) { - case GD_OP_REPLACE_BRICK: - ret = glusterd_rb_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; - - case GD_OP_SYNC_VOLUME: - ret = glusterd_sync_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; + list_for_each_entry (peerinfo, peers, uuid_list) { + if (!peerinfo->connected) + continue; + if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED) + continue; + list_add_tail (&peerinfo->op_peers_list, xact_peers); + npeers++; + } + return npeers; +} - case GD_OP_PROFILE_VOLUME: - ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; +int +gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + char **op_errstr, int npeers) +{ + int ret = -1; + int peer_cnt = 0; + uuid_t peer_uuid = {0}; + xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + struct syncargs args = {0}; + + if (!npeers) { + ret = 0; + goto out; + } - case GD_OP_GSYNC_SET: - ret = glusterd_gsync_use_rsp_dict (aggr, rsp, op_errstr); - if (ret) - goto out; - break; + this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (ret) { + gf_asprintf (op_errstr, "Another transaction could be " + "in progress. Please try again after " + "sometime."); + gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock"); + goto out; + } - case GD_OP_STATUS_VOLUME: - ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); - if (ret) - goto out; - break; + ret = 0; +out: + return ret; +} - case GD_OP_REBALANCE: - case GD_OP_DEFRAG_BRICK_VOLUME: - ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); - if (ret) - goto out; - break; +int +gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr, int npeers) +{ + int ret = -1; + int peer_cnt = 0; + dict_t *rsp_dict = NULL; + char *hostname = NULL; + xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; - case GD_OP_HEAL_VOLUME: - ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); - if (ret) - goto out; + this = THIS; + ret = -1; + rsp_dict = dict_new (); + if (!rsp_dict) + goto out; - break; + ret = glusterd_op_stage_validate (op, req_dict, op_errstr, rsp_dict); + if (ret) { + hostname = "localhost"; + goto stage_done; + } - case GD_OP_CLEARLOCKS_VOLUME: - ret = glusterd_volume_clearlocks_use_rsp_dict (aggr, rsp); - if (ret) + if ((op == GD_OP_REPLACE_BRICK) || + (op == GD_OP_CLEARLOCKS_VOLUME)) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (*op_errstr)? *op_errstr: "Failed to aggregate " + "response from node/brick"); goto out; + } + } + dict_unref (rsp_dict); + rsp_dict = NULL; - break; +stage_done: + if (ret) { + gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname); + goto out; + } - default: - break; + if (!npeers) { + ret = 0; + goto out; + } + + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); + out: + if (rsp_dict) + dict_unref (rsp_dict); return ret; } -void -gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) +int +gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr, int npeers) { - int ret = -1; - dict_t *req_dict = NULL; dict_t *rsp_dict = NULL; + int peer_cnt = -1; + int ret = -1; + char *hostname = NULL; glusterd_peerinfo_t *peerinfo = NULL; - glusterd_peerinfo_t *tmp = NULL; - glusterd_conf_t *conf = NULL; - uuid_t tmp_uuid = {0,}; - glusterd_op_t op = 0; - int32_t tmp_op = 0; - gf_boolean_t local_locked = _gf_false; - char *op_errstr = NULL; - glusterd_pending_node_t *pending_node = NULL; - rpc_clnt_t *rpc = NULL; - int brick_count = 0; - struct list_head selected = {0}; xlator_t *this = NULL; - char *hostname = NULL; + uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; this = THIS; - GF_ASSERT (this); - conf = this->private; - GF_ASSERT (conf); - - ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Failed to get volume " - "operation"); + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; goto out; } - op = tmp_op; - - /* Lock everything */ - ret = glusterd_lock (MY_UUID); + ret = glusterd_op_commit_perform (op, req_dict, op_errstr, rsp_dict); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock"); - gf_asprintf (&op_errstr, "Another transaction is in progress. " - "Please try again after sometime."); - goto out; + hostname = "localhost"; + goto commit_done; } - /* successful lock in local node */ - local_locked = _gf_true; - - /* storing op globally to access in synctask code paths - * This is still acceptable, as we are performing this under - * the 'cluster' lock*/ - - glusterd_op_set_op (op); - INIT_LIST_HEAD (&conf->xaction_peers); - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { - if (!peerinfo->connected) - continue; - if (peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED) - continue; - - ret = gd_syncop_mgmt_lock (peerinfo->rpc, - MY_UUID, tmp_uuid); - if (ret) { - gf_asprintf (&op_errstr, "Another transaction could be " - "in progress. Please try again after " - "sometime."); - gf_log (this->name, GF_LOG_ERROR, "Failed to acquire " - "lock on peer %s", peerinfo->hostname); - goto out; - } else { - list_add_tail (&peerinfo->op_peers_list, - &conf->xaction_peers); - } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (*op_errstr)? *op_errstr: "Failed to aggregate response " + "from node/brick"); + goto out; } + dict_unref (rsp_dict); + rsp_dict = NULL; - ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx); +commit_done: if (ret) { - gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD, - gd_op_list[op]); - if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD); + gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (*op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_COMMIT_FAIL, + hostname); + goto out; + } else { + glusterd_op_modify_op_ctx (op, op_ctx); + } + + if (!npeers) { + ret = 0; goto out; } + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); - /* stage op */ - ret = -1; - rsp_dict = dict_new (); - if (!rsp_dict) +out: + if (rsp_dict) + dict_unref (rsp_dict); + return ret; +} + +int +gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret, + rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr, + int npeers) +{ + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_peerinfo_t *tmp = NULL; + uuid_t tmp_uuid = {0}; + int peer_cnt = 0; + int ret = -1; + xlator_t *this = NULL; + struct syncargs args = {0}; + + if (!npeers) { + ret = 0; goto out; + } - ret = glusterd_op_stage_validate (op, req_dict, &op_errstr, rsp_dict); + this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) { + gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid); + list_del_init (&peerinfo->op_peers_list); + peer_cnt++; + } + synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; if (ret) { - hostname = "localhost"; - goto stage_done; + gf_log (this->name, GF_LOG_ERROR, "Failed to unlock " + "on some peer(s)"); } - if ((op == GD_OP_REPLACE_BRICK) || - (op == GD_OP_CLEARLOCKS_VOLUME)) { - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, - op_errstr); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate " - "response from node/brick"); - goto out; - } - } - dict_unref (rsp_dict); - rsp_dict = NULL; +out: + glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr); + glusterd_op_clear_op (op); + glusterd_unlock (MY_UUID); - list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { - ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, - MY_UUID, tmp_uuid, - op, req_dict, &rsp_dict, - &op_errstr); - if (ret) { - hostname = peerinfo->hostname; - goto stage_done; - } + return 0; +} - if (op == GD_OP_REPLACE_BRICK) { - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, - rsp_dict, - op_errstr); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to " - "aggregate response from node/brick"); - goto out; - } - } - dict_unref (rsp_dict); - rsp_dict = NULL; +int +gd_get_brick_count (struct list_head *bricks) +{ + glusterd_pending_node_t *pending_node = NULL; + int npeers = 0; + list_for_each_entry (pending_node, bricks, list) { + npeers++; } + return npeers; +} -stage_done: - if (ret) { - gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, - gd_op_list[op], hostname, (op_errstr) ? ":" : " ", - (op_errstr) ? op_errstr : " "); - if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_STAGE_FAIL, hostname); +int +gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr) +{ + glusterd_pending_node_t *pending_node = NULL; + struct list_head selected = {0,}; + xlator_t *this = NULL; + int brick_count = 0; + int ret = -1; + rpc_clnt_t *rpc = NULL; + dict_t *rsp_dict = NULL; + + this = THIS; + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; goto out; } - /*brick op */ INIT_LIST_HEAD (&selected); - ret = glusterd_op_bricks_select (op, req_dict, &op_errstr, &selected); + ret = glusterd_op_bricks_select (op, req_dict, op_errstr, &selected); if (ret) { gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Brick op failed. Check " + (*op_errstr)? *op_errstr: "Brick op failed. Check " "glusterd log file for more details."); goto out; } + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) + goto out; + dict_unref (rsp_dict); + rsp_dict = NULL; + brick_count = 0; list_for_each_entry (pending_node, &selected, list) { rpc = glusterd_pending_node_get_rpc (pending_node); @@ -779,95 +916,97 @@ stage_done: goto out; } ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict, - op_ctx, &op_errstr); + op_ctx, op_errstr); if (ret) goto out; brick_count++; } + ret = 0; +out: + if (rsp_dict) + dict_unref (rsp_dict); gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks", brick_count); + return ret; +} - /* commit op */ - rsp_dict = dict_new (); - if (!rsp_dict) { - ret = -1; - goto out; - } +void +gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) +{ + int ret = -1; + int npeers = 0; + dict_t *req_dict = NULL; + glusterd_conf_t *conf = NULL; + glusterd_op_t op = 0; + int32_t tmp_op = 0; + char *op_errstr = NULL; + xlator_t *this = NULL; - ret = glusterd_op_commit_perform (op, req_dict, &op_errstr, rsp_dict); + this = THIS; + GF_ASSERT (this); + conf = this->private; + GF_ASSERT (conf); + + ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op); if (ret) { - hostname = "localhost"; - goto commit_done; + gf_log (this->name, GF_LOG_ERROR, "Failed to get volume " + "operation"); + goto out; } - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, op_errstr); + + op = tmp_op; + ret = glusterd_lock (MY_UUID); if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate response " - "from node/brick"); + gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock"); + gf_asprintf (&op_errstr, "Another transaction is in progress. " + "Please try again after sometime."); goto out; } - dict_unref (rsp_dict); - rsp_dict = NULL; - list_for_each_entry (peerinfo, &conf->xaction_peers, op_peers_list) { - ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, - MY_UUID, tmp_uuid, - op, req_dict, &rsp_dict, - &op_errstr); - if (ret) { - hostname = peerinfo->hostname; - goto commit_done; - } - ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict, - op_errstr); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, "%s", - (op_errstr)? op_errstr: "Failed to aggregate " - "response from node/brick"); - goto out; - } - dict_unref (rsp_dict); - rsp_dict = NULL; - } -commit_done: + /* storing op globally to access in synctask code paths + * This is still acceptable, as we are performing this under + * the 'cluster' lock*/ + glusterd_op_set_op (op); + INIT_LIST_HEAD (&conf->xaction_peers); + npeers = gd_build_peers_list (&conf->peers, &conf->xaction_peers); + + ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers); + if (ret) + goto out; + + ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx); if (ret) { - gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, - gd_op_list[op], hostname, (op_errstr) ? ":" : " ", - (op_errstr) ? op_errstr : " "); + gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD, + gd_op_list[op]); if (op_errstr == NULL) - gf_asprintf (&op_errstr, OPERRSTR_COMMIT_FAIL, - hostname); + gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD); goto out; - } else { - glusterd_op_modify_op_ctx (op, op_ctx); - } + } - ret = 0; -out: - if (local_locked) { - list_for_each_entry_safe (peerinfo, tmp, &conf->xaction_peers, - op_peers_list) { - gd_syncop_mgmt_unlock (peerinfo->rpc, - MY_UUID, tmp_uuid); - list_del_init (&peerinfo->op_peers_list); - } + ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr, npeers); + if (ret) + goto out; - /* Local node should be the one to be locked first, - unlocked last to prevent races */ - glusterd_op_clear_op (op); - glusterd_unlock (MY_UUID); - } + ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr); + if (ret) + goto out; + + ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr, npeers); + if (ret) + goto out; - glusterd_op_send_cli_response (op, ret, 0, req, op_ctx, op_errstr); + ret = 0; +out: + (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req, + op_ctx, op_errstr, npeers); if (req_dict) dict_unref (req_dict); - if (ret && rsp_dict) - dict_unref (rsp_dict); - if (op_errstr) GF_FREE (op_errstr); diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.h b/xlators/mgmt/glusterd/src/glusterd-syncop.h index 268f3bf2357..658ed4e2a28 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.h +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.h @@ -12,7 +12,6 @@ #include "syncop.h" - #define GD_SYNC_OPCODE_KEY "sync-mgmt-operation" /* gd_syncop_* */ @@ -36,15 +35,14 @@ int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, xdrproc_t xdrproc); -int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid); -int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid); -int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid, int op, dict_t *dict_out, - dict_t **dict_in, char **errstr); -int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, - uuid_t recv_uuid, int op, dict_t *dict_out, - dict_t **dict_in, char **errstr); - +int gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid); +int gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx); +int gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *arg, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx); #endif /* __RPC_SYNCOP_H */ |