summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c82
1 files changed, 81 insertions, 1 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index eac9f875066..8d923ed5f43 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -31,6 +31,7 @@
#include "rpc-transport.h"
#include "protocol-common.h"
#include "mem-pool.h"
+#include "xdr-rpc.h"
void
rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool);
@@ -653,6 +654,49 @@ out:
return ret;
}
+int
+rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
+{
+ char *msgbuf = NULL;
+ rpcclnt_cb_program_t *program = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ size_t msglen = 0;
+ int found = 0;
+ int ret = -1;
+ int procnum = 0;
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL);
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed");
+ goto out;
+ }
+
+ gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld,"
+ " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg),
+ rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
+ rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg));
+
+ procnum = rpc_call_progproc (&rpcmsg);
+
+ list_for_each_entry (program, &clnt->programs, program) {
+ if ((program->prognum == rpc_call_program (&rpcmsg))
+ && (program->progver == rpc_call_progver (&rpcmsg))) {
+ found = 1;
+ break;
+ }
+ }
+ if (found && (procnum < program->numactors) &&
+ (program->actors[procnum].actor)) {
+ program->actors[procnum].actor (&progmsg);
+ }
+
+out:
+ return ret;
+}
int
rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
@@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_MSG_RECEIVED:
{
pollin = data;
- ret = rpc_clnt_handle_reply (clnt, pollin);
+ if (pollin->is_reply)
+ ret = rpc_clnt_handle_reply (clnt, pollin);
+ else
+ ret = rpc_clnt_handle_cbk (clnt, pollin);
/* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
* data);
*/
@@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
goto out;
}
+ INIT_LIST_HEAD (&rpc->programs);
+
out:
return rpc;
}
@@ -1168,6 +1217,37 @@ out:
return request_iob;
}
+int
+rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
+ rpcclnt_cb_program_t *program)
+{
+ int ret = -1;
+
+ if (!clnt)
+ goto out;
+
+ if (program->actors == NULL)
+ goto out;
+
+ INIT_LIST_HEAD (&program->program);
+
+ list_add_tail (&program->program, &clnt->programs);
+
+ ret = 0;
+ gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
+ " Ver: %d", program->progname, program->prognum,
+ program->progver);
+
+out:
+ if (ret == -1) {
+ gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:"
+ " %s, Num: %d, Ver: %d", program->progname,
+ program->prognum, program->progver);
+ }
+
+ return ret;
+}
+
int
rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,