From fdd20492638fe98a62b5e6d5e82f18cf4799fd1a Mon Sep 17 00:00:00 2001 From: Amar Tumballi Date: Mon, 21 Jun 2010 07:00:04 +0000 Subject: rpc protocol Signed-off-by: Amar Tumballi Signed-off-by: Raghavendra G Signed-off-by: Anand V. Avati BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875 --- xlators/protocol/client/src/client-handshake.c | 738 +++++++++++++++++++++++++ 1 file changed, 738 insertions(+) create mode 100644 xlators/protocol/client/src/client-handshake.c (limited to 'xlators/protocol/client/src/client-handshake.c') diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c new file mode 100644 index 00000000000..79cabd106b2 --- /dev/null +++ b/xlators/protocol/client/src/client-handshake.c @@ -0,0 +1,738 @@ +/* + Copyright (c) 2010 Gluster, Inc. + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + . +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "client.h" +#include "xlator.h" +#include "defaults.h" +#include "glusterfs.h" +#include "msg-xdr.h" +#include "statedump.h" +#include "compat-errno.h" + +extern rpc_clnt_prog_t clnt3_1_fop_prog; +extern rpc_clnt_prog_t clnt3_1_mgmt_prog; + +/* Handshake */ + +void +rpc_client_ping_timer_expired (void *data) +{ + rpc_transport_t *trans = NULL; + rpc_clnt_connection_t *conn = NULL; + int disconnect = 0; + int transport_activity = 0; + struct timeval timeout = {0, }; + struct timeval current = {0, }; + struct rpc_clnt *clnt = NULL; + xlator_t *this = NULL; + clnt_conf_t *conf = NULL; + + if (!data) { + goto out; + } + + this = data; + conf = this->private; + + conn = &conf->rpc->conn; + trans = conn->trans; + + if (!clnt || !trans) { + goto out; + } + + pthread_mutex_lock (&conn->lock); + { + if (conn->ping_timer) + gf_timer_call_cancel (this->ctx, + conn->ping_timer); + gettimeofday (¤t, NULL); + + if (((current.tv_sec - conn->last_received.tv_sec) < + conf->opt.ping_timeout) + || ((current.tv_sec - conn->last_sent.tv_sec) < + conf->opt.ping_timeout)) { + transport_activity = 1; + } + + if (transport_activity) { + gf_log (trans->name, GF_LOG_TRACE, + "ping timer expired but transport activity " + "detected - not bailing transport"); + timeout.tv_sec = conf->opt.ping_timeout; + timeout.tv_usec = 0; + + conn->ping_timer = + gf_timer_call_after (this->ctx, timeout, + rpc_client_ping_timer_expired, + (void *) this); + if (conn->ping_timer == NULL) + gf_log (trans->name, GF_LOG_DEBUG, + "unable to setup timer"); + + } else { + conn->ping_started = 0; + conn->ping_timer = NULL; + disconnect = 1; + } + } + pthread_mutex_unlock (&conn->lock); + + if (disconnect) { + gf_log (trans->name, GF_LOG_ERROR, + "Server %s has not responded in the last %d " + "seconds, disconnecting.", + conn->trans->peerinfo.identifier, + conf->opt.ping_timeout); + + rpc_transport_disconnect (conn->trans); + } + +out: + return; +} + + +void +client_start_ping (void *data) +{ + xlator_t *this = NULL; + clnt_conf_t *conf = NULL; + rpc_clnt_connection_t *conn = NULL; + int32_t ret = -1; + struct timeval timeout = {0, }; + call_frame_t *frame = NULL; + int frame_count = 0; + rpc_transport_t *trans = NULL; + + this = data; + conf = this->private; + + conn = &conf->rpc->conn; + trans = conn->trans; + + if (conf->opt.ping_timeout == 0) + return; + + pthread_mutex_lock (&conn->lock); + { + if (conn->ping_timer) + gf_timer_call_cancel (this->ctx, conn->ping_timer); + + conn->ping_timer = NULL; + conn->ping_started = 0; + + if (conn->saved_frames) + /* treat the case where conn->saved_frames is NULL + as no pending frames */ + frame_count = conn->saved_frames->count; + + if ((frame_count == 0) || !conn->connected) { + /* using goto looked ugly here, + * hence getting out this way */ + /* unlock */ + pthread_mutex_unlock (&conn->lock); + return; + } + + if (frame_count < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "saved_frames->count is %"PRId64, + conn->saved_frames->count); + conn->saved_frames->count = 0; + } + + timeout.tv_sec = conf->opt.ping_timeout; + timeout.tv_usec = 0; + + conn->ping_timer = + gf_timer_call_after (this->ctx, timeout, + rpc_client_ping_timer_expired, + (void *) this); + + if (conn->ping_timer == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "unable to setup timer"); + } else { + conn->ping_started = 1; + } + } + pthread_mutex_unlock (&conn->lock); + + frame = create_frame (this, this->ctx->pool); + if (!frame) + goto fail; + + ret = client_submit_request (this, NULL, frame, conf->handshake, + GF_HNDSK_PING, NULL, NULL); + + return; +fail: + + if (frame) { + STACK_DESTROY (frame->root); + } + + return; +} + + +int +client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) +{ + xlator_t *this = NULL; + rpc_clnt_connection_t *conn = NULL; + struct timeval timeout = {0, }; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + + frame = myframe; + + this = frame->this; + conf = this->private; + conn = &conf->rpc->conn; + + if (req->rpc_status == -1) { + /* timer expired and transport bailed out */ + gf_log (this->name, GF_LOG_DEBUG, "timer must have expired"); + goto out; + } + + pthread_mutex_lock (&conn->lock); + { + timeout.tv_sec = conf->opt.ping_timeout; + timeout.tv_usec = 0; + + gf_timer_call_cancel (this->ctx, + conn->ping_timer); + + conn->ping_timer = + gf_timer_call_after (this->ctx, timeout, + client_start_ping, (void *)this); + + if (conn->ping_timer == NULL) + gf_log (this->name, GF_LOG_DEBUG, + "gf_timer_call_after() returned NULL"); + } + pthread_mutex_unlock (&conn->lock); +out: + STACK_DESTROY (frame->root); + return 0; +} + + +int +client_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +{ + gf_getspec_rsp rsp = {0,}; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + int ret = 0; + char spec[(32*1024)] = {0,}; + + frame = myframe; + conf = frame->this->private; + + if (-1 == req->rpc_status) { + rsp.op_ret = -1; + rsp.op_errno = EINVAL; + goto out; + } + + rsp.spec = spec; + ret = xdr_to_dump_version_rsp (*iov, &rsp); + if (ret < 0) { + gf_log ("", GF_LOG_ERROR, "error"); + rsp.op_ret = -1; + rsp.op_errno = EINVAL; + goto out; + } + + if (-1 == rsp.op_ret) { + gf_log (frame->this->name, GF_LOG_ERROR, + "failed to get the 'volume file' from server"); + goto out; + } + +out: + /* no _STRICT for mops */ + STACK_UNWIND_STRICT (getspec, frame, rsp.op_ret, rsp.op_errno, rsp.spec); + return 0; +} + + +int +client_post_handshake (call_frame_t *frame, xlator_t *this) +{ + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *tmp = NULL; + clnt_fd_ctx_t *fdctx = NULL; + xlator_list_t *parent = NULL; + struct list_head reopen_head; + + if (!this || !this->private) + goto out; + + conf = this->private; + INIT_LIST_HEAD (&reopen_head); + + pthread_mutex_lock (&conf->lock); + { + list_for_each_entry_safe (fdctx, tmp, &conf->saved_fds, + sfd_pos) { + if (fdctx->remote_fd != -1) + continue; + + list_del_init (&fdctx->sfd_pos); + list_add_tail (&fdctx->sfd_pos, &reopen_head); + } + } + pthread_mutex_unlock (&conf->lock); + + list_for_each_entry_safe (fdctx, tmp, &reopen_head, sfd_pos) { + list_del_init (&fdctx->sfd_pos); + + if (fdctx->is_dir) + protocol_client_reopendir (this, fdctx); + else + protocol_client_reopen (this, fdctx); + } + + parent = this->parents; + + while (parent) { + xlator_notify (parent->xlator, GF_EVENT_CHILD_UP, + this); + parent = parent->next; + } + +out: + return 0; +} + +int +client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +{ + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + xlator_t *this = NULL; + dict_t *reply = NULL; + xlator_list_t *parent = NULL; + char *process_uuid = NULL; + char *remote_error = NULL; + char *remote_subvol = NULL; + rpc_transport_t *peer_trans = NULL; + gf_setvolume_rsp rsp = {0,}; + uint64_t peertrans_int = 0; + int ret = 0; + int op_ret = 0; + int op_errno = 0; + + frame = myframe; + this = frame->this; + conf = this->private; + + if (-1 == req->rpc_status) { + op_ret = -1; + op_errno = EINVAL; + goto out; + } + + ret = xdr_to_setvolume_rsp (*iov, &rsp); + if (ret < 0) { + gf_log ("", GF_LOG_ERROR, "error"); + op_errno = EINVAL; + op_ret = -1; + goto out; + } + op_ret = rsp.op_ret; + op_errno = gf_error_to_errno (rsp.op_errno); + if (-1 == rsp.op_ret) { + gf_log (frame->this->name, GF_LOG_WARNING, + "failed to set the volume"); + } + + reply = dict_new (); + if (!reply) + goto out; + + if (rsp.dict.dict_len) { + ret = dict_unserialize (rsp.dict.dict_val, + rsp.dict.dict_len, &reply); + if (ret < 0) { + gf_log (frame->this->name, GF_LOG_DEBUG, + "failed to unserialize buffer to dict"); + goto out; + } + } + + ret = dict_get_str (reply, "ERROR", &remote_error); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get ERROR string from reply dict"); + } + + ret = dict_get_str (reply, "process-uuid", &process_uuid); + if (ret < 0) { + gf_log (this->name, GF_LOG_DEBUG, + "failed to get 'process-uuid' from reply dict"); + } + + if (op_ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "SETVOLUME on remote-host failed: %s", + remote_error ? remote_error : strerror (op_errno)); + errno = op_errno; + if (op_errno == ESTALE) { + parent = this->parents; + while (parent) { + xlator_notify (parent->xlator, + GF_EVENT_VOLFILE_MODIFIED, + this); + parent = parent->next; + } + } + goto out; + } + ret = dict_get_str (this->options, "remote-subvolume", + &remote_subvol); + if (!remote_subvol) + goto out; + + if (process_uuid && + !strcmp (this->ctx->process_uuid, process_uuid)) { + ret = dict_get_uint64 (reply, "transport-ptr", + &peertrans_int); + + peer_trans = (void *) (long) (peertrans_int); + + gf_log (this->name, GF_LOG_WARNING, + "attaching to the local volume '%s'", + remote_subvol); + + if (req->conn) { + /* TODO: Some issues with this logic at present */ + //rpc_transport_setpeer (req->conn->trans, peer_trans); + } + } + + gf_log (this->name, GF_LOG_NORMAL, + "Connected to %s, attached to remote volume '%s'.", + conf->rpc->conn.trans->peerinfo.identifier, + remote_subvol); + + rpc_clnt_set_connected (&conf->rpc->conn); + + op_ret = 0; + conf->connecting = 0; + + /* TODO: more to test */ + client_post_handshake (frame, frame->this); + +out: + + if (-1 == op_ret) { + /* Let the connection/re-connection happen in + * background, for now, don't hang here, + * tell the parents that i am all ok.. + */ + parent = this->parents; + while (parent) { + xlator_notify (parent->xlator, + GF_EVENT_CHILD_CONNECTING, this); + parent = parent->next; + } + + conf->connecting= 1; + } + + if (rsp.dict.dict_val) + free (rsp.dict.dict_val); + + STACK_DESTROY (frame->root); + + if (reply) + dict_unref (reply); + + return 0; +} + +int +client_setvolume (xlator_t *this, struct rpc_clnt *rpc) +{ + int ret = 0; + gf_setvolume_req req = {0,}; + call_frame_t *fr = NULL; + char *process_uuid_xl = NULL; + clnt_conf_t *conf = NULL; + dict_t *options = NULL; + + options = this->options; + conf = this->private; + + if (conf->fops || !dict_get (options, "fops-version")) { + ret = dict_set_int32 (options, "fops-version", + conf->fops->prognum); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set version-fops(%d) in handshake msg", + conf->fops->prognum); + goto fail; + } + } + + if (conf->mgmt) { + ret = dict_set_int32 (options, "mgmt-version", conf->mgmt->prognum); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set version-mgmt(%d) in handshake msg", + conf->mgmt->prognum); + goto fail; + } + } + + ret = gf_asprintf (&process_uuid_xl, "%s-%s", this->ctx->process_uuid, + this->name); + if (-1 == ret) { + gf_log (this->name, GF_LOG_ERROR, + "asprintf failed while setting process_uuid"); + goto fail; + } + ret = dict_set_dynstr (options, "process-uuid", process_uuid_xl); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to set process-uuid(%s) in handshake msg", + process_uuid_xl); + goto fail; + } + + if (this->ctx->cmd_args.volfile_server) { + if (this->ctx->cmd_args.volfile_id) + ret = dict_set_str (options, "volfile-key", + this->ctx->cmd_args.volfile_id); + ret = dict_set_uint32 (options, "volfile-checksum", + this->graph->volfile_checksum); + } + + req.dict.dict_len = dict_serialized_length (options); + if (req.dict.dict_len < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to get serialized length of dict"); + ret = -1; + goto fail; + } + req.dict.dict_val = GF_CALLOC (1, req.dict.dict_len, 0); + ret = dict_serialize (options, req.dict.dict_val); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "failed to serialize dictionary"); + goto fail; + } + + fr = create_frame (this, this->ctx->pool); + if (!fr) + goto fail; + + ret = client_submit_request (this, &req, fr, conf->handshake, + GF_HNDSK_SETVOLUME, NULL, xdr_from_setvolume_req); + +fail: + if (req.dict.dict_val) + GF_FREE (req.dict.dict_val); + + return ret; +} + +int +select_server_supported_programs (xlator_t *this, char *msg) +{ + clnt_conf_t *conf = NULL; + char *tmp_str = NULL; + char *prog_str = NULL; + char *dup_str = NULL; + char *tmp_str1 = NULL; + char *tmp_msg = NULL; + char *progname = NULL; + char *progver_str = NULL; + char *prognum_str = NULL; + int ret = -1; + int progver = 0; + int prognum = 0; + + if (!this || !msg) + goto out; + + conf = this->private; + + /* Reply in "Name:Program-Number:Program-Version,..." format */ + tmp_msg = gf_strdup (msg); + prog_str = strtok_r (tmp_msg, ",", &tmp_str); + while (prog_str) { + dup_str = gf_strdup (prog_str); + + progname = strtok_r (dup_str, ":", &tmp_str1); + prognum_str = strtok_r (NULL, ":", &tmp_str1); + if (!prognum_str) { + gf_log (this->name, GF_LOG_WARNING, + "Supported versions not formatted"); + goto out; + } + sscanf (prognum_str, "%d", &prognum); + progver_str = strtok_r (NULL, ":", &tmp_str1); + if (!progver_str) { + gf_log (this->name, GF_LOG_WARNING, + "Supported versions not formatted"); + goto out; + } + sscanf (progver_str, "%d", &progver); + + /* Select 'programs' */ + if ((clnt3_1_fop_prog.prognum == prognum) && + (clnt3_1_fop_prog.progver == progver)) { + conf->fops = &clnt3_1_fop_prog; + gf_log (this->name, GF_LOG_INFO, + "Using Program %s, Num (%s), Version (%s)", + progname, prognum_str, progver_str); + ret = 0; + } + if ((clnt3_1_mgmt_prog.prognum == prognum) && + (clnt3_1_mgmt_prog.progver == progver)) { + conf->mgmt = &clnt3_1_mgmt_prog; + gf_log (this->name, GF_LOG_INFO, + "Using Program %s, Num (%s), Version (%s)", + progname, prognum_str, progver_str); + ret = 0; + } + + prog_str = strtok_r (NULL, ",", &tmp_str); + GF_FREE (dup_str); + } + + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "none of the server versions are supported by client"); + } + ret = 0; +out: + if (tmp_msg) + GF_FREE (tmp_msg); + return ret; +} + +int +client_dump_version_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +{ + gf_dump_version_rsp rsp = {0,}; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + int ret = 0; + + frame = myframe; + conf = frame->this->private; + + if (-1 == req->rpc_status) { + gf_log ("", 1, "some error, retry again later"); + goto out; + } + + ret = xdr_to_dump_version_rsp (*iov, &rsp); + if (ret < 0) { + gf_log ("", GF_LOG_ERROR, "error"); + goto out; + } + if (-1 == rsp.op_ret) { + gf_log (frame->this->name, GF_LOG_ERROR, + "failed to get the 'versions' from server"); + goto out; + } + + /* Check for the proper version string */ + /* Reply in "Name:Program-Number:Program-Version,..." format */ + ret = select_server_supported_programs (frame->this, + rsp.msg.msg_val); + if (ret) { + gf_log (frame->this->name, GF_LOG_ERROR, + "Server versions are not present in this " + "release (%s)", rsp.msg.msg_val); + goto out; + } + + client_setvolume (frame->this, conf->rpc); + +out: + /* don't use GF_FREE, buffer was allocated by libc */ + if (rsp.msg.msg_val) { + free (rsp.msg.msg_val); + } + + STACK_DESTROY (frame->root); + return ret; +} + +int +client_handshake (xlator_t *this, struct rpc_clnt *rpc) +{ + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + gf_dump_version_req req = {0,}; + int ret = 0; + + conf = this->private; + if (!conf->handshake) + goto out; + + frame = create_frame (this, this->ctx->pool); + if (!frame) + goto out; + + req.key = "fop-handshake"; + req.gfs_id = 123456; + ret = client_submit_request (this, &req, frame, conf->handshake, + GF_HNDSK_DUMP_VERSION, + NULL, xdr_from_dump_version_req); + +out: + return ret; +} + + +/* */ +/* This table should ideally remain same irrespective of versions */ +static rpc_clnt_procedure_t clnt_handshake_actors[] = { + [GF_HNDSK_NULL] = { "NULL", NULL, NULL}, + [GF_HNDSK_DUMP_VERSION] = { "VERSION", NULL, client_dump_version_cbk}, + [GF_HNDSK_SETVOLUME] = { "SETVOLUME", NULL, client_setvolume_cbk}, + [GF_HNDSK_GETSPEC] = { "GETSPEC", NULL, client_getspec_cbk }, + [GF_HNDSK_PING] = { "PING", NULL, client_ping_cbk }, +}; + +rpc_clnt_prog_t clnt_handshake_prog = { + .progname = "GlusterFS Handshake", + .prognum = GLUSTER_HNDSK_PROGRAM, + .progver = GLUSTER_HNDSK_VERSION, + .actor = clnt_handshake_actors, + .numproc = (sizeof (*clnt_handshake_actors) / + sizeof (rpc_clnt_procedure_t)), +}; -- cgit