diff options
Diffstat (limited to 'xlators/experimental/nsr-server')
-rw-r--r-- | xlators/experimental/nsr-server/Makefile.am | 3 | ||||
-rw-r--r-- | xlators/experimental/nsr-server/src/Makefile.am | 35 | ||||
-rw-r--r-- | xlators/experimental/nsr-server/src/all-templates.c | 429 | ||||
-rwxr-xr-x | xlators/experimental/nsr-server/src/gen-fops.py | 138 | ||||
-rw-r--r-- | xlators/experimental/nsr-server/src/nsr-internal.h | 114 | ||||
-rw-r--r-- | xlators/experimental/nsr-server/src/nsr.c | 1066 |
6 files changed, 1785 insertions, 0 deletions
diff --git a/xlators/experimental/nsr-server/Makefile.am b/xlators/experimental/nsr-server/Makefile.am new file mode 100644 index 00000000000..a985f42a877 --- /dev/null +++ b/xlators/experimental/nsr-server/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/experimental/nsr-server/src/Makefile.am b/xlators/experimental/nsr-server/src/Makefile.am new file mode 100644 index 00000000000..6c0597610a2 --- /dev/null +++ b/xlators/experimental/nsr-server/src/Makefile.am @@ -0,0 +1,35 @@ +xlator_LTLIBRARIES = nsr.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/experimental + +nodist_nsr_la_SOURCES = nsr-cg.c +CLEANFILES = $(nodist_nsr_la_SOURCES) + +nsr_la_LDFLAGS = -module -avoid-version +nsr_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la \ + $(top_builddir)/api/src/libgfapi.la + +noinst_HEADERS = nsr-internal.h \ + $(top_srcdir)/xlators/lib/src/libxlator.h \ + $(top_srcdir)/glusterfsd/src/glusterfsd.h + +AM_CPPFLAGS = $(GF_CPPFLAGS) \ + -I$(top_srcdir)/libglusterfs/src \ + -I$(top_srcdir)/xlators/lib/src \ + -I$(top_srcdir)/rpc/rpc-lib/src -DSBIN_DIR=\"$(sbindir)\" \ + -I$(top_srcdir)/api/src -DNSR_SCRIPT_PREFIX=\"$(nsrdir)\" \ + -I$(top_srcdir)/xlators/experimental/nsr-client/src/ + +AM_CFLAGS = -Wall $(GF_CFLAGS) + +NSR_PREFIX = $(top_srcdir)/xlators/experimental/nsr-server/src +NSR_GEN_FOPS = $(NSR_PREFIX)/gen-fops.py +NSR_TEMPLATES = $(NSR_PREFIX)/all-templates.c +NSR_WRAPPER = $(NSR_PREFIX)/nsr.c +noinst_PYTHON = $(NSR_GEN_FOPS) +EXTRA_DIST = $(NSR_TEMPLATES) $(NSR_WRAPPER) + +nsr-cg.c: $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER) + $(PYTHON) $(NSR_GEN_FOPS) $(NSR_TEMPLATES) $(NSR_WRAPPER) > $@ + +uninstall-local: + rm -f $(DESTDIR)$(xlatordir)/nsr.so diff --git a/xlators/experimental/nsr-server/src/all-templates.c b/xlators/experimental/nsr-server/src/all-templates.c new file mode 100644 index 00000000000..300abea959d --- /dev/null +++ b/xlators/experimental/nsr-server/src/all-templates.c @@ -0,0 +1,429 @@ +/* + * You can put anything here - it doesn't even have to be a comment - and it + * will be ignored until we reach the first template-name comment. + */ + + +/* template-name read-fop */ +int32_t +nsr_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + nsr_private_t *priv = this->private; + gf_boolean_t in_recon = _gf_false; + int32_t recon_term, recon_index; + + /* allow reads during reconciliation * + * TBD: allow "dirty" reads on non-leaders * + */ + if (xdata && + (dict_get_int32(xdata, RECON_TERM_XATTR, &recon_term) == 0) && + (dict_get_int32(xdata, RECON_INDEX_XATTR, &recon_index) == 0)) { + in_recon = _gf_true; + } + + if ((!priv->leader) && (in_recon == _gf_false)) { + goto err; + } + + STACK_WIND (frame, default_@NAME@_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + return 0; + +err: + STACK_UNWIND_STRICT (@NAME@, frame, -1, EREMOTE, + @ERROR_ARGS@); + return 0; +} + +/* template-name read-dispatch */ +/* No "dispatch" function needed for @NAME@ */ + +/* template-name read-fan-in */ +/* No "fan-in" function needed for @NAME@ */ + +/* template-name read-continue */ +/* No "continue" function needed for @NAME@ */ + +/* template-name read-complete */ +/* No "complete" function needed for @NAME@ */ + +/* template-name write-fop */ +int32_t +nsr_@NAME@ (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + nsr_local_t *local = NULL; + nsr_private_t *priv = this->private; + gf_boolean_t result = _gf_false; + int op_errno = ENOMEM; + int from_leader; + int from_recon; + uint32_t ti = 0; + + /* + * Our first goal here is to avoid "split brain surprise" for users who + * specify exactly 50% with two- or three-way replication. That means + * either a more-than check against half the total replicas or an + * at-least check against half of our peers (one less). Of the two, + * only an at-least check supports the intuitive use of 100% to mean + * all replicas must be present, because "more than 100%" will never + * succeed regardless of which count we use. This leaves us with a + * slightly non-traditional definition of quorum ("at least X% of peers + * not including ourselves") but one that's useful enough to be worth + * it. + * + * Note that n_children and up_children *do* include the local + * subvolume, so we need to subtract one in each case. + */ + if (priv->leader) { + result = fop_quorum_check (this, (double)(priv->n_children - 1), + (double)(priv->up_children - 1)); + + if (result == _gf_false) { + /* Emulate the AFR client-side-quorum behavior. */ + op_errno = EROFS; + goto err; + } + } else { + if (xdata) { + from_leader = !!dict_get(xdata, NSR_TERM_XATTR); + from_recon = !!dict_get(xdata, RECON_TERM_XATTR) + && !!dict_get(xdata, RECON_INDEX_XATTR); + } else { + from_leader = from_recon = _gf_false; + } + + /* follower/recon path * + * just send it to local node * + */ + if (!from_leader && !from_recon) { + op_errno = EREMOTE; + goto err; + } + } + + local = mem_get0(this->local_pool); + if (!local) { + goto err; + } +#if defined(NSR_CG_NEED_FD) + local->fd = fd_ref(fd); +#else + local->fd = NULL; +#endif + INIT_LIST_HEAD(&local->qlinks); + frame->local = local; + + /* + * If we let it through despite not being the leader, then we just want + * to pass it on down without all of the additional xattrs, queuing, and + * so on. However, nsr_*_complete does depend on the initialization + * immediately above this. + */ + if (!priv->leader) { + STACK_WIND (frame, nsr_@NAME@_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + return 0; + } + + if (!xdata) { + xdata = dict_new(); + if (!xdata) { + gf_msg (this->name, GF_LOG_ERROR, ENOMEM, + N_MSG_MEM_ERR, "failed to allocate xdata"); + goto err; + } + } + + if (dict_set_int32(xdata, NSR_TERM_XATTR, priv->current_term) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, "failed to set nsr-term"); + goto err; + } + + LOCK(&priv->index_lock); + ti = ++(priv->index); + UNLOCK(&priv->index_lock); + if (dict_set_int32(xdata, NSR_INDEX_XATTR, ti) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, "failed to set index"); + goto err; + } + + local->stub = fop_@NAME@_stub (frame, nsr_@NAME@_continue, + @SHORT_ARGS@); + if (!local->stub) { + goto err; + } + + +#if defined(NSR_CG_QUEUE) + nsr_inode_ctx_t *ictx = nsr_get_inode_ctx(this, fd->inode); + + if (!ictx) { + op_errno = EIO; + goto err; + } + LOCK(&ictx->lock); + if (ictx->active) { + gf_msg_debug (this->name, 0, + "queuing request due to conflict"); + /* + * TBD: enqueue only for real conflict + * + * Currently we just act like all writes are in + * conflict with one another. What we should really do + * is check the active/pending queues and defer only if + * there's a conflict there. + * + * It's important to check the pending queue because we + * might have an active request X which conflicts with + * a pending request Y, and this request Z might + * conflict with Y but not X. If we checked only the + * active queue then Z could jump ahead of Y, which + * would be incorrect. + */ + local->qstub = fop_@NAME@_stub (frame, + nsr_@NAME@_dispatch, + @SHORT_ARGS@); + if (!local->qstub) { + UNLOCK(&ictx->lock); + goto err; + } + list_add_tail(&local->qlinks, &ictx->pqueue); + ++(ictx->pending); + UNLOCK(&ictx->lock); + return 0; + } else { + list_add_tail(&local->qlinks, &ictx->aqueue); + ++(ictx->active); + } + UNLOCK(&ictx->lock); +#endif + + return nsr_@NAME@_dispatch (frame, this, @SHORT_ARGS@); + +err: + if (local) { + if (local->stub) { + call_stub_destroy(local->stub); + } + if (local->qstub) { + call_stub_destroy(local->qstub); + } + if (local->fd) { + fd_unref(local->fd); + } + mem_put(local); + } + STACK_UNWIND_STRICT (@NAME@, frame, -1, op_errno, + @ERROR_ARGS@); + return 0; +} + +/* template-name write-dispatch */ +int32_t +nsr_@NAME@_dispatch (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + nsr_local_t *local = frame->local; + nsr_private_t *priv = this->private; + xlator_list_t *trav; + + /* + * TBD: unblock pending request(s) if we fail after this point but + * before we get to nsr_@NAME@_complete (where that code currently + * resides). + */ + + local->call_count = priv->n_children - 1; + local->successful_acks = 0; + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_@NAME@_fan_in, + trav->xlator, trav->xlator->fops->@NAME@, + @SHORT_ARGS@); + } + + /* TBD: variable Issue count */ + return 0; +} + +/* template-name write-fan-in */ +int32_t +nsr_@NAME@_fan_in (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + nsr_local_t *local = frame->local; + uint8_t call_count; + + gf_msg_trace (this->name, 0, "op_ret = %d, op_errno = %d\n", + op_ret, op_errno); + + LOCK(&frame->lock); + call_count = --(local->call_count); + if (op_ret != -1) { + /* Increment the number of successful acks * + * received for the operation. * + */ + (local->successful_acks)++; + local->successful_op_ret = op_ret; + } + gf_msg_debug (this->name, 0, "succ_acks = %d, op_ret = %d, op_errno = %d\n", + op_ret, op_errno, local->successful_acks); + UNLOCK(&frame->lock); + + /* TBD: variable Completion count */ + if (call_count == 0) { + call_resume(local->stub); + } + + return 0; +} + +/* template-name write-continue */ +int32_t +nsr_@NAME@_continue (call_frame_t *frame, xlator_t *this, + @LONG_ARGS@) +{ + int32_t ret = -1; + gf_boolean_t result = _gf_false; + nsr_local_t *local = NULL; + nsr_private_t *priv = NULL; + + GF_VALIDATE_OR_GOTO ("nsr", this, out); + GF_VALIDATE_OR_GOTO (this->name, frame, out); + priv = this->private; + local = frame->local; + GF_VALIDATE_OR_GOTO (this->name, priv, out); + GF_VALIDATE_OR_GOTO (this->name, local, out); + + /* Perform quorum check to see if the leader needs * + * to perform the operation. If the operation will not * + * meet quorum irrespective of the leader's result * + * there is no point in the leader performing the fop * + */ + result = fop_quorum_check (this, (double)priv->n_children, + (double)local->successful_acks + 1); + if (result == _gf_false) { + STACK_UNWIND_STRICT (@NAME@, frame, -1, EROFS, + @ERROR_ARGS@); + } else { + STACK_WIND (frame, nsr_@NAME@_complete, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->@NAME@, + @SHORT_ARGS@); + } + + ret = 0; +out: + return ret; +} + +/* template-name write-complete */ +int32_t +nsr_@NAME@_complete (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, + @LONG_ARGS@) +{ + gf_boolean_t result = _gf_false; + nsr_private_t *priv = this->private; + + nsr_local_t *local = frame->local; + + /* If the fop failed on the leader, then reduce one succesful ack + * before calculating the fop quorum + */ + LOCK(&frame->lock); + if (op_ret == -1) + (local->successful_acks)--; + UNLOCK(&frame->lock); + +#if defined(NSR_CG_QUEUE) + nsr_inode_ctx_t *ictx; + nsr_local_t *next; + + if (local->qlinks.next != &local->qlinks) { + list_del(&local->qlinks); + ictx = nsr_get_inode_ctx(this, local->fd->inode); + if (ictx) { + LOCK(&ictx->lock); + if (ictx->pending) { + /* + * TBD: dequeue *all* non-conflicting + * reqs + * + * With the stub implementation there + * can only be one request active at a + * time (zero here) so it's not an + * issue. In a real implementation + * there might still be other active + * requests to check against, and + * multiple pending requests that could + * continue. + */ + gf_msg_debug (this->name, 0, + "unblocking next request"); + --(ictx->pending); + next = list_entry (ictx->pqueue.next, + nsr_local_t, qlinks); + list_del(&next->qlinks); + list_add_tail(&next->qlinks, + &ictx->aqueue); + call_resume(next->qstub); + } else { + --(ictx->active); + } + UNLOCK(&ictx->lock); + } + } +#endif + +#if defined(NSR_CG_FSYNC) + nsr_mark_fd_dirty(this, local); +#endif + +#if defined(NSR_CG_NEED_FD) + fd_unref(local->fd); +#endif + + /* After the leader completes the fop, a quorum check is * + * performed, taking into account the outcome of the fop * + * on the leader. Irrespective of the fop being successful * + * or failing on the leader, the result of the quorum will * + * determine if the overall fop is successful or not. For * + * example, a fop might have succeeded on every node except * + * the leader, in which case as quorum is being met, the fop * + * will be treated as a successful fop, even though it failed * + * on the leader. On follower nodes, no quorum check should * + * be done, and the result is returned to the leader as is. * + */ + if (priv->leader) { + result = fop_quorum_check (this, (double)priv->n_children, + (double)local->successful_acks + 1); + if (result == _gf_false) { + op_ret = -1; + op_errno = EROFS; + gf_msg_debug (this->name, 0, + "Quorum is not met. The operation has failed."); + } else { +#if defined(NSR_CG_NEED_FD) + op_ret = local->successful_op_ret; +#else + op_ret = 0; +#endif + op_errno = 0; + gf_msg_debug (this->name, 0, + "Quorum has met. The operation has succeeded."); + } + } + + STACK_UNWIND_STRICT (@NAME@, frame, op_ret, op_errno, + @SHORT_ARGS@); + + + return 0; + +} diff --git a/xlators/experimental/nsr-server/src/gen-fops.py b/xlators/experimental/nsr-server/src/gen-fops.py new file mode 100755 index 00000000000..336b218a8fb --- /dev/null +++ b/xlators/experimental/nsr-server/src/gen-fops.py @@ -0,0 +1,138 @@ +#!/usr/bin/python + +# This script generates the boilerplate versions of most fops and cbks in the +# server. This allows the details of leadership-status checking, sequencing +# between leader and followers (including fan-out), and basic error checking +# to be centralized one place, with per-operation code kept to a minimum. + +import os +import re +import string +import sys + +curdir = os.path.dirname(sys.argv[0]) +gendir = os.path.join(curdir,'../../../../libglusterfs/src') +sys.path.append(gendir) +from generator import ops, fop_subs, cbk_subs, generate + +# We really want the callback argument list, even when we're generating fop +# code, so we propagate here. +# TBD: this should probably be right in generate.py +for k, v in cbk_subs.iteritems(): + fop_subs[k]['@ERROR_ARGS@'] = v['@ERROR_ARGS@'] + +# Stolen from old codegen.py +def load_templates (path): + templates = {} + tmpl_re = re.compile("/\* template-name (.*) \*/") + templates = {} + t_name = None + for line in open(path,"r").readlines(): + if not line: + break + m = tmpl_re.match(line) + if m: + if t_name: + templates[t_name] = string.join(t_contents,'') + t_name = m.group(1).strip() + t_contents = [] + elif t_name: + t_contents.append(line) + if t_name: + templates[t_name] = string.join(t_contents,'') + return templates + +# We need two types of templates. The first, for pure read operations, just +# needs to do a simple am-i-leader check (augmented to allow dirty reads). +# The second, for pure writes, needs to do fan-out to followers between those +# initial checks and local execution. There are other operations that don't +# fit neatly into either category - e.g. lock ops or fsync - so we'll just have +# to handle those manually. The table thus includes entries only for those we +# can categorize. The special cases, plus any new operations we've never even +# heard of, aren't in there. +# +# Various keywords can be used to define/undefine preprocessor symbols used +# in the templates, on a per-function basis. For example, if the keyword here +# is "fsync" (lowercase word or abbreviation) that will cause NSR_CG_FSYNC +# (prefix plus uppercase version) to be defined above all of the generated code +# for that fop. + +fop_table = { + "access": "read", + "create": "write", + "discard": "write", +# "entrylk": "read", + "fallocate": "write", +# "fentrylk": "read", + "fgetxattr": "read", +# "finodelk": "read", +# "flush": "read", + "fremovexattr": "write", + "fsetattr": "write", + "fsetxattr": "write", + "fstat": "read", +# "fsync": "read", +# "fsyncdir": "read", + "ftruncate": "write", + "fxattrop": "write", + "getxattr": "read", +# "inodelk": "read", + "link": "write", +# "lk": "read", +# "lookup": "read", + "mkdir": "write", + "mknod": "write", + "open": "write", + "opendir": "read", + "rchecksum": "read", + "readdir": "read", + "readdirp": "read", + "readlink": "read", + "readv": "read", + "removexattr": "write", + "rename": "write", + "rmdir": "write", + "setattr": "write", + "setxattr": "write", + "stat": "read", + "statfs": "read", + "symlink": "write", + "truncate": "write", + "unlink": "write", + "writev": "write,fsync,queue", + "xattrop": "write", +} + +# Stolen from gen_fdl.py +def gen_server (templates): + fops_done = [] + for name in fop_table.keys(): + info = fop_table[name].split(",") + kind = info[0] + flags = info[1:] + if ("fsync" in flags) or ("queue" in flags): + flags.append("need_fd") + for fname in flags: + print "#define NSR_CG_%s" % fname.upper() + print generate(templates[kind+"-complete"],name,cbk_subs) + print generate(templates[kind+"-continue"],name,fop_subs) + print generate(templates[kind+"-fan-in"],name,cbk_subs) + print generate(templates[kind+"-dispatch"],name,fop_subs) + print generate(templates[kind+"-fop"],name,fop_subs) + for fname in flags: + print "#undef NSR_CG_%s" % fname.upper() + fops_done.append(name) + # Just for fun, emit the fops table too. + print("struct xlator_fops fops = {") + for x in fops_done: + print(" .%s = nsr_%s,"%(x,x)) + print("};") + +tmpl = load_templates(sys.argv[1]) +for l in open(sys.argv[2],'r').readlines(): + if l.find('#pragma generate') != -1: + print "/* BEGIN GENERATED CODE - DO NOT MODIFY */" + gen_server(tmpl) + print "/* END GENERATED CODE */" + else: + print l[:-1] diff --git a/xlators/experimental/nsr-server/src/nsr-internal.h b/xlators/experimental/nsr-server/src/nsr-internal.h new file mode 100644 index 00000000000..b8c7fc314b7 --- /dev/null +++ b/xlators/experimental/nsr-server/src/nsr-internal.h @@ -0,0 +1,114 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#include <sys/stat.h> +#include <sys/types.h> + +#define LEADER_XATTR "user.nsr.leader" +#define SECOND_CHILD(xl) (xl->children->next->xlator) +#define RECONCILER_PATH NSR_SCRIPT_PREFIX"/reconciler.py" +#define CHANGELOG_ENTRY_SIZE 128 + +enum { + gf_mt_nsr_private_t = gf_common_mt_end + 1, + gf_mt_nsr_fd_ctx_t, + gf_mt_nsr_inode_ctx_t, + gf_mt_nsr_dirty_t, + gf_mt_nsr_end +}; + +typedef enum nsr_recon_notify_ev_id_t { + NSR_RECON_SET_LEADER = 1, + NSR_RECON_ADD_CHILD = 2 +} nsr_recon_notify_ev_id_t; + +typedef struct _nsr_recon_notify_ev_s { + nsr_recon_notify_ev_id_t id; + uint32_t index; /* in case of add */ + struct list_head list; +} nsr_recon_notify_ev_t; + +typedef struct { + /* + * This is a hack to allow a non-leader to accept requests while the + * leader is down, and it only works for n=2. The way it works is that + * "config_leader" indicates the state from our options (via init or + * reconfigure) but "leader" is what the fop code actually looks at. If + * config_leader is true, then leader will *always* be true as well, + * giving that brick precedence. If config_leader is false, then + * leader will only be true if there is no connection to the other + * brick (tracked in nsr_notify). + * + * TBD: implement real leader election + */ + gf_boolean_t config_leader; + gf_boolean_t leader; + uint8_t up_children; + uint8_t n_children; + char *vol_file; + uint32_t current_term; + uint32_t kid_state; + gf_lock_t dirty_lock; + struct list_head dirty_fds; + uint32_t index; + gf_lock_t index_lock; + double quorum_pct; + int term_fd; + long term_total; + long term_read; + /* + * This is a super-duper hack, but it will do for now. The reason it's + * a hack is that we pass this to dict_set_static_bin, so we don't have + * to mess around with allocating and freeing it on every single IPC + * request, but it's totally not thread-safe. On the other hand, there + * should only be one reconciliation thread running and calling these + * functions at a time, so maybe that doesn't matter. + * + * TBD: re-evaluate how to manage this + */ + char term_buf[CHANGELOG_ENTRY_SIZE]; +} nsr_private_t; + +typedef struct { + call_stub_t *stub; + call_stub_t *qstub; + uint32_t call_count; + uint32_t successful_acks; + uint32_t successful_op_ret; + fd_t *fd; + struct list_head qlinks; +} nsr_local_t; + +/* + * This should match whatever changelog returns on the pre-op for us to pass + * when we're ready for our post-op. + */ +typedef uint32_t log_id_t; + +typedef struct { + struct list_head links; + log_id_t id; +} nsr_dirty_list_t; + +typedef struct { + fd_t *fd; + struct list_head dirty_list; + struct list_head fd_list; +} nsr_fd_ctx_t; + +typedef struct { + gf_lock_t lock; + uint32_t active; + struct list_head aqueue; + uint32_t pending; + struct list_head pqueue; +} nsr_inode_ctx_t; + +void nsr_start_reconciler (xlator_t *this); diff --git a/xlators/experimental/nsr-server/src/nsr.c b/xlators/experimental/nsr-server/src/nsr.c new file mode 100644 index 00000000000..0c494b78125 --- /dev/null +++ b/xlators/experimental/nsr-server/src/nsr.c @@ -0,0 +1,1066 @@ +/* + Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <fnmatch.h> +#include "call-stub.h" +#include "defaults.h" +#include "xlator.h" +#include "glfs.h" +#include "glfs-internal.h" +#include "run.h" +#include "common-utils.h" +#include "syncop.h" +#include "syscall.h" + +#include "nsr-internal.h" +#include "nsr-messages.h" + +#define NSR_FLUSH_INTERVAL 5 + +enum { + /* echo "cluster/nsr-server" | md5sum | cut -c 1-8 */ + NSR_SERVER_IPC_BASE = 0x0e2d66a5, + NSR_SERVER_TERM_RANGE, + NSR_SERVER_OPEN_TERM, + NSR_SERVER_NEXT_ENTRY +}; + +/* Used to check the quorum of acks received after the fop + * confirming the status of the fop on all the brick processes + * for this particular subvolume + */ +gf_boolean_t +fop_quorum_check (xlator_t *this, double n_children, + double current_state) +{ + nsr_private_t *priv = NULL; + gf_boolean_t result = _gf_false; + double required = 0; + double current = 0; + + GF_VALIDATE_OR_GOTO ("nsr", this, out); + priv = this->private; + GF_VALIDATE_OR_GOTO (this->name, priv, out); + + required = n_children * priv->quorum_pct; + + /* + * Before performing the fop on the leader, we need to check, + * if there is any merit in performing the fop on the leader. + * In a case, where even a successful write on the leader, will + * not meet quorum, there is no point in trying the fop on the + * leader. + * When this function is called after the leader has tried + * performing the fop, this check will calculate quorum taking into + * account the status of the fop on the leader. If the leader's + * op_ret was -1, the complete function would account that by + * decrementing successful_acks by 1 + */ + + current = current_state * 100.0; + + if (current < required) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_QUORUM_NOT_MET, + "Quorum not met. quorum_pct = %f " + "Current State = %f, Required State = %f", + priv->quorum_pct, current, + required); + } else + result = _gf_true; + +out: + return result; +} + +nsr_inode_ctx_t * +nsr_get_inode_ctx (xlator_t *this, inode_t *inode) +{ + uint64_t ctx_int = 0LL; + nsr_inode_ctx_t *ctx_ptr; + + if (__inode_ctx_get(inode, this, &ctx_int) == 0) { + ctx_ptr = (nsr_inode_ctx_t *)(long)ctx_int; + } else { + ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr), + gf_mt_nsr_inode_ctx_t); + if (ctx_ptr) { + ctx_int = (uint64_t)(long)ctx_ptr; + if (__inode_ctx_set(inode, this, &ctx_int) == 0) { + LOCK_INIT(&ctx_ptr->lock); + INIT_LIST_HEAD(&ctx_ptr->aqueue); + INIT_LIST_HEAD(&ctx_ptr->pqueue); + } else { + GF_FREE(ctx_ptr); + ctx_ptr = NULL; + } + } + + } + + return ctx_ptr; +} + +nsr_fd_ctx_t * +nsr_get_fd_ctx (xlator_t *this, fd_t *fd) +{ + uint64_t ctx_int = 0LL; + nsr_fd_ctx_t *ctx_ptr; + + if (__fd_ctx_get(fd, this, &ctx_int) == 0) { + ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int; + } else { + ctx_ptr = GF_CALLOC (1, sizeof(*ctx_ptr), gf_mt_nsr_fd_ctx_t); + if (ctx_ptr) { + if (__fd_ctx_set(fd, this, (uint64_t)ctx_ptr) == 0) { + INIT_LIST_HEAD(&ctx_ptr->dirty_list); + INIT_LIST_HEAD(&ctx_ptr->fd_list); + } else { + GF_FREE(ctx_ptr); + ctx_ptr = NULL; + } + } + + } + + return ctx_ptr; +} + +void +nsr_mark_fd_dirty (xlator_t *this, nsr_local_t *local) +{ + fd_t *fd = local->fd; + nsr_fd_ctx_t *ctx_ptr; + nsr_dirty_list_t *dirty; + nsr_private_t *priv = this->private; + + /* + * TBD: don't do any of this for O_SYNC/O_DIRECT writes. + * Unfortunately, that optimization requires that we distinguish + * between writev and other "write" calls, saving the original flags + * and checking them in the callback. Too much work for too little + * gain right now. + */ + + LOCK(&fd->lock); + ctx_ptr = nsr_get_fd_ctx(this, fd); + dirty = GF_CALLOC(1, sizeof(*dirty), gf_mt_nsr_dirty_t); + if (ctx_ptr && dirty) { + gf_msg_trace (this->name, 0, + "marking fd %p as dirty (%p)", fd, dirty); + /* TBD: fill dirty->id from what changelog gave us */ + list_add_tail(&dirty->links, &ctx_ptr->dirty_list); + if (list_empty(&ctx_ptr->fd_list)) { + /* Add a ref so _release doesn't get called. */ + ctx_ptr->fd = fd_ref(fd); + LOCK(&priv->dirty_lock); + list_add_tail (&ctx_ptr->fd_list, + &priv->dirty_fds); + UNLOCK(&priv->dirty_lock); + } + } else { + gf_msg (this->name, GF_LOG_ERROR, ENOMEM, + N_MSG_MEM_ERR, "could not mark %p dirty", fd); + if (ctx_ptr) { + GF_FREE(ctx_ptr); + } + if (dirty) { + GF_FREE(dirty); + } + } + UNLOCK(&fd->lock); +} + +#define NSR_TERM_XATTR "trusted.nsr.term" +#define NSR_INDEX_XATTR "trusted.nsr.index" +#define NSR_REP_COUNT_XATTR "trusted.nsr.rep-count" +#define RECON_TERM_XATTR "trusted.nsr.recon-term" +#define RECON_INDEX_XATTR "trusted.nsr.recon-index" + +#pragma generate + +uint8_t +nsr_count_up_kids (nsr_private_t *priv) +{ + uint8_t retval = 0; + uint8_t i; + + for (i = 0; i < priv->n_children; ++i) { + if (priv->kid_state & (1 << i)) { + ++retval; + } + } + + return retval; +} + +/* + * The fsync machinery looks a lot like that for any write call, but there are + * some important differences that are easy to miss. First, we don't care + * about the xdata that shows whether the call came from a leader or + * reconciliation process. If we're the leader we fan out; if we're not we + * don't. Second, we don't wait for followers before we issue the local call. + * The code generation system could be updated to handle this, and still might + * if we need to implement other "almost identical" paths (e.g. for open), but + * a copy is more readable as long as it's just one. + */ + +int32_t +nsr_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + nsr_local_t *local = frame->local; + gf_boolean_t unwind; + + LOCK(&frame->lock); + unwind = !--(local->call_count); + UNLOCK(&frame->lock); + + if (unwind) { + STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf, + postbuf, xdata); + } + return 0; +} + +int32_t +nsr_fsync_local_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + nsr_dirty_list_t *dirty; + nsr_dirty_list_t *dtmp; + nsr_local_t *local = frame->local; + + list_for_each_entry_safe (dirty, dtmp, &local->qlinks, links) { + gf_msg_trace (this->name, 0, + "sending post-op on %p (%p)", local->fd, dirty); + GF_FREE(dirty); + } + + return nsr_fsync_cbk (frame, cookie, this, op_ret, op_errno, + prebuf, postbuf, xdata); +} + +int32_t +nsr_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags, + dict_t *xdata) +{ + nsr_private_t *priv = this->private; + nsr_local_t *local; + uint64_t ctx_int = 0LL; + nsr_fd_ctx_t *ctx_ptr; + xlator_list_t *trav; + + local = mem_get0(this->local_pool); + if (!local) { + STACK_UNWIND_STRICT(fsync, frame, -1, ENOMEM, + NULL, NULL, xdata); + return 0; + } + INIT_LIST_HEAD(&local->qlinks); + frame->local = local; + + /* Move the dirty list from the fd to the fsync request. */ + LOCK(&fd->lock); + if (__fd_ctx_get(fd, this, &ctx_int) == 0) { + ctx_ptr = (nsr_fd_ctx_t *)(long)ctx_int; + list_splice_init (&ctx_ptr->dirty_list, + &local->qlinks); + } + UNLOCK(&fd->lock); + + /* Issue the local call. */ + local->call_count = priv->leader ? priv->n_children : 1; + STACK_WIND (frame, nsr_fsync_local_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync, + fd, flags, xdata); + + /* Issue remote calls if we're the leader. */ + if (priv->leader) { + for (trav = this->children->next; trav; trav = trav->next) { + STACK_WIND (frame, nsr_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, flags, xdata); + } + } + + return 0; +} + +int32_t +nsr_getxattr_special (call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) +{ + dict_t *result; + nsr_private_t *priv = this->private; + + if (!priv->leader) { + STACK_UNWIND_STRICT (getxattr, frame, -1, EREMOTE, NULL, NULL); + return 0; + } + + if (!name || (strcmp(name, NSR_REP_COUNT_XATTR) != 0)) { + STACK_WIND_TAIL (frame, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->getxattr, + loc, name, xdata); + return 0; + } + + result = dict_new(); + if (!result) { + goto dn_failed; + } + + priv->up_children = nsr_count_up_kids(this->private); + if (dict_set_uint32(result, NSR_REP_COUNT_XATTR, + priv->up_children) != 0) { + goto dsu_failed; + } + + STACK_UNWIND_STRICT (getxattr, frame, 0, 0, result, NULL); + dict_destroy(result); + return 0; + +dsu_failed: + dict_destroy(result); +dn_failed: + STACK_UNWIND_STRICT (getxattr, frame, -1, ENOMEM, NULL, NULL); + return 0; +} + +void +nsr_flush_fd (xlator_t *this, nsr_fd_ctx_t *fd_ctx) +{ + nsr_dirty_list_t *dirty; + nsr_dirty_list_t *dtmp; + + list_for_each_entry_safe (dirty, dtmp, &fd_ctx->dirty_list, links) { + gf_msg_trace (this->name, 0, + "sending post-op on %p (%p)", fd_ctx->fd, dirty); + GF_FREE(dirty); + } + + INIT_LIST_HEAD(&fd_ctx->dirty_list); +} + +void * +nsr_flush_thread (void *ctx) +{ + xlator_t *this = ctx; + nsr_private_t *priv = this->private; + struct list_head dirty_fds; + nsr_fd_ctx_t *fd_ctx; + nsr_fd_ctx_t *fd_tmp; + int ret; + + for (;;) { + /* + * We have to be very careful to avoid lock inversions here, so + * we can't just hold priv->dirty_lock while we take and + * release locks for each fd. Instead, we only hold dirty_lock + * at the beginning of each iteration, as we (effectively) make + * a copy of the current list head and then clear the original. + * This leads to four scenarios for adding the first entry to + * an fd and potentially putting it on the global list. + * + * (1) While we're asleep. No lock contention, it just gets + * added and will be processed on the next iteration. + * + * (2) After we've made a local copy, but before we've started + * processing that fd. The new entry will be added to the + * fd (under its lock), and we'll process it on the current + * iteration. + * + * (3) While we're processing the fd. They'll block on the fd + * lock, then see that the list is empty and put it on the + * global list. We'll process it here on the next + * iteration. + * + * (4) While we're working, but after we've processed that fd. + * Same as (1) as far as that fd is concerned. + */ + INIT_LIST_HEAD(&dirty_fds); + LOCK(&priv->dirty_lock); + list_splice_init(&priv->dirty_fds, &dirty_fds); + UNLOCK(&priv->dirty_lock); + + list_for_each_entry_safe (fd_ctx, fd_tmp, &dirty_fds, fd_list) { + ret = syncop_fsync(FIRST_CHILD(this), fd_ctx->fd, 0, + NULL, NULL); + if (ret) { + gf_msg (this->name, GF_LOG_WARNING, 0, + N_MSG_SYS_CALL_FAILURE, + "failed to fsync %p (%d)", + fd_ctx->fd, -ret); + } + + LOCK(&fd_ctx->fd->lock); + nsr_flush_fd(this, fd_ctx); + list_del_init(&fd_ctx->fd_list); + UNLOCK(&fd_ctx->fd->lock); + fd_unref(fd_ctx->fd); + } + + sleep(NSR_FLUSH_INTERVAL); + } + + return NULL; +} + + +int32_t +nsr_get_changelog_dir (xlator_t *this, char **cl_dir_p) +{ + xlator_t *cl_xl; + + /* Find our changelog translator. */ + cl_xl = this; + while (cl_xl) { + if (strcmp(cl_xl->type, "features/changelog") == 0) { + break; + } + cl_xl = cl_xl->children->xlator; + } + if (!cl_xl) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_INIT_FAIL, + "failed to find changelog translator"); + return ENOENT; + } + + /* Find the actual changelog directory. */ + if (dict_get_str(cl_xl->options, "changelog-dir", cl_dir_p) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_INIT_FAIL, + "failed to find changelog-dir for %s", cl_xl->name); + return ENODATA; + } + + return 0; +} + + +void +nsr_get_terms (call_frame_t *frame, xlator_t *this) +{ + int32_t op_errno; + char *cl_dir; + DIR *fp = NULL; + struct dirent *rd_entry; + struct dirent *rd_result; + int32_t term_first = -1; + int32_t term_contig = -1; + int32_t term_last = -1; + int term_num; + char *probe_str; + dict_t *my_xdata = NULL; + + op_errno = nsr_get_changelog_dir(this, &cl_dir); + if (op_errno) { + goto err; /* Error was already logged. */ + } + op_errno = ENODATA; /* Most common error after this. */ + + rd_entry = alloca (offsetof(struct dirent, d_name) + + pathconf(cl_dir, _PC_NAME_MAX) + 1); + if (!rd_entry) { + goto err; + } + + fp = sys_opendir (cl_dir); + if (!fp) { + op_errno = errno; + goto err; + } + + /* Find first and last terms. */ + for (;;) { + if (readdir_r(fp, rd_entry, &rd_result) != 0) { + op_errno = errno; + goto err; + } + if (!rd_result) { + break; + } + if (fnmatch("TERM.*", rd_entry->d_name, FNM_PATHNAME) != 0) { + continue; + } + /* +5 points to the character after the period */ + term_num = atoi(rd_entry->d_name+5); + gf_msg (this->name, GF_LOG_INFO, 0, + N_MSG_GENERIC, + "%s => %d", rd_entry->d_name, term_num); + if (term_num < 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_INVALID, + "invalid term file name %s", rd_entry->d_name); + op_errno = EINVAL; + goto err; + } + if ((term_first < 0) || (term_first > term_num)) { + term_first = term_num; + } + if ((term_last < 0) || (term_last < term_num)) { + term_last = term_num; + } + } + if ((term_first < 0) || (term_last < 0)) { + /* TBD: are we *sure* there should always be at least one? */ + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_NO_DATA, "no terms found"); + op_errno = EINVAL; + goto err; + } + + sys_closedir (fp); + fp = NULL; + + /* + * Find term_contig, which is the earliest term for which there are + * no gaps between it and term_last. + */ + for (term_contig = term_last; term_contig > 0; --term_contig) { + if (gf_asprintf(&probe_str, "%s/TERM.%d", + cl_dir, term_contig-1) <= 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_MEM_ERR, + "failed to format term %d", term_contig-1); + goto err; + } + if (sys_access(probe_str, F_OK) != 0) { + GF_FREE(probe_str); + break; + } + GF_FREE(probe_str); + } + + gf_msg (this->name, GF_LOG_INFO, 0, + N_MSG_GENERIC, + "found terms %d-%d (%d)", + term_first, term_last, term_contig); + + /* Return what we've found */ + my_xdata = dict_new(); + if (!my_xdata) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_MEM_ERR, + "failed to allocate reply dictionary"); + goto err; + } + if (dict_set_int32(my_xdata, "term-first", term_first) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, + "failed to set term-first"); + goto err; + } + if (dict_set_int32(my_xdata, "term-contig", term_contig) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, + "failed to set term-contig"); + goto err; + } + if (dict_set_int32(my_xdata, "term-last", term_last) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, + "failed to set term-last"); + goto err; + } + + /* Finally! */ + STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata); + dict_unref(my_xdata); + return; + +err: + if (fp) { + sys_closedir (fp); + } + if (my_xdata) { + dict_unref(my_xdata); + } + STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL); +} + + +long +get_entry_count (xlator_t *this, int fd) +{ + struct stat buf; + long min; /* last entry not known to be empty */ + long max; /* first entry known to be empty */ + long curr; + char entry[CHANGELOG_ENTRY_SIZE]; + + if (sys_fstat (fd, &buf) < 0) { + return -1; + } + + min = 0; + max = buf.st_size / CHANGELOG_ENTRY_SIZE; + + while ((min+1) < max) { + curr = (min + max) / 2; + if (sys_lseek(fd, curr*CHANGELOG_ENTRY_SIZE, SEEK_SET) < 0) { + return -1; + } + if (sys_read(fd, entry, sizeof(entry)) != sizeof(entry)) { + return -1; + } + if ((entry[0] == '_') && (entry[1] == 'P')) { + min = curr; + } else { + max = curr; + } + } + + if (sys_lseek(fd, 0, SEEK_SET) < 0) { + gf_msg (this->name, GF_LOG_WARNING, 0, + N_MSG_SYS_CALL_FAILURE, + "failed to reset offset"); + } + return max; +} + + +void +nsr_open_term (call_frame_t *frame, xlator_t *this, dict_t *xdata) +{ + int32_t op_errno; + char *cl_dir; + char *term; + char *path; + nsr_private_t *priv = this->private; + + op_errno = nsr_get_changelog_dir(this, &cl_dir); + if (op_errno) { + goto err; + } + + if (dict_get_str(xdata, "term", &term) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_NO_DATA, "missing term"); + op_errno = ENODATA; + goto err; + } + + if (gf_asprintf(&path, "%s/TERM.%s", cl_dir, term) < 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_MEM_ERR, "failed to construct path"); + op_errno = ENOMEM; + goto err; + } + + if (priv->term_fd >= 0) { + sys_close (priv->term_fd); + } + priv->term_fd = open(path, O_RDONLY); + if (priv->term_fd < 0) { + op_errno = errno; + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_SYS_CALL_FAILURE, + "failed to open term file"); + goto err; + } + + priv->term_total = get_entry_count(this, priv->term_fd); + if (priv->term_total < 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_NO_DATA, "failed to get entry count"); + sys_close (priv->term_fd); + priv->term_fd = -1; + op_errno = EIO; + goto err; + } + priv->term_read = 0; + + /* Success! */ + STACK_UNWIND_STRICT (ipc, frame, 0, 0, NULL); + return; + +err: + STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL); +} + + +void +nsr_next_entry (call_frame_t *frame, xlator_t *this) +{ + int32_t op_errno = ENOMEM; + nsr_private_t *priv = this->private; + ssize_t nbytes; + dict_t *my_xdata; + + if (priv->term_fd < 0) { + op_errno = EBADFD; + goto err; + } + + if (priv->term_read >= priv->term_total) { + op_errno = ENODATA; + goto err; + } + + nbytes = sys_read (priv->term_fd, priv->term_buf, CHANGELOG_ENTRY_SIZE); + if (nbytes < CHANGELOG_ENTRY_SIZE) { + if (nbytes < 0) { + op_errno = errno; + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_SYS_CALL_FAILURE, + "error reading next entry: %s", + strerror(errno)); + } else { + op_errno = EIO; + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_SYS_CALL_FAILURE, + "got %ld/%d bytes for next entry", + nbytes, CHANGELOG_ENTRY_SIZE); + } + goto err; + } + ++(priv->term_read); + + my_xdata = dict_new(); + if (!my_xdata) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_MEM_ERR, "failed to allocate reply xdata"); + goto err; + } + + if (dict_set_static_bin(my_xdata, "data", + priv->term_buf, CHANGELOG_ENTRY_SIZE) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, + N_MSG_DICT_FLR, "failed to assign reply xdata"); + goto err; + } + + STACK_UNWIND_STRICT (ipc, frame, 0, 0, my_xdata); + dict_unref(my_xdata); + return; + +err: + STACK_UNWIND_STRICT (ipc, frame, -1, op_errno, NULL); +} + + +int32_t +nsr_ipc (call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata) +{ + switch (op) { + case NSR_SERVER_TERM_RANGE: + nsr_get_terms(frame, this); + break; + case NSR_SERVER_OPEN_TERM: + nsr_open_term(frame, this, xdata); + break; + case NSR_SERVER_NEXT_ENTRY: + nsr_next_entry(frame, this); + break; + default: + STACK_WIND_TAIL (frame, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ipc, + op, xdata); + } + + return 0; +} + + +int32_t +nsr_forget (xlator_t *this, inode_t *inode) +{ + uint64_t ctx = 0LL; + + if ((inode_ctx_del(inode, this, &ctx) == 0) && ctx) { + GF_FREE((void *)(long)ctx); + } + + return 0; +} + +int32_t +nsr_release (xlator_t *this, fd_t *fd) +{ + uint64_t ctx = 0LL; + + if ((fd_ctx_del(fd, this, &ctx) == 0) && ctx) { + GF_FREE((void *)(long)ctx); + } + + return 0; +} + +struct xlator_cbks cbks = { + .forget = nsr_forget, + .release = nsr_release, +}; + +int +nsr_reconfigure (xlator_t *this, dict_t *options) +{ + nsr_private_t *priv = this->private; + + GF_OPTION_RECONF ("leader", + priv->config_leader, options, bool, err); + GF_OPTION_RECONF ("quorum-percent", + priv->quorum_pct, options, percent, err); + gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC, + "reconfigure called, config_leader = %d, quorum_pct = %.1f\n", + priv->leader, priv->quorum_pct); + + priv->leader = priv->config_leader; + + return 0; + +err: + return -1; +} + +int +nsr_get_child_index (xlator_t *this, xlator_t *kid) +{ + xlator_list_t *trav; + int retval = -1; + + for (trav = this->children; trav; trav = trav->next) { + ++retval; + if (trav->xlator == kid) { + return retval; + } + } + + return -1; +} + +/* + * Child notify handling is unreasonably FUBAR. Sometimes we'll get a + * CHILD_DOWN for a protocol/client child before we ever got a CHILD_UP for it. + * Other times we won't. Because it's effectively random (probably racy), we + * can't just maintain a count. We actually have to keep track of the state + * for each child separately, to filter out the bogus CHILD_DOWN events, and + * then generate counts on demand. + */ +int +nsr_notify (xlator_t *this, int event, void *data, ...) +{ + nsr_private_t *priv = this->private; + int index; + + switch (event) { + case GF_EVENT_CHILD_UP: + index = nsr_get_child_index(this, data); + if (index >= 0) { + priv->kid_state |= (1 << index); + priv->up_children = nsr_count_up_kids(priv); + gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC, + "got CHILD_UP for %s, now %u kids", + ((xlator_t *)data)->name, + priv->up_children); + if (!priv->config_leader && (priv->up_children > 1)) { + priv->leader = _gf_false; + } + } + break; + case GF_EVENT_CHILD_DOWN: + index = nsr_get_child_index(this, data); + if (index >= 0) { + priv->kid_state &= ~(1 << index); + priv->up_children = nsr_count_up_kids(priv); + gf_msg (this->name, GF_LOG_INFO, 0, N_MSG_GENERIC, + "got CHILD_DOWN for %s, now %u kids", + ((xlator_t *)data)->name, + priv->up_children); + if (!priv->config_leader && (priv->up_children < 2)) { + priv->leader = _gf_true; + } + } + break; + default: + ; + } + + return default_notify(this, event, data); +} + + +int32_t +mem_acct_init (xlator_t *this) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("nsr", this, out); + + ret = xlator_mem_acct_init (this, gf_mt_nsr_end + 1); + + if (ret != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR, + "Memory accounting init" "failed"); + return ret; + } +out: + return ret; +} + + +void +nsr_deallocate_priv (nsr_private_t *priv) +{ + if (!priv) { + return; + } + + GF_FREE(priv); +} + + +int32_t +nsr_init (xlator_t *this) +{ + xlator_list_t *remote; + xlator_list_t *local; + nsr_private_t *priv = NULL; + xlator_list_t *trav; + pthread_t kid; + extern xlator_t global_xlator; + glusterfs_ctx_t *oldctx = global_xlator.ctx; + + /* + * Any fop that gets special treatment has to be patched in here, + * because the compiled-in table is produced by the code generator and + * only contains generated functions. Note that we have to go through + * this->fops because of some dynamic-linking strangeness; modifying + * the static table doesn't work. + */ + this->fops->getxattr = nsr_getxattr_special; + this->fops->fsync = nsr_fsync; + this->fops->ipc = nsr_ipc; + + local = this->children; + if (!local) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA, + "no local subvolume"); + goto err; + } + + remote = local->next; + if (!remote) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_NO_DATA, + "no remote subvolumes"); + goto err; + } + + this->local_pool = mem_pool_new (nsr_local_t, 128); + if (!this->local_pool) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR, + "failed to create nsr_local_t pool"); + goto err; + } + + priv = GF_CALLOC (1, sizeof(*priv), gf_mt_nsr_private_t); + if (!priv) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_MEM_ERR, + "could not allocate priv"); + goto err; + } + + for (trav = this->children; trav; trav = trav->next) { + ++(priv->n_children); + } + + LOCK_INIT(&priv->dirty_lock); + LOCK_INIT(&priv->index_lock); + INIT_LIST_HEAD(&priv->dirty_fds); + priv->term_fd = -1; + + this->private = priv; + + GF_OPTION_INIT ("leader", priv->config_leader, bool, err); + GF_OPTION_INIT ("quorum-percent", priv->quorum_pct, percent, err); + + priv->leader = priv->config_leader; + + if (pthread_create(&kid, NULL, nsr_flush_thread, + this) != 0) { + gf_msg (this->name, GF_LOG_ERROR, 0, N_MSG_SYS_CALL_FAILURE, + "could not start flush thread"); + /* TBD: treat this as a fatal error? */ + } + + /* + * Calling glfs_new changes old->ctx, even if THIS still points + * to global_xlator. That causes problems later in the main + * thread, when gf_log_dump_graph tries to use the FILE after + * we've mucked with it and gets a segfault in __fprintf_chk. + * We can avoid all that by undoing the damage before we + * continue. + */ + global_xlator.ctx = oldctx; + + return 0; + +err: + nsr_deallocate_priv(priv); + return -1; +} + + +void +nsr_fini (xlator_t *this) +{ + nsr_deallocate_priv(this->private); +} + +class_methods_t class_methods = { + .init = nsr_init, + .fini = nsr_fini, + .reconfigure = nsr_reconfigure, + .notify = nsr_notify, +}; + +struct volume_options options[] = { + { .key = {"leader"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "false", + .description = "Start in the leader role. This is only for " + "bootstrapping the code, and should go away when we " + "have real leader election." + }, + { .key = {"vol-name"}, + .type = GF_OPTION_TYPE_STR, + .description = "volume name" + }, + { .key = {"my-name"}, + .type = GF_OPTION_TYPE_STR, + .description = "brick name in form of host:/path" + }, + { .key = {"etcd-servers"}, + .type = GF_OPTION_TYPE_STR, + .description = "list of comma seperated etc servers" + }, + { .key = {"subvol-uuid"}, + .type = GF_OPTION_TYPE_STR, + .description = "UUID for this NSR (sub)volume" + }, + { .key = {"quorum-percent"}, + .type = GF_OPTION_TYPE_PERCENT, + .default_value = "50.0", + .description = "percentage of rep_count-1 that must be up" + }, + { .key = {NULL} }, +}; |