From 8071909e84b6a479a6b5be1eddd15e8b16fc1a80 Mon Sep 17 00:00:00 2001 From: Amar Tumballi Date: Fri, 27 Aug 2010 06:45:38 +0000 Subject: rpc: server to client callback mechanism Signed-off-by: Amar Tumballi Signed-off-by: Vijay Bellur --- rpc/rpc-lib/src/protocol-common.h | 18 +++- rpc/rpc-lib/src/rpc-clnt.c | 82 ++++++++++++++- rpc/rpc-lib/src/rpc-clnt.h | 43 +++++++- rpc/rpc-lib/src/rpc-transport.h | 3 + rpc/rpc-lib/src/rpcsvc.c | 185 ++++++++++++++++++++++++++++++++++ rpc/rpc-lib/src/rpcsvc.h | 11 +- rpc/rpc-transport/socket/src/socket.c | 2 + 7 files changed, 337 insertions(+), 7 deletions(-) (limited to 'rpc') diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 4df5a554fec..fdb42dfe663 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -140,6 +140,13 @@ enum gf_probe_resp { GF_PROBE_FRIEND, }; +enum gf_cbk_procnum { + GF_CBK_NULL = 0, + GF_CBK_FETCHSPEC, + GF_CBK_INO_FLUSH, + GF_CBK_MAXVALUE, +}; + #define GLUSTER3_1_FOP_PROGRAM 1298437 /* Completely random */ #define GLUSTER3_1_FOP_VERSION 310 /* 3.1.0 */ #define GLUSTER3_1_FOP_PROCCNT GFS3_OP_MAXVALUE @@ -152,10 +159,13 @@ enum gf_probe_resp { #define GLUSTER3_1_CLI_VERSION 1 /* 0.0.1 */ #define GLUSTER3_1_CLI_PROCCNT GF1_CLI_MAXVALUE -#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ -#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */ +#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */ +#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */ + +#define GLUSTER_PMAP_PROGRAM 34123456 +#define GLUSTER_PMAP_VERSION 1 -#define GLUSTER_PMAP_PROGRAM 34123456 -#define GLUSTER_PMAP_VERSION 1 +#define GLUSTER_CBK_PROGRAM 52743234 /* Completely random */ +#define GLUSTER_CBK_VERSION 1 /* 0.0.1 */ #endif /* !_PROTOCOL_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@ #include "rpc-transport.h" #include "protocol-common.h" #include "mem-pool.h" +#include "xdr-rpc.h" void rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out: return ret; } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ + char *msgbuf = NULL; + rpcclnt_cb_program_t *program = NULL; + struct rpc_msg rpcmsg; + struct iovec progmsg; /* RPC Program payload */ + size_t msglen = 0; + int found = 0; + int ret = -1; + int procnum = 0; + + msgbuf = msg->vector[0].iov_base; + msglen = msg->vector[0].iov_len; + + ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); + goto out; + } + + gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," + " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), + rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), + rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + + procnum = rpc_call_progproc (&rpcmsg); + + list_for_each_entry (program, &clnt->programs, program) { + if ((program->prognum == rpc_call_program (&rpcmsg)) + && (program->progver == rpc_call_progver (&rpcmsg))) { + found = 1; + break; + } + } + if (found && (procnum < program->numactors) && + (program->actors[procnum].actor)) { + program->actors[procnum].actor (&progmsg); + } + +out: + return ret; +} int rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata, case RPC_TRANSPORT_MSG_RECEIVED: { pollin = data; - ret = rpc_clnt_handle_reply (clnt, pollin); + if (pollin->is_reply) + ret = rpc_clnt_handle_reply (clnt, pollin); + else + ret = rpc_clnt_handle_cbk (clnt, pollin); /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, * data); */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, goto out; } + INIT_LIST_HEAD (&rpc->programs); + out: return rpc; } @@ -1168,6 +1217,37 @@ out: return request_iob; } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, + rpcclnt_cb_program_t *program) +{ + int ret = -1; + + if (!clnt) + goto out; + + if (program->actors == NULL) + goto out; + + INIT_LIST_HEAD (&program->program); + + list_add_tail (&program->program, &clnt->programs); + + ret = 0; + gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," + " Ver: %d", program->progname, program->prognum, + program->progver); + +out: + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" + " %s, Num: %d, Ver: %d", program->progname, + program->prognum, program->progver); + } + + return ret; +} + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 2381aaa087c..a0251c7c551 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -83,6 +83,38 @@ typedef struct rpc_clnt_program { int numproc; } rpc_clnt_prog_t; +typedef int (*rpcclnt_cb_fn) (void *data); + +/* The descriptor for each procedure/actor that runs + * over the RPC service. + */ +typedef struct rpcclnt_actor_desc { + char procname[32]; + int procnum; + rpcclnt_cb_fn actor; +} rpcclnt_cb_actor_t; + +/* Describes a program and its version along with the function pointers + * required to handle the procedures/actors of each program/version. + * Never changed ever by any thread so no need for a lock. + */ +typedef struct rpcclnt_cb_program { + char progname[32]; + int prognum; + int progver; + rpcclnt_cb_actor_t *actors; /* All procedure handlers */ + int numactors; /* Num actors in actor array */ + + /* Program specific state handed to actors */ + void *private; + + + /* list member to link to list of registered services with rpc_clnt */ + struct list_head program; +} rpcclnt_cb_program_t; + + + #define RPC_MAX_AUTH_BYTES 400 typedef struct rpc_auth_data { int flavour; @@ -141,6 +173,9 @@ struct rpc_clnt { void *mydata; uint64_t xid; + /* list of cb programs registered with rpc-clnt */ + struct list_head programs; + /* Memory pool for rpc_req_t */ struct mem_pool *reqpool; @@ -171,7 +206,7 @@ int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, * also be filled with pointer to buffer to hold header and length * of the header. */ - + int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum, fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount, @@ -190,4 +225,10 @@ void rpc_clnt_reconnect (void *trans_ptr); void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); +/* All users of RPC services should use this API to register their + * procedure handlers. + */ +int rpcclnt_cbk_program_register (struct rpc_clnt *svc, + rpcclnt_cb_program_t *program); + #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c5b6f382e7b..cccae5f261d 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -166,6 +166,7 @@ struct rpc_transport_pollin { char vectored; void *private; struct iobref *iobref; + char is_reply; }; typedef struct rpc_transport_pollin rpc_transport_pollin_t; @@ -196,6 +197,8 @@ struct rpc_transport { void *notify_data; peer_info_t peerinfo; peer_info_t myinfo; + + struct list_head list; }; struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 5b5c2998c5e..ee3d674c2eb 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -46,6 +46,8 @@ #include #include +#include "xdr-rpcclnt.h" + struct rpcsvc_program gluster_dump_prog; #define rpcsvc_alloc_request(svc, request) \ @@ -1119,6 +1121,189 @@ err: return txrecord; } +inline int +rpcsvc_get_callid (rpcsvc_t *rpc) +{ + return GF_UNIVERSAL_ANSWER; +} + +int +rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload, + uint64_t xid, struct rpc_msg *request) +{ + int ret = -1; + + if (!request) { + goto out; + } + + memset (request, 0, sizeof (*request)); + + request->rm_xid = xid; + request->rm_direction = CALL; + + request->rm_call.cb_rpcvers = 2; + request->rm_call.cb_prog = prognum; + request->rm_call.cb_vers = progver; + request->rm_call.cb_proc = procnum; + + request->rm_call.cb_cred.oa_flavor = AUTH_NONE; + request->rm_call.cb_cred.oa_base = NULL; + request->rm_call.cb_cred.oa_length = 0; + + request->rm_call.cb_verf.oa_flavor = AUTH_NONE; + request->rm_call.cb_verf.oa_base = NULL; + request->rm_call.cb_verf.oa_length = 0; + + ret = 0; +out: + return ret; +} + + +struct iovec +rpcsvc_callback_build_header (char *recordstart, size_t rlen, + struct rpc_msg *request, size_t payload) +{ + struct iovec requesthdr = {0, }; + struct iovec txrecord = {0, 0}; + int ret = -1; + size_t fraglen = 0; + + ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); + if (ret == -1) { + gf_log ("rpcsvc", GF_LOG_DEBUG, + "Failed to create RPC request"); + goto out; + } + + fraglen = payload + requesthdr.iov_len; + gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " + "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); + + txrecord.iov_base = recordstart; + + /* Remember, this is only the vec for the RPC header and does not + * include the payload above. We needed the payload only to calculate + * the size of the full fragment. This size is sent in the fragment + * header. + */ + txrecord.iov_len = requesthdr.iov_len; + +out: + return txrecord; +} + +struct iobuf * +rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, + int procnum, size_t payload, uint64_t xid, + struct iovec *recbuf) +{ + struct rpc_msg request = {0, }; + struct iobuf *request_iob = NULL; + char *record = NULL; + struct iovec recordhdr = {0, }; + size_t pagesize = 0; + int ret = -1; + + if ((!rpc) || (!recbuf)) { + goto out; + } + + /* First, try to get a pointer into the buffer which the RPC + * layer can use. + */ + request_iob = iobuf_get (rpc->ctx->iobuf_pool); + if (!request_iob) { + gf_log ("rpcsvc", GF_LOG_ERROR, "Failed to get iobuf"); + goto out; + } + + pagesize = ((struct iobuf_pool *)rpc->ctx->iobuf_pool)->page_size; + + record = iobuf_ptr (request_iob); /* Now we have it. */ + + /* Fill the rpc structure and XDR it into the buffer got above. */ + ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, + &request); + if (ret == -1) { + gf_log ("rpcsvc", GF_LOG_DEBUG, "cannot build a rpc-request " + "xid (%"PRIu64")", xid); + goto out; + } + + recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, + payload); + + if (!recordhdr.iov_base) { + gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " + " header"); + iobuf_unref (request_iob); + request_iob = NULL; + recbuf->iov_base = NULL; + goto out; + } + + recbuf->iov_base = recordhdr.iov_base; + recbuf->iov_len = recordhdr.iov_len; + +out: + return request_iob; +} + +int +rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, + rpcsvc_cbk_program_t *prog, int procnum, + struct iovec *proghdr, int proghdrcount) +{ + struct iobuf *request_iob = NULL; + struct iovec rpchdr = {0,}; + rpc_transport_req_t req; + int ret = -1; + int proglen = 0; + uint64_t callid = 0; + + if (!rpc) { + goto out; + } + + memset (&req, 0, sizeof (req)); + + callid = rpcsvc_get_callid (rpc); + + if (proghdr) { + proglen += iov_length (proghdr, proghdrcount); + } + + request_iob = rpcsvc_callback_build_record (rpc, prog->prognum, + prog->progver, procnum, + proglen, callid, + &rpchdr); + if (!request_iob) { + gf_log ("rpcsvc", GF_LOG_DEBUG, + "cannot build rpc-record"); + goto out; + } + + req.msg.rpchdr = &rpchdr; + req.msg.rpchdrcount = 1; + req.msg.proghdr = proghdr; + req.msg.proghdrcount = proghdrcount; + + ret = rpc_transport_submit_request (trans, &req); + if (ret == -1) { + gf_log ("rpc-clnt", GF_LOG_DEBUG, + "transmission of rpc-request failed"); + goto out; + } + + ret = 0; + +out: + iobuf_unref (request_iob); + + return ret; +} inline int rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10b20af0a88..fca7d047a7b 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -375,7 +375,11 @@ struct rpcsvc_program { struct list_head program; }; - +typedef struct rpcsvc_cbk_program { + char *progname; + int prognum; + int progver; +} rpcsvc_cbk_program_t; /* All users of RPC services should use this API to register their * procedure handlers. */ @@ -525,4 +529,9 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec); extern char * rpcsvc_volume_allowed (dict_t *options, char *volname); + +int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, + rpcsvc_cbk_program_t *prog, int procnum, + struct iovec *proghdr, int proghdrcount); + #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6d2d584d111..26b56fad577 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1477,6 +1477,8 @@ __socket_proto_state_machine (rpc_transport_t *this, ret = -1; goto out; } + if (priv->incoming.msg_type == REPLY) + (*pollin)->is_reply = 1; priv->incoming.request_info = NULL; } -- cgit