summaryrefslogtreecommitdiffstats
path: root/xlators/protocol/client/src/client-handshake.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/protocol/client/src/client-handshake.c')
-rw-r--r--xlators/protocol/client/src/client-handshake.c481
1 files changed, 462 insertions, 19 deletions
diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c
index 91cda6d0c..1896e6b63 100644
--- a/xlators/protocol/client/src/client-handshake.c
+++ b/xlators/protocol/client/src/client-handshake.c
@@ -22,6 +22,7 @@
#include "config.h"
#endif
+#include "fd-lk.h"
#include "client.h"
#include "xlator.h"
#include "defaults.h"
@@ -39,6 +40,18 @@ extern rpc_clnt_prog_t clnt_pmap_prog;
int client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count,
void *myframe);
+int client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe);
+
+int client_set_lk_version (xlator_t *this);
+
+typedef struct client_fd_lk_local {
+ int ref;
+ gf_boolean_t error;
+ gf_lock_t lock;
+ clnt_fd_ctx_t *fdctx;
+}clnt_fd_lk_local_t;
+
/* Handshake */
void
@@ -391,6 +404,411 @@ client_notify_parents_child_up (xlator_t *this)
}
int
+client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ int32_t ret = -1;
+ call_frame_t *fr = NULL;
+ gf_set_lk_ver_rsp rsp = {0,};
+
+ fr = (call_frame_t *) myframe;
+ GF_VALIDATE_OR_GOTO ("client", fr, out);
+
+ if (req->rpc_status == -1) {
+ gf_log (fr->this->name, GF_LOG_WARNING,
+ "received RPC status error");
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_set_lk_ver_rsp);
+ if (ret < 0)
+ gf_log (fr->this->name, GF_LOG_WARNING,
+ "xdr decoding failed");
+ else
+ gf_log (fr->this->name, GF_LOG_DEBUG,
+ "Server lk version = %d", rsp.lk_ver);
+
+ ret = 0;
+out:
+ if (fr)
+ STACK_DESTROY (fr->root);
+
+ return ret;
+}
+
+int
+client_set_lk_version (xlator_t *this)
+{
+ int ret = -1;
+ clnt_conf_t *conf = NULL;
+ call_frame_t *frame = NULL;
+ gf_set_lk_ver_req req = {0, };
+
+ conf = (clnt_conf_t *) this->private;
+
+ req.lk_ver = client_get_lk_ver (conf);
+ req.uid = this->ctx->process_uuid;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Sending SET_LK_VERSION");
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame)
+ goto out;
+
+ ret = client_submit_request (this, &req, frame,
+ conf->handshake,
+ GF_HNDSK_SET_LK_VER,
+ client_set_lk_version_cbk,
+ NULL, NULL, 0, NULL, 0, NULL,
+ (xdrproc_t)xdr_gf_set_lk_ver_req);
+out:
+ if (ret < 0)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to send SET_LK_VERSION to server");
+
+ return ret;
+}
+
+int
+client_fd_lk_list_empty (fd_lk_ctx_t *lk_ctx)
+{
+ int ret = 1;
+
+ GF_VALIDATE_OR_GOTO ("client", lk_ctx, out);
+
+ LOCK (&lk_ctx->lock);
+ {
+ ret = list_empty (&lk_ctx->lk_list);
+ }
+ UNLOCK (&lk_ctx->lock);
+out:
+ return ret;
+}
+
+int
+client_fd_lk_count (fd_lk_ctx_t *lk_ctx)
+{
+ int count = 0;
+ fd_lk_ctx_node_t *fd_lk = NULL;
+
+ GF_VALIDATE_OR_GOTO ("client", lk_ctx, err);
+
+ LOCK (&lk_ctx->lock);
+ {
+ list_for_each_entry (fd_lk, &lk_ctx->lk_list, next)
+ count++;
+ }
+ UNLOCK (&lk_ctx->lock);
+
+ return count;
+err:
+ return -1;
+}
+
+clnt_fd_lk_local_t *
+clnt_fd_lk_local_ref (xlator_t *this, clnt_fd_lk_local_t *local)
+{
+ GF_VALIDATE_OR_GOTO (this->name, local, out);
+
+ LOCK (&local->lock);
+ {
+ local->ref++;
+ }
+ UNLOCK (&local->lock);
+out:
+ return local;
+}
+
+int
+clnt_fd_lk_local_unref (xlator_t *this, clnt_fd_lk_local_t *local)
+{
+ int ref = -1;
+
+ GF_VALIDATE_OR_GOTO (this->name, local, out);
+
+ LOCK (&local->lock);
+ {
+ ref = --local->ref;
+ }
+ UNLOCK (&local->lock);
+
+ if (ref == 0) {
+ LOCK_DESTROY (&local->lock);
+ GF_FREE (local);
+ }
+ ref = 0;
+out:
+ return ref;
+}
+
+clnt_fd_lk_local_t *
+clnt_fd_lk_local_create (clnt_fd_ctx_t *fdctx)
+{
+ clnt_fd_lk_local_t *local = NULL;
+
+ local = GF_CALLOC (1, sizeof (clnt_fd_lk_local_t),
+ gf_client_mt_clnt_fd_lk_local_t);
+ if (!local)
+ goto out;
+
+ local->ref = 1;
+ local->error = _gf_false;
+ local->fdctx = fdctx;
+
+ LOCK_INIT (&local->lock);
+out:
+ return local;
+}
+
+void
+clnt_mark_fd_bad (clnt_conf_t *conf, clnt_fd_ctx_t *fdctx)
+{
+ pthread_mutex_lock (&conf->lock);
+ {
+ fdctx->remote_fd = -1;
+ }
+ pthread_mutex_unlock (&conf->lock);
+}
+
+// call decrement_reopen_fd_count
+int
+clnt_release_reopen_fd_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ xlator_t *this = NULL;
+ call_frame_t *frame = NULL;
+ clnt_conf_t *conf = NULL;
+ clnt_fd_ctx_t *fdctx = NULL;
+
+ frame = myframe;
+ this = frame->this;
+ fdctx = (clnt_fd_ctx_t *) frame->local;
+ conf = (clnt_conf_t *) this->private;
+
+ clnt_mark_fd_bad (conf, fdctx);
+
+ decrement_reopen_fd_count (this, conf);
+
+ frame->local = NULL;
+ STACK_DESTROY (frame->root);
+
+ return 0;
+}
+
+int
+clnt_release_reopen_fd (xlator_t *this, clnt_fd_ctx_t *fdctx)
+{
+ int ret = -1;
+ clnt_conf_t *conf = NULL;
+ call_frame_t *frame = NULL;
+ gfs3_release_req req = {{0,},};
+
+ conf = (clnt_conf_t *) this->private;
+
+ frame = create_frame (THIS, THIS->ctx->pool);
+ if (!frame)
+ goto out;
+
+ frame->local = (void *) fdctx;
+ req.fd = fdctx->remote_fd;
+
+ ret = client_submit_request (this, &req, frame, conf->fops,
+ GFS3_OP_RELEASE,
+ clnt_release_reopen_fd_cbk, NULL,
+ NULL, 0, NULL, 0, NULL,
+ (xdrproc_t)xdr_gfs3_releasedir_req);
+out:
+ if (ret) {
+ decrement_reopen_fd_count (this, conf);
+ clnt_mark_fd_bad (conf, fdctx);
+ if (frame) {
+ frame->local = NULL;
+ STACK_DESTROY (frame->root);
+ }
+ }
+
+ return 0;
+}
+
+int
+clnt_fd_lk_local_mark_error (xlator_t *this,
+ clnt_fd_lk_local_t *local)
+{
+ gf_boolean_t error = _gf_false;
+
+ LOCK (&local->lock);
+ {
+ error = local->error;
+ local->error = _gf_true;
+ }
+ UNLOCK (&local->lock);
+
+ if (error)
+ clnt_release_reopen_fd (this, local->fdctx);
+
+ return 0;
+}
+
+// Also, I think in reopen_cbk, the fdctx is added to
+// saved_fd list.. avoid that, may cause a problem
+// Reason: While the locks on the fd are reacquired, a release
+// fop may be received by the client-protocol translator
+// which will free the fdctx datastructure.
+int
+client_reacquire_lock_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ int32_t ret = -1;
+ xlator_t *this = NULL;
+ gf_common_rsp rsp = {0,};
+ call_frame_t *frame = NULL;
+ clnt_fd_lk_local_t *local = NULL;
+
+ frame = (call_frame_t *) myframe;
+ this = frame->this;
+ local = (clnt_fd_lk_local_t *) frame->local;
+
+ if (req->rpc_status == -1) {
+ gf_log ("client", GF_LOG_WARNING,
+ "request failed at rpc");
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_common_rsp);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed");
+ goto out;
+ }
+
+ if (rsp.op_ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR, "lock request failed");
+ ret = -1;
+ goto out;
+ }
+
+ // TODO: Add more info to log.
+ gf_log (this->name, GF_LOG_DEBUG, "Reacquired lock");
+
+ ret = 0;
+out:
+ if (ret < 0)
+ clnt_fd_lk_local_mark_error (this, local);
+
+ (void) clnt_fd_lk_local_unref (this, local);
+ frame->local = NULL;
+ STACK_DESTROY (frame->root);
+
+ return ret;
+}
+
+int
+_client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx)
+{
+ int32_t ret = -1;
+ int32_t gf_cmd = 0;
+ int32_t gf_type = 0;
+ gfs3_lk_req req = {{0,},};
+ struct gf_flock flock = {0,};
+ fd_lk_ctx_t *lk_ctx = NULL;
+ clnt_fd_lk_local_t *local = NULL;
+ fd_lk_ctx_node_t *fd_lk = NULL;
+ call_frame_t *frame = NULL;
+ clnt_conf_t *conf = NULL;
+
+ conf = (clnt_conf_t *) this->private;
+ lk_ctx = fdctx->lk_ctx;
+
+ local = clnt_fd_lk_local_create (fdctx);
+ if (!local) {
+ clnt_release_reopen_fd (this, fdctx);
+ goto out;
+ }
+
+ list_for_each_entry (fd_lk, &lk_ctx->lk_list, next) {
+ memcpy (&flock, &fd_lk->user_flock,
+ sizeof (struct gf_flock));
+
+ ret = client_cmd_to_gf_cmd (fd_lk->cmd, &gf_cmd);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "client_cmd_to_gf_cmd failed, "
+ "aborting reacquiring of locks");
+ break;
+ }
+
+ gf_type = client_type_to_gf_type (flock.l_type);
+ req.fd = fdctx->remote_fd;
+ req.cmd = gf_cmd;
+ req.type = gf_type;
+ (void) gf_proto_flock_from_flock (&req.flock,
+ &flock);
+
+ memcpy (req.gfid, fdctx->inode->gfid, 16);
+
+ frame = create_frame (THIS, THIS->ctx->pool);
+ if (!frame) {
+ ret = -1;
+ break;
+ }
+
+ frame->local = clnt_fd_lk_local_ref (this, local);
+ frame->root->lk_owner = fd_lk->user_flock.l_owner;
+
+ ret = client_submit_request (this, &req, frame,
+ conf->fops, GFS3_OP_LK,
+ client_reacquire_lock_cbk,
+ NULL, NULL, 0, NULL, 0, NULL,
+ (xdrproc_t)xdr_gfs3_lk_req);
+ if (ret)
+ break;
+
+ ret = 0;
+ frame = NULL;
+ }
+
+ if (ret) {
+ clnt_fd_lk_local_mark_error (this, local);
+
+ if (frame) {
+ if (frame->local) {
+ clnt_fd_lk_local_unref (this, frame->local);
+ frame->local = NULL;
+ }
+ STACK_DESTROY (frame->root);
+ }
+ }
+ if (local)
+ (void) clnt_fd_lk_local_unref (this, local);
+out:
+ return ret;
+}
+
+int
+client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx)
+{
+ int32_t ret = -1;
+ fd_lk_ctx_t *lk_ctx = NULL;
+
+ if (client_fd_lk_list_empty (fdctx->lk_ctx)) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "fd lock list is empty");
+ decrement_reopen_fd_count (this, (clnt_conf_t *)this->private);
+ ret = 0;
+ goto out;
+ }
+
+ lk_ctx = fdctx->lk_ctx;
+
+ LOCK (&lk_ctx->lock);
+ {
+ ret = _client_reacquire_lock (this, fdctx);
+ }
+ UNLOCK (&lk_ctx->lock);
+out:
+ return ret;
+}
+
+int
client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count,
void *myframe)
{
@@ -402,11 +820,13 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count,
clnt_conf_t *conf = NULL;
clnt_fd_ctx_t *fdctx = NULL;
call_frame_t *frame = NULL;
+ xlator_t *this = NULL;
frame = myframe;
if (!frame || !frame->this)
goto out;
+ this = frame->this;
local = frame->local;
conf = frame->this->private;
@@ -454,7 +874,7 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count,
fdctx->remote_fd = rsp.fd;
if (!fdctx->released) {
list_add_tail (&fdctx->sfd_pos, &conf->saved_fds);
- if (!list_empty (&fdctx->lock_list))
+ if (!client_fd_lk_list_empty (fdctx->lk_ctx))
attempt_lock_recovery = _gf_true;
fdctx = NULL;
}
@@ -463,31 +883,27 @@ client3_1_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count,
ret = 0;
- attempt_lock_recovery = _gf_false; /* temporarily */
-
- if (attempt_lock_recovery) {
- ret = client_attempt_lock_recovery (frame->this, local->fdctx);
- if (ret < 0) {
- gf_log (frame->this->name, GF_LOG_DEBUG,
- "lock recovery not attempted on fd");
- } else {
- gf_log (frame->this->name, GF_LOG_INFO,
- "need to attempt lock recovery on %"PRIu64
- " open fds", fd_count);
- }
+ if (conf->lk_heal && attempt_lock_recovery) {
+ /* Delay decrement the reopen fd count untill all the
+ locks corresponding to this fd are acquired.*/
+ gf_log (frame->this->name, GF_LOG_WARNING, "acquiring locks on "
+ "%s", local->loc.path);
+ ret = client_reacquire_lock (frame->this, local->fdctx);
} else {
fd_count = decrement_reopen_fd_count (frame->this, conf);
}
out:
if (fdctx)
- client_fdctx_destroy (frame->this, fdctx);
+ client_fdctx_destroy (this, fdctx);
if ((ret < 0) && frame && frame->this && conf)
decrement_reopen_fd_count (frame->this, conf);
- frame->local = NULL;
- STACK_DESTROY (frame->root);
+ if (frame) {
+ frame->local = NULL;
+ STACK_DESTROY (frame->root);
+ }
client_local_wipe (local);
@@ -792,7 +1208,8 @@ client_post_handshake (call_frame_t *frame, xlator_t *this)
}
} else {
gf_log (this->name, GF_LOG_DEBUG,
- "no open fds - notifying all parents child up");
+ "no fds to open - notifying all parents child up");
+ client_set_lk_version (this);
client_notify_parents_child_up (this);
}
out:
@@ -814,6 +1231,7 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m
int32_t op_ret = 0;
int32_t op_errno = 0;
gf_boolean_t auth_fail = _gf_false;
+ uint32_t lk_ver = 0;
frame = myframe;
this = frame->this;
@@ -895,6 +1313,15 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m
goto out;
}
+ ret = dict_get_uint32 (reply, "clnt-lk-version", &lk_ver);
+ if (ret) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to find key 'clnt-lk-version' in the options");
+ goto out;
+ }
+
+ gf_log (this->name, GF_LOG_INFO, "clnt-lk-version = %d, "
+ "server-lk-version = %d", client_get_lk_ver (conf), lk_ver);
/* TODO: currently setpeer path is broken */
/*
if (process_uuid && req->conn &&
@@ -930,8 +1357,15 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m
conf->need_different_port = 0;
- /* TODO: more to test */
- client_post_handshake (frame, frame->this);
+ if (lk_ver != client_get_lk_ver (conf)) {
+ client_mark_fd_bad (this);
+ client_post_handshake (frame, frame->this);
+ } else {
+ /*TODO: Traverse the saved fd list, and send
+ release to the server on fd's that were closed
+ during grace period */
+ ;
+ }
out:
if (auth_fail) {
@@ -1043,6 +1477,14 @@ client_setvolume (xlator_t *this, struct rpc_clnt *rpc)
"failed to set 'volfile-checksum'");
}
+ ret = dict_set_int16 (options, "clnt-lk-version",
+ client_get_lk_ver (conf));
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to set clnt-lk-version(%"PRIu32") in handshake msg",
+ client_get_lk_ver (conf));
+ }
+
req.dict.dict_len = dict_serialized_length (options);
if (req.dict.dict_len < 0) {
gf_log (this->name, GF_LOG_ERROR,
@@ -1366,6 +1808,7 @@ char *clnt_handshake_procs[GF_HNDSK_MAXVALUE] = {
[GF_HNDSK_SETVOLUME] = "SETVOLUME",
[GF_HNDSK_GETSPEC] = "GETSPEC",
[GF_HNDSK_PING] = "PING",
+ [GF_HNDSK_SET_LK_VER] = "SET_LK_VER"
};
rpc_clnt_prog_t clnt_handshake_prog = {