diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 82 | 
1 files changed, 81 insertions, 1 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@  #include "rpc-transport.h"  #include "protocol-common.h"  #include "mem-pool.h" +#include "xdr-rpc.h"  void  rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out:          return ret;  } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ +        char                 *msgbuf = NULL; +        rpcclnt_cb_program_t *program = NULL; +        struct rpc_msg        rpcmsg; +        struct iovec          progmsg; /* RPC Program payload */ +        size_t                msglen = 0; +        int                   found  = 0; +        int                   ret    = -1; +        int                   procnum = 0; + +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len; + +        ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); +                goto out; +        } + +        gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," +                " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), +                rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), +                rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + +        procnum = rpc_call_progproc (&rpcmsg); + +        list_for_each_entry (program, &clnt->programs, program) { +                if ((program->prognum == rpc_call_program (&rpcmsg)) +                    && (program->progver == rpc_call_progver (&rpcmsg))) { +                        found = 1; +                        break; +                } +        } +        if (found && (procnum < program->numactors) && +            (program->actors[procnum].actor)) { +                program->actors[procnum].actor (&progmsg); +        } + +out: +        return ret; +}  int  rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          case RPC_TRANSPORT_MSG_RECEIVED:          {                  pollin = data; -                ret = rpc_clnt_handle_reply (clnt, pollin); +                if (pollin->is_reply) +                        ret = rpc_clnt_handle_reply (clnt, pollin); +                else +                        ret = rpc_clnt_handle_cbk (clnt, pollin);                  /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,                   * data);                   */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,                  goto out;          } +        INIT_LIST_HEAD (&rpc->programs); +  out:          return rpc;  } @@ -1168,6 +1217,37 @@ out:          return request_iob;  } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, +                              rpcclnt_cb_program_t *program) +{ +        int ret = -1; + +        if (!clnt) +                goto out; + +        if (program->actors == NULL) +                goto out; + +        INIT_LIST_HEAD (&program->program); + +        list_add_tail (&program->program, &clnt->programs); + +        ret = 0; +        gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," +                " Ver: %d", program->progname, program->prognum, +                program->progver); + +out: +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" +                        " %s, Num: %d, Ver: %d", program->progname, +                        program->prognum, program->progver); +        } + +        return ret; +} +  int  rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,  | 
