summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c7
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c262
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h7
-rw-r--r--xlators/mgmt/glusterd/src/glusterd.c77
-rw-r--r--xlators/protocol/server/src/server.c53
5 files changed, 190 insertions, 216 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index e3bc519fc..adbc0fe1e 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -1119,8 +1119,11 @@ rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event,
goto out;
}
- //ret = this->notify (this, this->notify_data, event, data);
- ret = this->notify (this, this->mydata, event, data);
+ if (this->notify != NULL) {
+ ret = this->notify (this, this->mydata, event, data);
+ } else {
+ ret = 0;
+ }
out:
return ret;
}
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 2b6ed58f0..88675205c 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1360,7 +1360,7 @@ out:
}
-int
+inline int
rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
{
if (!prog)
@@ -1376,6 +1376,38 @@ rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
}
+int32_t
+rpcsvc_get_listener_port (rpcsvc_listener_t *listener)
+{
+ int32_t listener_port = -1;
+
+ if ((listener == NULL) || (listener->trans == NULL)) {
+ goto out;
+ }
+
+ switch (listener->trans->myinfo.sockaddr.ss_family) {
+ case AF_INET:
+ listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
+ break;
+
+ case AF_INET6:
+ listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
+ break;
+
+ default:
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "invalid address family (%d)",
+ listener->trans->myinfo.sockaddr.ss_family);
+ goto out;
+ }
+
+ listener_port = ntohs (listener_port);
+
+out:
+ return listener_port;
+}
+
+
rpcsvc_listener_t *
rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
{
@@ -1398,34 +1430,21 @@ rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
continue;
}
-
- switch (listener->trans->myinfo.sockaddr.ss_family) {
- case AF_INET:
- listener_port
- = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
- break;
-
- case AF_INET6:
- listener_port
- = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
- break;
- default:
+ listener_port = rpcsvc_get_listener_port (listener);
+ if (listener_port == -1) {
gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "invalid address family (%d)",
- listener->trans->myinfo.sockaddr.ss_family);
- goto unlock;
+ "invalid port for listener %s",
+ listener->trans->name);
+ continue;
}
- listener_port = ntohs (listener_port);
-
if (listener_port == port) {
found = 1;
break;
}
}
}
-unlock:
pthread_mutex_unlock (&svc->rpclock);
if (!found) {
@@ -1457,31 +1476,38 @@ rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
int
-rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t prog)
+rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *prog)
{
int ret = -1;
- if (!svc)
- return -1;
+ if (!svc || !prog) {
+ goto out;
+ }
- /* TODO: De-init the listening connection for this program. */
- ret = rpcsvc_program_unregister_portmap (&prog);
+ ret = rpcsvc_program_unregister_portmap (prog);
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of"
" program failed");
- goto err;
+ goto out;
}
- ret = 0;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d,"
- " Ver: %d, Port: %d", prog.progname, prog.prognum,
- prog.progver, prog.progport);
+ " Ver: %d, Port: %d", prog->progname, prog->prognum,
+ prog->progver, prog->progport);
-err:
- if (ret == -1)
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_del (&prog->program);
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ ret = 0;
+out:
+ if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed"
- ": %s, Num: %d, Ver: %d, Port: %d", prog.progname,
- prog.prognum, prog.progver, prog.progport);
+ ": %s, Num: %d, Ver: %d, Port: %d", prog->progname,
+ prog->prognum, prog->progver, prog->progport);
+ }
return ret;
}
@@ -1524,16 +1550,16 @@ rpcsvc_transport_create (rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
- ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
+ ret = rpc_transport_listen (trans);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG,
+ "listening on transport failed");
goto out;
}
- ret = rpc_transport_listen (trans);
+ ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "listening on transport failed");
+ gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed");
goto out;
}
@@ -1657,127 +1683,37 @@ out:
}
-int
-rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program)
+inline int
+rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
{
- rpcsvc_program_t *newprog = NULL;
int ret = -1;
- rpcsvc_listener_t *listener = NULL;
- data_t *listen_port_data = NULL;
- uint16_t listen_port = 0;
-
- if (!svc)
- return -1;
-
- newprog = GF_CALLOC (1, sizeof(*newprog),
- gf_common_mt_rpcsvc_program_t);
- if (!newprog)
- return -1;
-
- if (!program.actors)
- goto free_prog;
-
- memcpy (newprog, &program, sizeof (program));
-
- listen_port = RPCSVC_DEFAULT_LISTEN_PORT;
- if (program.options != NULL) {
- /*
- * FIXME: use a method which does not hard-code each transport's
- * option keys.
- */
- listen_port_data = dict_get (program.options, "listen-port");
- if (listen_port_data != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- } else if ((listen_port_data
- = dict_get (program.options,
- "transport.socket.listen-port"))
- != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- } else if ((listen_port_data
- = dict_get (program.options,
- "transport.rdma.listen-port"))
- != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- }
- } else if (program.progport != 0) {
- listen_port = program.progport;
- }
- listener = rpcsvc_get_listener (svc, listen_port, NULL);
- if ((listener == NULL) && (listen_port == RPCSVC_DEFAULT_LISTEN_PORT)) {
- listener = rpcsvc_get_listener (svc, 6969, NULL);
+ if (!svc) {
+ goto out;
}
- if ((listener == NULL) || (listener->trans == NULL)) {
- if ((listener != NULL) && (listener->trans == NULL)) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "empty listener without transport found, "
- "destroying it");
- rpcsvc_listener_destroy (listener);
- }
-
- if ((program.progport != 0)
- && (listen_port == program.progport)) {
- ret = dict_set (program.options,
- "transport.socket.listen-port",
- data_from_uint16 (listen_port));
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "setting listening port (%d) specified "
- "by program (%s) in options dictionary "
- "failed", listen_port,
- program.progname);
- goto free_prog;
- }
-
- ret = dict_set (program.options,
- "transport.rdma.listen-port",
- data_from_uint16 (listen_port));
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "setting listening port (%d) specified "
- "by program (%s) in options dictionary "
- "failed", listen_port,
- program.progname);
- goto free_prog;
- }
- }
-
- listener = rpcsvc_create_listener (svc, program.options,
- program.progname);
- if (listener == NULL) {
- ret = -1;
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "creation of listener for program (%s) failed",
- program.progname);
- goto free_prog;
- }
+ if (program->actors == NULL) {
+ goto out;
}
- ret = rpcsvc_program_register_portmap (newprog, program.progport);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of"
- " program failed");
- goto free_prog;
- }
+ INIT_LIST_HEAD (&program->program);
pthread_mutex_lock (&svc->rpclock);
{
- list_add_tail (&newprog->program, &svc->programs);
+ list_add_tail (&program->program, &svc->programs);
}
pthread_mutex_unlock (&svc->rpclock);
ret = 0;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
- " Ver: %d, Port: %d", newprog->progname, newprog->prognum,
- newprog->progver, newprog->progport);
+ " Ver: %d, Port: %d", program->progname, program->prognum,
+ program->progver, program->progport);
-free_prog:
+out:
if (ret == -1) {
gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:"
- " %s, Num: %d, Ver: %d, Port: %d", newprog->progname,
- newprog->prognum, newprog->progver, newprog->progport);
- GF_FREE (newprog);
+ " %s, Num: %d, Ver: %d, Port: %d", program->progname,
+ program->prognum, program->progver, program->progport);
}
return ret;
@@ -1884,9 +1820,6 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
{
rpcsvc_t *svc = NULL;
int ret = -1, poolcount = 0;
- rpcsvc_listener_t *listener = NULL;
- uint32_t listen_port = 0;
- data_t *listen_port_data = NULL;
if ((!ctx) || (!options))
return NULL;
@@ -1929,50 +1862,9 @@ rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options)
svc->ctx = ctx;
gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
- listen_port = RPCSVC_DEFAULT_LISTEN_PORT;
-
- /*
- * FIXME: use a method which does not hard-code each transport's
- * option keys.
- */
- listen_port_data = dict_get (options, "listen-port");
- if (listen_port_data != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- } else if ((listen_port_data
- = dict_get (options, "transport.socket.listen-port"))
- != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- } else if ((listen_port_data
- = dict_get (options, "transport.rdma.listen-port"))
- != NULL) {
- listen_port = data_to_uint16 (listen_port_data);
- }
-
- /* One listen port per RPC */
- listener = rpcsvc_get_listener (svc, 0, NULL);
- if (!listener) {
- /* FIXME: listener is given the name of first program that
- * creates it. This is not always correct. For eg., multiple
- * programs can be listening on the same listener
- * (glusterfs 3.1.0, 3.1.2, 3.1.3 etc).
- */
- listener = rpcsvc_create_listener (svc, options, "RPC");
- if (!listener) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "creation of listener"
- " for program failed");
- goto free_svc;
- }
- }
-
- if (!listener->trans) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no transport "
- "found");
- goto free_svc;
- }
-
gluster_dump_prog.options = options;
- ret = rpcsvc_program_register (svc, gluster_dump_prog);
+ ret = rpcsvc_program_register (svc, &gluster_dump_prog);
if (ret) {
gf_log (GF_RPCSVC, GF_LOG_ERROR,
"failed to register DUMP program");
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 25381af77..0a3e91da7 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -379,10 +379,10 @@ struct rpcsvc_program {
* procedure handlers.
*/
extern int
-rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program);
+rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program);
extern int
-rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program);
+rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program);
/* This will create and add a listener to listener pool. Programs can
* use any of the listener in this pool. A single listener can be used by
@@ -395,6 +395,9 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program);
extern rpcsvc_listener_t *
rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name);
+void
+rpcsvc_listener_destroy (rpcsvc_listener_t *listener);
+
extern int
rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port);
diff --git a/xlators/mgmt/glusterd/src/glusterd.c b/xlators/mgmt/glusterd/src/glusterd.c
index a9f790bd3..33c9ac796 100644
--- a/xlators/mgmt/glusterd/src/glusterd.c
+++ b/xlators/mgmt/glusterd/src/glusterd.c
@@ -161,6 +161,26 @@ out:
return 0;
}
+
+inline int32_t
+glusterd_program_register (xlator_t *this, rpcsvc_t *svc,
+ rpcsvc_program_t *prog)
+{
+ int32_t ret = -1;
+
+ ret = rpcsvc_program_register (svc, prog);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "cannot register program (name: %s, prognum:%d, "
+ "progver:%d)", prog->progname, prog->prognum,
+ prog->progver);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
/*
* init - called during glusterd initialization
*
@@ -170,15 +190,16 @@ out:
int
init (xlator_t *this)
{
- int32_t ret = -1;
- rpcsvc_t *rpc = NULL;
- glusterd_conf_t *conf = NULL;
- data_t *dir_data = NULL;
- char dirname [PATH_MAX];
- struct stat buf = {0,};
- char *port_str = NULL;
- int port_num = 0;
- char voldir [PATH_MAX] = {0,};
+ int32_t ret = -1;
+ rpcsvc_t *rpc = NULL;
+ glusterd_conf_t *conf = NULL;
+ data_t *dir_data = NULL;
+ struct stat buf = {0,};
+ char *port_str = NULL;
+ int port_num = 0;
+ char voldir [PATH_MAX] = {0,};
+ rpcsvc_listener_t *listener = NULL;
+ char dirname [PATH_MAX];
dir_data = dict_get (this->options, "working-directory");
@@ -260,24 +281,35 @@ init (xlator_t *this)
glusterd1_mop_prog.progport = port_num;
}
- ret = rpcsvc_program_register (rpc, glusterd1_mop_prog);
- if (ret) {
+ /*
+ * only one listener for glusterd1_mop_prog, gluster_pmap_prog and
+ * gluster_handshake_prog.
+ */
+ listener = rpcsvc_create_listener (rpc, this->options, this->name);
+ if (listener == NULL) {
gf_log (this->name, GF_LOG_ERROR,
- "rpcsvc_program_register returned %d", ret);
+ "creation of listener failed");
goto out;
}
- ret = rpcsvc_program_register (rpc, gluster_pmap_prog);
+ ret = glusterd_program_register (this, rpc, &glusterd1_mop_prog);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "rpcsvc_program_register returned %d", ret);
+ goto out;
+ }
+
+ ret = glusterd_program_register (this, rpc, &gluster_pmap_prog);
+ if (ret) {
+ rpcsvc_program_unregister (rpc, &glusterd1_mop_prog);
goto out;
}
gluster_handshake_prog.options = this->options;
- ret = rpcsvc_program_register (rpc, gluster_handshake_prog);
- if (ret)
+ ret = glusterd_program_register (this, rpc, &gluster_handshake_prog);
+ if (ret) {
+ rpcsvc_program_unregister (rpc, &glusterd1_mop_prog);
+ rpcsvc_program_unregister (rpc, &gluster_handshake_prog);
goto out;
+ }
conf = GF_CALLOC (1, sizeof (glusterd_conf_t),
gf_gld_mt_glusterd_conf_t);
@@ -309,6 +341,17 @@ init (xlator_t *this)
ret = 0;
out:
+ if (ret == -1) {
+ if (listener != NULL) {
+ rpcsvc_listener_destroy (listener);
+ }
+
+ if (this->private != NULL) {
+ GF_FREE (this->private);
+ this->private = NULL;
+ }
+ }
+
return ret;
}
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 96bb72901..7ab3de51c 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -426,8 +426,9 @@ mem_acct_init (xlator_t *this)
int
init (xlator_t *this)
{
- int32_t ret = -1;
- server_conf_t *conf = NULL;
+ int32_t ret = -1;
+ server_conf_t *conf = NULL;
+ rpcsvc_listener_t *listener = NULL;
if (!this)
goto out;
@@ -476,24 +477,49 @@ init (xlator_t *this)
/* RPC related */
//conf->rpc = rpc_svc_init (&conf->rpc_conf);
conf->rpc = rpcsvc_init (this->ctx, this->options);
- if (!conf->rpc) {
+ if (conf->rpc == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ listener = rpcsvc_create_listener (conf->rpc, this->options,
+ this->name);
+ if (listener == NULL) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "creation of listener failed");
ret = -1;
goto out;
}
ret = rpcsvc_register_notify (conf->rpc, server_rpc_notify, this);
- if (ret)
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "registration of notify with rpcsvc failed");
goto out;
+ }
glusterfs3_1_fop_prog.options = this->options;
- ret = rpcsvc_program_register (conf->rpc, glusterfs3_1_fop_prog);
- if (ret)
+ ret = rpcsvc_program_register (conf->rpc, &glusterfs3_1_fop_prog);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "registration of program (name:%s, prognum:%d, "
+ "progver:%d) failed", glusterfs3_1_fop_prog.progname,
+ glusterfs3_1_fop_prog.prognum,
+ glusterfs3_1_fop_prog.progver);
goto out;
+ }
gluster_handshake_prog.options = this->options;
- ret = rpcsvc_program_register (conf->rpc, gluster_handshake_prog);
- if (ret)
+ ret = rpcsvc_program_register (conf->rpc, &gluster_handshake_prog);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "registration of program (name:%s, prognum:%d, "
+ "progver:%d) failed", gluster_handshake_prog.progname,
+ gluster_handshake_prog.prognum,
+ gluster_handshake_prog.progver);
+ rpcsvc_program_unregister (conf->rpc, &glusterfs3_1_fop_prog);
goto out;
+ }
#ifndef GF_DARWIN_HOST_OS
{
@@ -523,8 +549,15 @@ init (xlator_t *this)
ret = 0;
out:
- if (ret && this)
- this->fini (this);
+ if (ret) {
+ if (this != NULL) {
+ this->fini (this);
+ }
+
+ if (listener != NULL) {
+ rpcsvc_listener_destroy (listener);
+ }
+ }
return ret;
}