diff options
Diffstat (limited to 'rpc')
| -rw-r--r-- | rpc/rpc-lib/src/protocol-common.h | 18 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 82 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 43 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 3 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 185 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 11 | ||||
| -rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 2 | 
7 files changed, 337 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h index 4df5a554fec..fdb42dfe663 100644 --- a/rpc/rpc-lib/src/protocol-common.h +++ b/rpc/rpc-lib/src/protocol-common.h @@ -140,6 +140,13 @@ enum gf_probe_resp {  	GF_PROBE_FRIEND,  }; +enum gf_cbk_procnum { +        GF_CBK_NULL = 0, +        GF_CBK_FETCHSPEC, +        GF_CBK_INO_FLUSH, +        GF_CBK_MAXVALUE, +}; +  #define GLUSTER3_1_FOP_PROGRAM   1298437 /* Completely random */  #define GLUSTER3_1_FOP_VERSION   310 /* 3.1.0 */  #define GLUSTER3_1_FOP_PROCCNT   GFS3_OP_MAXVALUE @@ -152,10 +159,13 @@ enum gf_probe_resp {  #define GLUSTER3_1_CLI_VERSION   1   /* 0.0.1 */  #define GLUSTER3_1_CLI_PROCCNT   GF1_CLI_MAXVALUE -#define GLUSTER_HNDSK_PROGRAM   14398633 /* Completely random */ -#define GLUSTER_HNDSK_VERSION   1   /* 0.0.1 */ +#define GLUSTER_HNDSK_PROGRAM    14398633 /* Completely random */ +#define GLUSTER_HNDSK_VERSION    1   /* 0.0.1 */ + +#define GLUSTER_PMAP_PROGRAM     34123456 +#define GLUSTER_PMAP_VERSION     1 -#define GLUSTER_PMAP_PROGRAM    34123456 -#define GLUSTER_PMAP_VERSION    1 +#define GLUSTER_CBK_PROGRAM      52743234 /* Completely random */ +#define GLUSTER_CBK_VERSION      1   /* 0.0.1 */  #endif /* !_PROTOCOL_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index eac9f875066..8d923ed5f43 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -31,6 +31,7 @@  #include "rpc-transport.h"  #include "protocol-common.h"  #include "mem-pool.h" +#include "xdr-rpc.h"  void  rpc_clnt_reply_deinit (struct rpc_req *req, struct mem_pool *pool); @@ -653,6 +654,49 @@ out:          return ret;  } +int +rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg) +{ +        char                 *msgbuf = NULL; +        rpcclnt_cb_program_t *program = NULL; +        struct rpc_msg        rpcmsg; +        struct iovec          progmsg; /* RPC Program payload */ +        size_t                msglen = 0; +        int                   found  = 0; +        int                   ret    = -1; +        int                   procnum = 0; + +        msgbuf = msg->vector[0].iov_base; +        msglen = msg->vector[0].iov_len; + +        ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC call decoding failed"); +                goto out; +        } + +        gf_log ("rpc-clnt", GF_LOG_INFO, "RPC XID: %lx, Ver: %ld, Program: %ld," +                " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), +                rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), +                rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + +        procnum = rpc_call_progproc (&rpcmsg); + +        list_for_each_entry (program, &clnt->programs, program) { +                if ((program->prognum == rpc_call_program (&rpcmsg)) +                    && (program->progver == rpc_call_progver (&rpcmsg))) { +                        found = 1; +                        break; +                } +        } +        if (found && (procnum < program->numactors) && +            (program->actors[procnum].actor)) { +                program->actors[procnum].actor (&progmsg); +        } + +out: +        return ret; +}  int  rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) @@ -796,7 +840,10 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          case RPC_TRANSPORT_MSG_RECEIVED:          {                  pollin = data; -                ret = rpc_clnt_handle_reply (clnt, pollin); +                if (pollin->is_reply) +                        ret = rpc_clnt_handle_reply (clnt, pollin); +                else +                        ret = rpc_clnt_handle_cbk (clnt, pollin);                  /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,                   * data);                   */ @@ -943,6 +990,8 @@ rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,                  goto out;          } +        INIT_LIST_HEAD (&rpc->programs); +  out:          return rpc;  } @@ -1168,6 +1217,37 @@ out:          return request_iob;  } +int +rpcclnt_cbk_program_register (struct rpc_clnt *clnt, +                              rpcclnt_cb_program_t *program) +{ +        int ret = -1; + +        if (!clnt) +                goto out; + +        if (program->actors == NULL) +                goto out; + +        INIT_LIST_HEAD (&program->program); + +        list_add_tail (&program->program, &clnt->programs); + +        ret = 0; +        gf_log ("rpc-clnt", GF_LOG_DEBUG, "New program registered: %s, Num: %d," +                " Ver: %d", program->progname, program->prognum, +                program->progver); + +out: +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Program registration failed:" +                        " %s, Num: %d, Ver: %d", program->progname, +                        program->prognum, program->progver); +        } + +        return ret; +} +  int  rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 2381aaa087c..a0251c7c551 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -83,6 +83,38 @@ typedef struct rpc_clnt_program {          int                   numproc;  } rpc_clnt_prog_t; +typedef int (*rpcclnt_cb_fn) (void *data); + +/* The descriptor for each procedure/actor that runs + * over the RPC service. + */ +typedef struct rpcclnt_actor_desc { +        char                 procname[32]; +        int                  procnum; +        rpcclnt_cb_fn        actor; +} rpcclnt_cb_actor_t; + +/* Describes a program and its version along with the function pointers + * required to handle the procedures/actors of each program/version. + * Never changed ever by any thread so no need for a lock. + */ +typedef struct rpcclnt_cb_program { +        char                      progname[32]; +        int                       prognum; +        int                       progver; +        rpcclnt_cb_actor_t       *actors;        /* All procedure handlers */ +        int                       numactors;     /* Num actors in actor array */ + +        /* Program specific state handed to actors */ +        void                    *private; + + +        /* list member to link to list of registered services with rpc_clnt */ +        struct list_head        program; +} rpcclnt_cb_program_t; + + +  #define RPC_MAX_AUTH_BYTES   400  typedef struct rpc_auth_data {          int             flavour; @@ -141,6 +173,9 @@ struct rpc_clnt {          void                  *mydata;          uint64_t               xid; +        /* list of cb programs registered with rpc-clnt */ +        struct list_head       programs; +          /* Memory pool for rpc_req_t */          struct mem_pool       *reqpool; @@ -171,7 +206,7 @@ int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn,   *    also be filled with pointer to buffer to hold header and length   *    of the header.   */ -  +  int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                       int procnum, fop_cbk_fn_t cbkfn,                       struct iovec *proghdr, int proghdrcount, @@ -190,4 +225,10 @@ void rpc_clnt_reconnect (void *trans_ptr);  void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); +/* All users of RPC services should use this API to register their + * procedure handlers. + */ +int rpcclnt_cbk_program_register (struct rpc_clnt *svc, +                                  rpcclnt_cb_program_t *program); +  #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index c5b6f382e7b..cccae5f261d 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -166,6 +166,7 @@ struct rpc_transport_pollin {          char vectored;          void *private;          struct iobref *iobref; +        char is_reply;  };  typedef struct rpc_transport_pollin rpc_transport_pollin_t; @@ -196,6 +197,8 @@ struct rpc_transport {          void                      *notify_data;  	peer_info_t                peerinfo;  	peer_info_t                myinfo; + +        struct list_head           list;  };  struct rpc_transport_ops { diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 5b5c2998c5e..ee3d674c2eb 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -46,6 +46,8 @@  #include <stdarg.h>  #include <stdio.h> +#include "xdr-rpcclnt.h" +  struct rpcsvc_program gluster_dump_prog;  #define rpcsvc_alloc_request(svc, request)                              \ @@ -1119,6 +1121,189 @@ err:          return txrecord;  } +inline int +rpcsvc_get_callid (rpcsvc_t *rpc) +{ +        return GF_UNIVERSAL_ANSWER; +} + +int +rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload, +                      uint64_t xid, struct rpc_msg *request) +{ +        int   ret          = -1; + +        if (!request) { +                goto out; +        } + +        memset (request, 0, sizeof (*request)); + +        request->rm_xid = xid; +        request->rm_direction = CALL; + +        request->rm_call.cb_rpcvers = 2; +        request->rm_call.cb_prog = prognum; +        request->rm_call.cb_vers = progver; +        request->rm_call.cb_proc = procnum; + +        request->rm_call.cb_cred.oa_flavor = AUTH_NONE; +        request->rm_call.cb_cred.oa_base   = NULL; +        request->rm_call.cb_cred.oa_length = 0; + +        request->rm_call.cb_verf.oa_flavor = AUTH_NONE; +        request->rm_call.cb_verf.oa_base = NULL; +        request->rm_call.cb_verf.oa_length = 0; + +        ret = 0; +out: +        return ret; +} + + +struct iovec +rpcsvc_callback_build_header (char *recordstart, size_t rlen, +                             struct rpc_msg *request, size_t payload) +{ +        struct iovec    requesthdr = {0, }; +        struct iovec    txrecord   = {0, 0}; +        int             ret        = -1; +        size_t          fraglen    = 0; + +        ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr); +        if (ret == -1) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, +                        "Failed to create RPC request"); +                goto out; +        } + +        fraglen = payload + requesthdr.iov_len; +        gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " +                "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); + +        txrecord.iov_base = recordstart; + +        /* Remember, this is only the vec for the RPC header and does not +         * include the payload above. We needed the payload only to calculate +         * the size of the full fragment. This size is sent in the fragment +         * header. +         */ +        txrecord.iov_len = requesthdr.iov_len; + +out: +        return txrecord; +} + +struct iobuf * +rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver, +                              int procnum, size_t payload, uint64_t xid, +                              struct iovec *recbuf) +{ +        struct rpc_msg           request     = {0, }; +        struct iobuf            *request_iob = NULL; +        char                    *record      = NULL; +        struct iovec             recordhdr   = {0, }; +        size_t                   pagesize    = 0; +        int                      ret         = -1; + +        if ((!rpc) || (!recbuf)) { +                goto out; +        } + +        /* First, try to get a pointer into the buffer which the RPC +         * layer can use. +         */ +        request_iob = iobuf_get (rpc->ctx->iobuf_pool); +        if (!request_iob) { +                gf_log ("rpcsvc", GF_LOG_ERROR, "Failed to get iobuf"); +                goto out; +        } + +        pagesize = ((struct iobuf_pool *)rpc->ctx->iobuf_pool)->page_size; + +        record = iobuf_ptr (request_iob);  /* Now we have it. */ + +        /* Fill the rpc structure and XDR it into the buffer got above. */ +        ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid, +                                    &request); +        if (ret == -1) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, "cannot build a rpc-request " +                        "xid (%"PRIu64")", xid); +                goto out; +        } + +        recordhdr = rpcsvc_callback_build_header (record, pagesize, &request, +                                                  payload); + +        if (!recordhdr.iov_base) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " +                        " header"); +                iobuf_unref (request_iob); +                request_iob = NULL; +                recbuf->iov_base = NULL; +                goto out; +        } + +        recbuf->iov_base = recordhdr.iov_base; +        recbuf->iov_len = recordhdr.iov_len; + +out: +        return request_iob; +} + +int +rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, +                        rpcsvc_cbk_program_t *prog, int procnum, +                        struct iovec *proghdr, int proghdrcount) +{ +        struct iobuf          *request_iob = NULL; +        struct iovec           rpchdr      = {0,}; +        rpc_transport_req_t    req; +        int                    ret         = -1; +        int                    proglen     = 0; +        uint64_t               callid      = 0; + +        if (!rpc) { +                goto out; +        } + +        memset (&req, 0, sizeof (req)); + +        callid = rpcsvc_get_callid (rpc); + +        if (proghdr) { +                proglen += iov_length (proghdr, proghdrcount); +        } + +        request_iob = rpcsvc_callback_build_record (rpc, prog->prognum, +                                                    prog->progver, procnum, +                                                    proglen, callid, +                                                    &rpchdr); +        if (!request_iob) { +                gf_log ("rpcsvc", GF_LOG_DEBUG, +                        "cannot build rpc-record"); +                goto out; +        } + +        req.msg.rpchdr = &rpchdr; +        req.msg.rpchdrcount = 1; +        req.msg.proghdr = proghdr; +        req.msg.proghdrcount = proghdrcount; + +        ret = rpc_transport_submit_request (trans, &req); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, +                        "transmission of rpc-request failed"); +                goto out; +        } + +        ret = 0; + +out: +        iobuf_unref (request_iob); + +        return ret; +}  inline int  rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *hdrvec, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 10b20af0a88..fca7d047a7b 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -375,7 +375,11 @@ struct rpcsvc_program {          struct list_head        program;  }; - +typedef struct rpcsvc_cbk_program { +        char                 *progname; +        int                   prognum; +        int                   progver; +} rpcsvc_cbk_program_t;  /* All users of RPC services should use this API to register their   * procedure handlers.   */ @@ -525,4 +529,9 @@ rpcsvc_combine_gen_spec_volume_checks (int gen, int spec);  extern char *  rpcsvc_volume_allowed (dict_t *options, char *volname); + +int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, +                            rpcsvc_cbk_program_t *prog, int procnum, +                            struct iovec *proghdr, int proghdrcount); +  #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 6d2d584d111..26b56fad577 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -1477,6 +1477,8 @@ __socket_proto_state_machine (rpc_transport_t *this,                                          ret = -1;                                          goto out;                                  } +                                if (priv->incoming.msg_type == REPLY) +                                        (*pollin)->is_reply = 1;                                  priv->incoming.request_info = NULL;                          }  | 
