diff options
author | Venky Shankar <vshankar@redhat.com> | 2015-02-03 19:22:16 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-03-18 18:22:36 -0700 |
commit | 4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch) | |
tree | 9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog-ev-handle.c | |
parent | 728fcd41eb39f66744d84b979dd8195fd47313ed (diff) |
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog
translator and libgfchangelog. It replaces the old pathetic stream
based interaction that existed earlier (due to time constraints :-/).
Changelog, upon initialization starts a RPC server (rpcsvc) allowing
clients to invoke a probe API as a bootup mechanism to request for
event notifications. During probe, clients can choose an event
filter specifying the type(s) of events they are interested in. As
of now there is no way to change the event notification set once
the probe RPC call is made, but that is easier to implement.
The actual event notifications is done on a separate RPC session.
The client (libgfchangelog) itself starts and RPC server which the
changelog translator "connects back" during probe. Notifications
are dispatched by a bunch of threads from the server (translator)
and the client optionally orders them if ordered notifications
are requried. FOPs fill in their respective event details in a
buffer (rot-buffs to be particular) and a bunch of threads
(consumers) swap the buffers out of roatation and dispatch them
via RPC. To avoid writer starvation, then number of dispatcher
threads is one less than the number of buffer list in rot-buffs.x
libgfchangelog becomes purely callback based -- upon event
notification from the server (and re-ordering them if required)
invoke a callback routine specified by consumer(s).
A major part of the patch is also aimed at providing backward
compatibility for geo-replication, which was one of the main
consumer of the stream based API. Also, this patch does not\
"turn on" event notifications for all fops, just a bunch which
is currently in requirement. Another pain point is that the
server does not filter events before dispatching it to the
clients. That load is taken up by the client itself (although
it's done at the library layer rather than making it hard on
the callback implementor). This needs improvement and care
needs to be taken to not load the server up with expensive
filtering mechanisms.
Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395
BUG: 1170075
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Reviewed-on: http://review.gluster.org/9708
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 382 |
1 files changed, 382 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c new file mode 100644 index 00000000000..ca7443cfd22 --- /dev/null +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -0,0 +1,382 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-ev-handle.h" +#include "changelog-rpc-common.h" +#include "changelog-helpers.h" + +struct rpc_clnt_program changelog_ev_program; + +#define NR_IOVEC (MAX_IOVEC - 3) +struct ev_rpc_vec { + int count; + struct iovec vector[NR_IOVEC]; + + /* sequence number */ + unsigned long seq; +}; + +struct ev_rpc { + rbuf_list_t *rlist; + struct rpc_clnt *rpc; + struct ev_rpc_vec vec; +}; + +/** + * As of now this just does the minimal (retval logging). Going further + * un-acknowledges sequence numbers can be retransmitted and other + * intelligence can be built into the server. + */ +int +changelog_event_dispatch_cbk (struct rpc_req *req, + struct iovec *iov, int count, void *myframe) +{ + return 0; +} + +/* dispatcher RPC */ +inline int +changelog_dispatch_vec (call_frame_t *frame, xlator_t *this, + struct rpc_clnt *rpc, struct ev_rpc_vec *vec) +{ + struct timeval tv = {0,}; + changelog_event_req req = {0,}; + + (void) gettimeofday (&tv, NULL); + + /** + * Event dispatch RPC header contains a sequence number for each + * dispatch. This allows the reciever to order the request before + * processing. + */ + req.seq = vec->seq; + req.tv_sec = tv.tv_sec; + req.tv_usec = tv.tv_usec; + + return changelog_rpc_sumbit_req (rpc, (void *)&req, + frame, &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, + vec->vector, vec->count, NULL, + this, changelog_event_dispatch_cbk, + (xdrproc_t) xdr_changelog_event_req); + } + + int + changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data) + { + int idx = 0; + int count = 0; + int ret = 0; + unsigned long range = 0; + unsigned long sequence = 0; + rbuf_iovec_t *rvec = NULL; + struct ev_rpc *erpc = NULL; + struct rlist_iter riter = {{0,},}; + + /* dispatch NR_IOVEC IO vectors at a time. */ + + erpc = data; + RLIST_GET_SEQ (erpc->rlist, sequence, range); + + rlist_iter_init (&riter, erpc->rlist); + + rvec_for_each_entry (rvec, &riter) { + idx = count % NR_IOVEC; + if (++count == NR_IOVEC) { + erpc->vec.vector[idx] = rvec->iov; + erpc->vec.seq = sequence++; + erpc->vec.count = NR_IOVEC; + + ret = changelog_dispatch_vec (frame, this, + erpc->rpc, &erpc->vec); + if (ret) + break; + count = 0; + continue; + } + + erpc->vec.vector[idx] = rvec->iov; + } + + if (ret) + goto error_return; + + idx = count % NR_IOVEC; + if (idx) { + erpc->vec.seq = sequence; + erpc->vec.count = idx; + + ret = changelog_dispatch_vec (frame, this, + erpc->rpc, &erpc->vec); + } + + error_return: + return ret; +} + +int +changelog_rpc_notify (struct rpc_clnt *rpc, + void *mydata, rpc_clnt_event_t event, void *data) +{ + xlator_t *this = NULL; + changelog_rpc_clnt_t *crpc = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_priv_t *priv = NULL; + changelog_ev_selector_t *selection = NULL; + + crpc = mydata; + this = crpc->this; + c_clnt = crpc->c_clnt; + + priv = this->private; + + switch (event) { + case RPC_CLNT_CONNECT: + rpc_clnt_set_connected (&rpc->conn); + selection = &priv->ev_selection; + + LOCK (&c_clnt->wait_lock); + { + LOCK (&c_clnt->active_lock); + { + changelog_select_event (this, selection, + crpc->filter); + list_move_tail (&crpc->list, &c_clnt->active); + } + UNLOCK (&c_clnt->active_lock); + } + UNLOCK (&c_clnt->wait_lock); + + break; + case RPC_CLNT_DISCONNECT: + rpc_clnt_disable (crpc->rpc); + selection = &priv->ev_selection; + + LOCK (&crpc->lock); + { + changelog_deselect_event (this, selection, + crpc->filter); + changelog_set_disconnect_flag (crpc, _gf_true); + } + UNLOCK (&crpc->lock); + + break; + case RPC_CLNT_MSG: + case RPC_CLNT_DESTROY: + break; + } + + return 0; +} + +void * +changelog_ev_connector (void *data) +{ + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + pthread_mutex_lock (&c_clnt->pending_lock); + { + while (list_empty (&c_clnt->pending)) + pthread_cond_wait (&c_clnt->pending_cond, + &c_clnt->pending_lock); + crpc = list_first_entry (&c_clnt->pending, + changelog_rpc_clnt_t, list); + crpc->rpc = + changelog_rpc_client_init (this, crpc, + crpc->sock, + changelog_rpc_notify); + if (!crpc->rpc) { + gf_log (this->name, GF_LOG_ERROR, "failed to " + "connect back.. <%s>", crpc->sock); + crpc->cleanup (crpc); + goto mutex_unlock; + } + + LOCK (&c_clnt->wait_lock); + { + list_move_tail (&crpc->list, &c_clnt->waitq); + } + UNLOCK (&c_clnt->wait_lock); + } + mutex_unlock: + pthread_mutex_unlock (&c_clnt->pending_lock); + } + + return NULL; +} + +void +changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt) +{ + int ret = 0; + changelog_rpc_clnt_t *crpc = NULL; + + /* cleanup active connections */ + LOCK (&c_clnt->active_lock); + { + list_for_each_entry (crpc, &c_clnt->active, list) { + rpc_clnt_disable (crpc->rpc); + } + } + UNLOCK (&c_clnt->active_lock); +} + +/** + * TODO: granularize lock + * + * If we have multiple threads dispatching events, doing it this way is + * a performance bottleneck. + */ + +static inline changelog_rpc_clnt_t * +get_client (changelog_clnt_t *c_clnt, struct list_head **next) +{ + changelog_rpc_clnt_t *crpc = NULL; + + LOCK (&c_clnt->active_lock); + { + if (*next == &c_clnt->active) + goto unblock; + crpc = list_entry (*next, changelog_rpc_clnt_t, list); + changelog_rpc_clnt_ref (crpc); + *next = (*next)->next; + } + unblock: + UNLOCK (&c_clnt->active_lock); + + return crpc; +} + +static inline void +put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc) +{ + LOCK (&c_clnt->active_lock); + { + changelog_rpc_clnt_unref (crpc); + } + UNLOCK (&c_clnt->active_lock); +} + +void +_dispatcher (rbuf_list_t *rlist, void *arg) +{ + int ret = 0; + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + changelog_rpc_clnt_t *tmp = NULL; + struct ev_rpc erpc = {0,}; + struct list_head *next = NULL; + + c_clnt = arg; + this = c_clnt->this; + + erpc.rlist = rlist; + next = c_clnt->active.next; + + while (1) { + crpc = get_client (c_clnt, &next); + if (!crpc) + break; + erpc.rpc = crpc->rpc; + ret = changelog_invoke_rpc (this, crpc->rpc, + &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, &erpc); + put_client (c_clnt, crpc); + } +} + +/** this is called under rotbuff's lock */ +void +sequencer (rbuf_list_t *rlist, void *mydata) +{ + unsigned long range = 0; + changelog_clnt_t *c_clnt = 0; + + c_clnt = mydata; + + range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC; + if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC) + range++; + RLIST_STORE_SEQ (rlist, c_clnt->sequence, range); + + c_clnt->sequence += range; +} + +void * +changelog_ev_dispatch (void *data) +{ + int ret = 0; + void *opaque = NULL; + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + struct timeval tv = {0,}; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + /* TODO: change this to be pthread cond based.. later */ + tv.tv_sec = 1; + tv.tv_usec = 0; + select (0, NULL, NULL, NULL, &tv); + + ret = rbuf_get_buffer (c_clnt->rbuf, + &opaque, sequencer, c_clnt); + if (ret != RBUF_CONSUMABLE) { + if (ret != RBUF_EMPTY) + gf_log (this->name, GF_LOG_WARNING, + "Failed to get buffer for RPC dispatch " + "[rbuf retval: %d]", ret); + continue; + } + + ret = rbuf_wait_for_completion (c_clnt->rbuf, + opaque, _dispatcher, c_clnt); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "failed to put buffer after consumption"); + } + + return NULL; +} + +void +changelog_ev_queue_connection (changelog_clnt_t *c_clnt, + changelog_rpc_clnt_t *crpc) +{ + pthread_mutex_lock (&c_clnt->pending_lock); + { + list_add_tail (&crpc->list, &c_clnt->pending); + pthread_cond_signal (&c_clnt->pending_cond); + } + pthread_mutex_unlock (&c_clnt->pending_lock); +} + +struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = { + [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_REV_PROC_EVENT] = { + "EVENT DISPATCH", changelog_event_dispatch_rpc + }, +}; + +struct rpc_clnt_program changelog_ev_program = { + .progname = "CHANGELOG EVENT DISPATCHER", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numproc = CHANGELOG_REV_PROC_MAX, + .proctable = changelog_ev_procs, +}; |