summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c828
1 files changed, 558 insertions, 270 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 8d923ed5f..ac98a5c91 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
@@ -23,7 +14,7 @@
#include "config.h"
#endif
-#define RPC_CLNT_DEFAULT_REQUEST_COUNT 4096
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512
#include "rpc-clnt.h"
#include "byte-order.h"
@@ -32,6 +23,7 @@
#include "protocol-common.h"
#include "mem-pool.h"
#include "xdr-rpc.h"
+#include "rpc-common-xdr.h"
void
rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
@@ -69,6 +61,21 @@ __saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
return bailout_frame;
}
+static int
+_is_lock_fop (struct saved_frame *sframe)
+{
+ int fop = 0;
+
+ if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM &&
+ SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION)
+ fop = SFRAME_GET_PROCNUM (sframe);
+
+ return ((fop == GFS3_OP_LK) ||
+ (fop == GFS3_OP_INODELK) ||
+ (fop == GFS3_OP_FINODELK) ||
+ (fop == GFS3_OP_ENTRYLK) ||
+ (fop == GFS3_OP_FENTRYLK));
+}
struct saved_frame *
__saved_frames_put (struct saved_frames *frames, void *frame,
@@ -78,7 +85,6 @@ __saved_frames_put (struct saved_frames *frames, void *frame,
saved_frame = mem_get (rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
/* THIS should be saved and set back */
@@ -91,7 +97,11 @@ __saved_frames_put (struct saved_frames *frames, void *frame,
saved_frame->rpcreq = rpcreq;
gettimeofday (&saved_frame->saved_at, NULL);
- list_add_tail (&saved_frame->list, &frames->sf.list);
+ if (_is_lock_fop (saved_frame))
+ list_add_tail (&saved_frame->list, &frames->lk_sf.list);
+ else
+ list_add_tail (&saved_frame->list, &frames->sf.list);
+
frames->count++;
out:
@@ -103,9 +113,8 @@ void
saved_frames_delete (struct saved_frame *saved_frame,
rpc_clnt_connection_t *conn)
{
- if (!saved_frame || !conn) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO ("rpc-clnt", saved_frame, out);
+ GF_VALIDATE_OR_GOTO ("rpc-clnt", conn, out);
pthread_mutex_lock (&conn->lock);
{
@@ -119,7 +128,7 @@ saved_frames_delete (struct saved_frame *saved_frame,
conn->rpc_clnt->reqpool);
}
- mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
+ mem_put (saved_frame);
out:
return;
}
@@ -135,9 +144,8 @@ call_bail (void *data)
struct saved_frame *saved_frame = NULL;
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- struct tm frame_sent_tm;
- char frame_sent[32] = {0,};
- struct timeval timeout = {0,};
+ char frame_sent[256] = {0,};
+ struct timespec timeout = {0,};
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -155,7 +163,7 @@ call_bail (void *data)
call-once timer */
if (conn->timer) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
@@ -164,8 +172,9 @@ call_bail (void *data)
(void *) clnt);
if (conn->timer == NULL) {
- gf_log (conn->trans->name, GF_LOG_DEBUG,
- "Cannot create bailout timer");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "Cannot create bailout timer for %s",
+ conn->trans->peerinfo.identifier);
}
}
@@ -182,25 +191,30 @@ call_bail (void *data)
pthread_mutex_unlock (&conn->lock);
list_for_each_entry_safe (trav, tmp, &list, list) {
- localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm);
- strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm);
+ gf_time_fmt (frame_sent, sizeof frame_sent,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
+ snprintf (frame_sent + strlen (frame_sent),
+ 256 - strlen (frame_sent),
+ ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log (conn->trans->name, GF_LOG_ERROR,
- "bailing out frame type(%s) op(%s(%d)) sent = %s. "
- "timeout = %d",
+ "bailing out frame type(%s) op(%s(%d)) xid = 0x%x "
+ "sent = %s. timeout = %d for %s",
trav->rpcreq->prog->progname,
(trav->rpcreq->prog->procnames) ?
trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
"--",
- trav->rpcreq->procnum, frame_sent,
- conn->frame_timeout);
+ trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent,
+ conn->frame_timeout, conn->trans->peerinfo.identifier);
+ clnt = rpc_clnt_ref (clnt);
trav->rpcreq->rpc_status = -1;
trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
rpc_clnt_reply_deinit (trav->rpcreq, clnt->reqpool);
+ clnt = rpc_clnt_unref (clnt);
list_del_init (&trav->list);
- mem_put (conn->rpc_clnt->saved_frames_pool, trav);
+ mem_put (trav);
}
out:
return;
@@ -213,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
- struct timeval timeout = {0, };
+ struct timespec timeout = {0, };
struct saved_frame *saved_frame = NULL;
conn = &rpc_clnt->conn;
@@ -227,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
/* TODO: make timeout configurable */
if (conn->timer == NULL) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -247,11 +261,11 @@ saved_frames_new (void)
saved_frames = GF_CALLOC (1, sizeof (*saved_frames),
gf_common_mt_rpcclnt_savedframe_t);
if (!saved_frames) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
return NULL;
}
INIT_LIST_HEAD (&saved_frames->sf.list);
+ INIT_LIST_HEAD (&saved_frames->lk_sf.list);
return saved_frames;
}
@@ -273,7 +287,15 @@ __saved_frame_copy (struct saved_frames *frames, int64_t callid,
if (tmp->rpcreq->xid == callid) {
*saved_frame = *tmp;
ret = 0;
- break;
+ goto out;
+ }
+ }
+
+ list_for_each_entry (tmp, &frames->lk_sf.list, list) {
+ if (tmp->rpcreq->xid == callid) {
+ *saved_frame = *tmp;
+ ret = 0;
+ goto out;
}
}
@@ -293,10 +315,20 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
list_del_init (&tmp->list);
frames->count--;
saved_frame = tmp;
- break;
+ goto out;
}
}
+ list_for_each_entry (tmp, &frames->lk_sf.list, list) {
+ if (tmp->rpcreq->xid == callid) {
+ list_del_init (&tmp->list);
+ frames->count--;
+ saved_frame = tmp;
+ goto out;
+ }
+ }
+
+out:
if (saved_frame) {
THIS = saved_frame->capital_this;
}
@@ -304,42 +336,47 @@ __saved_frame_get (struct saved_frames *frames, int64_t callid)
return saved_frame;
}
+
void
saved_frames_unwind (struct saved_frames *saved_frames)
{
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- struct tm *frame_sent_tm = NULL;
- char timestr[256] = {0,};
-
+ char timestr[1024] = {0,};
struct iovec iov = {0,};
+ list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list);
+
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
- frame_sent_tm = localtime (&trav->saved_at.tv_sec);
- strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",
- frame_sent_tm);
+ gf_time_fmt (timestr, sizeof timestr,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
snprintf (timestr + strlen (timestr),
sizeof(timestr) - strlen (timestr),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
- gf_log ("rpc-clnt", GF_LOG_ERROR,
- "forced unwinding frame type(%s) op(%s(%d)) "
- "called at %s",
- trav->rpcreq->prog->progname,
- (trav->rpcreq->prog->procnames) ?
- trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
- : "--",
- trav->rpcreq->procnum, timestr);
-
+ if (!trav->rpcreq || !trav->rpcreq->prog)
+ continue;
+
+ gf_log_callingfn (trav->rpcreq->conn->trans->name,
+ GF_LOG_ERROR,
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s (xid=0x%x)",
+ trav->rpcreq->prog->progname,
+ ((trav->rpcreq->prog->procnames) ?
+ trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--"),
+ trav->rpcreq->procnum, timestr,
+ trav->rpcreq->xid);
saved_frames->count--;
trav->rpcreq->rpc_status = -1;
trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
+
rpc_clnt_reply_deinit (trav->rpcreq,
trav->rpcreq->conn->rpc_clnt->reqpool);
list_del_init (&trav->list);
- mem_put (trav->rpcreq->conn->rpc_clnt->saved_frames_pool, trav);
+ mem_put (trav);
}
}
@@ -361,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr)
{
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
- struct timeval tv = {0, 0};
+ struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
@@ -380,14 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr)
conn->reconnect = 0;
if (conn->connected == 0) {
- tv.tv_sec = 3;
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
gf_log (trans->name, GF_LOG_TRACE,
"attempting reconnect");
- ret = rpc_transport_connect (trans, conn->config.remote_port);
-
+ ret = rpc_transport_connect (trans,
+ conn->config.remote_port);
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
trans);
} else {
@@ -408,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr)
int
rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
{
- struct saved_frame saved_frame = {{}, 0};
+ struct saved_frame saved_frame;
int ret = -1;
pthread_mutex_lock (&clnt->conn.lock);
@@ -419,10 +457,9 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
pthread_mutex_unlock (&clnt->conn.lock);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved "
- "frame corresponding to xid (%d) for msg arrived on "
- "transport %s",
- info->xid, clnt->conn.trans->name);
+ gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL,
+ "cannot lookup the saved "
+ "frame corresponding to xid (%d)", info->xid);
goto out;
}
@@ -457,6 +494,7 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
}
}
+ pthread_mutex_unlock (&conn->lock);
out:
return 0;
@@ -479,7 +517,7 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
clnt = conn->rpc_clnt;
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
+ gf_log (conn->trans->name, GF_LOG_TRACE,
"cleaning up state in transport object %p", conn->trans);
pthread_mutex_lock (&conn->lock);
@@ -494,6 +532,12 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
}
conn->connected = 0;
+
+ if (conn->ping_timer) {
+ gf_timer_call_cancel (clnt->ctx, conn->ping_timer);
+ conn->ping_timer = NULL;
+ conn->ping_started = 0;
+ }
}
pthread_mutex_unlock (&conn->lock);
@@ -584,7 +628,7 @@ rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool)
iobref_unref (req->rsp_iobref);
}
- mem_put (pool, req);
+ mem_put (req);
out:
return;
}
@@ -607,7 +651,8 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
if (ret != 0) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "RPC reply decoding failed");
goto out;
}
@@ -617,34 +662,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %d Program: %s,"
- " ProgVers: %d, Proc: %d", saved_frame->rpcreq->xid,
+ gf_log (conn->trans->name, GF_LOG_TRACE,
+ "received rpc message (RPC XID: 0x%x"
+ " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
+ saved_frame->rpcreq->xid,
saved_frame->rpcreq->prog->progname,
saved_frame->rpcreq->prog->progver,
- saved_frame->rpcreq->procnum);
-/* TODO: */
- /* TODO: AUTH */
- /* The verifier that is sent in a reply is a string that can be used as
- * a shorthand in credentials for future transactions. We can opt not to
- * use this shorthand, preffering to use the original AUTH_UNIX method
- * for authentication (containing all the details for authentication in
- * credential itself). Hence it is not mandatory for us to be checking
- * the verifier. See Appendix A of rfc-5531 for more details.
- */
-
- /*
- * ret = rpc_authenticate (req);
- * if (ret == RPC_AUTH_REJECT) {
- * gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication");
- * ret = -1;
- * goto out;
- * }
- */
-
- /* If the error is not RPC_MISMATCH, we consider the call as accepted
- * since we are not handling authentication failures for now.
- */
- req->rpc_status = 0;
+ saved_frame->rpcreq->procnum, conn->trans->name);
out:
if (ret != 0) {
@@ -669,32 +693,45 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
msgbuf = msg->vector[0].iov_base;
msglen = msg->vector[0].iov_len;
+ clnt = rpc_clnt_ref (clnt);
ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed");
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "RPC call decoding failed");
goto out;
}
- gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld,"
- " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
+ gf_log (clnt->conn.trans->name, GF_LOG_TRACE,
+ "received rpc message (XID: 0x%lx, "
+ "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) "
+ "from rpc-transport (%s)", rpc_call_xid (&rpcmsg),
rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg));
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
+ clnt->conn.trans->name);
procnum = rpc_call_progproc (&rpcmsg);
- list_for_each_entry (program, &clnt->programs, program) {
- if ((program->prognum == rpc_call_program (&rpcmsg))
- && (program->progver == rpc_call_progver (&rpcmsg))) {
- found = 1;
- break;
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_for_each_entry (program, &clnt->programs, program) {
+ if ((program->prognum == rpc_call_program (&rpcmsg))
+ && (program->progver
+ == rpc_call_progver (&rpcmsg))) {
+ found = 1;
+ break;
+ }
}
}
+ pthread_mutex_unlock (&clnt->lock);
+
if (found && (procnum < program->numactors) &&
(program->actors[procnum].actor)) {
- program->actors[procnum].actor (&progmsg);
+ program->actors[procnum].actor (clnt, program->mydata,
+ &progmsg);
}
out:
+ clnt = rpc_clnt_unref (clnt);
return ret;
}
@@ -707,41 +744,44 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
struct rpc_req *req = NULL;
uint32_t xid = 0;
+ clnt = rpc_clnt_ref (clnt);
conn = &clnt->conn;
xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base));
saved_frame = lookup_frame (conn, xid);
if (saved_frame == NULL) {
- gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the "
- "saved frame for reply with xid (%d)", xid);
+ gf_log (conn->trans->name, GF_LOG_ERROR,
+ "cannot lookup the saved frame for reply with xid (%u)",
+ xid);
goto out;
}
req = saved_frame->rpcreq;
if (req == NULL) {
- gf_log ("rpc-clnt", GF_LOG_CRITICAL,
- "saved_frame for reply with xid (%d)", xid);
+ gf_log (conn->trans->name, GF_LOG_ERROR,
+ "no request with frame for xid (%u)", xid);
goto out;
}
-
+
ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
req->rpc_status = -1;
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
- "failed");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "initialising rpc reply failed");
}
req->cbkfn (req, req->rsp, req->rspcnt, saved_frame->frame);
-
+
if (req) {
rpc_clnt_reply_deinit (req, conn->rpc_clnt->reqpool);
}
out:
if (saved_frame) {
- mem_put (conn->rpc_clnt->saved_frames_pool, saved_frame);
+ mem_put (saved_frame);
}
+ clnt = rpc_clnt_unref (clnt);
return ret;
}
@@ -781,36 +821,42 @@ out:
return;
}
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc);
int
rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
{
- rpc_clnt_connection_t *conn = NULL;
- struct rpc_clnt *clnt = NULL;
- int ret = -1;
- rpc_request_info_t *req_info = NULL;
- rpc_transport_pollin_t *pollin = NULL;
- struct timeval tv = {0, };
+ rpc_clnt_connection_t *conn = NULL;
+ struct rpc_clnt *clnt = NULL;
+ int ret = -1;
+ rpc_request_info_t *req_info = NULL;
+ rpc_transport_pollin_t *pollin = NULL;
+ struct timespec ts = {0, };
conn = mydata;
if (conn == NULL) {
goto out;
}
clnt = conn->rpc_clnt;
+ if (!clnt)
+ goto out;
switch (event) {
case RPC_TRANSPORT_DISCONNECT:
{
- rpc_clnt_connection_cleanup (&clnt->conn);
+ rpc_clnt_connection_cleanup (conn);
pthread_mutex_lock (&conn->lock);
{
- if (conn->reconnect == NULL) {
- tv.tv_sec = 10;
+ if (!conn->rpc_clnt->disabled
+ && (conn->reconnect == NULL)) {
+ ts.tv_sec = 10;
+ ts.tv_nsec = 0;
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn->trans);
}
@@ -818,15 +864,13 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
pthread_mutex_unlock (&conn->lock);
if (clnt->notifyfn)
- ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT,
- NULL);
+ ret = clnt->notifyfn (clnt, clnt->mydata,
+ RPC_CLNT_DISCONNECT, NULL);
break;
}
case RPC_TRANSPORT_CLEANUP:
- /* this event should not be received on a client for, a
- * transport is only disconnected, but never destroyed.
- */
+ rpc_clnt_destroy (clnt);
ret = 0;
break;
@@ -839,6 +883,12 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_MSG_RECEIVED:
{
+ pthread_mutex_lock (&conn->lock);
+ {
+ gettimeofday (&conn->last_received, NULL);
+ }
+ pthread_mutex_unlock (&conn->lock);
+
pollin = data;
if (pollin->is_reply)
ret = rpc_clnt_handle_reply (clnt, pollin);
@@ -864,8 +914,17 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_CONNECT:
{
+ /* Every time there is a disconnection, processes
+ should try to connect to 'glusterd' (ie, default
+ port) or whichever port given as 'option remote-port'
+ in volume file. */
+ /* Below code makes sure the (re-)configured port lasts
+ for just one successful attempt */
+ conn->config.remote_port = 0;
+
if (clnt->notifyfn)
- ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL);
+ ret = clnt->notifyfn (clnt, clnt->mydata,
+ RPC_CLNT_CONNECT, NULL);
break;
}
@@ -889,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn)
}
-inline int
+static inline int
rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
dict_t *options, char *name)
{
@@ -902,7 +961,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
ret = dict_get_int32 (options, "frame-timeout",
&conn->frame_timeout);
if (ret >= 0) {
- gf_log (name, GF_LOG_DEBUG,
+ gf_log (name, GF_LOG_INFO,
"setting frame-timeout to %d", conn->frame_timeout);
} else {
gf_log (name, GF_LOG_DEBUG,
@@ -912,8 +971,9 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
conn->trans = rpc_transport_load (ctx, options, name);
if (!conn->trans) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport"
+ gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport"
" failed");
+ ret = -1;
goto out;
}
@@ -924,7 +984,7 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify,
conn);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed");
+ gf_log (name, GF_LOG_WARNING, "registering notify failed");
rpc_clnt_connection_cleanup (conn);
conn = NULL;
goto out;
@@ -932,39 +992,37 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
conn->saved_frames = saved_frames_new ();
if (!conn->saved_frames) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames "
+ gf_log (name, GF_LOG_WARNING, "creation of saved_frames "
"failed");
rpc_clnt_connection_cleanup (conn);
goto out;
}
- rpc_clnt_reconnect (conn->trans);
-
ret = 0;
out:
return ret;
}
-
struct rpc_clnt *
-rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
- glusterfs_ctx_t *ctx, char *name)
+rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name,
+ uint32_t reqpool_size)
{
int ret = -1;
struct rpc_clnt *rpc = NULL;
rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t);
if (!rpc) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
pthread_mutex_init (&rpc->lock, NULL);
rpc->ctx = ctx;
- rpc->reqpool = mem_pool_new (struct rpc_req,
- RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ if (!reqpool_size)
+ reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT;
+
+ rpc->reqpool = mem_pool_new (struct rpc_req, reqpool_size);
if (rpc->reqpool == NULL) {
pthread_mutex_destroy (&rpc->lock);
GF_FREE (rpc);
@@ -973,7 +1031,7 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
}
rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
- RPC_CLNT_DEFAULT_REQUEST_COUNT);
+ reqpool_size);
if (rpc->saved_frames_pool == NULL) {
pthread_mutex_destroy (&rpc->lock);
mem_pool_destroy (rpc->reqpool);
@@ -985,11 +1043,18 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
ret = rpc_clnt_connection_init (rpc, ctx, options, name);
if (ret == -1) {
pthread_mutex_destroy (&rpc->lock);
+ mem_pool_destroy (rpc->reqpool);
+ mem_pool_destroy (rpc->saved_frames_pool);
GF_FREE (rpc);
rpc = NULL;
+ if (options)
+ dict_unref (options);
goto out;
}
+ rpc->auth_null = dict_get_str_boolean (options, "auth-null", 0);
+
+ rpc = rpc_clnt_ref (rpc);
INIT_LIST_HEAD (&rpc->programs);
out:
@@ -998,6 +1063,22 @@ out:
int
+rpc_clnt_start (struct rpc_clnt *rpc)
+{
+ struct rpc_clnt_connection *conn = NULL;
+
+ if (!rpc)
+ return -1;
+
+ conn = &rpc->conn;
+
+ rpc_clnt_reconnect (conn->trans);
+
+ return 0;
+}
+
+
+int
rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
void *mydata)
{
@@ -1008,7 +1089,7 @@ rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
}
ssize_t
-xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au)
+xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms_v2 *au)
{
ssize_t ret = -1;
XDR xdr;
@@ -1016,10 +1097,11 @@ xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au)
if ((!dest) || (!au))
return -1;
- xdrmem_create (&xdr, dest, 1024,
- XDR_ENCODE);
+ xdrmem_create (&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE);
- if (!xdr_auth_glusterfs_parms (&xdr, au)) {
+ if (!xdr_auth_glusterfs_parms_v2 (&xdr, au)) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to encode auth glusterfs elements");
ret = -1;
goto ret;
}
@@ -1032,12 +1114,11 @@ ret:
int
-rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload,
- uint64_t xid, struct auth_glusterfs_parms *au,
- struct rpc_msg *request)
+rpc_clnt_fill_request (int prognum, int progver, int procnum,
+ uint64_t xid, struct auth_glusterfs_parms_v2 *au,
+ struct rpc_msg *request, char *auth_data)
{
int ret = -1;
- char dest[1024] = {0,};
if (!request) {
goto out;
@@ -1053,19 +1134,26 @@ rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload,
request->rm_call.cb_vers = progver;
request->rm_call.cb_proc = procnum;
- /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in
- * future so it is easy to plug-in new authentication schemes.
+ /* TODO: Using AUTH_(GLUSTERFS/NULL) in a kludgy way for time-being.
+ * Make it modular in future so it is easy to plug-in new
+ * authentication schemes.
*/
- ret = xdr_serialize_glusterfs_auth (dest, au);
- if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials");
- goto out;
- }
-
- request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS;
- request->rm_call.cb_cred.oa_base = dest;
- request->rm_call.cb_cred.oa_length = ret;
+ if (auth_data) {
+ ret = xdr_serialize_glusterfs_auth (auth_data, au);
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_DEBUG,
+ "cannot encode credentials");
+ goto out;
+ }
+ request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS_v2;
+ request->rm_call.cb_cred.oa_base = auth_data;
+ request->rm_call.cb_cred.oa_length = ret;
+ } else {
+ request->rm_call.cb_cred.oa_flavor = AUTH_NULL;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+ }
request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
request->rm_call.cb_verf.oa_base = NULL;
request->rm_call.cb_verf.oa_length = 0;
@@ -1113,50 +1201,57 @@ out:
struct iobuf *
rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
- int procnum, size_t payload, uint64_t xid,
- struct auth_glusterfs_parms *au, struct iovec *recbuf)
+ int procnum, size_t hdrsize, uint64_t xid,
+ struct auth_glusterfs_parms_v2 *au,
+ struct iovec *recbuf)
{
- struct rpc_msg request = {0, };
- struct iobuf *request_iob = NULL;
- char *record = NULL;
- struct iovec recordhdr = {0, };
- size_t pagesize = 0;
- int ret = -1;
+ struct rpc_msg request = {0, };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {0, };
+ size_t pagesize = 0;
+ int ret = -1;
+ size_t xdr_size = 0;
+ char auth_data[GF_MAX_AUTH_BYTES] = {0, };
if ((!clnt) || (!recbuf) || (!au)) {
goto out;
}
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ if (clnt->auth_null)
+ ret = rpc_clnt_fill_request (prognum, progver, procnum,
+ xid, NULL, &request, NULL);
+ else
+ ret = rpc_clnt_fill_request (prognum, progver, procnum,
+ xid, au, &request, auth_data);
+
+ if (ret == -1) {
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "cannot build a rpc-request xid (%"PRIu64")", xid);
+ goto out;
+ }
+
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
+
/* First, try to get a pointer into the buffer which the RPC
* layer can use.
*/
- request_iob = iobuf_get (clnt->ctx->iobuf_pool);
+ request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf");
goto out;
}
- pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size;
+ pagesize = iobuf_pagesize (request_iob);
record = iobuf_ptr (request_iob); /* Now we have it. */
- /* Fill the rpc structure and XDR it into the buffer got above. */
- ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid,
- au, &request);
- if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request "
- "xid (%"PRIu64")", xid);
- goto out;
- }
-
recordhdr = rpc_clnt_record_build_header (record, pagesize, &request,
- payload);
-
- //GF_FREE (request.rm_call.cb_cred.oa_base);
+ hdrsize);
if (!recordhdr.iov_base) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
- " header");
+ gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ "Failed to build record header");
iobuf_unref (request_iob);
request_iob = NULL;
recbuf->iov_base = NULL;
@@ -1173,43 +1268,50 @@ out:
struct iobuf *
rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame,
- rpc_clnt_prog_t *prog,int procnum, size_t payload_len,
+ rpc_clnt_prog_t *prog, int procnum, size_t hdrlen,
struct iovec *rpchdr, uint64_t callid)
{
- struct auth_glusterfs_parms au = {0, };
- struct iobuf *request_iob = NULL;
+ struct auth_glusterfs_parms_v2 au = {0, };
+ struct iobuf *request_iob = NULL;
+ char owner[4] = {0,};
if (!prog || !rpchdr || !call_frame) {
goto out;
}
- au.pid = call_frame->root->pid;
- au.uid = call_frame->root->uid;
- au.gid = call_frame->root->gid;
- au.ngrps = call_frame->root->ngrps;
- au.lk_owner = call_frame->root->lk_owner;
- if (!au.lk_owner)
- au.lk_owner = au.pid;
+ au.pid = call_frame->root->pid;
+ au.uid = call_frame->root->uid;
+ au.gid = call_frame->root->gid;
+ au.groups.groups_len = call_frame->root->ngrps;
+ au.lk_owner.lk_owner_len = call_frame->root->lk_owner.len;
- gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
- ", gid: %d, owner: %"PRId64,
- au.pid, au.uid, au.gid, au.lk_owner);
+ if (au.groups.groups_len)
+ au.groups.groups_val = call_frame->root->groups;
- memcpy (au.groups, call_frame->root->groups, 16);
+ if (call_frame->root->lk_owner.len)
+ au.lk_owner.lk_owner_val = call_frame->root->lk_owner.data;
+ else {
+ owner[0] = (char)(au.pid & 0xff);
+ owner[1] = (char)((au.pid >> 8) & 0xff);
+ owner[2] = (char)((au.pid >> 16) & 0xff);
+ owner[3] = (char)((au.pid >> 24) & 0xff);
- //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX);
- //au.aup_machname = myname;
+ au.lk_owner.lk_owner_val = owner;
+ au.lk_owner.lk_owner_len = 4;
+ }
+
+ gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
+ ", gid: %d, owner: %s", au.pid, au.uid, au.gid,
+ lkowner_utoa (&call_frame->root->lk_owner));
- /* Assuming the client program would like to speak to the same versioned
- * program on server.
- */
request_iob = rpc_clnt_record_build_record (clnt, prog->prognum,
prog->progver,
- procnum, payload_len,
+ procnum, hdrlen,
callid, &au,
rpchdr);
if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record");
+ gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ "cannot build rpc-record");
goto out;
}
@@ -1219,9 +1321,11 @@ out:
int
rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
- rpcclnt_cb_program_t *program)
+ rpcclnt_cb_program_t *program, void *mydata)
{
- int ret = -1;
+ int ret = -1;
+ char already_registered = 0;
+ rpcclnt_cb_program_t *tmp = NULL;
if (!clnt)
goto out;
@@ -1229,18 +1333,52 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
if (program->actors == NULL)
goto out;
- INIT_LIST_HEAD (&program->program);
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_for_each_entry (tmp, &clnt->programs, program) {
+ if ((program->prognum == tmp->prognum)
+ && (program->progver == tmp->progver)) {
+ already_registered = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock (&clnt->lock);
- list_add_tail (&program->program, &clnt->programs);
+ if (already_registered) {
+ gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG,
+ "already registered");
+ ret = 0;
+ goto out;
+ }
+
+ tmp = GF_CALLOC (1, sizeof (*tmp),
+ gf_common_mt_rpcclnt_cb_program_t);
+ if (tmp == NULL) {
+ goto out;
+ }
+
+ memcpy (tmp, program, sizeof (*tmp));
+ INIT_LIST_HEAD (&tmp->program);
+
+ tmp->mydata = mydata;
+
+ pthread_mutex_lock (&clnt->lock);
+ {
+ list_add_tail (&tmp->program, &clnt->programs);
+ }
+ pthread_mutex_unlock (&clnt->lock);
ret = 0;
- gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
- " Ver: %d", program->progname, program->prognum,
+ gf_log (clnt->conn.trans->name, GF_LOG_DEBUG,
+ "New program registered: %s, Num: %d, Ver: %d",
+ program->progname, program->prognum,
program->progver);
out:
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:"
+ gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ "Program registration failed:"
" %s, Num: %d, Ver: %d", program->progname,
program->prognum, program->progver);
}
@@ -1272,9 +1410,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
goto out;
}
+ conn = &rpc->conn;
+
+ if (conn->trans == NULL) {
+ goto out;
+ }
+
rpcreq = mem_get (rpc->reqpool);
if (rpcreq == NULL) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
@@ -1284,7 +1427,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if (!iobref) {
iobref = iobref_new ();
if (!iobref) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
goto out;
}
@@ -1293,72 +1435,73 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
callid = rpc_clnt_new_callid (rpc);
- conn = &rpc->conn;
-
rpcreq->prog = prog;
rpcreq->procnum = procnum;
rpcreq->conn = conn;
rpcreq->xid = callid;
rpcreq->cbkfn = cbkfn;
- pthread_mutex_lock (&conn->lock);
- {
- if (conn->connected == 0) {
- rpc_transport_connect (conn->trans,
- conn->config.remote_port);
- }
+ ret = -1;
- ret = -1;
+ if (proghdr) {
+ proglen += iov_length (proghdr, proghdrcount);
+ }
- if (proghdr) {
- proglen += iov_length (proghdr, proghdrcount);
- }
+ request_iob = rpc_clnt_record (rpc, frame, prog,
+ procnum, proglen,
+ &rpchdr, callid);
+ if (!request_iob) {
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "cannot build rpc-record");
+ goto out;
+ }
- if (progpayload) {
- proglen += iov_length (progpayload,
- progpayloadcount);
- }
+ iobref_add (iobref, request_iob);
- request_iob = rpc_clnt_record (rpc, frame, prog,
- procnum, proglen,
- &rpchdr, callid);
- if (!request_iob) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
- "cannot build rpc-record");
- goto unlock;
- }
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+ req.msg.progpayload = progpayload;
+ req.msg.progpayloadcount = progpayloadcount;
+ req.msg.iobref = iobref;
- iobref_add (iobref, request_iob);
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+ req.rpc_req = rpcreq;
- req.msg.rpchdr = &rpchdr;
- req.msg.rpchdrcount = 1;
- req.msg.proghdr = proghdr;
- req.msg.proghdrcount = proghdrcount;
- req.msg.progpayload = progpayload;
- req.msg.progpayloadcount = progpayloadcount;
- req.msg.iobref = iobref;
-
- req.rsp.rsphdr = rsphdr;
- req.rsp.rsphdr_count = rsphdr_count;
- req.rsp.rsp_payload = rsp_payload;
- req.rsp.rsp_payload_count = rsp_payload_count;
- req.rsp.rsp_iobref = rsp_iobref;
- req.rpc_req = rpcreq;
+ pthread_mutex_lock (&conn->lock);
+ {
+ if (conn->connected == 0) {
+ ret = rpc_transport_connect (conn->trans,
+ conn->config.remote_port);
+ }
ret = rpc_transport_submit_request (rpc->conn.trans,
&req);
if (ret == -1) {
- gf_log ("rpc-clnt", GF_LOG_DEBUG,
- "transmission of rpc-request failed");
+ gf_log (conn->trans->name, GF_LOG_WARNING,
+ "failed to submit rpc-request "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
+ "Proc: %d) to rpc-transport (%s)", rpcreq->xid,
+ rpcreq->prog->progname, rpcreq->prog->progver,
+ rpcreq->procnum, rpc->conn.trans->name);
}
if ((ret >= 0) && frame) {
- gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
__save_frame (rpc, frame, rpcreq);
+
+ gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
+ "Proc: %d) to rpc-transport (%s)", rpcreq->xid,
+ rpcreq->prog->progname, rpcreq->prog->progver,
+ rpcreq->procnum, rpc->conn.trans->name);
}
}
-unlock:
pthread_mutex_unlock (&conn->lock);
if (ret == -1) {
@@ -1368,45 +1511,190 @@ unlock:
ret = 0;
out:
- iobuf_unref (request_iob);
+ if (request_iob) {
+ iobuf_unref (request_iob);
+ }
if (new_iobref && iobref) {
iobref_unref (iobref);
}
if (frame && (ret == -1)) {
- rpcreq->rpc_status = -1;
- cbkfn (rpcreq, NULL, 0, frame);
- mem_put (rpc->reqpool, rpcreq);
+ if (rpcreq) {
+ rpcreq->rpc_status = -1;
+ cbkfn (rpcreq, NULL, 0, frame);
+ mem_put (rpcreq);
+ }
}
return ret;
}
-void
+struct rpc_clnt *
+rpc_clnt_ref (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return NULL;
+ pthread_mutex_lock (&rpc->lock);
+ {
+ rpc->refcount++;
+ }
+ pthread_mutex_unlock (&rpc->lock);
+ return rpc;
+}
+
+
+static void
+rpc_clnt_trigger_destroy (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return;
+
+ rpc_clnt_disable (rpc);
+ rpc_transport_unref (rpc->conn.trans);
+}
+
+static void
rpc_clnt_destroy (struct rpc_clnt *rpc)
{
- rpc_clnt_connection_cleanup (&rpc->conn);
- rpc_clnt_reconnect_cleanup (&rpc->conn);
+ if (!rpc)
+ return;
+
+ saved_frames_destroy (rpc->conn.saved_frames);
pthread_mutex_destroy (&rpc->lock);
pthread_mutex_destroy (&rpc->conn.lock);
+
+ /* mem-pool should be destroyed, otherwise,
+ it will cause huge memory leaks */
+ mem_pool_destroy (rpc->reqpool);
+ mem_pool_destroy (rpc->saved_frames_pool);
+
GF_FREE (rpc);
return;
}
+struct rpc_clnt *
+rpc_clnt_unref (struct rpc_clnt *rpc)
+{
+ int count = 0;
+
+ if (!rpc)
+ return NULL;
+ pthread_mutex_lock (&rpc->lock);
+ {
+ count = --rpc->refcount;
+ }
+ pthread_mutex_unlock (&rpc->lock);
+ if (!count) {
+ rpc_clnt_trigger_destroy (rpc);
+ return NULL;
+ }
+ return rpc;
+}
+
+
+char
+rpc_clnt_is_disabled (struct rpc_clnt *rpc)
+{
+
+ rpc_clnt_connection_t *conn = NULL;
+ char disabled = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ disabled = rpc->disabled;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+out:
+ return disabled;
+}
+
+void
+rpc_clnt_disable (struct rpc_clnt *rpc)
+{
+ rpc_clnt_connection_t *conn = NULL;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ rpc->disabled = 1;
+
+ if (conn->timer) {
+ gf_timer_call_cancel (rpc->ctx, conn->timer);
+ conn->timer = NULL;
+ }
+
+ if (conn->reconnect) {
+ gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ conn->reconnect = NULL;
+ }
+ conn->connected = 0;
+
+ if (conn->ping_timer) {
+ gf_timer_call_cancel (rpc->ctx, conn->ping_timer);
+ conn->ping_timer = NULL;
+ conn->ping_started = 0;
+ }
+
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+ rpc_transport_disconnect (rpc->conn.trans);
+
+out:
+ return;
+}
+
void
rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
{
- if (config->rpc_timeout)
+ if (config->rpc_timeout) {
+ if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing timeout to %d (from %d)",
+ config->rpc_timeout,
+ rpc->conn.config.rpc_timeout);
rpc->conn.config.rpc_timeout = config->rpc_timeout;
+ }
+
+ if (config->remote_port) {
+ if (config->remote_port != rpc->conn.config.remote_port)
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing port to %d (from %d)",
+ config->remote_port,
+ rpc->conn.config.remote_port);
- if (config->remote_port)
rpc->conn.config.remote_port = config->remote_port;
+ }
if (config->remote_host) {
- if (rpc->conn.config.remote_host)
+ if (rpc->conn.config.remote_host) {
+ if (strcmp (rpc->conn.config.remote_host,
+ config->remote_host))
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "changing hostname to %s (from %s)",
+ config->remote_host,
+ rpc->conn.config.remote_host);
FREE (rpc->conn.config.remote_host);
+ } else {
+ gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ "setting hostname to %s",
+ config->remote_host);
+ }
+
rpc->conn.config.remote_host = gf_strdup (config->remote_host);
}
}