diff options
Diffstat (limited to 'xlators/protocol/client/src/client-handshake.c')
-rw-r--r-- | xlators/protocol/client/src/client-handshake.c | 481 |
1 files changed, 462 insertions, 19 deletions
diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c index 91cda6d0c45..1896e6b6391 100644 --- a/xlators/protocol/client/src/client-handshake.c +++ b/xlators/protocol/client/src/client-handshake.c @@ -22,6 +22,7 @@ #include "config.h" #endif +#include "fd-lk.h" #include "client.h" #include "xlator.h" #include "defaults.h" @@ -39,6 +40,18 @@ extern rpc_clnt_prog_t clnt_pmap_prog; int client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe); +int client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe); + +int client_set_lk_version (xlator_t *this); + +typedef struct client_fd_lk_local { + int ref; + gf_boolean_t error; + gf_lock_t lock; + clnt_fd_ctx_t *fdctx; +}clnt_fd_lk_local_t; + /* Handshake */ void @@ -391,6 +404,411 @@ client_notify_parents_child_up (xlator_t *this) } int +client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int32_t ret = -1; + call_frame_t *fr = NULL; + gf_set_lk_ver_rsp rsp = {0,}; + + fr = (call_frame_t *) myframe; + GF_VALIDATE_OR_GOTO ("client", fr, out); + + if (req->rpc_status == -1) { + gf_log (fr->this->name, GF_LOG_WARNING, + "received RPC status error"); + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_set_lk_ver_rsp); + if (ret < 0) + gf_log (fr->this->name, GF_LOG_WARNING, + "xdr decoding failed"); + else + gf_log (fr->this->name, GF_LOG_DEBUG, + "Server lk version = %d", rsp.lk_ver); + + ret = 0; +out: + if (fr) + STACK_DESTROY (fr->root); + + return ret; +} + +int +client_set_lk_version (xlator_t *this) +{ + int ret = -1; + clnt_conf_t *conf = NULL; + call_frame_t *frame = NULL; + gf_set_lk_ver_req req = {0, }; + + conf = (clnt_conf_t *) this->private; + + req.lk_ver = client_get_lk_ver (conf); + req.uid = this->ctx->process_uuid; + + gf_log (this->name, GF_LOG_DEBUG, "Sending SET_LK_VERSION"); + + frame = create_frame (this, this->ctx->pool); + if (!frame) + goto out; + + ret = client_submit_request (this, &req, frame, + conf->handshake, + GF_HNDSK_SET_LK_VER, + client_set_lk_version_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gf_set_lk_ver_req); +out: + if (ret < 0) + gf_log (this->name, GF_LOG_WARNING, + "Failed to send SET_LK_VERSION to server"); + + return ret; +} + +int +client_fd_lk_list_empty (fd_lk_ctx_t *lk_ctx) +{ + int ret = 1; + + GF_VALIDATE_OR_GOTO ("client", lk_ctx, out); + + LOCK (&lk_ctx->lock); + { + ret = list_empty (&lk_ctx->lk_list); + } + UNLOCK (&lk_ctx->lock); +out: + return ret; +} + +int +client_fd_lk_count (fd_lk_ctx_t *lk_ctx) +{ + int count = 0; + fd_lk_ctx_node_t *fd_lk = NULL; + + GF_VALIDATE_OR_GOTO ("client", lk_ctx, err); + + LOCK (&lk_ctx->lock); + { + list_for_each_entry (fd_lk, &lk_ctx->lk_list, next) + count++; + } + UNLOCK (&lk_ctx->lock); + + return count; +err: + return -1; +} + +clnt_fd_lk_local_t * +clnt_fd_lk_local_ref (xlator_t *this, clnt_fd_lk_local_t *local) +{ + GF_VALIDATE_OR_GOTO (this->name, local, out); + + LOCK (&local->lock); + { + local->ref++; + } + UNLOCK (&local->lock); +out: + return local; +} + +int +clnt_fd_lk_local_unref (xlator_t *this, clnt_fd_lk_local_t *local) +{ + int ref = -1; + + GF_VALIDATE_OR_GOTO (this->name, local, out); + + LOCK (&local->lock); + { + ref = --local->ref; + } + UNLOCK (&local->lock); + + if (ref == 0) { + LOCK_DESTROY (&local->lock); + GF_FREE (local); + } + ref = 0; +out: + return ref; +} + +clnt_fd_lk_local_t * +clnt_fd_lk_local_create (clnt_fd_ctx_t *fdctx) +{ + clnt_fd_lk_local_t *local = NULL; + + local = GF_CALLOC (1, sizeof (clnt_fd_lk_local_t), + gf_client_mt_clnt_fd_lk_local_t); + if (!local) + goto out; + + local->ref = 1; + local->error = _gf_false; + local->fdctx = fdctx; + + LOCK_INIT (&local->lock); +out: + return local; +} + +void +clnt_mark_fd_bad (clnt_conf_t *conf, clnt_fd_ctx_t *fdctx) +{ + pthread_mutex_lock (&conf->lock); + { + fdctx->remote_fd = -1; + } + pthread_mutex_unlock (&conf->lock); +} + +// call decrement_reopen_fd_count +int +clnt_release_reopen_fd_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + xlator_t *this = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + + frame = myframe; + this = frame->this; + fdctx = (clnt_fd_ctx_t *) frame->local; + conf = (clnt_conf_t *) this->private; + + clnt_mark_fd_bad (conf, fdctx); + + decrement_reopen_fd_count (this, conf); + + frame->local = NULL; + STACK_DESTROY (frame->root); + + return 0; +} + +int +clnt_release_reopen_fd (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int ret = -1; + clnt_conf_t *conf = NULL; + call_frame_t *frame = NULL; + gfs3_release_req req = {{0,},}; + + conf = (clnt_conf_t *) this->private; + + frame = create_frame (THIS, THIS->ctx->pool); + if (!frame) + goto out; + + frame->local = (void *) fdctx; + req.fd = fdctx->remote_fd; + + ret = client_submit_request (this, &req, frame, conf->fops, + GFS3_OP_RELEASE, + clnt_release_reopen_fd_cbk, NULL, + NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_releasedir_req); +out: + if (ret) { + decrement_reopen_fd_count (this, conf); + clnt_mark_fd_bad (conf, fdctx); + if (frame) { + frame->local = NULL; + STACK_DESTROY (frame->root); + } + } + + return 0; +} + +int +clnt_fd_lk_local_mark_error (xlator_t *this, + clnt_fd_lk_local_t *local) +{ + gf_boolean_t error = _gf_false; + + LOCK (&local->lock); + { + error = local->error; + local->error = _gf_true; + } + UNLOCK (&local->lock); + + if (error) + clnt_release_reopen_fd (this, local->fdctx); + + return 0; +} + +// Also, I think in reopen_cbk, the fdctx is added to +// saved_fd list.. avoid that, may cause a problem +// Reason: While the locks on the fd are reacquired, a release +// fop may be received by the client-protocol translator +// which will free the fdctx datastructure. +int +client_reacquire_lock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int32_t ret = -1; + xlator_t *this = NULL; + gf_common_rsp rsp = {0,}; + call_frame_t *frame = NULL; + clnt_fd_lk_local_t *local = NULL; + + frame = (call_frame_t *) myframe; + this = frame->this; + local = (clnt_fd_lk_local_t *) frame->local; + + if (req->rpc_status == -1) { + gf_log ("client", GF_LOG_WARNING, + "request failed at rpc"); + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_common_rsp); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); + goto out; + } + + if (rsp.op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, "lock request failed"); + ret = -1; + goto out; + } + + // TODO: Add more info to log. + gf_log (this->name, GF_LOG_DEBUG, "Reacquired lock"); + + ret = 0; +out: + if (ret < 0) + clnt_fd_lk_local_mark_error (this, local); + + (void) clnt_fd_lk_local_unref (this, local); + frame->local = NULL; + STACK_DESTROY (frame->root); + + return ret; +} + +int +_client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int32_t ret = -1; + int32_t gf_cmd = 0; + int32_t gf_type = 0; + gfs3_lk_req req = {{0,},}; + struct gf_flock flock = {0,}; + fd_lk_ctx_t *lk_ctx = NULL; + clnt_fd_lk_local_t *local = NULL; + fd_lk_ctx_node_t *fd_lk = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + + conf = (clnt_conf_t *) this->private; + lk_ctx = fdctx->lk_ctx; + + local = clnt_fd_lk_local_create (fdctx); + if (!local) { + clnt_release_reopen_fd (this, fdctx); + goto out; + } + + list_for_each_entry (fd_lk, &lk_ctx->lk_list, next) { + memcpy (&flock, &fd_lk->user_flock, + sizeof (struct gf_flock)); + + ret = client_cmd_to_gf_cmd (fd_lk->cmd, &gf_cmd); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "client_cmd_to_gf_cmd failed, " + "aborting reacquiring of locks"); + break; + } + + gf_type = client_type_to_gf_type (flock.l_type); + req.fd = fdctx->remote_fd; + req.cmd = gf_cmd; + req.type = gf_type; + (void) gf_proto_flock_from_flock (&req.flock, + &flock); + + memcpy (req.gfid, fdctx->inode->gfid, 16); + + frame = create_frame (THIS, THIS->ctx->pool); + if (!frame) { + ret = -1; + break; + } + + frame->local = clnt_fd_lk_local_ref (this, local); + frame->root->lk_owner = fd_lk->user_flock.l_owner; + + ret = client_submit_request (this, &req, frame, + conf->fops, GFS3_OP_LK, + client_reacquire_lock_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_lk_req); + if (ret) + break; + + ret = 0; + frame = NULL; + } + + if (ret) { + clnt_fd_lk_local_mark_error (this, local); + + if (frame) { + if (frame->local) { + clnt_fd_lk_local_unref (this, frame->local); + frame->local = NULL; + } + STACK_DESTROY (frame->root); + } + } + if (local) + (void) clnt_fd_lk_local_unref (this, local); +out: + return ret; +} + +int +client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int32_t ret = -1; + fd_lk_ctx_t *lk_ctx = NULL; + + if (client_fd_lk_list_empty (fdctx->lk_ctx)) { + gf_log (this->name, GF_LOG_WARNING, + "fd lock list is empty"); + decrement_reopen_fd_count (this, (clnt_conf_t *)this->private); + ret = 0; + goto out; + } + + lk_ctx = fdctx->lk_ctx; + + LOCK (&lk_ctx->lock); + { + ret = _client_reacquire_lock (this, fdctx); + } + UNLOCK (&lk_ctx->lock); +out: + return ret; +} + +int client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) { @@ -402,11 +820,13 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count, clnt_conf_t *conf = NULL; clnt_fd_ctx_t *fdctx = NULL; call_frame_t *frame = NULL; + xlator_t *this = NULL; frame = myframe; if (!frame || !frame->this) goto out; + this = frame->this; local = frame->local; conf = frame->this->private; @@ -454,7 +874,7 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count, fdctx->remote_fd = rsp.fd; if (!fdctx->released) { list_add_tail (&fdctx->sfd_pos, &conf->saved_fds); - if (!list_empty (&fdctx->lock_list)) + if (!client_fd_lk_list_empty (fdctx->lk_ctx)) attempt_lock_recovery = _gf_true; fdctx = NULL; } @@ -463,31 +883,27 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count, ret = 0; - attempt_lock_recovery = _gf_false; /* temporarily */ - - if (attempt_lock_recovery) { - ret = client_attempt_lock_recovery (frame->this, local->fdctx); - if (ret < 0) { - gf_log (frame->this->name, GF_LOG_DEBUG, - "lock recovery not attempted on fd"); - } else { - gf_log (frame->this->name, GF_LOG_INFO, - "need to attempt lock recovery on %"PRIu64 - " open fds", fd_count); - } + if (conf->lk_heal && attempt_lock_recovery) { + /* Delay decrement the reopen fd count untill all the + locks corresponding to this fd are acquired.*/ + gf_log (frame->this->name, GF_LOG_WARNING, "acquiring locks on " + "%s", local->loc.path); + ret = client_reacquire_lock (frame->this, local->fdctx); } else { fd_count = decrement_reopen_fd_count (frame->this, conf); } out: if (fdctx) - client_fdctx_destroy (frame->this, fdctx); + client_fdctx_destroy (this, fdctx); if ((ret < 0) && frame && frame->this && conf) decrement_reopen_fd_count (frame->this, conf); - frame->local = NULL; - STACK_DESTROY (frame->root); + if (frame) { + frame->local = NULL; + STACK_DESTROY (frame->root); + } client_local_wipe (local); @@ -792,7 +1208,8 @@ client_post_handshake (call_frame_t *frame, xlator_t *this) } } else { gf_log (this->name, GF_LOG_DEBUG, - "no open fds - notifying all parents child up"); + "no fds to open - notifying all parents child up"); + client_set_lk_version (this); client_notify_parents_child_up (this); } out: @@ -814,6 +1231,7 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m int32_t op_ret = 0; int32_t op_errno = 0; gf_boolean_t auth_fail = _gf_false; + uint32_t lk_ver = 0; frame = myframe; this = frame->this; @@ -895,6 +1313,15 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m goto out; } + ret = dict_get_uint32 (reply, "clnt-lk-version", &lk_ver); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "failed to find key 'clnt-lk-version' in the options"); + goto out; + } + + gf_log (this->name, GF_LOG_INFO, "clnt-lk-version = %d, " + "server-lk-version = %d", client_get_lk_ver (conf), lk_ver); /* TODO: currently setpeer path is broken */ /* if (process_uuid && req->conn && @@ -930,8 +1357,15 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m conf->need_different_port = 0; - /* TODO: more to test */ - client_post_handshake (frame, frame->this); + if (lk_ver != client_get_lk_ver (conf)) { + client_mark_fd_bad (this); + client_post_handshake (frame, frame->this); + } else { + /*TODO: Traverse the saved fd list, and send + release to the server on fd's that were closed + during grace period */ + ; + } out: if (auth_fail) { @@ -1043,6 +1477,14 @@ client_setvolume (xlator_t *this, struct rpc_clnt *rpc) "failed to set 'volfile-checksum'"); } + ret = dict_set_int16 (options, "clnt-lk-version", + client_get_lk_ver (conf)); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "failed to set clnt-lk-version(%"PRIu32") in handshake msg", + client_get_lk_ver (conf)); + } + req.dict.dict_len = dict_serialized_length (options); if (req.dict.dict_len < 0) { gf_log (this->name, GF_LOG_ERROR, @@ -1366,6 +1808,7 @@ char *clnt_handshake_procs[GF_HNDSK_MAXVALUE] = { [GF_HNDSK_SETVOLUME] = "SETVOLUME", [GF_HNDSK_GETSPEC] = "GETSPEC", [GF_HNDSK_PING] = "PING", + [GF_HNDSK_SET_LK_VER] = "SET_LK_VER" }; rpc_clnt_prog_t clnt_handshake_prog = { |