summaryrefslogtreecommitdiffstats
path: root/xlators/experimental/nsr-server
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/experimental/nsr-server')
-rw-r--r--xlators/experimental/nsr-server/Makefile.am3
-rw-r--r--xlators/experimental/nsr-server/src/Makefile.am35
-rw-r--r--xlators/experimental/nsr-server/src/all-templates.c429
-rwxr-xr-xxlators/experimental/nsr-server/src/gen-fops.py138
-rw-r--r--xlators/experimental/nsr-server/src/nsr-internal.h114
-rw-r--r--xlators/experimental/nsr-server/src/nsr.c1066
6 files changed, 1785 insertions, 0 deletions
diff --git a/xlators/experimental/nsr-server/Makefile.am b/xlators/experimental/nsr-server/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/experimental/nsr-server/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/experimental/nsr-server/src/Makefile.am b/xlators/experimental/nsr-server/src/Makefile.am
new file mode 100644
index 00000000000..6c0597610a2
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/Makefile.am
@@ -0,0 +1,35 @@
+xlator_LTLIBRARIES = nsr.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/experimental
+
+nodist_nsr_la_SOURCES = nsr-cg.c
+CLEANFILES = $(nodist_nsr_la_SOURCES)
+
+nsr_la_LDFLAGS = -module -avoid-version
+nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/api/src/libgfapi.la
+
+noinst_HEADERS = nsr-internal.h \
+ $(top_srcdir)/xlators/lib/src/libxlator.h \
+ $(top_srcdir)/glusterfsd/src/glusterfsd.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) \
+ -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src -DSBIN_DIR=\"$(sbindir)\" \
+ -I$(top_srcdir)/api/src -DNSR_SCRIPT_PREFIX=\"$(nsrdir)\" \
+ -I$(top_srcdir)/xlators/experimental/nsr-client/src/
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+NSR_PREFIX = $(top_srcdir)/xlators/experimental/nsr-server/src
+NSR_GEN_FOPS = $(NSR_PREFIX)/gen-fops.py
+NSR_TEMPLATES = $(NSR_PREFIX)/all-templates.c
+NSR_WRAPPER = $(NSR_PREFIX)/nsr.c
+noinst_PYTHON = $(NSR_GEN_FOPS)
+EXTRA_DIST = $(NSR_TEMPLATES) $(NSR_WRAPPER)
+
+nsr-cg.c: $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER)
+ $(PYTHON) $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER) > $@
+
+uninstall-local:
+ rm -f $(DESTDIR)$(xlatordir)/nsr.so
diff --git a/xlators/experimental/nsr-server/src/all-templates.c b/xlators/experimental/nsr-server/src/all-templates.c
new file mode 100644
index 00000000000..300abea959d
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/all-templates.c
@@ -0,0 +1,429 @@
+/*
+ * You can put anything here - it doesn't even have to be a comment - and it
+ * will be ignored until we reach the first template-name comment.
+ */
+
+
+/* template-name read-fop */
+int32_t
+nsr_@NAME@ (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ /* allow reads during reconciliation *
+ * TBD: allow "dirty" reads on non-leaders *
+ */
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_@NAME@_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, EREMOTE,
+ @ERROR_ARGS@);
+ return 0;
+}
+
+/* template-name read-dispatch */
+/* No "dispatch" function needed for @NAME@ */
+
+/* template-name read-fan-in */
+/* No "fan-in" function needed for @NAME@ */
+
+/* template-name read-continue */
+/* No "continue" function needed for @NAME@ */
+
+/* template-name read-complete */
+/* No "complete" function needed for @NAME@ */
+
+/* template-name write-fop */
+int32_t
+nsr_@NAME@ (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ gf_boolean_t result = _gf_false;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+ uint32_t ti = 0;
+
+ /*
+ * Our first goal here is to avoid "split brain surprise" for users who
+ * specify exactly 50% with two- or three-way replication. That means
+ * either a more-than check against half the total replicas or an
+ * at-least check against half of our peers (one less). Of the two,
+ * only an at-least check supports the intuitive use of 100% to mean
+ * all replicas must be present, because "more than 100%" will never
+ * succeed regardless of which count we use. This leaves us with a
+ * slightly non-traditional definition of quorum ("at least X% of peers
+ * not including ourselves") but one that's useful enough to be worth
+ * it.
+ *
+ * Note that n_children and up_children *do* include the local
+ * subvolume, so we need to subtract one in each case.
+ */
+ if (priv->leader) {
+ result = fop_quorum_check (this, (double)(priv->n_children - 1),
+ (double)(priv->up_children - 1));
+
+ if (result == _gf_false) {
+ /* Emulate the AFR client-side-quorum behavior. */
+ op_errno = EROFS;
+ goto err;
+ }
+ } else {
+ if (xdata) {
+ from_leader = !!dict_get(xdata, NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata, RECON_TERM_XATTR)
+ && !!dict_get(xdata, RECON_INDEX_XATTR);
+ } else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ /* follower/recon path *
+ * just send it to local node *
+ */
+ if (!from_leader && !from_recon) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+ }
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if defined(NSR_CG_NEED_FD)
+ local->fd = fd_ref(fd);
+#else
+ local->fd = NULL;
+#endif
+ INIT_LIST_HEAD(&local->qlinks);
+ frame->local = local;
+
+ /*
+ * If we let it through despite not being the leader, then we just want
+ * to pass it on down without all of the additional xattrs, queuing, and
+ * so on. However, nsr_*_complete does depend on the initialization
+ * immediately above this.
+ */
+ if (!priv->leader) {
+ STACK_WIND (frame, nsr_@NAME@_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ return 0;
+ }
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+ N_MSG_MEM_ERR, "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata, NSR_TERM_XATTR, priv->current_term) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to set nsr-term");
+ goto err;
+ }
+
+ LOCK(&priv->index_lock);
+ ti = ++(priv->index);
+ UNLOCK(&priv->index_lock);
+ if (dict_set_int32(xdata, NSR_INDEX_XATTR, ti) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to set index");
+ goto err;
+ }
+
+ local->stub = fop_@NAME@_stub (frame, nsr_@NAME@_continue,
+ @SHORT_ARGS@);
+ if (!local->stub) {
+ goto err;
+ }
+
+
+#if defined(NSR_CG_QUEUE)
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this, fd->inode);
+
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ LOCK(&ictx->lock);
+ if (ictx->active) {
+ gf_msg_debug (this->name, 0,
+ "queuing request due to conflict");
+ /*
+ * TBD: enqueue only for real conflict
+ *
+ * Currently we just act like all writes are in
+ * conflict with one another. What we should really do
+ * is check the active/pending queues and defer only if
+ * there's a conflict there.
+ *
+ * It's important to check the pending queue because we
+ * might have an active request X which conflicts with
+ * a pending request Y, and this request Z might
+ * conflict with Y but not X. If we checked only the
+ * active queue then Z could jump ahead of Y, which
+ * would be incorrect.
+ */
+ local->qstub = fop_@NAME@_stub (frame,
+ nsr_@NAME@_dispatch,
+ @SHORT_ARGS@);
+ if (!local->qstub) {
+ UNLOCK(&ictx->lock);
+ goto err;
+ }
+ list_add_tail(&local->qlinks, &ictx->pqueue);
+ ++(ictx->pending);
+ UNLOCK(&ictx->lock);
+ return 0;
+ } else {
+ list_add_tail(&local->qlinks, &ictx->aqueue);
+ ++(ictx->active);
+ }
+ UNLOCK(&ictx->lock);
+#endif
+
+ return nsr_@NAME@_dispatch (frame, this, @SHORT_ARGS@);
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->qstub) {
+ call_stub_destroy(local->qstub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, op_errno,
+ @ERROR_ARGS@);
+ return 0;
+}
+
+/* template-name write-dispatch */
+int32_t
+nsr_@NAME@_dispatch (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = frame->local;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+
+ /*
+ * TBD: unblock pending request(s) if we fail after this point but
+ * before we get to nsr_@NAME@_complete (where that code currently
+ * resides).
+ */
+
+ local->call_count = priv->n_children - 1;
+ local->successful_acks = 0;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_@NAME@_fan_in,
+ trav->xlator, trav->xlator->fops->@NAME@,
+ @SHORT_ARGS@);
+ }
+
+ /* TBD: variable Issue count */
+ return 0;
+}
+
+/* template-name write-fan-in */
+int32_t
+nsr_@NAME@_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_msg_trace (this->name, 0, "op_ret = %d, op_errno = %d\n",
+ op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ if (op_ret != -1) {
+ /* Increment the number of successful acks *
+ * received for the operation. *
+ */
+ (local->successful_acks)++;
+ local->successful_op_ret = op_ret;
+ }
+ gf_msg_debug (this->name, 0, "succ_acks = %d, op_ret = %d, op_errno = %d\n",
+ op_ret, op_errno, local->successful_acks);
+ UNLOCK(&frame->lock);
+
+ /* TBD: variable Completion count */
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+/* template-name write-continue */
+int32_t
+nsr_@NAME@_continue (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ int32_t ret = -1;
+ gf_boolean_t result = _gf_false;
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, out);
+ priv = this->private;
+ local = frame->local;
+ GF_VALIDATE_OR_GOTO (this->name, priv, out);
+ GF_VALIDATE_OR_GOTO (this->name, local, out);
+
+ /* Perform quorum check to see if the leader needs *
+ * to perform the operation. If the operation will not *
+ * meet quorum irrespective of the leader's result *
+ * there is no point in the leader performing the fop *
+ */
+ result = fop_quorum_check (this, (double)priv->n_children,
+ (double)local->successful_acks + 1);
+ if (result == _gf_false) {
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, EROFS,
+ @ERROR_ARGS@);
+ } else {
+ STACK_WIND (frame, nsr_@NAME@_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/* template-name write-complete */
+int32_t
+nsr_@NAME@_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ @LONG_ARGS@)
+{
+ gf_boolean_t result = _gf_false;
+ nsr_private_t *priv = this->private;
+
+ nsr_local_t *local = frame->local;
+
+ /* If the fop failed on the leader, then reduce one succesful ack
+ * before calculating the fop quorum
+ */
+ LOCK(&frame->lock);
+ if (op_ret == -1)
+ (local->successful_acks)--;
+ UNLOCK(&frame->lock);
+
+#if defined(NSR_CG_QUEUE)
+ nsr_inode_ctx_t *ictx;
+ nsr_local_t *next;
+
+ if (local->qlinks.next != &local->qlinks) {
+ list_del(&local->qlinks);
+ ictx = nsr_get_inode_ctx(this, local->fd->inode);
+ if (ictx) {
+ LOCK(&ictx->lock);
+ if (ictx->pending) {
+ /*
+ * TBD: dequeue *all* non-conflicting
+ * reqs
+ *
+ * With the stub implementation there
+ * can only be one request active at a
+ * time (zero here) so it's not an
+ * issue. In a real implementation
+ * there might still be other active
+ * requests to check against, and
+ * multiple pending requests that could
+ * continue.
+ */
+ gf_msg_debug (this->name, 0,
+ "unblocking next request");
+ --(ictx->pending);
+ next = list_entry (ictx->pqueue.next,
+ nsr_local_t, qlinks);
+ list_del(&next->qlinks);
+ list_add_tail(&next->qlinks,
+ &ictx->aqueue);
+ call_resume(next->qstub);
+ } else {
+ --(ictx->active);
+ }
+ UNLOCK(&ictx->lock);
+ }
+ }
+#endif
+
+#if defined(NSR_CG_FSYNC)
+ nsr_mark_fd_dirty(this, local);
+#endif
+
+#if defined(NSR_CG_NEED_FD)
+ fd_unref(local->fd);
+#endif
+
+ /* After the leader completes the fop, a quorum check is *
+ * performed, taking into account the outcome of the fop *
+ * on the leader. Irrespective of the fop being successful *
+ * or failing on the leader, the result of the quorum will *
+ * determine if the overall fop is successful or not. For *
+ * example, a fop might have succeeded on every node except *
+ * the leader, in which case as quorum is being met, the fop *
+ * will be treated as a successful fop, even though it failed *
+ * on the leader. On follower nodes, no quorum check should *
+ * be done, and the result is returned to the leader as is. *
+ */
+ if (priv->leader) {
+ result = fop_quorum_check (this, (double)priv->n_children,
+ (double)local->successful_acks + 1);
+ if (result == _gf_false) {
+ op_ret = -1;
+ op_errno = EROFS;
+ gf_msg_debug (this->name, 0,
+ "Quorum is not met. The operation has failed.");
+ } else {
+#if defined(NSR_CG_NEED_FD)
+ op_ret = local->successful_op_ret;
+#else
+ op_ret = 0;
+#endif
+ op_errno = 0;
+ gf_msg_debug (this->name, 0,
+ "Quorum has met. The operation has succeeded.");
+ }
+ }
+
+ STACK_UNWIND_STRICT (@NAME@, frame, op_ret, op_errno,
+ @SHORT_ARGS@);
+
+
+ return 0;
+
+}
diff --git a/xlators/experimental/nsr-server/src/gen-fops.py b/xlators/experimental/nsr-server/src/gen-fops.py
new file mode 100755
index 00000000000..336b218a8fb
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/gen-fops.py
@@ -0,0 +1,138 @@
+#!/usr/bin/python
+
+# This script generates the boilerplate versions of most fops and cbks in the
+# server. This allows the details of leadership-status checking, sequencing
+# between leader and followers (including fan-out), and basic error checking
+# to be centralized one place, with per-operation code kept to a minimum.
+
+import os
+import re
+import string
+import sys
+
+curdir = os.path.dirname(sys.argv[0])
+gendir = os.path.join(curdir,'../../../../libglusterfs/src')
+sys.path.append(gendir)
+from generator import ops, fop_subs, cbk_subs, generate
+
+# We really want the callback argument list, even when we're generating fop
+# code, so we propagate here.
+# TBD: this should probably be right in generate.py
+for k, v in cbk_subs.iteritems():
+ fop_subs[k]['@ERROR_ARGS@'] = v['@ERROR_ARGS@']
+
+# Stolen from old codegen.py
+def load_templates (path):
+ templates = {}
+ tmpl_re = re.compile("/\* template-name (.*) \*/")
+ templates = {}
+ t_name = None
+ for line in open(path,"r").readlines():
+ if not line:
+ break
+ m = tmpl_re.match(line)
+ if m:
+ if t_name:
+ templates[t_name] = string.join(t_contents,'')
+ t_name = m.group(1).strip()
+ t_contents = []
+ elif t_name:
+ t_contents.append(line)
+ if t_name:
+ templates[t_name] = string.join(t_contents,'')
+ return templates
+
+# We need two types of templates. The first, for pure read operations, just
+# needs to do a simple am-i-leader check (augmented to allow dirty reads).
+# The second, for pure writes, needs to do fan-out to followers between those
+# initial checks and local execution. There are other operations that don't
+# fit neatly into either category - e.g. lock ops or fsync - so we'll just have
+# to handle those manually. The table thus includes entries only for those we
+# can categorize. The special cases, plus any new operations we've never even
+# heard of, aren't in there.
+#
+# Various keywords can be used to define/undefine preprocessor symbols used
+# in the templates, on a per-function basis. For example, if the keyword here
+# is "fsync" (lowercase word or abbreviation) that will cause NSR_CG_FSYNC
+# (prefix plus uppercase version) to be defined above all of the generated code
+# for that fop.
+
+fop_table = {
+ "access": "read",
+ "create": "write",
+ "discard": "write",
+# "entrylk": "read",
+ "fallocate": "write",
+# "fentrylk": "read",
+ "fgetxattr": "read",
+# "finodelk": "read",
+# "flush": "read",
+ "fremovexattr": "write",
+ "fsetattr": "write",
+ "fsetxattr": "write",
+ "fstat": "read",
+# "fsync": "read",
+# "fsyncdir": "read",
+ "ftruncate": "write",
+ "fxattrop": "write",
+ "getxattr": "read",
+# "inodelk": "read",
+ "link": "write",
+# "lk": "read",
+# "lookup": "read",
+ "mkdir": "write",
+ "mknod": "write",
+ "open": "write",
+ "opendir": "read",
+ "rchecksum": "read",
+ "readdir": "read",
+ "readdirp": "read",
+ "readlink": "read",
+ "readv": "read",
+ "removexattr": "write",
+ "rename": "write",
+ "rmdir": "write",
+ "setattr": "write",
+ "setxattr": "write",
+ "stat": "read",
+ "statfs": "read",
+ "symlink": "write",
+ "truncate": "write",
+ "unlink": "write",
+ "writev": "write,fsync,queue",
+ "xattrop": "write",
+}
+
+# Stolen from gen_fdl.py
+def gen_server (templates):
+ fops_done = []
+ for name in fop_table.keys():
+ info = fop_table[name].split(",")
+ kind = info[0]
+ flags = info[1:]
+ if ("fsync" in flags) or ("queue" in flags):
+ flags.append("need_fd")
+ for fname in flags:
+ print "#define NSR_CG_%s" % fname.upper()
+ print generate(templates[kind+"-complete"],name,cbk_subs)
+ print generate(templates[kind+"-continue"],name,fop_subs)
+ print generate(templates[kind+"-fan-in"],name,cbk_subs)
+ print generate(templates[kind+"-dispatch"],name,fop_subs)
+ print generate(templates[kind+"-fop"],name,fop_subs)
+ for fname in flags:
+ print "#undef NSR_CG_%s" % fname.upper()
+ fops_done.append(name)
+ # Just for fun, emit the fops table too.
+ print("struct xlator_fops fops = {")
+ for x in fops_done:
+ print(" .%s = nsr_%s,"%(x,x))
+ print("};")
+
+tmpl = load_templates(sys.argv[1])
+for l in open(sys.argv[2],'r').readlines():
+ if l.find('#pragma generate') != -1:
+ print "/* BEGIN GENERATED CODE - DO NOT MODIFY */"
+ gen_server(tmpl)
+ print "/* END GENERATED CODE */"
+ else:
+ print l[:-1]
diff --git a/xlators/experimental/nsr-server/src/nsr-internal.h b/xlators/experimental/nsr-server/src/nsr-internal.h
new file mode 100644
index 00000000000..b8c7fc314b7
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/nsr-internal.h
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define LEADER_XATTR "user.nsr.leader"
+#define SECOND_CHILD(xl) (xl->children->next->xlator)
+#define RECONCILER_PATH NSR_SCRIPT_PREFIX"/reconciler.py"
+#define CHANGELOG_ENTRY_SIZE 128
+
+enum {
+ gf_mt_nsr_private_t = gf_common_mt_end + 1,
+ gf_mt_nsr_fd_ctx_t,
+ gf_mt_nsr_inode_ctx_t,
+ gf_mt_nsr_dirty_t,
+ gf_mt_nsr_end
+};
+
+typedef enum nsr_recon_notify_ev_id_t {
+ NSR_RECON_SET_LEADER = 1,
+ NSR_RECON_ADD_CHILD = 2
+} nsr_recon_notify_ev_id_t;
+
+typedef struct _nsr_recon_notify_ev_s {
+ nsr_recon_notify_ev_id_t id;
+ uint32_t index; /* in case of add */
+ struct list_head list;
+} nsr_recon_notify_ev_t;
+
+typedef struct {
+ /*
+ * This is a hack to allow a non-leader to accept requests while the
+ * leader is down, and it only works for n=2. The way it works is that
+ * "config_leader" indicates the state from our options (via init or
+ * reconfigure) but "leader" is what the fop code actually looks at. If
+ * config_leader is true, then leader will *always* be true as well,
+ * giving that brick precedence. If config_leader is false, then
+ * leader will only be true if there is no connection to the other
+ * brick (tracked in nsr_notify).
+ *
+ * TBD: implement real leader election
+ */
+ gf_boolean_t config_leader;
+ gf_boolean_t leader;
+ uint8_t up_children;
+ uint8_t n_children;
+ char *vol_file;
+ uint32_t current_term;
+ uint32_t kid_state;
+ gf_lock_t dirty_lock;
+ struct list_head dirty_fds;
+ uint32_t index;
+ gf_lock_t index_lock;
+ double quorum_pct;
+ int term_fd;
+ long term_total;
+ long term_read;
+ /*
+ * This is a super-duper hack, but it will do for now. The reason it's
+ * a hack is that we pass this to dict_set_static_bin, so we don't have
+ * to mess around with allocating and freeing it on every single IPC
+ * request, but it's totally not thread-safe. On the other hand, there
+ * should only be one reconciliation thread running and calling these
+ * functions at a time, so maybe that doesn't matter.
+ *
+ * TBD: re-evaluate how to manage this
+ */
+ char term_buf[CHANGELOG_ENTRY_SIZE];
+} nsr_private_t;
+
+typedef struct {
+ call_stub_t *stub;
+ call_stub_t *qstub;
+ uint32_t call_count;
+ uint32_t successful_acks;
+ uint32_t successful_op_ret;
+ fd_t *fd;
+ struct list_head qlinks;
+} nsr_local_t;
+
+/*
+ * This should match whatever changelog returns on the pre-op for us to pass
+ * when we're ready for our post-op.
+ */
+typedef uint32_t log_id_t;
+
+typedef struct {
+ struct list_head links;
+ log_id_t id;
+} nsr_dirty_list_t;
+
+typedef struct {
+ fd_t *fd;
+ struct list_head dirty_list;
+ struct list_head fd_list;
+} nsr_fd_ctx_t;
+
+typedef struct {
+ gf_lock_t lock;
+ uint32_t active;
+ struct list_head aqueue;
+ uint32_t pending;
+ struct list_head pqueue;
+} nsr_inode_ctx_t;
+
+void nsr_start_reconciler (xlator_t *this);
diff --git a/xlators/experimental/nsr-server/src/nsr.c b/xlators/experimental/nsr-server/src/nsr.c
new file mode 100644
index 00000000000..0c494b78125
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/nsr.c
@@ -0,0 +1,1066 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <fnmatch.h>
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "glfs.h"
+#include "glfs-internal.h"
+#include "run.h"
+#include "common-utils.h"
+#include "syncop.h"
+#include "syscall.h"
+
+#include "nsr-internal.h"
+#include "nsr-messages.h"
+
+#define NSR_FLUSH_INTERVAL 5
+
+enum {
+ /* echo "cluster/nsr-server" | md5sum | cut -c 1-8 */
+ NSR_SERVER_IPC_BASE = 0x0e2d66a5,
+ NSR_SERVER_TERM_RANGE,
+ NSR_SERVER_OPEN_TERM,
+ NSR_SERVER_NEXT_ENTRY
+};
+
+/* Used to check the quorum of acks received after the fop
+ * confirming the status of the fop on all the brick processes
+ * for this particular subvolume
+ */
+gf_boolean_t
+fop_quorum_check (xlator_t *this, double n_children,
+ double current_state)
+{
+ nsr_private_t *priv = NULL;
+ gf_boolean_t result = _gf_false;
+ double required = 0;
+ double current = 0;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+ priv = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, priv, out);
+
+ required = n_children * priv->quorum_pct;
+
+ /*
+ * Before performing the fop on the leader, we need to check,
+ * if there is any merit in performing the fop on the leader.
+ * In a case, where even a successful write on the leader, will
+ * not meet quorum, there is no point in trying the fop on the
+ * leader.
+ * When this function is called after the leader has tried
+ * performing the fop, this check will calculate quorum taking into
+ * account the status of the fop on the leader. If the leader's
+ * op_ret was -1, the complete function would account that by
+ * decrementing successful_acks by 1
+ */
+
+ current = current_state * 100.0;
+
+ if (current < required) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_QUORUM_NOT_MET,
+ "Quorum not met. quorum_pct = %f "
+ "Current State = %f, Required State = %f",
+ priv->quorum_pct, current,
+ required);
+ } else
+ result = _gf_true;
+
+out:
+ return result;
+}
+
+nsr_inode_ctx_t *
+nsr_get_inode_ctx (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_inode_ctx_t *ctx_ptr;
+
+ if (__inode_ctx_get(inode, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_inode_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr),
+ gf_mt_nsr_inode_ctx_t);
+ if (ctx_ptr) {
+ ctx_int = (uint64_t)(long)ctx_ptr;
+ if (__inode_ctx_set(inode, this, &ctx_int) == 0) {
+ LOCK_INIT(&ctx_ptr->lock);
+ INIT_LIST_HEAD(&ctx_ptr->aqueue);
+ INIT_LIST_HEAD(&ctx_ptr->pqueue);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+nsr_fd_ctx_t *
+nsr_get_fd_ctx (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr), gf_mt_nsr_fd_ctx_t);
+ if (ctx_ptr) {
+ if (__fd_ctx_set(fd, this, (uint64_t)ctx_ptr) == 0) {
+ INIT_LIST_HEAD(&ctx_ptr->dirty_list);
+ INIT_LIST_HEAD(&ctx_ptr->fd_list);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+void
+nsr_mark_fd_dirty (xlator_t *this, nsr_local_t *local)
+{
+ fd_t *fd = local->fd;
+ nsr_fd_ctx_t *ctx_ptr;
+ nsr_dirty_list_t *dirty;
+ nsr_private_t *priv = this->private;
+
+ /*
+ * TBD: don't do any of this for O_SYNC/O_DIRECT writes.
+ * Unfortunately, that optimization requires that we distinguish
+ * between writev and other "write" calls, saving the original flags
+ * and checking them in the callback. Too much work for too little
+ * gain right now.
+ */
+
+ LOCK(&fd->lock);
+ ctx_ptr = nsr_get_fd_ctx(this, fd);
+ dirty = GF_CALLOC(1, sizeof(*dirty), gf_mt_nsr_dirty_t);
+ if (ctx_ptr && dirty) {
+ gf_msg_trace (this->name, 0,
+ "marking fd %p as dirty (%p)", fd, dirty);
+ /* TBD: fill dirty->id from what changelog gave us */
+ list_add_tail(&dirty->links, &ctx_ptr->dirty_list);
+ if (list_empty(&ctx_ptr->fd_list)) {
+ /* Add a ref so _release doesn't get called. */
+ ctx_ptr->fd = fd_ref(fd);
+ LOCK(&priv->dirty_lock);
+ list_add_tail (&ctx_ptr->fd_list,
+ &priv->dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+ }
+ } else {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+ N_MSG_MEM_ERR, "could not mark %p dirty", fd);
+ if (ctx_ptr) {
+ GF_FREE(ctx_ptr);
+ }
+ if (dirty) {
+ GF_FREE(dirty);
+ }
+ }
+ UNLOCK(&fd->lock);
+}
+
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define NSR_INDEX_XATTR "trusted.nsr.index"
+#define NSR_REP_COUNT_XATTR "trusted.nsr.rep-count"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+#pragma generate
+
+uint8_t
+nsr_count_up_kids (nsr_private_t *priv)
+{
+ uint8_t retval = 0;
+ uint8_t i;
+
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ ++retval;
+ }
+ }
+
+ return retval;
+}
+
+/*
+ * The fsync machinery looks a lot like that for any write call, but there are
+ * some important differences that are easy to miss. First, we don't care
+ * about the xdata that shows whether the call came from a leader or
+ * reconciliation process. If we're the leader we fan out; if we're not we
+ * don't. Second, we don't wait for followers before we issue the local call.
+ * The code generation system could be updated to handle this, and still might
+ * if we need to implement other "almost identical" paths (e.g. for open), but
+ * a copy is more readable as long as it's just one.
+ */
+
+int32_t
+nsr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_local_t *local = frame->local;
+ gf_boolean_t unwind;
+
+ LOCK(&frame->lock);
+ unwind = !--(local->call_count);
+ UNLOCK(&frame->lock);
+
+ if (unwind) {
+ STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf,
+ postbuf, xdata);
+ }
+ return 0;
+}
+
+int32_t
+nsr_fsync_local_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+ nsr_local_t *local = frame->local;
+
+ list_for_each_entry_safe (dirty, dtmp, &local->qlinks, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", local->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ return nsr_fsync_cbk (frame, cookie, this, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+}
+
+int32_t
+nsr_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
+ dict_t *xdata)
+{
+ nsr_private_t *priv = this->private;
+ nsr_local_t *local;
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+ xlator_list_t *trav;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ STACK_UNWIND_STRICT(fsync, frame, -1, ENOMEM,
+ NULL, NULL, xdata);
+ return 0;
+ }
+ INIT_LIST_HEAD(&local->qlinks);
+ frame->local = local;
+
+ /* Move the dirty list from the fd to the fsync request. */
+ LOCK(&fd->lock);
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ list_splice_init (&ctx_ptr->dirty_list,
+ &local->qlinks);
+ }
+ UNLOCK(&fd->lock);
+
+ /* Issue the local call. */
+ local->call_count = priv->leader ? priv->n_children : 1;
+ STACK_WIND (frame, nsr_fsync_local_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
+ fd, flags, xdata);
+
+ /* Issue remote calls if we're the leader. */
+ if (priv->leader) {
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fsync_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync,
+ fd, flags, xdata);
+ }
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ dict_t *result;
+ nsr_private_t *priv = this->private;
+
+ if (!priv->leader) {
+ STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, NULL, NULL);
+ return 0;
+ }
+
+ if (!name || (strcmp(name, NSR_REP_COUNT_XATTR) != 0)) {
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->getxattr,
+ loc, name, xdata);
+ return 0;
+ }
+
+ result = dict_new();
+ if (!result) {
+ goto dn_failed;
+ }
+
+ priv->up_children = nsr_count_up_kids(this->private);
+ if (dict_set_uint32(result, NSR_REP_COUNT_XATTR,
+ priv->up_children) != 0) {
+ goto dsu_failed;
+ }
+
+ STACK_UNWIND_STRICT (getxattr, frame, 0, 0, result, NULL);
+ dict_destroy(result);
+ return 0;
+
+dsu_failed:
+ dict_destroy(result);
+dn_failed:
+ STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+}
+
+void
+nsr_flush_fd (xlator_t *this, nsr_fd_ctx_t *fd_ctx)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+
+ list_for_each_entry_safe (dirty, dtmp, &fd_ctx->dirty_list, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", fd_ctx->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ INIT_LIST_HEAD(&fd_ctx->dirty_list);
+}
+
+void *
+nsr_flush_thread (void *ctx)
+{
+ xlator_t *this = ctx;
+ nsr_private_t *priv = this->private;
+ struct list_head dirty_fds;
+ nsr_fd_ctx_t *fd_ctx;
+ nsr_fd_ctx_t *fd_tmp;
+ int ret;
+
+ for (;;) {
+ /*
+ * We have to be very careful to avoid lock inversions here, so
+ * we can't just hold priv->dirty_lock while we take and
+ * release locks for each fd. Instead, we only hold dirty_lock
+ * at the beginning of each iteration, as we (effectively) make
+ * a copy of the current list head and then clear the original.
+ * This leads to four scenarios for adding the first entry to
+ * an fd and potentially putting it on the global list.
+ *
+ * (1) While we're asleep. No lock contention, it just gets
+ * added and will be processed on the next iteration.
+ *
+ * (2) After we've made a local copy, but before we've started
+ * processing that fd. The new entry will be added to the
+ * fd (under its lock), and we'll process it on the current
+ * iteration.
+ *
+ * (3) While we're processing the fd. They'll block on the fd
+ * lock, then see that the list is empty and put it on the
+ * global list. We'll process it here on the next
+ * iteration.
+ *
+ * (4) While we're working, but after we've processed that fd.
+ * Same as (1) as far as that fd is concerned.
+ */
+ INIT_LIST_HEAD(&dirty_fds);
+ LOCK(&priv->dirty_lock);
+ list_splice_init(&priv->dirty_fds, &dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+
+ list_for_each_entry_safe (fd_ctx, fd_tmp, &dirty_fds, fd_list) {
+ ret = syncop_fsync(FIRST_CHILD(this), fd_ctx->fd, 0,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to fsync %p (%d)",
+ fd_ctx->fd, -ret);
+ }
+
+ LOCK(&fd_ctx->fd->lock);
+ nsr_flush_fd(this, fd_ctx);
+ list_del_init(&fd_ctx->fd_list);
+ UNLOCK(&fd_ctx->fd->lock);
+ fd_unref(fd_ctx->fd);
+ }
+
+ sleep(NSR_FLUSH_INTERVAL);
+ }
+
+ return NULL;
+}
+
+
+int32_t
+nsr_get_changelog_dir (xlator_t *this, char **cl_dir_p)
+{
+ xlator_t *cl_xl;
+
+ /* Find our changelog translator. */
+ cl_xl = this;
+ while (cl_xl) {
+ if (strcmp(cl_xl->type, "features/changelog") == 0) {
+ break;
+ }
+ cl_xl = cl_xl->children->xlator;
+ }
+ if (!cl_xl) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog translator");
+ return ENOENT;
+ }
+
+ /* Find the actual changelog directory. */
+ if (dict_get_str(cl_xl->options, "changelog-dir", cl_dir_p) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog-dir for %s", cl_xl->name);
+ return ENODATA;
+ }
+
+ return 0;
+}
+
+
+void
+nsr_get_terms (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ DIR *fp = NULL;
+ struct dirent *rd_entry;
+ struct dirent *rd_result;
+ int32_t term_first = -1;
+ int32_t term_contig = -1;
+ int32_t term_last = -1;
+ int term_num;
+ char *probe_str;
+ dict_t *my_xdata = NULL;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err; /* Error was already logged. */
+ }
+ op_errno = ENODATA; /* Most common error after this. */
+
+ rd_entry = alloca (offsetof(struct dirent, d_name) +
+ pathconf(cl_dir, _PC_NAME_MAX) + 1);
+ if (!rd_entry) {
+ goto err;
+ }
+
+ fp = sys_opendir (cl_dir);
+ if (!fp) {
+ op_errno = errno;
+ goto err;
+ }
+
+ /* Find first and last terms. */
+ for (;;) {
+ if (readdir_r(fp, rd_entry, &rd_result) != 0) {
+ op_errno = errno;
+ goto err;
+ }
+ if (!rd_result) {
+ break;
+ }
+ if (fnmatch("TERM.*", rd_entry->d_name, FNM_PATHNAME) != 0) {
+ continue;
+ }
+ /* +5 points to the character after the period */
+ term_num = atoi(rd_entry->d_name+5);
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "%s => %d", rd_entry->d_name, term_num);
+ if (term_num < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INVALID,
+ "invalid term file name %s", rd_entry->d_name);
+ op_errno = EINVAL;
+ goto err;
+ }
+ if ((term_first < 0) || (term_first > term_num)) {
+ term_first = term_num;
+ }
+ if ((term_last < 0) || (term_last < term_num)) {
+ term_last = term_num;
+ }
+ }
+ if ((term_first < 0) || (term_last < 0)) {
+ /* TBD: are we *sure* there should always be at least one? */
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "no terms found");
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ sys_closedir (fp);
+ fp = NULL;
+
+ /*
+ * Find term_contig, which is the earliest term for which there are
+ * no gaps between it and term_last.
+ */
+ for (term_contig = term_last; term_contig > 0; --term_contig) {
+ if (gf_asprintf(&probe_str, "%s/TERM.%d",
+ cl_dir, term_contig-1) <= 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to format term %d", term_contig-1);
+ goto err;
+ }
+ if (sys_access(probe_str, F_OK) != 0) {
+ GF_FREE(probe_str);
+ break;
+ }
+ GF_FREE(probe_str);
+ }
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "found terms %d-%d (%d)",
+ term_first, term_last, term_contig);
+
+ /* Return what we've found */
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to allocate reply dictionary");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-first", term_first) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-first");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-contig", term_contig) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-contig");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-last", term_last) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-last");
+ goto err;
+ }
+
+ /* Finally! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ if (fp) {
+ sys_closedir (fp);
+ }
+ if (my_xdata) {
+ dict_unref(my_xdata);
+ }
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+long
+get_entry_count (xlator_t *this, int fd)
+{
+ struct stat buf;
+ long min; /* last entry not known to be empty */
+ long max; /* first entry known to be empty */
+ long curr;
+ char entry[CHANGELOG_ENTRY_SIZE];
+
+ if (sys_fstat (fd, &buf) < 0) {
+ return -1;
+ }
+
+ min = 0;
+ max = buf.st_size / CHANGELOG_ENTRY_SIZE;
+
+ while ((min+1) < max) {
+ curr = (min + max) / 2;
+ if (sys_lseek(fd, curr*CHANGELOG_ENTRY_SIZE, SEEK_SET) < 0) {
+ return -1;
+ }
+ if (sys_read(fd, entry, sizeof(entry)) != sizeof(entry)) {
+ return -1;
+ }
+ if ((entry[0] == '_') && (entry[1] == 'P')) {
+ min = curr;
+ } else {
+ max = curr;
+ }
+ }
+
+ if (sys_lseek(fd, 0, SEEK_SET) < 0) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to reset offset");
+ }
+ return max;
+}
+
+
+void
+nsr_open_term (call_frame_t *frame, xlator_t *this, dict_t *xdata)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ char *term;
+ char *path;
+ nsr_private_t *priv = this->private;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err;
+ }
+
+ if (dict_get_str(xdata, "term", &term) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "missing term");
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ if (gf_asprintf(&path, "%s/TERM.%s", cl_dir, term) < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to construct path");
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ if (priv->term_fd >= 0) {
+ sys_close (priv->term_fd);
+ }
+ priv->term_fd = open(path, O_RDONLY);
+ if (priv->term_fd < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to open term file");
+ goto err;
+ }
+
+ priv->term_total = get_entry_count(this, priv->term_fd);
+ if (priv->term_total < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "failed to get entry count");
+ sys_close (priv->term_fd);
+ priv->term_fd = -1;
+ op_errno = EIO;
+ goto err;
+ }
+ priv->term_read = 0;
+
+ /* Success! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, NULL);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+void
+nsr_next_entry (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno = ENOMEM;
+ nsr_private_t *priv = this->private;
+ ssize_t nbytes;
+ dict_t *my_xdata;
+
+ if (priv->term_fd < 0) {
+ op_errno = EBADFD;
+ goto err;
+ }
+
+ if (priv->term_read >= priv->term_total) {
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ nbytes = sys_read (priv->term_fd, priv->term_buf, CHANGELOG_ENTRY_SIZE);
+ if (nbytes < CHANGELOG_ENTRY_SIZE) {
+ if (nbytes < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "error reading next entry: %s",
+ strerror(errno));
+ } else {
+ op_errno = EIO;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "got %ld/%d bytes for next entry",
+ nbytes, CHANGELOG_ENTRY_SIZE);
+ }
+ goto err;
+ }
+ ++(priv->term_read);
+
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to allocate reply xdata");
+ goto err;
+ }
+
+ if (dict_set_static_bin(my_xdata, "data",
+ priv->term_buf, CHANGELOG_ENTRY_SIZE) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to assign reply xdata");
+ goto err;
+ }
+
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+int32_t
+nsr_ipc (call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
+{
+ switch (op) {
+ case NSR_SERVER_TERM_RANGE:
+ nsr_get_terms(frame, this);
+ break;
+ case NSR_SERVER_OPEN_TERM:
+ nsr_open_term(frame, this, xdata);
+ break;
+ case NSR_SERVER_NEXT_ENTRY:
+ nsr_next_entry(frame, this);
+ break;
+ default:
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ipc,
+ op, xdata);
+ }
+
+ return 0;
+}
+
+
+int32_t
+nsr_forget (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx = 0LL;
+
+ if ((inode_ctx_del(inode, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_release (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx = 0LL;
+
+ if ((fd_ctx_del(fd, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+struct xlator_cbks cbks = {
+ .forget = nsr_forget,
+ .release = nsr_release,
+};
+
+int
+nsr_reconfigure (xlator_t *this, dict_t *options)
+{
+ nsr_private_t *priv = this->private;
+
+ GF_OPTION_RECONF ("leader",
+ priv->config_leader, options, bool, err);
+ GF_OPTION_RECONF ("quorum-percent",
+ priv->quorum_pct, options, percent, err);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "reconfigure called, config_leader = %d, quorum_pct = %.1f\n",
+ priv->leader, priv->quorum_pct);
+
+ priv->leader = priv->config_leader;
+
+ return 0;
+
+err:
+ return -1;
+}
+
+int
+nsr_get_child_index (xlator_t *this, xlator_t *kid)
+{
+ xlator_list_t *trav;
+ int retval = -1;
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++retval;
+ if (trav->xlator == kid) {
+ return retval;
+ }
+ }
+
+ return -1;
+}
+
+/*
+ * Child notify handling is unreasonably FUBAR. Sometimes we'll get a
+ * CHILD_DOWN for a protocol/client child before we ever got a CHILD_UP for it.
+ * Other times we won't. Because it's effectively random (probably racy), we
+ * can't just maintain a count. We actually have to keep track of the state
+ * for each child separately, to filter out the bogus CHILD_DOWN events, and
+ * then generate counts on demand.
+ */
+int
+nsr_notify (xlator_t *this, int event, void *data, ...)
+{
+ nsr_private_t *priv = this->private;
+ int index;
+
+ switch (event) {
+ case GF_EVENT_CHILD_UP:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state |= (1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_UP for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children > 1)) {
+ priv->leader = _gf_false;
+ }
+ }
+ break;
+ case GF_EVENT_CHILD_DOWN:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state &= ~(1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_DOWN for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children < 2)) {
+ priv->leader = _gf_true;
+ }
+ }
+ break;
+ default:
+ ;
+ }
+
+ return default_notify(this, event, data);
+}
+
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+
+ ret = xlator_mem_acct_init (this, gf_mt_nsr_end + 1);
+
+ if (ret != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "Memory accounting init" "failed");
+ return ret;
+ }
+out:
+ return ret;
+}
+
+
+void
+nsr_deallocate_priv (nsr_private_t *priv)
+{
+ if (!priv) {
+ return;
+ }
+
+ GF_FREE(priv);
+}
+
+
+int32_t
+nsr_init (xlator_t *this)
+{
+ xlator_list_t *remote;
+ xlator_list_t *local;
+ nsr_private_t *priv = NULL;
+ xlator_list_t *trav;
+ pthread_t kid;
+ extern xlator_t global_xlator;
+ glusterfs_ctx_t *oldctx = global_xlator.ctx;
+
+ /*
+ * Any fop that gets special treatment has to be patched in here,
+ * because the compiled-in table is produced by the code generator and
+ * only contains generated functions. Note that we have to go through
+ * this->fops because of some dynamic-linking strangeness; modifying
+ * the static table doesn't work.
+ */
+ this->fops->getxattr = nsr_getxattr_special;
+ this->fops->fsync = nsr_fsync;
+ this->fops->ipc = nsr_ipc;
+
+ local = this->children;
+ if (!local) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no local subvolume");
+ goto err;
+ }
+
+ remote = local->next;
+ if (!remote) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no remote subvolumes");
+ goto err;
+ }
+
+ this->local_pool = mem_pool_new (nsr_local_t, 128);
+ if (!this->local_pool) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "failed to create nsr_local_t pool");
+ goto err;
+ }
+
+ priv = GF_CALLOC (1, sizeof(*priv), gf_mt_nsr_private_t);
+ if (!priv) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "could not allocate priv");
+ goto err;
+ }
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++(priv->n_children);
+ }
+
+ LOCK_INIT(&priv->dirty_lock);
+ LOCK_INIT(&priv->index_lock);
+ INIT_LIST_HEAD(&priv->dirty_fds);
+ priv->term_fd = -1;
+
+ this->private = priv;
+
+ GF_OPTION_INIT ("leader", priv->config_leader, bool, err);
+ GF_OPTION_INIT ("quorum-percent", priv->quorum_pct, percent, err);
+
+ priv->leader = priv->config_leader;
+
+ if (pthread_create(&kid, NULL, nsr_flush_thread,
+ this) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_SYS_CALL_FAILURE,
+ "could not start flush thread");
+ /* TBD: treat this as a fatal error? */
+ }
+
+ /*
+ * Calling glfs_new changes old->ctx, even if THIS still points
+ * to global_xlator. That causes problems later in the main
+ * thread, when gf_log_dump_graph tries to use the FILE after
+ * we've mucked with it and gets a segfault in __fprintf_chk.
+ * We can avoid all that by undoing the damage before we
+ * continue.
+ */
+ global_xlator.ctx = oldctx;
+
+ return 0;
+
+err:
+ nsr_deallocate_priv(priv);
+ return -1;
+}
+
+
+void
+nsr_fini (xlator_t *this)
+{
+ nsr_deallocate_priv(this->private);
+}
+
+class_methods_t class_methods = {
+ .init = nsr_init,
+ .fini = nsr_fini,
+ .reconfigure = nsr_reconfigure,
+ .notify = nsr_notify,
+};
+
+struct volume_options options[] = {
+ { .key = {"leader"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "false",
+ .description = "Start in the leader role. This is only for "
+ "bootstrapping the code, and should go away when we "
+ "have real leader election."
+ },
+ { .key = {"vol-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "volume name"
+ },
+ { .key = {"my-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "brick name in form of host:/path"
+ },
+ { .key = {"etcd-servers"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "list of comma seperated etc servers"
+ },
+ { .key = {"subvol-uuid"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "UUID for this NSR (sub)volume"
+ },
+ { .key = {"quorum-percent"},
+ .type = GF_OPTION_TYPE_PERCENT,
+ .default_value = "50.0",
+ .description = "percentage of rep_count-1 that must be up"
+ },
+ { .key = {NULL} },
+};