diff options
author | Raghavendra G <raghavendra@gluster.com> | 2010-08-19 10:42:33 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-08-19 09:50:32 -0700 |
commit | a5dac1f49eb247d854348fe8ec54c33e664adf30 (patch) | |
tree | b2973e967fa44feba7b1eabb813f0e51e1e4b988 | |
parent | 01c00dd2e1d3113acb3f20c5dc7c20fa8d286339 (diff) |
rpcsvc: decouple creation of listener from rpcsvc_program_register and rpcsvc_init.
- with this patch every program that wants to register itself with rpcsvc should
also create one or more listener(s) and register with portmap (if necessary).
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 7 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 262 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 7 | ||||
-rw-r--r-- | xlators/mgmt/glusterd/src/glusterd.c | 77 | ||||
-rw-r--r-- | xlators/protocol/server/src/server.c | 53 |
5 files changed, 190 insertions, 216 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index e3bc519fc97..adbc0fe1e2b 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -1119,8 +1119,11 @@ rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event, goto out; } - //ret = this->notify (this, this->notify_data, event, data); - ret = this->notify (this, this->mydata, event, data); + if (this->notify != NULL) { + ret = this->notify (this, this->mydata, event, data); + } else { + ret = 0; + } out: return ret; } diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 2b6ed58f07b..88675205c90 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1360,7 +1360,7 @@ out: } -int +inline int rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) { if (!prog) @@ -1376,6 +1376,38 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) } +int32_t +rpcsvc_get_listener_port (rpcsvc_listener_t *listener) +{ + int32_t listener_port = -1; + + if ((listener == NULL) || (listener->trans == NULL)) { + goto out; + } + + switch (listener->trans->myinfo.sockaddr.ss_family) { + case AF_INET: + listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; + break; + + case AF_INET6: + listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port; + break; + + default: + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "invalid address family (%d)", + listener->trans->myinfo.sockaddr.ss_family); + goto out; + } + + listener_port = ntohs (listener_port); + +out: + return listener_port; +} + + rpcsvc_listener_t * rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans) { @@ -1398,34 +1430,21 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans) continue; } - - switch (listener->trans->myinfo.sockaddr.ss_family) { - case AF_INET: - listener_port - = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port; - break; - - case AF_INET6: - listener_port - = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port; - break; - default: + listener_port = rpcsvc_get_listener_port (listener); + if (listener_port == -1) { gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "invalid address family (%d)", - listener->trans->myinfo.sockaddr.ss_family); - goto unlock; + "invalid port for listener %s", + listener->trans->name); + continue; } - listener_port = ntohs (listener_port); - if (listener_port == port) { found = 1; break; } } } -unlock: pthread_mutex_unlock (&svc->rpclock); if (!found) { @@ -1457,31 +1476,38 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, int -rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t prog) +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *prog) { int ret = -1; - if (!svc) - return -1; + if (!svc || !prog) { + goto out; + } - /* TODO: De-init the listening connection for this program. */ - ret = rpcsvc_program_unregister_portmap (&prog); + ret = rpcsvc_program_unregister_portmap (prog); if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of" " program failed"); - goto err; + goto out; } - ret = 0; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d," - " Ver: %d, Port: %d", prog.progname, prog.prognum, - prog.progver, prog.progport); + " Ver: %d, Port: %d", prog->progname, prog->prognum, + prog->progver, prog->progport); -err: - if (ret == -1) + pthread_mutex_lock (&svc->rpclock); + { + list_del (&prog->program); + } + pthread_mutex_unlock (&svc->rpclock); + + ret = 0; +out: + if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed" - ": %s, Num: %d, Ver: %d, Port: %d", prog.progname, - prog.prognum, prog.progver, prog.progport); + ": %s, Num: %d, Ver: %d, Port: %d", prog->progname, + prog->prognum, prog->progver, prog->progport); + } return ret; } @@ -1524,16 +1550,16 @@ rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name) goto out; } - ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); + ret = rpc_transport_listen (trans); if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, + "listening on transport failed"); goto out; } - ret = rpc_transport_listen (trans); + ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc); if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "listening on transport failed"); + gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); goto out; } @@ -1657,127 +1683,37 @@ out: } -int -rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) +inline int +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) { - rpcsvc_program_t *newprog = NULL; int ret = -1; - rpcsvc_listener_t *listener = NULL; - data_t *listen_port_data = NULL; - uint16_t listen_port = 0; - - if (!svc) - return -1; - - newprog = GF_CALLOC (1, sizeof(*newprog), - gf_common_mt_rpcsvc_program_t); - if (!newprog) - return -1; - - if (!program.actors) - goto free_prog; - - memcpy (newprog, &program, sizeof (program)); - - listen_port = RPCSVC_DEFAULT_LISTEN_PORT; - if (program.options != NULL) { - /* - * FIXME: use a method which does not hard-code each transport's - * option keys. - */ - listen_port_data = dict_get (program.options, "listen-port"); - if (listen_port_data != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } else if ((listen_port_data - = dict_get (program.options, - "transport.socket.listen-port")) - != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } else if ((listen_port_data - = dict_get (program.options, - "transport.rdma.listen-port")) - != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } - } else if (program.progport != 0) { - listen_port = program.progport; - } - listener = rpcsvc_get_listener (svc, listen_port, NULL); - if ((listener == NULL) && (listen_port == RPCSVC_DEFAULT_LISTEN_PORT)) { - listener = rpcsvc_get_listener (svc, 6969, NULL); + if (!svc) { + goto out; } - if ((listener == NULL) || (listener->trans == NULL)) { - if ((listener != NULL) && (listener->trans == NULL)) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "empty listener without transport found, " - "destroying it"); - rpcsvc_listener_destroy (listener); - } - - if ((program.progport != 0) - && (listen_port == program.progport)) { - ret = dict_set (program.options, - "transport.socket.listen-port", - data_from_uint16 (listen_port)); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "setting listening port (%d) specified " - "by program (%s) in options dictionary " - "failed", listen_port, - program.progname); - goto free_prog; - } - - ret = dict_set (program.options, - "transport.rdma.listen-port", - data_from_uint16 (listen_port)); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "setting listening port (%d) specified " - "by program (%s) in options dictionary " - "failed", listen_port, - program.progname); - goto free_prog; - } - } - - listener = rpcsvc_create_listener (svc, program.options, - program.progname); - if (listener == NULL) { - ret = -1; - gf_log (GF_RPCSVC, GF_LOG_DEBUG, - "creation of listener for program (%s) failed", - program.progname); - goto free_prog; - } + if (program->actors == NULL) { + goto out; } - ret = rpcsvc_program_register_portmap (newprog, program.progport); - if (ret == -1) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of" - " program failed"); - goto free_prog; - } + INIT_LIST_HEAD (&program->program); pthread_mutex_lock (&svc->rpclock); { - list_add_tail (&newprog->program, &svc->programs); + list_add_tail (&program->program, &svc->programs); } pthread_mutex_unlock (&svc->rpclock); ret = 0; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d," - " Ver: %d, Port: %d", newprog->progname, newprog->prognum, - newprog->progver, newprog->progport); + " Ver: %d, Port: %d", program->progname, program->prognum, + program->progver, program->progport); -free_prog: +out: if (ret == -1) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:" - " %s, Num: %d, Ver: %d, Port: %d", newprog->progname, - newprog->prognum, newprog->progver, newprog->progport); - GF_FREE (newprog); + " %s, Num: %d, Ver: %d, Port: %d", program->progname, + program->prognum, program->progver, program->progport); } return ret; @@ -1884,9 +1820,6 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) { rpcsvc_t *svc = NULL; int ret = -1, poolcount = 0; - rpcsvc_listener_t *listener = NULL; - uint32_t listen_port = 0; - data_t *listen_port_data = NULL; if ((!ctx) || (!options)) return NULL; @@ -1929,50 +1862,9 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) svc->ctx = ctx; gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); - listen_port = RPCSVC_DEFAULT_LISTEN_PORT; - - /* - * FIXME: use a method which does not hard-code each transport's - * option keys. - */ - listen_port_data = dict_get (options, "listen-port"); - if (listen_port_data != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } else if ((listen_port_data - = dict_get (options, "transport.socket.listen-port")) - != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } else if ((listen_port_data - = dict_get (options, "transport.rdma.listen-port")) - != NULL) { - listen_port = data_to_uint16 (listen_port_data); - } - - /* One listen port per RPC */ - listener = rpcsvc_get_listener (svc, 0, NULL); - if (!listener) { - /* FIXME: listener is given the name of first program that - * creates it. This is not always correct. For eg., multiple - * programs can be listening on the same listener - * (glusterfs 3.1.0, 3.1.2, 3.1.3 etc). - */ - listener = rpcsvc_create_listener (svc, options, "RPC"); - if (!listener) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "creation of listener" - " for program failed"); - goto free_svc; - } - } - - if (!listener->trans) { - gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport " - "found"); - goto free_svc; - } - gluster_dump_prog.options = options; - ret = rpcsvc_program_register (svc, gluster_dump_prog); + ret = rpcsvc_program_register (svc, &gluster_dump_prog); if (ret) { gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to register DUMP program"); diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 25381af7798..0a3e91da7b2 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -379,10 +379,10 @@ struct rpcsvc_program { * procedure handlers. */ extern int -rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program); +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program); extern int -rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program); +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program); /* This will create and add a listener to listener pool. Programs can * use any of the listener in this pool. A single listener can be used by @@ -395,6 +395,9 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program); extern rpcsvc_listener_t * rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name); +void +rpcsvc_listener_destroy (rpcsvc_listener_t *listener); + extern int rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port); diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c index a9f790bd3a1..33c9ac796cd 100644 --- a/xlators/mgmt/glusterd/src/glusterd.c +++ b/xlators/mgmt/glusterd/src/glusterd.c @@ -161,6 +161,26 @@ out: return 0; } + +inline int32_t +glusterd_program_register (xlator_t *this, rpcsvc_t *svc, + rpcsvc_program_t *prog) +{ + int32_t ret = -1; + + ret = rpcsvc_program_register (svc, prog); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "cannot register program (name: %s, prognum:%d, " + "progver:%d)", prog->progname, prog->prognum, + prog->progver); + goto out; + } + +out: + return ret; +} + /* * init - called during glusterd initialization * @@ -170,15 +190,16 @@ out: int init (xlator_t *this) { - int32_t ret = -1; - rpcsvc_t *rpc = NULL; - glusterd_conf_t *conf = NULL; - data_t *dir_data = NULL; - char dirname [PATH_MAX]; - struct stat buf = {0,}; - char *port_str = NULL; - int port_num = 0; - char voldir [PATH_MAX] = {0,}; + int32_t ret = -1; + rpcsvc_t *rpc = NULL; + glusterd_conf_t *conf = NULL; + data_t *dir_data = NULL; + struct stat buf = {0,}; + char *port_str = NULL; + int port_num = 0; + char voldir [PATH_MAX] = {0,}; + rpcsvc_listener_t *listener = NULL; + char dirname [PATH_MAX]; dir_data = dict_get (this->options, "working-directory"); @@ -260,24 +281,35 @@ init (xlator_t *this) glusterd1_mop_prog.progport = port_num; } - ret = rpcsvc_program_register (rpc, glusterd1_mop_prog); - if (ret) { + /* + * only one listener for glusterd1_mop_prog, gluster_pmap_prog and + * gluster_handshake_prog. + */ + listener = rpcsvc_create_listener (rpc, this->options, this->name); + if (listener == NULL) { gf_log (this->name, GF_LOG_ERROR, - "rpcsvc_program_register returned %d", ret); + "creation of listener failed"); goto out; } - ret = rpcsvc_program_register (rpc, gluster_pmap_prog); + ret = glusterd_program_register (this, rpc, &glusterd1_mop_prog); if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "rpcsvc_program_register returned %d", ret); + goto out; + } + + ret = glusterd_program_register (this, rpc, &gluster_pmap_prog); + if (ret) { + rpcsvc_program_unregister (rpc, &glusterd1_mop_prog); goto out; } gluster_handshake_prog.options = this->options; - ret = rpcsvc_program_register (rpc, gluster_handshake_prog); - if (ret) + ret = glusterd_program_register (this, rpc, &gluster_handshake_prog); + if (ret) { + rpcsvc_program_unregister (rpc, &glusterd1_mop_prog); + rpcsvc_program_unregister (rpc, &gluster_handshake_prog); goto out; + } conf = GF_CALLOC (1, sizeof (glusterd_conf_t), gf_gld_mt_glusterd_conf_t); @@ -309,6 +341,17 @@ init (xlator_t *this) ret = 0; out: + if (ret == -1) { + if (listener != NULL) { + rpcsvc_listener_destroy (listener); + } + + if (this->private != NULL) { + GF_FREE (this->private); + this->private = NULL; + } + } + return ret; } diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 96bb72901a7..7ab3de51c18 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -426,8 +426,9 @@ mem_acct_init (xlator_t *this) int init (xlator_t *this) { - int32_t ret = -1; - server_conf_t *conf = NULL; + int32_t ret = -1; + server_conf_t *conf = NULL; + rpcsvc_listener_t *listener = NULL; if (!this) goto out; @@ -476,24 +477,49 @@ init (xlator_t *this) /* RPC related */ //conf->rpc = rpc_svc_init (&conf->rpc_conf); conf->rpc = rpcsvc_init (this->ctx, this->options); - if (!conf->rpc) { + if (conf->rpc == NULL) { + ret = -1; + goto out; + } + + listener = rpcsvc_create_listener (conf->rpc, this->options, + this->name); + if (listener == NULL) { + gf_log (this->name, GF_LOG_DEBUG, + "creation of listener failed"); ret = -1; goto out; } ret = rpcsvc_register_notify (conf->rpc, server_rpc_notify, this); - if (ret) + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "registration of notify with rpcsvc failed"); goto out; + } glusterfs3_1_fop_prog.options = this->options; - ret = rpcsvc_program_register (conf->rpc, glusterfs3_1_fop_prog); - if (ret) + ret = rpcsvc_program_register (conf->rpc, &glusterfs3_1_fop_prog); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "registration of program (name:%s, prognum:%d, " + "progver:%d) failed", glusterfs3_1_fop_prog.progname, + glusterfs3_1_fop_prog.prognum, + glusterfs3_1_fop_prog.progver); goto out; + } gluster_handshake_prog.options = this->options; - ret = rpcsvc_program_register (conf->rpc, gluster_handshake_prog); - if (ret) + ret = rpcsvc_program_register (conf->rpc, &gluster_handshake_prog); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "registration of program (name:%s, prognum:%d, " + "progver:%d) failed", gluster_handshake_prog.progname, + gluster_handshake_prog.prognum, + gluster_handshake_prog.progver); + rpcsvc_program_unregister (conf->rpc, &glusterfs3_1_fop_prog); goto out; + } #ifndef GF_DARWIN_HOST_OS { @@ -523,8 +549,15 @@ init (xlator_t *this) ret = 0; out: - if (ret && this) - this->fini (this); + if (ret) { + if (this != NULL) { + this->fini (this); + } + + if (listener != NULL) { + rpcsvc_listener_destroy (listener); + } + } return ret; } |