diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 828 |
1 files changed, 558 insertions, 270 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 8d923ed5f..ac98a5c91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ @@ -23,7 +14,7 @@ #include "config.h" #endif -#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096 +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512 #include "rpc-clnt.h" #include "byte-order.h" @@ -32,6 +23,7 @@ #include "protocol-common.h" #include "mem-pool.h" #include "xdr-rpc.h" +#include "rpc-common-xdr.h" void rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -69,6 +61,21 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, return bailout_frame; } +static int +_is_lock_fop (struct saved_frame *sframe) +{ + int fop = 0; + + if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM && + SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION) + fop = SFRAME_GET_PROCNUM (sframe); + + return ((fop == GFS3_OP_LK) || + (fop == GFS3_OP_INODELK) || + (fop == GFS3_OP_FINODELK) || + (fop == GFS3_OP_ENTRYLK) || + (fop == GFS3_OP_FENTRYLK)); +} struct saved_frame * __saved_frames_put (struct saved_frames *frames, void *frame, @@ -78,7 +85,6 @@ __saved_frames_put (struct saved_frames *frames, void *frame, saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); if (!saved_frame) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } /* THIS should be saved and set back */ @@ -91,7 +97,11 @@ __saved_frames_put (struct saved_frames *frames, void *frame, saved_frame->rpcreq = rpcreq; gettimeofday (&saved_frame->saved_at, NULL); - list_add_tail (&saved_frame->list, &frames->sf.list); + if (_is_lock_fop (saved_frame)) + list_add_tail (&saved_frame->list, &frames->lk_sf.list); + else + list_add_tail (&saved_frame->list, &frames->sf.list); + frames->count++; out: @@ -103,9 +113,8 @@ void saved_frames_delete (struct saved_frame *saved_frame, rpc_clnt_connection_t *conn) { - if (!saved_frame || !conn) { - goto out; - } + GF_VALIDATE_OR_GOTO ("rpc-clnt", saved_frame, out); + GF_VALIDATE_OR_GOTO ("rpc-clnt", conn, out); pthread_mutex_lock (&conn->lock); { @@ -119,7 +128,7 @@ saved_frames_delete (struct saved_frame *saved_frame, conn->rpc_clnt->reqpool); } - mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); + mem_put (saved_frame); out: return; } @@ -135,9 +144,8 @@ call_bail (void *data) struct saved_frame *saved_frame = NULL; struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - struct tm frame_sent_tm; - char frame_sent[32] = {0,}; - struct timeval timeout = {0,}; + char frame_sent[256] = {0,}; + struct timespec timeout = {0,}; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -155,7 +163,7 @@ call_bail (void *data) call-once timer */ if (conn->timer) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, @@ -164,8 +172,9 @@ call_bail (void *data) (void *) clnt); if (conn->timer == NULL) { - gf_log (conn->trans->name, GF_LOG_DEBUG, - "Cannot create bailout timer"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "Cannot create bailout timer for %s", + conn->trans->peerinfo.identifier); } } @@ -182,25 +191,30 @@ call_bail (void *data) pthread_mutex_unlock (&conn->lock); list_for_each_entry_safe (trav, tmp, &list, list) { - localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm); - strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm); + gf_time_fmt (frame_sent, sizeof frame_sent, + trav->saved_at.tv_sec, gf_timefmt_FT); + snprintf (frame_sent + strlen (frame_sent), + 256 - strlen (frame_sent), + ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log (conn->trans->name, GF_LOG_ERROR, - "bailing out frame type(%s) op(%s(%d)) sent = %s. " - "timeout = %d", + "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " + "sent = %s. timeout = %d for %s", trav->rpcreq->prog->progname, (trav->rpcreq->prog->procnames) ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : "--", - trav->rpcreq->procnum, frame_sent, - conn->frame_timeout); + trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, + conn->frame_timeout, conn->trans->peerinfo.identifier); + clnt = rpc_clnt_ref (clnt); trav->rpcreq->rpc_status = -1; trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); + clnt = rpc_clnt_unref (clnt); list_del_init (&trav->list); - mem_put (conn->rpc_clnt->saved_frames_pool, trav); + mem_put (trav); } out: return; @@ -213,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct saved_frame *saved_frame = NULL; conn = &rpc_clnt->conn; @@ -227,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, /* TODO: make timeout configurable */ if (conn->timer == NULL) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -247,11 +261,11 @@ saved_frames_new (void) saved_frames = GF_CALLOC (1, sizeof (*saved_frames), gf_common_mt_rpcclnt_savedframe_t); if (!saved_frames) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); return NULL; } INIT_LIST_HEAD (&saved_frames->sf.list); + INIT_LIST_HEAD (&saved_frames->lk_sf.list); return saved_frames; } @@ -273,7 +287,15 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid, if (tmp->rpcreq->xid == callid) { *saved_frame = *tmp; ret = 0; - break; + goto out; + } + } + + list_for_each_entry (tmp, &frames->lk_sf.list, list) { + if (tmp->rpcreq->xid == callid) { + *saved_frame = *tmp; + ret = 0; + goto out; } } @@ -293,10 +315,20 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) list_del_init (&tmp->list); frames->count--; saved_frame = tmp; - break; + goto out; } } + list_for_each_entry (tmp, &frames->lk_sf.list, list) { + if (tmp->rpcreq->xid == callid) { + list_del_init (&tmp->list); + frames->count--; + saved_frame = tmp; + goto out; + } + } + +out: if (saved_frame) { THIS = saved_frame->capital_this; } @@ -304,42 +336,47 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) return saved_frame; } + void saved_frames_unwind (struct saved_frames *saved_frames) { struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - struct tm *frame_sent_tm = NULL; - char timestr[256] = {0,}; - + char timestr[1024] = {0,}; struct iovec iov = {0,}; + list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list); + list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { - frame_sent_tm = localtime (&trav->saved_at.tv_sec); - strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S", - frame_sent_tm); + gf_time_fmt (timestr, sizeof timestr, + trav->saved_at.tv_sec, gf_timefmt_FT); snprintf (timestr + strlen (timestr), sizeof(timestr) - strlen (timestr), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); - gf_log ("rpc-clnt", GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d)) " - "called at %s", - trav->rpcreq->prog->progname, - (trav->rpcreq->prog->procnames) ? - trav->rpcreq->prog->procnames[trav->rpcreq->procnum] - : "--", - trav->rpcreq->procnum, timestr); - + if (!trav->rpcreq || !trav->rpcreq->prog) + continue; + + gf_log_callingfn (trav->rpcreq->conn->trans->name, + GF_LOG_ERROR, + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s (xid=0x%x)", + trav->rpcreq->prog->progname, + ((trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--"), + trav->rpcreq->procnum, timestr, + trav->rpcreq->xid); saved_frames->count--; trav->rpcreq->rpc_status = -1; trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + rpc_clnt_reply_deinit (trav->rpcreq, trav->rpcreq->conn->rpc_clnt->reqpool); list_del_init (&trav->list); - mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav); + mem_put (trav); } } @@ -361,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval tv = {0, 0}; + struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; @@ -380,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr) conn->reconnect = 0; if (conn->connected == 0) { - tv.tv_sec = 3; + ts.tv_sec = 3; + ts.tv_nsec = 0; gf_log (trans->name, GF_LOG_TRACE, "attempting reconnect"); - ret = rpc_transport_connect (trans, conn->config.remote_port); - + ret = rpc_transport_connect (trans, + conn->config.remote_port); conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, trans); } else { @@ -408,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr) int rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) { - struct saved_frame saved_frame = {{}, 0}; + struct saved_frame saved_frame; int ret = -1; pthread_mutex_lock (&clnt->conn.lock); @@ -419,10 +457,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) pthread_mutex_unlock (&clnt->conn.lock); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved " - "frame corresponding to xid (%d) for msg arrived on " - "transport %s", - info->xid, clnt->conn.trans->name); + gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL, + "cannot lookup the saved " + "frame corresponding to xid (%d)", info->xid); goto out; } @@ -457,6 +494,7 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) } } + pthread_mutex_unlock (&conn->lock); out: return 0; @@ -479,7 +517,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) clnt = conn->rpc_clnt; - gf_log ("rpc-clnt", GF_LOG_DEBUG, + gf_log (conn->trans->name, GF_LOG_TRACE, "cleaning up state in transport object %p", conn->trans); pthread_mutex_lock (&conn->lock); @@ -494,6 +532,12 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) } conn->connected = 0; + + if (conn->ping_timer) { + gf_timer_call_cancel (clnt->ctx, conn->ping_timer); + conn->ping_timer = NULL; + conn->ping_started = 0; + } } pthread_mutex_unlock (&conn->lock); @@ -584,7 +628,7 @@ rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) iobref_unref (req->rsp_iobref); } - mem_put (pool, req); + mem_put (req); out: return; } @@ -607,7 +651,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); if (ret != 0) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "RPC reply decoding failed"); goto out; } @@ -617,34 +662,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s," - " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid, + gf_log (conn->trans->name, GF_LOG_TRACE, + "received rpc message (RPC XID: 0x%x" + " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", + saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname, saved_frame->rpcreq->prog->progver, - saved_frame->rpcreq->procnum); -/* TODO: */ - /* TODO: AUTH */ - /* The verifier that is sent in a reply is a string that can be used as - * a shorthand in credentials for future transactions. We can opt not to - * use this shorthand, preffering to use the original AUTH_UNIX method - * for authentication (containing all the details for authentication in - * credential itself). Hence it is not mandatory for us to be checking - * the verifier. See Appendix A of rfc-5531 for more details. - */ - - /* - * ret = rpc_authenticate (req); - * if (ret == RPC_AUTH_REJECT) { - * gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication"); - * ret = -1; - * goto out; - * } - */ - - /* If the error is not RPC_MISMATCH, we consider the call as accepted - * since we are not handling authentication failures for now. - */ - req->rpc_status = 0; + saved_frame->rpcreq->procnum, conn->trans->name); out: if (ret != 0) { @@ -669,32 +693,45 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; + clnt = rpc_clnt_ref (clnt); ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "RPC call decoding failed"); goto out; } - gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," - " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), + gf_log (clnt->conn.trans->name, GF_LOG_TRACE, + "received rpc message (XID: 0x%lx, " + "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) " + "from rpc-transport (%s)", rpc_call_xid (&rpcmsg), rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), - rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), + clnt->conn.trans->name); procnum = rpc_call_progproc (&rpcmsg); - list_for_each_entry (program, &clnt->programs, program) { - if ((program->prognum == rpc_call_program (&rpcmsg)) - && (program->progver == rpc_call_progver (&rpcmsg))) { - found = 1; - break; + pthread_mutex_lock (&clnt->lock); + { + list_for_each_entry (program, &clnt->programs, program) { + if ((program->prognum == rpc_call_program (&rpcmsg)) + && (program->progver + == rpc_call_progver (&rpcmsg))) { + found = 1; + break; + } } } + pthread_mutex_unlock (&clnt->lock); + if (found && (procnum < program->numactors) && (program->actors[procnum].actor)) { - program->actors[procnum].actor (&progmsg); + program->actors[procnum].actor (clnt, program->mydata, + &progmsg); } out: + clnt = rpc_clnt_unref (clnt); return ret; } @@ -707,41 +744,44 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) struct rpc_req *req = NULL; uint32_t xid = 0; + clnt = rpc_clnt_ref (clnt); conn = &clnt->conn; xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base)); saved_frame = lookup_frame (conn, xid); if (saved_frame == NULL) { - gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the " - "saved frame for reply with xid (%d)", xid); + gf_log (conn->trans->name, GF_LOG_ERROR, + "cannot lookup the saved frame for reply with xid (%u)", + xid); goto out; } req = saved_frame->rpcreq; if (req == NULL) { - gf_log ("rpc-clnt", GF_LOG_CRITICAL, - "saved_frame for reply with xid (%d)", xid); + gf_log (conn->trans->name, GF_LOG_ERROR, + "no request with frame for xid (%u)", xid); goto out; } - + ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { req->rpc_status = -1; - gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " - "failed"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "initialising rpc reply failed"); } req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); - + if (req) { rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); } out: if (saved_frame) { - mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame); + mem_put (saved_frame); } + clnt = rpc_clnt_unref (clnt); return ret; } @@ -781,36 +821,42 @@ out: return; } +static void +rpc_clnt_destroy (struct rpc_clnt *rpc); int rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) { - rpc_clnt_connection_t *conn = NULL; - struct rpc_clnt *clnt = NULL; - int ret = -1; - rpc_request_info_t *req_info = NULL; - rpc_transport_pollin_t *pollin = NULL; - struct timeval tv = {0, }; + rpc_clnt_connection_t *conn = NULL; + struct rpc_clnt *clnt = NULL; + int ret = -1; + rpc_request_info_t *req_info = NULL; + rpc_transport_pollin_t *pollin = NULL; + struct timespec ts = {0, }; conn = mydata; if (conn == NULL) { goto out; } clnt = conn->rpc_clnt; + if (!clnt) + goto out; switch (event) { case RPC_TRANSPORT_DISCONNECT: { - rpc_clnt_connection_cleanup (&clnt->conn); + rpc_clnt_connection_cleanup (conn); pthread_mutex_lock (&conn->lock); { - if (conn->reconnect == NULL) { - tv.tv_sec = 10; + if (!conn->rpc_clnt->disabled + && (conn->reconnect == NULL)) { + ts.tv_sec = 10; + ts.tv_nsec = 0; conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn->trans); } @@ -818,15 +864,13 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, pthread_mutex_unlock (&conn->lock); if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, - NULL); + ret = clnt->notifyfn (clnt, clnt->mydata, + RPC_CLNT_DISCONNECT, NULL); break; } case RPC_TRANSPORT_CLEANUP: - /* this event should not be received on a client for, a - * transport is only disconnected, but never destroyed. - */ + rpc_clnt_destroy (clnt); ret = 0; break; @@ -839,6 +883,12 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_MSG_RECEIVED: { + pthread_mutex_lock (&conn->lock); + { + gettimeofday (&conn->last_received, NULL); + } + pthread_mutex_unlock (&conn->lock); + pollin = data; if (pollin->is_reply) ret = rpc_clnt_handle_reply (clnt, pollin); @@ -864,8 +914,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_CONNECT: { + /* Every time there is a disconnection, processes + should try to connect to 'glusterd' (ie, default + port) or whichever port given as 'option remote-port' + in volume file. */ + /* Below code makes sure the (re-)configured port lasts + for just one successful attempt */ + conn->config.remote_port = 0; + if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL); + ret = clnt->notifyfn (clnt, clnt->mydata, + RPC_CLNT_CONNECT, NULL); break; } @@ -889,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn) } -inline int +static inline int rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, dict_t *options, char *name) { @@ -902,7 +961,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, ret = dict_get_int32 (options, "frame-timeout", &conn->frame_timeout); if (ret >= 0) { - gf_log (name, GF_LOG_DEBUG, + gf_log (name, GF_LOG_INFO, "setting frame-timeout to %d", conn->frame_timeout); } else { gf_log (name, GF_LOG_DEBUG, @@ -912,8 +971,9 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, conn->trans = rpc_transport_load (ctx, options, name); if (!conn->trans) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport" + gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport" " failed"); + ret = -1; goto out; } @@ -924,7 +984,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, conn); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed"); + gf_log (name, GF_LOG_WARNING, "registering notify failed"); rpc_clnt_connection_cleanup (conn); conn = NULL; goto out; @@ -932,39 +992,37 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, conn->saved_frames = saved_frames_new (); if (!conn->saved_frames) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames " + gf_log (name, GF_LOG_WARNING, "creation of saved_frames " "failed"); rpc_clnt_connection_cleanup (conn); goto out; } - rpc_clnt_reconnect (conn->trans); - ret = 0; out: return ret; } - struct rpc_clnt * -rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, - glusterfs_ctx_t *ctx, char *name) +rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name, + uint32_t reqpool_size) { int ret = -1; struct rpc_clnt *rpc = NULL; rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t); if (!rpc) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } pthread_mutex_init (&rpc->lock, NULL); rpc->ctx = ctx; - rpc->reqpool = mem_pool_new (struct rpc_req, - RPC_CLNT_DEFAULT_REQUEST_COUNT); + if (!reqpool_size) + reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT; + + rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size); if (rpc->reqpool == NULL) { pthread_mutex_destroy (&rpc->lock); GF_FREE (rpc); @@ -973,7 +1031,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, } rpc->saved_frames_pool = mem_pool_new (struct saved_frame, - RPC_CLNT_DEFAULT_REQUEST_COUNT); + reqpool_size); if (rpc->saved_frames_pool == NULL) { pthread_mutex_destroy (&rpc->lock); mem_pool_destroy (rpc->reqpool); @@ -985,11 +1043,18 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + mem_pool_destroy (rpc->saved_frames_pool); GF_FREE (rpc); rpc = NULL; + if (options) + dict_unref (options); goto out; } + rpc->auth_null = dict_get_str_boolean (options, "auth-null", 0); + + rpc = rpc_clnt_ref (rpc); INIT_LIST_HEAD (&rpc->programs); out: @@ -998,6 +1063,22 @@ out: int +rpc_clnt_start (struct rpc_clnt *rpc) +{ + struct rpc_clnt_connection *conn = NULL; + + if (!rpc) + return -1; + + conn = &rpc->conn; + + rpc_clnt_reconnect (conn->trans); + + return 0; +} + + +int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, void *mydata) { @@ -1008,7 +1089,7 @@ rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, } ssize_t -xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au) +xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms_v2 *au) { ssize_t ret = -1; XDR xdr; @@ -1016,10 +1097,11 @@ xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au) if ((!dest) || (!au)) return -1; - xdrmem_create (&xdr, dest, 1024, - XDR_ENCODE); + xdrmem_create (&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE); - if (!xdr_auth_glusterfs_parms (&xdr, au)) { + if (!xdr_auth_glusterfs_parms_v2 (&xdr, au)) { + gf_log (THIS->name, GF_LOG_WARNING, + "failed to encode auth glusterfs elements"); ret = -1; goto ret; } @@ -1032,12 +1114,11 @@ ret: int -rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, - uint64_t xid, struct auth_glusterfs_parms *au, - struct rpc_msg *request) +rpc_clnt_fill_request (int prognum, int progver, int procnum, + uint64_t xid, struct auth_glusterfs_parms_v2 *au, + struct rpc_msg *request, char *auth_data) { int ret = -1; - char dest[1024] = {0,}; if (!request) { goto out; @@ -1053,19 +1134,26 @@ rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, request->rm_call.cb_vers = progver; request->rm_call.cb_proc = procnum; - /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in - * future so it is easy to plug-in new authentication schemes. + /* TODO: Using AUTH_(GLUSTERFS/NULL) in a kludgy way for time-being. + * Make it modular in future so it is easy to plug-in new + * authentication schemes. */ - ret = xdr_serialize_glusterfs_auth (dest, au); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials"); - goto out; - } - - request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS; - request->rm_call.cb_cred.oa_base = dest; - request->rm_call.cb_cred.oa_length = ret; + if (auth_data) { + ret = xdr_serialize_glusterfs_auth (auth_data, au); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_DEBUG, + "cannot encode credentials"); + goto out; + } + request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS_v2; + request->rm_call.cb_cred.oa_base = auth_data; + request->rm_call.cb_cred.oa_length = ret; + } else { + request->rm_call.cb_cred.oa_flavor = AUTH_NULL; + request->rm_call.cb_cred.oa_base = NULL; + request->rm_call.cb_cred.oa_length = 0; + } request->rm_call.cb_verf.oa_flavor = AUTH_NONE; request->rm_call.cb_verf.oa_base = NULL; request->rm_call.cb_verf.oa_length = 0; @@ -1113,50 +1201,57 @@ out: struct iobuf * rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, - int procnum, size_t payload, uint64_t xid, - struct auth_glusterfs_parms *au, struct iovec *recbuf) + int procnum, size_t hdrsize, uint64_t xid, + struct auth_glusterfs_parms_v2 *au, + struct iovec *recbuf) { - struct rpc_msg request = {0, }; - struct iobuf *request_iob = NULL; - char *record = NULL; - struct iovec recordhdr = {0, }; - size_t pagesize = 0; - int ret = -1; + struct rpc_msg request = {0, }; + struct iobuf *request_iob = NULL; + char *record = NULL; + struct iovec recordhdr = {0, }; + size_t pagesize = 0; + int ret = -1; + size_t xdr_size = 0; + char auth_data[GF_MAX_AUTH_BYTES] = {0, }; if ((!clnt) || (!recbuf) || (!au)) { goto out; } + /* Fill the rpc structure and XDR it into the buffer got above. */ + if (clnt->auth_null) + ret = rpc_clnt_fill_request (prognum, progver, procnum, + xid, NULL, &request, NULL); + else + ret = rpc_clnt_fill_request (prognum, progver, procnum, + xid, au, &request, auth_data); + + if (ret == -1) { + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "cannot build a rpc-request xid (%"PRIu64")", xid); + goto out; + } + + xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); + /* First, try to get a pointer into the buffer which the RPC * layer can use. */ - request_iob = iobuf_get (clnt->ctx->iobuf_pool); + request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize)); if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf"); goto out; } - pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size; + pagesize = iobuf_pagesize (request_iob); record = iobuf_ptr (request_iob); /* Now we have it. */ - /* Fill the rpc structure and XDR it into the buffer got above. */ - ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid, - au, &request); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request " - "xid (%"PRIu64")", xid); - goto out; - } - recordhdr = rpc_clnt_record_build_header (record, pagesize, &request, - payload); - - //GF_FREE (request.rm_call.cb_cred.oa_base); + hdrsize); if (!recordhdr.iov_base) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " - " header"); + gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + "Failed to build record header"); iobuf_unref (request_iob); request_iob = NULL; recbuf->iov_base = NULL; @@ -1173,43 +1268,50 @@ out: struct iobuf * rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, - rpc_clnt_prog_t *prog,int procnum, size_t payload_len, + rpc_clnt_prog_t *prog, int procnum, size_t hdrlen, struct iovec *rpchdr, uint64_t callid) { - struct auth_glusterfs_parms au = {0, }; - struct iobuf *request_iob = NULL; + struct auth_glusterfs_parms_v2 au = {0, }; + struct iobuf *request_iob = NULL; + char owner[4] = {0,}; if (!prog || !rpchdr || !call_frame) { goto out; } - au.pid = call_frame->root->pid; - au.uid = call_frame->root->uid; - au.gid = call_frame->root->gid; - au.ngrps = call_frame->root->ngrps; - au.lk_owner = call_frame->root->lk_owner; - if (!au.lk_owner) - au.lk_owner = au.pid; + au.pid = call_frame->root->pid; + au.uid = call_frame->root->uid; + au.gid = call_frame->root->gid; + au.groups.groups_len = call_frame->root->ngrps; + au.lk_owner.lk_owner_len = call_frame->root->lk_owner.len; - gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" - ", gid: %d, owner: %"PRId64, - au.pid, au.uid, au.gid, au.lk_owner); + if (au.groups.groups_len) + au.groups.groups_val = call_frame->root->groups; - memcpy (au.groups, call_frame->root->groups, 16); + if (call_frame->root->lk_owner.len) + au.lk_owner.lk_owner_val = call_frame->root->lk_owner.data; + else { + owner[0] = (char)(au.pid & 0xff); + owner[1] = (char)((au.pid >> 8) & 0xff); + owner[2] = (char)((au.pid >> 16) & 0xff); + owner[3] = (char)((au.pid >> 24) & 0xff); - //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX); - //au.aup_machname = myname; + au.lk_owner.lk_owner_val = owner; + au.lk_owner.lk_owner_len = 4; + } + + gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" + ", gid: %d, owner: %s", au.pid, au.uid, au.gid, + lkowner_utoa (&call_frame->root->lk_owner)); - /* Assuming the client program would like to speak to the same versioned - * program on server. - */ request_iob = rpc_clnt_record_build_record (clnt, prog->prognum, prog->progver, - procnum, payload_len, + procnum, hdrlen, callid, &au, rpchdr); if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record"); + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "cannot build rpc-record"); goto out; } @@ -1219,9 +1321,11 @@ out: int rpcclnt_cbk_program_register (struct rpc_clnt *clnt, - rpcclnt_cb_program_t *program) + rpcclnt_cb_program_t *program, void *mydata) { - int ret = -1; + int ret = -1; + char already_registered = 0; + rpcclnt_cb_program_t *tmp = NULL; if (!clnt) goto out; @@ -1229,18 +1333,52 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt, if (program->actors == NULL) goto out; - INIT_LIST_HEAD (&program->program); + pthread_mutex_lock (&clnt->lock); + { + list_for_each_entry (tmp, &clnt->programs, program) { + if ((program->prognum == tmp->prognum) + && (program->progver == tmp->progver)) { + already_registered = 1; + break; + } + } + } + pthread_mutex_unlock (&clnt->lock); - list_add_tail (&program->program, &clnt->programs); + if (already_registered) { + gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG, + "already registered"); + ret = 0; + goto out; + } + + tmp = GF_CALLOC (1, sizeof (*tmp), + gf_common_mt_rpcclnt_cb_program_t); + if (tmp == NULL) { + goto out; + } + + memcpy (tmp, program, sizeof (*tmp)); + INIT_LIST_HEAD (&tmp->program); + + tmp->mydata = mydata; + + pthread_mutex_lock (&clnt->lock); + { + list_add_tail (&tmp->program, &clnt->programs); + } + pthread_mutex_unlock (&clnt->lock); ret = 0; - gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," - " Ver: %d", program->progname, program->prognum, + gf_log (clnt->conn.trans->name, GF_LOG_DEBUG, + "New program registered: %s, Num: %d, Ver: %d", + program->progname, program->prognum, program->progver); out: if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" + gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + "Program registration failed:" " %s, Num: %d, Ver: %d", program->progname, program->prognum, program->progver); } @@ -1272,9 +1410,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, goto out; } + conn = &rpc->conn; + + if (conn->trans == NULL) { + goto out; + } + rpcreq = mem_get (rpc->reqpool); if (rpcreq == NULL) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } @@ -1284,7 +1427,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, if (!iobref) { iobref = iobref_new (); if (!iobref) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } @@ -1293,72 +1435,73 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, callid = rpc_clnt_new_callid (rpc); - conn = &rpc->conn; - rpcreq->prog = prog; rpcreq->procnum = procnum; rpcreq->conn = conn; rpcreq->xid = callid; rpcreq->cbkfn = cbkfn; - pthread_mutex_lock (&conn->lock); - { - if (conn->connected == 0) { - rpc_transport_connect (conn->trans, - conn->config.remote_port); - } + ret = -1; - ret = -1; + if (proghdr) { + proglen += iov_length (proghdr, proghdrcount); + } - if (proghdr) { - proglen += iov_length (proghdr, proghdrcount); - } + request_iob = rpc_clnt_record (rpc, frame, prog, + procnum, proglen, + &rpchdr, callid); + if (!request_iob) { + gf_log (conn->trans->name, GF_LOG_WARNING, + "cannot build rpc-record"); + goto out; + } - if (progpayload) { - proglen += iov_length (progpayload, - progpayloadcount); - } + iobref_add (iobref, request_iob); - request_iob = rpc_clnt_record (rpc, frame, prog, - procnum, proglen, - &rpchdr, callid); - if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, - "cannot build rpc-record"); - goto unlock; - } + req.msg.rpchdr = &rpchdr; + req.msg.rpchdrcount = 1; + req.msg.proghdr = proghdr; + req.msg.proghdrcount = proghdrcount; + req.msg.progpayload = progpayload; + req.msg.progpayloadcount = progpayloadcount; + req.msg.iobref = iobref; - iobref_add (iobref, request_iob); + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + req.rpc_req = rpcreq; - req.msg.rpchdr = &rpchdr; - req.msg.rpchdrcount = 1; - req.msg.proghdr = proghdr; - req.msg.proghdrcount = proghdrcount; - req.msg.progpayload = progpayload; - req.msg.progpayloadcount = progpayloadcount; - req.msg.iobref = iobref; - - req.rsp.rsphdr = rsphdr; - req.rsp.rsphdr_count = rsphdr_count; - req.rsp.rsp_payload = rsp_payload; - req.rsp.rsp_payload_count = rsp_payload_count; - req.rsp.rsp_iobref = rsp_iobref; - req.rpc_req = rpcreq; + pthread_mutex_lock (&conn->lock); + { + if (conn->connected == 0) { + ret = rpc_transport_connect (conn->trans, + conn->config.remote_port); + } ret = rpc_transport_submit_request (rpc->conn.trans, &req); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, - "transmission of rpc-request failed"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "failed to submit rpc-request " + "(XID: 0x%x Program: %s, ProgVers: %d, " + "Proc: %d) to rpc-transport (%s)", rpcreq->xid, + rpcreq->prog->progname, rpcreq->prog->progver, + rpcreq->procnum, rpc->conn.trans->name); } if ((ret >= 0) && frame) { - gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ __save_frame (rpc, frame, rpcreq); + + gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " + "(XID: 0x%x Program: %s, ProgVers: %d, " + "Proc: %d) to rpc-transport (%s)", rpcreq->xid, + rpcreq->prog->progname, rpcreq->prog->progver, + rpcreq->procnum, rpc->conn.trans->name); } } -unlock: pthread_mutex_unlock (&conn->lock); if (ret == -1) { @@ -1368,45 +1511,190 @@ unlock: ret = 0; out: - iobuf_unref (request_iob); + if (request_iob) { + iobuf_unref (request_iob); + } if (new_iobref && iobref) { iobref_unref (iobref); } if (frame && (ret == -1)) { - rpcreq->rpc_status = -1; - cbkfn (rpcreq, NULL, 0, frame); - mem_put (rpc->reqpool, rpcreq); + if (rpcreq) { + rpcreq->rpc_status = -1; + cbkfn (rpcreq, NULL, 0, frame); + mem_put (rpcreq); + } } return ret; } -void +struct rpc_clnt * +rpc_clnt_ref (struct rpc_clnt *rpc) +{ + if (!rpc) + return NULL; + pthread_mutex_lock (&rpc->lock); + { + rpc->refcount++; + } + pthread_mutex_unlock (&rpc->lock); + return rpc; +} + + +static void +rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) +{ + if (!rpc) + return; + + rpc_clnt_disable (rpc); + rpc_transport_unref (rpc->conn.trans); +} + +static void rpc_clnt_destroy (struct rpc_clnt *rpc) { - rpc_clnt_connection_cleanup (&rpc->conn); - rpc_clnt_reconnect_cleanup (&rpc->conn); + if (!rpc) + return; + + saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); + + /* mem-pool should be destroyed, otherwise, + it will cause huge memory leaks */ + mem_pool_destroy (rpc->reqpool); + mem_pool_destroy (rpc->saved_frames_pool); + GF_FREE (rpc); return; } +struct rpc_clnt * +rpc_clnt_unref (struct rpc_clnt *rpc) +{ + int count = 0; + + if (!rpc) + return NULL; + pthread_mutex_lock (&rpc->lock); + { + count = --rpc->refcount; + } + pthread_mutex_unlock (&rpc->lock); + if (!count) { + rpc_clnt_trigger_destroy (rpc); + return NULL; + } + return rpc; +} + + +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc) +{ + + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock (&conn->lock); + +out: + return disabled; +} + +void +rpc_clnt_disable (struct rpc_clnt *rpc) +{ + rpc_clnt_connection_t *conn = NULL; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + rpc->disabled = 1; + + if (conn->timer) { + gf_timer_call_cancel (rpc->ctx, conn->timer); + conn->timer = NULL; + } + + if (conn->reconnect) { + gf_timer_call_cancel (rpc->ctx, conn->reconnect); + conn->reconnect = NULL; + } + conn->connected = 0; + + if (conn->ping_timer) { + gf_timer_call_cancel (rpc->ctx, conn->ping_timer); + conn->ping_timer = NULL; + conn->ping_started = 0; + } + + } + pthread_mutex_unlock (&conn->lock); + + rpc_transport_disconnect (rpc->conn.trans); + +out: + return; +} + void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) { - if (config->rpc_timeout) + if (config->rpc_timeout) { + if (config->rpc_timeout != rpc->conn.config.rpc_timeout) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing timeout to %d (from %d)", + config->rpc_timeout, + rpc->conn.config.rpc_timeout); rpc->conn.config.rpc_timeout = config->rpc_timeout; + } + + if (config->remote_port) { + if (config->remote_port != rpc->conn.config.remote_port) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing port to %d (from %d)", + config->remote_port, + rpc->conn.config.remote_port); - if (config->remote_port) rpc->conn.config.remote_port = config->remote_port; + } if (config->remote_host) { - if (rpc->conn.config.remote_host) + if (rpc->conn.config.remote_host) { + if (strcmp (rpc->conn.config.remote_host, + config->remote_host)) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing hostname to %s (from %s)", + config->remote_host, + rpc->conn.config.remote_host); FREE (rpc->conn.config.remote_host); + } else { + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "setting hostname to %s", + config->remote_host); + } + rpc->conn.config.remote_host = gf_strdup (config->remote_host); } } |
