diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 525 |
1 files changed, 261 insertions, 264 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c index 056519232cf..ce01bf7a133 100644 --- a/xlators/features/changelog/src/changelog-rpc-common.c +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -24,65 +24,63 @@ */ void * -changelog_rpc_poller (void *arg) +changelog_rpc_poller(void *arg) { - xlator_t *this = arg; + xlator_t *this = arg; - (void) event_dispatch (this->ctx->event_pool); - return NULL; + (void)event_dispatch(this->ctx->event_pool); + return NULL; } struct rpc_clnt * -changelog_rpc_client_init (xlator_t *this, void *cbkdata, - char *sockfile, rpc_clnt_notify_t fn) +changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile, + rpc_clnt_notify_t fn) { - int ret = 0; - struct rpc_clnt *rpc = NULL; - dict_t *options = NULL; - - if (!cbkdata) - cbkdata = this; - - options = dict_new (); - if (!options) - goto error_return; - - ret = rpc_transport_unix_options_build (&options, sockfile, 0); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_RPC_BUILD_ERROR, - "failed to build rpc options"); - goto dealloc_dict; - } - - rpc = rpc_clnt_new (options, this, this->name, 16); - if (!rpc) - goto dealloc_dict; - - ret = rpc_clnt_register_notify (rpc, fn, cbkdata); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, - "failed to register notify"); - goto dealloc_rpc_clnt; - } - - ret = rpc_clnt_start (rpc); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_RPC_START_ERROR, - "failed to start rpc"); - goto dealloc_rpc_clnt; - } - - return rpc; - - dealloc_rpc_clnt: - rpc_clnt_unref (rpc); - dealloc_dict: - dict_unref (options); - error_return: - return NULL; + int ret = 0; + struct rpc_clnt *rpc = NULL; + dict_t *options = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new(); + if (!options) + goto error_return; + + ret = rpc_transport_unix_options_build(&options, sockfile, 0); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR, + "failed to build rpc options"); + goto dealloc_dict; + } + + rpc = rpc_clnt_new(options, this, this->name, 16); + if (!rpc) + goto dealloc_dict; + + ret = rpc_clnt_register_notify(rpc, fn, cbkdata); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, + "failed to register notify"); + goto dealloc_rpc_clnt; + } + + ret = rpc_clnt_start(rpc); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + "failed to start rpc"); + goto dealloc_rpc_clnt; + } + + return rpc; + +dealloc_rpc_clnt: + rpc_clnt_unref(rpc); +dealloc_dict: + dict_unref(options); +error_return: + return NULL; } /** @@ -90,96 +88,96 @@ changelog_rpc_client_init (xlator_t *this, void *cbkdata, * RPC server. */ int -changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req, - call_frame_t *frame, rpc_clnt_prog_t *prog, - int procnum, struct iovec *payload, int payloadcnt, - struct iobref *iobref, xlator_t *this, - fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame, + rpc_clnt_prog_t *prog, int procnum, + struct iovec *payload, int payloadcnt, + struct iobref *iobref, xlator_t *this, + fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) { - int ret = 0; - int count = 0; - struct iovec iov = {0, }; - struct iobuf *iobuf = NULL; - char new_iobref = 0; - ssize_t xdr_size = 0; - - GF_ASSERT (this); + int ret = 0; + int count = 0; + struct iovec iov = { + 0, + }; + struct iobuf *iobuf = NULL; + char new_iobref = 0; + ssize_t xdr_size = 0; - if (req) { - xdr_size = xdr_sizeof (xdrproc, req); + GF_ASSERT(this); - iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); - if (!iobuf) { - goto out; - }; + if (req) { + xdr_size = xdr_sizeof(xdrproc, req); - if (!iobref) { - iobref = iobref_new (); - if (!iobref) { - goto out; - } + iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size); + if (!iobuf) { + goto out; + }; - new_iobref = 1; - } + if (!iobref) { + iobref = iobref_new(); + if (!iobref) { + goto out; + } - iobref_add (iobref, iobuf); + new_iobref = 1; + } - iov.iov_base = iobuf->ptr; - iov.iov_len = iobuf_size (iobuf); + iobref_add(iobref, iobuf); - /* Create the xdr payload */ - ret = xdr_serialize_generic (iov, req, xdrproc); - if (ret == -1) { - goto out; - } + iov.iov_base = iobuf->ptr; + iov.iov_len = iobuf_size(iobuf); - iov.iov_len = ret; - count = 1; + /* Create the xdr payload */ + ret = xdr_serialize_generic(iov, req, xdrproc); + if (ret == -1) { + goto out; } - ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count, - payload, payloadcnt, iobref, frame, NULL, - 0, NULL, 0, NULL); + iov.iov_len = ret; + count = 1; + } - out: - if (new_iobref) - iobref_unref (iobref); - if (iobuf) - iobuf_unref (iobuf); - return ret; + ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload, + payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL); + +out: + if (new_iobref) + iobref_unref(iobref); + if (iobuf) + iobuf_unref(iobuf); + return ret; } /** * Entry point to perform a remote procedure call */ int -changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc, - rpc_clnt_prog_t *prog, int procidx, void *arg) +changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc, + rpc_clnt_prog_t *prog, int procidx, void *arg) { - int ret = 0; - call_frame_t *frame = NULL; - rpc_clnt_procedure_t *proc = NULL; - - if (!this || !prog) - goto error_return; - - frame = create_frame (this, this->ctx->pool); - if (!frame) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_CREATE_FRAME_FAILED, - "failed to create frame"); - goto error_return; - } + int ret = 0; + call_frame_t *frame = NULL; + rpc_clnt_procedure_t *proc = NULL; - proc = &prog->proctable[procidx]; - if (proc->fn) - ret = proc->fn (frame, this, arg); + if (!this || !prog) + goto error_return; - STACK_DESTROY (frame->root); - return ret; + frame = create_frame(this, this->ctx->pool); + if (!frame) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED, + "failed to create frame"); + goto error_return; + } - error_return: - return -1; + proc = &prog->proctable[procidx]; + if (proc->fn) + ret = proc->fn(frame, this, arg); + + STACK_DESTROY(frame->root); + return ret; + +error_return: + return -1; } /** @@ -189,170 +187,169 @@ changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc, */ struct iobuf * -__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg, - struct iovec *outmsg, xdrproc_t xdrproc) +__changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg, + struct iovec *outmsg, xdrproc_t xdrproc) { - struct iobuf *iob = NULL; - ssize_t retlen = 0; - ssize_t rsp_size = 0; + struct iobuf *iob = NULL; + ssize_t retlen = 0; + ssize_t rsp_size = 0; - rsp_size = xdr_sizeof (xdrproc, arg); - iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size); - if (!iob) - goto error_return; + rsp_size = xdr_sizeof(xdrproc, arg); + iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size); + if (!iob) + goto error_return; - iobuf_to_iovec (iob, outmsg); + iobuf_to_iovec(iob, outmsg); - retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); - if (retlen == -1) - goto unref_iob; + retlen = xdr_serialize_generic(*outmsg, arg, xdrproc); + if (retlen == -1) + goto unref_iob; - outmsg->iov_len = retlen; - return iob; + outmsg->iov_len = retlen; + return iob; - unref_iob: - iobuf_unref (iob); - error_return: - return NULL; +unref_iob: + iobuf_unref(iob); +error_return: + return NULL; } int -changelog_rpc_sumbit_reply (rpcsvc_request_t *req, - void *arg, struct iovec *payload, int payloadcount, - struct iobref *iobref, xdrproc_t xdrproc) +changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg, + struct iovec *payload, int payloadcount, + struct iobref *iobref, xdrproc_t xdrproc) { - int ret = -1; - struct iobuf *iob = NULL; - struct iovec iov = {0,}; - char new_iobref = 0; - - if (!req) - goto return_ret; - - if (!iobref) { - iobref = iobref_new (); - if (!iobref) - goto return_ret; - new_iobref = 1; - } - - iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc); - if (!iob) - gf_msg ("", GF_LOG_ERROR, 0, - CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, - "failed to serialize reply"); - else - iobref_add (iobref, iob); - - ret = rpcsvc_submit_generic (req, &iov, - 1, payload, payloadcount, iobref); - - if (new_iobref) - iobref_unref (iobref); - if (iob) - iobuf_unref (iob); - return_ret: - return ret; + int ret = -1; + struct iobuf *iob = NULL; + struct iovec iov = { + 0, + }; + char new_iobref = 0; + + if (!req) + goto return_ret; + + if (!iobref) { + iobref = iobref_new(); + if (!iobref) + goto return_ret; + new_iobref = 1; + } + + iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc); + if (!iob) + gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, + "failed to serialize reply"); + else + iobref_add(iobref, iob); + + ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref); + + if (new_iobref) + iobref_unref(iobref); + if (iob) + iobuf_unref(iob); +return_ret: + return ret; } void -changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile, - rpcsvc_notify_t fn, struct rpcsvc_program **progs) +changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) { - rpcsvc_listener_t *listener = NULL; - rpcsvc_listener_t *next = NULL; - struct rpcsvc_program *prog = NULL; - - while (*progs) { - prog = *progs; - (void) rpcsvc_program_unregister (rpc, prog); - progs++; - } - - list_for_each_entry_safe (listener, next, &rpc->listeners, list) { - rpcsvc_listener_destroy (listener); - } - - (void) rpcsvc_unregister_notify (rpc, fn, this); - sys_unlink (sockfile); - if (rpc->rxpool) { - mem_pool_destroy (rpc->rxpool); - rpc->rxpool = NULL; - } - - /* TODO Avoid freeing rpc object in case of brick multiplex - after freeing rpc object svc->rpclock corrupted and it takes - more time to detach a brick - */ - if (!this->cleanup_starting) - GF_FREE (rpc); + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; + struct rpcsvc_program *prog = NULL; + + while (*progs) { + prog = *progs; + (void)rpcsvc_program_unregister(rpc, prog); + progs++; + } + + list_for_each_entry_safe(listener, next, &rpc->listeners, list) + { + rpcsvc_listener_destroy(listener); + } + + (void)rpcsvc_unregister_notify(rpc, fn, this); + sys_unlink(sockfile); + if (rpc->rxpool) { + mem_pool_destroy(rpc->rxpool); + rpc->rxpool = NULL; + } + + /* TODO Avoid freeing rpc object in case of brick multiplex + after freeing rpc object svc->rpclock corrupted and it takes + more time to detach a brick + */ + if (!this->cleanup_starting) + GF_FREE(rpc); } rpcsvc_t * -changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata, - rpcsvc_notify_t fn, struct rpcsvc_program **progs) +changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata, + rpcsvc_notify_t fn, struct rpcsvc_program **progs) { - int ret = 0; - rpcsvc_t *rpc = NULL; - dict_t *options = NULL; - struct rpcsvc_program *prog = NULL; - - if (!cbkdata) - cbkdata = this; - - options = dict_new (); - if (!options) - goto error_return; - - ret = rpcsvc_transport_unix_options_build (&options, sockfile); - if (ret) - goto dealloc_dict; - - rpc = rpcsvc_init (this, this->ctx, options, 8); - if (rpc == NULL) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_RPC_START_ERROR, - "failed to init rpc"); - goto dealloc_dict; - } - - ret = rpcsvc_register_notify (rpc, fn, cbkdata); + int ret = 0; + rpcsvc_t *rpc = NULL; + dict_t *options = NULL; + struct rpcsvc_program *prog = NULL; + + if (!cbkdata) + cbkdata = this; + + options = dict_new(); + if (!options) + goto error_return; + + ret = rpcsvc_transport_unix_options_build(&options, sockfile); + if (ret) + goto dealloc_dict; + + rpc = rpcsvc_init(this, this->ctx, options, 8); + if (rpc == NULL) { + gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, + "failed to init rpc"); + goto dealloc_dict; + } + + ret = rpcsvc_register_notify(rpc, fn, cbkdata); + if (ret) { + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, + "failed to register notify function"); + goto dealloc_rpc; + } + + ret = rpcsvc_create_listeners(rpc, options, this->name); + if (ret != 1) { + gf_msg_debug(this->name, 0, "failed to create listeners"); + goto dealloc_rpc; + } + + while (*progs) { + prog = *progs; + ret = rpcsvc_program_register(rpc, prog, _gf_false); if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, - "failed to register notify function"); - goto dealloc_rpc; - } - - ret = rpcsvc_create_listeners (rpc, options, this->name); - if (ret != 1) { - gf_msg_debug (this->name, - 0, "failed to create listeners"); - goto dealloc_rpc; + gf_msg(this->name, GF_LOG_ERROR, 0, + CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, + "cannot register program " + "(name: %s, prognum: %d, pogver: %d)", + prog->progname, prog->prognum, prog->progver); + goto dealloc_rpc; } - while (*progs) { - prog = *progs; - ret = rpcsvc_program_register (rpc, prog, _gf_false); - if (ret) { - gf_msg (this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, - "cannot register program " - "(name: %s, prognum: %d, pogver: %d)", - prog->progname, prog->prognum, prog->progver); - goto dealloc_rpc; - } - - progs++; - } + progs++; + } - dict_unref (options); - return rpc; + dict_unref(options); + return rpc; - dealloc_rpc: - GF_FREE (rpc); - dealloc_dict: - dict_unref (options); - error_return: - return NULL; +dealloc_rpc: + GF_FREE(rpc); +dealloc_dict: + dict_unref(options); +error_return: + return NULL; } |