diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
-rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 382 |
1 files changed, 382 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c new file mode 100644 index 00000000000..ca7443cfd22 --- /dev/null +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -0,0 +1,382 @@ +/* + Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include "changelog-ev-handle.h" +#include "changelog-rpc-common.h" +#include "changelog-helpers.h" + +struct rpc_clnt_program changelog_ev_program; + +#define NR_IOVEC (MAX_IOVEC - 3) +struct ev_rpc_vec { + int count; + struct iovec vector[NR_IOVEC]; + + /* sequence number */ + unsigned long seq; +}; + +struct ev_rpc { + rbuf_list_t *rlist; + struct rpc_clnt *rpc; + struct ev_rpc_vec vec; +}; + +/** + * As of now this just does the minimal (retval logging). Going further + * un-acknowledges sequence numbers can be retransmitted and other + * intelligence can be built into the server. + */ +int +changelog_event_dispatch_cbk (struct rpc_req *req, + struct iovec *iov, int count, void *myframe) +{ + return 0; +} + +/* dispatcher RPC */ +inline int +changelog_dispatch_vec (call_frame_t *frame, xlator_t *this, + struct rpc_clnt *rpc, struct ev_rpc_vec *vec) +{ + struct timeval tv = {0,}; + changelog_event_req req = {0,}; + + (void) gettimeofday (&tv, NULL); + + /** + * Event dispatch RPC header contains a sequence number for each + * dispatch. This allows the reciever to order the request before + * processing. + */ + req.seq = vec->seq; + req.tv_sec = tv.tv_sec; + req.tv_usec = tv.tv_usec; + + return changelog_rpc_sumbit_req (rpc, (void *)&req, + frame, &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, + vec->vector, vec->count, NULL, + this, changelog_event_dispatch_cbk, + (xdrproc_t) xdr_changelog_event_req); + } + + int + changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data) + { + int idx = 0; + int count = 0; + int ret = 0; + unsigned long range = 0; + unsigned long sequence = 0; + rbuf_iovec_t *rvec = NULL; + struct ev_rpc *erpc = NULL; + struct rlist_iter riter = {{0,},}; + + /* dispatch NR_IOVEC IO vectors at a time. */ + + erpc = data; + RLIST_GET_SEQ (erpc->rlist, sequence, range); + + rlist_iter_init (&riter, erpc->rlist); + + rvec_for_each_entry (rvec, &riter) { + idx = count % NR_IOVEC; + if (++count == NR_IOVEC) { + erpc->vec.vector[idx] = rvec->iov; + erpc->vec.seq = sequence++; + erpc->vec.count = NR_IOVEC; + + ret = changelog_dispatch_vec (frame, this, + erpc->rpc, &erpc->vec); + if (ret) + break; + count = 0; + continue; + } + + erpc->vec.vector[idx] = rvec->iov; + } + + if (ret) + goto error_return; + + idx = count % NR_IOVEC; + if (idx) { + erpc->vec.seq = sequence; + erpc->vec.count = idx; + + ret = changelog_dispatch_vec (frame, this, + erpc->rpc, &erpc->vec); + } + + error_return: + return ret; +} + +int +changelog_rpc_notify (struct rpc_clnt *rpc, + void *mydata, rpc_clnt_event_t event, void *data) +{ + xlator_t *this = NULL; + changelog_rpc_clnt_t *crpc = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_priv_t *priv = NULL; + changelog_ev_selector_t *selection = NULL; + + crpc = mydata; + this = crpc->this; + c_clnt = crpc->c_clnt; + + priv = this->private; + + switch (event) { + case RPC_CLNT_CONNECT: + rpc_clnt_set_connected (&rpc->conn); + selection = &priv->ev_selection; + + LOCK (&c_clnt->wait_lock); + { + LOCK (&c_clnt->active_lock); + { + changelog_select_event (this, selection, + crpc->filter); + list_move_tail (&crpc->list, &c_clnt->active); + } + UNLOCK (&c_clnt->active_lock); + } + UNLOCK (&c_clnt->wait_lock); + + break; + case RPC_CLNT_DISCONNECT: + rpc_clnt_disable (crpc->rpc); + selection = &priv->ev_selection; + + LOCK (&crpc->lock); + { + changelog_deselect_event (this, selection, + crpc->filter); + changelog_set_disconnect_flag (crpc, _gf_true); + } + UNLOCK (&crpc->lock); + + break; + case RPC_CLNT_MSG: + case RPC_CLNT_DESTROY: + break; + } + + return 0; +} + +void * +changelog_ev_connector (void *data) +{ + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + pthread_mutex_lock (&c_clnt->pending_lock); + { + while (list_empty (&c_clnt->pending)) + pthread_cond_wait (&c_clnt->pending_cond, + &c_clnt->pending_lock); + crpc = list_first_entry (&c_clnt->pending, + changelog_rpc_clnt_t, list); + crpc->rpc = + changelog_rpc_client_init (this, crpc, + crpc->sock, + changelog_rpc_notify); + if (!crpc->rpc) { + gf_log (this->name, GF_LOG_ERROR, "failed to " + "connect back.. <%s>", crpc->sock); + crpc->cleanup (crpc); + goto mutex_unlock; + } + + LOCK (&c_clnt->wait_lock); + { + list_move_tail (&crpc->list, &c_clnt->waitq); + } + UNLOCK (&c_clnt->wait_lock); + } + mutex_unlock: + pthread_mutex_unlock (&c_clnt->pending_lock); + } + + return NULL; +} + +void +changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt) +{ + int ret = 0; + changelog_rpc_clnt_t *crpc = NULL; + + /* cleanup active connections */ + LOCK (&c_clnt->active_lock); + { + list_for_each_entry (crpc, &c_clnt->active, list) { + rpc_clnt_disable (crpc->rpc); + } + } + UNLOCK (&c_clnt->active_lock); +} + +/** + * TODO: granularize lock + * + * If we have multiple threads dispatching events, doing it this way is + * a performance bottleneck. + */ + +static inline changelog_rpc_clnt_t * +get_client (changelog_clnt_t *c_clnt, struct list_head **next) +{ + changelog_rpc_clnt_t *crpc = NULL; + + LOCK (&c_clnt->active_lock); + { + if (*next == &c_clnt->active) + goto unblock; + crpc = list_entry (*next, changelog_rpc_clnt_t, list); + changelog_rpc_clnt_ref (crpc); + *next = (*next)->next; + } + unblock: + UNLOCK (&c_clnt->active_lock); + + return crpc; +} + +static inline void +put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc) +{ + LOCK (&c_clnt->active_lock); + { + changelog_rpc_clnt_unref (crpc); + } + UNLOCK (&c_clnt->active_lock); +} + +void +_dispatcher (rbuf_list_t *rlist, void *arg) +{ + int ret = 0; + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + changelog_rpc_clnt_t *crpc = NULL; + changelog_rpc_clnt_t *tmp = NULL; + struct ev_rpc erpc = {0,}; + struct list_head *next = NULL; + + c_clnt = arg; + this = c_clnt->this; + + erpc.rlist = rlist; + next = c_clnt->active.next; + + while (1) { + crpc = get_client (c_clnt, &next); + if (!crpc) + break; + erpc.rpc = crpc->rpc; + ret = changelog_invoke_rpc (this, crpc->rpc, + &changelog_ev_program, + CHANGELOG_REV_PROC_EVENT, &erpc); + put_client (c_clnt, crpc); + } +} + +/** this is called under rotbuff's lock */ +void +sequencer (rbuf_list_t *rlist, void *mydata) +{ + unsigned long range = 0; + changelog_clnt_t *c_clnt = 0; + + c_clnt = mydata; + + range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC; + if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC) + range++; + RLIST_STORE_SEQ (rlist, c_clnt->sequence, range); + + c_clnt->sequence += range; +} + +void * +changelog_ev_dispatch (void *data) +{ + int ret = 0; + void *opaque = NULL; + xlator_t *this = NULL; + changelog_clnt_t *c_clnt = NULL; + struct timeval tv = {0,}; + + c_clnt = data; + this = c_clnt->this; + + while (1) { + /* TODO: change this to be pthread cond based.. later */ + tv.tv_sec = 1; + tv.tv_usec = 0; + select (0, NULL, NULL, NULL, &tv); + + ret = rbuf_get_buffer (c_clnt->rbuf, + &opaque, sequencer, c_clnt); + if (ret != RBUF_CONSUMABLE) { + if (ret != RBUF_EMPTY) + gf_log (this->name, GF_LOG_WARNING, + "Failed to get buffer for RPC dispatch " + "[rbuf retval: %d]", ret); + continue; + } + + ret = rbuf_wait_for_completion (c_clnt->rbuf, + opaque, _dispatcher, c_clnt); + if (ret) + gf_log (this->name, GF_LOG_WARNING, + "failed to put buffer after consumption"); + } + + return NULL; +} + +void +changelog_ev_queue_connection (changelog_clnt_t *c_clnt, + changelog_rpc_clnt_t *crpc) +{ + pthread_mutex_lock (&c_clnt->pending_lock); + { + list_add_tail (&crpc->list, &c_clnt->pending); + pthread_cond_signal (&c_clnt->pending_cond); + } + pthread_mutex_unlock (&c_clnt->pending_lock); +} + +struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = { + [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL}, + [CHANGELOG_REV_PROC_EVENT] = { + "EVENT DISPATCH", changelog_event_dispatch_rpc + }, +}; + +struct rpc_clnt_program changelog_ev_program = { + .progname = "CHANGELOG EVENT DISPATCHER", + .prognum = CHANGELOG_REV_RPC_PROCNUM, + .progver = CHANGELOG_REV_RPC_PROCVER, + .numproc = CHANGELOG_REV_PROC_MAX, + .proctable = changelog_ev_procs, +}; |