diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 82 |
1 files changed, 81 insertions, 1 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@ #include "rpc-transport.h" #include "protocol-common.h" #include "mem-pool.h" +#include "xdr-rpc.h" void rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out: return ret; } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ + char *msgbuf = NULL; + rpcclnt_cb_program_t *program = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int found = 0; + int ret = -1; + int procnum = 0; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); + goto out; + } + + gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," + " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), + rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), + rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + + procnum = rpc_call_progproc (&rpcmsg); + + list_for_each_entry (program, &clnt->programs, program) { + if ((program->prognum == rpc_call_program (&rpcmsg)) + && (program->progver == rpc_call_progver (&rpcmsg))) { + found = 1; + break; + } + } + if (found && (procnum < program->numactors) && + (program->actors[procnum].actor)) { + program->actors[procnum].actor (&progmsg); + } + +out: + return ret; +} int rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_MSG_RECEIVED: { pollin = data; - ret = rpc_clnt_handle_reply (clnt, pollin); + if (pollin->is_reply) + ret = rpc_clnt_handle_reply (clnt, pollin); + else + ret = rpc_clnt_handle_cbk (clnt, pollin); /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, * data); */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, goto out; } + INIT_LIST_HEAD (&rpc->programs); + out: return rpc; } @@ -1168,6 +1217,37 @@ out: return request_iob; } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, + rpcclnt_cb_program_t *program) +{ + int ret = -1; + + if (!clnt) + goto out; + + if (program->actors == NULL) + goto out; + + INIT_LIST_HEAD (&program->program); + + list_add_tail (&program->program, &clnt->programs); + + ret = 0; + gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," + " Ver: %d", program->progname, program->prognum, + program->progver); + +out: + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" + " %s, Num: %d, Ver: %d", program->progname, + program->prognum, program->progver); + } + + return ret; +} + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, |