diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c new file mode 100644 index 00000000000..04326456d31 --- /dev/null +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -0,0 +1,300 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-rpc.h" +#include "changelog-mem-types.h" +#include "changelog-ev-handle.h" + +struct rpcsvc_program *changelog_programs[]; + +static void +changelog_cleanup_dispatchers (xlator_t *this, + changelog_priv_t *priv, int count) +{ + for (; count >= 0; count--) { + (void) changelog_thread_cleanup + (this, priv->ev_dispatcher[count]); + } +} + +static int +changelog_cleanup_rpc_threads (xlator_t *this, changelog_priv_t *priv) +{ + int ret = 0; + changelog_clnt_t *conn = NULL; + + conn = &priv->connections; + if (!conn) + return 0; + + /** terminate RPC thread(s) */ + ret = changelog_thread_cleanup (this, priv->connector); + if (ret != 0) + goto error_return; + /** terminate dispatcher thread(s) */ + changelog_cleanup_dispatchers (this, priv, priv->nr_dispatchers); + + /* TODO: what about pending and waiting connections? */ + changelog_ev_cleanup_connections (this, conn); + + /* destroy locks */ + ret = pthread_mutex_destroy (&conn->pending_lock); + if (ret != 0) + goto error_return; + ret = pthread_cond_destroy (&conn->pending_cond); + if (ret != 0) + goto error_return; + ret = LOCK_DESTROY (&conn->active_lock); + if (ret != 0) + goto error_return; + ret = LOCK_DESTROY (&conn->wait_lock); + if (ret != 0) + goto error_return; + return 0; + + error_return: + return -1; +} + +static int +changelog_init_rpc_threads (xlator_t *this, changelog_priv_t *priv, + rbuf_t *rbuf, int nr_dispatchers) +{ + int j = 0; + int ret = 0; + changelog_clnt_t *conn = NULL; + + conn = &priv->connections; + + conn->this = this; + conn->rbuf = rbuf; + conn->sequence = 1; /* start with sequence number one */ + + INIT_LIST_HEAD (&conn->pending); + INIT_LIST_HEAD (&conn->active); + INIT_LIST_HEAD (&conn->waitq); + + ret = pthread_mutex_init (&conn->pending_lock, NULL); + if (ret) + goto error_return; + ret = pthread_cond_init (&conn->pending_cond, NULL); + if (ret) + goto cleanup_pending_lock; + + ret = LOCK_INIT (&conn->active_lock); + if (ret) + goto cleanup_pending_cond; + ret = LOCK_INIT (&conn->wait_lock); + if (ret) + goto cleanup_active_lock; + + /* spawn reverse connection thread */ + ret = pthread_create (&priv->connector, + NULL, changelog_ev_connector, conn); + if (ret != 0) + goto cleanup_wait_lock; + + /* spawn dispatcher thread(s) */ + priv->ev_dispatcher = GF_CALLOC (nr_dispatchers, sizeof(pthread_t), + gf_changelog_mt_ev_dispatcher_t); + if (!priv->ev_dispatcher) + goto cleanup_connector; + + /* spawn dispatcher threads */ + for (; j < nr_dispatchers; j++) { + ret = pthread_create (&priv->ev_dispatcher[j], + NULL, changelog_ev_dispatch, conn); + if (ret != 0) { + changelog_cleanup_dispatchers (this, priv, --j); + break; + } + } + + if (ret != 0) + goto cleanup_connector; + + priv->nr_dispatchers = nr_dispatchers; + return 0; + + cleanup_connector: + (void) pthread_cancel (priv->connector); + cleanup_wait_lock: + (void) LOCK_DESTROY (&conn->wait_lock); + cleanup_active_lock: + (void) LOCK_DESTROY (&conn->active_lock); + cleanup_pending_cond: + (void) pthread_cond_destroy (&conn->pending_cond); + cleanup_pending_lock: + (void) pthread_mutex_destroy (&conn->pending_lock); + error_return: + return -1; +} + +int +changelog_rpcsvc_notify (rpcsvc_t *rpc, + void *xl, rpcsvc_event_t event, void *data) +{ + return 0; +} + +void +changelog_destroy_rpc_listner (xlator_t *this, changelog_priv_t *priv) +{ + char sockfile[UNIX_PATH_MAX] = {0,}; + + /* sockfile path could have been saved to avoid this */ + CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, + sockfile, UNIX_PATH_MAX); + changelog_rpc_server_destroy (this, + priv->rpc, sockfile, + changelog_rpcsvc_notify, + changelog_programs); + (void) changelog_cleanup_rpc_threads (this, priv); +} + +rpcsvc_t * +changelog_init_rpc_listner (xlator_t *this, changelog_priv_t *priv, + rbuf_t *rbuf, int nr_dispatchers) +{ + int ret = 0; + char sockfile[UNIX_PATH_MAX] = {0,}; + + ret = changelog_init_rpc_threads (this, priv, rbuf, nr_dispatchers); + if (ret) + return NULL; + + CHANGELOG_MAKE_SOCKET_PATH (priv->changelog_brick, + sockfile, UNIX_PATH_MAX); + return changelog_rpc_server_init (this, sockfile, NULL, + changelog_rpcsvc_notify, + changelog_programs); +} + +void +changelog_rpc_clnt_cleanup (changelog_rpc_clnt_t *crpc) +{ + if (!crpc) + return; + crpc->c_clnt = NULL; + (void) LOCK_DESTROY (&crpc->lock); + GF_FREE (crpc); +} + +inline changelog_rpc_clnt_t * +changelog_rpc_clnt_init (xlator_t *this, + changelog_probe_req *rpc_req, changelog_clnt_t *c_clnt) +{ + int ret = 0; + changelog_rpc_clnt_t *crpc = NULL; + + crpc = GF_CALLOC (1, sizeof (*crpc), gf_changelog_mt_rpc_clnt_t); + if (!crpc) + goto error_return; + INIT_LIST_HEAD (&crpc->list); + + crpc->ref = 0; + changelog_set_disconnect_flag (crpc, _gf_false); + + crpc->filter = rpc_req->filter; + (void) memcpy (crpc->sock, rpc_req->sock, strlen (rpc_req->sock)); + + crpc->this = this; + crpc->c_clnt = c_clnt; + crpc->cleanup = changelog_rpc_clnt_cleanup; + + ret = LOCK_INIT (&crpc->lock); + if (ret != 0) + goto dealloc_crpc; + return crpc; + + dealloc_crpc: + GF_FREE (crpc); + error_return: + return NULL; +} + +/** + * Actor declarations + */ + +/** + * @probe_handler + * A probe RPC call spawns a connect back to the caller. Caller also + * passes an hint which acts as a filter for selecting updates. + */ + +int +changelog_handle_probe (rpcsvc_request_t *req) +{ + int ret = 0; + xlator_t *this = NULL; + rpcsvc_t *svc = NULL; + changelog_priv_t *priv = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + + changelog_probe_req rpc_req = {0,}; + changelog_probe_rsp rpc_rsp = {0,}; + + ret = xdr_to_generic (req->msg[0], + &rpc_req, (xdrproc_t)xdr_changelog_probe_req); + if (ret < 0) { + gf_log ("", GF_LOG_ERROR, "xdr decoding error"); + req->rpc_err = GARBAGE_ARGS; + goto handle_xdr_error; + } + + /* ->xl hidden in rpcsvc */ + svc = rpcsvc_request_service (req); + this = svc->mydata; + priv = this->private; + c_clnt = &priv->connections; + + crpc = changelog_rpc_clnt_init (this, &rpc_req, c_clnt); + if (!crpc) + goto handle_xdr_error; + + changelog_ev_queue_connection (c_clnt, crpc); + rpc_rsp.op_ret = 0; + + goto submit_rpc; + + handle_xdr_error: + rpc_rsp.op_ret = -1; + submit_rpc: + (void) changelog_rpc_sumbit_reply (req, &rpc_rsp, NULL, 0, NULL, + (xdrproc_t)xdr_changelog_probe_rsp); + return 0; +} + +/** + * RPC declarations + */ + +rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { + [CHANGELOG_RPC_PROBE_FILTER] = { + "CHANGELOG PROBE FILTER", CHANGELOG_RPC_PROBE_FILTER, + changelog_handle_probe, NULL, 0, DRC_NA + }, +}; + +struct rpcsvc_program changelog_svc_prog = { + .progname = CHANGELOG_RPC_PROGNAME, + .prognum = CHANGELOG_RPC_PROGNUM, + .progver = CHANGELOG_RPC_PROGVER, + .numactors = CHANGELOG_RPC_PROC_MAX, + .actors = changelog_svc_actors, + .synctask = _gf_true, +}; + +struct rpcsvc_program *changelog_programs[] = { + &changelog_svc_prog, + NULL, +}; |