diff options
| author | Raghavendra G <raghavendra@gluster.com> | 2010-08-19 10:42:33 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-19 09:50:32 -0700 | 
| commit | a5dac1f49eb247d854348fe8ec54c33e664adf30 (patch) | |
| tree | b2973e967fa44feba7b1eabb813f0e51e1e4b988 | |
| parent | 01c00dd2e1d3113acb3f20c5dc7c20fa8d286339 (diff) | |
rpcsvc: decouple creation of listener from rpcsvc_program_register and rpcsvc_init.
- with this patch every program that wants to register itself with rpcsvc should
    also create one or more listener(s) and register with portmap (if necessary).
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 7 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 262 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 7 | ||||
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.c | 77 | ||||
| -rw-r--r-- | xlators/protocol/server/src/server.c | 53 | 
5 files changed, 190 insertions, 216 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index e3bc519fc97..adbc0fe1e2b 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -1119,8 +1119,11 @@ rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event,                  goto out;          } -        //ret = this->notify (this, this->notify_data, event, data); -        ret = this->notify (this, this->mydata, event, data); +        if (this->notify != NULL) { +                ret = this->notify (this, this->mydata, event, data); +        } else { +                ret = 0; +        }  out:          return ret;  } diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 2b6ed58f07b..88675205c90 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1360,7 +1360,7 @@ out:  } -int +inline int  rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)  {          if (!prog) @@ -1376,6 +1376,38 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)  } +int32_t +rpcsvc_get_listener_port (rpcsvc_listener_t *listener) +{ +        int32_t listener_port = -1; + +        if ((listener == NULL) || (listener->trans == NULL)) { +                goto out; +        } + +        switch (listener->trans->myinfo.sockaddr.ss_family) { +        case AF_INET: +                listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; +                break; + +        case AF_INET6: +                listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;  +                break; + +        default: +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                        "invalid address family (%d)", +                        listener->trans->myinfo.sockaddr.ss_family); +                goto out; +        } + +        listener_port = ntohs (listener_port); + +out: +        return listener_port; +} + +  rpcsvc_listener_t *  rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)  { @@ -1398,34 +1430,21 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)                                  continue;                          } -                                 -                        switch (listener->trans->myinfo.sockaddr.ss_family) { -                        case AF_INET: -                                listener_port -                                        = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; -                                break; - -                        case AF_INET6: -                                listener_port -                                        = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;  -                                break; -                        default: +                        listener_port = rpcsvc_get_listener_port (listener); +                        if (listener_port == -1) {                                  gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                                        "invalid address family (%d)", -                                        listener->trans->myinfo.sockaddr.ss_family); -                                goto unlock; +                                        "invalid port for listener %s", +                                        listener->trans->name); +                                continue;                          } -                        listener_port = ntohs (listener_port); -                          if (listener_port == port) {                                  found = 1;                                  break;                          }                  }          } -unlock:          pthread_mutex_unlock (&svc->rpclock);          if (!found) { @@ -1457,31 +1476,38 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,  int -rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t prog) +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *prog)  {          int                     ret = -1; -        if (!svc) -                return -1; +        if (!svc || !prog) { +                goto out; +        } -        /* TODO: De-init the listening connection for this program. */ -        ret = rpcsvc_program_unregister_portmap (&prog); +        ret = rpcsvc_program_unregister_portmap (prog);          if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of"                          " program failed"); -                goto err; +                goto out;          } -        ret = 0;          gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d," -                " Ver: %d, Port: %d", prog.progname, prog.prognum, -                prog.progver, prog.progport); +                " Ver: %d, Port: %d", prog->progname, prog->prognum, +                prog->progver, prog->progport); -err: -        if (ret == -1) +        pthread_mutex_lock (&svc->rpclock); +        { +                list_del (&prog->program); +        } +        pthread_mutex_unlock (&svc->rpclock); + +        ret = 0; +out: +        if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed" -                        ": %s, Num: %d, Ver: %d, Port: %d", prog.progname, -                        prog.prognum, prog.progver, prog.progport); +                        ": %s, Num: %d, Ver: %d, Port: %d", prog->progname, +                        prog->prognum, prog->progver, prog->progport); +        }          return ret;  } @@ -1524,16 +1550,16 @@ rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)                  goto out;          } -        ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); +        ret = rpc_transport_listen (trans);          if (ret == -1) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                        "listening on transport failed");                  goto out;          } -        ret = rpc_transport_listen (trans); +        ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);          if (ret == -1) { -                gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                        "listening on transport failed"); +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");                  goto out;          } @@ -1657,127 +1683,37 @@ out:  } -int -rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) +inline int +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)  { -        rpcsvc_program_t        *newprog          = NULL;          int                      ret              = -1; -        rpcsvc_listener_t       *listener         = NULL; -        data_t                  *listen_port_data = NULL; -        uint16_t                 listen_port      = 0; - -        if (!svc) -                return -1; - -        newprog = GF_CALLOC (1, sizeof(*newprog), -                             gf_common_mt_rpcsvc_program_t); -        if (!newprog) -                return -1; - -        if (!program.actors) -                goto free_prog; - -        memcpy (newprog, &program, sizeof (program)); - -        listen_port = RPCSVC_DEFAULT_LISTEN_PORT; -        if (program.options != NULL) { -                /* -                 * FIXME: use a method which does not hard-code each transport's -                 * option keys. -                 */ -                listen_port_data = dict_get (program.options, "listen-port"); -                if (listen_port_data != NULL) { -                        listen_port = data_to_uint16 (listen_port_data); -                } else if ((listen_port_data -                            = dict_get (program.options, -                                        "transport.socket.listen-port")) -                           != NULL) { -                        listen_port = data_to_uint16 (listen_port_data); -                } else if ((listen_port_data -                            = dict_get (program.options, -                                        "transport.rdma.listen-port")) -                           != NULL) { -                        listen_port = data_to_uint16 (listen_port_data); -                } -        } else if (program.progport != 0) { -                listen_port = program.progport; -        } -        listener = rpcsvc_get_listener (svc, listen_port, NULL); -        if ((listener == NULL) && (listen_port == RPCSVC_DEFAULT_LISTEN_PORT)) { -                listener = rpcsvc_get_listener (svc, 6969, NULL); +        if (!svc) { +                goto out;          } -        if ((listener == NULL) || (listener->trans == NULL)) { -                if ((listener != NULL) && (listener->trans == NULL)) { -                        gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                                "empty listener without transport found, " -                                "destroying it"); -                        rpcsvc_listener_destroy (listener); -                } - -                if ((program.progport != 0) -                    && (listen_port == program.progport)) { -                        ret = dict_set (program.options, -                                        "transport.socket.listen-port", -                                        data_from_uint16 (listen_port)); -                        if (ret == -1) { -                                gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                                        "setting listening port (%d) specified " -                                        "by program (%s) in options dictionary " -                                        "failed", listen_port, -                                        program.progname); -                                goto free_prog; -                        } - -                        ret = dict_set (program.options, -                                        "transport.rdma.listen-port", -                                        data_from_uint16 (listen_port)); -                        if (ret == -1) { -                                gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                                        "setting listening port (%d) specified " -                                        "by program (%s) in options dictionary " -                                        "failed", listen_port, -                                        program.progname); -                                goto free_prog; -                        } -                } - -                listener = rpcsvc_create_listener (svc, program.options, -                                                   program.progname); -                if (listener == NULL) { -                        ret = -1; -                        gf_log (GF_RPCSVC, GF_LOG_DEBUG, -                                "creation of listener for program (%s) failed", -                                program.progname); -                        goto free_prog; -                } +        if (program->actors == NULL) { +                goto out;          } -        ret = rpcsvc_program_register_portmap (newprog, program.progport); -        if (ret == -1) { -                gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of" -                        " program failed"); -                goto free_prog; -        } +        INIT_LIST_HEAD (&program->program);          pthread_mutex_lock (&svc->rpclock);          { -                list_add_tail (&newprog->program, &svc->programs); +                list_add_tail (&program->program, &svc->programs);          }          pthread_mutex_unlock (&svc->rpclock);          ret = 0;          gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d," -                " Ver: %d, Port: %d", newprog->progname, newprog->prognum, -                newprog->progver, newprog->progport); +                " Ver: %d, Port: %d", program->progname, program->prognum, +                program->progver, program->progport); -free_prog: +out:          if (ret == -1) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:" -                        " %s, Num: %d, Ver: %d, Port: %d", newprog->progname, -                        newprog->prognum, newprog->progver, newprog->progport); -                GF_FREE (newprog); +                        " %s, Num: %d, Ver: %d, Port: %d", program->progname, +                        program->prognum, program->progver, program->progport);          }          return ret; @@ -1884,9 +1820,6 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)  {          rpcsvc_t          *svc              = NULL;          int                ret              = -1, poolcount = 0; -        rpcsvc_listener_t *listener         = NULL; -        uint32_t           listen_port      = 0; -        data_t            *listen_port_data = NULL;          if ((!ctx) || (!options))                  return NULL; @@ -1929,50 +1862,9 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)          svc->ctx = ctx;          gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); -        listen_port = RPCSVC_DEFAULT_LISTEN_PORT; - -        /* -         * FIXME: use a method which does not hard-code each transport's -         * option keys. -         */ -        listen_port_data = dict_get (options, "listen-port"); -        if (listen_port_data != NULL) { -                listen_port = data_to_uint16 (listen_port_data); -        } else if ((listen_port_data -                    = dict_get (options, "transport.socket.listen-port")) -                   != NULL) { -                listen_port = data_to_uint16 (listen_port_data); -        } else if ((listen_port_data -                    = dict_get (options, "transport.rdma.listen-port")) -                   != NULL) { -                listen_port = data_to_uint16 (listen_port_data); -        } - -        /* One listen port per RPC */ -        listener = rpcsvc_get_listener (svc, 0, NULL); -        if (!listener) { -                /* FIXME: listener is given the name of first program that -                 * creates it. This is not always correct. For eg., multiple -                 * programs can be listening on the same listener -                 * (glusterfs 3.1.0, 3.1.2, 3.1.3 etc). -                 */ -                listener = rpcsvc_create_listener (svc, options, "RPC"); -                if (!listener) { -                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "creation of listener" -                                " for program failed"); -                        goto free_svc; -                } -        } - -        if (!listener->trans) { -                gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport " -                        "found"); -                goto free_svc; -        } -          gluster_dump_prog.options = options; -        ret = rpcsvc_program_register (svc, gluster_dump_prog); +        ret = rpcsvc_program_register (svc, &gluster_dump_prog);          if (ret) {                  gf_log (GF_RPCSVC, GF_LOG_ERROR,                          "failed to register DUMP program"); diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 25381af7798..0a3e91da7b2 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -379,10 +379,10 @@ struct rpcsvc_program {   * procedure handlers.   */  extern int -rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program); +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program);  extern int -rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program); +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program);  /* This will create and add a listener to listener pool. Programs can   * use any of the listener in this pool. A single listener can be used by @@ -395,6 +395,9 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program);  extern rpcsvc_listener_t *  rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name); +void +rpcsvc_listener_destroy (rpcsvc_listener_t *listener); +  extern int  rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port); diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index a9f790bd3a1..33c9ac796cd 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -161,6 +161,26 @@ out:          return 0;  } + +inline int32_t +glusterd_program_register (xlator_t *this, rpcsvc_t *svc, +                           rpcsvc_program_t *prog) +{ +        int32_t ret = -1; + +        ret = rpcsvc_program_register (svc, prog); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "cannot register program (name: %s, prognum:%d, " +                        "progver:%d)", prog->progname, prog->prognum, +                        prog->progver); +                goto out; +        } + +out: +        return ret; +} +  /*   * init - called during glusterd initialization   * @@ -170,15 +190,16 @@ out:  int  init (xlator_t *this)  { -        int32_t         ret = -1; -        rpcsvc_t        *rpc = NULL; -        glusterd_conf_t *conf = NULL; -        data_t          *dir_data = NULL; -        char            dirname [PATH_MAX]; -        struct stat     buf = {0,}; -        char            *port_str = NULL; -        int             port_num = 0; -        char            voldir [PATH_MAX] = {0,}; +        int32_t            ret               = -1; +        rpcsvc_t          *rpc               = NULL; +        glusterd_conf_t   *conf              = NULL; +        data_t            *dir_data          = NULL; +        struct stat        buf               = {0,}; +        char              *port_str          = NULL; +        int                port_num          = 0; +        char               voldir [PATH_MAX] = {0,}; +        rpcsvc_listener_t *listener          = NULL; +        char               dirname [PATH_MAX];          dir_data = dict_get (this->options, "working-directory"); @@ -260,24 +281,35 @@ init (xlator_t *this)                  glusterd1_mop_prog.progport = port_num;          } -        ret = rpcsvc_program_register (rpc, glusterd1_mop_prog); -        if (ret) { +        /* +         * only one listener for glusterd1_mop_prog, gluster_pmap_prog and +         * gluster_handshake_prog. +         */ +        listener = rpcsvc_create_listener (rpc, this->options, this->name); +        if (listener == NULL) {                  gf_log (this->name, GF_LOG_ERROR, -                        "rpcsvc_program_register returned %d", ret); +                        "creation of listener failed");                  goto out;          } -        ret = rpcsvc_program_register (rpc, gluster_pmap_prog); +        ret = glusterd_program_register (this, rpc, &glusterd1_mop_prog);          if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "rpcsvc_program_register returned %d", ret); +                goto out; +        } + +        ret = glusterd_program_register (this, rpc, &gluster_pmap_prog); +        if (ret) { +                rpcsvc_program_unregister (rpc, &glusterd1_mop_prog);                  goto out;          }          gluster_handshake_prog.options = this->options; -        ret = rpcsvc_program_register (rpc, gluster_handshake_prog); -        if (ret) +        ret = glusterd_program_register (this, rpc, &gluster_handshake_prog); +        if (ret) { +                rpcsvc_program_unregister (rpc, &glusterd1_mop_prog); +                rpcsvc_program_unregister (rpc, &gluster_handshake_prog);                  goto out; +        }          conf = GF_CALLOC (1, sizeof (glusterd_conf_t),                            gf_gld_mt_glusterd_conf_t); @@ -309,6 +341,17 @@ init (xlator_t *this)          ret = 0;  out: +        if (ret == -1) { +                if (listener != NULL) { +                        rpcsvc_listener_destroy (listener); +                } + +                if (this->private != NULL) { +                        GF_FREE (this->private); +                        this->private = NULL; +                } +        } +          return ret;  } diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 96bb72901a7..7ab3de51c18 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -426,8 +426,9 @@ mem_acct_init (xlator_t *this)  int  init (xlator_t *this)  { -        int32_t        ret = -1; -        server_conf_t *conf = NULL; +        int32_t            ret      = -1; +        server_conf_t     *conf     = NULL; +        rpcsvc_listener_t *listener = NULL;          if (!this)                  goto out; @@ -476,24 +477,49 @@ init (xlator_t *this)          /* RPC related */          //conf->rpc = rpc_svc_init (&conf->rpc_conf);          conf->rpc = rpcsvc_init (this->ctx, this->options); -        if (!conf->rpc) { +        if (conf->rpc == NULL) { +                ret = -1; +                goto out; +        } + +        listener = rpcsvc_create_listener (conf->rpc, this->options, +                                           this->name); +        if (listener == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "creation of listener failed");                  ret = -1;                  goto out;          }          ret = rpcsvc_register_notify (conf->rpc, server_rpc_notify, this); -        if (ret) +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "registration of notify with rpcsvc failed");                  goto out; +        }          glusterfs3_1_fop_prog.options = this->options; -        ret = rpcsvc_program_register (conf->rpc, glusterfs3_1_fop_prog); -        if (ret) +        ret = rpcsvc_program_register (conf->rpc, &glusterfs3_1_fop_prog); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "registration of program (name:%s, prognum:%d, " +                        "progver:%d) failed", glusterfs3_1_fop_prog.progname, +                        glusterfs3_1_fop_prog.prognum, +                        glusterfs3_1_fop_prog.progver);                  goto out; +        }          gluster_handshake_prog.options = this->options; -        ret = rpcsvc_program_register (conf->rpc, gluster_handshake_prog); -        if (ret) +        ret = rpcsvc_program_register (conf->rpc, &gluster_handshake_prog); +        if (ret) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "registration of program (name:%s, prognum:%d, " +                        "progver:%d) failed", gluster_handshake_prog.progname, +                        gluster_handshake_prog.prognum, +                        gluster_handshake_prog.progver); +                rpcsvc_program_unregister (conf->rpc, &glusterfs3_1_fop_prog);                  goto out; +        }  #ifndef GF_DARWIN_HOST_OS          { @@ -523,8 +549,15 @@ init (xlator_t *this)          ret = 0;  out: -        if (ret && this) -                this->fini (this); +        if (ret) { +                if (this != NULL) { +                        this->fini (this); +                } + +                if (listener != NULL) { +                        rpcsvc_listener_destroy (listener); +                } +        }          return ret;  }  | 
