diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc-common.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 525 | 
1 files changed, 261 insertions, 264 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c index 056519232cf..ce01bf7a133 100644 --- a/xlators/features/changelog/src/changelog-rpc-common.c +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -24,65 +24,63 @@   */  void * -changelog_rpc_poller (void *arg) +changelog_rpc_poller(void *arg)  { -        xlator_t *this = arg; +    xlator_t *this = arg; -        (void) event_dispatch (this->ctx->event_pool); -        return NULL; +    (void)event_dispatch(this->ctx->event_pool); +    return NULL;  }  struct rpc_clnt * -changelog_rpc_client_init (xlator_t *this, void *cbkdata, -                           char *sockfile, rpc_clnt_notify_t fn) +changelog_rpc_client_init(xlator_t *this, void *cbkdata, char *sockfile, +                          rpc_clnt_notify_t fn)  { -        int              ret         = 0; -        struct rpc_clnt *rpc         = NULL; -        dict_t          *options     = NULL; - -        if (!cbkdata) -                cbkdata = this; - -        options = dict_new (); -        if (!options) -                goto error_return; - -        ret = rpc_transport_unix_options_build (&options, sockfile, 0); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_RPC_BUILD_ERROR, -                        "failed to build rpc options"); -                goto dealloc_dict; -        } - -        rpc = rpc_clnt_new (options, this, this->name, 16); -        if (!rpc) -                goto dealloc_dict; - -        ret = rpc_clnt_register_notify (rpc, fn, cbkdata); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, -                        "failed to register notify"); -                goto dealloc_rpc_clnt; -        } - -        ret = rpc_clnt_start (rpc); -        if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_RPC_START_ERROR, -                        "failed to start rpc"); -                goto dealloc_rpc_clnt; -        } - -        return rpc; - - dealloc_rpc_clnt: -        rpc_clnt_unref (rpc); - dealloc_dict: -        dict_unref (options); - error_return: -        return NULL; +    int ret = 0; +    struct rpc_clnt *rpc = NULL; +    dict_t *options = NULL; + +    if (!cbkdata) +        cbkdata = this; + +    options = dict_new(); +    if (!options) +        goto error_return; + +    ret = rpc_transport_unix_options_build(&options, sockfile, 0); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_BUILD_ERROR, +               "failed to build rpc options"); +        goto dealloc_dict; +    } + +    rpc = rpc_clnt_new(options, this, this->name, 16); +    if (!rpc) +        goto dealloc_dict; + +    ret = rpc_clnt_register_notify(rpc, fn, cbkdata); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, +               CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, +               "failed to register notify"); +        goto dealloc_rpc_clnt; +    } + +    ret = rpc_clnt_start(rpc); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, +               "failed to start rpc"); +        goto dealloc_rpc_clnt; +    } + +    return rpc; + +dealloc_rpc_clnt: +    rpc_clnt_unref(rpc); +dealloc_dict: +    dict_unref(options); +error_return: +    return NULL;  }  /** @@ -90,96 +88,96 @@ changelog_rpc_client_init (xlator_t *this, void *cbkdata,   * RPC server.   */  int -changelog_rpc_sumbit_req (struct rpc_clnt *rpc, void *req, -                          call_frame_t *frame, rpc_clnt_prog_t *prog, -                          int procnum, struct iovec *payload, int payloadcnt, -                          struct iobref *iobref, xlator_t *this, -                          fop_cbk_fn_t cbkfn, xdrproc_t xdrproc) +changelog_rpc_sumbit_req(struct rpc_clnt *rpc, void *req, call_frame_t *frame, +                         rpc_clnt_prog_t *prog, int procnum, +                         struct iovec *payload, int payloadcnt, +                         struct iobref *iobref, xlator_t *this, +                         fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)  { -        int           ret        = 0; -        int           count      = 0; -        struct iovec  iov        = {0, }; -        struct iobuf *iobuf      = NULL; -        char          new_iobref = 0; -        ssize_t       xdr_size   = 0; - -        GF_ASSERT (this); +    int ret = 0; +    int count = 0; +    struct iovec iov = { +        0, +    }; +    struct iobuf *iobuf = NULL; +    char new_iobref = 0; +    ssize_t xdr_size = 0; -        if (req) { -                xdr_size = xdr_sizeof (xdrproc, req); +    GF_ASSERT(this); -                iobuf = iobuf_get2 (this->ctx->iobuf_pool, xdr_size); -                if (!iobuf) { -                        goto out; -                }; +    if (req) { +        xdr_size = xdr_sizeof(xdrproc, req); -                if (!iobref) { -                        iobref = iobref_new (); -                        if (!iobref) { -                                goto out; -                        } +        iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size); +        if (!iobuf) { +            goto out; +        }; -                        new_iobref = 1; -                } +        if (!iobref) { +            iobref = iobref_new(); +            if (!iobref) { +                goto out; +            } -                iobref_add (iobref, iobuf); +            new_iobref = 1; +        } -                iov.iov_base = iobuf->ptr; -                iov.iov_len  = iobuf_size (iobuf); +        iobref_add(iobref, iobuf); -                /* Create the xdr payload */ -                ret = xdr_serialize_generic (iov, req, xdrproc); -                if (ret == -1) { -                        goto out; -                } +        iov.iov_base = iobuf->ptr; +        iov.iov_len = iobuf_size(iobuf); -                iov.iov_len = ret; -                count = 1; +        /* Create the xdr payload */ +        ret = xdr_serialize_generic(iov, req, xdrproc); +        if (ret == -1) { +            goto out;          } -        ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn, &iov, count, -                               payload, payloadcnt, iobref, frame, NULL, -                               0, NULL, 0, NULL); +        iov.iov_len = ret; +        count = 1; +    } - out: -        if (new_iobref) -                iobref_unref (iobref); -        if (iobuf) -                iobuf_unref (iobuf); -        return ret; +    ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, payload, +                          payloadcnt, iobref, frame, NULL, 0, NULL, 0, NULL); + +out: +    if (new_iobref) +        iobref_unref(iobref); +    if (iobuf) +        iobuf_unref(iobuf); +    return ret;  }  /**   * Entry point to perform a remote procedure call   */  int -changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc, -                      rpc_clnt_prog_t *prog, int procidx, void *arg) +changelog_invoke_rpc(xlator_t *this, struct rpc_clnt *rpc, +                     rpc_clnt_prog_t *prog, int procidx, void *arg)  { -        int                   ret   = 0; -        call_frame_t         *frame = NULL; -        rpc_clnt_procedure_t *proc  = NULL; - -        if (!this || !prog) -                goto error_return; - -        frame = create_frame (this, this->ctx->pool); -        if (!frame) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_CREATE_FRAME_FAILED, -                        "failed to create frame"); -                goto error_return; -        } +    int ret = 0; +    call_frame_t *frame = NULL; +    rpc_clnt_procedure_t *proc = NULL; -        proc = &prog->proctable[procidx]; -        if (proc->fn) -                ret = proc->fn (frame, this, arg); +    if (!this || !prog) +        goto error_return; -        STACK_DESTROY (frame->root); -        return ret; +    frame = create_frame(this, this->ctx->pool); +    if (!frame) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_CREATE_FRAME_FAILED, +               "failed to create frame"); +        goto error_return; +    } - error_return: -        return -1; +    proc = &prog->proctable[procidx]; +    if (proc->fn) +        ret = proc->fn(frame, this, arg); + +    STACK_DESTROY(frame->root); +    return ret; + +error_return: +    return -1;  }  /** @@ -189,170 +187,169 @@ changelog_invoke_rpc (xlator_t *this, struct rpc_clnt *rpc,  */  struct iobuf * -__changelog_rpc_serialize_reply (rpcsvc_request_t *req, void *arg, -                                 struct iovec *outmsg, xdrproc_t xdrproc) +__changelog_rpc_serialize_reply(rpcsvc_request_t *req, void *arg, +                                struct iovec *outmsg, xdrproc_t xdrproc)  { -        struct iobuf *iob      = NULL; -        ssize_t       retlen   = 0; -        ssize_t       rsp_size = 0; +    struct iobuf *iob = NULL; +    ssize_t retlen = 0; +    ssize_t rsp_size = 0; -        rsp_size = xdr_sizeof (xdrproc, arg); -        iob = iobuf_get2 (req->svc->ctx->iobuf_pool, rsp_size); -        if (!iob) -                goto error_return; +    rsp_size = xdr_sizeof(xdrproc, arg); +    iob = iobuf_get2(req->svc->ctx->iobuf_pool, rsp_size); +    if (!iob) +        goto error_return; -        iobuf_to_iovec (iob, outmsg); +    iobuf_to_iovec(iob, outmsg); -        retlen = xdr_serialize_generic (*outmsg, arg, xdrproc); -        if (retlen == -1) -                goto unref_iob; +    retlen = xdr_serialize_generic(*outmsg, arg, xdrproc); +    if (retlen == -1) +        goto unref_iob; -        outmsg->iov_len = retlen; -        return iob; +    outmsg->iov_len = retlen; +    return iob; - unref_iob: -        iobuf_unref (iob); - error_return: -        return NULL; +unref_iob: +    iobuf_unref(iob); +error_return: +    return NULL;  }  int -changelog_rpc_sumbit_reply (rpcsvc_request_t *req, -                            void *arg, struct iovec *payload, int payloadcount, -                            struct iobref *iobref, xdrproc_t xdrproc) +changelog_rpc_sumbit_reply(rpcsvc_request_t *req, void *arg, +                           struct iovec *payload, int payloadcount, +                           struct iobref *iobref, xdrproc_t xdrproc)  { -        int           ret        = -1; -        struct iobuf *iob        = NULL; -        struct iovec  iov        = {0,}; -        char          new_iobref = 0; - -        if (!req) -                goto return_ret; - -        if (!iobref) { -                iobref = iobref_new (); -                if (!iobref) -                        goto return_ret; -                new_iobref = 1; -        } - -        iob = __changelog_rpc_serialize_reply (req, arg, &iov, xdrproc); -        if (!iob) -                gf_msg ("", GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, -                        "failed to serialize reply"); -        else -                iobref_add (iobref, iob); - -        ret = rpcsvc_submit_generic (req, &iov, -                                     1, payload, payloadcount, iobref); - -        if (new_iobref) -                iobref_unref (iobref); -        if (iob) -                iobuf_unref (iob); - return_ret: -        return ret; +    int ret = -1; +    struct iobuf *iob = NULL; +    struct iovec iov = { +        0, +    }; +    char new_iobref = 0; + +    if (!req) +        goto return_ret; + +    if (!iobref) { +        iobref = iobref_new(); +        if (!iobref) +            goto return_ret; +        new_iobref = 1; +    } + +    iob = __changelog_rpc_serialize_reply(req, arg, &iov, xdrproc); +    if (!iob) +        gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_SUBMIT_REPLY_FAILED, +               "failed to serialize reply"); +    else +        iobref_add(iobref, iob); + +    ret = rpcsvc_submit_generic(req, &iov, 1, payload, payloadcount, iobref); + +    if (new_iobref) +        iobref_unref(iobref); +    if (iob) +        iobuf_unref(iob); +return_ret: +    return ret;  }  void -changelog_rpc_server_destroy (xlator_t *this, rpcsvc_t *rpc, char *sockfile, -                              rpcsvc_notify_t fn, struct rpcsvc_program **progs) +changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, +                             rpcsvc_notify_t fn, struct rpcsvc_program **progs)  { -        rpcsvc_listener_t      *listener = NULL; -        rpcsvc_listener_t      *next     = NULL; -        struct rpcsvc_program  *prog     = NULL; - -        while (*progs) { -                prog = *progs; -                (void) rpcsvc_program_unregister (rpc, prog); -                progs++; -        } - -        list_for_each_entry_safe (listener, next, &rpc->listeners, list) { -                rpcsvc_listener_destroy (listener); -        } - -        (void) rpcsvc_unregister_notify (rpc, fn, this); -        sys_unlink (sockfile); -        if (rpc->rxpool) { -                mem_pool_destroy (rpc->rxpool); -                rpc->rxpool = NULL; -        } - -        /* TODO Avoid freeing rpc object in case of brick multiplex -           after freeing rpc object svc->rpclock corrupted and it takes -           more time to detach a brick -        */ -        if (!this->cleanup_starting) -                GF_FREE (rpc); +    rpcsvc_listener_t *listener = NULL; +    rpcsvc_listener_t *next = NULL; +    struct rpcsvc_program *prog = NULL; + +    while (*progs) { +        prog = *progs; +        (void)rpcsvc_program_unregister(rpc, prog); +        progs++; +    } + +    list_for_each_entry_safe(listener, next, &rpc->listeners, list) +    { +        rpcsvc_listener_destroy(listener); +    } + +    (void)rpcsvc_unregister_notify(rpc, fn, this); +    sys_unlink(sockfile); +    if (rpc->rxpool) { +        mem_pool_destroy(rpc->rxpool); +        rpc->rxpool = NULL; +    } + +    /* TODO Avoid freeing rpc object in case of brick multiplex +       after freeing rpc object svc->rpclock corrupted and it takes +       more time to detach a brick +    */ +    if (!this->cleanup_starting) +        GF_FREE(rpc);  }  rpcsvc_t * -changelog_rpc_server_init (xlator_t *this, char *sockfile, void *cbkdata, -                           rpcsvc_notify_t fn, struct rpcsvc_program **progs) +changelog_rpc_server_init(xlator_t *this, char *sockfile, void *cbkdata, +                          rpcsvc_notify_t fn, struct rpcsvc_program **progs)  { -        int                    ret     = 0; -        rpcsvc_t              *rpc     = NULL; -        dict_t                *options = NULL; -        struct rpcsvc_program *prog    = NULL; - -        if (!cbkdata) -                cbkdata = this; - -        options = dict_new (); -        if (!options) -                goto error_return; - -        ret = rpcsvc_transport_unix_options_build (&options, sockfile); -        if (ret) -                goto dealloc_dict; - -        rpc = rpcsvc_init (this, this->ctx, options, 8); -        if (rpc == NULL) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_RPC_START_ERROR, -                        "failed to init rpc"); -                goto dealloc_dict; -        } - -        ret = rpcsvc_register_notify (rpc, fn, cbkdata); +    int ret = 0; +    rpcsvc_t *rpc = NULL; +    dict_t *options = NULL; +    struct rpcsvc_program *prog = NULL; + +    if (!cbkdata) +        cbkdata = this; + +    options = dict_new(); +    if (!options) +        goto error_return; + +    ret = rpcsvc_transport_unix_options_build(&options, sockfile); +    if (ret) +        goto dealloc_dict; + +    rpc = rpcsvc_init(this, this->ctx, options, 8); +    if (rpc == NULL) { +        gf_msg(this->name, GF_LOG_ERROR, 0, CHANGELOG_MSG_RPC_START_ERROR, +               "failed to init rpc"); +        goto dealloc_dict; +    } + +    ret = rpcsvc_register_notify(rpc, fn, cbkdata); +    if (ret) { +        gf_msg(this->name, GF_LOG_ERROR, 0, +               CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, +               "failed to register notify function"); +        goto dealloc_rpc; +    } + +    ret = rpcsvc_create_listeners(rpc, options, this->name); +    if (ret != 1) { +        gf_msg_debug(this->name, 0, "failed to create listeners"); +        goto dealloc_rpc; +    } + +    while (*progs) { +        prog = *progs; +        ret = rpcsvc_program_register(rpc, prog, _gf_false);          if (ret) { -                gf_msg (this->name, GF_LOG_ERROR, 0, -                        CHANGELOG_MSG_NOTIFY_REGISTER_FAILED, -                        "failed to register notify function"); -                goto dealloc_rpc; -        } - -        ret = rpcsvc_create_listeners (rpc, options, this->name); -        if (ret != 1) { -                gf_msg_debug (this->name, -                              0, "failed to create listeners"); -                goto dealloc_rpc; +            gf_msg(this->name, GF_LOG_ERROR, 0, +                   CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, +                   "cannot register program " +                   "(name: %s, prognum: %d, pogver: %d)", +                   prog->progname, prog->prognum, prog->progver); +            goto dealloc_rpc;          } -        while (*progs) { -                prog = *progs; -                ret = rpcsvc_program_register (rpc, prog, _gf_false); -                if (ret) { -                        gf_msg (this->name, GF_LOG_ERROR, 0, -                                CHANGELOG_MSG_PROGRAM_NAME_REG_FAILED, -                                "cannot register program " -                                "(name: %s, prognum: %d, pogver: %d)", -                                prog->progname, prog->prognum, prog->progver); -                        goto dealloc_rpc; -                } - -                progs++; -        } +        progs++; +    } -        dict_unref (options); -        return rpc; +    dict_unref(options); +    return rpc; - dealloc_rpc: -        GF_FREE (rpc); - dealloc_dict: -        dict_unref (options); - error_return: -        return NULL; +dealloc_rpc: +    GF_FREE(rpc); +dealloc_dict: +    dict_unref(options); +error_return: +    return NULL;  }  | 
