diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 109 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 10 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 79 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 6 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 65 | ||||
-rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 2 | ||||
-rw-r--r-- | rpc/rpc-transport/socket/src/socket.c | 16 |
7 files changed, 211 insertions, 76 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index d496673217a..99bba8cb407 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -970,68 +970,8 @@ out: return ret; } - struct rpc_clnt * -rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, - glusterfs_ctx_t *ctx, char *name) -{ - int ret = -1; - struct rpc_clnt *rpc = NULL; - struct rpc_clnt_connection *conn = NULL; - - rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t); - if (!rpc) { - gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); - goto out; - } - - pthread_mutex_init (&rpc->lock, NULL); - rpc->ctx = ctx; - - rpc->reqpool = mem_pool_new (struct rpc_req, - RPC_CLNT_DEFAULT_REQUEST_COUNT); - if (rpc->reqpool == NULL) { - pthread_mutex_destroy (&rpc->lock); - GF_FREE (rpc); - rpc = NULL; - goto out; - } - - rpc->saved_frames_pool = mem_pool_new (struct saved_frame, - RPC_CLNT_DEFAULT_REQUEST_COUNT); - if (rpc->saved_frames_pool == NULL) { - pthread_mutex_destroy (&rpc->lock); - mem_pool_destroy (rpc->reqpool); - GF_FREE (rpc); - rpc = NULL; - goto out; - } - - ret = rpc_clnt_connection_init (rpc, ctx, options, name); - if (ret == -1) { - pthread_mutex_destroy (&rpc->lock); - mem_pool_destroy (rpc->reqpool); - mem_pool_destroy (rpc->saved_frames_pool); - GF_FREE (rpc); - rpc = NULL; - if (options) - dict_unref (options); - goto out; - } - - conn = &rpc->conn; - rpc_clnt_reconnect (conn->trans); - - rpc = rpc_clnt_ref (rpc); - INIT_LIST_HEAD (&rpc->programs); - -out: - return rpc; -} - - -struct rpc_clnt * -rpc_clnt_new (struct rpc_clnt_config *config, dict_t *options, +rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name) { int ret = -1; @@ -1607,3 +1547,50 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config) rpc->conn.config.remote_host = gf_strdup (config->remote_host); } } + +int +rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath) +{ + dict_t *dict = NULL; + char *fpath = NULL; + int ret = -1; + + GF_ASSERT (filepath); + GF_ASSERT (options); + + dict = dict_new (); + if (!dict) + goto out; + + fpath = gf_strdup (filepath); + if (!fpath) { + ret = -1; + goto out; + } + + ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.address-family", "unix"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.nodelay", "off"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport-type", "socket"); + if (ret) + goto out; + + *options = dict; +out: + if (ret) { + if (fpath) + GF_FREE (fpath); + if (dict) + dict_unref (dict); + } + return ret; +} diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index f8da51b02e3..d3e7f2f169a 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -187,13 +187,7 @@ struct rpc_clnt { }; -struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config, - dict_t *options, glusterfs_ctx_t *ctx, - char *name); - - -struct rpc_clnt *rpc_clnt_new (struct rpc_clnt_config *config, - dict_t *options, glusterfs_ctx_t *ctx, +struct rpc_clnt *rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx, char *name); int rpc_clnt_start (struct rpc_clnt *rpc); @@ -244,4 +238,6 @@ void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config); int rpcclnt_cbk_program_register (struct rpc_clnt *svc, rpcclnt_cb_program_t *program); +int +rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath); #endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c index e7ffb065ff6..b3bf4c93b66 100644 --- a/rpc/rpc-lib/src/rpc-transport.c +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -808,7 +808,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) "dlsym (gf_rpc_transport_fini) on %s", dlerror ()); goto fail; } - + trans->reconfigure = dlsym (handle, "reconfigure"); if (trans->fini == NULL) { gf_log ("rpc-transport", GF_LOG_DEBUG, @@ -954,6 +954,8 @@ rpc_transport_destroy (rpc_transport_t *this) GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + if (this->options) + dict_unref (this->options); if (this->fini) this->fini (this); @@ -1051,3 +1053,78 @@ rpc_transport_register_notify (rpc_transport_t *trans, out: return ret; } + +//give negative values to skip setting that value +//this function asserts if both the values are negative. +//why call it if you dont set it. +int +rpc_transport_keepalive_options_set (dict_t *options, int32_t interval, + int32_t time) +{ + int ret = -1; + + GF_ASSERT (options); + GF_ASSERT ((interval > 0) || (time > 0)); + + ret = dict_set_int32 (options, + "transport.socket.keepalive-interval", interval); + if (ret) + goto out; + + ret = dict_set_int32 (options, + "transport.socket.keepalive-time", time); + if (ret) + goto out; +out: + return ret; +} + +int +rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port) +{ + dict_t *dict = NULL; + char *host = NULL; + int ret = -1; + + GF_ASSERT (options); + GF_ASSERT (hostname); + GF_ASSERT (port >= 1024); + + dict = dict_new (); + if (!dict) + goto out; + + + host = gf_strdup ((char*)hostname); + if (!hostname) { + ret = -1; + goto out; + } + + ret = dict_set_dynstr (dict, "remote-host", host); + if (ret) + goto out; + + ret = dict_set_int32 (dict, "remote-port", port); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.address-family", "inet"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport-type", "socket"); + if (ret) + goto out; + + *options = dict; +out: + if (ret) { + if (host) + GF_FREE (host); + if (dict) + dict_unref (dict); + } + + return ret; +} diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h index 9db24c09f4e..a955df11126 100644 --- a/rpc/rpc-lib/src/rpc-transport.h +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -295,4 +295,10 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector, void rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin); +int +rpc_transport_keepalive_options_set (dict_t *options, int32_t interval, + int32_t time); + +int +rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port); #endif /* __RPC_TRANSPORT_H__ */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index d949677c35e..3e9d58530f4 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -937,6 +937,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpcsvc_request_t *req = NULL; int ret = -1; uint16_t port = 0; + gf_boolean_t is_unix = _gf_false; if (!trans || !svc) return -1; @@ -949,7 +950,9 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, case AF_INET6: port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port; break; - + case AF_UNIX: + is_unix = _gf_true; + break; default: gf_log (GF_RPCSVC, GF_LOG_DEBUG, "invalid address family (%d)", @@ -959,14 +962,16 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, - port = ntohs (port); + if (is_unix == _gf_false) { + port = ntohs (port); - gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port); + gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port); - if (port > 1024) { //Non-privilaged user, fail request - gf_log ("glusterd", GF_LOG_ERROR, "Request received from non-" - "privileged port. Failing request"); - return -1; + if (port > 1024) { //Non-privilaged user, fail request + gf_log ("glusterd", GF_LOG_ERROR, "Request received from non-" + "privileged port. Failing request"); + return -1; + } } req = rpcsvc_request_create (svc, trans, msg); @@ -2193,6 +2198,52 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) return 0; } +int +rpcsvc_transport_unix_options_build (dict_t **options, char *filepath) +{ + dict_t *dict = NULL; + char *fpath = NULL; + int ret = -1; + + GF_ASSERT (filepath); + GF_ASSERT (options); + + dict = dict_new (); + if (!dict) + goto out; + + fpath = gf_strdup (filepath); + if (!fpath) { + ret = -1; + goto out; + } + + ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.address-family", "unix"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport.socket.nodelay", "off"); + if (ret) + goto out; + + ret = dict_set_str (dict, "transport-type", "socket"); + if (ret) + goto out; + + *options = dict; +out: + if (ret) { + if (fpath) + GF_FREE (fpath); + if (dict) + dict_unref (dict); + } + return ret; +} /* The global RPC service initializer. */ diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 8625600db13..3d5abc2d4d6 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -538,4 +538,6 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans, rpcsvc_cbk_program_t *prog, int procnum, struct iovec *proghdr, int proghdrcount); +int +rpcsvc_transport_unix_options_build (dict_t **options, char *filepath); #endif diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c index 4ae49815a58..cbd303496ae 100644 --- a/rpc/rpc-transport/socket/src/socket.c +++ b/rpc/rpc-transport/socket/src/socket.c @@ -298,6 +298,8 @@ __socket_server_bind (rpc_transport_t *this) socket_private_t *priv = NULL; int ret = -1; int opt = 1; + int reuse_check_sock = -1; + struct sockaddr_storage unix_addr = {0}; if (!this || !this->private) goto out; @@ -312,6 +314,20 @@ __socket_server_bind (rpc_transport_t *this) "setsockopt() for SO_REUSEADDR failed (%s)", strerror (errno)); } + //reuse-address doesnt work for unix type sockets + if (AF_UNIX == SA (&this->myinfo.sockaddr)->sa_family) { + memcpy (&unix_addr, SA (&this->myinfo.sockaddr), + this->myinfo.sockaddr_len); + reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0); + if (reuse_check_sock > 0) { + ret = connect (reuse_check_sock, SA (&unix_addr), + this->myinfo.sockaddr_len); + if ((ret == -1) && (ECONNREFUSED == errno)) { + unlink (((struct sockaddr_un*)&unix_addr)->sun_path); + } + close (reuse_check_sock); + } + } ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, this->myinfo.sockaddr_len); |