/*
  Copyright (c) 2010 Gluster, Inc.
  This file is part of GlusterFS.

  GlusterFS is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published
  by the Free Software Foundation; either version 3 of the License,
  or (at your option) any later version.

  GlusterFS is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef _CONFIG_H
#define _CONFIG_H
#include "config.h"
#endif

#include "rpc-clnt.h"
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"


uint64_t
rpc_clnt_new_callid (struct rpc_clnt *clnt)
{
        uint64_t callid = 0;

        pthread_mutex_lock (&clnt->lock);
        {
                callid = ++clnt->xid;
        }
        pthread_mutex_unlock (&clnt->lock);

        return callid;
}


struct saved_frame *
__saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout,
                             struct timeval *current)
{
        struct saved_frame *bailout_frame = NULL, *tmp = NULL;

        if (!list_empty(&frames->sf.list)) {
                tmp = list_entry (frames->sf.list.next, typeof (*tmp), list);
                if ((tmp->saved_at.tv_sec + timeout) < current->tv_sec) {
                        bailout_frame = tmp;
                        list_del_init (&bailout_frame->list);
                        frames->count--;
                }
        }

        return bailout_frame;
}


struct saved_frame *
__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum,
                    rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid)
{
        struct saved_frame *saved_frame = NULL;

        saved_frame = GF_CALLOC (1, sizeof (*saved_frame),
                                 gf_common_mt_rpcclnt_savedframe_t);
        if (!saved_frame) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
                goto out;
        }
        /* THIS should be saved and set back */

        INIT_LIST_HEAD (&saved_frame->list);

        saved_frame->capital_this = THIS;
        saved_frame->frame = frame;
        saved_frame->procnum = procnum;
        saved_frame->callid = callid;
        saved_frame->prog = prog;
        saved_frame->cbkfn = cbk;

        gettimeofday (&saved_frame->saved_at, NULL);

        list_add_tail (&saved_frame->list, &frames->sf.list);
        frames->count++;

out:
        return saved_frame;
}


void
saved_frames_delete (struct saved_frame *saved_frame,
                     rpc_clnt_connection_t *conn)
{
        if (!saved_frame || !conn) {
                goto out;
        }

        pthread_mutex_lock (&conn->lock);
        {
                list_del_init (&saved_frame->list);
                conn->saved_frames->count--;
        }
        pthread_mutex_unlock (&conn->lock);

        GF_FREE (saved_frame);
out:
        return;
}


static void
call_bail (void *data)
{
        struct rpc_clnt *clnt = NULL;
        rpc_clnt_connection_t *conn = NULL;
        struct timeval current;
        struct list_head list;
        struct saved_frame *saved_frame = NULL;
        struct saved_frame *trav = NULL;
        struct saved_frame *tmp = NULL;
        struct tm frame_sent_tm;
        char frame_sent[32] = {0,};
        struct timeval timeout = {0,};
        gf_timer_cbk_t timer_cbk = NULL;
        struct rpc_req req;
        struct iovec iov = {0,};

        GF_VALIDATE_OR_GOTO ("client", data, out);

        clnt = data;

        conn = &clnt->conn;

        gettimeofday (&current, NULL);
        INIT_LIST_HEAD (&list);

        pthread_mutex_lock (&conn->lock);
        {
                /* Chaining to get call-always functionality from
                   call-once timer */
                if (conn->timer) {
                        timer_cbk = conn->timer->callbk;

                        timeout.tv_sec = 10;
                        timeout.tv_usec = 0;

                        gf_timer_call_cancel (clnt->ctx, conn->timer);
                        conn->timer = gf_timer_call_after (clnt->ctx, timeout,
                                                           call_bail,
                                                           (void *) clnt);
                        if (conn->timer == NULL) {
                                gf_log (conn->trans->name, GF_LOG_DEBUG,
                                        "Cannot create bailout timer");
                        }
                }

                do {
                        saved_frame =
                                __saved_frames_get_timedout (conn->saved_frames,
                                                             conn->frame_timeout,
                                                             &current);
                        if (saved_frame)
                                list_add (&saved_frame->list, &list);

                } while (saved_frame);
        }
        pthread_mutex_unlock (&conn->lock);

        /* Unwind the bailed-out frames with an error set, the same way
         * saved_frames_unwind () does. */
        memset (&req, 0, sizeof (req));
        req.rpc_status = -1;

        list_for_each_entry_safe (trav, tmp, &list, list) {
                localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm);
                strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm);

                gf_log (conn->trans->name, GF_LOG_ERROR,
                        "bailing out frame type(%s) op(%s(%d)) sent = %s. "
                        "timeout = %d",
                        trav->prog->progname,
                        (trav->prog->procnames) ?
                        trav->prog->procnames[trav->procnum] : "--",
                        trav->procnum, frame_sent, conn->frame_timeout);

                trav->cbkfn (&req, &iov, 1, trav->frame);

                list_del_init (&trav->list);
                GF_FREE (trav);
        }
out:
        return;
}


/* to be called with conn->lock held */
struct saved_frame *
__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum,
              rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid)
{
        rpc_clnt_connection_t *conn = NULL;
        struct timeval timeout = {0, };
        struct saved_frame *saved_frame = NULL;

        conn = &rpc_clnt->conn;

        saved_frame = __saved_frames_put (conn->saved_frames, frame, procnum,
                                          prog, cbk, callid);
        if (saved_frame == NULL) {
                goto out;
        }

        /* TODO: make timeout configurable */
        if (conn->timer == NULL) {
                timeout.tv_sec = 10;
                timeout.tv_usec = 0;
                conn->timer = gf_timer_call_after (rpc_clnt->ctx, timeout,
                                                   call_bail,
                                                   (void *) rpc_clnt);
        }

out:
        return saved_frame;
}


struct saved_frames *
saved_frames_new (void)
{
        struct saved_frames *saved_frames = NULL;

        saved_frames = GF_CALLOC (1, sizeof (*saved_frames),
                                  gf_common_mt_rpcclnt_savedframe_t);
        if (!saved_frames) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
                return NULL;
        }

        INIT_LIST_HEAD (&saved_frames->sf.list);

        return saved_frames;
}


int
__saved_frame_copy (struct saved_frames *frames, int64_t callid,
                    struct saved_frame *saved_frame)
{
        struct saved_frame *tmp = NULL;
        int ret = -1;

        if (!saved_frame) {
                ret = 0;
                goto out;
        }

        list_for_each_entry (tmp, &frames->sf.list, list) {
                if (tmp->callid == callid) {
                        *saved_frame = *tmp;
                        ret = 0;
                        break;
                }
        }

out:
        return ret;
}


struct saved_frame *
__saved_frame_get (struct saved_frames *frames, int64_t callid)
{
        struct saved_frame *saved_frame = NULL;
        struct saved_frame *tmp = NULL;

        list_for_each_entry (tmp, &frames->sf.list, list) {
                if (tmp->callid == callid) {
                        list_del_init (&tmp->list);
                        frames->count--;
                        saved_frame = tmp;
                        break;
                }
        }

        if (saved_frame) {
                THIS = saved_frame->capital_this;
        }

        return saved_frame;
}


void
saved_frames_unwind (struct saved_frames *saved_frames)
{
        struct saved_frame *trav = NULL;
        struct saved_frame *tmp = NULL;
        struct rpc_req req;
        struct iovec iov = {0,};

        memset (&req, 0, sizeof (req));

        req.rpc_status = -1;

        list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
                gf_log ("rpc-clnt", GF_LOG_ERROR,
                        "forced unwinding frame type(%s) op(%s(%d))",
                        trav->prog->progname,
                        (trav->prog->procnames) ?
                        trav->prog->procnames[trav->procnum] : "--",
                        trav->procnum);

                saved_frames->count--;

                trav->cbkfn (&req, &iov, 1, trav->frame);

                list_del_init (&trav->list);
                GF_FREE (trav);
        }
}


void
saved_frames_destroy (struct saved_frames *frames)
{
        saved_frames_unwind (frames);

        GF_FREE (frames);
}


void
rpc_clnt_reconnect (void *trans_ptr)
{
        rpc_transport_t *trans = NULL;
        rpc_clnt_connection_t *conn = NULL;
        struct timeval tv = {0, 0};
        int32_t ret = 0;
        struct rpc_clnt *clnt = NULL;

        trans = trans_ptr;
        if (!trans || !trans->mydata)
                return;

        conn = trans->mydata;
        clnt = conn->rpc_clnt;

        pthread_mutex_lock (&conn->lock);
        {
                if (conn->reconnect)
                        gf_timer_call_cancel (clnt->ctx, conn->reconnect);
                conn->reconnect = 0;

                if (conn->connected == 0) {
                        tv.tv_sec = 3;

                        gf_log (trans->name, GF_LOG_TRACE,
                                "attempting reconnect");
                        ret = rpc_transport_connect (trans);

                        conn->reconnect =
                                gf_timer_call_after (clnt->ctx, tv,
                                                     rpc_clnt_reconnect,
                                                     trans);
                } else {
                        gf_log (trans->name, GF_LOG_TRACE,
                                "breaking reconnect chain");
                }
        }
        pthread_mutex_unlock (&conn->lock);

        if ((ret == -1) && (errno != EINPROGRESS) && (clnt->notifyfn)) {
                clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);
        }

        return;
}


int
rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
{
        struct saved_frame saved_frame = {{}, 0};
        int ret = -1;

        pthread_mutex_lock (&clnt->conn.lock);
        {
                ret = __saved_frame_copy (clnt->conn.saved_frames, info->xid,
                                          &saved_frame);
        }
        pthread_mutex_unlock (&clnt->conn.lock);

        if (ret == -1) {
                gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved "
                        "frame corresponding to xid (%d)", info->xid);
                goto out;
        }

        info->prognum = saved_frame.prog->prognum;
        info->procnum = saved_frame.procnum;
        info->progver = saved_frame.prog->progver;
        info->rsp = saved_frame.rsp;

        ret = 0;
out:
        return ret;
}


/*
 * client_protocol_cleanup - cleanup function
 * @trans: transport object
 *
 */
int
rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
{
        struct saved_frames *saved_frames = NULL;
        struct rpc_clnt *clnt = NULL;

        if (!conn) {
                goto out;
        }

        clnt = conn->rpc_clnt;

        gf_log ("rpc-clnt", GF_LOG_DEBUG,
                "cleaning up state in transport object %p", conn->trans);

        pthread_mutex_lock (&conn->lock);
        {
                saved_frames = conn->saved_frames;
                conn->saved_frames = saved_frames_new ();

                /* bailout logic cleanup */
                if (conn->timer) {
                        gf_timer_call_cancel (clnt->ctx, conn->timer);
                        conn->timer = NULL;
                }

                if (conn->reconnect == NULL) {
                        /* :O This part is empty.. any thing missing?
                         */
                }

                conn->connected = 0;
        }
        pthread_mutex_unlock (&conn->lock);

        saved_frames_destroy (saved_frames);

out:
        return 0;
}


/*
 * lookup_frame - lookup call frame corresponding to a given callid
 * @trans: transport object
 * @callid: call id of the frame
 *
 * not for external reference
 */
static struct saved_frame *
lookup_frame (rpc_clnt_connection_t *conn, int64_t callid)
{
        struct saved_frame *frame = NULL;

        pthread_mutex_lock (&conn->lock);
        {
                frame = __saved_frame_get (conn->saved_frames, callid);
        }
        pthread_mutex_unlock (&conn->lock);

        return frame;
}


int
rpc_clnt_reply_fill (rpc_transport_pollin_t *msg,
                     rpc_clnt_connection_t *conn,
                     struct rpc_msg *replymsg, struct iovec progmsg,
                     struct rpc_req *req, struct saved_frame *saved_frame)
{
        int ret = -1;

        if ((!conn) || (!replymsg) || (!req) || (!saved_frame) || (!msg)) {
                goto out;
        }

        req->rpc_status = 0;
        if ((rpc_reply_status (replymsg) == MSG_DENIED)
            || (rpc_accepted_reply_status (replymsg) != SUCCESS)) {
                req->rpc_status = -1;
        }

        req->xid = rpc_reply_xid (replymsg);
        req->prog = saved_frame->prog;
        req->procnum = saved_frame->procnum;
        req->conn = conn;

        req->rsp[0] = progmsg;

        if (msg->vectored) {
                req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2);
                req->rsp[1].iov_len = msg->data.vector.size2;

                req->rspcnt = 2;

                req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1);
                req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2);
        } else {
                req->rspcnt = 1;

                req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf);
        }

        /* By this time, the data bytes for the auth scheme would have already
         * been copied into the required sections of the req structure,
         * we just need to fill in the meta-data about it now.
         */
        if (req->rpc_status == 0) {
                /*
                 * req->verf.flavour = rpc_reply_verf_flavour (replymsg);
                 * req->verf.datalen = rpc_reply_verf_len (replymsg);
                 */
        }

        ret = 0;
out:
        return ret;
}


void
rpc_clnt_reply_deinit (struct rpc_req *req)
{
        if (!req) {
                goto out;
        }

        if (req->rsp_prochdr) {
                iobuf_unref (req->rsp_prochdr);
        }

        if (req->rsp_procpayload) {
                iobuf_unref (req->rsp_procpayload);
        }
out:
        return;
}


/* TODO: use mem-pool for allocating requests */
int
rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
                     struct rpc_req *req, struct saved_frame *saved_frame)
{
        char *msgbuf = NULL;
        struct rpc_msg rpcmsg;
        struct iovec progmsg;   /* RPC Program payload */
        size_t msglen = 0;
        int ret = -1;

        if (msg->vectored) {
                msgbuf = iobuf_ptr (msg->data.vector.iobuf1);
                msglen = msg->data.vector.size1;
        } else {
                msgbuf = iobuf_ptr (msg->data.simple.iobuf);
                msglen = msg->data.simple.size;
        }

        ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
                                req->verf.authdata);
        if (ret != 0) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed");
                goto out;
        }

        ret = rpc_clnt_reply_fill (msg, conn, &rpcmsg, progmsg, req,
                                   saved_frame);
        if (ret != 0) {
                goto out;
        }

        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s,"
                " ProgVers: %d, Proc: %d", saved_frame->callid,
                saved_frame->prog->progname, saved_frame->prog->progver,
                saved_frame->procnum);
        /* TODO: */
        /* TODO: AUTH */

        /* The verifier that is sent in a reply is a string that can be used
         * as a shorthand in credentials for future transactions. We can opt
         * not to use this shorthand, preferring to use the original AUTH_UNIX
         * method for authentication (containing all the details for
         * authentication in the credential itself). Hence it is not mandatory
         * for us to be checking the verifier. See Appendix A of rfc-5531 for
         * more details.
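         * (The commented-out rpc_authenticate() call below is where such a
         * verifier/authentication check would plug in.)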
         */

        /*
         * ret = rpc_authenticate (req);
         * if (ret == RPC_AUTH_REJECT) {
         *         gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication");
         *         ret = -1;
         *         goto out;
         * }
         */

        /* If the error is not RPC_MISMATCH, we consider the call as accepted
         * since we are not handling authentication failures for now.
         */
        req->rpc_status = 0;

out:
        if (ret != 0) {
                req->rpc_status = -1;
        }
        return ret;
}


int
rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
{
        rpc_clnt_connection_t *conn = NULL;
        struct saved_frame *saved_frame = NULL;
        rpc_request_info_t *request_info = NULL;
        int ret = -1;
        struct rpc_req req = {0, };

        conn = &clnt->conn;

        request_info = pollin->private;

        saved_frame = lookup_frame (conn, (int64_t)request_info->xid);
        if (saved_frame == NULL) {
                gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the "
                        "saved frame for reply with xid (%d), "
                        "prog-version (%d), prog-num (%d),"
                        "procnum (%d)", request_info->xid,
                        request_info->progver, request_info->prognum,
                        request_info->procnum);
                goto out;
        }

        ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame);
        if (ret != 0) {
                req.rpc_status = -1;
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply "
                        "failed");
        }

        saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame);

        if (ret == 0) {
                rpc_clnt_reply_deinit (&req);
        }

        ret = 0;
out:
        if (saved_frame) {
                GF_FREE (saved_frame);
        }

        return ret;
}


inline void
rpc_clnt_set_connected (rpc_clnt_connection_t *conn)
{
        if (!conn) {
                goto out;
        }

        pthread_mutex_lock (&conn->lock);
        {
                conn->connected = 1;
        }
        pthread_mutex_unlock (&conn->lock);

out:
        return;
}


void
rpc_clnt_unset_connected (rpc_clnt_connection_t *conn)
{
        if (!conn) {
                goto out;
        }

        pthread_mutex_lock (&conn->lock);
        {
                conn->connected = 0;
        }
        pthread_mutex_unlock (&conn->lock);

out:
        return;
}


int
rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
                 rpc_transport_event_t event, void *data, ...)
{
        rpc_clnt_connection_t *conn = NULL;
        struct rpc_clnt *clnt = NULL;
        int ret = -1;
        rpc_request_info_t *req_info = NULL;
        rpc_transport_pollin_t *pollin = NULL;
        struct timeval tv = {0, };

        conn = mydata;
        if (conn == NULL) {
                goto out;
        }
        clnt = conn->rpc_clnt;

        switch (event) {
        case RPC_TRANSPORT_DISCONNECT:
        {
                rpc_clnt_connection_cleanup (&clnt->conn);

                pthread_mutex_lock (&conn->lock);
                {
                        if (conn->reconnect == NULL) {
                                tv.tv_sec = 10;

                                conn->reconnect =
                                        gf_timer_call_after (clnt->ctx, tv,
                                                             rpc_clnt_reconnect,
                                                             conn->trans);
                        }
                }
                pthread_mutex_unlock (&conn->lock);

                if (clnt->notifyfn)
                        ret = clnt->notifyfn (clnt, clnt->mydata,
                                              RPC_CLNT_DISCONNECT, NULL);
                break;
        }

        case RPC_TRANSPORT_CLEANUP:
                /* this event should not be received on a client, for a
                 * transport is only disconnected, but never destroyed.
                 */
                ret = 0;
                break;

        case RPC_TRANSPORT_MAP_XID_REQUEST:
        {
                req_info = data;
                ret = rpc_clnt_fill_request_info (clnt, req_info);
                break;
        }

        case RPC_TRANSPORT_MSG_RECEIVED:
        {
                pollin = data;
                ret = rpc_clnt_handle_reply (clnt, pollin);
                /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
                 * data);
                 */
                break;
        }

        case RPC_TRANSPORT_MSG_SENT:
        {
                pthread_mutex_lock (&conn->lock);
                {
                        gettimeofday (&conn->last_sent, NULL);
                }
                pthread_mutex_unlock (&conn->lock);
                ret = 0;
                break;
        }

        case RPC_TRANSPORT_CONNECT:
        {
                if (clnt->notifyfn)
                        ret = clnt->notifyfn (clnt, clnt->mydata,
                                              RPC_CLNT_CONNECT, NULL);
                break;
        }

        case RPC_TRANSPORT_ACCEPT:
                /* only meaningful on a server, no need of handling this event
                 * in a client.
                 */
                ret = 0;
                break;
        }

out:
        return ret;
}


void
rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn)
{
        return;
}


inline int
rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
                          dict_t *options, char *name)
{
        int ret = -1;
        rpc_clnt_connection_t *conn = NULL;

        conn = &clnt->conn;
        pthread_mutex_init (&clnt->conn.lock, NULL);

        ret = dict_get_int32 (options, "frame-timeout",
                              &conn->frame_timeout);
        if (ret >= 0) {
                gf_log (name, GF_LOG_DEBUG,
                        "setting frame-timeout to %d", conn->frame_timeout);
        } else {
                gf_log (name, GF_LOG_DEBUG,
                        "defaulting frame-timeout to 30mins");
                conn->frame_timeout = 1800;
        }

        conn->trans = rpc_transport_load (ctx, options, name);
        if (!conn->trans) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport"
                        " failed");
                goto out;
        }

        rpc_transport_ref (conn->trans);

        conn->rpc_clnt = clnt;

        ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify,
                                             conn);
        if (ret == -1) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed");
                rpc_clnt_connection_cleanup (conn);
                conn = NULL;
                goto out;
        }

        conn->saved_frames = saved_frames_new ();
        if (!conn->saved_frames) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames "
                        "failed");
                rpc_clnt_connection_cleanup (conn);
                goto out;
        }

        rpc_clnt_reconnect (conn->trans);

        ret = 0;

out:
        return ret;
}


struct rpc_clnt *
rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
               glusterfs_ctx_t *ctx, char *name)
{
        int ret = -1;
        struct rpc_clnt *rpc = NULL;

        rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t);
        if (!rpc) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
                goto out;
        }

        pthread_mutex_init (&rpc->lock, NULL);
        rpc->ctx = ctx;

        ret = rpc_clnt_connection_init (rpc, ctx, options, name);
        if (ret == -1) {
                pthread_mutex_destroy (&rpc->lock);
                GF_FREE (rpc);
                rpc = NULL;
                goto out;
        }

out:
        return rpc;
}


int
rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
                          void *mydata)
{
        rpc->mydata = mydata;
        rpc->notifyfn = fn;

        return 0;
}


ssize_t
xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au)
{
        ssize_t ret = -1;
        XDR xdr;

        if ((!dest) || (!au))
                return -1;

        xdrmem_create (&xdr, dest, 1024, XDR_ENCODE);

        if (!xdr_auth_glusterfs_parms (&xdr, au)) {
                ret = -1;
                goto ret;
        }

        ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base));

ret:
        return ret;
}


int
rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload,
                       uint64_t xid, struct auth_glusterfs_parms *au,
                       struct rpc_msg *request)
{
        int ret = -1;
        char dest[1024] = {0,};

        if (!request) {
                goto out;
        }
        memset (request, 0, sizeof (*request));

        request->rm_xid = xid;
        request->rm_direction = CALL;

        request->rm_call.cb_rpcvers = 2;
        request->rm_call.cb_prog = prognum;
        request->rm_call.cb_vers = progver;
        request->rm_call.cb_proc = procnum;

        /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in
         * future so it is easy to plug-in new authentication schemes.
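         * For now the AUTH_GLUSTERFS parameters are XDR-encoded into a local
         * buffer and attached below as the opaque body of cb_cred.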
         */
        ret = xdr_serialize_glusterfs_auth (dest, au);
        if (ret == -1) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials");
                goto out;
        }

        request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS;
        request->rm_call.cb_cred.oa_base = dest;
        request->rm_call.cb_cred.oa_length = ret;

        request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
        request->rm_call.cb_verf.oa_base = NULL;
        request->rm_call.cb_verf.oa_length = 0;

        ret = 0;
out:
        return ret;
}


void
rpc_clnt_set_lastfrag (uint32_t *fragsize)
{
        (*fragsize) |= 0x80000000U;
}


void
rpc_clnt_set_frag_header_size (uint32_t size, char *haddr)
{
        size = htonl (size);
        memcpy (haddr, &size, sizeof (size));
}


void
rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr)
{
        rpc_clnt_set_lastfrag (&size);
        rpc_clnt_set_frag_header_size (size, haddr);
}


struct iovec
rpc_clnt_record_build_header (char *recordstart, size_t rlen,
                              struct rpc_msg *request, size_t payload)
{
        struct iovec requesthdr = {0, };
        struct iovec txrecord = {0, 0};
        size_t fraglen = 0;
        int ret = -1;

        /* After leaving aside the 4 bytes for the fragment header, let's
         * encode the RPC request structure into the buffer given to us.
         */
        ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE),
                                  rlen, &requesthdr);
        if (ret == -1) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG,
                        "Failed to create RPC request");
                goto out;
        }

        fraglen = payload + requesthdr.iov_len;
        gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
                "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);

        /* Since we're not spreading RPC records over multiple fragments
         * we just set this fragment as the first and last fragment for this
         * record.
         */
        rpc_clnt_set_last_frag_header_size (fraglen, recordstart);

        /* Even though the RPC record starts at recordstart+RPC_FRAGHDR_SIZE
         * we need to transmit the record with the fragment header, which
         * starts at recordstart.
         */
        txrecord.iov_base = recordstart;

        /* Remember, this is only the vec for the RPC header and does not
         * include the payload above. We needed the payload only to calculate
         * the size of the full fragment. This size is sent in the fragment
         * header.
         */
        txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len;

out:
        return txrecord;
}


struct iobuf *
rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
                              int procnum, size_t payload, uint64_t xid,
                              struct auth_glusterfs_parms *au,
                              struct iovec *recbuf)
{
        struct rpc_msg request = {0, };
        struct iobuf *request_iob = NULL;
        char *record = NULL;
        struct iovec recordhdr = {0, };
        size_t pagesize = 0;
        int ret = -1;

        if ((!clnt) || (!recbuf) || (!au)) {
                goto out;
        }

        /* First, try to get a pointer into the buffer which the RPC
         * layer can use.
         */
        request_iob = iobuf_get (clnt->ctx->iobuf_pool);
        if (!request_iob) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf");
                goto out;
        }

        pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size;

        record = iobuf_ptr (request_iob);  /* Now we have it. */

        /* Fill the rpc structure and XDR it into the buffer got above.
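         * rpc_clnt_fill_request() builds the call header, and
         * rpc_clnt_record_build_header() then XDR-encodes it into the iobuf
         * right after the 4-byte fragment header.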
         */
        ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid,
                                     au, &request);
        if (ret == -1) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request "
                        "xid (%"PRIu64")", xid);
                goto out;
        }

        recordhdr = rpc_clnt_record_build_header (record, pagesize, &request,
                                                  payload);

        //GF_FREE (request.rm_call.cb_cred.oa_base);

        if (!recordhdr.iov_base) {
                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
                        " header");
                iobuf_unref (request_iob);
                request_iob = NULL;
                recbuf->iov_base = NULL;
                goto out;
        }

        recbuf->iov_base = recordhdr.iov_base;
        recbuf->iov_len = recordhdr.iov_len;

out:
        return request_iob;
}


struct iobuf *
rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame,
                 rpc_clnt_prog_t *prog, int procnum, size_t payload_len,
                 struct iovec *rpchdr, uint64_t callid)
{
        struct auth_glusterfs_parms au = {0, };
        struct iobuf *request_iob = NULL;

        if (!prog || !rpchdr || !call_frame) {
                goto out;
        }

        au.pid = call_frame->root->pid;
        au.uid = call_frame->root->uid;
        au.gid = call_frame->root->gid;
        au.ngrps = call_frame->root->ngrps;
        au.lk_owner = call_frame->root->lk_owner;
        if (!au.lk_owner)
                au.lk_owner = au.pid;

        gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
                ", gid: %d, owner: %"PRId64, au.pid, au.uid, au.gid,
                au.lk_owner);

        memcpy (au.groups, call_frame->root->groups, 16);

        //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX);
        //au.aup_machname = myname;

        /* Assuming the client program would like to speak to the same
         * versioned program on server.
         */
        request_iob = rpc_clnt_record_build_record (clnt, prog->prognum,
                                                    prog->progver, procnum,
                                                    payload_len, callid,
                                                    &au, rpchdr);
        if (!request_iob) {
                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record");
                goto out;
        }

out:
        return request_iob;
}


int
rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
                 int procnum, fop_cbk_fn_t cbkfn,
                 struct iovec *proghdr, int proghdrcount,
                 struct iovec *progpayload, int progpayloadcount,
                 struct iobref *iobref, void *frame)
{
        rpc_clnt_connection_t *conn = NULL;
        struct iobuf *request_iob = NULL;
        struct iovec rpchdr = {0,};
        struct rpc_req rpcreq = {0,};
        rpc_transport_req_t req;
        int ret = -1;
        int proglen = 0;
        char new_iobref = 0;
        uint64_t callid = 0;

        if (!rpc || !prog || !frame) {
                goto out;
        }

        memset (&req, 0, sizeof (req));

        if (!iobref) {
                iobref = iobref_new ();
                if (!iobref) {
                        gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
                        goto out;
                }

                new_iobref = 1;
        }

        callid = rpc_clnt_new_callid (rpc);

        conn = &rpc->conn;

        pthread_mutex_lock (&conn->lock);
        {
                if (conn->connected == 0) {
                        rpc_transport_connect (conn->trans);
                }

                ret = -1;

                if (proghdr) {
                        proglen += iov_length (proghdr, proghdrcount);
                }

                if (progpayload) {
                        proglen += iov_length (progpayload,
                                               progpayloadcount);
                }

                request_iob = rpc_clnt_record (rpc, frame, prog, procnum,
                                               proglen, &rpchdr, callid);
                if (!request_iob) {
                        gf_log ("rpc-clnt", GF_LOG_DEBUG,
                                "cannot build rpc-record");
                        goto unlock;
                }

                iobref_add (iobref, request_iob);

                req.msg.rpchdr = &rpchdr;
                req.msg.rpchdrcount = 1;
                req.msg.proghdr = proghdr;
                req.msg.proghdrcount = proghdrcount;
                req.msg.progpayload = progpayload;
                req.msg.progpayloadcount = progpayloadcount;
                req.msg.iobref = iobref;

                ret = rpc_transport_submit_request (rpc->conn.trans, &req);
                if (ret == -1) {
                        gf_log ("rpc-clnt", GF_LOG_DEBUG,
                                "transmission of rpc-request failed");
                }

                if ((ret >= 0) && frame) {
                        gettimeofday (&conn->last_sent, NULL);
                        /* Save the frame in queue */
                        __save_frame (rpc, frame, procnum, prog, cbkfn, callid);
                }
        }
unlock:
        pthread_mutex_unlock (&conn->lock);

        if (ret == -1) {
                goto out;
        }

        ret = 0;

out:
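        /* Common exit path: release the request iobuf and, if it was created
         * here, the iobref; on failure, unwind the caller's frame with
         * rpc_status set to -1.
         */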
        iobuf_unref (request_iob);

        if (new_iobref && iobref) {
                iobref_unref (iobref);
        }

        if (frame && (ret == -1)) {
                rpcreq.rpc_status = -1;
                cbkfn (&rpcreq, NULL, 0, frame);
        }

        return ret;
}


void
rpc_clnt_destroy (struct rpc_clnt *rpc)
{
        rpc_clnt_connection_cleanup (&rpc->conn);
        pthread_mutex_destroy (&rpc->lock);
        pthread_mutex_destroy (&rpc->conn.lock);
        GF_FREE (rpc);
        return;
}