summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-rpc-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.c334
1 files changed, 334 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c
new file mode 100644
index 00000000000..76db6696ae8
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-rpc-common.c
@@ -0,0 +1,334 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-rpc-common.h"
+
+/**
+*****************************************************
+ Client Interface
+*****************************************************
+*/
+
+/**
+ * Initialize and return an RPC client object for a given unix
+ * domain socket.
+ */
+
+void *
+changelog_rpc_poller (void *arg)
+{
+ xlator_t *this = arg;
+
+ (void) event_dispatch (this->ctx->event_pool);
+ return NULL;
+}
+
+struct rpc_clnt *
+changelog_rpc_client_init (xlator_t *this, void *cbkdata,
+ char *sockfile, rpc_clnt_notify_t fn)
+{
+ int ret = 0;
+ struct rpc_clnt *rpc = NULL;
+ dict_t *options = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpc_transport_unix_options_build (&options, sockfile, 0);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "failed to build rpc options");
+ goto dealloc_dict;
+ }
+
+ rpc = rpc_clnt_new (options, this->ctx, this->name, 16);
+ if (!rpc)
+ goto dealloc_dict;
+
+ ret = rpc_clnt_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to register notify");
+ goto dealloc_rpc_clnt;
+ }
+
+ ret = rpc_clnt_start (rpc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to start rpc");
+ goto dealloc_rpc_clnt;
+ }
+
+ return rpc;
+
+ dealloc_rpc_clnt:
+ rpc_clnt_unref (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}
+
+/**
+ * Generic RPC client routine to dispatch a request to an
+ * RPC server.
+ */
+int
+changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req,
+ call_frame_t *frame, rpc_clnt_prog_t *prog,
+ int procnum, struct iovec *payload, int payloadcnt,
+ struct iobref *iobref, xlator_t *this,
+ fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
+{
+ int ret = 0;
+ int count = 0;
+ struct iovec iov = {0, };
+ struct iobuf *iobuf = NULL;
+ char new_iobref = 0;
+ ssize_t xdr_size = 0;
+
+ GF_ASSERT (this);
+
+ if (req) {
+ xdr_size = xdr_sizeof (xdrproc, req);
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size);
+ if (!iobuf) {
+ goto out;
+ };
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref) {
+ goto out;
+ }
+
+ new_iobref = 1;
+ }
+
+ iobref_add (iobref, iobuf);
+
+ iov.iov_base = iobuf->ptr;
+ iov.iov_len = iobuf_size (iobuf);
+
+ /* Create the xdr payload */
+ ret = xdr_serialize_generic (iov, req, xdrproc);
+ if (ret == -1) {
+ goto out;
+ }
+
+ iov.iov_len = ret;
+ count = 1;
+ }
+
+ ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count,
+ payload, payloadcnt, iobref, frame, NULL,
+ 0, NULL, 0, NULL);
+
+ out:
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iobuf)
+ iobuf_unref (iobuf);
+ return ret;
+}
+
+/**
+ * Entry point to perform a remote procedure call
+ */
+int
+changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,
+ rpc_clnt_prog_t *prog, int procidx, void *arg)
+{
+ int ret = 0;
+ call_frame_t *frame = NULL;
+ rpc_clnt_procedure_t *proc = NULL;
+
+ if (!this || !prog)
+ goto error_return;
+
+ frame = create_frame (this, this->ctx->pool);
+ if (!frame) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to create frame");
+ goto error_return;
+ }
+
+ proc = &prog->proctable[procidx];
+ if (proc->fn)
+ ret = proc->fn (frame, this, arg);
+
+ STACK_DESTROY (frame->root);
+ return ret;
+
+ error_return:
+ return -1;
+}
+
+/**
+*****************************************************
+ Server Interface
+*****************************************************
+*/
+
+struct iobuf *
+__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg,
+ struct iovec *outmsg, xdrproc_t xdrproc)
+{
+ struct iobuf *iob = NULL;
+ ssize_t retlen = 0;
+ ssize_t rsp_size = 0;
+
+ rsp_size = xdr_sizeof (xdrproc, arg);
+ iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size);
+ if (!iob)
+ goto error_return;
+
+ iobuf_to_iovec (iob, outmsg);
+
+ retlen = xdr_serialize_generic (*outmsg, arg, xdrproc);
+ if (retlen == -1)
+ goto unref_iob;
+
+ outmsg->iov_len = retlen;
+ return iob;
+
+ unref_iob:
+ iobuf_unref (iob);
+ error_return:
+ return NULL;
+}
+
+int
+changelog_rpc_sumbit_reply (rpcsvc_request_t *req,
+ void *arg, struct iovec *payload, int payloadcount,
+ struct iobref *iobref, xdrproc_t xdrproc)
+{
+ int ret = -1;
+ struct iobuf *iob = NULL;
+ struct iovec iov = {0,};
+ char new_iobref = 0;
+
+ if (!req)
+ goto return_ret;
+
+ if (!iobref) {
+ iobref = iobref_new ();
+ if (!iobref)
+ goto return_ret;
+ new_iobref = 1;
+ }
+
+ iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc);
+ if (!iob)
+ gf_log ("", GF_LOG_ERROR, "failed to serialize reply");
+ else
+ iobref_add (iobref, iob);
+
+ ret = rpcsvc_submit_generic (req, &iov,
+ 1, payload, payloadcount, iobref);
+
+ if (new_iobref)
+ iobref_unref (iobref);
+ if (iob)
+ iobuf_unref (iob);
+ return_ret:
+ return ret;
+}
+
+void
+changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ while (*progs) {
+ prog = *progs;
+ (void) rpcsvc_program_unregister (rpc, prog);
+ }
+
+ list_for_each_entry_safe (listener, next, &rpc->listeners, list) {
+ rpcsvc_listener_destroy (listener);
+ }
+
+ (void) rpcsvc_unregister_notify (rpc, fn, this);
+ unlink (sockfile);
+
+ GF_FREE (rpc);
+}
+
+rpcsvc_t *
+changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata,
+ rpcsvc_notify_t fn, struct rpcsvc_program **progs)
+{
+ int j = 0;
+ int ret = 0;
+ rpcsvc_t *rpc = NULL;
+ dict_t *options = NULL;
+ struct rpcsvc_program *prog = NULL;
+
+ if (!cbkdata)
+ cbkdata = this;
+
+ options = dict_new ();
+ if (!options)
+ goto error_return;
+
+ ret = rpcsvc_transport_unix_options_build (&options, sockfile);
+ if (ret)
+ goto dealloc_dict;
+
+ rpc = rpcsvc_init (this, this->ctx, options, 8);
+ if (rpc == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to init rpc");
+ goto dealloc_dict;
+ }
+
+ ret = rpcsvc_register_notify (rpc, fn, cbkdata);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "failed to register notify function");
+ goto dealloc_rpc;
+ }
+
+ ret = rpcsvc_create_listeners (rpc, options, this->name);
+ if (ret != 1) {
+ gf_log (this->name,
+ GF_LOG_DEBUG, "failed to create listeners");
+ goto dealloc_rpc;
+ }
+
+ while (*progs) {
+ prog = *progs;
+ ret = rpcsvc_program_register (rpc, prog);
+ if (ret) {
+ gf_log (this->name,
+ GF_LOG_ERROR, "cannot register program "
+ "(name: %s, prognum: %d, pogver: %d)",
+ prog->progname, prog->prognum, prog->progver);
+ goto dealloc_rpc;
+ }
+
+ progs++;
+ }
+
+ dict_unref (options);
+ return rpc;
+
+ dealloc_rpc:
+ GF_FREE (rpc);
+ dealloc_dict:
+ dict_unref (options);
+ error_return:
+ return NULL;
+}