Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--  rpc/rpc-lib/src/rpc-clnt.c  1107
1 files changed, 764 insertions, 343 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 2375cc958..ac98a5c91 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
@@ -23,10 +14,19 @@
#include "config.h"
#endif
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512
+
#include "rpc-clnt.h"
+#include "byte-order.h"
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
+#include "mem-pool.h"
+#include "xdr-rpc.h"
+#include "rpc-common-xdr.h"
+
+void
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
uint64_t
rpc_clnt_new_callid (struct rpc_clnt *clnt)
@@ -61,33 +61,47 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
return bailout_frame;
}
+static int
+_is_lock_fop (struct saved_frame *sframe)
+{
+ int fop = 0;
+
+ if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM &&
+ SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION)
+ fop = SFRAME_GET_PROCNUM (sframe);
+
+ return ((fop == GFS3_OP_LK) ||
+ (fop == GFS3_OP_INODELK) ||
+ (fop == GFS3_OP_FINODELK) ||
+ (fop == GFS3_OP_ENTRYLK) ||
+ (fop == GFS3_OP_FENTRYLK));
+}
struct saved_frame *
-__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid)
+__saved_frames_put (struct saved_frames *frames, void *frame,
+ struct rpc_req *rpcreq)
{
struct saved_frame *saved_frame = NULL;
- saved_frame = GF_CALLOC (1, sizeof (*saved_frame),
- gf_common_mt_rpcclnt_savedframe_t);
+ saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
/* THIS should be saved and set back */
+ memset (saved_frame, 0, sizeof (*saved_frame));
INIT_LIST_HEAD (&saved_frame->list);
saved_frame->capital_this = THIS;
saved_frame->frame = frame;
- saved_frame->procnum = procnum;
- saved_frame->callid = callid;
- saved_frame->prog = prog;
- saved_frame->cbkfn = cbk;
-
+ saved_frame->rpcreq = rpcreq;
gettimeofday (&saved_frame->saved_at, NULL);
- list_add_tail (&saved_frame->list, &frames->sf.list);
+ if (_is_lock_fop (saved_frame))
+ list_add_tail (&saved_frame->list, &frames->lk_sf.list);
+ else
+ list_add_tail (&saved_frame->list, &frames->sf.list);
+
frames->count++;
out:
@@ -99,9 +113,8 @@ void
saved_frames_delete (struct saved_frame *saved_frame,
rpc_clnt_connection_t *conn)
{
- if (!saved_frame || !conn) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("rpc-clnt", saved_frame, out);
+ GF_VALIDATE_OR_GOTO ("rpc-clnt", conn, out);
pthread_mutex_lock (&conn->lock);
{
@@ -110,7 +123,12 @@ saved_frames_delete (struct saved_frame *saved_frame,
}
pthread_mutex_unlock (&conn->lock);
- GF_FREE (saved_frame);
+ if (saved_frame->rpcreq != NULL) {
+ rpc_clnt_reply_deinit (saved_frame->rpcreq,
+ conn->rpc_clnt->reqpool);
+ }
+
+ mem_put (saved_frame);
out:
return;
}
@@ -126,11 +144,8 @@ call_bail (void *data)
struct saved_frame *saved_frame = NULL;
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- struct tm frame_sent_tm;
- char frame_sent[32] = {0,};
- struct timeval timeout = {0,};
- gf_timer_cbk_t timer_cbk = NULL;
- struct rpc_req req;
+ char frame_sent[256] = {0,};
+ struct timespec timeout = {0,};
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -147,10 +162,8 @@ call_bail (void *data)
/* Chaining to get call-always functionality from
call-once timer */
if (conn->timer) {
- timer_cbk = conn->timer->callbk;
-
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
@@ -159,8 +172,9 @@ call_bail (void *data)
(void *) clnt);
if (conn->timer == NULL) {
- gf_log (conn->trans->name, GF_LOG_DEBUG,
- "Cannot create bailout timer");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "Cannot create bailout timer for %s",
+ conn->trans->peerinfo.identifier);
}
}
@@ -177,21 +191,30 @@ call_bail (void *data)
pthread_mutex_unlock (&conn->lock);
list_for_each_entry_safe (trav, tmp, &list, list) {
- localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm);
- strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm);
+ gf_time_fmt (frame_sent, sizeof frame_sent,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
+ snprintf (frame_sent + strlen (frame_sent),
+ 256 - strlen (frame_sent),
+ ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log (conn->trans->name, GF_LOG_ERROR,
- "bailing out frame type(%s) op(%s(%d)) sent = %s. "
- "timeout = %d",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum, frame_sent,
- conn->frame_timeout);
-
- trav->cbkfn (&req, &iov, 1, trav->frame);
-
+ "bailing out frame type(%s) op(%s(%d)) xid = 0x%x "
+ "sent = %s. timeout = %d for %s",
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
+ "--",
+ trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent,
+ conn->frame_timeout, conn->trans->peerinfo.identifier);
+
+ clnt = rpc_clnt_ref (clnt);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+
+ rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);
+ clnt = rpc_clnt_unref (clnt);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (trav);
}
out:
return;
@@ -200,17 +223,17 @@ out:
/* to be called with conn->lock held */
struct saved_frame *
-__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
- rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid)
+__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
+ struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
- struct timeval timeout = {0, };
+ struct timespec timeout = {0, };
struct saved_frame *saved_frame = NULL;
conn = &rpc_clnt->conn;
- saved_frame = __saved_frames_put (conn->saved_frames, frame,
- procnum, prog, cbk, callid);
+ saved_frame = __saved_frames_put (conn->saved_frames, frame, rpcreq);
+
if (saved_frame == NULL) {
goto out;
}
@@ -218,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
/* TODO: make timeout configurable */
if (conn->timer == NULL) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -238,11 +261,11 @@ saved_frames_new (void)
saved_frames = GF_CALLOC (1, sizeof (*saved_frames),
gf_common_mt_rpcclnt_savedframe_t);
if (!saved_frames) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
return NULL;
}
INIT_LIST_HEAD (&saved_frames->sf.list);
+ INIT_LIST_HEAD (&saved_frames->lk_sf.list);
return saved_frames;
}
@@ -261,10 +284,18 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,
}
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
*saved_frame = *tmp;
ret = 0;
- break;
+ goto out;
+ }
+ }
+
+ list_for_each_entry (tmp, &frames->lk_sf.list, list) {
+ if (tmp->rpcreq->xid == callid) {
+ *saved_frame = *tmp;
+ ret = 0;
+ goto out;
}
}
@@ -280,14 +311,24 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
struct saved_frame *tmp = NULL;
list_for_each_entry (tmp, &frames->sf.list, list) {
- if (tmp->callid == callid) {
+ if (tmp->rpcreq->xid == callid) {
list_del_init (&tmp->list);
frames->count--;
saved_frame = tmp;
- break;
+ goto out;
}
}
+ list_for_each_entry (tmp, &frames->lk_sf.list, list) {
+ if (tmp->rpcreq->xid == callid) {
+ list_del_init (&tmp->list);
+ frames->count--;
+ saved_frame = tmp;
+ goto out;
+ }
+ }
+
+out:
if (saved_frame) {
THIS = saved_frame->capital_this;
}
@@ -295,32 +336,47 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
return saved_frame;
}
+
void
saved_frames_unwind (struct saved_frames *saved_frames)
{
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
-
- struct rpc_req req;
+ char timestr[1024] = {0,};
struct iovec iov = {0,};
- memset (&req, 0, sizeof (req));
-
- req.rpc_status = -1;
+ list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list);
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
- gf_log ("rpc-clnt", GF_LOG_ERROR,
- "forced unwinding frame type(%s) op(%s(%d))",
- trav->prog->progname, (trav->prog->procnames) ?
- trav->prog->procnames[trav->procnum] : "--",
- trav->procnum);
-
+ gf_time_fmt (timestr, sizeof timestr,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
+ snprintf (timestr + strlen (timestr),
+ sizeof(timestr) - strlen (timestr),
+ ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
+
+ if (!trav->rpcreq || !trav->rpcreq->prog)
+ continue;
+
+ gf_log_callingfn (trav->rpcreq->conn->trans->name,
+ GF_LOG_ERROR,
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s (xid=0x%x)",
+ trav->rpcreq->prog->progname,
+ ((trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--"),
+ trav->rpcreq->procnum, timestr,
+ trav->rpcreq->xid);
saved_frames->count--;
- trav->cbkfn (&req, &iov, 1, trav->frame);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+
+ rpc_clnt_reply_deinit (trav->rpcreq,
+ trav->rpcreq->conn->rpc_clnt->reqpool);
list_del_init (&trav->list);
- GF_FREE (trav);
+ mem_put (trav);
}
}
@@ -328,6 +384,9 @@ saved_frames_unwind (struct saved_frames *saved_frames)
void
saved_frames_destroy (struct saved_frames *frames)
{
+ if (!frames)
+ return;
+
saved_frames_unwind (frames);
GF_FREE (frames);
@@ -339,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr)
{
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
- struct timeval tv = {0, 0};
+ struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
@@ -358,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr)
conn->reconnect = 0;
if (conn->connected == 0) {
- tv.tv_sec = 3;
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
gf_log (trans->name, GF_LOG_TRACE,
"attempting reconnect");
- ret = rpc_transport_connect (trans);
-
+ ret = rpc_transport_connect (trans,
+ conn->config.remote_port);
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
trans);
} else {
@@ -386,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr)
int
rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
{
- struct saved_frame saved_frame = {{}, 0};
+ struct saved_frame saved_frame;
int ret = -1;
pthread_mutex_lock (&clnt->conn.lock);
@@ -397,14 +457,16 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
pthread_mutex_unlock (&clnt->conn.lock);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved "
+ gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL,
+ "cannot lookup the saved "
"frame corresponding to xid (%d)", info->xid);
goto out;
}
- info->prognum = saved_frame.prog->prognum;
- info->procnum = saved_frame.procnum;
- info->progver = saved_frame.prog->progver;
+ info->prognum = saved_frame.rpcreq->prog->prognum;
+ info->procnum = saved_frame.rpcreq->procnum;
+ info->progver = saved_frame.rpcreq->prog->progver;
+ info->rpc_req = saved_frame.rpcreq;
info->rsp = saved_frame.rsp;
ret = 0;
@@ -412,6 +474,31 @@ out:
return ret;
}
+int
+rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
+{
+ struct rpc_clnt *clnt = NULL;
+
+ if (!conn) {
+ goto out;
+ }
+
+ clnt = conn->rpc_clnt;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+
+ if (conn->reconnect) {
+ gf_timer_call_cancel (clnt->ctx, conn->reconnect);
+ conn->reconnect = NULL;
+ }
+
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+out:
+ return 0;
+}
/*
* client_protocol_cleanup - cleanup function
@@ -430,7 +517,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
clnt = conn->rpc_clnt;
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
+ gf_log (conn->trans->name, GF_LOG_TRACE,
"cleaning up state in transport object %p", conn->trans);
pthread_mutex_lock (&conn->lock);
@@ -444,11 +531,13 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
conn->timer = NULL;
}
- if (conn->reconnect == NULL) {
- /* :O This part is empty.. any thing missing? */
- }
-
conn->connected = 0;
+
+ if (conn->ping_timer) {
+ gf_timer_call_cancel (clnt->ctx, conn->ping_timer);
+ conn->ping_timer = NULL;
+ conn->ping_started = 0;
+ }
}
pthread_mutex_unlock (&conn->lock);
@@ -485,9 +574,10 @@ int
rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
rpc_clnt_connection_t *conn,
struct rpc_msg *replymsg, struct iovec progmsg,
- struct rpc_req *req, struct saved_frame *saved_frame)
+ struct rpc_req *req,
+ struct saved_frame *saved_frame)
{
- int ret = -1;
+ int ret = -1;
if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) {
goto out;
@@ -499,25 +589,14 @@ rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
req->rpc_status = -1;
}
- req->xid = rpc_reply_xid (replymsg);
- req->prog = saved_frame->prog;
- req->procnum = saved_frame->procnum;
- req->conn = conn;
-
req->rsp[0] = progmsg;
+ req->rsp_iobref = iobref_ref (msg->iobref);
if (msg->vectored) {
- req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
- req->rsp[1].iov_len = msg->data.vector.size2;
-
+ req->rsp[1] = msg->vector[1];
req->rspcnt = 2;
-
- req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1);
- req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);
} else {
req->rspcnt = 1;
-
- req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);
}
/* By this time, the data bytes for the auth scheme would have already
@@ -539,20 +618,17 @@ out:
void
-rpc_clnt_reply_deinit (struct rpc_req *req)
+rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)
{
if (!req) {
goto out;
}
- if (req->rsp_prochdr) {
- iobuf_unref (req->rsp_prochdr);
- }
-
- if (req->rsp_procpayload) {
- iobuf_unref (req->rsp_procpayload);
+ if (req->rsp_iobref) {
+ iobref_unref (req->rsp_iobref);
}
+ mem_put (req);
out:
return;
}
@@ -569,18 +645,14 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
size_t msglen = 0;
int ret = -1;
- if (msg->vectored) {
- msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
- msglen = msg->data.vector.size1;
- } else {
- msgbuf = iobuf_ptr (msg->data.simple.iobuf);
- msglen = msg->data.simple.size;
- }
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
if (ret != 0) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "RPC reply decoding failed");
goto out;
}
@@ -590,33 +662,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s,"
- " ProgVers: %d, Proc: %d", saved_frame->callid,
- saved_frame->prog->progname, saved_frame->prog->progver,
- saved_frame->procnum);
-/* TODO: */
- /* TODO: AUTH */
- /* The verifier that is sent in a reply is a string that can be used as
- * a shorthand in credentials for future transactions. We can opt not to
- * use this shorthand, preffering to use the original AUTH_UNIX method
- * for authentication (containing all the details for authentication in
- * credential itself). Hence it is not mandatory for us to be checking
- * the verifier. See Appendix A of rfc-5531 for more details.
- */
-
- /*
- * ret = rpc_authenticate (req);
- * if (ret == RPC_AUTH_REJECT) {
- * gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication");
- * ret = -1;
- * goto out;
- * }
- */
-
- /* If the error is not RPC_MISMATCH, we consider the call as accepted
- * since we are not handling authentication failures for now.
- */
- req->rpc_status = 0;
+ gf_log (conn->trans->name, GF_LOG_TRACE,
+ "received rpc message (RPC XID: 0x%x"
+ " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
+ saved_frame->rpcreq->xid,
+ saved_frame->rpcreq->prog->progname,
+ saved_frame->rpcreq->prog->progver,
+ saved_frame->rpcreq->procnum, conn->trans->name);
out:
if (ret != 0) {
@@ -626,51 +678,110 @@ out:
return ret;
}
+int
+rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
+{
+ char *msgbuf = NULL;
+ rpcclnt_cb_program_t *program = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ size_t msglen = 0;
+ int found = 0;
+ int ret = -1;
+ int procnum = 0;
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ clnt = rpc_clnt_ref (clnt);
+ ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL);
+ if (ret == -1) {
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "RPC call decoding failed");
+ goto out;
+ }
+
+ gf_log (clnt->conn.trans->name, GF_LOG_TRACE,
+ "received rpc message (XID: 0x%lx, "
+ "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) "
+ "from rpc-transport (%s)", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ clnt->conn.trans->name);
+
+ procnum = rpc_call_progproc (&rpcmsg);
+
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_for_each_entry (program, &clnt->programs, program) {
+ if ((program->prognum == rpc_call_program (&rpcmsg))
+ && (program->progver
+ == rpc_call_progver (&rpcmsg))) {
+ found = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&clnt->lock);
+
+ if (found && (procnum < program->numactors) &&
+ (program->actors[procnum].actor)) {
+ program->actors[procnum].actor (clnt, program->mydata,
+ &progmsg);
+ }
+
+out:
+ clnt = rpc_clnt_unref (clnt);
+ return ret;
+}
int
rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
{
rpc_clnt_connection_t *conn = NULL;
struct saved_frame *saved_frame = NULL;
- rpc_request_info_t *request_info = NULL;
int ret = -1;
- struct rpc_req req = {0, };
+ struct rpc_req *req = NULL;
+ uint32_t xid = 0;
+ clnt = rpc_clnt_ref (clnt);
conn = &clnt->conn;
- request_info = pollin->private;
-
- saved_frame = lookup_frame (conn, (int64_t)request_info->xid);
+ xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base));
+ saved_frame = lookup_frame (conn, xid);
if (saved_frame == NULL) {
- gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the "
- "saved frame for reply with xid (%d), "
- "prog-version (%d), prog-num (%d),"
- "procnum (%d)", request_info->xid,
- request_info->progver, request_info->prognum,
- request_info->procnum);
+ gf_log (conn->trans->name, GF_LOG_ERROR,
+ "cannot lookup the saved frame for reply with xid (%u)",
+ xid);
+ goto out;
+ }
+
+ req = saved_frame->rpcreq;
+ if (req == NULL) {
+ gf_log (conn->trans->name, GF_LOG_ERROR,
+ "no request with frame for xid (%u)", xid);
goto out;
}
- ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame);
+ ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
- req.rpc_status = -1;
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
- "failed");
+ req->rpc_status = -1;
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "initialising rpc reply failed");
}
- saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame);
+ req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame);
- if (ret == 0) {
- rpc_clnt_reply_deinit (&req);
+ if (req) {
+ rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);
}
-
- ret = 0;
out:
if (saved_frame) {
- GF_FREE (saved_frame);
+ mem_put (saved_frame);
}
+ clnt = rpc_clnt_unref (clnt);
return ret;
}
@@ -710,36 +821,42 @@ out:
return;
}
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc);
int
rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
{
- rpc_clnt_connection_t *conn = NULL;
- struct rpc_clnt *clnt = NULL;
- int ret = -1;
- rpc_request_info_t *req_info = NULL;
- rpc_transport_pollin_t *pollin = NULL;
- struct timeval tv = {0, };
+ rpc_clnt_connection_t *conn = NULL;
+ struct rpc_clnt *clnt = NULL;
+ int ret = -1;
+ rpc_request_info_t *req_info = NULL;
+ rpc_transport_pollin_t *pollin = NULL;
+ struct timespec ts = {0, };
conn = mydata;
if (conn == NULL) {
goto out;
}
clnt = conn->rpc_clnt;
+ if (!clnt)
+ goto out;
switch (event) {
case RPC_TRANSPORT_DISCONNECT:
{
- rpc_clnt_connection_cleanup (&clnt->conn);
+ rpc_clnt_connection_cleanup (conn);
pthread_mutex_lock (&conn->lock);
{
- if (conn->reconnect == NULL) {
- tv.tv_sec = 10;
+ if (!conn->rpc_clnt->disabled
+ && (conn->reconnect == NULL)) {
+ ts.tv_sec = 10;
+ ts.tv_nsec = 0;
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn->trans);
}
@@ -747,15 +864,13 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
pthread_mutex_unlock (&conn->lock);
if (clnt->notifyfn)
- ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT,
- NULL);
+ ret = clnt->notifyfn (clnt, clnt->mydata,
+ RPC_CLNT_DISCONNECT, NULL);
break;
}
case RPC_TRANSPORT_CLEANUP:
- /* this event should not be received on a client for, a
- * transport is only disconnected, but never destroyed.
- */
+ rpc_clnt_destroy (clnt);
ret = 0;
break;
@@ -768,8 +883,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_MSG_RECEIVED:
{
+ pthread_mutex_lock (&conn->lock);
+ {
+ gettimeofday (&conn->last_received, NULL);
+ }
+ pthread_mutex_unlock (&conn->lock);
+
pollin = data;
- ret = rpc_clnt_handle_reply (clnt, pollin);
+ if (pollin->is_reply)
+ ret = rpc_clnt_handle_reply (clnt, pollin);
+ else
+ ret = rpc_clnt_handle_cbk (clnt, pollin);
/* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
* data);
*/
@@ -790,8 +914,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_CONNECT:
{
+ /* Every time there is a disconnection, processes
+ should try to connect to 'glusterd' (ie, default
+ port) or whichever port given as 'option remote-port'
+ in volume file. */
+ /* Below code makes sure the (re-)configured port lasts
+ for just one successful attempt */
+ conn->config.remote_port = 0;
+
if (clnt->notifyfn)
- ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL);
+ ret = clnt->notifyfn (clnt, clnt->mydata,
+ RPC_CLNT_CONNECT, NULL);
break;
}
@@ -815,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn)
}
-inline int
+static inline int
rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
dict_t *options, char *name)
{
@@ -828,7 +961,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
ret = dict_get_int32 (options, "frame-timeout",
&conn->frame_timeout);
if (ret >= 0) {
- gf_log (name, GF_LOG_DEBUG,
+ gf_log (name, GF_LOG_INFO,
"setting frame-timeout to %d", conn->frame_timeout);
} else {
gf_log (name, GF_LOG_DEBUG,
@@ -838,8 +971,9 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
conn->trans = rpc_transport_load (ctx, options, name);
if (!conn->trans) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport"
+ gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport"
" failed");
+ ret = -1;
goto out;
}
@@ -850,7 +984,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify,
conn);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed");
+ gf_log (name, GF_LOG_WARNING, "registering notify failed");
rpc_clnt_connection_cleanup (conn);
conn = NULL;
goto out;
@@ -858,50 +992,93 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
conn->saved_frames = saved_frames_new ();
if (!conn->saved_frames) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames "
+ gf_log (name, GF_LOG_WARNING, "creation of saved_frames "
"failed");
rpc_clnt_connection_cleanup (conn);
goto out;
}
- rpc_clnt_reconnect (conn->trans);
-
ret = 0;
out:
return ret;
}
-
struct rpc_clnt *
-rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
- glusterfs_ctx_t *ctx, char *name)
+rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name,
+ uint32_t reqpool_size)
{
int ret = -1;
struct rpc_clnt *rpc = NULL;
rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t);
if (!rpc) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
pthread_mutex_init (&rpc->lock, NULL);
rpc->ctx = ctx;
+ if (!reqpool_size)
+ reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT;
+
+ rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size);
+ if (rpc->reqpool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
+ reqpool_size);
+ if (rpc->saved_frames_pool == NULL) {
+ pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ GF_FREE (rpc);
+ rpc = NULL;
+ goto out;
+ }
+
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ mem_pool_destroy (rpc->saved_frames_pool);
GF_FREE (rpc);
rpc = NULL;
+ if (options)
+ dict_unref (options);
goto out;
}
+
+ rpc->auth_null = dict_get_str_boolean (options, "auth-null", 0);
+
+ rpc = rpc_clnt_ref (rpc);
+ INIT_LIST_HEAD (&rpc->programs);
+
out:
return rpc;
}
int
+rpc_clnt_start (struct rpc_clnt *rpc)
+{
+ struct rpc_clnt_connection *conn = NULL;
+
+ if (!rpc)
+ return -1;
+
+ conn = &rpc->conn;
+
+ rpc_clnt_reconnect (conn->trans);
+
+ return 0;
+}
+
+
+int
rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata)
{
@@ -912,7 +1089,7 @@ rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
}
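[Editor's note: with this patch the old rpc_clnt_init() is split in two: rpc_clnt_new() only allocates the client (including the new request and saved-frame mem-pools), and the first connection attempt happens when the caller invokes rpc_clnt_start(). Below is a minimal caller-side sketch, not part of the change; my_notify and my_client_setup are hypothetical names, and the callback parameters are assumed from rpc_clnt_notify_t in rpc-clnt.h.]

#include "rpc-clnt.h"

/* hypothetical notify callback; signature assumed from rpc_clnt_notify_t */
static int
my_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
           void *data)
{
        /* handle RPC_CLNT_CONNECT / RPC_CLNT_DISCONNECT here */
        return 0;
}

static struct rpc_clnt *
my_client_setup (glusterfs_ctx_t *ctx, dict_t *options)
{
        struct rpc_clnt *rpc = NULL;

        /* reqpool_size of 0 falls back to RPC_CLNT_DEFAULT_REQUEST_COUNT */
        rpc = rpc_clnt_new (options, ctx, "my-client", 0);
        if (!rpc)
                return NULL;

        /* register for connection events before connecting */
        rpc_clnt_register_notify (rpc, my_notify, NULL);

        /* unlike the old rpc_clnt_init(), connecting is now an explicit step */
        if (rpc_clnt_start (rpc) != 0) {
                rpc_clnt_unref (rpc);
                return NULL;
        }

        return rpc;
}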
ssize_t
-xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au)
+xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms_v2 *au)
{
ssize_t ret = -1;
XDR xdr;
@@ -920,10 +1097,11 @@ xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au)
if ((!dest) || (!au))
return -1;
- xdrmem_create (&xdr, dest, 1024,
- XDR_ENCODE);
+ xdrmem_create (&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE);
- if (!xdr_auth_glusterfs_parms (&xdr, au)) {
+ if (!xdr_auth_glusterfs_parms_v2 (&xdr, au)) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to encode auth glusterfs elements");
ret = -1;
goto ret;
}
@@ -936,12 +1114,11 @@ ret:
int
-rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload,
- uint64_t xid, struct auth_glusterfs_parms *au,
- struct rpc_msg *request)
+rpc_clnt_fill_request (int prognum, int progver, int procnum,
+ uint64_t xid, struct auth_glusterfs_parms_v2 *au,
+ struct rpc_msg *request, char *auth_data)
{
int ret = -1;
- char dest[1024] = {0,};
if (!request) {
goto out;
@@ -957,19 +1134,26 @@ rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload,
request->rm_call.cb_vers = progver;
request->rm_call.cb_proc = procnum;
- /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in
- * future so it is easy to plug-in new authentication schemes.
+ /* TODO: Using AUTH_(GLUSTERFS/NULL) in a kludgy way for time-being.
+ * Make it modular in future so it is easy to plug-in new
+ * authentication schemes.
*/
- ret = xdr_serialize_glusterfs_auth (dest, au);
- if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials");
- goto out;
- }
-
- request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS;
- request->rm_call.cb_cred.oa_base = dest;
- request->rm_call.cb_cred.oa_length = ret;
+ if (auth_data) {
+ ret = xdr_serialize_glusterfs_auth (auth_data, au);
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_DEBUG,
+ "cannot encode credentials");
+ goto out;
+ }
+ request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS_v2;
+ request->rm_call.cb_cred.oa_base = auth_data;
+ request->rm_call.cb_cred.oa_length = ret;
+ } else {
+ request->rm_call.cb_cred.oa_flavor = AUTH_NULL;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+ }
request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
request->rm_call.cb_verf.oa_base = NULL;
request->rm_call.cb_verf.oa_length = 0;
@@ -980,42 +1164,16 @@ out:
}
-void
-rpc_clnt_set_lastfrag (uint32_t *fragsize) {
- (*fragsize) |= 0x80000000U;
-}
-
-
-void
-rpc_clnt_set_frag_header_size (uint32_t size, char *haddr)
-{
- size = htonl (size);
- memcpy (haddr, &size, sizeof (size));
-}
-
-
-void
-rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr)
-{
- rpc_clnt_set_lastfrag (&size);
- rpc_clnt_set_frag_header_size (size, haddr);
-}
-
-
struct iovec
rpc_clnt_record_build_header (char *recordstart, size_t rlen,
struct rpc_msg *request, size_t payload)
{
struct iovec requesthdr = {0, };
struct iovec txrecord = {0, 0};
- size_t fraglen = 0;
int ret = -1;
+ size_t fraglen = 0;
- /* After leaving aside the 4 bytes for the fragment header, lets
- * encode the RPC reply structure into the buffer given to us.
- */
- ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE),
- rlen, &requesthdr);
+ ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
if (ret == -1) {
gf_log ("rpc-clnt", GF_LOG_DEBUG,
"Failed to create RPC request");
@@ -1026,16 +1184,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen,
gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
"rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
- /* Since we're not spreading RPC records over mutiple fragments
- * we just set this fragment as the first and last fragment for this
- * record.
- */
- rpc_clnt_set_last_frag_header_size (fraglen, recordstart);
- /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE
- * we need to transmit the record with the fragment header, which starts
- * at recordstart.
- */
txrecord.iov_base = recordstart;
/* Remember, this is only the vec for the RPC header and does not
@@ -1043,7 +1192,7 @@ rpc_clnt_record_build_header (char *recordstart, size_t rlen,
* the size of the full fragment. This size is sent in the fragment
* header.
*/
- txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len;
+ txrecord.iov_len = requesthdr.iov_len;
out:
return txrecord;
@@ -1052,50 +1201,57 @@ out:
struct iobuf *
rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
- int procnum, size_t payload, uint64_t xid,
- struct auth_glusterfs_parms *au, struct iovec *recbuf)
+ int procnum, size_t hdrsize, uint64_t xid,
+ struct auth_glusterfs_parms_v2 *au,
+ struct iovec *recbuf)
{
- struct rpc_msg request = {0, };
- struct iobuf *request_iob = NULL;
- char *record = NULL;
- struct iovec recordhdr = {0, };
- size_t pagesize = 0;
- int ret = -1;
+ struct rpc_msg request = {0, };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {0, };
+ size_t pagesize = 0;
+ int ret = -1;
+ size_t xdr_size = 0;
+ char auth_data[GF_MAX_AUTH_BYTES] = {0, };
if ((!clnt) || (!recbuf) || (!au)) {
goto out;
}
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ if (clnt->auth_null)
+ ret = rpc_clnt_fill_request (prognum, progver, procnum,
+ xid, NULL, &request, NULL);
+ else
+ ret = rpc_clnt_fill_request (prognum, progver, procnum,
+ xid, au, &request, auth_data);
+
+ if (ret == -1) {
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "cannot build a rpc-request xid (%"PRIu64")", xid);
+ goto out;
+ }
+
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
+
/* First, try to get a pointer into the buffer which the RPC
* layer can use.
*/
- request_iob = iobuf_get (clnt->ctx->iobuf_pool);
+ request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf");
goto out;
}
- pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size;
+ pagesize = iobuf_pagesize (request_iob);
record = iobuf_ptr (request_iob); /* Now we have it. */
- /* Fill the rpc structure and XDR it into the buffer got above. */
- ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid,
- au, &request);
- if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request "
- "xid (%"PRIu64")", xid);
- goto out;
- }
-
recordhdr = rpc_clnt_record_build_header (record, pagesize, &request,
- payload);
-
- //GF_FREE (request.rm_call.cb_cred.oa_base);
+ hdrsize);
if (!recordhdr.iov_base) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
- " header");
+ gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ "Failed to build record header");
iobuf_unref (request_iob);
request_iob = NULL;
recbuf->iov_base = NULL;
@@ -1112,43 +1268,50 @@ out:
struct iobuf *
rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame,
- rpc_clnt_prog_t *prog,int procnum, size_t payload_len,
+ rpc_clnt_prog_t *prog, int procnum, size_t hdrlen,
struct iovec *rpchdr, uint64_t callid)
{
- struct auth_glusterfs_parms au = {0, };
- struct iobuf *request_iob = NULL;
+ struct auth_glusterfs_parms_v2 au = {0, };
+ struct iobuf *request_iob = NULL;
+ char owner[4] = {0,};
if (!prog || !rpchdr || !call_frame) {
goto out;
}
- au.pid = call_frame->root->pid;
- au.uid = call_frame->root->uid;
- au.gid = call_frame->root->gid;
- au.ngrps = call_frame->root->ngrps;
- au.lk_owner = call_frame->root->lk_owner;
- if (!au.lk_owner)
- au.lk_owner = au.pid;
+ au.pid = call_frame->root->pid;
+ au.uid = call_frame->root->uid;
+ au.gid = call_frame->root->gid;
+ au.groups.groups_len = call_frame->root->ngrps;
+ au.lk_owner.lk_owner_len = call_frame->root->lk_owner.len;
- gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
- ", gid: %d, owner: %"PRId64,
- au.pid, au.uid, au.gid, au.lk_owner);
+ if (au.groups.groups_len)
+ au.groups.groups_val = call_frame->root->groups;
- memcpy (au.groups, call_frame->root->groups, 16);
+ if (call_frame->root->lk_owner.len)
+ au.lk_owner.lk_owner_val = call_frame->root->lk_owner.data;
+ else {
+ owner[0] = (char)(au.pid & 0xff);
+ owner[1] = (char)((au.pid >> 8) & 0xff);
+ owner[2] = (char)((au.pid >> 16) & 0xff);
+ owner[3] = (char)((au.pid >> 24) & 0xff);
- //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX);
- //au.aup_machname = myname;
+ au.lk_owner.lk_owner_val = owner;
+ au.lk_owner.lk_owner_len = 4;
+ }
+
+ gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
+ ", gid: %d, owner: %s", au.pid, au.uid, au.gid,
+ lkowner_utoa (&call_frame->root->lk_owner));
- /* Assuming the client program would like to speak to the same versioned
- * program on server.
- */
request_iob = rpc_clnt_record_build_record (clnt, prog->prognum,
prog->progver,
- procnum, payload_len,
+ procnum, hdrlen,
callid, &au,
rpchdr);
if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record");
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "cannot build rpc-record");
goto out;
}
@@ -1156,18 +1319,87 @@ out:
return request_iob;
}
+int
+rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
+ rpcclnt_cb_program_t *program, void *mydata)
+{
+ int ret = -1;
+ char already_registered = 0;
+ rpcclnt_cb_program_t *tmp = NULL;
+
+ if (!clnt)
+ goto out;
+
+ if (program->actors == NULL)
+ goto out;
+
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_for_each_entry (tmp, &clnt->programs, program) {
+ if ((program->prognum == tmp->prognum)
+ && (program->progver == tmp->progver)) {
+ already_registered = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&clnt->lock);
+
+ if (already_registered) {
+ gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG,
+ "already registered");
+ ret = 0;
+ goto out;
+ }
+
+ tmp = GF_CALLOC (1, sizeof (*tmp),
+ gf_common_mt_rpcclnt_cb_program_t);
+ if (tmp == NULL) {
+ goto out;
+ }
+
+ memcpy (tmp, program, sizeof (*tmp));
+ INIT_LIST_HEAD (&tmp->program);
+
+ tmp->mydata = mydata;
+
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_add_tail (&tmp->program, &clnt->programs);
+ }
+ pthread_mutex_unlock (&clnt->lock);
+
+ ret = 0;
+ gf_log (clnt->conn.trans->name, GF_LOG_DEBUG,
+ "New program registered: %s, Num: %d, Ver: %d",
+ program->progname, program->prognum,
+ program->progver);
+
+out:
+ if (ret == -1) {
+ gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ "Program registration failed:"
+ " %s, Num: %d, Ver: %d", program->progname,
+ program->prognum, program->progver);
+ }
+
+ return ret;
+}
+
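[Editor's note: rpc_clnt_handle_cbk() above dispatches server-initiated RPC calls to programs registered through rpcclnt_cbk_program_register(). The sketch below shows one way such a program could be registered; the MY_CBK_* names, the program number and the actor-descriptor type name rpcclnt_cb_actor_t are assumptions, while the fields actually used in this patch (progname, prognum, progver, actors, numactors, .actor) are taken from the code above.]

/* placeholder procedure numbers for illustration only */
enum { MY_CBK_NULL = 0, MY_CBK_EVENT = 1, MY_CBK_MAXVALUE = 2 };

/* actor signature follows the call made in rpc_clnt_handle_cbk():
 * actor (clnt, program->mydata, &progmsg) */
static int
my_cbk_event (struct rpc_clnt *rpc, void *mydata, void *data)
{
        /* 'data' points to the XDR-encoded program payload */
        return 0;
}

static rpcclnt_cb_actor_t my_cbk_actors[] = {
        [MY_CBK_EVENT] = { .actor = my_cbk_event },
};

static rpcclnt_cb_program_t my_cbk_prog = {
        .progname  = "my-cbk",
        .prognum   = 123451501,        /* placeholder program number */
        .progver   = 1,
        .actors    = my_cbk_actors,
        .numactors = MY_CBK_MAXVALUE,
};

/* after rpc_clnt_new():
 *         rpcclnt_cbk_program_register (rpc, &my_cbk_prog, mydata);
 */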
int
rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int procnum, fop_cbk_fn_t cbkfn,
struct iovec *proghdr, int proghdrcount,
struct iovec *progpayload, int progpayloadcount,
- struct iobref *iobref, void *frame)
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref)
{
rpc_clnt_connection_t *conn = NULL;
struct iobuf *request_iob = NULL;
struct iovec rpchdr = {0,};
- struct rpc_req rpcreq = {0,};
+ struct rpc_req *rpcreq = NULL;
rpc_transport_req_t req;
int ret = -1;
int proglen = 0;
@@ -1178,12 +1410,23 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
goto out;
}
+ conn = &rpc->conn;
+
+ if (conn->trans == NULL) {
+ goto out;
+ }
+
+ rpcreq = mem_get (rpc->reqpool);
+ if (rpcreq == NULL) {
+ goto out;
+ }
+
+ memset (rpcreq, 0, sizeof (*rpcreq));
memset (&req, 0, sizeof (req));
if (!iobref) {
iobref = iobref_new ();
if (!iobref) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
@@ -1192,59 +1435,73 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
callid = rpc_clnt_new_callid (rpc);
- conn = &rpc->conn;
+ rpcreq->prog = prog;
+ rpcreq->procnum = procnum;
+ rpcreq->conn = conn;
+ rpcreq->xid = callid;
+ rpcreq->cbkfn = cbkfn;
- pthread_mutex_lock (&conn->lock);
- {
- if (conn->connected == 0) {
- rpc_transport_connect (conn->trans);
- }
+ ret = -1;
- ret = -1;
+ if (proghdr) {
+ proglen += iov_length (proghdr, proghdrcount);
+ }
- if (proghdr) {
- proglen += iov_length (proghdr, proghdrcount);
- }
+ request_iob = rpc_clnt_record (rpc, frame, prog,
+ procnum, proglen,
+ &rpchdr, callid);
+ if (!request_iob) {
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "cannot build rpc-record");
+ goto out;
+ }
- if (progpayload) {
- proglen += iov_length (progpayload,
- progpayloadcount);
- }
+ iobref_add (iobref, request_iob);
- request_iob = rpc_clnt_record (rpc, frame, prog,
- procnum, proglen,
- &rpchdr, callid);
- if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
- "cannot build rpc-record");
- goto unlock;
- }
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+ req.msg.progpayload = progpayload;
+ req.msg.progpayloadcount = progpayloadcount;
+ req.msg.iobref = iobref;
- iobref_add (iobref, request_iob);
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+ req.rpc_req = rpcreq;
- req.msg.rpchdr = &rpchdr;
- req.msg.rpchdrcount = 1;
- req.msg.proghdr = proghdr;
- req.msg.proghdrcount = proghdrcount;
- req.msg.progpayload = progpayload;
- req.msg.progpayloadcount = progpayloadcount;
- req.msg.iobref = iobref;
+ pthread_mutex_lock (&conn->lock);
+ {
+ if (conn->connected == 0) {
+ ret = rpc_transport_connect (conn->trans,
+ conn->config.remote_port);
+ }
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
- "transmission of rpc-request failed");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "failed to submit rpc-request "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
+ "Proc: %d) to rpc-transport (%s)", rpcreq->xid,
+ rpcreq->prog->progname, rpcreq->prog->progver,
+ rpcreq->procnum, rpc->conn.trans->name);
}
if ((ret >= 0) && frame) {
- gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
- __save_frame (rpc, frame, procnum, prog, cbkfn, callid);
- }
+ __save_frame (rpc, frame, rpcreq);
+ gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
+ "Proc: %d) to rpc-transport (%s)", rpcreq->xid,
+ rpcreq->prog->progname, rpcreq->prog->progver,
+ rpcreq->procnum, rpc->conn.trans->name);
+ }
}
-unlock:
pthread_mutex_unlock (&conn->lock);
if (ret == -1) {
@@ -1254,26 +1511,190 @@ unlock:
ret = 0;
out:
- iobuf_unref (request_iob);
+ if (request_iob) {
+ iobuf_unref (request_iob);
+ }
if (new_iobref && iobref) {
iobref_unref (iobref);
}
if (frame && (ret == -1)) {
- rpcreq.rpc_status = -1;
- cbkfn (&rpcreq, NULL, 0, frame);
+ if (rpcreq) {
+ rpcreq->rpc_status = -1;
+ cbkfn (rpcreq, NULL, 0, frame);
+ mem_put (rpcreq);
+ }
}
return ret;
}
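[Editor's note: the reworked rpc_clnt_submit() above now also takes reply vectors (rsphdr, rsp_payload, rsp_iobref), so a caller can hand pre-allocated receive buffers to the transport; passing NULL/0 keeps the previous behaviour. A hedged caller sketch follows; my_send_request, my_cbk and MY_PROC_FOO are placeholders, and the callback parameters are assumed from fop_cbk_fn_t.]

/* hypothetical reply callback; signature assumed from fop_cbk_fn_t */
static int
my_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe)
{
        if (req->rpc_status == -1) {
                /* submission failed, or the frame was bailed out / unwound */
                return 0;
        }
        /* decode iov[0], the XDR-encoded reply header, here */
        return 0;
}

#define MY_PROC_FOO 1    /* placeholder procedure number */

static int
my_send_request (struct rpc_clnt *rpc, rpc_clnt_prog_t *my_prog,
                 call_frame_t *frame, struct iovec *proghdr)
{
        return rpc_clnt_submit (rpc, my_prog, MY_PROC_FOO, my_cbk,
                                proghdr, 1,     /* program header vector      */
                                NULL, 0,        /* no separate payload        */
                                NULL,           /* iobref: created internally */
                                frame,
                                NULL, 0,        /* rsphdr (optional)          */
                                NULL, 0,        /* rsp_payload (optional)     */
                                NULL);          /* rsp_iobref (optional)      */
}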
-void
+struct rpc_clnt *
+rpc_clnt_ref (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return NULL;
+ pthread_mutex_lock (&rpc->lock);
+ {
+ rpc->refcount++;
+ }
+ pthread_mutex_unlock (&rpc->lock);
+ return rpc;
+}
+
+
+static void
+rpc_clnt_trigger_destroy (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return;
+
+ rpc_clnt_disable (rpc);
+ rpc_transport_unref (rpc->conn.trans);
+}
+
+static void
rpc_clnt_destroy (struct rpc_clnt *rpc)
{
- rpc_clnt_connection_cleanup (&rpc->conn);
+ if (!rpc)
+ return;
+
+ saved_frames_destroy (rpc->conn.saved_frames);
pthread_mutex_destroy (&rpc->lock);
pthread_mutex_destroy (&rpc->conn.lock);
+
+ /* mem-pool should be destroyed, otherwise,
+ it will cause huge memory leaks */
+ mem_pool_destroy (rpc->reqpool);
+ mem_pool_destroy (rpc->saved_frames_pool);
+
GF_FREE (rpc);
return;
}
+
+struct rpc_clnt *
+rpc_clnt_unref (struct rpc_clnt *rpc)
+{
+ int count = 0;
+
+ if (!rpc)
+ return NULL;
+ pthread_mutex_lock (&rpc->lock);
+ {
+ count = --rpc->refcount;
+ }
+ pthread_mutex_unlock (&rpc->lock);
+ if (!count) {
+ rpc_clnt_trigger_destroy (rpc);
+ return NULL;
+ }
+ return rpc;
+}
+
+
+char
+rpc_clnt_is_disabled (struct rpc_clnt *rpc)
+{
+
+ rpc_clnt_connection_t *conn = NULL;
+ char disabled = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ disabled = rpc->disabled;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+out:
+ return disabled;
+}
+
+void
+rpc_clnt_disable (struct rpc_clnt *rpc)
+{
+ rpc_clnt_connection_t *conn = NULL;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ rpc->disabled = 1;
+
+ if (conn->timer) {
+ gf_timer_call_cancel (rpc->ctx, conn->timer);
+ conn->timer = NULL;
+ }
+
+ if (conn->reconnect) {
+ gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ conn->reconnect = NULL;
+ }
+ conn->connected = 0;
+
+ if (conn->ping_timer) {
+ gf_timer_call_cancel (rpc->ctx, conn->ping_timer);
+ conn->ping_timer = NULL;
+ conn->ping_started = 0;
+ }
+
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+ rpc_transport_disconnect (rpc->conn.trans);
+
+out:
+ return;
+}
+
+
+void
+rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
+{
+ if (config->rpc_timeout) {
+ if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing timeout to %d (from %d)",
+ config->rpc_timeout,
+ rpc->conn.config.rpc_timeout);
+ rpc->conn.config.rpc_timeout = config->rpc_timeout;
+ }
+
+ if (config->remote_port) {
+ if (config->remote_port != rpc->conn.config.remote_port)
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing port to %d (from %d)",
+ config->remote_port,
+ rpc->conn.config.remote_port);
+
+ rpc->conn.config.remote_port = config->remote_port;
+ }
+
+ if (config->remote_host) {
+ if (rpc->conn.config.remote_host) {
+ if (strcmp (rpc->conn.config.remote_host,
+ config->remote_host))
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing hostname to %s (from %s)",
+ config->remote_host,
+ rpc->conn.config.remote_host);
+ FREE (rpc->conn.config.remote_host);
+ } else {
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "setting hostname to %s",
+ config->remote_host);
+ }
+
+ rpc->conn.config.remote_host = gf_strdup (config->remote_host);
+ }
+}
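[Editor's note: rpc_clnt_reconfig() lets a caller change the peer details of a live client; per the comment added under RPC_TRANSPORT_CONNECT above, a reconfigured remote_port is used for the next (re)connect only. A hedged sketch follows; my_switch_port is a placeholder and only the rpc_timeout/remote_port/remote_host fields touched by this patch are set.]

/* hypothetical helper: point an existing client at a brick's actual port */
static void
my_switch_port (struct rpc_clnt *rpc, char *host, int port)
{
        struct rpc_clnt_config config = {0, };

        config.remote_host = host;
        config.remote_port = port;   /* used for the next (re)connect only  */
        config.rpc_timeout = 0;      /* 0 == leave the existing timeout alone */

        rpc_clnt_reconfig (rpc, &config);
}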