summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-ev-handle.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/src/changelog-ev-handle.c')
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c382
1 files changed, 382 insertions, 0 deletions
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
new file mode 100644
index 00000000000..ca7443cfd22
--- /dev/null
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -0,0 +1,382 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "changelog-ev-handle.h"
+#include "changelog-rpc-common.h"
+#include "changelog-helpers.h"
+
+struct rpc_clnt_program changelog_ev_program;
+
+#define NR_IOVEC (MAX_IOVEC - 3)
+struct ev_rpc_vec {
+ int count;
+ struct iovec vector[NR_IOVEC];
+
+ /* sequence number */
+ unsigned long seq;
+};
+
+struct ev_rpc {
+ rbuf_list_t *rlist;
+ struct rpc_clnt *rpc;
+ struct ev_rpc_vec vec;
+};
+
+/**
+ * As of now this just does the minimal (retval logging). Going further
+ * un-acknowledges sequence numbers can be retransmitted and other
+ * intelligence can be built into the server.
+ */
+int
+changelog_event_dispatch_cbk (struct rpc_req *req,
+ struct iovec *iov, int count, void *myframe)
+{
+ return 0;
+}
+
+/* dispatcher RPC */
+inline int
+changelog_dispatch_vec (call_frame_t *frame, xlator_t *this,
+ struct rpc_clnt *rpc, struct ev_rpc_vec *vec)
+{
+ struct timeval tv = {0,};
+ changelog_event_req req = {0,};
+
+ (void) gettimeofday (&tv, NULL);
+
+ /**
+ * Event dispatch RPC header contains a sequence number for each
+ * dispatch. This allows the reciever to order the request before
+ * processing.
+ */
+ req.seq = vec->seq;
+ req.tv_sec = tv.tv_sec;
+ req.tv_usec = tv.tv_usec;
+
+ return changelog_rpc_sumbit_req (rpc, (void *)&req,
+ frame, &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT,
+ vec->vector, vec->count, NULL,
+ this, changelog_event_dispatch_cbk,
+ (xdrproc_t) xdr_changelog_event_req);
+ }
+
+ int
+ changelog_event_dispatch_rpc (call_frame_t *frame, xlator_t *this, void *data)
+ {
+ int idx = 0;
+ int count = 0;
+ int ret = 0;
+ unsigned long range = 0;
+ unsigned long sequence = 0;
+ rbuf_iovec_t *rvec = NULL;
+ struct ev_rpc *erpc = NULL;
+ struct rlist_iter riter = {{0,},};
+
+ /* dispatch NR_IOVEC IO vectors at a time. */
+
+ erpc = data;
+ RLIST_GET_SEQ (erpc->rlist, sequence, range);
+
+ rlist_iter_init (&riter, erpc->rlist);
+
+ rvec_for_each_entry (rvec, &riter) {
+ idx = count % NR_IOVEC;
+ if (++count == NR_IOVEC) {
+ erpc->vec.vector[idx] = rvec->iov;
+ erpc->vec.seq = sequence++;
+ erpc->vec.count = NR_IOVEC;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ if (ret)
+ break;
+ count = 0;
+ continue;
+ }
+
+ erpc->vec.vector[idx] = rvec->iov;
+ }
+
+ if (ret)
+ goto error_return;
+
+ idx = count % NR_IOVEC;
+ if (idx) {
+ erpc->vec.seq = sequence;
+ erpc->vec.count = idx;
+
+ ret = changelog_dispatch_vec (frame, this,
+ erpc->rpc, &erpc->vec);
+ }
+
+ error_return:
+ return ret;
+}
+
+int
+changelog_rpc_notify (struct rpc_clnt *rpc,
+ void *mydata, rpc_clnt_event_t event, void *data)
+{
+ xlator_t *this = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_priv_t *priv = NULL;
+ changelog_ev_selector_t *selection = NULL;
+
+ crpc = mydata;
+ this = crpc->this;
+ c_clnt = crpc->c_clnt;
+
+ priv = this->private;
+
+ switch (event) {
+ case RPC_CLNT_CONNECT:
+ rpc_clnt_set_connected (&rpc->conn);
+ selection = &priv->ev_selection;
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_select_event (this, selection,
+ crpc->filter);
+ list_move_tail (&crpc->list, &c_clnt->active);
+ }
+ UNLOCK (&c_clnt->active_lock);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+
+ break;
+ case RPC_CLNT_DISCONNECT:
+ rpc_clnt_disable (crpc->rpc);
+ selection = &priv->ev_selection;
+
+ LOCK (&crpc->lock);
+ {
+ changelog_deselect_event (this, selection,
+ crpc->filter);
+ changelog_set_disconnect_flag (crpc, _gf_true);
+ }
+ UNLOCK (&crpc->lock);
+
+ break;
+ case RPC_CLNT_MSG:
+ case RPC_CLNT_DESTROY:
+ break;
+ }
+
+ return 0;
+}
+
+void *
+changelog_ev_connector (void *data)
+{
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ while (list_empty (&c_clnt->pending))
+ pthread_cond_wait (&c_clnt->pending_cond,
+ &c_clnt->pending_lock);
+ crpc = list_first_entry (&c_clnt->pending,
+ changelog_rpc_clnt_t, list);
+ crpc->rpc =
+ changelog_rpc_client_init (this, crpc,
+ crpc->sock,
+ changelog_rpc_notify);
+ if (!crpc->rpc) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to "
+ "connect back.. <%s>", crpc->sock);
+ crpc->cleanup (crpc);
+ goto mutex_unlock;
+ }
+
+ LOCK (&c_clnt->wait_lock);
+ {
+ list_move_tail (&crpc->list, &c_clnt->waitq);
+ }
+ UNLOCK (&c_clnt->wait_lock);
+ }
+ mutex_unlock:
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_cleanup_connections (xlator_t *this, changelog_clnt_t *c_clnt)
+{
+ int ret = 0;
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ /* cleanup active connections */
+ LOCK (&c_clnt->active_lock);
+ {
+ list_for_each_entry (crpc, &c_clnt->active, list) {
+ rpc_clnt_disable (crpc->rpc);
+ }
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+/**
+ * TODO: granularize lock
+ *
+ * If we have multiple threads dispatching events, doing it this way is
+ * a performance bottleneck.
+ */
+
+static inline changelog_rpc_clnt_t *
+get_client (changelog_clnt_t *c_clnt, struct list_head **next)
+{
+ changelog_rpc_clnt_t *crpc = NULL;
+
+ LOCK (&c_clnt->active_lock);
+ {
+ if (*next == &c_clnt->active)
+ goto unblock;
+ crpc = list_entry (*next, changelog_rpc_clnt_t, list);
+ changelog_rpc_clnt_ref (crpc);
+ *next = (*next)->next;
+ }
+ unblock:
+ UNLOCK (&c_clnt->active_lock);
+
+ return crpc;
+}
+
+static inline void
+put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)
+{
+ LOCK (&c_clnt->active_lock);
+ {
+ changelog_rpc_clnt_unref (crpc);
+ }
+ UNLOCK (&c_clnt->active_lock);
+}
+
+void
+_dispatcher (rbuf_list_t *rlist, void *arg)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ changelog_rpc_clnt_t *crpc = NULL;
+ changelog_rpc_clnt_t *tmp = NULL;
+ struct ev_rpc erpc = {0,};
+ struct list_head *next = NULL;
+
+ c_clnt = arg;
+ this = c_clnt->this;
+
+ erpc.rlist = rlist;
+ next = c_clnt->active.next;
+
+ while (1) {
+ crpc = get_client (c_clnt, &next);
+ if (!crpc)
+ break;
+ erpc.rpc = crpc->rpc;
+ ret = changelog_invoke_rpc (this, crpc->rpc,
+ &changelog_ev_program,
+ CHANGELOG_REV_PROC_EVENT, &erpc);
+ put_client (c_clnt, crpc);
+ }
+}
+
+/** this is called under rotbuff's lock */
+void
+sequencer (rbuf_list_t *rlist, void *mydata)
+{
+ unsigned long range = 0;
+ changelog_clnt_t *c_clnt = 0;
+
+ c_clnt = mydata;
+
+ range = (RLIST_ENTRY_COUNT (rlist)) / NR_IOVEC;
+ if ((RLIST_ENTRY_COUNT (rlist)) % NR_IOVEC)
+ range++;
+ RLIST_STORE_SEQ (rlist, c_clnt->sequence, range);
+
+ c_clnt->sequence += range;
+}
+
+void *
+changelog_ev_dispatch (void *data)
+{
+ int ret = 0;
+ void *opaque = NULL;
+ xlator_t *this = NULL;
+ changelog_clnt_t *c_clnt = NULL;
+ struct timeval tv = {0,};
+
+ c_clnt = data;
+ this = c_clnt->this;
+
+ while (1) {
+ /* TODO: change this to be pthread cond based.. later */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ select (0, NULL, NULL, NULL, &tv);
+
+ ret = rbuf_get_buffer (c_clnt->rbuf,
+ &opaque, sequencer, c_clnt);
+ if (ret != RBUF_CONSUMABLE) {
+ if (ret != RBUF_EMPTY)
+ gf_log (this->name, GF_LOG_WARNING,
+ "Failed to get buffer for RPC dispatch "
+ "[rbuf retval: %d]", ret);
+ continue;
+ }
+
+ ret = rbuf_wait_for_completion (c_clnt->rbuf,
+ opaque, _dispatcher, c_clnt);
+ if (ret)
+ gf_log (this->name, GF_LOG_WARNING,
+ "failed to put buffer after consumption");
+ }
+
+ return NULL;
+}
+
+void
+changelog_ev_queue_connection (changelog_clnt_t *c_clnt,
+ changelog_rpc_clnt_t *crpc)
+{
+ pthread_mutex_lock (&c_clnt->pending_lock);
+ {
+ list_add_tail (&crpc->list, &c_clnt->pending);
+ pthread_cond_signal (&c_clnt->pending_cond);
+ }
+ pthread_mutex_unlock (&c_clnt->pending_lock);
+}
+
+struct rpc_clnt_procedure changelog_ev_procs[CHANGELOG_REV_PROC_MAX] = {
+ [CHANGELOG_REV_PROC_NULL] = {"NULL", NULL},
+ [CHANGELOG_REV_PROC_EVENT] = {
+ "EVENT DISPATCH", changelog_event_dispatch_rpc
+ },
+};
+
+struct rpc_clnt_program changelog_ev_program = {
+ .progname = "CHANGELOG EVENT DISPATCHER",
+ .prognum = CHANGELOG_REV_RPC_PROCNUM,
+ .progver = CHANGELOG_REV_RPC_PROCVER,
+ .numproc = CHANGELOG_REV_PROC_MAX,
+ .proctable = changelog_ev_procs,
+};