diff options
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 1595 |
1 files changed, 1322 insertions, 273 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index 7cdc1c3c2..438df8266 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -1,24 +1,13 @@ /* - Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ + Copyright (c) 2012-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ /* rpc related syncops */ -#include "syncop.h" #include "rpc-clnt.h" #include "protocol-common.h" #include "xdr-generic.h" @@ -28,11 +17,194 @@ #include "glusterd.h" #include "glusterd-op-sm.h" #include "glusterd-utils.h" +#include "glusterd-locks.h" + +extern glusterd_op_info_t opinfo; + +void +gd_synctask_barrier_wait (struct syncargs *args, int count) +{ + glusterd_conf_t *conf = THIS->private; + + synclock_unlock (&conf->big_lock); + synctask_barrier_wait (args, count); + synclock_lock (&conf->big_lock); + + syncbarrier_destroy (&args->barrier); +} + +static void +gd_mgmt_v3_collate_errors (struct syncargs *args, int op_ret, int op_errno, + char *op_errstr, int op_code, + glusterd_peerinfo_t *peerinfo, u_char *uuid) +{ + char err_str[PATH_MAX] = "Please check log file for details."; + char op_err[PATH_MAX] = ""; + int len = -1; + char *peer_str = NULL; + + if (op_ret) { + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (peerinfo) + peer_str = peerinfo->hostname; + else + peer_str = uuid_utoa (uuid); + + if (op_errstr && strcmp (op_errstr, "")) { + len = snprintf (err_str, sizeof(err_str) - 1, + "Error: %s", op_errstr); + err_str[len] = '\0'; + } + + switch (op_code){ + case GLUSTERD_MGMT_V3_LOCK: + { + len = snprintf (op_err, sizeof(op_err) - 1, + "Locking failed " + "on %s. %s", peer_str, err_str); + break; + } + case GLUSTERD_MGMT_V3_UNLOCK: + { + len = snprintf (op_err, sizeof(op_err) - 1, + "Unlocking failed " + "on %s. %s", peer_str, err_str); + break; + } + } + op_err[len] = '\0'; + + if (args->errstr) { + len = snprintf (err_str, sizeof(err_str) - 1, + "%s\n%s", args->errstr, + op_err); + GF_FREE (args->errstr); + args->errstr = NULL; + } else + len = snprintf (err_str, sizeof(err_str) - 1, + "%s", op_err); + err_str[len] = '\0'; + + gf_log ("", GF_LOG_ERROR, "%s", op_err); + args->errstr = gf_strdup (err_str); + } + + return; +} + +static void +gd_collate_errors (struct syncargs *args, int op_ret, int op_errno, + char *op_errstr, int op_code, + glusterd_peerinfo_t *peerinfo, u_char *uuid) +{ + char err_str[PATH_MAX] = "Please check log file for details."; + char op_err[PATH_MAX] = ""; + int len = -1; + char *peer_str = NULL; + + if (op_ret) { + args->op_ret = op_ret; + args->op_errno = op_errno; + + if (peerinfo) + peer_str = peerinfo->hostname; + else + peer_str = uuid_utoa (uuid); + + if (op_errstr && strcmp (op_errstr, "")) { + len = snprintf (err_str, sizeof(err_str) - 1, + "Error: %s", op_errstr); + err_str[len] = '\0'; + } + + switch (op_code){ + case GLUSTERD_MGMT_CLUSTER_UNLOCK : + { + len = snprintf (op_err, sizeof(op_err) - 1, + "Unlocking failed on %s. %s", + peer_str, err_str); + break; + } + case GLUSTERD_MGMT_STAGE_OP : + { + len = snprintf (op_err, sizeof(op_err) - 1, + "Staging failed on %s. %s", + peer_str, err_str); + break; + } + case GLUSTERD_MGMT_COMMIT_OP : + { + len = snprintf (op_err, sizeof(op_err) - 1, + "Commit failed on %s. %s", + peer_str, err_str); + break; + } + } + op_err[len] = '\0'; + + if (args->errstr) { + len = snprintf (err_str, sizeof(err_str) - 1, + "%s\n%s", args->errstr, + op_err); + GF_FREE (args->errstr); + args->errstr = NULL; + } else + len = snprintf (err_str, sizeof(err_str) - 1, + "%s", op_err); + err_str[len] = '\0'; + + gf_log ("", GF_LOG_ERROR, "%s", op_err); + args->errstr = gf_strdup (err_str); + } + + return; +} + +void +gd_syncargs_init (struct syncargs *args, dict_t *op_ctx) +{ + args->dict = op_ctx; + pthread_mutex_init (&args->lock_dict, NULL); +} + +static void +gd_stage_op_req_free (gd1_mgmt_stage_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_commit_op_req_free (gd1_mgmt_commit_op_req *req) +{ + if (!req) + return; + + GF_FREE (req->buf.buf_val); + GF_FREE (req); +} + +static void +gd_brick_op_req_free (gd1_mgmt_brick_op_req *req) +{ + if (!req) + return; + + if (strcmp (req->name, "") != 0) + GF_FREE (req->name); + GF_FREE (req->input.input_val); + GF_FREE (req); +} int -gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, - void *cookie, rpc_clnt_prog_t *prog, - int procnum, fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, void *local, + void *cookie, rpc_clnt_prog_t *prog, int procnum, + fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) { int ret = -1; struct iobuf *iobuf = NULL; @@ -72,7 +244,8 @@ gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, iov.iov_len = ret; count = 1; - frame->local = cookie; + frame->local = local; + frame->cookie = cookie; /* Send the msg */ ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, @@ -90,145 +263,536 @@ out: /* Defined in glusterd-rpc-ops.c */ extern struct rpc_clnt_program gd_mgmt_prog; +extern struct rpc_clnt_program gd_brick_prog; +extern struct rpc_clnt_program gd_mgmt_v3_prog; -int32_t -gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *myframe) +int +glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_lock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + int ret = 0; - frame = myframe; - args = frame->local; - frame->local = NULL; + switch (op) { + case GD_OP_REPLACE_BRICK: + ret = glusterd_rb_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; + case GD_OP_SYNC_VOLUME: + ret = glusterd_sync_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_PROFILE_VOLUME: + ret = glusterd_profile_volume_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_GSYNC_CREATE: + break; + + case GD_OP_GSYNC_SET: + ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL); + if (ret) + goto out; + break; + + case GD_OP_STATUS_VOLUME: + ret = glusterd_volume_status_copy_to_op_ctx_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_REBALANCE: + case GD_OP_DEFRAG_BRICK_VOLUME: + ret = glusterd_volume_rebalance_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_HEAL_VOLUME: + ret = glusterd_volume_heal_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + case GD_OP_QUOTA: + case GD_OP_CLEARLOCKS_VOLUME: + ret = glusterd_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + + break; + + case GD_OP_SYS_EXEC: + ret = glusterd_sys_exec_output_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + case GD_OP_SNAP: + ret = glusterd_snap_use_rsp_dict (aggr, rsp); + if (ret) + goto out; + break; + + default: + break; + } +out: + return ret; +} + +int32_t +_gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int ret = -1; + struct syncargs *args = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + gd1_mgmt_cluster_lock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; + int op_ret = -1; + int op_errno = -1; + + frame = myframe; + args = frame->local; + peerinfo = frame->cookie; + frame->local = NULL; + frame->cookie = NULL; if (-1 == req->rpc_status) { - args->op_errno = ENOTCONN; + op_errno = ENOTCONN; goto out; } ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp); - if (ret < 0) { + if (ret < 0) + goto out; + + uuid_copy (args->uuid, rsp.uuid); + + /* Set peer as locked, so we unlock only the locked peers */ + if (rsp.op_ret == 0) + peerinfo->locked = _gf_true; + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; +out: + gd_collate_errors (args, op_ret, op_errno, NULL, + GLUSTERD_MGMT_CLUSTER_LOCK, peerinfo, rsp.uuid); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); + return 0; +} + +int32_t +gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + _gd_syncop_mgmt_lock_cbk); +} + +int +gd_syncop_mgmt_lock (glusterd_peerinfo_t *peerinfo, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) +{ + int ret = -1; + gd1_mgmt_cluster_lock_req req = {{0},}; + glusterd_conf_t *conf = THIS->private; + + uuid_copy (req.uuid, my_uuid); + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo, + &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_LOCK, + gd_syncop_mgmt_lock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + synclock_lock (&conf->big_lock); + return ret; +} + +int32_t +gd_syncop_mgmt_v3_lock_cbk_fn (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int ret = -1; + struct syncargs *args = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + gd1_mgmt_v3_lock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; + int op_ret = -1; + int op_errno = -1; + + GF_ASSERT(req); + GF_ASSERT(iov); + GF_ASSERT(myframe); + + frame = myframe; + args = frame->local; + peerinfo = frame->cookie; + frame->local = NULL; + frame->cookie = NULL; + + if (-1 == req->rpc_status) { + op_errno = ENOTCONN; goto out; } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + ret = xdr_to_generic (*iov, &rsp, + (xdrproc_t)xdr_gd1_mgmt_v3_lock_rsp); + if (ret < 0) + goto out; uuid_copy (args->uuid, rsp.uuid); + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; out: + gd_mgmt_v3_collate_errors (args, op_ret, op_errno, NULL, + GLUSTERD_MGMT_V3_LOCK, + peerinfo, rsp.uuid); STACK_DESTROY (frame->root); + synctask_barrier_wake(args); + return 0; +} - __wake (args); +int32_t +gd_syncop_mgmt_v3_lock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + gd_syncop_mgmt_v3_lock_cbk_fn); +} + +int +gd_syncop_mgmt_v3_lock (glusterd_op_t op, dict_t *op_ctx, + glusterd_peerinfo_t *peerinfo, + struct syncargs *args, uuid_t my_uuid, + uuid_t recv_uuid, uuid_t txn_id) +{ + int ret = -1; + gd1_mgmt_v3_lock_req req = {{0},}; + glusterd_conf_t *conf = THIS->private; + + GF_ASSERT(op_ctx); + GF_ASSERT(peerinfo); + GF_ASSERT(args); + ret = dict_allocate_and_serialize (op_ctx, + &req.dict.dict_val, + &req.dict.dict_len); + if (ret) + goto out; + + uuid_copy (req.uuid, my_uuid); + uuid_copy (req.txn_id, txn_id); + req.op = op; + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo, + &gd_mgmt_v3_prog, + GLUSTERD_MGMT_V3_LOCK, + gd_syncop_mgmt_v3_lock_cbk, + (xdrproc_t) xdr_gd1_mgmt_v3_lock_req); + synclock_lock (&conf->big_lock); +out: + gf_log ("", GF_LOG_DEBUG, "Returning %d", ret); + return ret; +} + +int32_t +gd_syncop_mgmt_v3_unlock_cbk_fn (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int ret = -1; + struct syncargs *args = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + gd1_mgmt_v3_unlock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; + int op_ret = -1; + int op_errno = -1; + + GF_ASSERT(req); + GF_ASSERT(iov); + GF_ASSERT(myframe); + + frame = myframe; + args = frame->local; + peerinfo = frame->cookie; + frame->local = NULL; + frame->cookie = NULL; + + if (-1 == req->rpc_status) { + op_errno = ENOTCONN; + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, + (xdrproc_t)xdr_gd1_mgmt_v3_unlock_rsp); + if (ret < 0) + goto out; + + uuid_copy (args->uuid, rsp.uuid); + + /* Set peer as locked, so we unlock only the locked peers */ + if (rsp.op_ret == 0) + peerinfo->locked = _gf_true; + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; +out: + gd_mgmt_v3_collate_errors (args, op_ret, op_errno, NULL, + GLUSTERD_MGMT_V3_UNLOCK, + peerinfo, rsp.uuid); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } +int32_t +gd_syncop_mgmt_v3_unlock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + gd_syncop_mgmt_v3_unlock_cbk_fn); +} int -gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_v3_unlock (dict_t *op_ctx, glusterd_peerinfo_t *peerinfo, + struct syncargs *args, uuid_t my_uuid, + uuid_t recv_uuid, uuid_t txn_id) { - struct syncargs args = {0, }; - gd1_mgmt_cluster_lock_req req = {{0},}; + int ret = -1; + gd1_mgmt_v3_unlock_req req = {{0},}; + glusterd_conf_t *conf = THIS->private; + + GF_ASSERT(op_ctx); + GF_ASSERT(peerinfo); + GF_ASSERT(args); + + ret = dict_allocate_and_serialize (op_ctx, + &req.dict.dict_val, + &req.dict.dict_len); + if (ret) + goto out; uuid_copy (req.uuid, my_uuid); + uuid_copy (req.txn_id, txn_id); + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo, + &gd_mgmt_v3_prog, + GLUSTERD_MGMT_V3_UNLOCK, + gd_syncop_mgmt_v3_unlock_cbk, + (xdrproc_t) xdr_gd1_mgmt_v3_unlock_req); + synclock_lock (&conf->big_lock); +out: + gf_log ("", GF_LOG_DEBUG, "Returning %d", ret); + return ret; +} - args.op_ret = -1; - args.op_errno = ENOTCONN; +int32_t +_gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int ret = -1; + struct syncargs *args = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; + call_frame_t *frame = NULL; + int op_ret = -1; + int op_errno = -1; - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_lock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK, - xdr_gd1_mgmt_cluster_lock_req); + frame = myframe; + args = frame->local; + peerinfo = frame->cookie; + frame->local = NULL; - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); + if (-1 == req->rpc_status) { + op_errno = ENOTCONN; + goto out; + } - errno = args.op_errno; - return args.op_ret; + ret = xdr_to_generic (*iov, &rsp, + (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp); + if (ret < 0) + goto out; + + uuid_copy (args->uuid, rsp.uuid); + peerinfo->locked = _gf_false; + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; +out: + gd_collate_errors (args, op_ret, op_errno, NULL, + GLUSTERD_MGMT_CLUSTER_UNLOCK, peerinfo, rsp.uuid); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); + return 0; } int32_t gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; + return glusterd_big_locked_cbk (req, iov, count, myframe, + _gd_syncop_mgmt_unlock_cbk); +} + + +int +gd_syncop_mgmt_unlock (glusterd_peerinfo_t *peerinfo, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid) +{ + int ret = -1; + gd1_mgmt_cluster_unlock_req req = {{0},}; + glusterd_conf_t *conf = THIS->private; + + uuid_copy (req.uuid, my_uuid); + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo, + &gd_mgmt_prog, + GLUSTERD_MGMT_CLUSTER_UNLOCK, + gd_syncop_mgmt_unlock_cbk, + (xdrproc_t) xdr_gd1_mgmt_cluster_lock_req); + synclock_lock (&conf->big_lock); + return ret; +} +int32_t +_gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int ret = -1; + gd1_mgmt_stage_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + int op_ret = -1; + int op_errno = -1; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { - args->op_errno = ENOTCONN; + op_errno = ENOTCONN; goto out; } ret = xdr_to_generic (*iov, &rsp, - (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp); - if (ret < 0) { + (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp); + if (ret < 0) goto out; + + if (rsp.dict.dict_len) { + /* Unserialize the dictionary */ + rsp_dict = dict_new (); + + ret = dict_unserialize (rsp.dict.dict_val, + rsp.dict.dict_len, + &rsp_dict); + if (ret < 0) { + GF_FREE (rsp.dict.dict_val); + goto out; + } else { + rsp_dict->extra_stdfree = rsp.dict.dict_val; + } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + ret = glusterd_friend_find (rsp.uuid, NULL, &peerinfo); + if (ret) { + gf_log (this->name, GF_LOG_CRITICAL, "Staging response " + "for 'Volume %s' received from unknown " + "peer: %s", gd_op_list[rsp.op], + uuid_utoa (rsp.uuid)); + goto out; + } uuid_copy (args->uuid, rsp.uuid); + if (rsp.op == GD_OP_REPLACE_BRICK) { + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); + } + + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; out: - STACK_DESTROY (frame->root); + gd_collate_errors (args, op_ret, op_errno, rsp.op_errstr, + GLUSTERD_MGMT_STAGE_OP, peerinfo, rsp.uuid); - __wake (args); + if (rsp_dict) + dict_unref (rsp_dict); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } +int32_t +gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + _gd_syncop_stage_op_cbk); +} + int -gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) +gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, int op, + dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_cluster_unlock_req req = {{0},}; - - uuid_copy (req.uuid, my_uuid); + gd1_mgmt_stage_op_req *req = NULL; + glusterd_conf_t *conf = THIS->private; + int ret = -1; - args.op_ret = -1; - args.op_errno = ENOTCONN; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_stage_req_t); + if (!req) + goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_mgmt_unlock_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK, - xdr_gd1_mgmt_cluster_unlock_req); + uuid_copy (req->uuid, my_uuid); + req->op = op; - if (!args.op_ret) - uuid_copy (recv_uuid, args.uuid); + ret = dict_allocate_and_serialize (dict_out, + &req->buf.buf_val, &req->buf.buf_len); + if (ret) + goto out; - errno = args.op_errno; - return args.op_ret; + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (rpc, req, args, NULL, &gd_mgmt_prog, + GLUSTERD_MGMT_STAGE_OP, + gd_syncop_stage_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_stage_op_req); + synclock_lock (&conf->big_lock); +out: + gd_stage_op_req_free (req); + return ret; } int32_t -gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, +_gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_stage_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + gd1_mgmt_brick_op_rsp rsp = {0,}; int ret = -1; - call_frame_t *frame = NULL; + call_frame_t *frame = NULL; frame = myframe; args = frame->local; @@ -244,102 +808,132 @@ gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, } ret = xdr_to_generic (*iov, &rsp, - (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp); - if (ret < 0) { + (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp); + if (ret < 0) goto out; - } - if (rsp.dict.dict_len) { - /* Unserialize the dictionary */ + if (rsp.output.output_len) { args->dict = dict_new (); + if (!args->dict) { + ret = -1; + args->op_errno = ENOMEM; + goto out; + } - ret = dict_unserialize (rsp.dict.dict_val, - rsp.dict.dict_len, + ret = dict_unserialize (rsp.output.output_val, + rsp.output.output_len, &args->dict); - if (ret < 0) { - GF_FREE (rsp.dict.dict_val); + if (ret < 0) goto out; - } else { - args->dict->extra_stdfree = rsp.dict.dict_val; - } } args->op_ret = rsp.op_ret; args->op_errno = rsp.op_errno; - - uuid_copy (args->uuid, rsp.uuid); - args->errstr = gf_strdup (rsp.op_errstr); out: - STACK_DESTROY (frame->root); + if ((rsp.op_errstr) && (strcmp (rsp.op_errstr, "") != 0)) + free (rsp.op_errstr); + free (rsp.output.output_val); + STACK_DESTROY (frame->root); __wake (args); return 0; } +int32_t +gd_syncop_brick_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + _gd_syncop_brick_op_cbk); +} int -gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, +gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode, + int op, dict_t *dict_out, dict_t *op_ctx, char **errstr) { - struct syncargs args = {0, }; - gd1_mgmt_stage_op_req req = {{0},}; - int ret = 0; - - uuid_copy (req.uuid, my_uuid); - req.op = op; + struct syncargs args = {0, }; + gd1_mgmt_brick_op_req *req = NULL; + int ret = 0; + xlator_t *this = NULL; + this = THIS; args.op_ret = -1; args.op_errno = ENOTCONN; - ret = dict_allocate_and_serialize (dict_out, &req.buf.buf_val, - (size_t *)&req.buf.buf_len); + if ((pnode->type == GD_NODE_NFS) || + ((pnode->type == GD_NODE_SHD) && + (op == GD_OP_STATUS_VOLUME))) { + ret = glusterd_node_op_build_payload + (op, &req, dict_out); + + } else { + ret = glusterd_brick_op_build_payload + (op, pnode->node, &req, dict_out); + + } + if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_stage_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP, - xdr_gd1_mgmt_stage_op_req); + GD_SYNCOP (rpc, (&args), NULL, gd_syncop_brick_op_cbk, req, + &gd_brick_prog, req->op, xdr_gd1_mgmt_brick_op_req); - if (args.errstr && errstr) - *errstr = args.errstr; - else if (args.errstr) - GF_FREE (args.errstr); + if (args.errstr) { + if ((strlen(args.errstr) > 0) && errstr) + *errstr = args.errstr; + else + GF_FREE (args.errstr); + } - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); + if (GD_OP_STATUS_VOLUME == op) { + ret = dict_set_int32 (args.dict, "index", pnode->index); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Error setting index on brick status" + " rsp dict"); + args.op_ret = -1; + goto out; + } + } + if (args.op_ret == 0) + glusterd_handle_node_rsp (dict_out, pnode->node, op, + args.dict, op_ctx, errstr, + pnode->type); - uuid_copy (recv_uuid, args.uuid); out: errno = args.op_errno; + if (args.dict) + dict_unref (args.dict); + gd_brick_op_req_free (req); return args.op_ret; } int32_t -gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *myframe) +_gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) { - struct syncargs *args = NULL; - gd1_mgmt_commit_op_rsp rsp = {{0},}; - int ret = -1; - call_frame_t *frame = NULL; - + int ret = -1; + gd1_mgmt_commit_op_rsp rsp = {{0},}; + struct syncargs *args = NULL; + xlator_t *this = NULL; + dict_t *rsp_dict = NULL; + call_frame_t *frame = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + int op_ret = -1; + int op_errno = -1; + + this = THIS; frame = myframe; - args = frame->local; + args = frame->local; frame->local = NULL; - /* initialize */ - args->op_ret = -1; - args->op_errno = EINVAL; - if (-1 == req->rpc_status) { - args->op_errno = ENOTCONN; + op_errno = ENOTCONN; goto out; } @@ -351,227 +945,684 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, if (rsp.dict.dict_len) { /* Unserialize the dictionary */ - args->dict = dict_new (); + rsp_dict = dict_new (); ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, - &args->dict); + &rsp_dict); if (ret < 0) { GF_FREE (rsp.dict.dict_val); goto out; } else { - args->dict->extra_stdfree = rsp.dict.dict_val; + rsp_dict->extra_stdfree = rsp.dict.dict_val; } } - args->op_ret = rsp.op_ret; - args->op_errno = rsp.op_errno; + ret = glusterd_friend_find (rsp.uuid, NULL, &peerinfo); + if (ret) { + gf_log (this->name, GF_LOG_CRITICAL, "Commit response " + "for 'Volume %s' received from unknown " + "peer: %s", gd_op_list[rsp.op], + uuid_utoa (rsp.uuid)); + goto out; + } uuid_copy (args->uuid, rsp.uuid); + pthread_mutex_lock (&args->lock_dict); + { + ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict, + rsp_dict); + if (ret) + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from " + " node/brick"); + } + pthread_mutex_unlock (&args->lock_dict); - args->errstr = gf_strdup (rsp.op_errstr); + op_ret = rsp.op_ret; + op_errno = rsp.op_errno; out: - STACK_DESTROY (frame->root); + gd_collate_errors (args, op_ret, op_errno, rsp.op_errstr, + GLUSTERD_MGMT_COMMIT_OP, peerinfo, rsp.uuid); + if (rsp_dict) + dict_unref (rsp_dict); - __wake (args); + STACK_DESTROY (frame->root); + synctask_barrier_wake(args); return 0; } +int32_t +gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + return glusterd_big_locked_cbk (req, iov, count, myframe, + _gd_syncop_commit_op_cbk); +} + int -gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid, - int op, dict_t *dict_out, dict_t **dict_in, - char **errstr) +gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args, + uuid_t my_uuid, uuid_t recv_uuid, + int op, dict_t *dict_out, dict_t *op_ctx) { - struct syncargs args = {0, }; - gd1_mgmt_commit_op_req req = {{0},}; - int ret = 0; + glusterd_conf_t *conf = THIS->private; + gd1_mgmt_commit_op_req *req = NULL; + int ret = -1; - uuid_copy (req.uuid, my_uuid); - req.op = op; + req = GF_CALLOC (1, sizeof (*req), gf_gld_mt_mop_commit_req_t); + if (!req) + goto out; - args.op_ret = -1; - args.op_errno = ENOTCONN; + uuid_copy (req->uuid, my_uuid); + req->op = op; - ret = dict_allocate_and_serialize (dict_out, &req.buf.buf_val, - (size_t *)&req.buf.buf_len); + ret = dict_allocate_and_serialize (dict_out, + &req->buf.buf_val, &req->buf.buf_len); if (ret) goto out; - GD_SYNCOP (rpc, (&args), gd_syncop_commit_op_cbk, - &req, &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP, - xdr_gd1_mgmt_commit_op_req); - - if (args.errstr && errstr) - *errstr = args.errstr; - else if (args.errstr) - GF_FREE (args.errstr); - - if (args.dict && dict_in) - *dict_in = args.dict; - else if (args.dict) - dict_unref (args.dict); - - uuid_copy (recv_uuid, args.uuid); - + synclock_unlock (&conf->big_lock); + ret = gd_syncop_submit_request (rpc, req, args, NULL, &gd_mgmt_prog, + GLUSTERD_MGMT_COMMIT_OP , + gd_syncop_commit_op_cbk, + (xdrproc_t) xdr_gd1_mgmt_commit_op_req); + synclock_lock (&conf->big_lock); out: - errno = args.op_errno; - return args.op_ret; - + gd_commit_op_req_free (req); + return ret; } int -gd_sync_task_begin (void *data) +gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers, + glusterd_op_t op) { - int ret = -1; - dict_t *dict = NULL; - dict_t *rsp_dict = NULL; glusterd_peerinfo_t *peerinfo = NULL; - glusterd_conf_t *conf = NULL; - uuid_t tmp_uuid = {0,}; - char *errstr = NULL; - glusterd_op_t op = 0; - int32_t tmp_op = 0; - gf_boolean_t local_locked = _gf_false; + int npeers = 0; - conf = THIS->private; + list_for_each_entry (peerinfo, peers, uuid_list) { + if (!peerinfo->connected) + continue; + if (op != GD_OP_SYNC_VOLUME && + peerinfo->state.state != GD_FRIEND_STATE_BEFRIENDED) + continue; - dict = data; + list_add_tail (&peerinfo->op_peers_list, xact_peers); + npeers++; + } + return npeers; +} - ret = dict_get_int32 (dict, GD_SYNC_OPCODE_KEY, &tmp_op); - if (ret) +int +gd_lock_op_phase (glusterd_conf_t *conf, glusterd_op_t op, dict_t *op_ctx, + char **op_errstr, int npeers, uuid_t txn_id) +{ + int ret = -1; + int peer_cnt = 0; + uuid_t peer_uuid = {0}; + xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + struct syncargs args = {0}; + struct list_head *peers = NULL; + + peers = &conf->xaction_peers; + + if (!npeers) { + ret = 0; goto out; + } - op = tmp_op; + this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + if (conf->op_version < 3) { + /* Reset lock status */ + peerinfo->locked = _gf_false; + gd_syncop_mgmt_lock (peerinfo, &args, + MY_UUID, peer_uuid); + } else + gd_syncop_mgmt_v3_lock (op, op_ctx, peerinfo, &args, + MY_UUID, peer_uuid, txn_id); + peer_cnt++; + } + gd_synctask_barrier_wait((&args), peer_cnt); + + if (args.op_ret) { + if (args.errstr) + *op_errstr = gf_strdup (args.errstr); + else { + ret = gf_asprintf (op_errstr, "Another transaction could be " + "in progress. Please try again after " + "sometime."); + if (ret == -1) + *op_errstr = NULL; - ret = -1; + gf_log (this->name, GF_LOG_ERROR, + "Failed to acquire lock"); + + } + } + + ret = args.op_ret; + + gf_log (this->name, GF_LOG_DEBUG, "Sent lock op req for 'Volume %s' " + "to %d peers. Returning %d", gd_op_list[op], peer_cnt, ret); +out: + return ret; +} + +int +gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr, int npeers) +{ + int ret = -1; + int peer_cnt = 0; + dict_t *rsp_dict = NULL; + char *hostname = NULL; + xlator_t *this = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; + + this = THIS; rsp_dict = dict_new (); if (!rsp_dict) goto out; - /* Lock everything */ - ret = glusterd_lock (conf->uuid); - if (ret) - goto out; - /* successful lock in local node */ - local_locked = _gf_true; + ret = glusterd_op_stage_validate (op, req_dict, op_errstr, rsp_dict); + if (ret) { + hostname = "localhost"; + goto stage_done; + } - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { - ret = gd_syncop_mgmt_lock (peerinfo->rpc, - conf->uuid, tmp_uuid); - if (ret) + if ((op == GD_OP_REPLACE_BRICK)) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response from node/brick"); goto out; - /* TODO: Only on lock successful nodes it should unlock */ + } } + dict_unref (rsp_dict); + rsp_dict = NULL; - /* stage op */ - ret = glusterd_op_stage_validate (op, dict, &errstr, rsp_dict); - if (ret) +stage_done: + if (ret) { + gf_log (this->name, GF_LOG_ERROR, LOGSTR_STAGE_FAIL, + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (*op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_STAGE_FAIL, hostname); goto out; + } - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { - ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, - conf->uuid, tmp_uuid, - op, dict, &rsp_dict, &errstr); - if (ret) { - if (errstr) - ret = dict_set_dynstr (dict, "error", errstr); + if (!npeers) { + ret = 0; + goto out; + } - ret = -1; - goto out; - } + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; } + gd_synctask_barrier_wait((&args), peer_cnt); - /* commit op */ - ret = glusterd_op_commit_perform (op, dict, &errstr, rsp_dict); - if (ret) + if (args.errstr) + *op_errstr = gf_strdup (args.errstr); + else if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); + + ret = args.op_ret; + + gf_log (this->name, GF_LOG_DEBUG, "Sent stage op req for 'Volume %s' " + "to %d peers", gd_op_list[op], peer_cnt); +out: + if (rsp_dict) + dict_unref (rsp_dict); + return ret; +} + +int +gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx, + dict_t *req_dict, char **op_errstr, int npeers) +{ + dict_t *rsp_dict = NULL; + int peer_cnt = -1; + int ret = -1; + char *hostname = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + xlator_t *this = NULL; + uuid_t tmp_uuid = {0}; + char *errstr = NULL; + struct syncargs args = {0}; + + this = THIS; + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; goto out; + } - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { - ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, - conf->uuid, tmp_uuid, - op, dict, &rsp_dict, &errstr); + ret = glusterd_op_commit_perform (op, req_dict, op_errstr, rsp_dict); + if (ret) { + hostname = "localhost"; + goto commit_done; + } + if (op != GD_OP_SYNC_VOLUME) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); if (ret) { - if (errstr) - ret = dict_set_dynstr (dict, "error", errstr); - - ret = -1; + gf_log (this->name, GF_LOG_ERROR, "%s", + "Failed to aggregate response " + "from node/brick"); goto out; } } + dict_unref (rsp_dict); + rsp_dict = NULL; - ret = 0; -out: - if (local_locked) { - /* unlock everything as we help successful local lock */ - list_for_each_entry (peerinfo, &conf->peers, uuid_list) { - /* No need to check the error code, as it is possible - that if 'lock' on few nodes failed, it would come - here, and unlock would fail on nodes where lock - never was sent */ - gd_syncop_mgmt_unlock (peerinfo->rpc, - conf->uuid, tmp_uuid); - } +commit_done: + if (ret) { + gf_log (this->name, GF_LOG_ERROR, LOGSTR_COMMIT_FAIL, + gd_op_list[op], hostname, (*op_errstr) ? ":" : " ", + (*op_errstr) ? *op_errstr : " "); + if (*op_errstr == NULL) + gf_asprintf (op_errstr, OPERRSTR_COMMIT_FAIL, + hostname); + goto out; + } + + if (!npeers) { + ret = 0; + goto out; + } - /* Local node should be the one to be locked first, - unlocked last to prevent races */ - glusterd_unlock (conf->uuid); + gd_syncargs_init (&args, op_ctx); + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry (peerinfo, peers, op_peers_list) { + ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, &args, + MY_UUID, tmp_uuid, + op, req_dict, op_ctx); + peer_cnt++; } + gd_synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (args.errstr) + *op_errstr = gf_strdup (args.errstr); + else if (dict_get_str (op_ctx, "errstr", &errstr) == 0) + *op_errstr = gf_strdup (errstr); + + gf_log (this->name, GF_LOG_DEBUG, "Sent commit op req for 'Volume %s' " + "to %d peers", gd_op_list[op], peer_cnt); +out: + if (!ret) + glusterd_op_modify_op_ctx (op, op_ctx); if (rsp_dict) dict_unref (rsp_dict); + GF_FREE (args.errstr); + args.errstr = NULL; + return ret; } int -gd_sync_task_completion (int op_ret, call_frame_t *sync_frame, void *data) +gd_unlock_op_phase (glusterd_conf_t *conf, glusterd_op_t op, int op_ret, + rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr, + int npeers, char *volname, gf_boolean_t is_acquired, + uuid_t txn_id) { - int ret = 0; - dict_t *dict = NULL; - rpcsvc_request_t *req = NULL; - int32_t tmp_op = 0; - glusterd_op_t op = 0; + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_peerinfo_t *tmp = NULL; + uuid_t tmp_uuid = {0}; + int peer_cnt = 0; + int ret = -1; + xlator_t *this = NULL; + struct syncargs args = {0}; + struct list_head *peers = NULL; + + peers = &conf->xaction_peers; + + if (!npeers) { + ret = 0; + goto out; + } - dict = data; + /* If the lock has not been held during this + * transaction, do not send unlock requests */ + if (!is_acquired) + goto out; - req = sync_frame->local; - sync_frame->local = NULL; + this = THIS; + synctask_barrier_init((&args)); + peer_cnt = 0; + list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) { + if (conf->op_version < 3) { + /* Only unlock peers that were locked */ + if (peerinfo->locked) + gd_syncop_mgmt_unlock (peerinfo, &args, + MY_UUID, tmp_uuid); + } else + gd_syncop_mgmt_v3_unlock (op_ctx, peerinfo, + &args, MY_UUID, + tmp_uuid, txn_id); + peer_cnt++; + list_del_init (&peerinfo->op_peers_list); + } + gd_synctask_barrier_wait((&args), peer_cnt); + ret = args.op_ret; + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "Failed to unlock " + "on some peer(s)"); + } - ret = dict_get_int32 (dict, GD_SYNC_OPCODE_KEY, &tmp_op); - if (ret) +out: + glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr); + glusterd_op_clear_op (op); + if (is_acquired) { + /* Based on the op-version, we release * + * the cluster or mgmt_v3 lock */ + if (conf->op_version < 3) + glusterd_unlock (MY_UUID); + else { + ret = glusterd_mgmt_v3_unlock (volname, MY_UUID, + "vol"); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Unable to release lock for %s", + volname); + } + } + + return 0; +} + +int +gd_get_brick_count (struct list_head *bricks) +{ + glusterd_pending_node_t *pending_node = NULL; + int npeers = 0; + list_for_each_entry (pending_node, bricks, list) { + npeers++; + } + return npeers; +} + +int +gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, + char **op_errstr) +{ + glusterd_pending_node_t *pending_node = NULL; + struct list_head selected = {0,}; + xlator_t *this = NULL; + int brick_count = 0; + int ret = -1; + rpc_clnt_t *rpc = NULL; + dict_t *rsp_dict = NULL; + glusterd_conf_t *conf = NULL; + + this = THIS; + conf = this->private; + rsp_dict = dict_new (); + if (!rsp_dict) { + ret = -1; goto out; - op = tmp_op; + } + + INIT_LIST_HEAD (&selected); + ret = glusterd_op_bricks_select (op, req_dict, op_errstr, &selected, rsp_dict); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "%s", + (*op_errstr)? *op_errstr: "Brick op failed. Check " + "glusterd log file for more details."); + goto out; + } - ret = glusterd_op_send_cli_response (op, op_ret, 0, req, NULL, - "operation failed"); + if (op == GD_OP_HEAL_VOLUME) { + ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict); + if (ret) + goto out; + } + dict_unref (rsp_dict); + rsp_dict = NULL; + + brick_count = 0; + list_for_each_entry (pending_node, &selected, list) { + rpc = glusterd_pending_node_get_rpc (pending_node); + if (!rpc) { + if (pending_node->type == GD_NODE_REBALANCE) { + ret = 0; + glusterd_defrag_volume_node_rsp (req_dict, + NULL, op_ctx); + goto out; + } -out: - if (dict) - dict_unref (dict); + ret = -1; + gf_log (this->name, GF_LOG_ERROR, "Brick Op failed " + "due to rpc failure."); + goto out; + } + ret = gd_syncop_mgmt_brick_op (rpc, pending_node, op, req_dict, + op_ctx, op_errstr); + if (ret) + goto out; - STACK_DESTROY (sync_frame->root); + brick_count++; + } + ret = 0; +out: + if (rsp_dict) + dict_unref (rsp_dict); + gf_log (this->name, GF_LOG_DEBUG, "Sent op req to %d bricks", + brick_count); return ret; } +void +gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req) +{ + int ret = -1; + int npeers = 0; + dict_t *req_dict = NULL; + glusterd_conf_t *conf = NULL; + glusterd_op_t op = 0; + int32_t tmp_op = 0; + char *op_errstr = NULL; + char *tmp = NULL; + char *volname = NULL; + xlator_t *this = NULL; + gf_boolean_t is_acquired = _gf_false; + uuid_t *txn_id = NULL; + uuid_t *originator_uuid = NULL; + glusterd_op_info_t txn_opinfo; + + this = THIS; + GF_ASSERT (this); + conf = this->private; + GF_ASSERT (conf); + + /* Generate a transaction-id for this operation and + * save it in the dict */ + txn_id = GF_CALLOC (1, sizeof(uuid_t), gf_common_mt_uuid_t); + if (!txn_id) { + ret = -1; + goto out; + } + + uuid_generate (*txn_id); + + ret = dict_set_bin (op_ctx, "transaction_id", + txn_id, sizeof(*txn_id)); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to set transaction id."); + goto out; + } else + gf_log (this->name, GF_LOG_DEBUG, + "Transaction_id = %s", uuid_utoa (*txn_id)); + + /* Save the MY_UUID as the originator_uuid */ + originator_uuid = GF_CALLOC (1, sizeof(uuid_t), + gf_common_mt_uuid_t); + if (!originator_uuid) { + ret = -1; + goto out; + } + + uuid_copy (*originator_uuid, MY_UUID); + ret = dict_set_bin (op_ctx, "originator_uuid", + originator_uuid, sizeof (uuid_t)); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Failed to set originator_uuid."); + goto out; + } + + ret = dict_get_int32 (op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "Failed to get volume " + "operation"); + goto out; + } + + op = tmp_op; + + /* Based on the op_version, acquire a cluster or mgmt_v3 lock */ + if (conf->op_version < 3) { + ret = glusterd_lock (MY_UUID); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Unable to acquire lock"); + gf_asprintf (&op_errstr, + "Another transaction is in progress. " + "Please try again after sometime."); + goto out; + } + } else { + + /* If no volname is given as a part of the command, locks will + * not be held */ + ret = dict_get_str (op_ctx, "volname", &tmp); + if (ret) { + gf_log ("", GF_LOG_DEBUG, "Failed to get volume " + "name"); + goto local_locking_done; + } else { + /* Use a copy of volname, as cli response will be + * sent before the unlock, and the volname in the + * dict, might be removed */ + volname = gf_strdup (tmp); + if (!volname) + goto out; + } + + ret = glusterd_mgmt_v3_lock (volname, MY_UUID, "vol"); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Unable to acquire lock for %s", volname); + gf_asprintf (&op_errstr, + "Another transaction is in progress " + "for %s. Please try again after sometime.", + volname); + goto out; + } + } + + is_acquired = _gf_true; + +local_locking_done: + + /* Save opinfo for this transaction with the transaction id */ + glusterd_txn_opinfo_init (&txn_opinfo, NULL, &op, NULL, NULL); + ret = glusterd_set_txn_opinfo (txn_id, &txn_opinfo); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Unable to set transaction's opinfo"); + + opinfo = txn_opinfo; + + INIT_LIST_HEAD (&conf->xaction_peers); + + /* Make 'volume status tasks' command a local operation. + * This is accomplished by setting npeers to 0. + */ + if (!glusterd_is_status_tasks_op (op, op_ctx)) + npeers = gd_build_peers_list (&conf->peers, + &conf->xaction_peers, op); + + /* If no volname is given as a part of the command, locks will + * not be held */ + if (volname) { + ret = gd_lock_op_phase (conf, op, op_ctx, &op_errstr, npeers, *txn_id); + if (ret) + goto out; + } + + ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, LOGSTR_BUILD_PAYLOAD, + gd_op_list[op]); + if (op_errstr == NULL) + gf_asprintf (&op_errstr, OPERRSTR_BUILD_PAYLOAD); + goto out; + } + + ret = gd_stage_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr, npeers); + if (ret) + goto out; + + ret = gd_brick_op_phase (op, op_ctx, req_dict, &op_errstr); + if (ret) + goto out; + + ret = gd_commit_op_phase (&conf->xaction_peers, op, op_ctx, req_dict, + &op_errstr, npeers); + if (ret) + goto out; + + ret = 0; +out: + (void) gd_unlock_op_phase (conf, op, ret, req, op_ctx, op_errstr, + npeers, volname, is_acquired, *txn_id); + + /* Clearing the transaction opinfo */ + ret = glusterd_clear_txn_opinfo (txn_id); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Unable to clear transaction's opinfo"); + + if (volname) + GF_FREE (volname); + + if (req_dict) + dict_unref (req_dict); + + if (op_errstr) { + GF_FREE (op_errstr); + op_errstr = NULL; + } + + return; +} int32_t glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op, void *dict) { int ret = 0; - call_frame_t *dummy_frame = NULL; - glusterfs_ctx_t *ctx = NULL; - - dummy_frame = create_frame (THIS, THIS->ctx->pool); - if (!dummy_frame) - goto out; - - dummy_frame->local = req; ret = dict_set_int32 (dict, GD_SYNC_OPCODE_KEY, op); if (ret) { @@ -580,11 +1631,9 @@ glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op, goto out; } - ctx = glusterfs_ctx_get (); - - ret = synctask_new (ctx->env, gd_sync_task_begin, - gd_sync_task_completion, - dummy_frame, dict); + gd_sync_task_begin (dict, req); + ret = 0; out: + return ret; } |
