/* Copyright (c) 2008-2012 Red Hat, Inc. This file is part of GlusterFS. This file is licensed to you under your choice of the GNU Lesser General Public License, version 3 or any later version (LGPLv3 or later), or the GNU General Public License, version 2 (GPLv2), in all cases as published by the Free Software Foundation. */ #include "rpcsvc.h" #include "rpc-transport.h" #include "dict.h" #include "logging.h" #include "byte-order.h" #include "common-utils.h" #include "compat-errno.h" #include "list.h" #include "xdr-rpc.h" #include "iobuf.h" #include "globals.h" #include "xdr-common.h" #include "xdr-generic.h" #include "rpc-common-xdr.h" #include "syncop.h" #include "rpc-drc.h" #include "protocol-common.h" #include #include #include #include #include #include #include #include #include #include #ifdef IPV6_DEFAULT #include #endif #include "xdr-rpcclnt.h" #include "glusterfs-acl.h" struct rpcsvc_program gluster_dump_prog; #define rpcsvc_alloc_request(svc, request) \ do { \ request = (rpcsvc_request_t *) mem_get ((svc)->rxpool); \ memset (request, 0, sizeof (rpcsvc_request_t)); \ } while (0) rpcsvc_listener_t * rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans); int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...); static int rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr); rpcsvc_notify_wrapper_t * rpcsvc_notify_wrapper_alloc (void) { rpcsvc_notify_wrapper_t *wrapper = NULL; wrapper = GF_CALLOC (1, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t); if (!wrapper) { goto out; } INIT_LIST_HEAD (&wrapper->list); out: return wrapper; } void rpcsvc_listener_destroy (rpcsvc_listener_t *listener) { rpcsvc_t *svc = NULL; if (!listener) { goto out; } svc = listener->svc; if (!svc) { goto listener_free; } pthread_rwlock_wrlock (&svc->rpclock); { list_del_init (&listener->list); } pthread_rwlock_unlock (&svc->rpclock); listener_free: GF_FREE (listener); out: return; } rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, int procnum) { rpcsvc_program_t *program = NULL; char found = 0; if (!svc) return NULL; pthread_rwlock_rdlock (&svc->rpclock); { /* Find the matching RPC program from registered list */ list_for_each_entry (program, &svc->programs, program) { if ((program->prognum == prognum) && (program->progver == progver)) { found = 1; break; } } } pthread_rwlock_unlock (&svc->rpclock); if (found) { /* Make sure the requested procnum is supported by RPC prog */ if ((procnum < 0) || (procnum >= program->numactors)) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC procedure %d not available for Program %s", procnum, program->progname); return NULL; } /* SUCCESS: Supported procedure */ return program->actors[procnum].vector_sizer; } return NULL; /* FAIL */ } gf_boolean_t rpcsvc_can_outstanding_req_be_ignored (rpcsvc_request_t *req) { /* * If outstanding_rpc_limit is reached because of blocked locks and * throttling is attempted then no unlock requests will be received. So * the outstanding request count will never change i.e. it will always * be equal to the limit. This also leads to ping timer expiry on * client. */ /* * This is a hack and a necessity until grantedlock == fop completion. * Ideally if we get a blocking lock request which cannot be granted * right now, we should unwind the fop saying “request registered, will * notify you when granted”, which is very hard to implement at the * moment. Until we bring in such mechanism, we will need to live with * not rate-limiting INODELK/ENTRYLK/LK fops */ if ((req->prognum == GLUSTER_FOP_PROGRAM) && (req->progver == GLUSTER_FOP_VERSION)) { if ((req->procnum == GFS3_OP_INODELK) || (req->procnum == GFS3_OP_FINODELK) || (req->procnum == GFS3_OP_ENTRYLK) || (req->procnum == GFS3_OP_FENTRYLK) || (req->procnum == GFS3_OP_LK)) return _gf_true; } return _gf_false; } int rpcsvc_request_outstanding (rpcsvc_request_t *req, int delta) { int ret = -1; int old_count = 0; int new_count = 0; int limit = 0; gf_boolean_t throttle = _gf_false; if (!req) goto out; throttle = rpcsvc_get_throttle (req->svc); if (!throttle) { ret = 0; goto out; } if (rpcsvc_can_outstanding_req_be_ignored (req)) { ret = 0; goto out; } pthread_mutex_lock (&req->trans->lock); { limit = req->svc->outstanding_rpc_limit; if (!limit) goto unlock; old_count = req->trans->outstanding_rpc_count; req->trans->outstanding_rpc_count += delta; new_count = req->trans->outstanding_rpc_count; if (old_count <= limit && new_count > limit) ret = rpc_transport_throttle (req->trans, _gf_true); if (old_count > limit && new_count <= limit) ret = rpc_transport_throttle (req->trans, _gf_false); } unlock: pthread_mutex_unlock (&req->trans->lock); out: return ret; } /* This needs to change to returning errors, since * we need to return RPC specific error messages when some * of the pointers below are NULL. */ rpcsvc_actor_t * rpcsvc_program_actor (rpcsvc_request_t *req) { rpcsvc_program_t *program = NULL; int err = SYSTEM_ERR; rpcsvc_actor_t *actor = NULL; rpcsvc_t *svc = NULL; char found = 0; char *peername = NULL; if (!req) goto err; svc = req->svc; peername = req->trans->peerinfo.identifier; pthread_rwlock_rdlock (&svc->rpclock); { list_for_each_entry (program, &svc->programs, program) { if (program->prognum == req->prognum) { err = PROG_MISMATCH; } if ((program->prognum == req->prognum) && (program->progver == req->progver)) { found = 1; break; } } } pthread_rwlock_unlock (&svc->rpclock); if (!found) { if (err != PROG_MISMATCH) { /* log in DEBUG when nfs clients try to see if * ACL requests are accepted by nfs server */ gf_log (GF_RPCSVC, (req->prognum == ACL_PROGRAM) ? GF_LOG_DEBUG : GF_LOG_WARNING, "RPC program not available (req %u %u) for %s", req->prognum, req->progver, peername); err = PROG_UNAVAIL; goto err; } gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC program version not available (req %u %u) for %s", req->prognum, req->progver, peername); goto err; } req->prog = program; if (!program->actors) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC Actor not found for program %s %d for %s", program->progname, program->prognum, peername); err = SYSTEM_ERR; goto err; } if ((req->procnum < 0) || (req->procnum >= program->numactors)) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not" " available for procedure %d in %s for %s", req->procnum, program->progname, peername); err = PROC_UNAVAIL; goto err; } actor = &program->actors[req->procnum]; if (!actor->actor) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not" " available for procedure %d in %s for %s", req->procnum, program->progname, peername); err = PROC_UNAVAIL; actor = NULL; goto err; } req->ownthread = program->ownthread; req->synctask = program->synctask; err = SUCCESS; gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s for %s", program->progname, actor->procname, peername); err: if (req) req->rpc_err = err; return actor; } /* this procedure can only pass 4 arguments to registered notifyfn. To send more * arguments call wrapper->notify directly. */ static void rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event, void *data) { rpcsvc_notify_wrapper_t *wrapper = NULL; if (!listener) { goto out; } list_for_each_entry (wrapper, &listener->svc->notify, list) { if (wrapper->notify) { wrapper->notify (listener->svc, wrapper->data, event, data); } } out: return; } static int rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans, rpc_transport_t *new_trans) { rpcsvc_listener_t *listener = NULL; int32_t ret = -1; listener = rpcsvc_get_listener (svc, -1, listen_trans); if (listener == NULL) { goto out; } rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans); ret = 0; out: return ret; } void rpcsvc_request_destroy (rpcsvc_request_t *req) { if (!req) { goto out; } if (req->iobref) { iobref_unref (req->iobref); } /* This marks the "end" of an RPC request. Reply is completely written to the socket and is on the way to the client. It is time to decrement the outstanding request counter by 1. */ if (req->prognum) //Only for initialized requests rpcsvc_request_outstanding (req, -1); rpc_transport_unref (req->trans); GF_FREE (req->auxgidlarge); mem_put (req); out: return; } rpcsvc_request_t * rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, struct rpc_msg *callmsg, struct iovec progmsg, rpc_transport_pollin_t *msg, rpcsvc_request_t *req) { int i = 0; if ((!trans) || (!callmsg)|| (!req) || (!msg)) return NULL; /* We start a RPC request as always denied. */ req->rpc_status = MSG_DENIED; req->xid = rpc_call_xid (callmsg); req->prognum = rpc_call_program (callmsg); req->progver = rpc_call_progver (callmsg); req->procnum = rpc_call_progproc (callmsg); req->trans = rpc_transport_ref (trans); req->count = msg->count; req->msg[0] = progmsg; req->iobref = iobref_ref (msg->iobref); if (msg->vectored) { /* msg->vector[MAX_IOVEC] is defined in structure. prevent a out of bound access */ for (i = 1; i < min (msg->count, MAX_IOVEC); i++) { req->msg[i] = msg->vector[i]; } } req->svc = svc; req->trans_private = msg->private; INIT_LIST_HEAD (&req->txlist); INIT_LIST_HEAD (&req->request_list); req->payloadsize = 0; /* By this time, the data bytes for the auth scheme would have already * been copied into the required sections of the req structure, * we just need to fill in the meta-data about it now. */ rpcsvc_auth_request_init (req, callmsg); return req; } rpcsvc_request_t * rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { char *msgbuf = NULL; struct rpc_msg rpcmsg; struct iovec progmsg; /* RPC Program payload */ rpcsvc_request_t *req = NULL; size_t msglen = 0; int ret = -1; if (!svc || !trans) return NULL; /* We need to allocate the request before actually calling * rpcsvc_request_init on the request so that we, can fill the auth * data directly into the request structure from the message iobuf. * This avoids a need to keep a temp buffer into which the auth data * would've been copied otherwise. */ rpcsvc_alloc_request (svc, req); if (!req) { goto err; } msgbuf = msg->vector[0].iov_base; msglen = msg->vector[0].iov_len; ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, req->cred.authdata,req->verf.authdata); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC call decoding failed"); rpcsvc_request_seterr (req, GARBAGE_ARGS); req->trans = rpc_transport_ref (trans); req->svc = svc; goto err; } ret = -1; rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req); gf_log (GF_RPCSVC, GF_LOG_TRACE, "received rpc-message " "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", " "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") " "from rpc-transport (%s)", rpc_call_xid (&rpcmsg), rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), trans->name); /* We just received a new request from the wire. Account for it in the outsanding request counter to make sure we don't ingest too many concurrent requests from the same client. */ if (req->prognum) //Only for initialized requests ret = rpcsvc_request_outstanding (req, +1); if (rpc_call_rpcvers (&rpcmsg) != 2) { /* LOG- TODO: print rpc version, also print the peerinfo from transport */ gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported " "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", " "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") " "from trans (%s)", rpc_call_xid (&rpcmsg), rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), trans->name); rpcsvc_request_seterr (req, RPC_MISMATCH); goto err; } ret = rpcsvc_authenticate (req); if (ret == RPCSVC_AUTH_REJECT) { /* No need to set auth_err, that is the responsibility of * the authentication handler since only that know what exact * error happened. */ rpcsvc_request_seterr (req, AUTH_ERROR); gf_log (GF_RPCSVC, GF_LOG_ERROR, "auth failed on request. " "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", " "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") " "from trans (%s)", rpc_call_xid (&rpcmsg), rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg), trans->name); ret = -1; goto err; } /* If the error is not RPC_MISMATCH, we consider the call as accepted * since we are not handling authentication failures for now. */ req->rpc_status = MSG_ACCEPTED; req->reply = NULL; ret = 0; err: if (ret == -1) { ret = rpcsvc_error_reply (req); if (ret) gf_log ("rpcsvc", GF_LOG_WARNING, "failed to queue error reply"); req = NULL; } return req; } int rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque) { rpcsvc_request_t *req = NULL; req = opaque; if (ret) gf_log ("rpcsvc", GF_LOG_ERROR, "rpc actor failed to complete successfully"); if (ret == RPCSVC_ACTOR_ERROR) { ret = rpcsvc_error_reply (req); if (ret) gf_log ("rpcsvc", GF_LOG_WARNING, "failed to queue error reply"); } return 0; } int rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpc_transport_pollin_t *msg) { rpcsvc_actor_t *actor = NULL; rpcsvc_actor actor_fn = NULL; rpcsvc_request_t *req = NULL; int ret = -1; uint16_t port = 0; gf_boolean_t is_unix = _gf_false, empty = _gf_false; gf_boolean_t unprivileged = _gf_false; drc_cached_op_t *reply = NULL; rpcsvc_drc_globals_t *drc = NULL; if (!trans || !svc) return -1; switch (trans->peerinfo.sockaddr.ss_family) { case AF_INET: port = ((struct sockaddr_in *)&trans->peerinfo.sockaddr)->sin_port; break; case AF_INET6: port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port; break; case AF_UNIX: is_unix = _gf_true; break; default: gf_log (GF_RPCSVC, GF_LOG_ERROR, "invalid address family (%d)", trans->peerinfo.sockaddr.ss_family); return -1; } if (is_unix == _gf_false) { port = ntohs (port); gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port); if (port >= 1024) unprivileged = _gf_true; } req = rpcsvc_request_create (svc, trans, msg); if (!req) goto out; if (!rpcsvc_request_accepted (req)) goto err_reply; actor = rpcsvc_program_actor (req); if (!actor) goto err_reply; if (0 == svc->allow_insecure && unprivileged && !actor->unprivileged) { /* Non-privileged user, fail request */ gf_log (GF_RPCSVC, GF_LOG_ERROR, "Request received from non-" "privileged port. Failing request for %s.", req->trans->peerinfo.identifier); req->rpc_status = MSG_DENIED; req->rpc_err = AUTH_ERROR; req->auth_err = RPCSVC_AUTH_REJECT; goto err_reply; } /* DRC */ if (rpcsvc_need_drc (req)) { drc = req->svc->drc; LOCK (&drc->lock); { reply = rpcsvc_drc_lookup (req); /* retransmission of completed request, send cached reply */ if (reply && reply->state == DRC_OP_CACHED) { gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:" " XID: 0x%x", req->xid); ret = rpcsvc_send_cached_reply (req, reply); drc->cache_hits++; UNLOCK (&drc->lock); goto out; } /* retransmitted request, original op in transit, drop it */ else if (reply && reply->state == DRC_OP_IN_TRANSIT) { gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit," " discarding. XID: 0x%x", req->xid); ret = 0; drc->intransit_hits++; rpcsvc_request_destroy (req); UNLOCK (&drc->lock); goto out; } /* fresh request, cache it as in-transit and proceed */ else { ret = rpcsvc_cache_request (req); } } UNLOCK (&drc->lock); } if (req->rpc_err == SUCCESS) { /* Before going to xlator code, set the THIS properly */ THIS = svc->xl; actor_fn = actor->actor; if (!actor_fn) { rpcsvc_request_seterr (req, PROC_UNAVAIL); /* LOG TODO: print more info about procnum, prognum etc, also print transport info */ gf_log (GF_RPCSVC, GF_LOG_ERROR, "No vectored handler present"); ret = RPCSVC_ACTOR_ERROR; goto err_reply; } if (req->synctask) { ret = synctask_new (THIS->ctx->env, (synctask_fn_t) actor_fn, rpcsvc_check_and_reply_error, NULL, req); } else if (req->ownthread) { pthread_mutex_lock (&req->prog->queue_lock); { empty = list_empty (&req->prog->request_queue); list_add_tail (&req->request_list, &req->prog->request_queue); if (empty) pthread_cond_signal (&req->prog->queue_cond); } pthread_mutex_unlock (&req->prog->queue_lock); ret = 0; } else { ret = actor_fn (req); } } err_reply: ret = rpcsvc_check_and_reply_error (ret, NULL, req); /* No need to propagate error beyond this function since the reply * has now been queued. */ ret = 0; out: return ret; } int rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans) { rpcsvc_event_t event; rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper; int32_t ret = -1, i = 0, wrapper_count = 0; rpcsvc_listener_t *listener = NULL; event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD : RPCSVC_EVENT_DISCONNECT; pthread_rwlock_rdlock (&svc->rpclock); { if (!svc->notify_count) goto unlock; wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t); if (!wrappers) { goto unlock; } list_for_each_entry (wrapper, &svc->notify, list) { if (wrapper->notify) { wrappers[i++] = *wrapper; } } wrapper_count = i; } unlock: pthread_rwlock_unlock (&svc->rpclock); if (wrappers) { for (i = 0; i < wrapper_count; i++) { wrappers[i].notify (svc, wrappers[i].data, event, trans); } GF_FREE (wrappers); } if (event == RPCSVC_EVENT_LISTENER_DEAD) { listener = rpcsvc_get_listener (svc, -1, trans->listener); rpcsvc_listener_destroy (listener); } return ret; } int rpcsvc_notify (rpc_transport_t *trans, void *mydata, rpc_transport_event_t event, void *data, ...) { int ret = -1; rpc_transport_pollin_t *msg = NULL; rpc_transport_t *new_trans = NULL; rpcsvc_t *svc = NULL; rpcsvc_listener_t *listener = NULL; svc = mydata; if (svc == NULL) { goto out; } switch (event) { case RPC_TRANSPORT_ACCEPT: new_trans = data; ret = rpcsvc_accept (svc, trans, new_trans); break; case RPC_TRANSPORT_DISCONNECT: ret = rpcsvc_handle_disconnect (svc, trans); break; case RPC_TRANSPORT_MSG_RECEIVED: msg = data; ret = rpcsvc_handle_rpc_call (svc, trans, msg); break; case RPC_TRANSPORT_MSG_SENT: ret = 0; break; case RPC_TRANSPORT_CONNECT: /* do nothing, no need for rpcsvc to handle this, client should * handle this event */ /* print info about transport too : LOG TODO */ gf_log ("rpcsvc", GF_LOG_CRITICAL, "got CONNECT event, which should have not come"); ret = 0; break; case RPC_TRANSPORT_CLEANUP: listener = rpcsvc_get_listener (svc, -1, trans->listener); if (listener == NULL) { goto out; } rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY, trans); ret = 0; break; case RPC_TRANSPORT_MAP_XID_REQUEST: /* FIXME: think about this later */ gf_log ("rpcsvc", GF_LOG_CRITICAL, "got MAP_XID event, which should have not come"); ret = 0; break; } out: return ret; } /* Given the RPC reply structure and the payload handed by the RPC program, * encode the RPC record header into the buffer pointed by recordstart. */ struct iovec rpcsvc_record_build_header (char *recordstart, size_t rlen, struct rpc_msg reply, size_t payload) { struct iovec replyhdr; struct iovec txrecord = {0, 0}; size_t fraglen = 0; int ret = -1; /* After leaving aside the 4 bytes for the fragment header, lets * encode the RPC reply structure into the buffer given to us. */ ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "Failed to create RPC reply"); goto err; } fraglen = payload + replyhdr.iov_len; gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, " "rpc hdr: %zu", fraglen, payload, replyhdr.iov_len); txrecord.iov_base = recordstart; /* Remember, this is only the vec for the RPC header and does not * include the payload above. We needed the payload only to calculate * the size of the full fragment. This size is sent in the fragment * header. */ txrecord.iov_len = replyhdr.iov_len; err: return txrecord; } static uint32_t rpc_callback_new_callid (struct rpc_transport *trans) { uint32_t callid = 0; pthread_mutex_lock (&trans->lock); { callid = ++trans->xid; } pthread_mutex_unlock (&trans->lock); return callid; } int rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload, uint32_t xid, struct rpc_msg *request) { int ret = -1; if (!request) { goto out; } memset (request, 0, sizeof (*request)); request->rm_xid = xid; request->rm_direction = CALL; request->rm_call.cb_rpcvers = 2; request->rm_call.cb_prog = prognum; request->rm_call.cb_vers = progver; request->rm_call.cb_proc = procnum; request->rm_call.cb_cred.oa_flavor = AUTH_NONE; request->rm_call.cb_cred.oa_base = NULL; request->rm_call.cb_cred.oa_length = 0; request->rm_call.cb_verf.oa_flavor = AUTH_NONE; request->rm_call.cb_verf.oa_base = NULL; request->rm_call.cb_verf.oa_length = 0; ret = 0; out: return ret; } struct iovec rpcsvc_callback_build_header (char *recordstart, size_t rlen, struct rpc_msg *request, size_t payload) { struct iovec requesthdr = {0, }; struct iovec txrecord = {0, 0}; int ret = -1; size_t fraglen = 0; ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); if (ret == -1) { gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create RPC request"); goto out; } fraglen = payload + requesthdr.iov_len; gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); txrecord.iov_base = recordstart; /* Remember, this is only the vec for the RPC header and does not * include the payload above. We needed the payload only to calculate * the size of the full fragment. This size is sent in the fragment * header. */ txrecord.iov_len = requesthdr.iov_len; out: return txrecord; } static struct iobuf * rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, int procnum, size_t payload, u_long xid, struct iovec *recbuf) { struct rpc_msg request = {0, }; struct iobuf *request_iob = NULL; char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; size_t xdr_size = 0; int ret = -1; if ((!rpc) || (!recbuf)) { goto out; } /* Fill the rpc structure and XDR it into the buffer got above. */ ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, &request); if (ret == -1) { gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request " "xid (%" GF_PRI_RPC_XID ")", xid); goto out; } /* First, try to get a pointer into the buffer which the RPC * layer can use. */ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request); request_iob = iobuf_get2 (rpc->ctx->iobuf_pool, (xdr_size + payload)); if (!request_iob) { goto out; } pagesize = iobuf_pagesize (request_iob); record = iobuf_ptr (request_iob); /* Now we have it. */ recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, payload); if (!recordhdr.iov_base) { gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " " header"); iobuf_unref (request_iob); request_iob = NULL; recbuf->iov_base = NULL; goto out; } recbuf->iov_base = recordhdr.iov_base; recbuf->iov_len = recordhdr.iov_len; out: return request_iob; } int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, void *req, glusterfs_ctx_t *ctx, xdrproc_t xdrproc) { int ret = -1; int count = 0; struct iovec iov = {0, }; struct iobuf *iobuf = NULL; ssize_t xdr_size = 0; struct iobref *iobref = NULL; if (!req) goto out; xdr_size = xdr_sizeof (xdrproc, req); iobuf = iobuf_get2 (ctx->iobuf_pool, xdr_size); if (!iobuf) goto out; iov.iov_base = iobuf->ptr; iov.iov_len = iobuf_pagesize (iobuf); ret = xdr_serialize_generic (iov, req, xdrproc); if (ret == -1) { gf_log (THIS->name, GF_LOG_WARNING, "failed to create XDR payload"); goto out; } iov.iov_len = ret; count = 1; iobref = iobref_new (); if (!iobref) { ret = -1; gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref"); goto out; } iobref_add (iobref, iobuf); ret = rpcsvc_callback_submit (rpc, trans, prog, procnum, &iov, count, iobref); out: if (iobuf) iobuf_unref (iobuf); if (iobref) iobref_unref (iobref); return ret; } int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, struct iovec *proghdr, int proghdrcount, struct iobref *iobref) { struct iobuf *request_iob = NULL; struct iovec rpchdr = {0,}; rpc_transport_req_t req; int ret = -1; int proglen = 0; uint32_t xid = 0; gf_boolean_t new_iobref = _gf_false; if (!rpc) { goto out; } memset (&req, 0, sizeof (req)); if (proghdr) { proglen += iov_length (proghdr, proghdrcount); } xid = rpc_callback_new_callid (trans); request_iob = rpcsvc_callback_build_record (rpc, prog->prognum, prog->progver, procnum, proglen, xid, &rpchdr); if (!request_iob) { gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build rpc-record"); goto out; } if (!iobref) { iobref = iobref_new (); if (!iobref) { gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref"); goto out; } new_iobref = 1; } iobref_add (iobref, request_iob); req.msg.rpchdr = &rpchdr; req.msg.rpchdrcount = 1; req.msg.proghdr = proghdr; req.msg.proghdrcount = proghdrcount; req.msg.iobref = iobref; ret = rpc_transport_submit_request (trans, &req); if (ret == -1) { gf_log ("rpcsvc", GF_LOG_WARNING, "transmission of rpc-request failed"); goto out; } ret = 0; out: iobuf_unref (request_iob); if (new_iobref) iobref_unref (iobref); return ret; } int rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr, int rpchdrcount, struct iovec *proghdr, int proghdrcount, struct iovec *progpayload, int progpayloadcount, struct iobref *iobref, void *priv) { int ret = -1; rpc_transport_reply_t reply = {{0, }}; if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) { goto out; } reply.msg.rpchdr = rpchdr; reply.msg.rpchdrcount = rpchdrcount; reply.msg.proghdr = proghdr; reply.msg.proghdrcount = proghdrcount; reply.msg.progpayload = progpayload; reply.msg.progpayloadcount = progpayloadcount; reply.msg.iobref = iobref; reply.private = priv; ret = rpc_transport_submit_reply (trans, &reply); out: return ret; } int rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply) { int ret = -1; rpcsvc_program_t *prog = NULL; if ((!req) || (!reply)) goto out; ret = 0; rpc_fill_empty_reply (reply, req->xid); if (req->rpc_status == MSG_DENIED) { rpc_fill_denied_reply (reply, req->rpc_err, req->auth_err); goto out; } prog = rpcsvc_request_program (req); if (req->rpc_status == MSG_ACCEPTED) rpc_fill_accepted_reply (reply, req->rpc_err, (prog) ? prog->proglowvers : 0, (prog) ? prog->proghighvers: 0, req->verf.flavour, req->verf.datalen, req->verf.authdata); else gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value"); out: return ret; } /* Given a request and the reply payload, build a reply and encodes the reply * into a record header. This record header is encoded into the vector pointed * to be recbuf. * msgvec is the buffer that points to the payload of the RPC program. * This buffer can be NULL, if an RPC error reply is being constructed. * The only reason it is needed here is that in case the buffer is provided, * we should account for the length of that buffer in the RPC fragment header. */ struct iobuf * rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, size_t hdrlen, struct iovec *recbuf) { struct rpc_msg reply; struct iobuf *replyiob = NULL; char *record = NULL; struct iovec recordhdr = {0, }; size_t pagesize = 0; size_t xdr_size = 0; rpcsvc_t *svc = NULL; int ret = -1; if ((!req) || (!req->trans) || (!req->svc) || (!recbuf)) return NULL; svc = req->svc; /* Fill the rpc structure and XDR it into the buffer got above. */ ret = rpcsvc_fill_reply (req, &reply); if (ret) goto err_exit; xdr_size = xdr_sizeof ((xdrproc_t)xdr_replymsg, &reply); /* Payload would include 'readv' size etc too, where as that comes as another payload iobuf */ replyiob = iobuf_get2 (svc->ctx->iobuf_pool, (xdr_size + hdrlen)); if (!replyiob) { goto err_exit; } pagesize = iobuf_pagesize (replyiob); record = iobuf_ptr (replyiob); /* Now we have it. */ recordhdr = rpcsvc_record_build_header (record, pagesize, reply, payload); if (!recordhdr.iov_base) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to build record " " header"); iobuf_unref (replyiob); replyiob = NULL; recbuf->iov_base = NULL; goto err_exit; } recbuf->iov_base = recordhdr.iov_base; recbuf->iov_len = recordhdr.iov_len; err_exit: return replyiob; } /* * The function to submit a program message to the RPC service. * This message is added to the transmission queue of the * conn. * * Program callers are not expected to use the msgvec->iov_base * address for anything else. * Nor are they expected to free it once this function returns. * Once the transmission of the buffer is completed by the RPC service, * the memory area as referenced through @msg will be unrefed. * If a higher layer does not want anything to do with this iobuf * after this function returns, it should call unref on it. For keeping * it around till the transmission is actually complete, rpcsvc also refs it. * * * If this function returns an error by returning -1, the * higher layer programs should assume that a disconnection happened * and should know that the conn memory area as well as the req structure * has been freed internally. * * For now, this function assumes that a submit is always called * to send a new record. Later, if there is a situation where different * buffers for the same record come from different sources, then we'll * need to change this code to account for multiple submit calls adding * the buffers into a single record. */ int rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { int ret = -1, i = 0; struct iobuf *replyiob = NULL; struct iovec recordhdr = {0, }; rpc_transport_t *trans = NULL; size_t msglen = 0; size_t hdrlen = 0; char new_iobref = 0; rpcsvc_drc_globals_t *drc = NULL; if ((!req) || (!req->trans)) return -1; trans = req->trans; for (i = 0; i < hdrcount; i++) { msglen += proghdr[i].iov_len; } for (i = 0; i < payloadcount; i++) { msglen += payload[i].iov_len; } gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen); /* Build the buffer containing the encoded RPC reply. */ replyiob = rpcsvc_record_build_record (req, msglen, hdrlen, &recordhdr); if (!replyiob) { gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed"); goto disconnect_exit; } if (!iobref) { iobref = iobref_new (); if (!iobref) { goto disconnect_exit; } new_iobref = 1; } iobref_add (iobref, replyiob); /* cache the request in the duplicate request cache for appropriate ops */ if ((req->reply) && (rpcsvc_need_drc (req))) { drc = req->svc->drc; LOCK (&drc->lock); ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount); UNLOCK (&drc->lock); } ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount, payload, payloadcount, iobref, req->trans_private); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message " "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to " "rpc-transport (%s)", req->xid, req->prog ? req->prog->progname : "(not matched)", req->prog ? req->prog->progver : 0, req->procnum, trans ? trans->name : ""); } else { gf_log (GF_RPCSVC, GF_LOG_TRACE, "submitted reply for rpc-message (XID: 0x%x, " "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport " "(%s)", req->xid, req->prog ? req->prog->progname: "-", req->prog ? req->prog->progver : 0, req->procnum, trans ? trans->name : ""); } disconnect_exit: if (replyiob) { iobuf_unref (replyiob); } if (new_iobref) { iobref_unref (iobref); } rpcsvc_request_destroy (req); return ret; } int rpcsvc_error_reply (rpcsvc_request_t *req) { struct iovec dummyvec = {0, }; if (!req) return -1; gf_log_callingfn ("", GF_LOG_DEBUG, "sending a RPC error reply"); /* At this point the req should already have been filled with the * appropriate RPC error numbers. */ return rpcsvc_submit_generic (req, &dummyvec, 0, NULL, 0, NULL); } #ifdef IPV6_DEFAULT int rpcsvc_program_register_rpcbind6 (rpcsvc_program_t *newprog, uint32_t port) { const int IP_BUF_LEN = 64; char addr_buf[IP_BUF_LEN]; int err = 0; bool_t success = 0; struct netconfig *nc; struct netbuf *nb; if (!newprog) { goto out; } nc = getnetconfigent ("tcp6"); if (!nc) { err = -1; goto out; } err = sprintf (addr_buf, "::.%d.%d", port >> 8 & 0xff, port & 0xff); if (err < 0) { err = -1; goto out; } nb = uaddr2taddr (nc, addr_buf); if (!nb) { err = -1; goto out; } success = rpcb_set (newprog->prognum, newprog->progver, nc, nb); if (!success) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register the IPv6" " service with rpcbind"); } err = 0; out: return err; } int rpcsvc_program_unregister_rpcbind6 (rpcsvc_program_t *newprog) { int err = 0; bool_t success = 0; struct netconfig *nc; if (!newprog) { goto out; } nc = getnetconfigent ("tcp6"); if (!nc) { err = -1; goto out; } success = rpcb_unset (newprog->prognum, newprog->progver, nc); if (!success) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister the IPv6" " service with rpcbind"); } err = 0; out: return err; } #endif /* Register the program with the local portmapper service. */ int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port) { int ret = -1; /* FAIL */ if (!newprog) { goto out; } /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */ if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, port))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" " portmap %d %d %u", newprog->prognum, newprog->progver, port); goto out; } ret = 0; /* SUCCESS */ out: return ret; } int rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) { int ret = -1; if (!prog) goto out; if (!(pmap_unset(prog->prognum, prog->progver))) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister with" " portmap"); goto out; } ret = 0; out: return ret; } int rpcsvc_register_portmap_enabled (rpcsvc_t *svc) { return svc->register_portmap; } int32_t rpcsvc_get_listener_port (rpcsvc_listener_t *listener) { int32_t listener_port = -1; if ((listener == NULL) || (listener->trans == NULL)) { goto out; } switch (listener->trans->myinfo.sockaddr.ss_family) { case AF_INET: listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port; break; case AF_INET6: listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; break; default: gf_log (GF_RPCSVC, GF_LOG_DEBUG, "invalid address family (%d)", listener->trans->myinfo.sockaddr.ss_family); goto out; } listener_port = ntohs (listener_port); out: return listener_port; } rpcsvc_listener_t * rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans) { rpcsvc_listener_t *listener = NULL; char found = 0; uint32_t listener_port = 0; if (!svc) { goto out; } pthread_rwlock_rdlock (&svc->rpclock); { list_for_each_entry (listener, &svc->listeners, list) { if (trans != NULL) { if (listener->trans == trans) { found = 1; break; } continue; } listener_port = rpcsvc_get_listener_port (listener); if (listener_port == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "invalid port for listener %s", listener->trans->name); continue; } if (listener_port == port) { found = 1; break; } } } pthread_rwlock_unlock (&svc->rpclock); if (!found) { listener = NULL; } out: return listener; } /* The only difference between the generic submit and this one is that the * generic submit is also used for submitting RPC error replies in where there * are no payloads so the msgvec and msgbuf can be NULL. * Since RPC programs should be using this function along with their payloads * we must perform NULL checks before calling the generic submit. */ int rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int hdrcount, struct iovec *payload, int payloadcount, struct iobref *iobref) { if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base)) return -1; return rpcsvc_submit_generic (req, proghdr, hdrcount, payload, payloadcount, iobref); } int rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program) { int ret = -1; rpcsvc_program_t *prog = NULL; if (!svc || !program) { goto out; } ret = rpcsvc_program_unregister_portmap (program); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of" " program failed"); goto out; } #ifdef IPV6_DEFAULT ret = rpcsvc_program_unregister_rpcbind6 (program); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "rpcbind (ipv6)" " unregistration of program failed"); goto out; } #endif pthread_rwlock_rdlock (&svc->rpclock); { list_for_each_entry (prog, &svc->programs, program) { if ((prog->prognum == program->prognum) && (prog->progver == program->progver)) { break; } } } pthread_rwlock_unlock (&svc->rpclock); if (prog == NULL) { ret = -1; goto out; } gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d," " Ver: %d, Port: %d", prog->progname, prog->prognum, prog->progver, prog->progport); if (prog->ownthread) { prog->alive = _gf_false; ret = 0; goto out; } pthread_rwlock_wrlock (&svc->rpclock); { list_del_init (&prog->program); } pthread_rwlock_unlock (&svc->rpclock); ret = 0; out: if (ret == -1) { if (program) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program " "unregistration failed" ": %s, Num: %d, Ver: %d, Port: %d", program->progname, program->prognum, program->progver, program->progport); } else { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program not found"); } } return ret; } int rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen) { if (!trans) { return -1; } return rpc_transport_get_peername (trans, hostname, hostlen); } int rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen, struct sockaddr_storage *sa, socklen_t sasize) { if (!trans) { return -1; } return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa, sasize); } rpcsvc_listener_t * rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans) { rpcsvc_listener_t *listener = NULL; listener = GF_CALLOC (1, sizeof (*listener), gf_common_mt_rpcsvc_listener_t); if (!listener) { goto out; } listener->trans = trans; listener->svc = svc; INIT_LIST_HEAD (&listener->list); pthread_rwlock_wrlock (&svc->rpclock); { list_add_tail (&listener->list, &svc->listeners); } pthread_rwlock_unlock (&svc->rpclock); out: return listener; } int32_t rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name) { rpc_transport_t *trans = NULL; rpcsvc_listener_t *listener = NULL; int32_t ret = -1; if (!svc || !options) { goto out; } trans = rpc_transport_load (svc->ctx, options, name); if (!trans) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "cannot create listener, " "initing the transport failed"); goto out; } ret = rpc_transport_listen (trans); if (ret == -EADDRINUSE || ret == -1) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "listening on transport failed"); goto out; } ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_WARNING, "registering notify failed"); goto out; } listener = rpcsvc_listener_alloc (svc, trans); if (listener == NULL) { goto out; } ret = 0; out: if (!listener && trans) { rpc_transport_disconnect (trans, _gf_true); } return ret; } int32_t rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name) { int32_t ret = -1, count = 0; data_t *data = NULL; char *str = NULL, *ptr = NULL, *transport_name = NULL; char *transport_type = NULL, *saveptr = NULL, *tmp = NULL; if ((svc == NULL) || (options == NULL) || (name == NULL)) { goto out; } data = dict_get (options, "transport-type"); if (data == NULL) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "option transport-type not set"); goto out; } transport_type = data_to_str (data); if (transport_type == NULL) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "option transport-type not set"); goto out; } /* duplicate transport_type, since following dict_set will free it */ transport_type = gf_strdup (transport_type); if (transport_type == NULL) { goto out; } str = gf_strdup (transport_type); if (str == NULL) { goto out; } ptr = strtok_r (str, ",", &saveptr); while (ptr != NULL) { tmp = gf_strdup (ptr); if (tmp == NULL) { goto out; } ret = gf_asprintf (&transport_name, "%s.%s", tmp, name); if (ret == -1) { goto out; } ret = dict_set_dynstr (options, "transport-type", tmp); if (ret == -1) { goto out; } tmp = NULL; ptr = strtok_r (NULL, ",", &saveptr); ret = rpcsvc_create_listener (svc, options, transport_name); if (ret != 0) { goto out; } GF_FREE (transport_name); transport_name = NULL; count++; } ret = dict_set_dynstr (options, "transport-type", transport_type); if (ret == -1) { goto out; } transport_type = NULL; out: GF_FREE (str); GF_FREE (transport_type); GF_FREE (tmp); GF_FREE (transport_name); if (count > 0) { return count; } else { return ret; } } int rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata) { rpcsvc_notify_wrapper_t *wrapper = NULL, *tmp = NULL; int ret = 0; if (!svc || !notify) { goto out; } pthread_rwlock_wrlock (&svc->rpclock); { list_for_each_entry_safe (wrapper, tmp, &svc->notify, list) { if ((wrapper->notify == notify) && (mydata == wrapper->data)) { list_del_init (&wrapper->list); GF_FREE (wrapper); ret++; } } } pthread_rwlock_unlock (&svc->rpclock); out: return ret; } int rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata) { rpcsvc_notify_wrapper_t *wrapper = NULL; int ret = -1; wrapper = rpcsvc_notify_wrapper_alloc (); if (!wrapper) { goto out; } svc->mydata = mydata; wrapper->data = mydata; wrapper->notify = notify; pthread_rwlock_wrlock (&svc->rpclock); { list_add_tail (&wrapper->list, &svc->notify); svc->notify_count++; } pthread_rwlock_unlock (&svc->rpclock); ret = 0; out: return ret; } void * rpcsvc_request_handler (void *arg) { rpcsvc_program_t *program = arg; rpcsvc_request_t *req = NULL; rpcsvc_actor_t *actor = NULL; gf_boolean_t done = _gf_false; int ret = 0; if (!program) return NULL; while (1) { pthread_mutex_lock (&program->queue_lock); { if (!program->alive && list_empty (&program->request_queue)) { done = 1; goto unlock; } while (list_empty (&program->request_queue)) pthread_cond_wait (&program->queue_cond, &program->queue_lock); req = list_entry (program->request_queue.next, typeof (*req), request_list); list_del_init (&req->request_list); } unlock: pthread_mutex_unlock (&program->queue_lock); if (done) break; THIS = req->svc->xl; actor = rpcsvc_program_actor (req); ret = actor->actor (req); if (ret != 0) { rpcsvc_check_and_reply_error (ret, NULL, req); } } return NULL; } int rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) { int ret = -1; rpcsvc_program_t *newprog = NULL; char already_registered = 0; if (!svc) { goto out; } if (program->actors == NULL) { goto out; } pthread_rwlock_rdlock (&svc->rpclock); { list_for_each_entry (newprog, &svc->programs, program) { if ((newprog->prognum == program->prognum) && (newprog->progver == program->progver)) { already_registered = 1; break; } } } pthread_rwlock_unlock (&svc->rpclock); if (already_registered) { ret = 0; goto out; } newprog = GF_CALLOC (1, sizeof(*newprog),gf_common_mt_rpcsvc_program_t); if (newprog == NULL) { goto out; } memcpy (newprog, program, sizeof (*program)); INIT_LIST_HEAD (&newprog->program); INIT_LIST_HEAD (&newprog->request_queue); pthread_mutex_init (&newprog->queue_lock, NULL); pthread_cond_init (&newprog->queue_cond, NULL); newprog->alive = _gf_true; /* make sure synctask gets priority over ownthread */ if (newprog->synctask) newprog->ownthread = _gf_false; if (newprog->ownthread) { gf_thread_create (&newprog->thread, NULL, rpcsvc_request_handler, newprog, "rpcsvcrh"); } pthread_rwlock_wrlock (&svc->rpclock); { list_add_tail (&newprog->program, &svc->programs); } pthread_rwlock_unlock (&svc->rpclock); ret = 0; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d," " Ver: %d, Port: %d", newprog->progname, newprog->prognum, newprog->progver, newprog->progport); out: if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:" " %s, Num: %d, Ver: %d, Port: %d", program->progname, program->prognum, program->progver, program->progport); } return ret; } static void free_prog_details (gf_dump_rsp *rsp) { gf_prog_detail *prev = NULL; gf_prog_detail *trav = NULL; trav = rsp->prog; while (trav) { prev = trav; trav = trav->next; GF_FREE (prev); } } static int build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp) { int ret = -1; rpcsvc_program_t *program = NULL; gf_prog_detail *prog = NULL; gf_prog_detail *prev = NULL; if (!req || !req->trans || !req->svc) goto out; pthread_rwlock_rdlock (&req->svc->rpclock); { list_for_each_entry (program, &req->svc->programs, program) { prog = GF_CALLOC (1, sizeof (*prog), 0); if (!prog) goto unlock; prog->progname = program->progname; prog->prognum = program->prognum; prog->progver = program->progver; if (!rsp->prog) rsp->prog = prog; if (prev) prev->next = prog; prev = prog; } if (prev) ret = 0; } unlock: pthread_rwlock_unlock (&req->svc->rpclock); out: return ret; } static int rpcsvc_ping (rpcsvc_request_t *req) { char rsp_buf[8 * 1024] = {0,}; gf_common_rsp rsp = {0,}; struct iovec iov = {0,}; int ret = -1; uint32_t ping_rsp_len = 0; ping_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_common_rsp, &rsp); iov.iov_base = rsp_buf; iov.iov_len = ping_rsp_len; ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_common_rsp); if (ret < 0) { ret = RPCSVC_ACTOR_ERROR; } else { rsp.op_ret = 0; rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL); } return 0; } static int rpcsvc_dump (rpcsvc_request_t *req) { char rsp_buf[8 * 1024] = {0,}; gf_dump_rsp rsp = {0,}; struct iovec iov = {0,}; int op_errno = EINVAL; int ret = -1; uint32_t dump_rsp_len = 0; if (!req) goto sendrsp; ret = build_prog_details (req, &rsp); if (ret < 0) { op_errno = -ret; goto sendrsp; } op_errno = 0; sendrsp: rsp.op_errno = gf_errno_to_error (op_errno); rsp.op_ret = ret; dump_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_dump_rsp, &rsp); iov.iov_base = rsp_buf; iov.iov_len = dump_rsp_len; ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp); if (ret < 0) { ret = RPCSVC_ACTOR_ERROR; } else { rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL); ret = 0; } free_prog_details (&rsp); return ret; } int rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) { char *optstr = NULL; int ret = -1; if ((!svc) || (!options)) return -1; svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR; svc->register_portmap = _gf_true; if (dict_get (options, "rpc.register-with-portmap")) { ret = dict_get_str (options, "rpc.register-with-portmap", &optstr); if (ret < 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse " "dict"); goto out; } ret = gf_string2boolean (optstr, &svc->register_portmap); if (ret < 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse bool " "string"); goto out; } } if (!svc->register_portmap) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration " "disabled"); ret = 0; out: return ret; } int rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options) { xlator_t *xlator = NULL; xlator_list_t *volentry = NULL; char *srchkey = NULL; char *keyval = NULL; int ret = -1; if ((!svc) || (!svc->options) || (!options)) return (-1); /* Fetch the xlator from svc */ xlator = svc->xl; if (!xlator) return (-1); /* Reconfigure the volume specific rpc-auth.addr allow part */ volentry = xlator->children; while (volentry) { ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow", volentry->xlator->name); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); return (-1); } /* key-string: rpc-auth.addr..allow * * IMP: Delete the OLD key/value pair from dict. * And set the NEW key/value pair IFF the option is SET * in reconfigured volfile. * * NB: If rpc-auth.addr..allow is not SET explicitly, * build_nfs_graph() sets it as "*" i.e. anonymous. */ dict_del (svc->options, srchkey); if (!dict_get_str (options, srchkey, &keyval)) { ret = dict_set_str (svc->options, srchkey, keyval); if (ret < 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error"); GF_FREE (srchkey); return (-1); } } GF_FREE (srchkey); volentry = volentry->next; } /* Reconfigure the volume specific rpc-auth.addr reject part */ volentry = xlator->children; while (volentry) { ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject", volentry->xlator->name); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); return (-1); } /* key-string: rpc-auth.addr..reject * * IMP: Delete the OLD key/value pair from dict. * And set the NEW key/value pair IFF the option is SET * in reconfigured volfile. * * NB: No default value for reject key. */ dict_del (svc->options, srchkey); if (!dict_get_str (options, srchkey, &keyval)) { ret = dict_set_str (svc->options, srchkey, keyval); if (ret < 0) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error"); GF_FREE (srchkey); return (-1); } } GF_FREE (srchkey); volentry = volentry->next; } ret = rpcsvc_init_options (svc, options); if (ret) return (-1); return rpcsvc_auth_reconf (svc, options); } int rpcsvc_transport_unix_options_build (dict_t **options, char *filepath) { dict_t *dict = NULL; char *fpath = NULL; int ret = -1; GF_ASSERT (filepath); GF_ASSERT (options); dict = dict_new (); if (!dict) goto out; fpath = gf_strdup (filepath); if (!fpath) { ret = -1; goto out; } ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath); if (ret) goto out; ret = dict_set_str (dict, "transport.address-family", "unix"); if (ret) goto out; ret = dict_set_str (dict, "transport.socket.nodelay", "off"); if (ret) goto out; ret = dict_set_str (dict, "transport-type", "socket"); if (ret) goto out; *options = dict; out: if (ret) { GF_FREE (fpath); if (dict) dict_unref (dict); } return ret; } /* * Configure() the rpc.outstanding-rpc-limit param. * If dict_get_int32() for dict-key "rpc.outstanding-rpc-limit" FAILS, * it would set the value as "defvalue". Otherwise it would fetch the * value and round up to multiple-of-8. defvalue must be +ve. * * NB: defval or set-value "0" is special which means unlimited/65536. */ int rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options, int defvalue) { int ret = -1; /* FAILURE */ int rpclim = 0; static char *rpclimkey = "rpc.outstanding-rpc-limit"; if ((!svc) || (!options)) return (-1); if ((defvalue < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) || (defvalue > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT)) { return (-1); } /* Fetch the rpc.outstanding-rpc-limit from dict. */ ret = dict_get_int32 (options, rpclimkey, &rpclim); if (ret < 0) { /* Fall back to default for FAILURE */ rpclim = defvalue; } /* Round up to multiple-of-8. It must not exceed * RPCSVC_MAX_OUTSTANDING_RPC_LIMIT. */ rpclim = ((rpclim + 8 - 1) >> 3) * 8; if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) { rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT; } if (svc->outstanding_rpc_limit != rpclim) { svc->outstanding_rpc_limit = rpclim; gf_log (GF_RPCSVC, GF_LOG_INFO, "Configured %s with value %d", rpclimkey, rpclim); } return (0); } /* * Enable throttling for rpcsvc_t svc. * Returns 0 on success, -1 otherwise. */ int rpcsvc_set_throttle_on (rpcsvc_t *svc) { if (!svc) return -1; svc->throttle = _gf_true; return 0; } /* * Disable throttling for rpcsvc_t svc. * Returns 0 on success, -1 otherwise. */ int rpcsvc_set_throttle_off (rpcsvc_t *svc) { if (!svc) return -1; svc->throttle = _gf_false; return 0; } /* * Get throttle state for rpcsvc_t svc. * Returns value of attribute throttle on success, _gf_false otherwise. */ gf_boolean_t rpcsvc_get_throttle (rpcsvc_t *svc) { if (!svc) return _gf_false; return svc->throttle; } /* The global RPC service initializer. */ rpcsvc_t * rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options, uint32_t poolcount) { rpcsvc_t *svc = NULL; int ret = -1; if ((!xl) || (!ctx) || (!options)) return NULL; svc = GF_CALLOC (1, sizeof (*svc), gf_common_mt_rpcsvc_t); if (!svc) return NULL; pthread_rwlock_init (&svc->rpclock, NULL); INIT_LIST_HEAD (&svc->authschemes); INIT_LIST_HEAD (&svc->notify); INIT_LIST_HEAD (&svc->listeners); INIT_LIST_HEAD (&svc->programs); ret = rpcsvc_init_options (svc, options); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init options"); goto free_svc; } if (!poolcount) poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor; gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); /* TODO: leak */ if (!svc->rxpool) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); goto free_svc; } ret = rpcsvc_auth_init (svc, options); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init " "authentication"); goto free_svc; } ret = -1; svc->options = options; svc->ctx = ctx; svc->xl = xl; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); gluster_dump_prog.options = options; ret = rpcsvc_program_register (svc, &gluster_dump_prog); if (ret) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to register DUMP program"); goto free_svc; } ret = 0; free_svc: if (ret == -1) { GF_FREE (svc); svc = NULL; } return svc; } int rpcsvc_transport_peer_check_search (dict_t *options, char *pattern, char *ip, char *hostname) { int ret = -1; char *addrtok = NULL; char *addrstr = NULL; char *dup_addrstr = NULL; char *svptr = NULL; if ((!options) || (!ip)) return -1; ret = dict_get_str (options, pattern, &addrstr); if (ret < 0) { ret = -1; goto err; } if (!addrstr) { ret = -1; goto err; } dup_addrstr = gf_strdup (addrstr); addrtok = strtok_r (dup_addrstr, ",", &svptr); while (addrtok) { /* CASEFOLD not present on Solaris */ #ifdef FNM_CASEFOLD ret = fnmatch (addrtok, ip, FNM_CASEFOLD); #else ret = fnmatch (addrtok, ip, 0); #endif if (ret == 0) goto err; /* compare hostnames if applicable */ if (hostname) { #ifdef FNM_CASEFOLD ret = fnmatch (addrtok, hostname, FNM_CASEFOLD); #else ret = fnmatch (addrtok, hostname, 0); #endif if (ret == 0) goto err; } /* Compare IPv4 subnetwork, TODO: IPv6 subnet support */ if (strchr (addrtok, '/')) { ret = rpcsvc_match_subnet_v4 (addrtok, ip); if (ret == 0) goto err; } addrtok = strtok_r (NULL, ",", &svptr); } ret = -1; err: GF_FREE (dup_addrstr); return ret; } static int rpcsvc_transport_peer_check_allow (dict_t *options, char *volname, char *ip, char *hostname) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; if ((!options) || (!ip) || (!volname)) return ret; ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); ret = RPCSVC_AUTH_DONTCARE; goto out; } ret = rpcsvc_transport_peer_check_search (options, srchstr, ip, hostname); GF_FREE (srchstr); if (ret == 0) ret = RPCSVC_AUTH_ACCEPT; else ret = RPCSVC_AUTH_REJECT; out: return ret; } static int rpcsvc_transport_peer_check_reject (dict_t *options, char *volname, char *ip, char *hostname) { int ret = RPCSVC_AUTH_DONTCARE; char *srchstr = NULL; if ((!options) || (!ip) || (!volname)) return ret; ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); ret = RPCSVC_AUTH_REJECT; goto out; } ret = rpcsvc_transport_peer_check_search (options, srchstr, ip, hostname); GF_FREE (srchstr); if (ret == 0) ret = RPCSVC_AUTH_REJECT; else ret = RPCSVC_AUTH_DONTCARE; out: return ret; } /* Combines rpc auth's allow and reject options. * Order of checks is important. * First, REJECT if either rejects. * If neither rejects, ACCEPT if either accepts. * If neither accepts, DONTCARE */ int rpcsvc_combine_allow_reject_volume_check (int allow, int reject) { if (allow == RPCSVC_AUTH_REJECT || reject == RPCSVC_AUTH_REJECT) return RPCSVC_AUTH_REJECT; if (allow == RPCSVC_AUTH_ACCEPT || reject == RPCSVC_AUTH_ACCEPT) return RPCSVC_AUTH_ACCEPT; return RPCSVC_AUTH_DONTCARE; } int rpcsvc_auth_check (rpcsvc_t *svc, char *volname, char *ipaddr) { int ret = RPCSVC_AUTH_REJECT; int accept = RPCSVC_AUTH_REJECT; int reject = RPCSVC_AUTH_REJECT; char *hostname = NULL; char *allow_str = NULL; char *reject_str = NULL; char *srchstr = NULL; dict_t *options = NULL; if (!svc || !volname || !ipaddr) return ret; /* Fetch the options from svc struct and validate */ options = svc->options; if (!options) return ret; /* Accept if its the default case: Allow all, Reject none * The default volfile always contains a 'allow *' rule * for each volume. If allow rule is missing (which implies * there is some bad volfile generating code doing this), we * assume no one is allowed mounts, and thus, we reject mounts. */ ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); return RPCSVC_AUTH_REJECT; } ret = dict_get_str (options, srchstr, &allow_str); GF_FREE (srchstr); if (ret < 0) return RPCSVC_AUTH_REJECT; ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); return RPCSVC_AUTH_REJECT; } ret = dict_get_str (options, srchstr, &reject_str); GF_FREE (srchstr); /* * If "reject_str" is being set as '*' (anonymous), then NFS-server * would reject everything. If the "reject_str" is not set and * "allow_str" is set as '*' (anonymous), then NFS-server would * accept mount requests from all clients. */ if (reject_str != NULL) { if (!strcmp ("*", reject_str)) return RPCSVC_AUTH_REJECT; } else { if (!strcmp ("*", allow_str)) return RPCSVC_AUTH_ACCEPT; } /* addr-namelookup check */ if (svc->addr_namelookup == _gf_true) { ret = gf_get_hostname_from_ip (ipaddr, &hostname); if (ret) { if (hostname) GF_FREE (hostname); /* failed to get hostname, but hostname auth * is enabled, so authentication will not be * 100% correct. reject mounts */ return RPCSVC_AUTH_REJECT; } } accept = rpcsvc_transport_peer_check_allow (options, volname, ipaddr, hostname); reject = rpcsvc_transport_peer_check_reject (options, volname, ipaddr, hostname); if (hostname) GF_FREE (hostname); return rpcsvc_combine_allow_reject_volume_check (accept, reject); } int rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, uint16_t port) { int ret = RPCSVC_AUTH_REJECT; char *srchstr = NULL; char *valstr = NULL; gf_boolean_t insecure = _gf_false; if ((!svc) || (!volname)) return ret; gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port); /* If the port is already a privileged one, dont bother with checking * options. */ if (port <= 1024) { ret = RPCSVC_AUTH_ACCEPT; goto err; } /* Disabled by default */ ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); ret = RPCSVC_AUTH_REJECT; goto err; } ret = dict_get_str (svc->options, srchstr, &valstr); if (ret) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" " read rpc-auth.ports.insecure value"); goto err; } ret = gf_string2boolean (valstr, &insecure); if (ret) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" " convert rpc-auth.ports.insecure value"); goto err; } ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT; if (ret == RPCSVC_AUTH_ACCEPT) gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed"); else gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not" " allowed"); err: if (srchstr) GF_FREE (srchstr); return ret; } char * rpcsvc_volume_allowed (dict_t *options, char *volname) { char globalrule[] = "rpc-auth.addr.allow"; char *srchstr = NULL; char *addrstr = NULL; int ret = -1; if ((!options) || (!volname)) return NULL; ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); goto out; } if (!dict_get (options, srchstr)) ret = dict_get_str (options, globalrule, &addrstr); else ret = dict_get_str (options, srchstr, &addrstr); out: GF_FREE (srchstr); return addrstr; } /* * rpcsvc_match_subnet_v4() takes subnetwork address pattern and checks * if the target IPv4 address has the same network address with the help * of network mask. * * Returns 0 for SUCCESS and -1 otherwise. * * NB: Validation of subnetwork address pattern is not required * as it's already being done at the time of CLI SET. */ static int rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr) { char *slash = NULL; char *netaddr = NULL; int ret = -1; uint32_t prefixlen = 0; uint32_t shift = 0; struct sockaddr_in sin1 = {0, }; struct sockaddr_in sin2 = {0, }; struct sockaddr_in mask = {0, }; /* Copy the input */ netaddr = gf_strdup (addrtok); if (netaddr == NULL) /* ENOMEM */ goto out; /* Find the network socket addr of target */ if (inet_pton (AF_INET, ipaddr, &sin1.sin_addr) == 0) goto out; /* Find the network socket addr of subnet pattern */ slash = strchr (netaddr, '/'); *slash = '\0'; if (inet_pton (AF_INET, netaddr, &sin2.sin_addr) == 0) goto out; /* * Find the IPv4 network mask in network byte order. * IMP: String slash+1 is already validated, it cant have value * more than IPv4_ADDR_SIZE (32). */ prefixlen = (uint32_t) atoi (slash + 1); shift = IPv4_ADDR_SIZE - prefixlen; mask.sin_addr.s_addr = htonl ((uint32_t)~0 << shift); if (mask_match (sin1.sin_addr.s_addr, sin2.sin_addr.s_addr, mask.sin_addr.s_addr)) { ret = 0; /* SUCCESS */ } out: GF_FREE (netaddr); return ret; } rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = { [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA}, [GF_DUMP_PING] = {"PING", GF_DUMP_PING, rpcsvc_ping, NULL, 0, DRC_NA}, }; struct rpcsvc_program gluster_dump_prog = { .progname = "GF-DUMP", .prognum = GLUSTER_DUMP_PROGRAM, .progver = GLUSTER_DUMP_VERSION, .actors = gluster_dump_actors, .numactors = GF_DUMP_MAXVALUE, };