diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 161 |
1 files changed, 101 insertions, 60 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 4920edaa3ad..22513b789d6 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -137,6 +137,7 @@ out: static void call_bail (void *data) { + rpc_transport_t *trans = NULL; struct rpc_clnt *clnt = NULL; rpc_clnt_connection_t *conn = NULL; struct timeval current; @@ -147,12 +148,27 @@ call_bail (void *data) char frame_sent[256] = {0,}; struct timespec timeout = {0,}; struct iovec iov = {0,}; + char peerid[UNIX_PATH_MAX] = {0}; GF_VALIDATE_OR_GOTO ("client", data, out); clnt = data; conn = &clnt->conn; + pthread_mutex_lock (&conn->lock); + { + trans = conn->trans; + if (trans) { + strncpy (peerid, conn->trans->peerinfo.identifier, + sizeof (peerid)-1); + + } + } + pthread_mutex_unlock (&conn->lock); + /*rpc_clnt_connection_cleanup will be unwinding all saved frames, + * bailed or otherwise*/ + if (!trans) + goto out; gettimeofday (¤t, NULL); INIT_LIST_HEAD (&list); @@ -172,9 +188,9 @@ call_bail (void *data) (void *) clnt); if (conn->timer == NULL) { - gf_log (conn->trans->name, GF_LOG_WARNING, + gf_log (conn->name, GF_LOG_WARNING, "Cannot create bailout timer for %s", - conn->trans->peerinfo.identifier); + peerid); } } @@ -197,7 +213,7 @@ call_bail (void *data) 256 - strlen (frame_sent), ".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec); - gf_log (conn->trans->name, GF_LOG_ERROR, + gf_log (conn->name, GF_LOG_ERROR, "bailing out frame type(%s) op(%s(%d)) xid = 0x%x " "sent = %s. timeout = %d for %s", trav->rpcreq->prog->progname, @@ -205,7 +221,7 @@ call_bail (void *data) trav->rpcreq->prog->procnames[trav->rpcreq->procnum] : "--", trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent, - conn->frame_timeout, conn->trans->peerinfo.identifier); + conn->frame_timeout, peerid); clnt = rpc_clnt_ref (clnt); trav->rpcreq->rpc_status = -1; @@ -357,7 +373,7 @@ saved_frames_unwind (struct saved_frames *saved_frames) if (!trav->rpcreq || !trav->rpcreq->prog) continue; - gf_log_callingfn (trav->rpcreq->conn->trans->name, + gf_log_callingfn (trav->rpcreq->conn->name, GF_LOG_ERROR, "forced unwinding frame type(%s) op(%s(%d)) " "called at %s (xid=0x%x)", @@ -394,7 +410,7 @@ saved_frames_destroy (struct saved_frames *frames) void -rpc_clnt_reconnect (void *trans_ptr) +rpc_clnt_reconnect (void *conn_ptr) { rpc_transport_t *trans = NULL; rpc_clnt_connection_t *conn = NULL; @@ -402,15 +418,16 @@ rpc_clnt_reconnect (void *trans_ptr) int32_t ret = 0; struct rpc_clnt *clnt = NULL; - trans = trans_ptr; - if (!trans || !trans->mydata) - return; - - conn = trans->mydata; + conn = conn_ptr; clnt = conn->rpc_clnt; pthread_mutex_lock (&conn->lock); { + trans = conn->trans; + if (!trans) { + pthread_mutex_unlock (&conn->lock); + return; + } if (conn->reconnect) gf_timer_call_cancel (clnt->ctx, conn->reconnect); @@ -420,16 +437,16 @@ rpc_clnt_reconnect (void *trans_ptr) ts.tv_sec = 3; ts.tv_nsec = 0; - gf_log (trans->name, GF_LOG_TRACE, + gf_log (conn->name, GF_LOG_TRACE, "attempting reconnect"); ret = rpc_transport_connect (trans, conn->config.remote_port); conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, - trans); + conn); } else { - gf_log (trans->name, GF_LOG_TRACE, + gf_log (conn->name, GF_LOG_TRACE, "breaking reconnect chain"); } } @@ -457,7 +474,7 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) pthread_mutex_unlock (&clnt->conn.lock); if (ret == -1) { - gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL, + gf_log (clnt->conn.name, GF_LOG_CRITICAL, "cannot lookup the saved " "frame corresponding to xid (%d)", info->xid); goto out; @@ -517,11 +534,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) clnt = conn->rpc_clnt; - gf_log (conn->trans->name, GF_LOG_TRACE, - "cleaning up state in transport object %p", conn->trans); - pthread_mutex_lock (&conn->lock); { + saved_frames = conn->saved_frames; conn->saved_frames = saved_frames_new (); @@ -651,7 +666,7 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, req->verf.authdata); if (ret != 0) { - gf_log (conn->trans->name, GF_LOG_WARNING, + gf_log (conn->name, GF_LOG_WARNING, "RPC reply decoding failed"); goto out; } @@ -662,13 +677,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, goto out; } - gf_log (conn->trans->name, GF_LOG_TRACE, + gf_log (conn->name, GF_LOG_TRACE, "received rpc message (RPC XID: 0x%x" " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)", saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname, saved_frame->rpcreq->prog->progver, - saved_frame->rpcreq->procnum, conn->trans->name); + saved_frame->rpcreq->procnum, conn->name); out: if (ret != 0) { @@ -696,18 +711,18 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) clnt = rpc_clnt_ref (clnt); ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); if (ret == -1) { - gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + gf_log (clnt->conn.name, GF_LOG_WARNING, "RPC call decoding failed"); goto out; } - gf_log (clnt->conn.trans->name, GF_LOG_TRACE, + gf_log (clnt->conn.name, GF_LOG_TRACE, "received rpc message (XID: 0x%lx, " "Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) " "from rpc-transport (%s)", rpc_call_xid (&rpcmsg), rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), - clnt->conn.trans->name); + clnt->conn.name); procnum = rpc_call_progproc (&rpcmsg); @@ -750,7 +765,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base)); saved_frame = lookup_frame (conn, xid); if (saved_frame == NULL) { - gf_log (conn->trans->name, GF_LOG_ERROR, + gf_log (conn->name, GF_LOG_ERROR, "cannot lookup the saved frame for reply with xid (%u)", xid); goto out; @@ -758,7 +773,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) req = saved_frame->rpcreq; if (req == NULL) { - gf_log (conn->trans->name, GF_LOG_ERROR, + gf_log (conn->name, GF_LOG_ERROR, "no request with frame for xid (%u)", xid); goto out; } @@ -766,7 +781,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame); if (ret != 0) { req->rpc_status = -1; - gf_log (conn->trans->name, GF_LOG_WARNING, + gf_log (conn->name, GF_LOG_WARNING, "initialising rpc reply failed"); } @@ -859,7 +874,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, conn->reconnect = gf_timer_call_after (clnt->ctx, ts, rpc_clnt_reconnect, - conn->trans); + conn); } } pthread_mutex_unlock (&conn->lock); @@ -961,10 +976,17 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, { int ret = -1; rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; conn = &clnt->conn; pthread_mutex_init (&clnt->conn.lock, NULL); + conn->name = gf_strdup (name); + if (!conn->name) { + ret = -1; + goto out; + } + ret = dict_get_int32 (options, "frame-timeout", &conn->frame_timeout); if (ret >= 0) { @@ -975,25 +997,28 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, "defaulting frame-timeout to 30mins"); conn->frame_timeout = 1800; } + conn->rpc_clnt = clnt; - conn->trans = rpc_transport_load (ctx, options, name); - if (!conn->trans) { + trans = rpc_transport_load (ctx, options, name); + if (!trans) { gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport" " failed"); ret = -1; goto out; } + rpc_transport_ref (trans); - rpc_transport_ref (conn->trans); - - conn->rpc_clnt = clnt; + pthread_mutex_lock (&conn->lock); + { + conn->trans = trans; + trans = NULL; + } + pthread_mutex_unlock (&conn->lock); ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, conn); if (ret == -1) { gf_log (name, GF_LOG_WARNING, "registering notify failed"); - rpc_clnt_connection_cleanup (conn); - conn = NULL; goto out; } @@ -1001,13 +1026,26 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, if (!conn->saved_frames) { gf_log (name, GF_LOG_WARNING, "creation of saved_frames " "failed"); - rpc_clnt_connection_cleanup (conn); + ret = -1; goto out; } ret = 0; out: + if (ret) { + pthread_mutex_lock (&conn->lock); + { + trans = conn->trans; + conn->trans = NULL; + } + pthread_mutex_unlock (&conn->lock); + if (trans) + rpc_transport_unref (trans); + //conn cleanup needs to be done since we might have failed to + // register notification. + rpc_clnt_connection_cleanup (conn); + } return ret; } @@ -1079,7 +1117,7 @@ rpc_clnt_start (struct rpc_clnt *rpc) conn = &rpc->conn; - rpc_clnt_reconnect (conn->trans); + rpc_clnt_reconnect (conn); return 0; } @@ -1234,7 +1272,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, xid, au, &request, auth_data); if (ret == -1) { - gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + gf_log (clnt->conn.name, GF_LOG_WARNING, "cannot build a rpc-request xid (%"PRIu64")", xid); goto out; } @@ -1257,7 +1295,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, hdrsize); if (!recordhdr.iov_base) { - gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + gf_log (clnt->conn.name, GF_LOG_ERROR, "Failed to build record header"); iobuf_unref (request_iob); request_iob = NULL; @@ -1307,7 +1345,7 @@ rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, au.lk_owner.lk_owner_len = 4; } - gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" + gf_log (clnt->conn.name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" ", gid: %d, owner: %s", au.pid, au.uid, au.gid, lkowner_utoa (&call_frame->root->lk_owner)); @@ -1317,7 +1355,7 @@ rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, callid, &au, rpchdr); if (!request_iob) { - gf_log (clnt->conn.trans->name, GF_LOG_WARNING, + gf_log (clnt->conn.name, GF_LOG_WARNING, "cannot build rpc-record"); goto out; } @@ -1353,7 +1391,7 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt, pthread_mutex_unlock (&clnt->lock); if (already_registered) { - gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG, + gf_log_callingfn (clnt->conn.name, GF_LOG_DEBUG, "already registered"); ret = 0; goto out; @@ -1377,14 +1415,14 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt, pthread_mutex_unlock (&clnt->lock); ret = 0; - gf_log (clnt->conn.trans->name, GF_LOG_DEBUG, + gf_log (clnt->conn.name, GF_LOG_DEBUG, "New program registered: %s, Num: %d, Ver: %d", program->progname, program->prognum, program->progver); out: if (ret == -1) { - gf_log (clnt->conn.trans->name, GF_LOG_ERROR, + gf_log (clnt->conn.name, GF_LOG_ERROR, "Program registration failed:" " %s, Num: %d, Ver: %d", program->progname, program->prognum, program->progver); @@ -1419,10 +1457,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, conn = &rpc->conn; - if (conn->trans == NULL) { - goto out; - } - rpcreq = mem_get (rpc->reqpool); if (rpcreq == NULL) { goto out; @@ -1458,7 +1492,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, procnum, proglen, &rpchdr, callid); if (!request_iob) { - gf_log (conn->trans->name, GF_LOG_WARNING, + gf_log (conn->name, GF_LOG_WARNING, "cannot build rpc-record"); goto out; } @@ -1487,15 +1521,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, conn->config.remote_port); } - ret = rpc_transport_submit_request (rpc->conn.trans, - &req); + ret = rpc_transport_submit_request (conn->trans, &req); if (ret == -1) { - gf_log (conn->trans->name, GF_LOG_WARNING, + gf_log (conn->name, GF_LOG_WARNING, "failed to submit rpc-request " "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, - rpcreq->procnum, rpc->conn.trans->name); + rpcreq->procnum, conn->name); } if ((ret >= 0) && frame) { @@ -1506,7 +1539,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, "(XID: 0x%x Program: %s, ProgVers: %d, " "Proc: %d) to rpc-transport (%s)", rpcreq->xid, rpcreq->prog->progname, rpcreq->prog->progver, - rpcreq->procnum, rpc->conn.trans->name); + rpcreq->procnum, conn->name); } } pthread_mutex_unlock (&conn->lock); @@ -1554,11 +1587,14 @@ rpc_clnt_ref (struct rpc_clnt *rpc) static void rpc_clnt_trigger_destroy (struct rpc_clnt *rpc) { + rpc_clnt_connection_t *conn = NULL; + if (!rpc) return; + conn = &rpc->conn; rpc_clnt_disable (rpc); - rpc_transport_unref (rpc->conn.trans); + rpc_transport_unref (conn->trans); } static void @@ -1627,6 +1663,7 @@ void rpc_clnt_disable (struct rpc_clnt *rpc) { rpc_clnt_connection_t *conn = NULL; + rpc_transport_t *trans = NULL; if (!rpc) { goto out; @@ -1654,11 +1691,15 @@ rpc_clnt_disable (struct rpc_clnt *rpc) conn->ping_timer = NULL; conn->ping_started = 0; } + trans = conn->trans; + conn->trans = NULL; } pthread_mutex_unlock (&conn->lock); - rpc_transport_disconnect (rpc->conn.trans); + if (trans) { + rpc_transport_disconnect (trans); + } out: return; @@ -1670,7 +1711,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) { if (config->rpc_timeout) { if (config->rpc_timeout != rpc->conn.config.rpc_timeout) - gf_log (rpc->conn.trans->name, GF_LOG_INFO, + gf_log (rpc->conn.name, GF_LOG_INFO, "changing timeout to %d (from %d)", config->rpc_timeout, rpc->conn.config.rpc_timeout); @@ -1679,7 +1720,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) if (config->remote_port) { if (config->remote_port != rpc->conn.config.remote_port) - gf_log (rpc->conn.trans->name, GF_LOG_INFO, + gf_log (rpc->conn.name, GF_LOG_INFO, "changing port to %d (from %d)", config->remote_port, rpc->conn.config.remote_port); @@ -1691,13 +1732,13 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) if (rpc->conn.config.remote_host) { if (strcmp (rpc->conn.config.remote_host, config->remote_host)) - gf_log (rpc->conn.trans->name, GF_LOG_INFO, + gf_log (rpc->conn.name, GF_LOG_INFO, "changing hostname to %s (from %s)", config->remote_host, rpc->conn.config.remote_host); GF_FREE (rpc->conn.config.remote_host); } else { - gf_log (rpc->conn.trans->name, GF_LOG_INFO, + gf_log (rpc->conn.name, GF_LOG_INFO, "setting hostname to %s", config->remote_host); } |