diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 216 |
1 files changed, 87 insertions, 129 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 950c4ecc5..ac98a5c91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2010-2011 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ @@ -75,8 +66,8 @@ _is_lock_fop (struct saved_frame *sframe) { int fop = 0; - if (SFRAME_GET_PROGNUM (sframe) == GLUSTER3_1_FOP_PROGRAM && - SFRAME_GET_PROGVER (sframe) == GLUSTER3_1_FOP_VERSION) + if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM && + SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION) fop = SFRAME_GET_PROCNUM (sframe); return ((fop == GFS3_OP_LK) || @@ -153,9 +144,8 @@ call_bail (void *data) struct saved_frame *saved_frame = NULL; struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - struct tm frame_sent_tm; char frame_sent[256] = {0,}; - struct timeval timeout = {0,}; + struct timespec timeout = {0,}; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -173,7 +163,7 @@ call_bail (void *data) call-once timer */ if (conn->timer) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, @@ -183,7 +173,8 @@ call_bail (void *data) if (conn->timer == NULL) { gf_log (conn->trans->name, GF_LOG_WARNING, - "Cannot create bailout timer"); + "Cannot create bailout timer for %s", + conn->trans->peerinfo.identifier); } } @@ -200,21 +191,21 @@ call_bail (void *data) pthread_mutex_unlock (&conn->lock); list_for_each_entry_safe (trav, tmp, &list, list) { - localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm); - strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm); + gf_time_fmt (frame_sent, sizeof frame_sent, + trav->saved_at.tv_sec, gf_timefmt_FT); snprintf (frame_sent + strlen (frame_sent), 256 - strlen (frame_sent), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log (conn->trans->name, GF_LOG_ERROR, - "bailing out frame type(%s) op(%s(%d)) xid = 0x%ux " - "sent = %s. timeout = %d", + "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " + "sent = %s. timeout = %d for %s", trav->rpcreq->prog->progname, (trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : "--", trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, - conn->frame_timeout); + conn->frame_timeout, conn->trans->peerinfo.identifier); clnt = rpc_clnt_ref (clnt); trav->rpcreq->rpc_status = -1; @@ -236,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct saved_frame *saved_frame = NULL; conn = &rpc_clnt->conn; @@ -250,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, /* TODO: make timeout configurable */ if (conn->timer == NULL) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -349,20 +340,16 @@ out: void saved_frames_unwind (struct saved_frames *saved_frames) { - struct rpc_clnt *clnt = NULL; struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - struct tm *frame_sent_tm = NULL; - char timestr[256] = {0,}; - + char timestr[1024] = {0,}; struct iovec iov = {0,}; list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list); list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { - frame_sent_tm = localtime (&trav->saved_at.tv_sec); - strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", - frame_sent_tm); + gf_time_fmt (timestr, sizeof timestr, + trav->saved_at.tv_sec, gf_timefmt_FT); snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); @@ -373,7 +360,7 @@ saved_frames_unwind (struct saved_frames *saved_frames) gf_log_callingfn (trav->rpcreq->conn->trans->name, GF_LOG_ERROR, "forced unwinding frame type(%s) op(%s(%d)) " - "called at %s (xid=0x%ux)", + "called at %s (xid=0x%x)", trav->rpcreq->prog->progname, ((trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] @@ -382,14 +369,12 @@ saved_frames_unwind (struct saved_frames *saved_frames) trav->rpcreq->xid); saved_frames->count--; - clnt = rpc_clnt_ref (trav->rpcreq->conn->rpc_clnt); trav->rpcreq->rpc_status = -1; trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); rpc_clnt_reply_deinit (trav->rpcreq, trav->rpcreq->conn->rpc_clnt->reqpool); - clnt = rpc_clnt_unref (clnt); list_del_init (&trav->list); mem_put (trav); } @@ -413,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval tv = {0, 0}; + struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; @@ -432,23 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr) conn->reconnect = 0; if (conn->connected == 0) { - tv.tv_sec = 3; + ts.tv_sec = 3; + ts.tv_nsec = 0; gf_log (trans->name, GF_LOG_TRACE, "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); - /* Every time there is a disconnection, processes - should try to connect to 'glusterd' (ie, default - port) or whichever port given as 'option remote-port' - in volume file. */ - /* Below code makes sure the (re-)configured port lasts - for just one successful attempt */ - if (!ret) - conn->config.remote_port = 0; - conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, trans); } else { @@ -469,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr) int rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) { - struct saved_frame saved_frame = {{}, 0}; + struct saved_frame saved_frame; int ret = -1; pthread_mutex_lock (&clnt->conn.lock); @@ -686,15 +663,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, } gf_log (conn->trans->name, GF_LOG_TRACE, - "received rpc message (RPC XID: 0x%ux" + "received rpc message (RPC XID: 0x%x" " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname, saved_frame->rpcreq->prog->progver, saved_frame->rpcreq->procnum, conn->trans->name); - req->rpc_status = 0; - out: if (ret != 0) { req->rpc_status = -1; @@ -751,7 +726,8 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) if (found && (procnum < program->numactors) && (program->actors[procnum].actor)) { - program->actors[procnum].actor (&progmsg); + program->actors[procnum].actor (clnt, program->mydata, + &progmsg); } out: @@ -845,6 +821,9 @@ out: return; } +static void +rpc_clnt_destroy (struct rpc_clnt *rpc); + int rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) @@ -854,7 +833,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, int ret = -1; rpc_request_info_t *req_info = NULL; rpc_transport_pollin_t *pollin = NULL; - struct timeval tv = {0, }; + struct timespec ts = {0, }; conn = mydata; if (conn == NULL) { @@ -873,10 +852,11 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, { if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) { - tv.tv_sec = 10; + ts.tv_sec = 10; + ts.tv_nsec = 0; conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn->trans); } @@ -890,9 +870,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, } case RPC_TRANSPORT_CLEANUP: - /* this event should not be received on a client for, a - * transport is only disconnected, but never destroyed. - */ + rpc_clnt_destroy (clnt); ret = 0; break; @@ -936,6 +914,14 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_CONNECT: { + /* Every time there is a disconnection, processes + should try to connect to 'glusterd' (ie, default + port) or whichever port given as 'option remote-port' + in volume file. */ + /* Below code makes sure the (re-)configured port lasts + for just one successful attempt */ + conn->config.remote_port = 0; + if (clnt->notifyfn) ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL); @@ -962,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn) } -inline int +static inline int rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, dict_t *options, char *name) { @@ -1335,7 +1321,7 @@ out: int rpcclnt_cbk_program_register (struct rpc_clnt *clnt, - rpcclnt_cb_program_t *program) + rpcclnt_cb_program_t *program, void *mydata) { int ret = -1; char already_registered = 0; @@ -1375,6 +1361,8 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt, memcpy (tmp, program, sizeof (*tmp)); INIT_LIST_HEAD (&tmp->program); + tmp->mydata = mydata; + pthread_mutex_lock (&clnt->lock); { list_add_tail (&tmp->program, &clnt->programs); @@ -1490,10 +1478,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if (conn->connected == 0) { ret = rpc_transport_connect (conn->trans, conn->config.remote_port); - /* Below code makes sure the (re-)configured port lasts - for just one successful connect attempt */ - if (!ret) - conn->config.remote_port = 0; } ret = rpc_transport_submit_request (rpc->conn.trans, @@ -1501,19 +1485,18 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if (ret == -1) { gf_log (conn->trans->name, GF_LOG_WARNING, "failed to submit rpc-request " - "(XID: 0x%ux Program: %s, ProgVers: %d, " + "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, rpcreq->procnum, rpc->conn.trans->name); } if ((ret >= 0) && frame) { - gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ __save_frame (rpc, frame, rpcreq); gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " - "(XID: 0x%ux Program: %s, ProgVers: %d, " + "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, rpcreq->procnum, rpc->conn.trans->name); @@ -1562,18 +1545,21 @@ rpc_clnt_ref (struct rpc_clnt *rpc) static void -rpc_clnt_destroy (struct rpc_clnt *rpc) +rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) { if (!rpc) return; - if (rpc->conn.trans) { - rpc_transport_unregister_notify (rpc->conn.trans); - rpc_transport_disconnect (rpc->conn.trans); - rpc_transport_unref (rpc->conn.trans); - } + rpc_clnt_disable (rpc); + rpc_transport_unref (rpc->conn.trans); +} + +static void +rpc_clnt_destroy (struct rpc_clnt *rpc) +{ + if (!rpc) + return; - rpc_clnt_reconnect_cleanup (&rpc->conn); saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); @@ -1600,13 +1586,36 @@ rpc_clnt_unref (struct rpc_clnt *rpc) } pthread_mutex_unlock (&rpc->lock); if (!count) { - rpc_clnt_destroy (rpc); + rpc_clnt_trigger_destroy (rpc); return NULL; } return rpc; } +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc) +{ + + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock (&conn->lock); + +out: + return disabled; +} + void rpc_clnt_disable (struct rpc_clnt *rpc) { @@ -1676,7 +1685,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) if (strcmp (rpc->conn.config.remote_host, config->remote_host)) gf_log (rpc->conn.trans->name, GF_LOG_INFO, - "changing port to %s (from %s)", + "changing hostname to %s (from %s)", config->remote_host, rpc->conn.config.remote_host); FREE (rpc->conn.config.remote_host); @@ -1689,54 +1698,3 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) rpc->conn.config.remote_host = gf_strdup (config->remote_host); } } - -int -rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath) -{ - dict_t *dict = NULL; - char *fpath = NULL; - int ret = -1; - - GF_ASSERT (filepath); - GF_ASSERT (options); - - dict = dict_new (); - if (!dict) - goto out; - - fpath = gf_strdup (filepath); - if (!fpath) { - ret = -1; - goto out; - } - - ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.address-family", "unix"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.nodelay", "off"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport-type", "socket"); - if (ret) - goto out; - - ret = dict_set_str (dict, "transport.socket.keepalive", "off"); - if (ret) - goto out; - - *options = dict; -out: - if (ret) { - if (fpath) - GF_FREE (fpath); - if (dict) - dict_unref (dict); - } - return ret; -} |
