diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 334 |
1 files changed, 334 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c new file mode 100644 index 00000000000..76db6696ae8 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -0,0 +1,334 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-rpc-common.h" + +/** +***************************************************** + Client Interface +***************************************************** +*/ + +/** + * Initialize and return an RPC client object for a given unix + * domain socket. + */ + +void * +changelog_rpc_poller (void *arg) +{ + xlator_t *this = arg; + + (void) event_dispatch (this->ctx->event_pool); + return NULL; +} + +struct rpc_clnt * +changelog_rpc_client_init (xlator_t *this, void *cbkdata, + char *sockfile, rpc_clnt_notify_t fn) +{ + int ret = 0; + struct rpc_clnt *rpc = NULL; + dict_t *options = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new (); + if (!options) + goto error_return; + + ret = rpc_transport_unix_options_build (&options, sockfile, 0); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to build rpc options"); + goto dealloc_dict; + } + + rpc = rpc_clnt_new (options, this->ctx, this->name, 16); + if (!rpc) + goto dealloc_dict; + + ret = rpc_clnt_register_notify (rpc, fn, cbkdata); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to register notify"); + goto dealloc_rpc_clnt; + } + + ret = rpc_clnt_start (rpc); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "failed to start rpc"); + goto dealloc_rpc_clnt; + } + + return rpc; + + dealloc_rpc_clnt: + rpc_clnt_unref (rpc); + dealloc_dict: + dict_unref (options); + error_return: + return NULL; +} + +/** + * Generic RPC client routine to dispatch a request to an + * RPC server. + */ +int +changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req, + call_frame_t *frame, rpc_clnt_prog_t *prog, + int procnum, struct iovec *payload, int payloadcnt, + struct iobref *iobref, xlator_t *this, + fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +{ + int ret = 0; + int count = 0; + struct iovec iov = {0, }; + struct iobuf *iobuf = NULL; + char new_iobref = 0; + ssize_t xdr_size = 0; + + GF_ASSERT (this); + + if (req) { + xdr_size = xdr_sizeof (xdrproc, req); + + iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); + if (!iobuf) { + goto out; + }; + + if (!iobref) { + iobref = iobref_new (); + if (!iobref) { + goto out; + } + + new_iobref = 1; + } + + iobref_add (iobref, iobuf); + + iov.iov_base = iobuf->ptr; + iov.iov_len = iobuf_size (iobuf); + + /* Create the xdr payload */ + ret = xdr_serialize_generic (iov, req, xdrproc); + if (ret == -1) { + goto out; + } + + iov.iov_len = ret; + count = 1; + } + + ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count, + payload, payloadcnt, iobref, frame, NULL, + 0, NULL, 0, NULL); + + out: + if (new_iobref) + iobref_unref (iobref); + if (iobuf) + iobuf_unref (iobuf); + return ret; +} + +/** + * Entry point to perform a remote procedure call + */ +int +changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc, + rpc_clnt_prog_t *prog, int procidx, void *arg) +{ + int ret = 0; + call_frame_t *frame = NULL; + rpc_clnt_procedure_t *proc = NULL; + + if (!this || !prog) + goto error_return; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + gf_log (this->name, GF_LOG_ERROR, "failed to create frame"); + goto error_return; + } + + proc = &prog->proctable[procidx]; + if (proc->fn) + ret = proc->fn (frame, this, arg); + + STACK_DESTROY (frame->root); + return ret; + + error_return: + return -1; +} + +/** +***************************************************** + Server Interface +***************************************************** +*/ + +struct iobuf * +__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg, + struct iovec *outmsg, xdrproc_t xdrproc) +{ + struct iobuf *iob = NULL; + ssize_t retlen = 0; + ssize_t rsp_size = 0; + + rsp_size = xdr_sizeof (xdrproc, arg); + iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size); + if (!iob) + goto error_return; + + iobuf_to_iovec (iob, outmsg); + + retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); + if (retlen == -1) + goto unref_iob; + + outmsg->iov_len = retlen; + return iob; + + unref_iob: + iobuf_unref (iob); + error_return: + return NULL; +} + +int +changelog_rpc_sumbit_reply (rpcsvc_request_t *req, + void *arg, struct iovec *payload, int payloadcount, + struct iobref *iobref, xdrproc_t xdrproc) +{ + int ret = -1; + struct iobuf *iob = NULL; + struct iovec iov = {0,}; + char new_iobref = 0; + + if (!req) + goto return_ret; + + if (!iobref) { + iobref = iobref_new (); + if (!iobref) + goto return_ret; + new_iobref = 1; + } + + iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc); + if (!iob) + gf_log ("", GF_LOG_ERROR, "failed to serialize reply"); + else + iobref_add (iobref, iob); + + ret = rpcsvc_submit_generic (req, &iov, + 1, payload, payloadcount, iobref); + + if (new_iobref) + iobref_unref (iobref); + if (iob) + iobuf_unref (iob); + return_ret: + return ret; +} + +void +changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; + struct rpcsvc_program *prog = NULL; + + while (*progs) { + prog = *progs; + (void) rpcsvc_program_unregister (rpc, prog); + } + + list_for_each_entry_safe (listener, next, &rpc->listeners, list) { + rpcsvc_listener_destroy (listener); + } + + (void) rpcsvc_unregister_notify (rpc, fn, this); + unlink (sockfile); + + GF_FREE (rpc); +} + +rpcsvc_t * +changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) +{ + int j = 0; + int ret = 0; + rpcsvc_t *rpc = NULL; + dict_t *options = NULL; + struct rpcsvc_program *prog = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new (); + if (!options) + goto error_return; + + ret = rpcsvc_transport_unix_options_build (&options, sockfile); + if (ret) + goto dealloc_dict; + + rpc = rpcsvc_init (this, this->ctx, options, 8); + if (rpc == NULL) { + gf_log (this->name, GF_LOG_ERROR, "failed to init rpc"); + goto dealloc_dict; + } + + ret = rpcsvc_register_notify (rpc, fn, cbkdata); + if (ret) { + gf_log (this->name, + GF_LOG_ERROR, "failed to register notify function"); + goto dealloc_rpc; + } + + ret = rpcsvc_create_listeners (rpc, options, this->name); + if (ret != 1) { + gf_log (this->name, + GF_LOG_DEBUG, "failed to create listeners"); + goto dealloc_rpc; + } + + while (*progs) { + prog = *progs; + ret = rpcsvc_program_register (rpc, prog); + if (ret) { + gf_log (this->name, + GF_LOG_ERROR, "cannot register program " + "(name: %s, prognum: %d, pogver: %d)", + prog->progname, prog->prognum, prog->progver); + goto dealloc_rpc; + } + + progs++; + } + + dict_unref (options); + return rpc; + + dealloc_rpc: + GF_FREE (rpc); + dealloc_dict: + dict_unref (options); + error_return: + return NULL; +} |