diff options
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-syncop.c | 216 |
1 files changed, 207 insertions, 9 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c index c1fdde19a..7cdc1c3c2 100644 --- a/xlators/mgmt/glusterd/src/glusterd-syncop.c +++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c @@ -25,6 +25,10 @@ #include "glusterd1-xdr.h" #include "glusterd-syncop.h" +#include "glusterd.h" +#include "glusterd-op-sm.h" +#include "glusterd-utils.h" + int gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, void *cookie, rpc_clnt_prog_t *prog, @@ -36,6 +40,7 @@ gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, int count = 0; struct iovec iov = {0, }; ssize_t req_size = 0; + call_frame_t *frame = NULL; GF_ASSERT (rpc); if (!req) @@ -50,6 +55,10 @@ gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, if (!iobref) goto out; + frame = create_frame (THIS, THIS->ctx->pool); + if (!frame) + goto out; + iobref_add (iobref, iobuf); iov.iov_base = iobuf->ptr; @@ -63,10 +72,12 @@ gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, iov.iov_len = ret; count = 1; + frame->local = cookie; + /* Send the msg */ ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count, NULL, 0, iobref, - (call_frame_t *)cookie, NULL, 0, NULL, 0, NULL); + frame, NULL, 0, NULL, 0, NULL); /* TODO: do we need to start ping also? */ @@ -82,13 +93,16 @@ extern struct rpc_clnt_program gd_mgmt_prog; int32_t gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *cookie) + int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_cluster_lock_rsp rsp = {{0},}; int ret = -1; + call_frame_t *frame = NULL; - args = cookie; + frame = myframe; + args = frame->local; + frame->local = NULL; /* initialize */ args->op_ret = -1; @@ -111,6 +125,8 @@ gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, uuid_copy (args->uuid, rsp.uuid); out: + STACK_DESTROY (frame->root); + __wake (args); return 0; @@ -142,13 +158,16 @@ gd_syncop_mgmt_lock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) int32_t gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *cookie) + int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_cluster_unlock_rsp rsp = {{0},}; int ret = -1; + call_frame_t *frame = NULL; - args = cookie; + frame = myframe; + args = frame->local; + frame->local = NULL; /* initialize */ args->op_ret = -1; @@ -171,6 +190,8 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov, uuid_copy (args->uuid, rsp.uuid); out: + STACK_DESTROY (frame->root); + __wake (args); return 0; @@ -202,13 +223,16 @@ gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, uuid_t my_uuid, uuid_t recv_uuid) int32_t gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *cookie) + int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_stage_op_rsp rsp = {{0},}; int ret = -1; + call_frame_t *frame = NULL; - args = cookie; + frame = myframe; + args = frame->local; + frame->local = NULL; /* initialize */ args->op_ret = -1; @@ -248,6 +272,8 @@ gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov, args->errstr = gf_strdup (rsp.op_errstr); out: + STACK_DESTROY (frame->root); + __wake (args); return 0; @@ -297,13 +323,16 @@ out: int32_t gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, - int count, void *cookie) + int count, void *myframe) { struct syncargs *args = NULL; gd1_mgmt_commit_op_rsp rsp = {{0},}; int ret = -1; + call_frame_t *frame = NULL; - args = cookie; + frame = myframe; + args = frame->local; + frame->local = NULL; /* initialize */ args->op_ret = -1; @@ -343,6 +372,8 @@ gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov, args->errstr = gf_strdup (rsp.op_errstr); out: + STACK_DESTROY (frame->root); + __wake (args); return 0; @@ -390,3 +421,170 @@ out: return args.op_ret; } + + +int +gd_sync_task_begin (void *data) +{ + int ret = -1; + dict_t *dict = NULL; + dict_t *rsp_dict = NULL; + glusterd_peerinfo_t *peerinfo = NULL; + glusterd_conf_t *conf = NULL; + uuid_t tmp_uuid = {0,}; + char *errstr = NULL; + glusterd_op_t op = 0; + int32_t tmp_op = 0; + gf_boolean_t local_locked = _gf_false; + + conf = THIS->private; + + dict = data; + + ret = dict_get_int32 (dict, GD_SYNC_OPCODE_KEY, &tmp_op); + if (ret) + goto out; + + op = tmp_op; + + ret = -1; + rsp_dict = dict_new (); + if (!rsp_dict) + goto out; + + /* Lock everything */ + ret = glusterd_lock (conf->uuid); + if (ret) + goto out; + /* successful lock in local node */ + local_locked = _gf_true; + + list_for_each_entry (peerinfo, &conf->peers, uuid_list) { + ret = gd_syncop_mgmt_lock (peerinfo->rpc, + conf->uuid, tmp_uuid); + if (ret) + goto out; + /* TODO: Only on lock successful nodes it should unlock */ + } + + /* stage op */ + ret = glusterd_op_stage_validate (op, dict, &errstr, rsp_dict); + if (ret) + goto out; + + list_for_each_entry (peerinfo, &conf->peers, uuid_list) { + ret = gd_syncop_mgmt_stage_op (peerinfo->rpc, + conf->uuid, tmp_uuid, + op, dict, &rsp_dict, &errstr); + if (ret) { + if (errstr) + ret = dict_set_dynstr (dict, "error", errstr); + + ret = -1; + goto out; + } + } + + /* commit op */ + ret = glusterd_op_commit_perform (op, dict, &errstr, rsp_dict); + if (ret) + goto out; + + list_for_each_entry (peerinfo, &conf->peers, uuid_list) { + ret = gd_syncop_mgmt_commit_op (peerinfo->rpc, + conf->uuid, tmp_uuid, + op, dict, &rsp_dict, &errstr); + if (ret) { + if (errstr) + ret = dict_set_dynstr (dict, "error", errstr); + + ret = -1; + goto out; + } + } + + ret = 0; +out: + if (local_locked) { + /* unlock everything as we help successful local lock */ + list_for_each_entry (peerinfo, &conf->peers, uuid_list) { + /* No need to check the error code, as it is possible + that if 'lock' on few nodes failed, it would come + here, and unlock would fail on nodes where lock + never was sent */ + gd_syncop_mgmt_unlock (peerinfo->rpc, + conf->uuid, tmp_uuid); + } + + /* Local node should be the one to be locked first, + unlocked last to prevent races */ + glusterd_unlock (conf->uuid); + } + + if (rsp_dict) + dict_unref (rsp_dict); + + return ret; +} + +int +gd_sync_task_completion (int op_ret, call_frame_t *sync_frame, void *data) +{ + int ret = 0; + dict_t *dict = NULL; + rpcsvc_request_t *req = NULL; + int32_t tmp_op = 0; + glusterd_op_t op = 0; + + dict = data; + + req = sync_frame->local; + sync_frame->local = NULL; + + ret = dict_get_int32 (dict, GD_SYNC_OPCODE_KEY, &tmp_op); + if (ret) + goto out; + op = tmp_op; + + ret = glusterd_op_send_cli_response (op, op_ret, 0, req, NULL, + "operation failed"); + +out: + if (dict) + dict_unref (dict); + + STACK_DESTROY (sync_frame->root); + + return ret; +} + + +int32_t +glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op, + void *dict) +{ + int ret = 0; + call_frame_t *dummy_frame = NULL; + glusterfs_ctx_t *ctx = NULL; + + dummy_frame = create_frame (THIS, THIS->ctx->pool); + if (!dummy_frame) + goto out; + + dummy_frame->local = req; + + ret = dict_set_int32 (dict, GD_SYNC_OPCODE_KEY, op); + if (ret) { + gf_log (THIS->name, GF_LOG_ERROR, + "dict set failed for setting operations"); + goto out; + } + + ctx = glusterfs_ctx_get (); + + ret = synctask_new (ctx->env, gd_sync_task_begin, + gd_sync_task_completion, + dummy_frame, dict); +out: + return ret; +} |