diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-rpc.c')
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 185 |
1 files changed, 138 insertions, 47 deletions
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c index 828f85e8e45..440b88091a6 100644 --- a/xlators/features/changelog/src/changelog-rpc.c +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -8,12 +8,12 @@ cases as published by the Free Software Foundation. */ -#include "syscall.h" +#include <glusterfs/syscall.h> #include "changelog-rpc.h" #include "changelog-mem-types.h" #include "changelog-ev-handle.h" -struct rpcsvc_program *changelog_programs[]; +static struct rpcsvc_program *changelog_programs[]; static void changelog_cleanup_dispatchers(xlator_t *this, changelog_priv_t *priv, int count) @@ -43,9 +43,6 @@ changelog_cleanup_rpc_threads(xlator_t *this, changelog_priv_t *priv) /** terminate dispatcher thread(s) */ changelog_cleanup_dispatchers(this, priv, priv->nr_dispatchers); - /* TODO: what about pending and waiting connections? */ - changelog_ev_cleanup_connections(this, conn); - /* destroy locks */ ret = pthread_mutex_destroy(&conn->pending_lock); if (ret != 0) @@ -72,9 +69,6 @@ changelog_init_rpc_threads(xlator_t *this, changelog_priv_t *priv, rbuf_t *rbuf, int j = 0; int ret = 0; changelog_clnt_t *conn = NULL; - char thread_name[GF_THREAD_NAMEMAX] = { - 0, - }; conn = &priv->connections; @@ -114,9 +108,9 @@ changelog_init_rpc_threads(xlator_t *this, changelog_priv_t *priv, rbuf_t *rbuf, /* spawn dispatcher threads */ for (; j < nr_dispatchers; j++) { - snprintf(thread_name, sizeof(thread_name), "clogd%03hx", (j & 0x3ff)); ret = gf_thread_create(&priv->ev_dispatcher[j], NULL, - changelog_ev_dispatch, conn, thread_name); + changelog_ev_dispatch, conn, "clogd%03hx", + j & 0x3ff); if (ret != 0) { changelog_cleanup_dispatchers(this, priv, j); break; @@ -147,48 +141,146 @@ int changelog_rpcsvc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data) { + xlator_t *this = NULL; + rpc_transport_t *trans = NULL; + rpc_transport_t *xprt = NULL; + rpc_transport_t *xp_next = NULL; + changelog_priv_t *priv = NULL; + uint64_t listnercnt = 0; + uint64_t xprtcnt = 0; + uint64_t clntcnt = 0; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; + gf_boolean_t listner_found = _gf_false; + socket_private_t *sockpriv = NULL; + + if (!xl || !data || !rpc) { + gf_msg_callingfn("changelog", GF_LOG_WARNING, 0, + CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED, + "Calling rpc_notify without initializing"); + goto out; + } + + this = xl; + trans = data; + priv = this->private; + + if (!priv) { + gf_msg_callingfn("changelog", GF_LOG_WARNING, 0, + CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED, + "Calling rpc_notify without priv initializing"); + goto out; + } + + if (event == RPCSVC_EVENT_ACCEPT) { + GF_ATOMIC_INC(priv->xprtcnt); + LOCK(&priv->lock); + { + list_add_tail(&trans->list, &priv->xprt_list); + } + UNLOCK(&priv->lock); + goto out; + } + + if (event == RPCSVC_EVENT_DISCONNECT) { + list_for_each_entry_safe(listener, next, &rpc->listeners, list) + { + if (listener && listener->trans) { + if (listener->trans == trans) { + listnercnt = GF_ATOMIC_DEC(priv->listnercnt); + listner_found = _gf_true; + rpcsvc_listener_destroy(listener); + } + } + } + + if (listnercnt > 0) { + goto out; + } + if (listner_found) { + LOCK(&priv->lock); + list_for_each_entry_safe(xprt, xp_next, &priv->xprt_list, list) + { + sockpriv = (socket_private_t *)(xprt->private); + gf_log("changelog", GF_LOG_INFO, + "Send disconnect" + " on socket %d", + sockpriv->sock); + rpc_transport_disconnect(xprt, _gf_false); + } + UNLOCK(&priv->lock); + goto out; + } + LOCK(&priv->lock); + { + list_del_init(&trans->list); + } + UNLOCK(&priv->lock); + + xprtcnt = GF_ATOMIC_DEC(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + changelog_process_cleanup_event(this); + } + } + +out: return 0; } void +changelog_process_cleanup_event(xlator_t *this) +{ + gf_boolean_t cleanup_notify = _gf_false; + changelog_priv_t *priv = NULL; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + + if (!this) + return; + priv = this->private; + if (!priv) + return; + + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + + if (priv->victim && !cleanup_notify) { + default_notify(this, GF_EVENT_PARENT_DOWN, priv->victim); + + if (priv->rpc) { + /* sockfile path could have been saved to avoid this */ + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + (void)rpcsvc_unregister_notify(priv->rpc, changelog_rpcsvc_notify, + this); + if (priv->rpc->rxpool) { + mem_pool_destroy(priv->rpc->rxpool); + priv->rpc->rxpool = NULL; + } + GF_FREE(priv->rpc); + priv->rpc = NULL; + } + } +} + +void changelog_destroy_rpc_listner(xlator_t *this, changelog_priv_t *priv) { char sockfile[UNIX_PATH_MAX] = { 0, }; - changelog_clnt_t *c_clnt = &priv->connections; - changelog_rpc_clnt_t *crpc = NULL; - int nofconn = 0; /* sockfile path could have been saved to avoid this */ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX); changelog_rpc_server_destroy(this, priv->rpc, sockfile, changelog_rpcsvc_notify, changelog_programs); - - /* TODO Below approach is not perfect to wait for cleanup - all active connections without this code brick process - can be crash in case of brick multiplexing if any in-progress - request process on rpc by changelog xlator after - cleanup resources - */ - - if (c_clnt) { - do { - nofconn = 0; - LOCK(&c_clnt->active_lock); - list_for_each_entry(crpc, &c_clnt->active, list) { nofconn++; } - UNLOCK(&c_clnt->active_lock); - LOCK(&c_clnt->wait_lock); - list_for_each_entry(crpc, &c_clnt->waitq, list) { nofconn++; } - UNLOCK(&c_clnt->wait_lock); - pthread_mutex_lock(&c_clnt->pending_lock); - list_for_each_entry(crpc, &c_clnt->pending, list) { nofconn++; } - pthread_mutex_unlock(&c_clnt->pending_lock); - - } while (nofconn); /* Wait for all connection cleanup */ - } - - (void)changelog_cleanup_rpc_threads(this, priv); } rpcsvc_t * @@ -287,16 +379,15 @@ changelog_handle_probe(rpcsvc_request_t *req) this = req->trans->xl; if (this->cleanup_starting) { - gf_msg(this->name, GF_LOG_DEBUG, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, - "cleanup_starting flag is already set for xl"); + gf_smsg(this->name, GF_LOG_DEBUG, 0, CHANGELOG_MSG_CLEANUP_ALREADY_SET, + NULL); return 0; } ret = xdr_to_generic(req->msg[0], &rpc_req, (xdrproc_t)xdr_changelog_probe_req); if (ret < 0) { - gf_msg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, - "xdr decoding error"); + gf_smsg("", GF_LOG_ERROR, 0, CHANGELOG_MSG_HANDLE_PROBE_ERROR, NULL); req->rpc_err = GARBAGE_ARGS; goto handle_xdr_error; } @@ -328,13 +419,13 @@ submit_rpc: * RPC declarations */ -rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { +static rpcsvc_actor_t changelog_svc_actors[CHANGELOG_RPC_PROC_MAX] = { [CHANGELOG_RPC_PROBE_FILTER] = {"CHANGELOG PROBE FILTER", - CHANGELOG_RPC_PROBE_FILTER, - changelog_handle_probe, NULL, 0, DRC_NA}, + changelog_handle_probe, NULL, + CHANGELOG_RPC_PROBE_FILTER, DRC_NA, 0}, }; -struct rpcsvc_program changelog_svc_prog = { +static struct rpcsvc_program changelog_svc_prog = { .progname = CHANGELOG_RPC_PROGNAME, .prognum = CHANGELOG_RPC_PROGNUM, .progver = CHANGELOG_RPC_PROGVER, @@ -343,7 +434,7 @@ struct rpcsvc_program changelog_svc_prog = { .synctask = _gf_true, }; -struct rpcsvc_program *changelog_programs[] = { +static struct rpcsvc_program *changelog_programs[] = { &changelog_svc_prog, NULL, }; |
