diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 1107 |
1 files changed, 764 insertions, 343 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 2375cc958..ac98a5c91 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ @@ -23,10 +14,19 @@ #include "config.h" #endif +#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512 + #include "rpc-clnt.h" +#include "byte-order.h" #include "xdr-rpcclnt.h" #include "rpc-transport.h" #include "protocol-common.h" +#include "mem-pool.h" +#include "xdr-rpc.h" +#include "rpc-common-xdr.h" + +void +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); uint64_t rpc_clnt_new_callid (struct rpc_clnt *clnt) @@ -61,33 +61,47 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, return bailout_frame; } +static int +_is_lock_fop (struct saved_frame *sframe) +{ + int fop = 0; + + if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM && + SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION) + fop = SFRAME_GET_PROCNUM (sframe); + + return ((fop == GFS3_OP_LK) || + (fop == GFS3_OP_INODELK) || + (fop == GFS3_OP_FINODELK) || + (fop == GFS3_OP_ENTRYLK) || + (fop == GFS3_OP_FENTRYLK)); +} struct saved_frame * -__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +__saved_frames_put (struct saved_frames *frames, void *frame, + struct rpc_req *rpcreq) { struct saved_frame *saved_frame = NULL; - saved_frame = GF_CALLOC (1, sizeof (*saved_frame), - gf_common_mt_rpcclnt_savedframe_t); + saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool); if (!saved_frame) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } /* THIS should be saved and set back */ + memset (saved_frame, 0, sizeof (*saved_frame)); INIT_LIST_HEAD (&saved_frame->list); saved_frame->capital_this = THIS; saved_frame->frame = frame; - saved_frame->procnum = procnum; - saved_frame->callid = callid; - saved_frame->prog = prog; - saved_frame->cbkfn = cbk; - + saved_frame->rpcreq = rpcreq; gettimeofday (&saved_frame->saved_at, NULL); - list_add_tail (&saved_frame->list, &frames->sf.list); + if (_is_lock_fop (saved_frame)) + list_add_tail (&saved_frame->list, &frames->lk_sf.list); + else + list_add_tail (&saved_frame->list, &frames->sf.list); + frames->count++; out: @@ -99,9 +113,8 @@ void saved_frames_delete (struct saved_frame *saved_frame, rpc_clnt_connection_t *conn) { - if (!saved_frame || !conn) { - goto out; - } + GF_VALIDATE_OR_GOTO ("rpc-clnt", saved_frame, out); + GF_VALIDATE_OR_GOTO ("rpc-clnt", conn, out); pthread_mutex_lock (&conn->lock); { @@ -110,7 +123,12 @@ saved_frames_delete (struct saved_frame *saved_frame, } pthread_mutex_unlock (&conn->lock); - GF_FREE (saved_frame); + if (saved_frame->rpcreq != NULL) { + rpc_clnt_reply_deinit (saved_frame->rpcreq, + conn->rpc_clnt->reqpool); + } + + mem_put (saved_frame); out: return; } @@ -126,11 +144,8 @@ call_bail (void *data) struct saved_frame *saved_frame = NULL; struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - struct tm frame_sent_tm; - char frame_sent[32] = {0,}; - struct timeval timeout = {0,}; - gf_timer_cbk_t timer_cbk = NULL; - struct rpc_req req; + char frame_sent[256] = {0,}; + struct timespec timeout = {0,}; struct iovec iov = {0,}; GF_VALIDATE_OR_GOTO ("client", data, out); @@ -147,10 +162,8 @@ call_bail (void *data) /* Chaining to get call-always functionality from call-once timer */ if (conn->timer) { - timer_cbk = conn->timer->callbk; - timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (clnt->ctx, conn->timer); conn->timer = gf_timer_call_after (clnt->ctx, @@ -159,8 +172,9 @@ call_bail (void *data) (void *) clnt); if (conn->timer == NULL) { - gf_log (conn->trans->name, GF_LOG_DEBUG, - "Cannot create bailout timer"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "Cannot create bailout timer for %s", + conn->trans->peerinfo.identifier); } } @@ -177,21 +191,30 @@ call_bail (void *data) pthread_mutex_unlock (&conn->lock); list_for_each_entry_safe (trav, tmp, &list, list) { - localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm); - strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm); + gf_time_fmt (frame_sent, sizeof frame_sent, + trav->saved_at.tv_sec, gf_timefmt_FT); + snprintf (frame_sent + strlen (frame_sent), + 256 - strlen (frame_sent), + ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); gf_log (conn->trans->name, GF_LOG_ERROR, - "bailing out frame type(%s) op(%s(%d)) sent = %s. " - "timeout = %d", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum, frame_sent, - conn->frame_timeout); - - trav->cbkfn (&req, &iov, 1, trav->frame); - + "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " + "sent = %s. timeout = %d for %s", + trav->rpcreq->prog->progname, + (trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : + "--", + trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, + conn->frame_timeout, conn->trans->peerinfo.identifier); + + clnt = rpc_clnt_ref (clnt); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + + rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool); + clnt = rpc_clnt_unref (clnt); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (trav); } out: return; @@ -200,17 +223,17 @@ out: /* to be called with conn->lock held */ struct saved_frame * -__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, - rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, + struct rpc_req *rpcreq) { rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct saved_frame *saved_frame = NULL; conn = &rpc_clnt->conn; - saved_frame = __saved_frames_put (conn->saved_frames, frame, - procnum, prog, cbk, callid); + saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq); + if (saved_frame == NULL) { goto out; } @@ -218,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, /* TODO: make timeout configurable */ if (conn->timer == NULL) { timeout.tv_sec = 10; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout, call_bail, @@ -238,11 +261,11 @@ saved_frames_new (void) saved_frames = GF_CALLOC (1, sizeof (*saved_frames), gf_common_mt_rpcclnt_savedframe_t); if (!saved_frames) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); return NULL; } INIT_LIST_HEAD (&saved_frames->sf.list); + INIT_LIST_HEAD (&saved_frames->lk_sf.list); return saved_frames; } @@ -261,10 +284,18 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid, } list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { *saved_frame = *tmp; ret = 0; - break; + goto out; + } + } + + list_for_each_entry (tmp, &frames->lk_sf.list, list) { + if (tmp->rpcreq->xid == callid) { + *saved_frame = *tmp; + ret = 0; + goto out; } } @@ -280,14 +311,24 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) struct saved_frame *tmp = NULL; list_for_each_entry (tmp, &frames->sf.list, list) { - if (tmp->callid == callid) { + if (tmp->rpcreq->xid == callid) { list_del_init (&tmp->list); frames->count--; saved_frame = tmp; - break; + goto out; } } + list_for_each_entry (tmp, &frames->lk_sf.list, list) { + if (tmp->rpcreq->xid == callid) { + list_del_init (&tmp->list); + frames->count--; + saved_frame = tmp; + goto out; + } + } + +out: if (saved_frame) { THIS = saved_frame->capital_this; } @@ -295,32 +336,47 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid) return saved_frame; } + void saved_frames_unwind (struct saved_frames *saved_frames) { struct saved_frame *trav = NULL; struct saved_frame *tmp = NULL; - - struct rpc_req req; + char timestr[1024] = {0,}; struct iovec iov = {0,}; - memset (&req, 0, sizeof (req)); - - req.rpc_status = -1; + list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list); list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { - gf_log ("rpc-clnt", GF_LOG_ERROR, - "forced unwinding frame type(%s) op(%s(%d))", - trav->prog->progname, (trav->prog->procnames) ? - trav->prog->procnames[trav->procnum] : "--", - trav->procnum); - + gf_time_fmt (timestr, sizeof timestr, + trav->saved_at.tv_sec, gf_timefmt_FT); + snprintf (timestr + strlen (timestr), + sizeof(timestr) - strlen (timestr), + ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); + + if (!trav->rpcreq || !trav->rpcreq->prog) + continue; + + gf_log_callingfn (trav->rpcreq->conn->trans->name, + GF_LOG_ERROR, + "forced unwinding frame type(%s) op(%s(%d)) " + "called at %s (xid=0x%x)", + trav->rpcreq->prog->progname, + ((trav->rpcreq->prog->procnames) ? + trav->rpcreq->prog->procnames[trav->rpcreq->procnum] + : "--"), + trav->rpcreq->procnum, timestr, + trav->rpcreq->xid); saved_frames->count--; - trav->cbkfn (&req, &iov, 1, trav->frame); + trav->rpcreq->rpc_status = -1; + trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame); + + rpc_clnt_reply_deinit (trav->rpcreq, + trav->rpcreq->conn->rpc_clnt->reqpool); list_del_init (&trav->list); - GF_FREE (trav); + mem_put (trav); } } @@ -328,6 +384,9 @@ saved_frames_unwind (struct saved_frames *saved_frames) void saved_frames_destroy (struct saved_frames *frames) { + if (!frames) + return; + saved_frames_unwind (frames); GF_FREE (frames); @@ -339,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval tv = {0, 0}; + struct timespec ts = {0, 0}; int32_t ret = 0; struct rpc_clnt *clnt = NULL; @@ -358,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr) conn->reconnect = 0; if (conn->connected == 0) { - tv.tv_sec = 3; + ts.tv_sec = 3; + ts.tv_nsec = 0; gf_log (trans->name, GF_LOG_TRACE, "attempting reconnect"); - ret = rpc_transport_connect (trans); - + ret = rpc_transport_connect (trans, + conn->config.remote_port); conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, trans); } else { @@ -386,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr) int rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) { - struct saved_frame saved_frame = {{}, 0}; + struct saved_frame saved_frame; int ret = -1; pthread_mutex_lock (&clnt->conn.lock); @@ -397,14 +457,16 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) pthread_mutex_unlock (&clnt->conn.lock); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved " + gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL, + "cannot lookup the saved " "frame corresponding to xid (%d)", info->xid); goto out; } - info->prognum = saved_frame.prog->prognum; - info->procnum = saved_frame.procnum; - info->progver = saved_frame.prog->progver; + info->prognum = saved_frame.rpcreq->prog->prognum; + info->procnum = saved_frame.rpcreq->procnum; + info->progver = saved_frame.rpcreq->prog->progver; + info->rpc_req = saved_frame.rpcreq; info->rsp = saved_frame.rsp; ret = 0; @@ -412,6 +474,31 @@ out: return ret; } +int +rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn) +{ + struct rpc_clnt *clnt = NULL; + + if (!conn) { + goto out; + } + + clnt = conn->rpc_clnt; + + pthread_mutex_lock (&conn->lock); + { + + if (conn->reconnect) { + gf_timer_call_cancel (clnt->ctx, conn->reconnect); + conn->reconnect = NULL; + } + + } + pthread_mutex_unlock (&conn->lock); + +out: + return 0; +} /* * client_protocol_cleanup - cleanup function @@ -430,7 +517,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) clnt = conn->rpc_clnt; - gf_log ("rpc-clnt", GF_LOG_DEBUG, + gf_log (conn->trans->name, GF_LOG_TRACE, "cleaning up state in transport object %p", conn->trans); pthread_mutex_lock (&conn->lock); @@ -444,11 +531,13 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) conn->timer = NULL; } - if (conn->reconnect == NULL) { - /* :O This part is empty.. any thing missing? */ - } - conn->connected = 0; + + if (conn->ping_timer) { + gf_timer_call_cancel (clnt->ctx, conn->ping_timer); + conn->ping_timer = NULL; + conn->ping_started = 0; + } } pthread_mutex_unlock (&conn->lock); @@ -485,9 +574,10 @@ int rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn, struct rpc_msg *replymsg, struct iovec progmsg, - struct rpc_req *req, struct saved_frame *saved_frame) + struct rpc_req *req, + struct saved_frame *saved_frame) { - int ret = -1; + int ret = -1; if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { goto out; @@ -499,25 +589,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, req->rpc_status = -1; } - req->xid = rpc_reply_xid (replymsg); - req->prog = saved_frame->prog; - req->procnum = saved_frame->procnum; - req->conn = conn; - req->rsp[0] = progmsg; + req->rsp_iobref = iobref_ref (msg->iobref); if (msg->vectored) { - req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); - req->rsp[1].iov_len = msg->data.vector.size2; - + req->rsp[1] = msg->vector[1]; req->rspcnt = 2; - - req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); - req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2); } else { req->rspcnt = 1; - - req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf); } /* By this time, the data bytes for the auth scheme would have already @@ -539,20 +618,17 @@ out: void -rpc_clnt_reply_deinit (struct rpc_req *req) +rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool) { if (!req) { goto out; } - if (req->rsp_prochdr) { - iobuf_unref (req->rsp_prochdr); - } - - if (req->rsp_procpayload) { - iobuf_unref (req->rsp_procpayload); + if (req->rsp_iobref) { + iobref_unref (req->rsp_iobref); } + mem_put (req); out: return; } @@ -569,18 +645,14 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, size_t msglen = 0; int ret = -1; - if (msg->vectored) { - msgbuf = iobuf_ptr (msg->data.vector.iobuf1); - msglen = msg->data.vector.size1; - } else { - msgbuf = iobuf_ptr (msg->data.simple.iobuf); - msglen = msg->data.simple.size; - } + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); if (ret != 0) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "RPC reply decoding failed"); goto out; } @@ -590,33 +662,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," - " ProgVers: %d, Proc: %d", saved_frame->callid, - saved_frame->prog->progname, saved_frame->prog->progver, - saved_frame->procnum); -/* TODO: */ - /* TODO: AUTH */ - /* The verifier that is sent in a reply is a string that can be used as - * a shorthand in credentials for future transactions. We can opt not to - * use this shorthand, preffering to use the original AUTH_UNIX method - * for authentication (containing all the details for authentication in - * credential itself). Hence it is not mandatory for us to be checking - * the verifier. See Appendix A of rfc-5531 for more details. - */ - - /* - * ret = rpc_authenticate (req); - * if (ret == RPC_AUTH_REJECT) { - * gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication"); - * ret = -1; - * goto out; - * } - */ - - /* If the error is not RPC_MISMATCH, we consider the call as accepted - * since we are not handling authentication failures for now. - */ - req->rpc_status = 0; + gf_log (conn->trans->name, GF_LOG_TRACE, + "received rpc message (RPC XID: 0x%x" + " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", + saved_frame->rpcreq->xid, + saved_frame->rpcreq->prog->progname, + saved_frame->rpcreq->prog->progver, + saved_frame->rpcreq->procnum, conn->trans->name); out: if (ret != 0) { @@ -626,51 +678,110 @@ out: return ret; } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ + char *msgbuf = NULL; + rpcclnt_cb_program_t *program = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int found = 0; + int ret = -1; + int procnum = 0; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + clnt = rpc_clnt_ref (clnt); + ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); + if (ret == -1) { + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "RPC call decoding failed"); + goto out; + } + + gf_log (clnt->conn.trans->name, GF_LOG_TRACE, + "received rpc message (XID: 0x%lx, " + "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) " + "from rpc-transport (%s)", rpc_call_xid (&rpcmsg), + rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), + rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), + clnt->conn.trans->name); + + procnum = rpc_call_progproc (&rpcmsg); + + pthread_mutex_lock (&clnt->lock); + { + list_for_each_entry (program, &clnt->programs, program) { + if ((program->prognum == rpc_call_program (&rpcmsg)) + && (program->progver + == rpc_call_progver (&rpcmsg))) { + found = 1; + break; + } + } + } + pthread_mutex_unlock (&clnt->lock); + + if (found && (procnum < program->numactors) && + (program->actors[procnum].actor)) { + program->actors[procnum].actor (clnt, program->mydata, + &progmsg); + } + +out: + clnt = rpc_clnt_unref (clnt); + return ret; +} int rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) { rpc_clnt_connection_t *conn = NULL; struct saved_frame *saved_frame = NULL; - rpc_request_info_t *request_info = NULL; int ret = -1; - struct rpc_req req = {0, }; + struct rpc_req *req = NULL; + uint32_t xid = 0; + clnt = rpc_clnt_ref (clnt); conn = &clnt->conn; - request_info = pollin->private; - - saved_frame = lookup_frame (conn, (int64_t)request_info->xid); + xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base)); + saved_frame = lookup_frame (conn, xid); if (saved_frame == NULL) { - gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the " - "saved frame for reply with xid (%d), " - "prog-version (%d), prog-num (%d)," - "procnum (%d)", request_info->xid, - request_info->progver, request_info->prognum, - request_info->procnum); + gf_log (conn->trans->name, GF_LOG_ERROR, + "cannot lookup the saved frame for reply with xid (%u)", + xid); + goto out; + } + + req = saved_frame->rpcreq; + if (req == NULL) { + gf_log (conn->trans->name, GF_LOG_ERROR, + "no request with frame for xid (%u)", xid); goto out; } - ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); + ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { - req.rpc_status = -1; - gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " - "failed"); + req->rpc_status = -1; + gf_log (conn->trans->name, GF_LOG_WARNING, + "initialising rpc reply failed"); } - saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame); + req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame); - if (ret == 0) { - rpc_clnt_reply_deinit (&req); + if (req) { + rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool); } - - ret = 0; out: if (saved_frame) { - GF_FREE (saved_frame); + mem_put (saved_frame); } + clnt = rpc_clnt_unref (clnt); return ret; } @@ -710,36 +821,42 @@ out: return; } +static void +rpc_clnt_destroy (struct rpc_clnt *rpc); int rpc_clnt_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) { - rpc_clnt_connection_t *conn = NULL; - struct rpc_clnt *clnt = NULL; - int ret = -1; - rpc_request_info_t *req_info = NULL; - rpc_transport_pollin_t *pollin = NULL; - struct timeval tv = {0, }; + rpc_clnt_connection_t *conn = NULL; + struct rpc_clnt *clnt = NULL; + int ret = -1; + rpc_request_info_t *req_info = NULL; + rpc_transport_pollin_t *pollin = NULL; + struct timespec ts = {0, }; conn = mydata; if (conn == NULL) { goto out; } clnt = conn->rpc_clnt; + if (!clnt) + goto out; switch (event) { case RPC_TRANSPORT_DISCONNECT: { - rpc_clnt_connection_cleanup (&clnt->conn); + rpc_clnt_connection_cleanup (conn); pthread_mutex_lock (&conn->lock); { - if (conn->reconnect == NULL) { - tv.tv_sec = 10; + if (!conn->rpc_clnt->disabled + && (conn->reconnect == NULL)) { + ts.tv_sec = 10; + ts.tv_nsec = 0; conn->reconnect = - gf_timer_call_after (clnt->ctx, tv, + gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, conn->trans); } @@ -747,15 +864,13 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, pthread_mutex_unlock (&conn->lock); if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, - NULL); + ret = clnt->notifyfn (clnt, clnt->mydata, + RPC_CLNT_DISCONNECT, NULL); break; } case RPC_TRANSPORT_CLEANUP: - /* this event should not be received on a client for, a - * transport is only disconnected, but never destroyed. - */ + rpc_clnt_destroy (clnt); ret = 0; break; @@ -768,8 +883,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_MSG_RECEIVED: { + pthread_mutex_lock (&conn->lock); + { + gettimeofday (&conn->last_received, NULL); + } + pthread_mutex_unlock (&conn->lock); + pollin = data; - ret = rpc_clnt_handle_reply (clnt, pollin); + if (pollin->is_reply) + ret = rpc_clnt_handle_reply (clnt, pollin); + else + ret = rpc_clnt_handle_cbk (clnt, pollin); /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, * data); */ @@ -790,8 +914,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_CONNECT: { + /* Every time there is a disconnection, processes + should try to connect to 'glusterd' (ie, default + port) or whichever port given as 'option remote-port' + in volume file. */ + /* Below code makes sure the (re-)configured port lasts + for just one successful attempt */ + conn->config.remote_port = 0; + if (clnt->notifyfn) - ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL); + ret = clnt->notifyfn (clnt, clnt->mydata, + RPC_CLNT_CONNECT, NULL); break; } @@ -815,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn) } -inline int +static inline int rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, dict_t *options, char *name) { @@ -828,7 +961,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, ret = dict_get_int32 (options, "frame-timeout", &conn->frame_timeout); if (ret >= 0) { - gf_log (name, GF_LOG_DEBUG, + gf_log (name, GF_LOG_INFO, "setting frame-timeout to %d", conn->frame_timeout); } else { gf_log (name, GF_LOG_DEBUG, @@ -838,8 +971,9 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, conn->trans = rpc_transport_load (ctx, options, name); if (!conn->trans) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport" + gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport" " failed"); + ret = -1; goto out; } @@ -850,7 +984,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, conn); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed"); + gf_log (name, GF_LOG_WARNING, "registering notify failed"); rpc_clnt_connection_cleanup (conn); conn = NULL; goto out; @@ -858,50 +992,93 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, conn->saved_frames = saved_frames_new (); if (!conn->saved_frames) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames " + gf_log (name, GF_LOG_WARNING, "creation of saved_frames " "failed"); rpc_clnt_connection_cleanup (conn); goto out; } - rpc_clnt_reconnect (conn->trans); - ret = 0; out: return ret; } - struct rpc_clnt * -rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, - glusterfs_ctx_t *ctx, char *name) +rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name, + uint32_t reqpool_size) { int ret = -1; struct rpc_clnt *rpc = NULL; rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t); if (!rpc) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } pthread_mutex_init (&rpc->lock, NULL); rpc->ctx = ctx; + if (!reqpool_size) + reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT; + + rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size); + if (rpc->reqpool == NULL) { + pthread_mutex_destroy (&rpc->lock); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + + rpc->saved_frames_pool = mem_pool_new (struct saved_frame, + reqpool_size); + if (rpc->saved_frames_pool == NULL) { + pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + GF_FREE (rpc); + rpc = NULL; + goto out; + } + ret = rpc_clnt_connection_init (rpc, ctx, options, name); if (ret == -1) { pthread_mutex_destroy (&rpc->lock); + mem_pool_destroy (rpc->reqpool); + mem_pool_destroy (rpc->saved_frames_pool); GF_FREE (rpc); rpc = NULL; + if (options) + dict_unref (options); goto out; } + + rpc->auth_null = dict_get_str_boolean (options, "auth-null", 0); + + rpc = rpc_clnt_ref (rpc); + INIT_LIST_HEAD (&rpc->programs); + out: return rpc; } int +rpc_clnt_start (struct rpc_clnt *rpc) +{ + struct rpc_clnt_connection *conn = NULL; + + if (!rpc) + return -1; + + conn = &rpc->conn; + + rpc_clnt_reconnect (conn->trans); + + return 0; +} + + +int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, void *mydata) { @@ -912,7 +1089,7 @@ rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, } ssize_t -xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au) +xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms_v2 *au) { ssize_t ret = -1; XDR xdr; @@ -920,10 +1097,11 @@ xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au) if ((!dest) || (!au)) return -1; - xdrmem_create (&xdr, dest, 1024, - XDR_ENCODE); + xdrmem_create (&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE); - if (!xdr_auth_glusterfs_parms (&xdr, au)) { + if (!xdr_auth_glusterfs_parms_v2 (&xdr, au)) { + gf_log (THIS->name, GF_LOG_WARNING, + "failed to encode auth glusterfs elements"); ret = -1; goto ret; } @@ -936,12 +1114,11 @@ ret: int -rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, - uint64_t xid, struct auth_glusterfs_parms *au, - struct rpc_msg *request) +rpc_clnt_fill_request (int prognum, int progver, int procnum, + uint64_t xid, struct auth_glusterfs_parms_v2 *au, + struct rpc_msg *request, char *auth_data) { int ret = -1; - char dest[1024] = {0,}; if (!request) { goto out; @@ -957,19 +1134,26 @@ rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, request->rm_call.cb_vers = progver; request->rm_call.cb_proc = procnum; - /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in - * future so it is easy to plug-in new authentication schemes. + /* TODO: Using AUTH_(GLUSTERFS/NULL) in a kludgy way for time-being. + * Make it modular in future so it is easy to plug-in new + * authentication schemes. */ - ret = xdr_serialize_glusterfs_auth (dest, au); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials"); - goto out; - } - - request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS; - request->rm_call.cb_cred.oa_base = dest; - request->rm_call.cb_cred.oa_length = ret; + if (auth_data) { + ret = xdr_serialize_glusterfs_auth (auth_data, au); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_DEBUG, + "cannot encode credentials"); + goto out; + } + request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS_v2; + request->rm_call.cb_cred.oa_base = auth_data; + request->rm_call.cb_cred.oa_length = ret; + } else { + request->rm_call.cb_cred.oa_flavor = AUTH_NULL; + request->rm_call.cb_cred.oa_base = NULL; + request->rm_call.cb_cred.oa_length = 0; + } request->rm_call.cb_verf.oa_flavor = AUTH_NONE; request->rm_call.cb_verf.oa_base = NULL; request->rm_call.cb_verf.oa_length = 0; @@ -980,42 +1164,16 @@ out: } -void -rpc_clnt_set_lastfrag (uint32_t *fragsize) { - (*fragsize) |= 0x80000000U; -} - - -void -rpc_clnt_set_frag_header_size (uint32_t size, char *haddr) -{ - size = htonl (size); - memcpy (haddr, &size, sizeof (size)); -} - - -void -rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr) -{ - rpc_clnt_set_lastfrag (&size); - rpc_clnt_set_frag_header_size (size, haddr); -} - - struct iovec rpc_clnt_record_build_header (char *recordstart, size_t rlen, struct rpc_msg *request, size_t payload) { struct iovec requesthdr = {0, }; struct iovec txrecord = {0, 0}; - size_t fraglen = 0; int ret = -1; + size_t fraglen = 0; - /* After leaving aside the 4 bytes for the fragment header, lets - * encode the RPC reply structure into the buffer given to us. - */ - ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE), - rlen, &requesthdr); + ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); if (ret == -1) { gf_log ("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request"); @@ -1026,16 +1184,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen, gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); - /* Since we're not spreading RPC records over mutiple fragments - * we just set this fragment as the first and last fragment for this - * record. - */ - rpc_clnt_set_last_frag_header_size (fraglen, recordstart); - /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE - * we need to transmit the record with the fragment header, which starts - * at recordstart. - */ txrecord.iov_base = recordstart; /* Remember, this is only the vec for the RPC header and does not @@ -1043,7 +1192,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen, * the size of the full fragment. This size is sent in the fragment * header. */ - txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len; + txrecord.iov_len = requesthdr.iov_len; out: return txrecord; @@ -1052,50 +1201,57 @@ out: struct iobuf * rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, - int procnum, size_t payload, uint64_t xid, - struct auth_glusterfs_parms *au, struct iovec *recbuf) + int procnum, size_t hdrsize, uint64_t xid, + struct auth_glusterfs_parms_v2 *au, + struct iovec *recbuf) { - struct rpc_msg request = {0, }; - struct iobuf *request_iob = NULL; - char *record = NULL; - struct iovec recordhdr = {0, }; - size_t pagesize = 0; - int ret = -1; + struct rpc_msg request = {0, }; + struct iobuf *request_iob = NULL; + char *record = NULL; + struct iovec recordhdr = {0, }; + size_t pagesize = 0; + int ret = -1; + size_t xdr_size = 0; + char auth_data[GF_MAX_AUTH_BYTES] = {0, }; if ((!clnt) || (!recbuf) || (!au)) { goto out; } + /* Fill the rpc structure and XDR it into the buffer got above. */ + if (clnt->auth_null) + ret = rpc_clnt_fill_request (prognum, progver, procnum, + xid, NULL, &request, NULL); + else + ret = rpc_clnt_fill_request (prognum, progver, procnum, + xid, au, &request, auth_data); + + if (ret == -1) { + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "cannot build a rpc-request xid (%"PRIu64")", xid); + goto out; + } + + xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); + /* First, try to get a pointer into the buffer which the RPC * layer can use. */ - request_iob = iobuf_get (clnt->ctx->iobuf_pool); + request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize)); if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf"); goto out; } - pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size; + pagesize = iobuf_pagesize (request_iob); record = iobuf_ptr (request_iob); /* Now we have it. */ - /* Fill the rpc structure and XDR it into the buffer got above. */ - ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid, - au, &request); - if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request " - "xid (%"PRIu64")", xid); - goto out; - } - recordhdr = rpc_clnt_record_build_header (record, pagesize, &request, - payload); - - //GF_FREE (request.rm_call.cb_cred.oa_base); + hdrsize); if (!recordhdr.iov_base) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " - " header"); + gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + "Failed to build record header"); iobuf_unref (request_iob); request_iob = NULL; recbuf->iov_base = NULL; @@ -1112,43 +1268,50 @@ out: struct iobuf * rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, - rpc_clnt_prog_t *prog,int procnum, size_t payload_len, + rpc_clnt_prog_t *prog, int procnum, size_t hdrlen, struct iovec *rpchdr, uint64_t callid) { - struct auth_glusterfs_parms au = {0, }; - struct iobuf *request_iob = NULL; + struct auth_glusterfs_parms_v2 au = {0, }; + struct iobuf *request_iob = NULL; + char owner[4] = {0,}; if (!prog || !rpchdr || !call_frame) { goto out; } - au.pid = call_frame->root->pid; - au.uid = call_frame->root->uid; - au.gid = call_frame->root->gid; - au.ngrps = call_frame->root->ngrps; - au.lk_owner = call_frame->root->lk_owner; - if (!au.lk_owner) - au.lk_owner = au.pid; + au.pid = call_frame->root->pid; + au.uid = call_frame->root->uid; + au.gid = call_frame->root->gid; + au.groups.groups_len = call_frame->root->ngrps; + au.lk_owner.lk_owner_len = call_frame->root->lk_owner.len; - gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" - ", gid: %d, owner: %"PRId64, - au.pid, au.uid, au.gid, au.lk_owner); + if (au.groups.groups_len) + au.groups.groups_val = call_frame->root->groups; - memcpy (au.groups, call_frame->root->groups, 16); + if (call_frame->root->lk_owner.len) + au.lk_owner.lk_owner_val = call_frame->root->lk_owner.data; + else { + owner[0] = (char)(au.pid & 0xff); + owner[1] = (char)((au.pid >> 8) & 0xff); + owner[2] = (char)((au.pid >> 16) & 0xff); + owner[3] = (char)((au.pid >> 24) & 0xff); - //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX); - //au.aup_machname = myname; + au.lk_owner.lk_owner_val = owner; + au.lk_owner.lk_owner_len = 4; + } + + gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" + ", gid: %d, owner: %s", au.pid, au.uid, au.gid, + lkowner_utoa (&call_frame->root->lk_owner)); - /* Assuming the client program would like to speak to the same versioned - * program on server. - */ request_iob = rpc_clnt_record_build_record (clnt, prog->prognum, prog->progver, - procnum, payload_len, + procnum, hdrlen, callid, &au, rpchdr); if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record"); + gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + "cannot build rpc-record"); goto out; } @@ -1156,18 +1319,87 @@ out: return request_iob; } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, + rpcclnt_cb_program_t *program, void *mydata) +{ + int ret = -1; + char already_registered = 0; + rpcclnt_cb_program_t *tmp = NULL; + + if (!clnt) + goto out; + + if (program->actors == NULL) + goto out; + + pthread_mutex_lock (&clnt->lock); + { + list_for_each_entry (tmp, &clnt->programs, program) { + if ((program->prognum == tmp->prognum) + && (program->progver == tmp->progver)) { + already_registered = 1; + break; + } + } + } + pthread_mutex_unlock (&clnt->lock); + + if (already_registered) { + gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG, + "already registered"); + ret = 0; + goto out; + } + + tmp = GF_CALLOC (1, sizeof (*tmp), + gf_common_mt_rpcclnt_cb_program_t); + if (tmp == NULL) { + goto out; + } + + memcpy (tmp, program, sizeof (*tmp)); + INIT_LIST_HEAD (&tmp->program); + + tmp->mydata = mydata; + + pthread_mutex_lock (&clnt->lock); + { + list_add_tail (&tmp->program, &clnt->programs); + } + pthread_mutex_unlock (&clnt->lock); + + ret = 0; + gf_log (clnt->conn.trans->name, GF_LOG_DEBUG, + "New program registered: %s, Num: %d, Ver: %d", + program->progname, program->prognum, + program->progver); + +out: + if (ret == -1) { + gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + "Program registration failed:" + " %s, Num: %d, Ver: %d", program->progname, + program->prognum, program->progver); + } + + return ret; +} + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, - struct iobref *iobref, void *frame) + struct iobref *iobref, void *frame, struct iovec *rsphdr, + int rsphdr_count, struct iovec *rsp_payload, + int rsp_payload_count, struct iobref *rsp_iobref) { rpc_clnt_connection_t *conn = NULL; struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; - struct rpc_req rpcreq = {0,}; + struct rpc_req *rpcreq = NULL; rpc_transport_req_t req; int ret = -1; int proglen = 0; @@ -1178,12 +1410,23 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, goto out; } + conn = &rpc->conn; + + if (conn->trans == NULL) { + goto out; + } + + rpcreq = mem_get (rpc->reqpool); + if (rpcreq == NULL) { + goto out; + } + + memset (rpcreq, 0, sizeof (*rpcreq)); memset (&req, 0, sizeof (req)); if (!iobref) { iobref = iobref_new (); if (!iobref) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); goto out; } @@ -1192,59 +1435,73 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, callid = rpc_clnt_new_callid (rpc); - conn = &rpc->conn; + rpcreq->prog = prog; + rpcreq->procnum = procnum; + rpcreq->conn = conn; + rpcreq->xid = callid; + rpcreq->cbkfn = cbkfn; - pthread_mutex_lock (&conn->lock); - { - if (conn->connected == 0) { - rpc_transport_connect (conn->trans); - } + ret = -1; - ret = -1; + if (proghdr) { + proglen += iov_length (proghdr, proghdrcount); + } - if (proghdr) { - proglen += iov_length (proghdr, proghdrcount); - } + request_iob = rpc_clnt_record (rpc, frame, prog, + procnum, proglen, + &rpchdr, callid); + if (!request_iob) { + gf_log (conn->trans->name, GF_LOG_WARNING, + "cannot build rpc-record"); + goto out; + } - if (progpayload) { - proglen += iov_length (progpayload, - progpayloadcount); - } + iobref_add (iobref, request_iob); - request_iob = rpc_clnt_record (rpc, frame, prog, - procnum, proglen, - &rpchdr, callid); - if (!request_iob) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, - "cannot build rpc-record"); - goto unlock; - } + req.msg.rpchdr = &rpchdr; + req.msg.rpchdrcount = 1; + req.msg.proghdr = proghdr; + req.msg.proghdrcount = proghdrcount; + req.msg.progpayload = progpayload; + req.msg.progpayloadcount = progpayloadcount; + req.msg.iobref = iobref; - iobref_add (iobref, request_iob); + req.rsp.rsphdr = rsphdr; + req.rsp.rsphdr_count = rsphdr_count; + req.rsp.rsp_payload = rsp_payload; + req.rsp.rsp_payload_count = rsp_payload_count; + req.rsp.rsp_iobref = rsp_iobref; + req.rpc_req = rpcreq; - req.msg.rpchdr = &rpchdr; - req.msg.rpchdrcount = 1; - req.msg.proghdr = proghdr; - req.msg.proghdrcount = proghdrcount; - req.msg.progpayload = progpayload; - req.msg.progpayloadcount = progpayloadcount; - req.msg.iobref = iobref; + pthread_mutex_lock (&conn->lock); + { + if (conn->connected == 0) { + ret = rpc_transport_connect (conn->trans, + conn->config.remote_port); + } ret = rpc_transport_submit_request (rpc->conn.trans, &req); if (ret == -1) { - gf_log ("rpc-clnt", GF_LOG_DEBUG, - "transmission of rpc-request failed"); + gf_log (conn->trans->name, GF_LOG_WARNING, + "failed to submit rpc-request " + "(XID: 0x%x Program: %s, ProgVers: %d, " + "Proc: %d) to rpc-transport (%s)", rpcreq->xid, + rpcreq->prog->progname, rpcreq->prog->progver, + rpcreq->procnum, rpc->conn.trans->name); } if ((ret >= 0) && frame) { - gettimeofday (&conn->last_sent, NULL); /* Save the frame in queue */ - __save_frame (rpc, frame, procnum, prog, cbkfn, callid); - } + __save_frame (rpc, frame, rpcreq); + gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " + "(XID: 0x%x Program: %s, ProgVers: %d, " + "Proc: %d) to rpc-transport (%s)", rpcreq->xid, + rpcreq->prog->progname, rpcreq->prog->progver, + rpcreq->procnum, rpc->conn.trans->name); + } } -unlock: pthread_mutex_unlock (&conn->lock); if (ret == -1) { @@ -1254,26 +1511,190 @@ unlock: ret = 0; out: - iobuf_unref (request_iob); + if (request_iob) { + iobuf_unref (request_iob); + } if (new_iobref && iobref) { iobref_unref (iobref); } if (frame && (ret == -1)) { - rpcreq.rpc_status = -1; - cbkfn (&rpcreq, NULL, 0, frame); + if (rpcreq) { + rpcreq->rpc_status = -1; + cbkfn (rpcreq, NULL, 0, frame); + mem_put (rpcreq); + } } return ret; } -void +struct rpc_clnt * +rpc_clnt_ref (struct rpc_clnt *rpc) +{ + if (!rpc) + return NULL; + pthread_mutex_lock (&rpc->lock); + { + rpc->refcount++; + } + pthread_mutex_unlock (&rpc->lock); + return rpc; +} + + +static void +rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) +{ + if (!rpc) + return; + + rpc_clnt_disable (rpc); + rpc_transport_unref (rpc->conn.trans); +} + +static void rpc_clnt_destroy (struct rpc_clnt *rpc) { - rpc_clnt_connection_cleanup (&rpc->conn); + if (!rpc) + return; + + saved_frames_destroy (rpc->conn.saved_frames); pthread_mutex_destroy (&rpc->lock); pthread_mutex_destroy (&rpc->conn.lock); + + /* mem-pool should be destroyed, otherwise, + it will cause huge memory leaks */ + mem_pool_destroy (rpc->reqpool); + mem_pool_destroy (rpc->saved_frames_pool); + GF_FREE (rpc); return; } + +struct rpc_clnt * +rpc_clnt_unref (struct rpc_clnt *rpc) +{ + int count = 0; + + if (!rpc) + return NULL; + pthread_mutex_lock (&rpc->lock); + { + count = --rpc->refcount; + } + pthread_mutex_unlock (&rpc->lock); + if (!count) { + rpc_clnt_trigger_destroy (rpc); + return NULL; + } + return rpc; +} + + +char +rpc_clnt_is_disabled (struct rpc_clnt *rpc) +{ + + rpc_clnt_connection_t *conn = NULL; + char disabled = 0; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + disabled = rpc->disabled; + } + pthread_mutex_unlock (&conn->lock); + +out: + return disabled; +} + +void +rpc_clnt_disable (struct rpc_clnt *rpc) +{ + rpc_clnt_connection_t *conn = NULL; + + if (!rpc) { + goto out; + } + + conn = &rpc->conn; + + pthread_mutex_lock (&conn->lock); + { + rpc->disabled = 1; + + if (conn->timer) { + gf_timer_call_cancel (rpc->ctx, conn->timer); + conn->timer = NULL; + } + + if (conn->reconnect) { + gf_timer_call_cancel (rpc->ctx, conn->reconnect); + conn->reconnect = NULL; + } + conn->connected = 0; + + if (conn->ping_timer) { + gf_timer_call_cancel (rpc->ctx, conn->ping_timer); + conn->ping_timer = NULL; + conn->ping_started = 0; + } + + } + pthread_mutex_unlock (&conn->lock); + + rpc_transport_disconnect (rpc->conn.trans); + +out: + return; +} + + +void +rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) +{ + if (config->rpc_timeout) { + if (config->rpc_timeout != rpc->conn.config.rpc_timeout) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing timeout to %d (from %d)", + config->rpc_timeout, + rpc->conn.config.rpc_timeout); + rpc->conn.config.rpc_timeout = config->rpc_timeout; + } + + if (config->remote_port) { + if (config->remote_port != rpc->conn.config.remote_port) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing port to %d (from %d)", + config->remote_port, + rpc->conn.config.remote_port); + + rpc->conn.config.remote_port = config->remote_port; + } + + if (config->remote_host) { + if (rpc->conn.config.remote_host) { + if (strcmp (rpc->conn.config.remote_host, + config->remote_host)) + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "changing hostname to %s (from %s)", + config->remote_host, + rpc->conn.config.remote_host); + FREE (rpc->conn.config.remote_host); + } else { + gf_log (rpc->conn.trans->name, GF_LOG_INFO, + "setting hostname to %s", + config->remote_host); + } + + rpc->conn.config.remote_host = gf_strdup (config->remote_host); + } +} |
