summaryrefslogtreecommitdiffstats
path: root/xlators/experimental
diff options
context:
space:
mode:
authorJeff Darcy <jdarcy@redhat.com>2015-12-10 10:30:32 -0500
committerJeff Darcy <jdarcy@redhat.com>2016-02-11 15:46:26 -0800
commit55617ef037695f47ead1c1b753678402e1545f8c (patch)
tree7bc4dde5a51ae5c8ef3a45454e64ae5f5b7c5a86 /xlators/experimental
parent320779d53ae013147d5e2556d2946c73e45734ab (diff)
NSR : nsr server code generation patch
The NSR-server with this patch, appoints the first node on every replica subvolume, as the leader for that subvolume. On receiving a 'write' fop, the leader first checks if there is quorum in the replica subvolume to proceeed. In case there isn't it fails with EROFS. If there is quorum, the leader forwards the fop to the followers. The followers on receiving the fop, perform the operation, and based on the success or failure of the outcome send a +ve or a -ve ack to the leader. The leader after receiving acks from the followers performs a quorum check of the acks, to see if it should even try to perform the fop. If quorum is not being met, and the leader's outcome wouldn't affect quorum, then it would send -ve ack to the client without even performing the fop. If quorum is being met, the leader will then try the fop on itself, and based on it's outcome perform a quorum check of all the acks received (this time, including it's own). Based on the result of the quorum check (irrespective of the outcome on the leader), a +ve or -ve ack is send back to the client. Change-Id: I860654b74c53e9b139b37dba43848e5504df6dce Signed-off-by: Avra Sengupta <asengupt@redhat.com> Reviewed-on: http://review.gluster.org/12705 Smoke: Gluster Build System <jenkins@build.gluster.com> Tested-by: Jeff Darcy <jdarcy@redhat.com> CentOS-regression: Gluster Build System <jenkins@build.gluster.com> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'xlators/experimental')
-rw-r--r--xlators/experimental/Makefile.am2
-rw-r--r--xlators/experimental/nsr-client/src/nsr-messages.h38
-rw-r--r--xlators/experimental/nsr-server/Makefile.am3
-rw-r--r--xlators/experimental/nsr-server/src/Makefile.am35
-rw-r--r--xlators/experimental/nsr-server/src/all-templates.c429
-rwxr-xr-xxlators/experimental/nsr-server/src/gen-fops.py138
-rw-r--r--xlators/experimental/nsr-server/src/nsr-internal.h114
-rw-r--r--xlators/experimental/nsr-server/src/nsr.c1066
8 files changed, 1824 insertions, 1 deletions
diff --git a/xlators/experimental/Makefile.am b/xlators/experimental/Makefile.am
index f937146c72a..06f04a193c8 100644
--- a/xlators/experimental/Makefile.am
+++ b/xlators/experimental/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = nsr-client
+SUBDIRS = nsr-client nsr-server
CLEANFILES =
diff --git a/xlators/experimental/nsr-client/src/nsr-messages.h b/xlators/experimental/nsr-client/src/nsr-messages.h
index d68cfc5d228..aa28a639a16 100644
--- a/xlators/experimental/nsr-client/src/nsr-messages.h
+++ b/xlators/experimental/nsr-client/src/nsr-messages.h
@@ -60,8 +60,46 @@
*/
#define N_MSG_MEM_ERR (NSR_COMP_BASE + 3)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_DICT_FLR (NSR_COMP_BASE + 4)
+
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_GENERIC (NSR_COMP_BASE + 5)
+
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_INVALID (NSR_COMP_BASE + 6)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_NO_DATA (NSR_COMP_BASE + 7)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_SYS_CALL_FAILURE (NSR_COMP_BASE + 8)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ */
+#define N_MSG_QUORUM_NOT_MET (NSR_COMP_BASE + 9)
#endif /* _NSR_MESSAGES_H_ */
diff --git a/xlators/experimental/nsr-server/Makefile.am b/xlators/experimental/nsr-server/Makefile.am
new file mode 100644
index 00000000000..a985f42a877
--- /dev/null
+++ b/xlators/experimental/nsr-server/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/experimental/nsr-server/src/Makefile.am b/xlators/experimental/nsr-server/src/Makefile.am
new file mode 100644
index 00000000000..6c0597610a2
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/Makefile.am
@@ -0,0 +1,35 @@
+xlator_LTLIBRARIES = nsr.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/experimental
+
+nodist_nsr_la_SOURCES = nsr-cg.c
+CLEANFILES = $(nodist_nsr_la_SOURCES)
+
+nsr_la_LDFLAGS = -module -avoid-version
+nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \
+ $(top_builddir)/api/src/libgfapi.la
+
+noinst_HEADERS = nsr-internal.h \
+ $(top_srcdir)/xlators/lib/src/libxlator.h \
+ $(top_srcdir)/glusterfsd/src/glusterfsd.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) \
+ -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/rpc-lib/src -DSBIN_DIR=\"$(sbindir)\" \
+ -I$(top_srcdir)/api/src -DNSR_SCRIPT_PREFIX=\"$(nsrdir)\" \
+ -I$(top_srcdir)/xlators/experimental/nsr-client/src/
+
+AM_CFLAGS = -Wall $(GF_CFLAGS)
+
+NSR_PREFIX = $(top_srcdir)/xlators/experimental/nsr-server/src
+NSR_GEN_FOPS = $(NSR_PREFIX)/gen-fops.py
+NSR_TEMPLATES = $(NSR_PREFIX)/all-templates.c
+NSR_WRAPPER = $(NSR_PREFIX)/nsr.c
+noinst_PYTHON = $(NSR_GEN_FOPS)
+EXTRA_DIST = $(NSR_TEMPLATES) $(NSR_WRAPPER)
+
+nsr-cg.c: $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER)
+ $(PYTHON) $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER) > $@
+
+uninstall-local:
+ rm -f $(DESTDIR)$(xlatordir)/nsr.so
diff --git a/xlators/experimental/nsr-server/src/all-templates.c b/xlators/experimental/nsr-server/src/all-templates.c
new file mode 100644
index 00000000000..300abea959d
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/all-templates.c
@@ -0,0 +1,429 @@
+/*
+ * You can put anything here - it doesn't even have to be a comment - and it
+ * will be ignored until we reach the first template-name comment.
+ */
+
+
+/* template-name read-fop */
+int32_t
+nsr_@NAME@ (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_private_t *priv = this->private;
+ gf_boolean_t in_recon = _gf_false;
+ int32_t recon_term, recon_index;
+
+ /* allow reads during reconciliation *
+ * TBD: allow "dirty" reads on non-leaders *
+ */
+ if (xdata &&
+ (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) &&
+ (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) {
+ in_recon = _gf_true;
+ }
+
+ if ((!priv->leader) && (in_recon == _gf_false)) {
+ goto err;
+ }
+
+ STACK_WIND (frame, default_@NAME@_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, EREMOTE,
+ @ERROR_ARGS@);
+ return 0;
+}
+
+/* template-name read-dispatch */
+/* No "dispatch" function needed for @NAME@ */
+
+/* template-name read-fan-in */
+/* No "fan-in" function needed for @NAME@ */
+
+/* template-name read-continue */
+/* No "continue" function needed for @NAME@ */
+
+/* template-name read-complete */
+/* No "complete" function needed for @NAME@ */
+
+/* template-name write-fop */
+int32_t
+nsr_@NAME@ (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = this->private;
+ gf_boolean_t result = _gf_false;
+ int op_errno = ENOMEM;
+ int from_leader;
+ int from_recon;
+ uint32_t ti = 0;
+
+ /*
+ * Our first goal here is to avoid "split brain surprise" for users who
+ * specify exactly 50% with two- or three-way replication. That means
+ * either a more-than check against half the total replicas or an
+ * at-least check against half of our peers (one less). Of the two,
+ * only an at-least check supports the intuitive use of 100% to mean
+ * all replicas must be present, because "more than 100%" will never
+ * succeed regardless of which count we use. This leaves us with a
+ * slightly non-traditional definition of quorum ("at least X% of peers
+ * not including ourselves") but one that's useful enough to be worth
+ * it.
+ *
+ * Note that n_children and up_children *do* include the local
+ * subvolume, so we need to subtract one in each case.
+ */
+ if (priv->leader) {
+ result = fop_quorum_check (this, (double)(priv->n_children - 1),
+ (double)(priv->up_children - 1));
+
+ if (result == _gf_false) {
+ /* Emulate the AFR client-side-quorum behavior. */
+ op_errno = EROFS;
+ goto err;
+ }
+ } else {
+ if (xdata) {
+ from_leader = !!dict_get(xdata, NSR_TERM_XATTR);
+ from_recon = !!dict_get(xdata, RECON_TERM_XATTR)
+ && !!dict_get(xdata, RECON_INDEX_XATTR);
+ } else {
+ from_leader = from_recon = _gf_false;
+ }
+
+ /* follower/recon path *
+ * just send it to local node *
+ */
+ if (!from_leader && !from_recon) {
+ op_errno = EREMOTE;
+ goto err;
+ }
+ }
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ goto err;
+ }
+#if defined(NSR_CG_NEED_FD)
+ local->fd = fd_ref(fd);
+#else
+ local->fd = NULL;
+#endif
+ INIT_LIST_HEAD(&local->qlinks);
+ frame->local = local;
+
+ /*
+ * If we let it through despite not being the leader, then we just want
+ * to pass it on down without all of the additional xattrs, queuing, and
+ * so on. However, nsr_*_complete does depend on the initialization
+ * immediately above this.
+ */
+ if (!priv->leader) {
+ STACK_WIND (frame, nsr_@NAME@_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ return 0;
+ }
+
+ if (!xdata) {
+ xdata = dict_new();
+ if (!xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+ N_MSG_MEM_ERR, "failed to allocate xdata");
+ goto err;
+ }
+ }
+
+ if (dict_set_int32(xdata, NSR_TERM_XATTR, priv->current_term) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to set nsr-term");
+ goto err;
+ }
+
+ LOCK(&priv->index_lock);
+ ti = ++(priv->index);
+ UNLOCK(&priv->index_lock);
+ if (dict_set_int32(xdata, NSR_INDEX_XATTR, ti) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to set index");
+ goto err;
+ }
+
+ local->stub = fop_@NAME@_stub (frame, nsr_@NAME@_continue,
+ @SHORT_ARGS@);
+ if (!local->stub) {
+ goto err;
+ }
+
+
+#if defined(NSR_CG_QUEUE)
+ nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this, fd->inode);
+
+ if (!ictx) {
+ op_errno = EIO;
+ goto err;
+ }
+ LOCK(&ictx->lock);
+ if (ictx->active) {
+ gf_msg_debug (this->name, 0,
+ "queuing request due to conflict");
+ /*
+ * TBD: enqueue only for real conflict
+ *
+ * Currently we just act like all writes are in
+ * conflict with one another. What we should really do
+ * is check the active/pending queues and defer only if
+ * there's a conflict there.
+ *
+ * It's important to check the pending queue because we
+ * might have an active request X which conflicts with
+ * a pending request Y, and this request Z might
+ * conflict with Y but not X. If we checked only the
+ * active queue then Z could jump ahead of Y, which
+ * would be incorrect.
+ */
+ local->qstub = fop_@NAME@_stub (frame,
+ nsr_@NAME@_dispatch,
+ @SHORT_ARGS@);
+ if (!local->qstub) {
+ UNLOCK(&ictx->lock);
+ goto err;
+ }
+ list_add_tail(&local->qlinks, &ictx->pqueue);
+ ++(ictx->pending);
+ UNLOCK(&ictx->lock);
+ return 0;
+ } else {
+ list_add_tail(&local->qlinks, &ictx->aqueue);
+ ++(ictx->active);
+ }
+ UNLOCK(&ictx->lock);
+#endif
+
+ return nsr_@NAME@_dispatch (frame, this, @SHORT_ARGS@);
+
+err:
+ if (local) {
+ if (local->stub) {
+ call_stub_destroy(local->stub);
+ }
+ if (local->qstub) {
+ call_stub_destroy(local->qstub);
+ }
+ if (local->fd) {
+ fd_unref(local->fd);
+ }
+ mem_put(local);
+ }
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, op_errno,
+ @ERROR_ARGS@);
+ return 0;
+}
+
+/* template-name write-dispatch */
+int32_t
+nsr_@NAME@_dispatch (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = frame->local;
+ nsr_private_t *priv = this->private;
+ xlator_list_t *trav;
+
+ /*
+ * TBD: unblock pending request(s) if we fail after this point but
+ * before we get to nsr_@NAME@_complete (where that code currently
+ * resides).
+ */
+
+ local->call_count = priv->n_children - 1;
+ local->successful_acks = 0;
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_@NAME@_fan_in,
+ trav->xlator, trav->xlator->fops->@NAME@,
+ @SHORT_ARGS@);
+ }
+
+ /* TBD: variable Issue count */
+ return 0;
+}
+
+/* template-name write-fan-in */
+int32_t
+nsr_@NAME@_fan_in (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ @LONG_ARGS@)
+{
+ nsr_local_t *local = frame->local;
+ uint8_t call_count;
+
+ gf_msg_trace (this->name, 0, "op_ret = %d, op_errno = %d\n",
+ op_ret, op_errno);
+
+ LOCK(&frame->lock);
+ call_count = --(local->call_count);
+ if (op_ret != -1) {
+ /* Increment the number of successful acks *
+ * received for the operation. *
+ */
+ (local->successful_acks)++;
+ local->successful_op_ret = op_ret;
+ }
+ gf_msg_debug (this->name, 0, "succ_acks = %d, op_ret = %d, op_errno = %d\n",
+ op_ret, op_errno, local->successful_acks);
+ UNLOCK(&frame->lock);
+
+ /* TBD: variable Completion count */
+ if (call_count == 0) {
+ call_resume(local->stub);
+ }
+
+ return 0;
+}
+
+/* template-name write-continue */
+int32_t
+nsr_@NAME@_continue (call_frame_t *frame, xlator_t *this,
+ @LONG_ARGS@)
+{
+ int32_t ret = -1;
+ gf_boolean_t result = _gf_false;
+ nsr_local_t *local = NULL;
+ nsr_private_t *priv = NULL;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, frame, out);
+ priv = this->private;
+ local = frame->local;
+ GF_VALIDATE_OR_GOTO (this->name, priv, out);
+ GF_VALIDATE_OR_GOTO (this->name, local, out);
+
+ /* Perform quorum check to see if the leader needs *
+ * to perform the operation. If the operation will not *
+ * meet quorum irrespective of the leader's result *
+ * there is no point in the leader performing the fop *
+ */
+ result = fop_quorum_check (this, (double)priv->n_children,
+ (double)local->successful_acks + 1);
+ if (result == _gf_false) {
+ STACK_UNWIND_STRICT (@NAME@, frame, -1, EROFS,
+ @ERROR_ARGS@);
+ } else {
+ STACK_WIND (frame, nsr_@NAME@_complete,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@,
+ @SHORT_ARGS@);
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/* template-name write-complete */
+int32_t
+nsr_@NAME@_complete (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ @LONG_ARGS@)
+{
+ gf_boolean_t result = _gf_false;
+ nsr_private_t *priv = this->private;
+
+ nsr_local_t *local = frame->local;
+
+ /* If the fop failed on the leader, then reduce one succesful ack
+ * before calculating the fop quorum
+ */
+ LOCK(&frame->lock);
+ if (op_ret == -1)
+ (local->successful_acks)--;
+ UNLOCK(&frame->lock);
+
+#if defined(NSR_CG_QUEUE)
+ nsr_inode_ctx_t *ictx;
+ nsr_local_t *next;
+
+ if (local->qlinks.next != &local->qlinks) {
+ list_del(&local->qlinks);
+ ictx = nsr_get_inode_ctx(this, local->fd->inode);
+ if (ictx) {
+ LOCK(&ictx->lock);
+ if (ictx->pending) {
+ /*
+ * TBD: dequeue *all* non-conflicting
+ * reqs
+ *
+ * With the stub implementation there
+ * can only be one request active at a
+ * time (zero here) so it's not an
+ * issue. In a real implementation
+ * there might still be other active
+ * requests to check against, and
+ * multiple pending requests that could
+ * continue.
+ */
+ gf_msg_debug (this->name, 0,
+ "unblocking next request");
+ --(ictx->pending);
+ next = list_entry (ictx->pqueue.next,
+ nsr_local_t, qlinks);
+ list_del(&next->qlinks);
+ list_add_tail(&next->qlinks,
+ &ictx->aqueue);
+ call_resume(next->qstub);
+ } else {
+ --(ictx->active);
+ }
+ UNLOCK(&ictx->lock);
+ }
+ }
+#endif
+
+#if defined(NSR_CG_FSYNC)
+ nsr_mark_fd_dirty(this, local);
+#endif
+
+#if defined(NSR_CG_NEED_FD)
+ fd_unref(local->fd);
+#endif
+
+ /* After the leader completes the fop, a quorum check is *
+ * performed, taking into account the outcome of the fop *
+ * on the leader. Irrespective of the fop being successful *
+ * or failing on the leader, the result of the quorum will *
+ * determine if the overall fop is successful or not. For *
+ * example, a fop might have succeeded on every node except *
+ * the leader, in which case as quorum is being met, the fop *
+ * will be treated as a successful fop, even though it failed *
+ * on the leader. On follower nodes, no quorum check should *
+ * be done, and the result is returned to the leader as is. *
+ */
+ if (priv->leader) {
+ result = fop_quorum_check (this, (double)priv->n_children,
+ (double)local->successful_acks + 1);
+ if (result == _gf_false) {
+ op_ret = -1;
+ op_errno = EROFS;
+ gf_msg_debug (this->name, 0,
+ "Quorum is not met. The operation has failed.");
+ } else {
+#if defined(NSR_CG_NEED_FD)
+ op_ret = local->successful_op_ret;
+#else
+ op_ret = 0;
+#endif
+ op_errno = 0;
+ gf_msg_debug (this->name, 0,
+ "Quorum has met. The operation has succeeded.");
+ }
+ }
+
+ STACK_UNWIND_STRICT (@NAME@, frame, op_ret, op_errno,
+ @SHORT_ARGS@);
+
+
+ return 0;
+
+}
diff --git a/xlators/experimental/nsr-server/src/gen-fops.py b/xlators/experimental/nsr-server/src/gen-fops.py
new file mode 100755
index 00000000000..336b218a8fb
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/gen-fops.py
@@ -0,0 +1,138 @@
+#!/usr/bin/python
+
+# This script generates the boilerplate versions of most fops and cbks in the
+# server. This allows the details of leadership-status checking, sequencing
+# between leader and followers (including fan-out), and basic error checking
+# to be centralized one place, with per-operation code kept to a minimum.
+
+import os
+import re
+import string
+import sys
+
+curdir = os.path.dirname(sys.argv[0])
+gendir = os.path.join(curdir,'../../../../libglusterfs/src')
+sys.path.append(gendir)
+from generator import ops, fop_subs, cbk_subs, generate
+
+# We really want the callback argument list, even when we're generating fop
+# code, so we propagate here.
+# TBD: this should probably be right in generate.py
+for k, v in cbk_subs.iteritems():
+ fop_subs[k]['@ERROR_ARGS@'] = v['@ERROR_ARGS@']
+
+# Stolen from old codegen.py
+def load_templates (path):
+ templates = {}
+ tmpl_re = re.compile("/\* template-name (.*) \*/")
+ templates = {}
+ t_name = None
+ for line in open(path,"r").readlines():
+ if not line:
+ break
+ m = tmpl_re.match(line)
+ if m:
+ if t_name:
+ templates[t_name] = string.join(t_contents,'')
+ t_name = m.group(1).strip()
+ t_contents = []
+ elif t_name:
+ t_contents.append(line)
+ if t_name:
+ templates[t_name] = string.join(t_contents,'')
+ return templates
+
+# We need two types of templates. The first, for pure read operations, just
+# needs to do a simple am-i-leader check (augmented to allow dirty reads).
+# The second, for pure writes, needs to do fan-out to followers between those
+# initial checks and local execution. There are other operations that don't
+# fit neatly into either category - e.g. lock ops or fsync - so we'll just have
+# to handle those manually. The table thus includes entries only for those we
+# can categorize. The special cases, plus any new operations we've never even
+# heard of, aren't in there.
+#
+# Various keywords can be used to define/undefine preprocessor symbols used
+# in the templates, on a per-function basis. For example, if the keyword here
+# is "fsync" (lowercase word or abbreviation) that will cause NSR_CG_FSYNC
+# (prefix plus uppercase version) to be defined above all of the generated code
+# for that fop.
+
+fop_table = {
+ "access": "read",
+ "create": "write",
+ "discard": "write",
+# "entrylk": "read",
+ "fallocate": "write",
+# "fentrylk": "read",
+ "fgetxattr": "read",
+# "finodelk": "read",
+# "flush": "read",
+ "fremovexattr": "write",
+ "fsetattr": "write",
+ "fsetxattr": "write",
+ "fstat": "read",
+# "fsync": "read",
+# "fsyncdir": "read",
+ "ftruncate": "write",
+ "fxattrop": "write",
+ "getxattr": "read",
+# "inodelk": "read",
+ "link": "write",
+# "lk": "read",
+# "lookup": "read",
+ "mkdir": "write",
+ "mknod": "write",
+ "open": "write",
+ "opendir": "read",
+ "rchecksum": "read",
+ "readdir": "read",
+ "readdirp": "read",
+ "readlink": "read",
+ "readv": "read",
+ "removexattr": "write",
+ "rename": "write",
+ "rmdir": "write",
+ "setattr": "write",
+ "setxattr": "write",
+ "stat": "read",
+ "statfs": "read",
+ "symlink": "write",
+ "truncate": "write",
+ "unlink": "write",
+ "writev": "write,fsync,queue",
+ "xattrop": "write",
+}
+
+# Stolen from gen_fdl.py
+def gen_server (templates):
+ fops_done = []
+ for name in fop_table.keys():
+ info = fop_table[name].split(",")
+ kind = info[0]
+ flags = info[1:]
+ if ("fsync" in flags) or ("queue" in flags):
+ flags.append("need_fd")
+ for fname in flags:
+ print "#define NSR_CG_%s" % fname.upper()
+ print generate(templates[kind+"-complete"],name,cbk_subs)
+ print generate(templates[kind+"-continue"],name,fop_subs)
+ print generate(templates[kind+"-fan-in"],name,cbk_subs)
+ print generate(templates[kind+"-dispatch"],name,fop_subs)
+ print generate(templates[kind+"-fop"],name,fop_subs)
+ for fname in flags:
+ print "#undef NSR_CG_%s" % fname.upper()
+ fops_done.append(name)
+ # Just for fun, emit the fops table too.
+ print("struct xlator_fops fops = {")
+ for x in fops_done:
+ print(" .%s = nsr_%s,"%(x,x))
+ print("};")
+
+tmpl = load_templates(sys.argv[1])
+for l in open(sys.argv[2],'r').readlines():
+ if l.find('#pragma generate') != -1:
+ print "/* BEGIN GENERATED CODE - DO NOT MODIFY */"
+ gen_server(tmpl)
+ print "/* END GENERATED CODE */"
+ else:
+ print l[:-1]
diff --git a/xlators/experimental/nsr-server/src/nsr-internal.h b/xlators/experimental/nsr-server/src/nsr-internal.h
new file mode 100644
index 00000000000..b8c7fc314b7
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/nsr-internal.h
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#define LEADER_XATTR "user.nsr.leader"
+#define SECOND_CHILD(xl) (xl->children->next->xlator)
+#define RECONCILER_PATH NSR_SCRIPT_PREFIX"/reconciler.py"
+#define CHANGELOG_ENTRY_SIZE 128
+
+enum {
+ gf_mt_nsr_private_t = gf_common_mt_end + 1,
+ gf_mt_nsr_fd_ctx_t,
+ gf_mt_nsr_inode_ctx_t,
+ gf_mt_nsr_dirty_t,
+ gf_mt_nsr_end
+};
+
+typedef enum nsr_recon_notify_ev_id_t {
+ NSR_RECON_SET_LEADER = 1,
+ NSR_RECON_ADD_CHILD = 2
+} nsr_recon_notify_ev_id_t;
+
+typedef struct _nsr_recon_notify_ev_s {
+ nsr_recon_notify_ev_id_t id;
+ uint32_t index; /* in case of add */
+ struct list_head list;
+} nsr_recon_notify_ev_t;
+
+typedef struct {
+ /*
+ * This is a hack to allow a non-leader to accept requests while the
+ * leader is down, and it only works for n=2. The way it works is that
+ * "config_leader" indicates the state from our options (via init or
+ * reconfigure) but "leader" is what the fop code actually looks at. If
+ * config_leader is true, then leader will *always* be true as well,
+ * giving that brick precedence. If config_leader is false, then
+ * leader will only be true if there is no connection to the other
+ * brick (tracked in nsr_notify).
+ *
+ * TBD: implement real leader election
+ */
+ gf_boolean_t config_leader;
+ gf_boolean_t leader;
+ uint8_t up_children;
+ uint8_t n_children;
+ char *vol_file;
+ uint32_t current_term;
+ uint32_t kid_state;
+ gf_lock_t dirty_lock;
+ struct list_head dirty_fds;
+ uint32_t index;
+ gf_lock_t index_lock;
+ double quorum_pct;
+ int term_fd;
+ long term_total;
+ long term_read;
+ /*
+ * This is a super-duper hack, but it will do for now. The reason it's
+ * a hack is that we pass this to dict_set_static_bin, so we don't have
+ * to mess around with allocating and freeing it on every single IPC
+ * request, but it's totally not thread-safe. On the other hand, there
+ * should only be one reconciliation thread running and calling these
+ * functions at a time, so maybe that doesn't matter.
+ *
+ * TBD: re-evaluate how to manage this
+ */
+ char term_buf[CHANGELOG_ENTRY_SIZE];
+} nsr_private_t;
+
+typedef struct {
+ call_stub_t *stub;
+ call_stub_t *qstub;
+ uint32_t call_count;
+ uint32_t successful_acks;
+ uint32_t successful_op_ret;
+ fd_t *fd;
+ struct list_head qlinks;
+} nsr_local_t;
+
+/*
+ * This should match whatever changelog returns on the pre-op for us to pass
+ * when we're ready for our post-op.
+ */
+typedef uint32_t log_id_t;
+
+typedef struct {
+ struct list_head links;
+ log_id_t id;
+} nsr_dirty_list_t;
+
+typedef struct {
+ fd_t *fd;
+ struct list_head dirty_list;
+ struct list_head fd_list;
+} nsr_fd_ctx_t;
+
+typedef struct {
+ gf_lock_t lock;
+ uint32_t active;
+ struct list_head aqueue;
+ uint32_t pending;
+ struct list_head pqueue;
+} nsr_inode_ctx_t;
+
+void nsr_start_reconciler (xlator_t *this);
diff --git a/xlators/experimental/nsr-server/src/nsr.c b/xlators/experimental/nsr-server/src/nsr.c
new file mode 100644
index 00000000000..0c494b78125
--- /dev/null
+++ b/xlators/experimental/nsr-server/src/nsr.c
@@ -0,0 +1,1066 @@
+/*
+ Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include <fnmatch.h>
+#include "call-stub.h"
+#include "defaults.h"
+#include "xlator.h"
+#include "glfs.h"
+#include "glfs-internal.h"
+#include "run.h"
+#include "common-utils.h"
+#include "syncop.h"
+#include "syscall.h"
+
+#include "nsr-internal.h"
+#include "nsr-messages.h"
+
+#define NSR_FLUSH_INTERVAL 5
+
+enum {
+ /* echo "cluster/nsr-server" | md5sum | cut -c 1-8 */
+ NSR_SERVER_IPC_BASE = 0x0e2d66a5,
+ NSR_SERVER_TERM_RANGE,
+ NSR_SERVER_OPEN_TERM,
+ NSR_SERVER_NEXT_ENTRY
+};
+
+/* Used to check the quorum of acks received after the fop
+ * confirming the status of the fop on all the brick processes
+ * for this particular subvolume
+ */
+gf_boolean_t
+fop_quorum_check (xlator_t *this, double n_children,
+ double current_state)
+{
+ nsr_private_t *priv = NULL;
+ gf_boolean_t result = _gf_false;
+ double required = 0;
+ double current = 0;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+ priv = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, priv, out);
+
+ required = n_children * priv->quorum_pct;
+
+ /*
+ * Before performing the fop on the leader, we need to check,
+ * if there is any merit in performing the fop on the leader.
+ * In a case, where even a successful write on the leader, will
+ * not meet quorum, there is no point in trying the fop on the
+ * leader.
+ * When this function is called after the leader has tried
+ * performing the fop, this check will calculate quorum taking into
+ * account the status of the fop on the leader. If the leader's
+ * op_ret was -1, the complete function would account that by
+ * decrementing successful_acks by 1
+ */
+
+ current = current_state * 100.0;
+
+ if (current < required) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_QUORUM_NOT_MET,
+ "Quorum not met. quorum_pct = %f "
+ "Current State = %f, Required State = %f",
+ priv->quorum_pct, current,
+ required);
+ } else
+ result = _gf_true;
+
+out:
+ return result;
+}
+
+nsr_inode_ctx_t *
+nsr_get_inode_ctx (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_inode_ctx_t *ctx_ptr;
+
+ if (__inode_ctx_get(inode, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_inode_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr),
+ gf_mt_nsr_inode_ctx_t);
+ if (ctx_ptr) {
+ ctx_int = (uint64_t)(long)ctx_ptr;
+ if (__inode_ctx_set(inode, this, &ctx_int) == 0) {
+ LOCK_INIT(&ctx_ptr->lock);
+ INIT_LIST_HEAD(&ctx_ptr->aqueue);
+ INIT_LIST_HEAD(&ctx_ptr->pqueue);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+nsr_fd_ctx_t *
+nsr_get_fd_ctx (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ } else {
+ ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr), gf_mt_nsr_fd_ctx_t);
+ if (ctx_ptr) {
+ if (__fd_ctx_set(fd, this, (uint64_t)ctx_ptr) == 0) {
+ INIT_LIST_HEAD(&ctx_ptr->dirty_list);
+ INIT_LIST_HEAD(&ctx_ptr->fd_list);
+ } else {
+ GF_FREE(ctx_ptr);
+ ctx_ptr = NULL;
+ }
+ }
+
+ }
+
+ return ctx_ptr;
+}
+
+void
+nsr_mark_fd_dirty (xlator_t *this, nsr_local_t *local)
+{
+ fd_t *fd = local->fd;
+ nsr_fd_ctx_t *ctx_ptr;
+ nsr_dirty_list_t *dirty;
+ nsr_private_t *priv = this->private;
+
+ /*
+ * TBD: don't do any of this for O_SYNC/O_DIRECT writes.
+ * Unfortunately, that optimization requires that we distinguish
+ * between writev and other "write" calls, saving the original flags
+ * and checking them in the callback. Too much work for too little
+ * gain right now.
+ */
+
+ LOCK(&fd->lock);
+ ctx_ptr = nsr_get_fd_ctx(this, fd);
+ dirty = GF_CALLOC(1, sizeof(*dirty), gf_mt_nsr_dirty_t);
+ if (ctx_ptr && dirty) {
+ gf_msg_trace (this->name, 0,
+ "marking fd %p as dirty (%p)", fd, dirty);
+ /* TBD: fill dirty->id from what changelog gave us */
+ list_add_tail(&dirty->links, &ctx_ptr->dirty_list);
+ if (list_empty(&ctx_ptr->fd_list)) {
+ /* Add a ref so _release doesn't get called. */
+ ctx_ptr->fd = fd_ref(fd);
+ LOCK(&priv->dirty_lock);
+ list_add_tail (&ctx_ptr->fd_list,
+ &priv->dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+ }
+ } else {
+ gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
+ N_MSG_MEM_ERR, "could not mark %p dirty", fd);
+ if (ctx_ptr) {
+ GF_FREE(ctx_ptr);
+ }
+ if (dirty) {
+ GF_FREE(dirty);
+ }
+ }
+ UNLOCK(&fd->lock);
+}
+
+#define NSR_TERM_XATTR "trusted.nsr.term"
+#define NSR_INDEX_XATTR "trusted.nsr.index"
+#define NSR_REP_COUNT_XATTR "trusted.nsr.rep-count"
+#define RECON_TERM_XATTR "trusted.nsr.recon-term"
+#define RECON_INDEX_XATTR "trusted.nsr.recon-index"
+
+#pragma generate
+
+uint8_t
+nsr_count_up_kids (nsr_private_t *priv)
+{
+ uint8_t retval = 0;
+ uint8_t i;
+
+ for (i = 0; i < priv->n_children; ++i) {
+ if (priv->kid_state & (1 << i)) {
+ ++retval;
+ }
+ }
+
+ return retval;
+}
+
+/*
+ * The fsync machinery looks a lot like that for any write call, but there are
+ * some important differences that are easy to miss. First, we don't care
+ * about the xdata that shows whether the call came from a leader or
+ * reconciliation process. If we're the leader we fan out; if we're not we
+ * don't. Second, we don't wait for followers before we issue the local call.
+ * The code generation system could be updated to handle this, and still might
+ * if we need to implement other "almost identical" paths (e.g. for open), but
+ * a copy is more readable as long as it's just one.
+ */
+
+int32_t
+nsr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_local_t *local = frame->local;
+ gf_boolean_t unwind;
+
+ LOCK(&frame->lock);
+ unwind = !--(local->call_count);
+ UNLOCK(&frame->lock);
+
+ if (unwind) {
+ STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf,
+ postbuf, xdata);
+ }
+ return 0;
+}
+
+int32_t
+nsr_fsync_local_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+ nsr_local_t *local = frame->local;
+
+ list_for_each_entry_safe (dirty, dtmp, &local->qlinks, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", local->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ return nsr_fsync_cbk (frame, cookie, this, op_ret, op_errno,
+ prebuf, postbuf, xdata);
+}
+
+int32_t
+nsr_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
+ dict_t *xdata)
+{
+ nsr_private_t *priv = this->private;
+ nsr_local_t *local;
+ uint64_t ctx_int = 0LL;
+ nsr_fd_ctx_t *ctx_ptr;
+ xlator_list_t *trav;
+
+ local = mem_get0(this->local_pool);
+ if (!local) {
+ STACK_UNWIND_STRICT(fsync, frame, -1, ENOMEM,
+ NULL, NULL, xdata);
+ return 0;
+ }
+ INIT_LIST_HEAD(&local->qlinks);
+ frame->local = local;
+
+ /* Move the dirty list from the fd to the fsync request. */
+ LOCK(&fd->lock);
+ if (__fd_ctx_get(fd, this, &ctx_int) == 0) {
+ ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int;
+ list_splice_init (&ctx_ptr->dirty_list,
+ &local->qlinks);
+ }
+ UNLOCK(&fd->lock);
+
+ /* Issue the local call. */
+ local->call_count = priv->leader ? priv->n_children : 1;
+ STACK_WIND (frame, nsr_fsync_local_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
+ fd, flags, xdata);
+
+ /* Issue remote calls if we're the leader. */
+ if (priv->leader) {
+ for (trav = this->children->next; trav; trav = trav->next) {
+ STACK_WIND (frame, nsr_fsync_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync,
+ fd, flags, xdata);
+ }
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
+{
+ dict_t *result;
+ nsr_private_t *priv = this->private;
+
+ if (!priv->leader) {
+ STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, NULL, NULL);
+ return 0;
+ }
+
+ if (!name || (strcmp(name, NSR_REP_COUNT_XATTR) != 0)) {
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->getxattr,
+ loc, name, xdata);
+ return 0;
+ }
+
+ result = dict_new();
+ if (!result) {
+ goto dn_failed;
+ }
+
+ priv->up_children = nsr_count_up_kids(this->private);
+ if (dict_set_uint32(result, NSR_REP_COUNT_XATTR,
+ priv->up_children) != 0) {
+ goto dsu_failed;
+ }
+
+ STACK_UNWIND_STRICT (getxattr, frame, 0, 0, result, NULL);
+ dict_destroy(result);
+ return 0;
+
+dsu_failed:
+ dict_destroy(result);
+dn_failed:
+ STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL, NULL);
+ return 0;
+}
+
+void
+nsr_flush_fd (xlator_t *this, nsr_fd_ctx_t *fd_ctx)
+{
+ nsr_dirty_list_t *dirty;
+ nsr_dirty_list_t *dtmp;
+
+ list_for_each_entry_safe (dirty, dtmp, &fd_ctx->dirty_list, links) {
+ gf_msg_trace (this->name, 0,
+ "sending post-op on %p (%p)", fd_ctx->fd, dirty);
+ GF_FREE(dirty);
+ }
+
+ INIT_LIST_HEAD(&fd_ctx->dirty_list);
+}
+
+void *
+nsr_flush_thread (void *ctx)
+{
+ xlator_t *this = ctx;
+ nsr_private_t *priv = this->private;
+ struct list_head dirty_fds;
+ nsr_fd_ctx_t *fd_ctx;
+ nsr_fd_ctx_t *fd_tmp;
+ int ret;
+
+ for (;;) {
+ /*
+ * We have to be very careful to avoid lock inversions here, so
+ * we can't just hold priv->dirty_lock while we take and
+ * release locks for each fd. Instead, we only hold dirty_lock
+ * at the beginning of each iteration, as we (effectively) make
+ * a copy of the current list head and then clear the original.
+ * This leads to four scenarios for adding the first entry to
+ * an fd and potentially putting it on the global list.
+ *
+ * (1) While we're asleep. No lock contention, it just gets
+ * added and will be processed on the next iteration.
+ *
+ * (2) After we've made a local copy, but before we've started
+ * processing that fd. The new entry will be added to the
+ * fd (under its lock), and we'll process it on the current
+ * iteration.
+ *
+ * (3) While we're processing the fd. They'll block on the fd
+ * lock, then see that the list is empty and put it on the
+ * global list. We'll process it here on the next
+ * iteration.
+ *
+ * (4) While we're working, but after we've processed that fd.
+ * Same as (1) as far as that fd is concerned.
+ */
+ INIT_LIST_HEAD(&dirty_fds);
+ LOCK(&priv->dirty_lock);
+ list_splice_init(&priv->dirty_fds, &dirty_fds);
+ UNLOCK(&priv->dirty_lock);
+
+ list_for_each_entry_safe (fd_ctx, fd_tmp, &dirty_fds, fd_list) {
+ ret = syncop_fsync(FIRST_CHILD(this), fd_ctx->fd, 0,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to fsync %p (%d)",
+ fd_ctx->fd, -ret);
+ }
+
+ LOCK(&fd_ctx->fd->lock);
+ nsr_flush_fd(this, fd_ctx);
+ list_del_init(&fd_ctx->fd_list);
+ UNLOCK(&fd_ctx->fd->lock);
+ fd_unref(fd_ctx->fd);
+ }
+
+ sleep(NSR_FLUSH_INTERVAL);
+ }
+
+ return NULL;
+}
+
+
+int32_t
+nsr_get_changelog_dir (xlator_t *this, char **cl_dir_p)
+{
+ xlator_t *cl_xl;
+
+ /* Find our changelog translator. */
+ cl_xl = this;
+ while (cl_xl) {
+ if (strcmp(cl_xl->type, "features/changelog") == 0) {
+ break;
+ }
+ cl_xl = cl_xl->children->xlator;
+ }
+ if (!cl_xl) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog translator");
+ return ENOENT;
+ }
+
+ /* Find the actual changelog directory. */
+ if (dict_get_str(cl_xl->options, "changelog-dir", cl_dir_p) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INIT_FAIL,
+ "failed to find changelog-dir for %s", cl_xl->name);
+ return ENODATA;
+ }
+
+ return 0;
+}
+
+
+void
+nsr_get_terms (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ DIR *fp = NULL;
+ struct dirent *rd_entry;
+ struct dirent *rd_result;
+ int32_t term_first = -1;
+ int32_t term_contig = -1;
+ int32_t term_last = -1;
+ int term_num;
+ char *probe_str;
+ dict_t *my_xdata = NULL;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err; /* Error was already logged. */
+ }
+ op_errno = ENODATA; /* Most common error after this. */
+
+ rd_entry = alloca (offsetof(struct dirent, d_name) +
+ pathconf(cl_dir, _PC_NAME_MAX) + 1);
+ if (!rd_entry) {
+ goto err;
+ }
+
+ fp = sys_opendir (cl_dir);
+ if (!fp) {
+ op_errno = errno;
+ goto err;
+ }
+
+ /* Find first and last terms. */
+ for (;;) {
+ if (readdir_r(fp, rd_entry, &rd_result) != 0) {
+ op_errno = errno;
+ goto err;
+ }
+ if (!rd_result) {
+ break;
+ }
+ if (fnmatch("TERM.*", rd_entry->d_name, FNM_PATHNAME) != 0) {
+ continue;
+ }
+ /* +5 points to the character after the period */
+ term_num = atoi(rd_entry->d_name+5);
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "%s => %d", rd_entry->d_name, term_num);
+ if (term_num < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_INVALID,
+ "invalid term file name %s", rd_entry->d_name);
+ op_errno = EINVAL;
+ goto err;
+ }
+ if ((term_first < 0) || (term_first > term_num)) {
+ term_first = term_num;
+ }
+ if ((term_last < 0) || (term_last < term_num)) {
+ term_last = term_num;
+ }
+ }
+ if ((term_first < 0) || (term_last < 0)) {
+ /* TBD: are we *sure* there should always be at least one? */
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "no terms found");
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ sys_closedir (fp);
+ fp = NULL;
+
+ /*
+ * Find term_contig, which is the earliest term for which there are
+ * no gaps between it and term_last.
+ */
+ for (term_contig = term_last; term_contig > 0; --term_contig) {
+ if (gf_asprintf(&probe_str, "%s/TERM.%d",
+ cl_dir, term_contig-1) <= 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to format term %d", term_contig-1);
+ goto err;
+ }
+ if (sys_access(probe_str, F_OK) != 0) {
+ GF_FREE(probe_str);
+ break;
+ }
+ GF_FREE(probe_str);
+ }
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ N_MSG_GENERIC,
+ "found terms %d-%d (%d)",
+ term_first, term_last, term_contig);
+
+ /* Return what we've found */
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR,
+ "failed to allocate reply dictionary");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-first", term_first) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-first");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-contig", term_contig) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-contig");
+ goto err;
+ }
+ if (dict_set_int32(my_xdata, "term-last", term_last) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR,
+ "failed to set term-last");
+ goto err;
+ }
+
+ /* Finally! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ if (fp) {
+ sys_closedir (fp);
+ }
+ if (my_xdata) {
+ dict_unref(my_xdata);
+ }
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+long
+get_entry_count (xlator_t *this, int fd)
+{
+ struct stat buf;
+ long min; /* last entry not known to be empty */
+ long max; /* first entry known to be empty */
+ long curr;
+ char entry[CHANGELOG_ENTRY_SIZE];
+
+ if (sys_fstat (fd, &buf) < 0) {
+ return -1;
+ }
+
+ min = 0;
+ max = buf.st_size / CHANGELOG_ENTRY_SIZE;
+
+ while ((min+1) < max) {
+ curr = (min + max) / 2;
+ if (sys_lseek(fd, curr*CHANGELOG_ENTRY_SIZE, SEEK_SET) < 0) {
+ return -1;
+ }
+ if (sys_read(fd, entry, sizeof(entry)) != sizeof(entry)) {
+ return -1;
+ }
+ if ((entry[0] == '_') && (entry[1] == 'P')) {
+ min = curr;
+ } else {
+ max = curr;
+ }
+ }
+
+ if (sys_lseek(fd, 0, SEEK_SET) < 0) {
+ gf_msg (this->name, GF_LOG_WARNING, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to reset offset");
+ }
+ return max;
+}
+
+
+void
+nsr_open_term (call_frame_t *frame, xlator_t *this, dict_t *xdata)
+{
+ int32_t op_errno;
+ char *cl_dir;
+ char *term;
+ char *path;
+ nsr_private_t *priv = this->private;
+
+ op_errno = nsr_get_changelog_dir(this, &cl_dir);
+ if (op_errno) {
+ goto err;
+ }
+
+ if (dict_get_str(xdata, "term", &term) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "missing term");
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ if (gf_asprintf(&path, "%s/TERM.%s", cl_dir, term) < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to construct path");
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ if (priv->term_fd >= 0) {
+ sys_close (priv->term_fd);
+ }
+ priv->term_fd = open(path, O_RDONLY);
+ if (priv->term_fd < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "failed to open term file");
+ goto err;
+ }
+
+ priv->term_total = get_entry_count(this, priv->term_fd);
+ if (priv->term_total < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_NO_DATA, "failed to get entry count");
+ sys_close (priv->term_fd);
+ priv->term_fd = -1;
+ op_errno = EIO;
+ goto err;
+ }
+ priv->term_read = 0;
+
+ /* Success! */
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, NULL);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+void
+nsr_next_entry (call_frame_t *frame, xlator_t *this)
+{
+ int32_t op_errno = ENOMEM;
+ nsr_private_t *priv = this->private;
+ ssize_t nbytes;
+ dict_t *my_xdata;
+
+ if (priv->term_fd < 0) {
+ op_errno = EBADFD;
+ goto err;
+ }
+
+ if (priv->term_read >= priv->term_total) {
+ op_errno = ENODATA;
+ goto err;
+ }
+
+ nbytes = sys_read (priv->term_fd, priv->term_buf, CHANGELOG_ENTRY_SIZE);
+ if (nbytes < CHANGELOG_ENTRY_SIZE) {
+ if (nbytes < 0) {
+ op_errno = errno;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "error reading next entry: %s",
+ strerror(errno));
+ } else {
+ op_errno = EIO;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_SYS_CALL_FAILURE,
+ "got %ld/%d bytes for next entry",
+ nbytes, CHANGELOG_ENTRY_SIZE);
+ }
+ goto err;
+ }
+ ++(priv->term_read);
+
+ my_xdata = dict_new();
+ if (!my_xdata) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_MEM_ERR, "failed to allocate reply xdata");
+ goto err;
+ }
+
+ if (dict_set_static_bin(my_xdata, "data",
+ priv->term_buf, CHANGELOG_ENTRY_SIZE) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ N_MSG_DICT_FLR, "failed to assign reply xdata");
+ goto err;
+ }
+
+ STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata);
+ dict_unref(my_xdata);
+ return;
+
+err:
+ STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL);
+}
+
+
+int32_t
+nsr_ipc (call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
+{
+ switch (op) {
+ case NSR_SERVER_TERM_RANGE:
+ nsr_get_terms(frame, this);
+ break;
+ case NSR_SERVER_OPEN_TERM:
+ nsr_open_term(frame, this, xdata);
+ break;
+ case NSR_SERVER_NEXT_ENTRY:
+ nsr_next_entry(frame, this);
+ break;
+ default:
+ STACK_WIND_TAIL (frame,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ipc,
+ op, xdata);
+ }
+
+ return 0;
+}
+
+
+int32_t
+nsr_forget (xlator_t *this, inode_t *inode)
+{
+ uint64_t ctx = 0LL;
+
+ if ((inode_ctx_del(inode, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+int32_t
+nsr_release (xlator_t *this, fd_t *fd)
+{
+ uint64_t ctx = 0LL;
+
+ if ((fd_ctx_del(fd, this, &ctx) == 0) && ctx) {
+ GF_FREE((void *)(long)ctx);
+ }
+
+ return 0;
+}
+
+struct xlator_cbks cbks = {
+ .forget = nsr_forget,
+ .release = nsr_release,
+};
+
+int
+nsr_reconfigure (xlator_t *this, dict_t *options)
+{
+ nsr_private_t *priv = this->private;
+
+ GF_OPTION_RECONF ("leader",
+ priv->config_leader, options, bool, err);
+ GF_OPTION_RECONF ("quorum-percent",
+ priv->quorum_pct, options, percent, err);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "reconfigure called, config_leader = %d, quorum_pct = %.1f\n",
+ priv->leader, priv->quorum_pct);
+
+ priv->leader = priv->config_leader;
+
+ return 0;
+
+err:
+ return -1;
+}
+
+int
+nsr_get_child_index (xlator_t *this, xlator_t *kid)
+{
+ xlator_list_t *trav;
+ int retval = -1;
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++retval;
+ if (trav->xlator == kid) {
+ return retval;
+ }
+ }
+
+ return -1;
+}
+
+/*
+ * Child notify handling is unreasonably FUBAR. Sometimes we'll get a
+ * CHILD_DOWN for a protocol/client child before we ever got a CHILD_UP for it.
+ * Other times we won't. Because it's effectively random (probably racy), we
+ * can't just maintain a count. We actually have to keep track of the state
+ * for each child separately, to filter out the bogus CHILD_DOWN events, and
+ * then generate counts on demand.
+ */
+int
+nsr_notify (xlator_t *this, int event, void *data, ...)
+{
+ nsr_private_t *priv = this->private;
+ int index;
+
+ switch (event) {
+ case GF_EVENT_CHILD_UP:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state |= (1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_UP for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children > 1)) {
+ priv->leader = _gf_false;
+ }
+ }
+ break;
+ case GF_EVENT_CHILD_DOWN:
+ index = nsr_get_child_index(this, data);
+ if (index >= 0) {
+ priv->kid_state &= ~(1 << index);
+ priv->up_children = nsr_count_up_kids(priv);
+ gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC,
+ "got CHILD_DOWN for %s, now %u kids",
+ ((xlator_t *)data)->name,
+ priv->up_children);
+ if (!priv->config_leader && (priv->up_children < 2)) {
+ priv->leader = _gf_true;
+ }
+ }
+ break;
+ default:
+ ;
+ }
+
+ return default_notify(this, event, data);
+}
+
+
+int32_t
+mem_acct_init (xlator_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("nsr", this, out);
+
+ ret = xlator_mem_acct_init (this, gf_mt_nsr_end + 1);
+
+ if (ret != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "Memory accounting init" "failed");
+ return ret;
+ }
+out:
+ return ret;
+}
+
+
+void
+nsr_deallocate_priv (nsr_private_t *priv)
+{
+ if (!priv) {
+ return;
+ }
+
+ GF_FREE(priv);
+}
+
+
+int32_t
+nsr_init (xlator_t *this)
+{
+ xlator_list_t *remote;
+ xlator_list_t *local;
+ nsr_private_t *priv = NULL;
+ xlator_list_t *trav;
+ pthread_t kid;
+ extern xlator_t global_xlator;
+ glusterfs_ctx_t *oldctx = global_xlator.ctx;
+
+ /*
+ * Any fop that gets special treatment has to be patched in here,
+ * because the compiled-in table is produced by the code generator and
+ * only contains generated functions. Note that we have to go through
+ * this->fops because of some dynamic-linking strangeness; modifying
+ * the static table doesn't work.
+ */
+ this->fops->getxattr = nsr_getxattr_special;
+ this->fops->fsync = nsr_fsync;
+ this->fops->ipc = nsr_ipc;
+
+ local = this->children;
+ if (!local) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no local subvolume");
+ goto err;
+ }
+
+ remote = local->next;
+ if (!remote) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA,
+ "no remote subvolumes");
+ goto err;
+ }
+
+ this->local_pool = mem_pool_new (nsr_local_t, 128);
+ if (!this->local_pool) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "failed to create nsr_local_t pool");
+ goto err;
+ }
+
+ priv = GF_CALLOC (1, sizeof(*priv), gf_mt_nsr_private_t);
+ if (!priv) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR,
+ "could not allocate priv");
+ goto err;
+ }
+
+ for (trav = this->children; trav; trav = trav->next) {
+ ++(priv->n_children);
+ }
+
+ LOCK_INIT(&priv->dirty_lock);
+ LOCK_INIT(&priv->index_lock);
+ INIT_LIST_HEAD(&priv->dirty_fds);
+ priv->term_fd = -1;
+
+ this->private = priv;
+
+ GF_OPTION_INIT ("leader", priv->config_leader, bool, err);
+ GF_OPTION_INIT ("quorum-percent", priv->quorum_pct, percent, err);
+
+ priv->leader = priv->config_leader;
+
+ if (pthread_create(&kid, NULL, nsr_flush_thread,
+ this) != 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_SYS_CALL_FAILURE,
+ "could not start flush thread");
+ /* TBD: treat this as a fatal error? */
+ }
+
+ /*
+ * Calling glfs_new changes old->ctx, even if THIS still points
+ * to global_xlator. That causes problems later in the main
+ * thread, when gf_log_dump_graph tries to use the FILE after
+ * we've mucked with it and gets a segfault in __fprintf_chk.
+ * We can avoid all that by undoing the damage before we
+ * continue.
+ */
+ global_xlator.ctx = oldctx;
+
+ return 0;
+
+err:
+ nsr_deallocate_priv(priv);
+ return -1;
+}
+
+
+void
+nsr_fini (xlator_t *this)
+{
+ nsr_deallocate_priv(this->private);
+}
+
+class_methods_t class_methods = {
+ .init = nsr_init,
+ .fini = nsr_fini,
+ .reconfigure = nsr_reconfigure,
+ .notify = nsr_notify,
+};
+
+struct volume_options options[] = {
+ { .key = {"leader"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "false",
+ .description = "Start in the leader role. This is only for "
+ "bootstrapping the code, and should go away when we "
+ "have real leader election."
+ },
+ { .key = {"vol-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "volume name"
+ },
+ { .key = {"my-name"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "brick name in form of host:/path"
+ },
+ { .key = {"etcd-servers"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "list of comma seperated etc servers"
+ },
+ { .key = {"subvol-uuid"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "UUID for this NSR (sub)volume"
+ },
+ { .key = {"quorum-percent"},
+ .type = GF_OPTION_TYPE_PERCENT,
+ .default_value = "50.0",
+ .description = "percentage of rep_count-1 that must be up"
+ },
+ { .key = {NULL} },
+};