summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--geo-replication/syncdaemon/Makefile.am8
-rw-r--r--geo-replication/syncdaemon/README.md57
-rw-r--r--geo-replication/syncdaemon/__codecheck.py58
-rw-r--r--geo-replication/syncdaemon/__init__.py9
-rw-r--r--geo-replication/syncdaemon/argsupgrade.py359
-rw-r--r--geo-replication/syncdaemon/conf.py.in17
-rw-r--r--geo-replication/syncdaemon/gsyncd.py325
-rw-r--r--geo-replication/syncdaemon/gsyncdconfig.py485
-rw-r--r--geo-replication/syncdaemon/gsyncdstatus.py419
-rw-r--r--geo-replication/syncdaemon/libcxattr.py112
-rw-r--r--geo-replication/syncdaemon/libgfchangelog.py143
-rw-r--r--geo-replication/syncdaemon/logutils.py77
-rw-r--r--geo-replication/syncdaemon/master.py2020
-rw-r--r--geo-replication/syncdaemon/monitor.py395
-rw-r--r--geo-replication/syncdaemon/py2py3.py184
-rw-r--r--geo-replication/syncdaemon/rconf.py31
-rw-r--r--geo-replication/syncdaemon/repce.py253
-rw-r--r--geo-replication/syncdaemon/resource.py1583
-rw-r--r--geo-replication/syncdaemon/subcmds.py335
-rw-r--r--geo-replication/syncdaemon/syncdutils.py1115
20 files changed, 7985 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
new file mode 100644
index 00000000000..d70e3368faf
--- /dev/null
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -0,0 +1,8 @@
+syncdaemondir = $(GLUSTERFS_LIBEXECDIR)/python/syncdaemon
+
+syncdaemon_PYTHON = rconf.py gsyncd.py __init__.py master.py README.md repce.py \
+ resource.py syncdutils.py monitor.py libcxattr.py gsyncdconfig.py \
+ libgfchangelog.py gsyncdstatus.py conf.py logutils.py \
+ subcmds.py argsupgrade.py py2py3.py
+
+CLEANFILES =
diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md
new file mode 100644
index 00000000000..5ab785ae669
--- /dev/null
+++ b/geo-replication/syncdaemon/README.md
@@ -0,0 +1,57 @@
+gsyncd, the Gluster Syncdaemon
+==============================
+
+REQUIREMENTS
+------------
+
+_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode.
+Requirements are categorized according to this.
+
+* supported OS is GNU/Linux
+* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
+* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (e.g. openssh) (slave)
+* rsync (both)
+* glusterfs: with marker and changelog support (master & slave);
+* FUSE: glusterfs fuse module with auxiliary gfid based access support
+
+INSTALLATION
+------------
+
+As of now, the supported way of operation is running from the source directory or using the RPMs given.
+
+
+CONFIGURATION
+-------------
+
+gsyncd tunables are a subset of the long command-line options; for listing them,
+type
+
+ gsyncd.py --help
+
+and see the long options up to "--config-file". (The leading double dash should be omitted;
+interim underscores and dashes are interchangeable.) The set of options bears some resemblance
+to those of glusterfs and rsync.
+
+The config file format matches the following syntax:
+
+ <option1>: <value1>
+ <option2>: <value2>
+ # comment
+
+By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd_template.conf_
+in the source tree.
+
+USAGE
+-----
+
+gsyncd is a utility for continuous mirroring, i.e. it mirrors master to slave incrementally.
+Assume we have a gluster volume _pop_ at localhost. We try to set up the mirroring for volume
+_pop_ using gsyncd for gluster volume _moz_ on remote machine/cluster @ example.com. The
+respective gsyncd invocations are (demoing some syntax sugaring):
+
+`gsyncd.py :pop example.com::moz`
+
+gsyncd has to be available on both sides; its location on the remote side has to be specified
+via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be
+used for setting options on the remote side, although the suggested mode of operation is to
+set parameters like log file / pid file in the configuration file.)
diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py
new file mode 100644
index 00000000000..9437147f7d9
--- /dev/null
+++ b/geo-replication/syncdaemon/__codecheck.py
@@ -0,0 +1,58 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import os
+import os.path
+import sys
+import tempfile
+import shutil
+
+ipd = tempfile.mkdtemp(prefix='codecheck-aux')
+
+try:
+ # add a fake ipaddr module, we don't want to
+ # deal with the real one (just test our code)
+ f = open(os.path.join(ipd, 'ipaddr.py'), 'w')
+ f.write("""
+class IPAddress(object):
+ pass
+class IPNetwork(list):
+ pass
+""")
+ f.close()
+ sys.path.append(ipd)
+
+ fl = os.listdir(os.path.dirname(sys.argv[0]) or '.')
+ fl.sort()
+ for f in fl:
+ if f[-3:] != '.py' or f[0] == '_':
+ continue
+ m = f[:-3]
+ sys.stdout.write('importing %s ...' % m)
+ __import__(m)
+ print(' OK.')
+
+ def sys_argv_set(a):
+ sys.argv = sys.argv[:1] + a
+
+ gsyncd = sys.modules['gsyncd']
+ for a in [['--help'], ['--version'],
+ ['--canonicalize-escape-url', '/foo']]:
+ print(('>>> invoking program with args: %s' % ' '.join(a)))
+ pid = os.fork()
+ if not pid:
+ sys_argv_set(a)
+ gsyncd.main()
+ _, r = os.waitpid(pid, 0)
+ if r:
+ raise RuntimeError('invocation failed')
+finally:
+ shutil.rmtree(ipd)
diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py
new file mode 100644
index 00000000000..b4648b69645
--- /dev/null
+++ b/geo-replication/syncdaemon/__init__.py
@@ -0,0 +1,9 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
diff --git a/geo-replication/syncdaemon/argsupgrade.py b/geo-replication/syncdaemon/argsupgrade.py
new file mode 100644
index 00000000000..7af40633ef8
--- /dev/null
+++ b/geo-replication/syncdaemon/argsupgrade.py
@@ -0,0 +1,359 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+# Converts old style args into new style args
+
+from __future__ import print_function
+import sys
+from argparse import ArgumentParser
+import socket
+import os
+
+from syncdutils import GsyncdError
+from conf import GLUSTERD_WORKDIR
+
+
+def gethostbyname(hnam):
+ """gethostbyname wrapper"""
+ try:
+ return socket.gethostbyname(hnam)
+ except socket.gaierror:
+ ex = sys.exc_info()[1]
+ raise GsyncdError("failed to resolve %s: %s" %
+ (hnam, ex.strerror))
+
+
+def slave_url(urldata):
+ urldata = urldata.replace("ssh://", "")
+ host, vol = urldata.split("::")
+ vol = vol.split(":")[0]
+ return "%s::%s" % (host, vol)
+
+
+def init_gsyncd_template_conf():
+ path = GLUSTERD_WORKDIR + "/geo-replication/gsyncd_template.conf"
+ dname = os.path.dirname(path)
+ if not os.path.exists(dname):
+ try:
+ os.mkdir(dname)
+ except OSError:
+ pass
+
+ if not os.path.exists(path):
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+
+
+def init_gsyncd_session_conf(master, slave):
+ slave = slave_url(slave)
+ master = master.strip(":")
+ slavehost, slavevol = slave.split("::")
+ slavehost = slavehost.split("@")[-1]
+
+ # Session Config File
+ path = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
+ GLUSTERD_WORKDIR, master, slavehost, slavevol)
+
+ if os.path.exists(os.path.dirname(path)) and not os.path.exists(path):
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+
+
+def init_gsyncd_conf(path):
+ dname = os.path.dirname(path)
+ if not os.path.exists(dname):
+ try:
+ os.mkdir(dname)
+ except OSError:
+ pass
+
+ if os.path.exists(dname) and not os.path.exists(path):
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
+ os.close(fd)
+
+
+def upgrade():
+ # Create dummy template conf(empty), hack to avoid glusterd
+ # fail when it does stat to check the existence.
+ init_gsyncd_template_conf()
+
+ inet6 = False
+ if "--inet6" in sys.argv:
+ inet6 = True
+
+ if "--monitor" in sys.argv:
+ # python gsyncd.py --path=/bricks/b1
+ # --monitor -c gsyncd.conf
+ # --iprefix=/var :gv1
+ # --glusterd-uuid=f26ac7a8-eb1b-4ea7-959c-80b27d3e43d0
+ # f241::gv2
+ p = ArgumentParser()
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("--glusterd-uuid")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ p.add_argument("--path", action="append")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ # Overwrite the sys.argv after rearrange
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+ sys.argv = [
+ sys.argv[0],
+ "monitor",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ "--local-node-id",
+ pargs.glusterd_uuid
+ ]
+ elif "--status-get" in sys.argv:
+ # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2
+ # --status-get --path /bricks/b1
+ p = ArgumentParser()
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("-c")
+ p.add_argument("--path")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ sys.argv = [
+ sys.argv[0],
+ "status",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ "--local-path",
+ pargs.path
+ ]
+ elif "--canonicalize-url" in sys.argv:
+ # This can accept multiple URLs and converts each URL to the
+ # format ssh://USER@IP:gluster://127.0.0.1:VOLUME
+    # This format is not used in gsyncd, but added for glusterd compatibility
+ p = ArgumentParser()
+ p.add_argument("--canonicalize-url", nargs="+")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ for url in pargs.canonicalize_url:
+ host, vol = url.split("::")
+ host = host.replace("ssh://", "")
+ remote_addr = host
+ if "@" not in remote_addr:
+ remote_addr = "root@" + remote_addr
+
+ user, hname = remote_addr.split("@")
+
+ if not inet6:
+ hname = gethostbyname(hname)
+
+ print(("ssh://%s@%s:gluster://127.0.0.1:%s" % (
+ user, hname, vol)))
+
+ sys.exit(0)
+ elif "--normalize-url" in sys.argv:
+ # Adds schema prefix as ssh://
+    # This format is not used in gsyncd, but added for glusterd compatibility
+ p = ArgumentParser()
+ p.add_argument("--normalize-url")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+ print(("ssh://%s" % slave_url(pargs.normalize_url)))
+ sys.exit(0)
+ elif "--config-get-all" in sys.argv:
+ # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get-all
+ p = ArgumentParser()
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ sys.argv = [
+ sys.argv[0],
+ "config-get",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ "--show-defaults",
+ "--use-underscore"
+ ]
+ elif "--verify" in sys.argv and "spawning" in sys.argv:
+ # Just checks that able to spawn gsyncd or not
+ sys.exit(0)
+ elif "--slavevoluuid-get" in sys.argv:
+ # --slavevoluuid-get f241::gv2
+ p = ArgumentParser()
+ p.add_argument("--slavevoluuid-get")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+ host, vol = pargs.slavevoluuid_get.split("::")
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "voluuidget",
+ host,
+ vol
+ ]
+ elif "--config-set-rx" in sys.argv:
+ # Not required since default conf is not generated
+ # and custom conf generated only when required
+ # -c gsyncd.conf --config-set-rx remote-gsyncd
+ # /usr/local/libexec/glusterfs/gsyncd . .
+ # Touch the gsyncd.conf file and create session
+ # directory if required
+ p = ArgumentParser()
+ p.add_argument("-c", dest="config_file")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ # If not template conf then it is trying to create
+    # session config, create an empty file instead
+ if pargs.config_file.endswith("gsyncd.conf"):
+ init_gsyncd_conf(pargs.config_file)
+ sys.exit(0)
+ elif "--create" in sys.argv:
+ # To update monitor status file
+ # --create Created -c gsyncd.conf
+ # --iprefix=/var :gv1 f241::gv2
+ p = ArgumentParser()
+ p.add_argument("--create")
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "monitor-status",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ pargs.create
+ ]
+ elif "--config-get" in sys.argv:
+ # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-get pid-file
+ p = ArgumentParser()
+ p.add_argument("--config-get")
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "config-get",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ "--only-value",
+ "--show-defaults",
+ "--name",
+ pargs.config_get.replace("_", "-")
+ ]
+ elif "--config-set" in sys.argv:
+ # ignore session-owner
+ if "session-owner" in sys.argv:
+ sys.exit(0)
+
+ # --path=/bricks/b1 -c gsyncd.conf :gv1 f241::gv2
+ # --config-set log_level DEBUG
+ p = ArgumentParser()
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("--config-set", action='store_true')
+ p.add_argument("name")
+ p.add_argument("--value")
+ p.add_argument("-c")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "config-set",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ "--name=%s" % pargs.name,
+ "--value=%s" % pargs.value
+ ]
+ elif "--config-check" in sys.argv:
+ # --config-check georep_session_working_dir
+ p = ArgumentParser()
+ p.add_argument("--config-check")
+ p.add_argument("-c")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "config-check",
+ pargs.config_check.replace("_", "-")
+ ]
+ elif "--config-del" in sys.argv:
+ # -c gsyncd.conf --iprefix=/var :gv1 f241::gv2 --config-del log_level
+ p = ArgumentParser()
+ p.add_argument("--config-del")
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("-c")
+ p.add_argument("--iprefix")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "config-reset",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave),
+ pargs.config_del.replace("_", "-")
+ ]
+ elif "--delete" in sys.argv:
+ # --delete -c gsyncd.conf --iprefix=/var
+ # --path-list=--path=/bricks/b1 :gv1 f241::gv2
+ p = ArgumentParser()
+ p.add_argument("--reset-sync-time", action="store_true")
+ p.add_argument("--path-list")
+ p.add_argument("master")
+ p.add_argument("slave")
+ p.add_argument("--iprefix")
+ p.add_argument("-c")
+ pargs = p.parse_known_args(sys.argv[1:])[0]
+
+ init_gsyncd_session_conf(pargs.master, pargs.slave)
+
+ paths = pargs.path_list.split("--path=")
+ paths = ["--path=%s" % x.strip() for x in paths if x.strip() != ""]
+
+ # Modified sys.argv
+ sys.argv = [
+ sys.argv[0],
+ "delete",
+ pargs.master.strip(":"),
+ slave_url(pargs.slave)
+ ]
+ sys.argv += paths
+
+ if pargs.reset_sync_time:
+ sys.argv.append("--reset-sync-time")
+
+ if inet6:
+ # Add `--inet6` as first argument
+ sys.argv = [sys.argv[0], "--inet6"] + sys.argv[1:]
diff --git a/geo-replication/syncdaemon/conf.py.in b/geo-replication/syncdaemon/conf.py.in
new file mode 100644
index 00000000000..2042fa9cdfb
--- /dev/null
+++ b/geo-replication/syncdaemon/conf.py.in
@@ -0,0 +1,17 @@
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+GLUSTERFS_LIBEXECDIR = '@GLUSTERFS_LIBEXECDIR@'
+GLUSTERD_WORKDIR = "@GLUSTERD_WORKDIR@"
+
+LOCALSTATEDIR = "@localstatedir@"
+UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
+GLUSTERFS_CONFDIR = "@SYSCONF_DIR@/glusterfs"
+GCONF_VERSION = 4.0
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
new file mode 100644
index 00000000000..257ed72c6ae
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -0,0 +1,325 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from argparse import ArgumentParser
+import time
+import os
+from errno import EEXIST
+import sys
+import logging
+
+from logutils import setup_logging
+import gsyncdconfig as gconf
+from rconf import rconf
+import subcmds
+from conf import GLUSTERD_WORKDIR, GLUSTERFS_CONFDIR, GCONF_VERSION
+from syncdutils import (set_term_handler, finalize, lf,
+ log_raise_exception, FreeObject, escape)
+import argsupgrade
+
+
+GSYNCD_VERSION = "gsyncd.py %s.0" % GCONF_VERSION
+
+
+def main():
+ rconf.starttime = time.time()
+
+ # If old Glusterd sends commands in old format, below function
+ # converts the sys.argv to new format. This conversion is added
+ # temporarily for backward compatibility. This can be removed
+ # once integrated with Glusterd2
+ # This modifies sys.argv globally, so rest of the code works as usual
+ argsupgrade.upgrade()
+
+ # Default argparse version handler prints to stderr, which is fixed in
+ # 3.x series but not in 2.x, using custom parser to fix this issue
+ if "--version" in sys.argv:
+ print(GSYNCD_VERSION)
+ sys.exit(0)
+
+ parser = ArgumentParser()
+ parser.add_argument("--inet6", action="store_true")
+ sp = parser.add_subparsers(dest="subcmd")
+
+ # Monitor Status File update
+ p = sp.add_parser("monitor-status")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("status", help="Update Monitor Status")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Monitor
+ p = sp.add_parser("monitor")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--pause-on-start",
+ action="store_true",
+ help="Start with Paused state")
+ p.add_argument("--local-node-id", help="Local Node ID")
+ p.add_argument("--debug", action="store_true")
+ p.add_argument("--use-gconf-volinfo", action="store_true")
+
+ # Worker
+ p = sp.add_parser("worker")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("--local-path", help="Local Brick Path")
+ p.add_argument("--feedback-fd", type=int,
+ help="feedback fd between monitor and worker")
+ p.add_argument("--local-node", help="Local master node")
+ p.add_argument("--local-node-id", help="Local Node ID")
+ p.add_argument("--subvol-num", type=int, help="Subvolume number")
+ p.add_argument("--is-hottier", action="store_true",
+ help="Is this brick part of hot tier")
+ p.add_argument("--resource-remote",
+ help="Remote node to connect to Slave Volume")
+ p.add_argument("--resource-remote-id",
+ help="Remote node ID to connect to Slave Volume")
+ p.add_argument("--slave-id", help="Slave Volume ID")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Slave
+ p = sp.add_parser("slave")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave details user@host::vol format")
+ p.add_argument("--session-owner")
+ p.add_argument("--master-brick",
+ help="Master brick which is connected to the Slave")
+ p.add_argument("--master-node",
+ help="Master node which is connected to the Slave")
+ p.add_argument("--master-node-id",
+ help="Master node ID which is connected to the Slave")
+ p.add_argument("--local-node", help="Local Slave node")
+ p.add_argument("--local-node-id", help="Local Slave ID")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # All configurations which are configured via "slave-" options
+ # DO NOT add default values for these configurations, default values
+ # will be picked from template config file
+ p.add_argument("--slave-timeout", type=int,
+ help="Timeout to end gsyncd at Slave side")
+ p.add_argument("--use-rsync-xattrs", action="store_true")
+ p.add_argument("--slave-log-level", help="Slave Gsyncd Log level")
+ p.add_argument("--slave-gluster-log-level",
+ help="Slave Gluster mount Log level")
+ p.add_argument("--slave-gluster-command-dir",
+ help="Directory where Gluster binaries exist on slave")
+ p.add_argument("--slave-access-mount", action="store_true",
+ help="Do not lazy umount the slave volume")
+ p.add_argument("--master-dist-count", type=int,
+ help="Master Distribution count")
+
+ # Status
+ p = sp.add_parser("status")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--local-path", help="Local Brick Path")
+ p.add_argument("--debug", action="store_true")
+ p.add_argument("--json", action="store_true")
+
+ # Config-check
+ p = sp.add_parser("config-check")
+ p.add_argument("name", help="Config Name")
+ p.add_argument("--value", help="Config Value")
+ p.add_argument("--debug", action="store_true")
+
+ # Config-get
+ p = sp.add_parser("config-get")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("--name", help="Config Name")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+ p.add_argument("--show-defaults", action="store_true")
+ p.add_argument("--only-value", action="store_true")
+ p.add_argument("--use-underscore", action="store_true")
+ p.add_argument("--json", action="store_true")
+
+ # Config-set
+ p = sp.add_parser("config-set")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("-n", "--name", help="Config Name")
+ p.add_argument("-v", "--value", help="Config Value")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # Config-reset
+ p = sp.add_parser("config-reset")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("name", help="Config Name")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument("--debug", action="store_true")
+
+ # voluuidget
+ p = sp.add_parser("voluuidget")
+ p.add_argument("host", help="Hostname")
+ p.add_argument("volname", help="Volume Name")
+ p.add_argument("--debug", action="store_true")
+
+ # Delete
+ p = sp.add_parser("delete")
+ p.add_argument("master", help="Master Volume Name")
+ p.add_argument("slave", help="Slave")
+ p.add_argument("-c", "--config-file", help="Config File")
+ p.add_argument('--path', dest='paths', action="append")
+ p.add_argument("--reset-sync-time", action="store_true",
+ help="Reset Sync Time")
+ p.add_argument("--debug", action="store_true")
+
+ # Parse arguments
+ args = parser.parse_args()
+
+ # Extra template values, All arguments are already part of template
+ # variables, use this for adding extra variables
+ extra_tmpl_args = {}
+
+ # Add First/Primary Slave host, user and volume
+ if getattr(args, "slave", None) is not None:
+ hostdata, slavevol = args.slave.split("::")
+ hostdata = hostdata.split("@")
+ slavehost = hostdata[-1]
+ slaveuser = "root"
+ if len(hostdata) == 2:
+ slaveuser = hostdata[0]
+ extra_tmpl_args["primary_slave_host"] = slavehost
+ extra_tmpl_args["slaveuser"] = slaveuser
+ extra_tmpl_args["slavevol"] = slavevol
+
+ # Add Bricks encoded path
+ if getattr(args, "local_path", None) is not None:
+ extra_tmpl_args["local_id"] = escape(args.local_path)
+
+ # Add Master Bricks encoded path(For Slave)
+ if getattr(args, "master_brick", None) is not None:
+ extra_tmpl_args["master_brick_id"] = escape(args.master_brick)
+
+ # Load configurations
+ config_file = getattr(args, "config_file", None)
+
+ # Subcmd accepts config file argument but not passed
+ # Set default path for config file in that case
+ # If an subcmd accepts config file then it also accepts
+ # master and Slave arguments.
+ if config_file is None and hasattr(args, "config_file") \
+ and args.subcmd != "slave":
+ config_file = "%s/geo-replication/%s_%s_%s/gsyncd.conf" % (
+ GLUSTERD_WORKDIR,
+ args.master,
+ extra_tmpl_args["primary_slave_host"],
+ extra_tmpl_args["slavevol"])
+
+ # If Config file path not exists, log error and continue using default conf
+ config_file_error_msg = None
+ if config_file is not None and not os.path.exists(config_file):
+ # Logging not yet initialized, create the error message to
+ # log later and reset the config_file to None
+ config_file_error_msg = lf(
+ "Session config file not exists, using the default config",
+ path=config_file)
+ config_file = None
+
+ rconf.config_file = config_file
+
+ # Override gconf values from argument values only if it is slave gsyncd
+ override_from_args = False
+ if args.subcmd == "slave":
+ override_from_args = True
+
+ if config_file is not None and \
+ args.subcmd in ["monitor", "config-get", "config-set", "config-reset"]:
+ ret = gconf.is_config_file_old(config_file, args.master, extra_tmpl_args["slavevol"])
+ if ret is not None:
+ gconf.config_upgrade(config_file, ret)
+
+ # Load Config file
+ gconf.load(GLUSTERFS_CONFDIR + "/gsyncd.conf",
+ config_file,
+ vars(args),
+ extra_tmpl_args,
+ override_from_args)
+
+ # Default label to print in log file
+ label = args.subcmd
+ if args.subcmd in ("worker"):
+ # If Worker, then add brick path also to label
+ label = "%s %s" % (args.subcmd, args.local_path)
+ elif args.subcmd == "slave":
+ # If Slave add Master node and Brick details
+ label = "%s %s%s" % (args.subcmd, args.master_node, args.master_brick)
+
+ # Setup Logger
+ # Default log file
+ log_file = gconf.get("cli-log-file")
+ log_level = gconf.get("cli-log-level")
+ if getattr(args, "master", None) is not None and \
+ getattr(args, "slave", None) is not None:
+ log_file = gconf.get("log-file")
+ log_level = gconf.get("log-level")
+
+ # Use different log file location for Slave log file
+ if args.subcmd == "slave":
+ log_file = gconf.get("slave-log-file")
+ log_level = gconf.get("slave-log-level")
+
+ if args.debug:
+ log_file = "-"
+ log_level = "DEBUG"
+
+ # Create Logdir if not exists
+ try:
+ if log_file != "-":
+ os.mkdir(os.path.dirname(log_file))
+ except OSError as e:
+ if e.errno != EEXIST:
+ raise
+
+ setup_logging(
+ log_file=log_file,
+ level=log_level,
+ label=label
+ )
+
+ if config_file_error_msg is not None:
+ logging.warn(config_file_error_msg)
+
+ # Log message for loaded config file
+ if config_file is not None:
+ logging.debug(lf("Using session config file", path=config_file))
+
+ set_term_handler()
+ excont = FreeObject(exval=0)
+
+ # Gets the function name based on the input argument. For example
+ # if subcommand passed as argument is monitor then it looks for
+ # function with name "subcmd_monitor" in subcmds file
+ func = getattr(subcmds, "subcmd_" + args.subcmd.replace("-", "_"), None)
+
+ try:
+ try:
+ if func is not None:
+ rconf.args = args
+ func(args)
+ except:
+ log_raise_exception(excont)
+ finally:
+ finalize(exval=excont.exval)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/geo-replication/syncdaemon/gsyncdconfig.py b/geo-replication/syncdaemon/gsyncdconfig.py
new file mode 100644
index 00000000000..8848071997a
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncdconfig.py
@@ -0,0 +1,485 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+try:
+ from ConfigParser import RawConfigParser, NoSectionError
+except ImportError:
+ from configparser import RawConfigParser, NoSectionError
+import os
+import shutil
+from string import Template
+from datetime import datetime
+from threading import Lock
+
+
# Global object which can be used in other modules
# once load_config is called
# (replaced with a Gconf instance by load(); starts as a plain dict so
# the module-level get() below still works before load() is called)
_gconf = {}
+
+
class GconfNotConfigurable(Exception):
    """Raised when a config item is unknown or not user-configurable."""
    pass
+
+
class GconfInvalidValue(Exception):
    """Raised when a value fails the option's declared validation."""
    pass
+
+
class Gconf(object):
    """In-memory Geo-replication session configuration.

    Values are read from a read-only default/template conf file and may be
    overridden by the "[vars]" section of a per-session custom conf file
    and by command line arguments.  A lock serializes access so readers
    can trigger reloads safely (see getr()).
    """

    def __init__(self, default_conf_file, custom_conf_file=None,
                 args={}, extra_tmpl_args={}, override_from_args=False):
        # NOTE(review): mutable default arguments ({}) are shared between
        # calls; only safe while callers never mutate them in place.
        self.lock = Lock()
        self.default_conf_file = default_conf_file
        self.custom_conf_file = custom_conf_file
        self.tmp_conf_file = None
        self.gconf = {}
        self.gconfdata = {}
        self.gconf_typecast = {}
        self.template_conf = []
        self.non_configurable_configs = []
        self.prev_mtime = 0
        if custom_conf_file is not None:
            # Written first, then renamed over the custom conf (atomic save)
            self.tmp_conf_file = custom_conf_file + ".tmp"

        self.session_conf_items = []
        self.args = args
        self.extra_tmpl_args = extra_tmpl_args
        self.override_from_args = override_from_args
        # Store default values only if overwritten, Only for JSON/CLI output
        self.default_values = {}
        self._load()

    def _tmpl_substitute(self):
        """Expand $-template options using other configs/args as values."""
        tmpl_values = {}
        for k, v in self.gconf.items():
            tmpl_values[k.replace("-", "_")] = v

        # override the config file values with the one user passed
        for k, v in self.args.items():
            # override the existing value only if set by user
            if v is not None:
                tmpl_values[k] = v

        for k, v in self.extra_tmpl_args.items():
            tmpl_values[k] = v

        for k, v in self.gconf.items():
            # NOTE(review): "unicode" is undefined on Python 3; this only
            # works because isinstance(v, str) short-circuits for the str
            # values a config parser produces -- confirm the py3 path.
            if k in self.template_conf and \
               (isinstance(v, str) or isinstance(v, unicode)):
                self.gconf[k] = Template(v).safe_substitute(tmpl_values)

    def _do_typecast(self):
        """Apply each option's declared "type" via the to_* helpers."""
        for k, v in self.gconf.items():
            # No "to_string" helper exists, so untyped options stay str
            cast_func = globals().get(
                "to_" + self.gconf_typecast.get(k, "string"), None)
            if cast_func is not None:
                self.gconf[k] = cast_func(v)
                # NOTE(review): casts the *current* value v into
                # default_values, not the recorded default -- confirm.
                if self.default_values.get(k, None) is not None:
                    self.default_values[k] = cast_func(v)

    def reset(self, name):
        """Drop a session override (or all with name="all") and reload."""
        # If custom conf file is not set then it is only read only configs
        if self.custom_conf_file is None:
            raise GconfNotConfigurable()

        # If a config can not be modified
        if name != "all" and not self._is_configurable(name):
            raise GconfNotConfigurable()

        cnf = RawConfigParser()
        with open(self.custom_conf_file) as f:
            cnf.readfp(f)

        # Nothing to Reset, Not configured
        if name != "all":
            if not cnf.has_option("vars", name):
                return True

            # Remove option from custom conf file
            cnf.remove_option("vars", name)
        else:
            # Remove and add empty section, do not disturb if config file
            # already has any other section
            try:
                cnf.remove_section("vars")
            except NoSectionError:
                pass

            cnf.add_section("vars")

        # Write to temp file and rename so readers never see a partial file
        with open(self.tmp_conf_file, "w") as fw:
            cnf.write(fw)

        os.rename(self.tmp_conf_file, self.custom_conf_file)

        self.reload()

        return True

    def set(self, name, value):
        """Persist one session override into the custom conf and reload.

        Raises GconfNotConfigurable/GconfInvalidValue on bad name/value.
        """
        if self.custom_conf_file is None:
            raise GconfNotConfigurable()

        if not self._is_configurable(name):
            raise GconfNotConfigurable()

        if not self._is_valid_value(name, value):
            raise GconfInvalidValue()

        # No-op when the value is unchanged
        curr_val = self.gconf.get(name, None)
        if curr_val == value:
            return True

        cnf = RawConfigParser()
        with open(self.custom_conf_file) as f:
            cnf.readfp(f)

        if not cnf.has_section("vars"):
            cnf.add_section("vars")

        cnf.set("vars", name, value)
        # Atomic replace: write temp file then rename over the original
        with open(self.tmp_conf_file, "w") as fw:
            cnf.write(fw)

        os.rename(self.tmp_conf_file, self.custom_conf_file)

        self.reload()

        return True

    def check(self, name, value=None, with_conffile=True):
        """Validate that name is settable (and value valid); raises."""
        if with_conffile and self.custom_conf_file is None:
            raise GconfNotConfigurable()

        if not self._is_configurable(name):
            raise GconfNotConfigurable()

        if value is not None and not self._is_valid_value(name, value):
            raise GconfInvalidValue()


    def _load_with_lock(self):
        """Reload all state while holding the instance lock."""
        with self.lock:
            self._load()

    def _load(self):
        """(Re)read default + custom conf files and rebuild all state."""
        self.gconf = {}
        self.template_conf = []
        self.gconf_typecast = {}
        self.non_configurable_configs = []
        self.session_conf_items = []
        self.default_values = {}

        conf = RawConfigParser()
        # Default Template config file
        with open(self.default_conf_file) as f:
            conf.readfp(f)

        # Custom Config file
        if self.custom_conf_file is not None:
            with open(self.custom_conf_file) as f:
                conf.readfp(f)

        # Get version from default conf file
        self.version = conf.get("__meta__", "version")

        # Populate default values
        for sect in conf.sections():
            if sect in ["__meta__", "vars"]:
                continue

            # Collect list of available options with help details
            self.gconfdata[sect] = {}
            for k, v in conf.items(sect):
                self.gconfdata[sect][k] = v.strip()

            # Collect the Type cast information
            if conf.has_option(sect, "type"):
                self.gconf_typecast[sect] = conf.get(sect, "type")

            # Prepare list of configurable conf
            if conf.has_option(sect, "configurable"):
                if conf.get(sect, "configurable").lower() == "false":
                    self.non_configurable_configs.append(sect)

            # if it is a template conf value which needs to be substituted
            if conf.has_option(sect, "template"):
                if conf.get(sect, "template").lower().strip() == "true":
                    self.template_conf.append(sect)

            # Set default values
            if conf.has_option(sect, "value"):
                self.gconf[sect] = conf.get(sect, "value").strip()

        # Load the custom conf elements and overwrite
        if conf.has_section("vars"):
            for k, v in conf.items("vars"):
                self.session_conf_items.append(k)
                self.default_values[k] = self.gconf.get(k, "")
                self.gconf[k] = v.strip()

        # Overwrite the Slave configurations which are sent as
        # arguments to gsyncd slave
        if self.override_from_args:
            for k, v in self.args.items():
                k = k.replace("_", "-")
                if k.startswith("slave-") and k in self.gconf:
                    self.gconf[k] = v

        self._tmpl_substitute()
        self._do_typecast()

    def reload(self, with_lock=True):
        """Reload only if the custom conf file changed on disk."""
        if self._is_config_changed():
            if with_lock:
                self._load_with_lock()
            else:
                self._load()

    def get(self, name, default_value=None, with_lock=True):
        """Return one config value (with_lock=False for nested calls)."""
        if with_lock:
            with self.lock:
                return self.gconf.get(name, default_value)
        else:
            return self.gconf.get(name, default_value)

    def getall(self, show_defaults=False, show_non_configurable=False):
        """Return {name: {value, default, configurable, modified}}."""
        cnf = {}
        if not show_defaults:
            # Only session-overridden items
            for k in self.session_conf_items:
                if k not in self.non_configurable_configs:
                    dv = self.default_values.get(k, "")
                    cnf[k] = {
                        "value": self.get(k),
                        "default": dv,
                        "configurable": True,
                        "modified": False if dv == "" else True
                    }
            return cnf

        # Show all configs including defaults
        for k, v in self.gconf.items():
            configurable = False if k in self.non_configurable_configs \
                else True
            dv = self.default_values.get(k, "")
            modified = False if dv == "" else True
            if show_non_configurable:
                cnf[k] = {
                    "value": v,
                    "default": dv,
                    "configurable": configurable,
                    "modified": modified
                }
            else:
                if k not in self.non_configurable_configs:
                    cnf[k] = {
                        "value": v,
                        "default": dv,
                        "configurable": configurable,
                        "modified": modified
                    }

        return cnf

    def getr(self, name, default_value=None):
        """Reload-if-changed then get, all under one lock hold."""
        with self.lock:
            self.reload(with_lock=False)
            return self.get(name, default_value, with_lock=False)

    def get_help(self, name=None):
        """Placeholder: per-option help output is not implemented."""
        pass

    def _is_configurable(self, name):
        """True if the option exists and is user-settable."""
        item = self.gconfdata.get(name, None)
        if item is None:
            return False

        # NOTE(review): "configurable" here is the raw string from the
        # conf file, so "false" is truthy and does NOT block the option;
        # confirm whether non_configurable_configs was meant to be used.
        return item.get("configurable", True)

    def _is_valid_value(self, name, value):
        """Dispatch to the validate_* helper declared for this option."""
        item = self.gconfdata.get(name, None)
        if item is None:
            return False

        # If validation func not defined
        if item.get("validation", None) is None:
            return True

        # minmax validation
        if item["validation"] == "minmax":
            return validate_minmax(value, item["min"], item["max"])

        if item["validation"] == "choice":
            return validate_choice(value, item["allowed_values"])

        if item["validation"] == "bool":
            return validate_bool(value)

        if item["validation"] == "execpath":
            return validate_execpath(value)

        if item["validation"] == "unixtime":
            return validate_unixtime(value)

        if item["validation"] == "int":
            return validate_int(value)

        # Unknown validation name: treat as invalid
        return False

    def _is_config_changed(self):
        """mtime-based change detection for the custom conf file."""
        if self.custom_conf_file is not None and \
           os.path.exists(self.custom_conf_file):
            st = os.lstat(self.custom_conf_file)
            if st.st_mtime > self.prev_mtime:
                self.prev_mtime = st.st_mtime
                return True

        return False
+
def is_config_file_old(config_file, mastervol, slavevol):
    """Detect a pre-rewrite (peers-section style) gsyncd config file.

    Returns the old "[peers <master> <slave>]" section as a dict when
    present, else None.
    """
    parser = RawConfigParser()
    parser.read(config_file)
    section = "peers %s %s" % (mastervol, slavevol)
    if parser.has_section(section):
        return dict(parser.items(section))
    return None
+
def config_upgrade(config_file, ret):
    """Rewrite an old-format config file in the new "[vars]" layout.

    *ret* is the dict of old session options; the original file is kept
    as gsyncd.conf.bkp next to the new one.
    """
    backup_path = os.path.join(os.path.dirname(config_file),
                               "gsyncd.conf.bkp")

    # Preserve the old file before rewriting it in the new format
    shutil.copyfile(config_file, backup_path)

    new_conf = RawConfigParser()
    new_conf.add_section('vars')

    for key, value in ret.items():
        if key == "use_tarssh":
            # The boolean flag became the "sync-method" choice
            new_conf.set('vars', 'sync-method',
                         'tarssh' if value == 'true' else 'rsync')
        elif key == "timeout":
            # Renamed option
            new_conf.set('vars', 'slave-timeout', value)
        else:
            # e.g. ignore_deletes -> ignore-deletes
            new_conf.set('vars', key.replace("_", "-"), value)

    with open(config_file, 'w') as configfile:
        new_conf.write(configfile)
+
+
def validate_int(value):
    """Return True if *value* can be parsed as an integer.

    Fix: the original only caught ValueError, so a None (or other
    non-numeric object) raised TypeError out of a *validator*; a
    validator should report False instead of crashing.
    """
    try:
        int(value)
        return True
    except (ValueError, TypeError):
        # TypeError covers None and other unparseable objects
        return False
+
+
def validate_unixtime(value):
    """Return True if *value* is a usable unix timestamp.

    A timestamp that falls in 1970 is rejected as "effectively unset".
    Fix: besides ValueError, int(None) raises TypeError and
    datetime.fromtimestamp() raises OverflowError/OSError for values
    outside the platform's localtime() range -- a validator must report
    False for those instead of crashing.
    """
    try:
        y = datetime.fromtimestamp(int(value)).strftime("%Y")
        return y != "1970"
    except (ValueError, TypeError, OverflowError, OSError):
        return False
+
+
def validate_minmax(value, minval, maxval):
    """Return True if int(value) lies within [int(minval), int(maxval)]."""
    try:
        return int(minval) <= int(value) <= int(maxval)
    except ValueError:
        # Any unparseable bound or value means invalid
        return False
+
+
def validate_choice(value, allowed_values):
    """Return True if value is one of the comma-separated allowed_values."""
    choices = [choice.strip() for choice in allowed_values.split(",")]
    return value in choices
+
+
def validate_bool(value):
    """Return True only for the literal strings 'true' / 'false'."""
    return value in ("true", "false")
+
+
def validate_execpath(value):
    """Return True if value is an existing regular file with exec permission."""
    if not os.path.isfile(value):
        return False
    return os.access(value, os.X_OK)
+
+
def validate_filepath(value):
    """Return True if value names an existing regular file."""
    is_regular_file = os.path.isfile(value)
    return is_regular_file
+
+
def validate_path(value):
    """Return True if value names any existing filesystem path."""
    path_exists = os.path.exists(value)
    return path_exists
+
+
def to_int(value):
    """Typecast helper used by Gconf: coerce value to int."""
    converted = int(value)
    return converted
+
+
def to_float(value):
    """Typecast helper used by Gconf: coerce value to float."""
    converted = float(value)
    return converted
+
+
def to_bool(value):
    """Coerce 'true'/'True' strings to bool; pass real bools through."""
    if isinstance(value, bool):
        return value
    return value in ("true", "True")
+
+
def get(name, default_value=None):
    """Module-level shortcut: Gconf.get on the instance created by load()."""
    return _gconf.get(name, default_value)
+
+
def getall(show_defaults=False, show_non_configurable=False):
    """Module-level shortcut: Gconf.getall on the loaded instance."""
    return _gconf.getall(show_defaults=show_defaults,
                         show_non_configurable=show_non_configurable)
+
+
def getr(name, default_value=None):
    """Module-level shortcut: reload-if-changed then get (Gconf.getr)."""
    return _gconf.getr(name, default_value)
+
+
def load(default_conf, custom_conf=None, args={}, extra_tmpl_args={},
         override_from_args=False):
    """Initialize the module-global Gconf instance; call before get()/set()."""
    global _gconf
    _gconf = Gconf(default_conf, custom_conf, args, extra_tmpl_args,
                   override_from_args)
+
+
def setconfig(name, value):
    """Module-level shortcut: persist one override via Gconf.set."""
    global _gconf
    _gconf.set(name, value)
+
+
def resetconfig(name):
    """Module-level shortcut: remove one override via Gconf.reset."""
    global _gconf
    _gconf.reset(name)
+
+
def check(name, value=None, with_conffile=True):
    """Module-level shortcut: validate name/value via Gconf.check; raises."""
    global _gconf
    _gconf.check(name, value=value, with_conffile=with_conffile)
diff --git a/geo-replication/syncdaemon/gsyncdstatus.py b/geo-replication/syncdaemon/gsyncdstatus.py
new file mode 100644
index 00000000000..1a655ff8887
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncdstatus.py
@@ -0,0 +1,419 @@
+#!/usr/bin/python3
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+import fcntl
+import os
+import tempfile
+try:
+ import urllib.parse as urllib
+except ImportError:
+ import urllib
+import json
+import time
+from datetime import datetime
+from errno import EACCES, EAGAIN, ENOENT
+import logging
+
+from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event,
+ EVENT_GEOREP_CHECKPOINT_COMPLETED, lf)
+
# Placeholder shown for any status field that has no meaningful value yet
DEFAULT_STATUS = "N/A"
# States recorded in the monitor status file by the monitor process
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
# Worker status values reported in status output
STATUS_VALUES = (DEFAULT_STATUS,
                 "Initializing...",
                 "Active",
                 "Passive",
                 "Faulty")

# Crawl phases a worker can report
CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
                       "Hybrid Crawl",
                       "History Crawl",
                       "Changelog Crawl")
+
+
+def human_time(ts):
+ try:
+ return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+ except ValueError:
+ return DEFAULT_STATUS
+
+
def human_time_utc(ts):
    """Format a unix timestamp in UTC as "%Y-%m-%d %H:%M:%S".

    Returns DEFAULT_STATUS ("N/A") for anything that cannot be
    interpreted as a timestamp.  Fix: same robustness issue as
    human_time -- catch TypeError (None) and OverflowError/OSError
    (out-of-range) in addition to ValueError.
    """
    try:
        return datetime.utcfromtimestamp(
            float(ts)).strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError, OverflowError, OSError):
        return DEFAULT_STATUS
+
+
def get_default_values():
    """Return a fresh per-brick status dict with every field unset/zero."""
    return {
        "slave_node": DEFAULT_STATUS,
        "worker_status": DEFAULT_STATUS,
        "last_synced": 0,
        "last_synced_entry": 0,
        "crawl_status": DEFAULT_STATUS,
        "entry": 0,
        "data": 0,
        "meta": 0,
        "failures": 0,
        "checkpoint_completed": DEFAULT_STATUS,
        "checkpoint_time": 0,
        "checkpoint_completion_time": 0}
+
+
class LockedOpen(object):
    """Context manager: open a file and hold an exclusive flock on it."""

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        """
        If two processes compete to update a file, The first process
        gets the lock and the second process is blocked in the fcntl.flock()
        call. When first process replaces the file and releases the lock,
        the already open file descriptor in the second process now points
        to a "ghost" file(not reachable by any path name) with old contents.
        To avoid that conflict, check the fd already opened is same or
        not. Open new one if not same
        """
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            # Re-open after acquiring the lock; if the path now names a
            # different file, retry the lock on the new one
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # Unlock then close regardless of exceptions in the with-body
        fcntl.flock(self.fileobj, fcntl.LOCK_UN)
        self.fileobj.close()
+
+
def set_monitor_status(status_file, status):
    """Atomically replace the monitor status file contents with *status*.

    The file is touched first so LockedOpen can open it, then the new
    content is written to a temp file and renamed over the original
    while the lock is held; the directory is fsync'd so the rename
    survives a crash.
    """
    fd = os.open(status_file, os.O_CREAT | os.O_RDWR)
    os.close(fd)
    with LockedOpen(status_file, 'r+'):
        with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(status_file),
                                         delete=False) as tf:
            tf.write(status)
            tempname = tf.name

        os.rename(tempname, status_file)
        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
                        os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)
+
+
class GeorepStatus(object):
    """Per-brick Geo-replication worker status kept in a JSON file.

    All writes go through _update(), which takes an exclusive lock on
    the status file, merges in new values and atomically replaces the
    file (temp file + rename + directory fsync).
    """

    def __init__(self, monitor_status_file, master_node, brick, master_node_id,
                 master, slave, monitor_pid_file=None):
        self.master = master
        # slave is "<host>::<volume>[:<uuid>]"
        slv_data = slave.split("::")
        self.slave_host = slv_data[0]
        self.slave_volume = slv_data[1].split(":")[0]  # Remove Slave UUID
        self.work_dir = os.path.dirname(monitor_status_file)
        self.monitor_status_file = monitor_status_file
        # One status file per brick; brick path is URL-encoded in the name
        self.filename = os.path.join(self.work_dir,
                                     "brick_%s.status"
                                     % urllib.quote_plus(brick))

        # Ensure both status files exist; contents are filled lazily
        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.master_node = master_node
        self.master_node_id = master_node_id
        self.brick = brick
        self.default_values = get_default_values()
        self.monitor_pid_file = monitor_pid_file

    def send_event(self, event_type, **kwargs):
        """Emit a gluster event annotated with this session's identity."""
        gf_event(event_type,
                 master_volume=self.master,
                 master_node=self.master_node,
                 master_node_id=self.master_node_id,
                 slave_host=self.slave_host,
                 slave_volume=self.slave_volume,
                 brick_path=self.brick,
                 **kwargs)

    def _update(self, mergerfunc):
        """Read-modify-write the status file under an exclusive lock.

        mergerfunc receives the current data dict and must return the
        JSON string to store, or a falsy value to skip the write.
        Returns True if the file was rewritten.
        """
        # NOTE(review): this aliases self.default_values, so json.load
        # below mutates the shared defaults dict -- confirm intended.
        data = self.default_values
        with LockedOpen(self.filename, 'r+') as f:
            try:
                data.update(json.load(f))
            except ValueError:
                # Empty or corrupt status file: start from defaults
                pass

            data = mergerfunc(data)
            # If Data is not changed by merger func
            if not data:
                return False

            # Atomic replace + directory fsync for crash durability
            with tempfile.NamedTemporaryFile(
                'w',
                dir=os.path.dirname(self.filename),
                delete=False) as tf:
                tf.write(data)
                tempname = tf.name

            os.rename(tempname, self.filename)
            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
                            os.O_DIRECTORY)
            os.fsync(dirfd)
            os.close(dirfd)
            return True

    def reset_on_worker_start(self):
        """Clear per-crawl fields when a worker (re)starts."""
        def merger(data):
            data["slave_node"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = 0
            data["data"] = 0
            data["meta"] = 0
            return json.dumps(data)

        self._update(merger)

    def set_field(self, key, value):
        """Set one field; True only if the stored value actually changed."""
        def merger(data):
            # Current data and prev data is same
            if data[key] == value:
                return {}

            data[key] = value
            return json.dumps(data)

        return self._update(merger)

    def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
                                               checkpoint_completion_time):
        """Raise the checkpoint-completed gluster event."""
        self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
                        checkpoint_time=checkpoint_time,
                        checkpoint_completion_time=checkpoint_completion_time)

    def set_last_synced(self, value, checkpoint_time):
        """Record last-synced time and update checkpoint progress."""
        def merger(data):
            data["last_synced"] = value[0]

            # If checkpoint is not set or reset
            # or if last set checkpoint is changed
            if checkpoint_time == 0 or \
               checkpoint_time != data["checkpoint_time"]:
                data["checkpoint_time"] = 0
                data["checkpoint_completion_time"] = 0
                data["checkpoint_completed"] = "No"

            # If checkpoint is completed and not marked as completed
            # previously then update the checkpoint completed time
            if checkpoint_time > 0 and checkpoint_time <= value[0]:
                if data["checkpoint_completed"] == "No":
                    curr_time = int(time.time())
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = curr_time
                    data["checkpoint_completed"] = "Yes"
                    logging.info(lf("Checkpoint completed",
                                    checkpoint_time=human_time_utc(
                                        checkpoint_time),
                                    completion_time=human_time_utc(curr_time)))
                    self.trigger_gf_event_checkpoint_completion(
                        checkpoint_time, curr_time)

            return json.dumps(data)

        self._update(merger)

    def set_worker_status(self, status):
        """Set worker_status; log only on an actual change."""
        if self.set_field("worker_status", status):
            logging.info(lf("Worker Status Change",
                            status=status))

    def set_worker_crawl_status(self, status):
        """Set crawl_status; log only on an actual change."""
        if self.set_field("crawl_status", status):
            logging.info(lf("Crawl Status Change",
                            status=status))

    def set_slave_node(self, slave_node):
        """Record which slave node this worker is connected to."""
        def merger(data):
            data["slave_node"] = slave_node
            return json.dumps(data)

        self._update(merger)

    def inc_value(self, key, value):
        """Increment a numeric counter field by value."""
        def merger(data):
            data[key] = data.get(key, 0) + value
            return json.dumps(data)

        self._update(merger)

    def dec_value(self, key, value):
        """Decrement a numeric counter field, clamping at zero."""
        def merger(data):
            data[key] = data.get(key, 0) - value
            if data[key] < 0:
                data[key] = 0
            return json.dumps(data)

        self._update(merger)

    def set_active(self):
        """Mark worker Active; emit the event only on a real transition."""
        if self.set_field("worker_status", "Active"):
            logging.info(lf("Worker Status Change",
                            status="Active"))
            self.send_event(EVENT_GEOREP_ACTIVE)

    def set_passive(self):
        """Mark worker Passive; emit the event only on a real transition."""
        if self.set_field("worker_status", "Passive"):
            logging.info(lf("Worker Status Change",
                            status="Passive"))
            self.send_event(EVENT_GEOREP_PASSIVE)

    def get_monitor_status(self):
        """Return the stripped contents of the monitor status file."""
        data = ""
        with open(self.monitor_status_file, "r") as f:
            data = f.read().strip()
        return data

    def get_status(self, checkpoint_time=0):
        """
        Monitor Status ---> Created Started Paused Stopped
        ----------------------------------------------------------------------
        slave_node N/A VALUE VALUE N/A
        status Created VALUE Paused Stopped
        last_synced N/A VALUE VALUE VALUE
        last_synced_entry N/A VALUE VALUE VALUE
        crawl_status N/A VALUE N/A N/A
        entry N/A VALUE N/A N/A
        data N/A VALUE N/A N/A
        meta N/A VALUE N/A N/A
        failures N/A VALUE VALUE VALUE
        checkpoint_completed N/A VALUE VALUE VALUE
        checkpoint_time N/A VALUE VALUE VALUE
        checkpoint_completed_time N/A VALUE VALUE VALUE
        """
        # NOTE(review): aliases self.default_values (shared dict), same
        # pattern as _update -- confirm intended.
        data = self.default_values
        with open(self.filename) as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass
        monitor_status = self.get_monitor_status()

        # Verifying whether monitor process running and adjusting status
        if monitor_status in ["Started", "Paused"]:
            try:
                # If the lock can be grabbed, no monitor holds it => dead
                with open(self.monitor_pid_file, "r+") as f:
                    fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    monitor_status = "Stopped"
            except (IOError, OSError) as e:
                # If pid file not exists, either monitor died or Geo-rep
                # not even started once
                if e.errno == ENOENT:
                    monitor_status = "Stopped"
                elif e.errno in (EACCES, EAGAIN):
                    # cannot grab. so, monitor process still running..move on
                    pass
                else:
                    raise

        if monitor_status in ["Created", "Paused", "Stopped"]:
            data["worker_status"] = monitor_status

        if monitor_status == "":
            data["worker_status"] = "Stopped"

        # Checkpoint adjustments
        if checkpoint_time == 0:
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            data["checkpoint_completion_time"] = DEFAULT_STATUS
        else:
            if checkpoint_time != data["checkpoint_time"]:
                if checkpoint_time <= data["last_synced"]:
                    data["checkpoint_completed"] = "Yes"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = data["last_synced"]
                else:
                    data["checkpoint_completed"] = "No"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = DEFAULT_STATUS

        # Convert raw timestamps to human readable local + UTC forms
        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
            chkpt_time = data["checkpoint_time"]
            data["checkpoint_time"] = human_time(chkpt_time)
            data["checkpoint_time_utc"] = human_time_utc(chkpt_time)

        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
            chkpt_completion_time = data["checkpoint_completion_time"]
            data["checkpoint_completion_time"] = human_time(
                chkpt_completion_time)
            data["checkpoint_completion_time_utc"] = human_time_utc(
                chkpt_completion_time)

        if data["last_synced"] == 0:
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
        else:
            last_synced = data["last_synced"]
            data["last_synced"] = human_time(last_synced)
            data["last_synced_utc"] = human_time_utc(last_synced)

        # Non-Active workers report no crawl progress (see table above)
        if data["worker_status"] != "Active":
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = DEFAULT_STATUS
            data["data"] = DEFAULT_STATUS
            data["meta"] = DEFAULT_STATUS
            data["failures"] = DEFAULT_STATUS
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            data["checkpoint_completed_time"] = DEFAULT_STATUS
            data["checkpoint_time_utc"] = DEFAULT_STATUS
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data["worker_status"] not in ["Active", "Passive"]:
            data["slave_node"] = DEFAULT_STATUS

        if data.get("last_synced_utc", 0) == 0:
            data["last_synced_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_completion_time_utc", 0) == 0:
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_time_utc", 0) == 0:
            data["checkpoint_time_utc"] = DEFAULT_STATUS

        return data

    def print_status(self, checkpoint_time=0, json_output=False):
        """Print the computed status as JSON or "key: value" lines."""
        status_out = self.get_status(checkpoint_time)
        if json_output:
            out = {}
            # Convert all values as string
            for k, v in status_out.items():
                out[k] = str(v)
            print(json.dumps(out))
            return

        for key, value in status_out.items():
            print(("%s: %s" % (key, value)))
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
new file mode 100644
index 00000000000..e6406c36bd7
--- /dev/null
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+from ctypes import CDLL, get_errno
+from py2py3 import (bytearray_to_str, gr_create_string_buffer,
+ gr_query_xattr, gr_lsetxattr, gr_lremovexattr)
+
+
class Xattr(object):

    """singleton that wraps the extended attributes system
    interface for python using ctypes

    Just implement it to the degree we need it, in particular
    - we need just the l*xattr variants, ie. we never want symlinks to be
      followed
    - don't need size discovery for getxattr, as we always know the exact
      sizes we expect
    """

    # use_errno=True makes ctypes capture errno for raise_oserr()
    libc = CDLL("libc.so.6", use_errno=True)

    @classmethod
    def geterrno(cls):
        """Return errno left by the last libc call."""
        return get_errno()

    @classmethod
    def raise_oserr(cls):
        """Raise OSError built from the current errno."""
        errn = cls.geterrno()
        raise OSError(errn, os.strerror(errn))

    @classmethod
    def _query_xattr(cls, path, siz, syscall, *a):
        """Common driver for the lgetxattr/llistxattr style syscalls.

        With siz > 0 the result is returned as a str; with siz == 0 the
        syscall's integer result (the needed size) is returned.
        """
        if siz:
            buf = gr_create_string_buffer(siz)
        else:
            buf = None
        ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
        if ret == -1:
            cls.raise_oserr()
        if siz:
            # py2 and py3 compatibility. Convert bytes array
            # to string
            result = bytearray_to_str(buf.raw)
            return result[:ret]
        else:
            return ret

    @classmethod
    def lgetxattr(cls, path, attr, siz=0):
        """lgetxattr(2); with siz=0 returns the value's size instead."""
        return gr_query_xattr(cls, path, siz, 'lgetxattr', attr)

    @classmethod
    def lgetxattr_buf(cls, path, attr):
        """lgetxattr variant with size discovery"""
        size = cls.lgetxattr(path, attr)
        if size == -1:
            cls.raise_oserr()
        if size == 0:
            return ''
        return cls.lgetxattr(path, attr, size)

    @classmethod
    def llistxattr(cls, path, siz=0):
        """llistxattr(2): name list (siz>0) or required size (siz=0)."""
        ret = gr_query_xattr(cls, path, siz, 'llistxattr')
        if isinstance(ret, str):
            # Kernel returns NUL-separated names; convert to a list
            ret = ret.strip('\0')
            ret = ret.split('\0') if ret else []
        return ret

    @classmethod
    def lsetxattr(cls, path, attr, val):
        """lsetxattr(2); raises OSError on failure."""
        ret = gr_lsetxattr(cls, path, attr, val)
        if ret == -1:
            cls.raise_oserr()

    @classmethod
    def lremovexattr(cls, path, attr):
        """lremovexattr(2); raises OSError on failure."""
        ret = gr_lremovexattr(cls, path, attr)
        if ret == -1:
            cls.raise_oserr()

    @classmethod
    def llistxattr_buf(cls, path):
        """listxattr variant with size discovery"""
        try:
            # Assuming no more than 100 xattrs in a file/directory and
            # each xattr key length will be less than 256 bytes
            # llistxattr will be called with bigger size so that
            # listxattr will not fail with ERANGE. OSError will be
            # raised if fails even with the large size specified.
            size = 256 * 100
            return cls.llistxattr(path, size)
        except OSError:
            # If fixed length failed for getting list of xattrs then
            # use the llistxattr call to get the size and use that
            # size to get the list of xattrs.
            size = cls.llistxattr(path)
            if size == -1:
                cls.raise_oserr()
            if size == 0:
                return []

            return cls.llistxattr(path, size)
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
new file mode 100644
index 00000000000..a3bda7282c0
--- /dev/null
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -0,0 +1,143 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+from ctypes import CDLL, RTLD_GLOBAL, get_errno, byref, c_ulong
+from ctypes.util import find_library
+from syncdutils import ChangelogException, ChangelogHistoryNotAvailable
+from py2py3 import (gr_cl_history_changelog, gr_cl_done,
+ gr_create_string_buffer, gr_cl_register,
+ gr_cl_history_done, bytearray_to_str)
+
+
# Load the gfchangelog shared library once at import time.  RTLD_GLOBAL
# exposes its symbols to subsequently loaded libraries; use_errno lets
# _raise_changelog_err() read errno after a failed call.
libgfc = CDLL(
    find_library("gfchangelog"),
    mode=RTLD_GLOBAL,
    use_errno=True
)
+
+
def _raise_changelog_err():
    """Translate the library's errno into a ChangelogException."""
    err = get_errno()
    raise ChangelogException(err, os.strerror(err))
+
+
def _init():
    """Initialize libgfchangelog; raises ChangelogException on failure."""
    if libgfc.gf_changelog_init(None) == -1:
        _raise_changelog_err()
+
+
def register(brick, path, log_file, log_level, retries=0):
    """Register a changelog consumer for *brick*; raises on failure."""
    _init()

    ret = gr_cl_register(libgfc, brick, path, log_file, log_level, retries)

    if ret == -1:
        _raise_changelog_err()
+
+
def scan():
    """Scan for new changelogs; raises ChangelogException on failure."""
    ret = libgfc.gf_changelog_scan()
    if ret == -1:
        _raise_changelog_err()
+
+
def startfresh():
    """Reset the changelog consumption tracker; raises on failure."""
    ret = libgfc.gf_changelog_start_fresh()
    if ret == -1:
        _raise_changelog_err()
+
+
def getchanges():
    """Return the pending changelog file paths, oldest first."""
    def clsort(cfile):
        # Changelog file names end in ".<timestamp>"; sort by that
        return cfile.split('.')[-1]

    changes = []
    buf = gr_create_string_buffer(4096)
    call = libgfc.gf_changelog_next_change

    while True:
        # 0 => no more entries, -1 => error (checked after the loop)
        ret = call(buf, 4096)
        if ret in (0, -1):
            break

        # py2 and py3 compatibility
        # (ret counts a trailing NUL, hence ret - 1)
        result = bytearray_to_str(buf.raw[:ret - 1])
        changes.append(result)

    if ret == -1:
        _raise_changelog_err()

    # cleanup tracker
    startfresh()

    return sorted(changes, key=clsort)
+
+
def done(clfile):
    """Mark one changelog file as fully processed; raises on failure."""
    ret = gr_cl_done(libgfc, clfile)
    if ret == -1:
        _raise_changelog_err()
+
+
def history_scan():
    """Scan for history changelogs; returns the library's count result."""
    ret = libgfc.gf_history_changelog_scan()
    if ret == -1:
        _raise_changelog_err()

    return ret
+
+
def history_changelog(changelog_path, start, end, num_parallel):
    """Request a history crawl for [start, end].

    Returns (ret, actual_end_time); the library may stop earlier than
    the requested end.  Raises ChangelogHistoryNotAvailable when the
    library reports -2, ChangelogException on other errors.
    """
    actual_end = c_ulong()
    ret = gr_cl_history_changelog(libgfc, changelog_path, start, end,
                                  num_parallel, byref(actual_end))
    if ret == -1:
        _raise_changelog_err()

    if ret == -2:
        raise ChangelogHistoryNotAvailable()

    return (ret, actual_end.value)
+
+
def history_startfresh():
    """Reset the history changelog tracker; raises on failure."""
    ret = libgfc.gf_history_changelog_start_fresh()
    if ret == -1:
        _raise_changelog_err()
+
+
def history_getchanges():
    """Return pending history changelog file paths, oldest first."""
    def clsort(cfile):
        # Changelog file names end in ".<timestamp>"; sort by that
        return cfile.split('.')[-1]

    changes = []
    buf = gr_create_string_buffer(4096)
    call = libgfc.gf_history_changelog_next_change

    while True:
        # 0 => no more entries, -1 => error (checked after the loop)
        ret = call(buf, 4096)
        if ret in (0, -1):
            break

        # py2 and py3 compatibility
        # (ret counts a trailing NUL, hence ret - 1)
        result = bytearray_to_str(buf.raw[:ret - 1])
        changes.append(result)

    if ret == -1:
        _raise_changelog_err()

    return sorted(changes, key=clsort)
+
+
def history_done(clfile):
    """Mark one history changelog file as processed; raises on failure."""
    ret = gr_cl_history_done(libgfc, clfile)
    if ret == -1:
        _raise_changelog_err()
diff --git a/geo-replication/syncdaemon/logutils.py b/geo-replication/syncdaemon/logutils.py
new file mode 100644
index 00000000000..01ae7852f23
--- /dev/null
+++ b/geo-replication/syncdaemon/logutils.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import logging
+from logging import Logger, handlers
+import sys
+import time
+
+
+class GLogger(Logger):
+
+    """Logger customizations for gsyncd.
+
+    It implements a log format similar to that of glusterfs.
+    """
+
+    def makeRecord(self, name, level, *a):
+        rv = Logger.makeRecord(self, name, level, *a)
+        # sub-second part of the record creation time, exposed as %(nsecs)d
+        rv.nsecs = (rv.created - int(rv.created)) * 1000000
+        # walk 4 frames up to reach the actual logging call site;
+        # NOTE(review): fragile if the logging call depth ever changes
+        fr = sys._getframe(4)
+        callee = fr.f_locals.get('self')
+        if callee:
+            # short class name of the caller, used for the %(ctx)s field
+            ctx = str(type(callee)).split("'")[1].split('.')[-1]
+        else:
+            ctx = '<top>'
+        if not hasattr(rv, 'funcName'):
+            rv.funcName = fr.f_code.co_name
+        # single-letter level name, e.g. 'I' for INFO
+        rv.lvlnam = logging.getLevelName(level)[0]
+        rv.ctx = ctx
+        return rv
+
+
+# glusterfs-like log line; {0} is replaced by the optional "(label)"
+LOGFMT = ("[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s{0}"
+          ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s")
+
+
+def setup_logging(level="INFO", label="", log_file=""):
+    """Configure root logging with the gsyncd GLogger format.
+
+    level:    log level name
+    label:    optional tag rendered next to the module name
+    log_file: target file; '-', /dev/stderr or /dev/stdout select a
+              stream instead of a regular file
+    """
+    if label:
+        label = "(" + label + ")"
+
+    filename = None
+    stream = None
+    if log_file:
+        if log_file in ('-', '/dev/stderr'):
+            stream = sys.stderr
+        elif log_file == '/dev/stdout':
+            stream = sys.stdout
+        else:
+            filename = log_file
+
+    datefmt = "%Y-%m-%d %H:%M:%S"
+    fmt = LOGFMT.format(label)
+    logging.root = GLogger("root", level)
+    logging.setLoggerClass(GLogger)
+    logging.Formatter.converter = time.gmtime  # Log in GMT/UTC time
+    logging.getLogger().handlers = []
+    logging.getLogger().setLevel(level)
+
+    if filename is not None:
+        # WatchedFileHandler reopens the file when rotated externally
+        logging_handler = handlers.WatchedFileHandler(filename)
+        formatter = logging.Formatter(fmt=fmt,
+                                      datefmt=datefmt)
+        logging_handler.setFormatter(formatter)
+        logging.getLogger().addHandler(logging_handler)
+    else:
+        logging.basicConfig(stream=stream,
+                            format=fmt,
+                            datefmt=datefmt,
+                            level=level)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
new file mode 100644
index 00000000000..9501aeae6b5
--- /dev/null
+++ b/geo-replication/syncdaemon/master.py
@@ -0,0 +1,2020 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import time
+import stat
+import logging
+import fcntl
+import string
+import errno
+import tarfile
+from errno import ENOENT, ENODATA, EEXIST, EACCES, EAGAIN, ESTALE, EINTR
+from threading import Condition, Lock
+from datetime import datetime
+
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (Thread, GsyncdError, escape_space_newline,
+ unescape_space_newline, gauxpfx, escape,
+ lstat, errno_wrap, FreeObject, lf, matching_disk_gfid,
+ NoStimeAvailable, PartialHistoryAvailable,
+ host_brick_split)
+
+# "minus infinity" xtime: sorts before every real (sec, nsec) timestamp
+URXTIME = (-1, 0)
+
+# Default rollover time set in changelog translator
+# changelog rollover time is hardcoded here to avoid the
+# xsync usage when crawling switch happens from history
+# to changelog. If rollover time increased in translator
+# then geo-rep can enter into xsync crawl after history
+# crawl before starting live changelog crawl.
+CHANGELOG_ROLLOVER_TIME = 15
+
+# Utility functions to help us to get to closer proximity
+# of the DRY principle (no, don't look for elevated or
+# perspectivistic things here)
+
+
+def _xtime_now():
+ t = time.time()
+ sec = int(t)
+ nsec = int((t - sec) * 1000000)
+ return (sec, nsec)
+
+
+def _volinfo_hook_relax_foreign(self):
+    """volinfo hook that waits out a foreign master's volume-mark
+    expiry before re-querying and returning the system volinfo."""
+    volinfo_sys = self.get_sys_volinfo()
+    fgn_vi = volinfo_sys[self.KFGN]
+    if fgn_vi:
+        # sleep until one second past the foreign mark's timeout
+        expiry = fgn_vi['timeout'] - int(time.time()) + 1
+        logging.info(lf('foreign volume info found, waiting for expiry',
+                        expiry=expiry))
+        time.sleep(expiry)
+        volinfo_sys = self.get_sys_volinfo()
+    return volinfo_sys
+
+
+def edct(op, **ed):
+ dct = {}
+ dct['op'] = op
+ # This is used in automatic gfid conflict resolution.
+ # When marked True, it's skipped during re-processing.
+ dct['skip_entry'] = False
+ for k in ed:
+ if k == 'stat':
+ st = ed[k]
+ dst = dct['stat'] = {}
+ if st:
+ dst['uid'] = st.st_uid
+ dst['gid'] = st.st_gid
+ dst['mode'] = st.st_mode
+ dst['atime'] = st.st_atime
+ dst['mtime'] = st.st_mtime
+ else:
+ dct[k] = ed[k]
+ return dct
+
+
+# The API!
+
+def gmaster_builder(excrawl=None):
+    """produce the GMaster class variant corresponding
+    to sync mode"""
+    this = sys.modules[__name__]
+    modemixin = gconf.get("special-sync-mode")
+    if not modemixin:
+        modemixin = 'normal'
+
+    # configured xsync wins over an explicitly requested crawl class
+    if gconf.get("change-detector") == 'xsync':
+        changemixin = 'xsync'
+    elif excrawl:
+        changemixin = excrawl
+    else:
+        changemixin = gconf.get("change-detector")
+
+    logging.debug(lf('setting up change detection mode',
+                     mode=changemixin))
+    # resolve mixin classes in this module by naming convention,
+    # e.g. 'normal' -> NormalMixin, 'xsync' -> GMasterXsyncMixin
+    modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+    crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
+
+    if gconf.get("use-rsync-xattrs"):
+        sendmarkmixin = SendmarkRsyncMixin
+    else:
+        sendmarkmixin = SendmarkNormalMixin
+
+    if gconf.get("ignore-deletes"):
+        purgemixin = PurgeNoopMixin
+    else:
+        purgemixin = PurgeNormalMixin
+
+    if gconf.get("sync-method") == "tarssh":
+        syncengine = TarSSHEngine
+    else:
+        syncengine = RsyncEngine
+
+    class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+                   purgemixin, syncengine):
+        pass
+
+    return _GMaster
+
+
+# Mixin classes that implement the data format
+# and logic particularities of the certain
+# sync modes
+
+class NormalMixin(object):
+
+    """normal geo-rep behavior"""
+
+    # xtime that precedes every real timestamp
+    minus_infinity = URXTIME
+
+    # following staticmethods ideally would be
+    # methods of an xtime object (in particular,
+    # implementing the hooks needed for comparison
+    # operators), but at this point we don't yet
+    # have a dedicated xtime class
+
+    @staticmethod
+    def serialize_xtime(xt):
+        # (sec, nsec) -> "sec.nsec"
+        return "%d.%d" % tuple(xt)
+
+    @staticmethod
+    def deserialize_xtime(xt):
+        # "sec.nsec" -> (sec, nsec)
+        return tuple(int(x) for x in xt.split("."))
+
+    @staticmethod
+    def native_xtime(xt):
+        return xt
+
+    @staticmethod
+    def xtime_geq(xt0, xt1):
+        return xt0 >= xt1
+
+    def make_xtime_opts(self, is_master, opts):
+        # fill in defaults without clobbering caller-provided options
+        if 'create' not in opts:
+            opts['create'] = is_master
+        if 'default_xtime' not in opts:
+            opts['default_xtime'] = URXTIME
+
+    def xtime_low(self, rsc, path, **opts):
+        """Fetch xtime (master) or stime (slave) of @path, creating or
+        substituting a default when missing or older than the volume
+        mark (per @opts)."""
+        if rsc == self.master:
+            xt = rsc.server.xtime(path, self.uuid)
+        else:
+            xt = rsc.server.stime(path, self.uuid)
+            if isinstance(xt, int) and xt == ENODATA:
+                # no stime yet: fall back to xtime and seed the stime
+                xt = rsc.server.xtime(path, self.uuid)
+                if not isinstance(xt, int):
+                    self.slave.server.set_stime(path, self.uuid, xt)
+        if isinstance(xt, int) and xt != ENODATA:
+            return xt
+        if xt == ENODATA or xt < self.volmark:
+            if opts['create']:
+                xt = _xtime_now()
+                rsc.server.aggregated.set_xtime(path, self.uuid, xt)
+            else:
+                zero_zero = (0, 0)
+                if xt != zero_zero:
+                    xt = opts['default_xtime']
+        return xt
+
+    def keepalive_payload_hook(self, timo, gap):
+        # first grab a reference as self.volinfo
+        # can be changed in main thread
+        vi = self.volinfo
+        if vi:
+            # then have a private copy which we can mod
+            vi = vi.copy()
+            vi['timeout'] = int(time.time()) + timo
+        else:
+            # send keep-alive more frequently to
+            # avoid a delay in announcing our volume info
+            # to slave if it becomes established in the
+            # meantime
+            gap = min(10, gap)
+        return (vi, gap)
+
+    def volinfo_hook(self):
+        return self.get_sys_volinfo()
+
+    def xtime_reversion_hook(self, path, xtl, xtr):
+        # remote timestamp ahead of local indicates corruption
+        if xtr > xtl:
+            raise GsyncdError("timestamp corruption for " + path)
+
+    def need_sync(self, e, xte, xtrd):
+        return xte > xtrd
+
+    def set_slave_xtime(self, path, mark):
+        self.slave.server.set_stime(path, self.uuid, mark)
+        # self.slave.server.set_xtime_remote(path, self.uuid, mark)
+
+
+class PartialMixin(NormalMixin):
+
+    """a variant tuned towards operation with a master
+    that has partial info of the slave (brick typically)"""
+
+    def xtime_reversion_hook(self, path, xtl, xtr):
+        # a single brick cannot judge reversion -- skip the check
+        pass
+
+
+class RecoverMixin(NormalMixin):
+
+    """a variant that differs from normal in terms
+    of ignoring non-indexed files"""
+
+    @staticmethod
+    def make_xtime_opts(is_master, opts):
+        # never auto-create xtimes in recovery mode
+        if 'create' not in opts:
+            opts['create'] = False
+        if 'default_xtime' not in opts:
+            opts['default_xtime'] = URXTIME
+
+    def keepalive_payload_hook(self, timo, gap):
+        # no volinfo payload in recovery mode
+        return (None, gap)
+
+    def volinfo_hook(self):
+        return _volinfo_hook_relax_foreign(self)
+
+# Further mixins for certain tunable behaviors
+
+
+class SendmarkNormalMixin(object):
+
+    def sendmark_regular(self, *a, **kw):
+        # regular files get their slave xtime updated via sendmark
+        return self.sendmark(*a, **kw)
+
+
+class SendmarkRsyncMixin(object):
+
+    def sendmark_regular(self, *a, **kw):
+        # no-op: marks travel as rsync xattrs instead
+        pass
+
+
+class PurgeNormalMixin(object):
+
+    def purge_missing(self, path, names):
+        # delete @names under @path on the slave
+        self.slave.server.purge(path, names)
+
+
+class PurgeNoopMixin(object):
+
+    def purge_missing(self, path, names):
+        # deletions are ignored (ignore-deletes mode)
+        pass
+
+
+class TarSSHEngine(object):
+
+    """Sync engine that uses tar(1) piped over ssh(1)
+    for data transfers. Good for lots of small files.
+    """
+
+    def a_syncdata(self, files):
+        """Queue @files for async transfer; register a per-file job
+        that validates the result when wait()-ed on."""
+        logging.debug(lf("Files", files=files))
+
+        for f in files:
+            pb = self.syncer.add(f)
+
+            def regjob(se, xte, pb):
+                rv = pb.wait()
+                if rv[0]:
+                    logging.debug(lf('synced', file=se))
+                    return True
+                else:
+                    # stat check for file presence
+                    st = lstat(se)
+                    if isinstance(st, int):
+                        # file got unlinked in the interim
+                        self.unlinked_gfids.add(se)
+                        return True
+
+            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+    def syncdata_wait(self):
+        # True only when every queued job succeeded
+        if self.wait(self.FLAT_DIR_HIERARCHY, None):
+            return True
+
+    def syncdata(self, files):
+        """Synchronous variant: queue @files and wait for completion."""
+        self.a_syncdata(files)
+        self.syncdata_wait()
+
+
+class RsyncEngine(object):
+
+    """Sync engine that uses rsync(1) for data transfers"""
+
+    def a_syncdata(self, files):
+        """Queue @files for async rsync transfer; register a per-file
+        job that validates the result when wait()-ed on."""
+        logging.debug(lf("files", files=files))
+
+        for f in files:
+            logging.debug(lf('candidate for syncing', file=f))
+            pb = self.syncer.add(f)
+
+            def regjob(se, xte, pb):
+                rv = pb.wait()
+                if rv[0]:
+                    logging.debug(lf('synced', file=se))
+                    return True
+                else:
+                    # stat to check if the file exist
+                    st = lstat(se)
+                    if isinstance(st, int):
+                        # file got unlinked in the interim
+                        self.unlinked_gfids.add(se)
+                        return True
+
+            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
+
+    def syncdata_wait(self):
+        # True only when every queued job succeeded
+        if self.wait(self.FLAT_DIR_HIERARCHY, None):
+            return True
+
+    def syncdata(self, files):
+        """Synchronous variant: queue @files and wait for completion."""
+        self.a_syncdata(files)
+        self.syncdata_wait()
+
+
+class GMasterCommon(object):
+
+    """abstract class implementing master role"""
+
+ KFGN = 0
+ KNAT = 1
+
+    def get_sys_volinfo(self):
+        """query volume marks on fs root
+
+        err out on multiple foreign masters
+        """
+        fgn_vis, nat_vi = (
+            self.master.server.aggregated.foreign_volume_infos(),
+            self.master.server.aggregated.native_volume_info())
+        fgn_vi = None
+        if fgn_vis:
+            if len(fgn_vis) > 1:
+                raise GsyncdError("cannot work with multiple foreign masters")
+            fgn_vi = fgn_vis[0]
+        # (foreign, native) -- indexed by KFGN / KNAT
+        return fgn_vi, nat_vi
+
+    @property
+    def uuid(self):
+        # None until volinfo is established (see crawlwrap)
+        if self.volinfo:
+            return self.volinfo['uuid']
+
+    @property
+    def volmark(self):
+        # None until volinfo is established (see crawlwrap)
+        if self.volinfo:
+            return self.volinfo['volume_mark']
+
+ def get_entry_stime(self):
+ data = self.slave.server.entry_stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+ def get_data_stime(self):
+ data = self.slave.server.stime(".", self.uuid)
+ if isinstance(data, int):
+ data = None
+ return data
+
+    def xtime(self, path, *a, **opts):
+        """get amended xtime
+
+        as of amending, we can create missing xtime, or
+        determine a valid value if what we get is expired
+        (as of the volume mark expiry); way of amending
+        depends on @opts and on subject of query (master
+        or slave).
+        """
+        if a:
+            rsc = a[0]
+        else:
+            rsc = self.master
+        self.make_xtime_opts(rsc == self.master, opts)
+        return self.xtime_low(rsc, path, **opts)
+
+    def __init__(self, master, slave):
+        """Wire up the sync engine and crawl bookkeeping for the
+        (master, slave) resource pair."""
+        self.master = master
+        self.slave = slave
+        self.jobtab = {}
+        if gconf.get("sync-method") == "tarssh":
+            self.syncer = Syncer(slave, self.slave.tarssh, [2])
+        else:
+            # partial transfer (cf. rsync(1)), that's normal
+            self.syncer = Syncer(slave, self.slave.rsync, [23, 24])
+        # crawls vs. turns:
+        # - self.crawls is simply the number of crawl() invocations on root
+        # - one turn is a maximal consecutive sequence of crawls so that each
+        #   crawl in it detects a change to be synced
+        # - self.turns is the number of turns since start
+        # - self.total_turns is a limit so that if self.turns reaches it, then
+        #   we exit (for diagnostic purposes)
+        # so, eg., if the master fs changes unceasingly, self.turns will remain
+        # 0.
+        self.crawls = 0
+        self.turns = 0
+        self.total_turns = rconf.turns
+        self.crawl_start = datetime.now()
+        self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
+        self.start = None
+        self.change_seen = None
+        # the actual volinfo we make use of
+        self.volinfo = None
+        self.terminate = False
+        self.sleep_interval = 1
+        self.unlinked_gfids = set()
+
+    def init_keep_alive(cls):
+        """start the keep-alive thread """
+        # NOTE(review): despite the 'cls' name this is a plain instance
+        # method -- 'cls' is bound to the GMaster instance here
+        timo = gconf.get("slave-timeout", 0)
+        if timo > 0:
+            def keep_alive():
+                while True:
+                    vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
+                    cls.slave.server.keep_alive(vi)
+                    time.sleep(gap)
+            t = Thread(target=keep_alive)
+            t.start()
+
+    def mgmt_lock(self):
+
+        """Take management volume lock """
+        # fast path: retry on the lock fd opened by a previous call
+        if rconf.mgmt_lock_fd:
+            try:
+                fcntl.lockf(rconf.mgmt_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                return True
+            except:
+                ex = sys.exc_info()[1]
+                if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+                    return False
+                raise
+
+        # lock file name is unique per (volume uuid, slave, subvolume)
+        fd = None
+        bname = str(self.uuid) + "_" + rconf.args.slave_id + "_subvol_" \
+            + str(rconf.args.subvol_num) + ".lock"
+        mgmt_lock_dir = os.path.join(gconf.get("meta-volume-mnt"), "geo-rep")
+        path = os.path.join(mgmt_lock_dir, bname)
+        logging.debug(lf("lock file path", path=path))
+        try:
+            fd = os.open(path, os.O_CREAT | os.O_RDWR)
+        except OSError:
+            ex = sys.exc_info()[1]
+            if ex.errno == ENOENT:
+                logging.info("Creating geo-rep directory in meta volume...")
+                try:
+                    os.makedirs(mgmt_lock_dir)
+                except OSError:
+                    ex = sys.exc_info()[1]
+                    if ex.errno == EEXIST:
+                        pass
+                    else:
+                        raise
+                fd = os.open(path, os.O_CREAT | os.O_RDWR)
+            else:
+                raise
+        try:
+            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+            # Save latest FD for future use
+            rconf.mgmt_lock_fd = fd
+        except:
+            ex = sys.exc_info()[1]
+            if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+                # cannot grab, it's taken
+                rconf.mgmt_lock_fd = fd
+                return False
+            raise
+
+        return True
+
+    def should_crawl(self):
+        """Decide whether this worker is active (True) or a passive
+        standby (False)."""
+        if not gconf.get("use-meta-volume"):
+            # active iff this node's id appears in the brick's node-uuid
+            return rconf.args.local_node_id in self.master.server.node_uuid()
+
+        if not os.path.ismount(gconf.get("meta-volume-mnt")):
+            logging.error("Meta-volume is not mounted. Worker Exiting...")
+            sys.exit(1)
+        return self.mgmt_lock()
+
+ def register(self):
+ self.register()
+
+    def crawlwrap(self, oneshot=False, register_time=None):
+        """Top-level crawl loop: establish volinfo, decide the
+        active/passive role and repeatedly invoke self.crawl()."""
+        if oneshot:
+            # it's important to do this during the oneshot crawl as
+            # for a passive gsyncd (ie. in a replicate scenario)
+            # the keepalive thread would keep the connection alive.
+            self.init_keep_alive()
+
+        # If crawlwrap is called when partial history available,
+        # then it sets register_time which is the time when geo-rep
+        # worker registered to changelog consumption. Since nsec is
+        # not considered in register time, there are chances of skipping
+        # changes detection in xsync crawl. This limit will be reset when
+        # crawlwrap is called again.
+        self.live_changelog_start_time = None
+        if register_time:
+            self.live_changelog_start_time = (register_time, 0)
+
+        # no need to maintain volinfo state machine.
+        # in a cascading setup, each geo-replication session is
+        # independent (ie. 'volume-mark' and 'xtime' are not
+        # propagated). This is because the slave's xtime is now
+        # stored on the master itself. 'volume-mark' just identifies
+        # that we are in a cascading setup and need to enable
+        # 'geo-replication.ignore-pid-check' option.
+        volinfo_sys = self.volinfo_hook()
+        self.volinfo = volinfo_sys[self.KNAT]
+        inter_master = volinfo_sys[self.KFGN]
+        logging.debug("%s master with volume id %s ..." %
+                      (inter_master and "intermediate" or "primary",
+                       self.uuid))
+        rconf.volume_id = self.uuid
+        if self.volinfo:
+            if self.volinfo['retval']:
+                logging.warn(lf("master cluster's info may not be valid",
+                                error=self.volinfo['retval']))
+        else:
+            raise GsyncdError("master volinfo unavailable")
+        self.lastreport['time'] = time.time()
+
+        t0 = time.time()
+        crawl = self.should_crawl()
+        while not self.terminate:
+            if self.start:
+                logging.debug("... crawl #%d done, took %.6f seconds" %
+                              (self.crawls, time.time() - self.start))
+            self.start = time.time()
+            should_display_info = self.start - self.lastreport['time'] >= 60
+            if should_display_info:
+                logging.debug("%d crawls, %d turns",
+                              self.crawls - self.lastreport['crawls'],
+                              self.turns - self.lastreport['turns'])
+                self.lastreport.update(crawls=self.crawls,
+                                       turns=self.turns,
+                                       time=self.start)
+            t1 = time.time()
+            # periodically re-evaluate the active/passive role
+            if int(t1 - t0) >= gconf.get("replica-failover-interval"):
+                crawl = self.should_crawl()
+                t0 = t1
+            self.update_worker_remote_node()
+            if not crawl:
+                self.status.set_passive()
+                # bring up _this_ brick to the cluster stime
+                # which is min of cluster (but max of the replicas)
+                brick_stime = self.xtime('.', self.slave)
+                cluster_stime = self.master.server.aggregated.stime_mnt(
+                    '.', '.'.join([str(self.uuid), rconf.args.slave_id]))
+                logging.debug(lf("Crawl info",
+                                 cluster_stime=cluster_stime,
+                                 brick_stime=brick_stime))
+
+                if not isinstance(cluster_stime, int):
+                    if brick_stime < cluster_stime:
+                        self.slave.server.set_stime(
+                            self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+                        self.upd_stime(cluster_stime)
+                        # Purge all changelogs available in processing dir
+                        # less than cluster_stime
+                        proc_dir = os.path.join(self.tempdir,
+                                                ".processing")
+
+                        if os.path.exists(proc_dir):
+                            to_purge = [f for f in os.listdir(proc_dir)
+                                        if (f.startswith("CHANGELOG.") and
+                                            int(f.split('.')[-1]) <
+                                            cluster_stime[0])]
+                            for f in to_purge:
+                                os.remove(os.path.join(proc_dir, f))
+
+                time.sleep(5)
+                continue
+
+            self.status.set_active()
+            self.crawl()
+
+            if oneshot:
+                return
+            time.sleep(self.sleep_interval)
+
+    @staticmethod
+    def humantime(*tpair):
+        """format xtime-like (sec, nsec) pair to human readable format"""
+        # NOTE(review): the nsec part is joined as a decimal fraction
+        # ('.'-join), not scaled from nanoseconds -- confirm intended
+        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
+            strftime("%Y-%m-%d %H:%M:%S")
+        if len(tpair) > 1:
+            ts += '.' + str(tpair[1])
+        return ts
+
+ def _crawl_time_format(self, crawl_time):
+ # Ex: 5 years, 4 days, 20:23:10
+ years, days = divmod(crawl_time.days, 365.25)
+ years = int(years)
+ days = int(days)
+
+ date = ""
+ m, s = divmod(crawl_time.seconds, 60)
+ h, m = divmod(m, 60)
+
+ if years != 0:
+ date += "%s %s " % (years, "year" if years == 1 else "years")
+ if days != 0:
+ date += "%s %s " % (days, "day" if days == 1 else "days")
+
+ date += "%s:%s:%s" % (string.zfill(h, 2),
+ string.zfill(m, 2), string.zfill(s, 2))
+ return date
+
+ def add_job(self, path, label, job, *a, **kw):
+ """insert @job function to job table at @path with @label"""
+ if self.jobtab.get(path) is None:
+ self.jobtab[path] = []
+ self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
+
+    def add_failjob(self, path, label):
+        """invoke .add_job with a job that does nothing just fails"""
+        logging.debug('salvaged: ' + label)
+        # the constant-False job makes wait() report failure for @path
+        self.add_job(path, label, lambda: False)
+
+ def wait(self, path, *args):
+ """perform jobs registered for @path
+
+ Reset jobtab entry for @path,
+ determine success as the conjunction of
+ success of all the jobs. In case of
+ success, call .sendmark on @path
+ """
+ jobs = self.jobtab.pop(path, [])
+ succeed = True
+ for j in jobs:
+ ret = j[-1]()
+ if not ret:
+ succeed = False
+ if succeed and not args[0] is None:
+ self.sendmark(path, *args)
+ return succeed
+
+    def sendmark(self, path, mark, adct=None):
+        """update slave side xtime for @path to master side xtime
+
+        also can send a setattr payload (see Server.setattr).
+        """
+        if adct:
+            self.slave.server.setattr(path, adct)
+        self.set_slave_xtime(path, mark)
+
+
+class XCrawlMetadata(object):
+    """Typed holder for the stat fields carried by xsync crawl
+    metadata entries (values arrive as strings)."""
+    def __init__(self, st_uid, st_gid, st_mode, st_atime, st_mtime):
+        self.st_uid = int(st_uid)
+        self.st_gid = int(st_gid)
+        self.st_mode = int(st_mode)
+        self.st_atime = float(st_atime)
+        self.st_mtime = float(st_mtime)
+
+
+class GMasterChangelogMixin(GMasterCommon):
+
+    """ changelog based change detection and syncing """
+
+    # index for change type and entry
+    IDX_START = 0
+    IDX_END = 2
+    UNLINK_ENTRY = 2
+
+    # field positions in a space-split changelog record
+    POS_GFID = 0
+    POS_TYPE = 1
+    POS_ENTRY1 = -1
+
+    # record type prefixes as they appear in changelog files
+    TYPE_META = "M "
+    TYPE_GFID = "D "
+    TYPE_ENTRY = "E "
+
+    # retry caps (entry-failure fixing / op retries)
+    MAX_EF_RETRIES = 10
+    MAX_OE_RETRIES = 10
+
+    # flat directory hierarchy for gfid based access
+    FLAT_DIR_HIERARCHY = '.'
+
+    CHANGELOG_CONN_RETRIES = 5
+
+    def init_fop_batch_stats(self):
+        """Reset per-batch fop counters and timing accumulators."""
+        self.batch_stats = {
+            "CREATE": 0,
+            "MKNOD": 0,
+            "UNLINK": 0,
+            "MKDIR": 0,
+            "RMDIR": 0,
+            "LINK": 0,
+            "SYMLINK": 0,
+            "RENAME": 0,
+            "SETATTR": 0,
+            "SETXATTR": 0,
+            "XATTROP": 0,
+            "DATA": 0,
+            "ENTRY_SYNC_TIME": 0,
+            "META_SYNC_TIME": 0,
+            "DATA_START_TIME": 0
+        }
+
+ def update_fop_batch_stats(self, ty):
+ if ty in ['FSETXATTR']:
+ ty = 'SETXATTR'
+ self.batch_stats[ty] = self.batch_stats.get(ty, 0) + 1
+
+    def archive_and_purge_changelogs(self, changelogs):
+        """Append @changelogs to the monthly archive tar and delete the
+        originals; files that vanished in the interim are skipped."""
+        # Creates tar file instead of tar.gz, since changelogs will
+        # be appended to existing tar. archive name is
+        # archive_<YEAR><MONTH>.tar
+        archive_name = "archive_%s.tar" % datetime.today().strftime(
+            gconf.get("changelog-archive-format"))
+
+        try:
+            tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+                                            archive_name),
+                               "a")
+        except tarfile.ReadError:
+            # unreadable/corrupt archive: start a fresh one
+            tar = tarfile.open(os.path.join(self.processed_changelogs_dir,
+                                            archive_name),
+                               "w")
+
+        for f in changelogs:
+            try:
+                f = os.path.basename(f)
+                tar.add(os.path.join(self.processed_changelogs_dir, f),
+                        arcname=os.path.basename(f))
+            except:
+                exc = sys.exc_info()[1]
+                if ((isinstance(exc, OSError) or
+                     isinstance(exc, IOError)) and exc.errno == ENOENT):
+                    continue
+                else:
+                    tar.close()
+                    raise
+        tar.close()
+
+        for f in changelogs:
+            try:
+                f = os.path.basename(f)
+                os.remove(os.path.join(self.processed_changelogs_dir, f))
+            except OSError as e:
+                if e.errno == errno.ENOENT:
+                    continue
+                else:
+                    raise
+
+    def setup_working_dir(self):
+        """Per-brick changelog working directory (brick path escaped
+        into a single path component)."""
+        workdir = os.path.join(gconf.get("working-dir"),
+                               escape(rconf.args.local_path))
+        logging.debug('changelog working dir %s' % workdir)
+        return workdir
+
+    def log_failures(self, failures, entry_key, gfid_prefix, log_prefix):
+        """Log sync failures whose subject still exists on the master,
+        bump the 'failures' status counter, and abort on a failed
+        MKDIR (directories must sync before their contents)."""
+        num_failures = 0
+        for failure in failures:
+            st = lstat(os.path.join(gfid_prefix, failure[0][entry_key]))
+            if not isinstance(st, int):
+                num_failures += 1
+                logging.error(lf('%s FAILED' % log_prefix,
+                                 data=failure))
+                if failure[0]['op'] == 'MKDIR':
+                    raise GsyncdError("The above directory failed to sync."
+                                      " Please fix it to proceed further.")
+
+        self.status.inc_value("failures", num_failures)
+
+    def fix_possible_entry_failures(self, failures, retry_count, entries):
+        """Attempt to repair entry-op failures reported by the slave
+        (gfid/name mismatches, missing parent dirs).
+
+        Generates fix-up ops (deletes/renames/mkdirs), applies them via
+        slave entry_ops, marks superseded @entries with skip_entry, and
+        returns (failures-of-fixups, fixup-ops) for the next retry.
+        """
+        pfx = gauxpfx()
+        fix_entry_ops = []
+        failures1 = []
+        remove_gfids = set()
+        for failure in failures:
+            if failure[2]['name_mismatch']:
+                pbname = failure[2]['slave_entry']
+            elif failure[2]['dst']:
+                pbname = failure[0]['entry1']
+            else:
+                pbname = failure[0]['entry']
+
+            op = failure[0]['op']
+            # name exists but gfid is different
+            if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']:
+                slave_gfid = failure[2]['slave_gfid']
+                st = lstat(os.path.join(pfx, slave_gfid))
+                # Takes care of scenarios with no hardlinks
+                if isinstance(st, int) and st == ENOENT:
+                    logging.debug(lf('Entry not present on master. Fixing gfid '
+                                     'mismatch in slave. Deleting the entry',
+                                     retry_count=retry_count,
+                                     entry=repr(failure)))
+                    # Add deletion to fix_entry_ops list
+                    if failure[2]['slave_isdir']:
+                        fix_entry_ops.append(
+                            edct('RMDIR',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
+                    else:
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
+                    remove_gfids.add(slave_gfid)
+                    if op in ['RENAME']:
+                        # If renamed gfid doesn't exists on master, remove
+                        # rename entry and unlink src on slave
+                        st = lstat(os.path.join(pfx, failure[0]['gfid']))
+                        if isinstance(st, int) and st == ENOENT:
+                            logging.debug("Unlink source %s" % repr(failure))
+                            remove_gfids.add(failure[0]['gfid'])
+                            fix_entry_ops.append(
+                                edct('UNLINK',
+                                     gfid=failure[0]['gfid'],
+                                     entry=failure[0]['entry']))
+                # Takes care of scenarios of hardlinks/renames on master
+                elif not isinstance(st, int):
+                    if matching_disk_gfid(slave_gfid, pbname):
+                        # Safe to ignore the failure as master contains same
+                        # file with same gfid. Remove entry from entries list
+                        logging.debug(lf('Fixing gfid mismatch in slave. '
+                                         ' Safe to ignore, take out entry',
+                                         retry_count=retry_count,
+                                         entry=repr(failure)))
+                        remove_gfids.add(failure[0]['gfid'])
+                        if op == 'RENAME':
+                            fix_entry_ops.append(
+                                edct('UNLINK',
+                                     gfid=failure[0]['gfid'],
+                                     entry=failure[0]['entry']))
+                    # The file exists on master but with different name.
+                    # Probably renamed and got missed during xsync crawl.
+                    elif failure[2]['slave_isdir']:
+                        # resolve the master-side path of the dir via its
+                        # .glusterfs gfid symlink
+                        realpath = os.readlink(os.path.join(
+                            rconf.args.local_path,
+                            ".glusterfs",
+                            slave_gfid[0:2],
+                            slave_gfid[2:4],
+                            slave_gfid))
+                        dst_entry = os.path.join(pfx, realpath.split('/')[-2],
+                                                 realpath.split('/')[-1])
+                        src_entry = pbname
+                        logging.debug(lf('Fixing dir name/gfid mismatch in '
+                                         'slave', retry_count=retry_count,
+                                         entry=repr(failure)))
+                        if src_entry == dst_entry:
+                            # Safe to ignore the failure as master contains
+                            # same directory as in slave with same gfid.
+                            # Remove the failure entry from entries list
+                            logging.debug(lf('Fixing dir name/gfid mismatch'
+                                             ' in slave. Safe to ignore, '
+                                             'take out entry',
+                                             retry_count=retry_count,
+                                             entry=repr(failure)))
+                            try:
+                                entries.remove(failure[0])
+                            except ValueError:
+                                pass
+                        else:
+                            rename_dict = edct('RENAME', gfid=slave_gfid,
+                                               entry=src_entry,
+                                               entry1=dst_entry, stat=st,
+                                               link=None)
+                            logging.debug(lf('Fixing dir name/gfid mismatch'
+                                             ' in slave. Renaming',
+                                             retry_count=retry_count,
+                                             entry=repr(rename_dict)))
+                            fix_entry_ops.append(rename_dict)
+                    else:
+                        # A hardlink file exists with different name or
+                        # renamed file exists and we are sure from
+                        # matching_disk_gfid check that the entry doesn't
+                        # exist with same gfid so we can safely delete on slave
+                        logging.debug(lf('Fixing file gfid mismatch in slave. '
+                                         'Hardlink/Rename Case. Deleting entry',
+                                         retry_count=retry_count,
+                                         entry=repr(failure)))
+                        fix_entry_ops.append(
+                            edct('UNLINK',
+                                 gfid=failure[2]['slave_gfid'],
+                                 entry=pbname))
+            elif failure[1] == ENOENT:
+                if op in ['RENAME']:
+                    pbname = failure[0]['entry1']
+                else:
+                    pbname = failure[0]['entry']
+
+                pargfid = pbname.split('/')[1]
+                st = lstat(os.path.join(pfx, pargfid))
+                # Safe to ignore the failure as master doesn't contain
+                # parent directory.
+                if isinstance(st, int):
+                    logging.debug(lf('Fixing ENOENT error in slave. Parent '
+                                     'does not exist on master. Safe to '
+                                     'ignore, take out entry',
+                                     retry_count=retry_count,
+                                     entry=repr(failure)))
+                    try:
+                        entries.remove(failure[0])
+                    except ValueError:
+                        pass
+                else:
+                    logging.debug(lf('Fixing ENOENT error in slave. Create '
+                                     'parent directory on slave.',
+                                     retry_count=retry_count,
+                                     entry=repr(failure)))
+                    realpath = os.readlink(os.path.join(rconf.args.local_path,
+                                                        ".glusterfs",
+                                                        pargfid[0:2],
+                                                        pargfid[2:4],
+                                                        pargfid))
+                    dir_entry = os.path.join(pfx, realpath.split('/')[-2],
+                                             realpath.split('/')[-1])
+                    fix_entry_ops.append(
+                        edct('MKDIR', gfid=pargfid, entry=dir_entry,
+                             mode=st.st_mode, uid=st.st_uid, gid=st.st_gid))
+
+        logging.debug("remove_gfids: %s" % repr(remove_gfids))
+        if remove_gfids:
+            for e in entries:
+                if e['op'] in ['MKDIR', 'MKNOD', 'CREATE', 'RENAME'] \
+                   and e['gfid'] in remove_gfids:
+                    logging.debug("Removed entry op from retrial list: entry: %s" % repr(e))
+                    e['skip_entry'] = True
+
+        if fix_entry_ops:
+            # Process deletions of entries whose gfids are mismatched
+            failures1 = self.slave.server.entry_ops(fix_entry_ops)
+
+        return (failures1, fix_entry_ops)
+
+    def handle_entry_failures(self, failures, entries):
+        """Drive fix_possible_entry_failures() up to MAX_EF_RETRIES
+        times until no fix-up failures remain; log what still fails."""
+        retries = 0
+        pending_failures = False
+        failures1 = []
+        failures2 = []
+        entry_ops1 = []
+        entry_ops2 = []
+
+        if failures:
+            pending_failures = True
+            failures1 = failures
+            entry_ops1 = entries
+
+        while pending_failures and retries < self.MAX_EF_RETRIES:
+            retries += 1
+            (failures2, entry_ops2) = self.fix_possible_entry_failures(
+                failures1, retries, entry_ops1)
+            if not failures2:
+                pending_failures = False
+                logging.info(lf('Successfully fixed entry ops with gfid '
+                                'mismatch', retry_count=retries))
+            else:
+                pending_failures = True
+                failures1 = failures2
+                entry_ops1 = entry_ops2
+
+        if pending_failures:
+            for failure in failures1:
+                logging.error("Failed to fix entry ops %s", repr(failure))
+
+ def process_change(self, change, done, retry):
+ pfx = gauxpfx()
+ clist = []
+ entries = []
+ meta_gfid = set()
+ datas = set()
+
+ change_ts = change.split(".")[-1]
+
+ # Ignore entry ops which are already processed in Changelog modes
+ ignore_entry_ops = False
+ entry_stime = None
+ data_stime = None
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime = self.get_entry_stime()
+ data_stime = self.get_data_stime()
+
+ if entry_stime is not None and data_stime is not None:
+ # if entry_stime is not None but data_stime > entry_stime
+ # This situation is caused by the stime update of Passive worker
+ # Consider data_stime in this case.
+ if data_stime[0] > entry_stime[0]:
+ entry_stime = data_stime
+
+ # Compare the entry_stime with changelog file suffix
+ # if changelog time is less than entry_stime then ignore
+ if int(change_ts) <= entry_stime[0]:
+ ignore_entry_ops = True
+
+ try:
+ f = open(change, "r")
+ clist = f.readlines()
+ f.close()
+ except IOError:
+ raise
+
+ for e in clist:
+ e = e.strip()
+ et = e[self.IDX_START:self.IDX_END] # entry type
+ ec = e[self.IDX_END:].split(' ') # rest of the bits
+
+ # skip ENTRY operation if hot tier brick
+ if self.name == 'live_changelog' or \
+ self.name == 'history_changelog':
+ if rconf.args.is_hottier and et == self.TYPE_ENTRY:
+ logging.debug(lf('skip ENTRY op if hot tier brick',
+ op=ec[self.POS_TYPE]))
+ continue
+
+ # Data and Meta operations are decided while parsing
+ # UNLINK/RMDIR/MKNOD except that case ignore all the other
+ # entry ops if ignore_entry_ops is True.
+ # UNLINK/RMDIR/MKNOD entry_ops are ignored in the end
+ if ignore_entry_ops and et == self.TYPE_ENTRY and \
+ ec[self.POS_TYPE] not in ["UNLINK", "RMDIR", "MKNOD"]:
+ continue
+
+ if et == self.TYPE_ENTRY:
+ # extract information according to the type of
+ # the entry operation. create(), mkdir() and mknod()
+ # have mode, uid, gid information in the changelog
+ # itself, so no need to stat()...
+ ty = ec[self.POS_TYPE]
+
+ self.update_fop_batch_stats(ec[self.POS_TYPE])
+
+ # PARGFID/BNAME
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1]))
+ # GFID of the entry
+ gfid = ec[self.POS_GFID]
+
+ if ty in ['UNLINK', 'RMDIR']:
+ # The index of PARGFID/BNAME for UNLINK, RMDIR
+ # is no more the last index. It varies based on
+ # changelog.capture-del-path is enabled or not.
+ en = unescape_space_newline(
+ os.path.join(pfx, ec[self.UNLINK_ENTRY]))
+
+ # Remove from DATA list, so that rsync will
+ # not fail
+ pt = os.path.join(pfx, ec[0])
+ st = lstat(pt)
+ if pt in datas and isinstance(st, int):
+ # file got unlinked, May be historical Changelog
+ datas.remove(pt)
+
+ if ty in ['RMDIR'] and not isinstance(st, int):
+ logging.info(lf('Ignoring rmdir. Directory present in '
+ 'master', gfid=gfid, pgfid_bname=en))
+ continue
+
+ if not gconf.get("ignore-deletes"):
+ if not ignore_entry_ops:
+ entries.append(edct(ty, gfid=gfid, entry=en))
+ elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
+ # Special case: record mknod as link
+ if ty in ['MKNOD']:
+ mode = int(ec[2])
+ if mode & 0o1000:
+ # Avoid stat'ing the file as it
+ # may be deleted in the interim
+ st = FreeObject(st_mode=int(ec[2]),
+ st_uid=int(ec[3]),
+ st_gid=int(ec[4]),
+ st_atime=0,
+ st_mtime=0)
+
+ # So, it may be deleted, but still we are
+ # append LINK? Because, the file will be
+ # CREATED if source not exists.
+ entries.append(edct('LINK', stat=st, entry=en,
+ gfid=gfid))
+
+ # Here, we have the assumption that only
+ # tier-gfid.linkto causes this mknod. Add data
+ datas.add(os.path.join(pfx, ec[0]))
+ continue
+
+ # stat info. present in the changelog itself
+ entries.append(edct(ty, gfid=gfid, entry=en,
+ mode=int(ec[2]),
+ uid=int(ec[3]), gid=int(ec[4])))
+ elif ty == "RENAME":
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ st = {}
+
+ rl = None
+ if st and stat.S_ISLNK(st.st_mode):
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ rl = None
+
+ e1 = unescape_space_newline(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en,
+ stat=st, link=rl))
+ # If src doesn't exist while doing rename, destination
+ # is created. If data is not followed by rename, this
+ # remains zero byte file on slave. Hence add data entry
+ # for renames
+ datas.add(os.path.join(pfx, gfid))
+ else:
+ # stat() to get mode and other information
+ if not matching_disk_gfid(gfid, en):
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
+ continue
+
+ go = os.path.join(pfx, gfid)
+ st = lstat(go)
+ if isinstance(st, int):
+ logging.debug(lf('Ignoring entry, purged in the '
+ 'interim', file=en, gfid=gfid))
+ continue
+
+ if ty == 'LINK':
+ rl = None
+ if st and stat.S_ISLNK(st.st_mode):
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ rl = None
+ entries.append(edct(ty, stat=st, entry=en, gfid=gfid,
+ link=rl))
+ # If src doesn't exist while doing link, destination
+ # is created based on file type. If data is not
+ # followed by link, this remains zero byte file on
+ # slave. Hence add data entry for links
+ if rl is None:
+ datas.add(os.path.join(pfx, gfid))
+ elif ty == 'SYMLINK':
+ rl = errno_wrap(os.readlink, [en], [ENOENT],
+ [ESTALE, EINTR])
+ if isinstance(rl, int):
+ continue
+
+ entries.append(
+ edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ else:
+ logging.warn(lf('ignoring op',
+ gfid=gfid,
+ type=ty))
+ elif et == self.TYPE_GFID:
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
+ else:
+ datas.add(os.path.join(pfx, ec[0]))
+ elif et == self.TYPE_META:
+ self.update_fop_batch_stats(ec[self.POS_TYPE])
+ if ec[1] == 'SETATTR': # only setattr's for now...
+ if len(ec) == 5:
+ # In xsync crawl, we already have stat data
+ # avoid doing stat again
+ meta_gfid.add((os.path.join(pfx, ec[0]),
+ XCrawlMetadata(st_uid=ec[2],
+ st_gid=ec[3],
+ st_mode=ec[4],
+ st_atime=ec[5],
+ st_mtime=ec[6])))
+ else:
+ meta_gfid.add((os.path.join(pfx, ec[0]), ))
+ elif ec[1] in ['SETXATTR', 'XATTROP', 'FXATTROP']:
+ # To sync xattr/acls use rsync/tar, --xattrs and --acls
+ # switch to rsync and tar
+ if not gconf.get("sync-method") == "tarssh" and \
+ (gconf.get("sync-xattrs") or gconf.get("sync-acls")):
+ datas.add(os.path.join(pfx, ec[0]))
+ else:
+ logging.warn(lf('got invalid fop type',
+ type=et))
+ logging.debug('entries: %s' % repr(entries))
+
+ # Increment counters for Status
+ self.files_in_batch += len(datas)
+ self.status.inc_value("data", len(datas))
+
+ self.batch_stats["DATA"] += self.files_in_batch - \
+ self.batch_stats["SETXATTR"] - \
+ self.batch_stats["XATTROP"]
+
+ entry_start_time = time.time()
+ # sync namespace
+ if entries and not ignore_entry_ops:
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+
+ failures = self.slave.server.entry_ops(entries)
+
+ if gconf.get("gfid-conflict-resolution"):
+ count = 0
+ if failures:
+ logging.info(lf('Entry ops failed with gfid mismatch',
+ count=len(failures)))
+ while failures and count < self.MAX_OE_RETRIES:
+ count += 1
+ self.handle_entry_failures(failures, entries)
+ logging.info(lf('Retry original entries', count=count))
+ failures = self.slave.server.entry_ops(entries)
+ if not failures:
+ logging.info("Successfully fixed all entry ops with "
+ "gfid mismatch")
+ break
+
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.status.dec_value("entry", len(entries))
+
+ # Update Entry stime in Brick Root only in case of Changelog mode
+ if self.name in ["live_changelog", "history_changelog"]:
+ entry_stime_to_update = (int(change_ts) - 1, 0)
+ self.upd_entry_stime(entry_stime_to_update)
+ self.status.set_field("last_synced_entry",
+ entry_stime_to_update[0])
+
+ self.batch_stats["ENTRY_SYNC_TIME"] += time.time() - entry_start_time
+
+ if ignore_entry_ops:
+ # Book keeping, to show in logs the range of Changelogs skipped
+ self.num_skipped_entry_changelogs += 1
+ if self.skipped_entry_changelogs_first is None:
+ self.skipped_entry_changelogs_first = change_ts
+
+ self.skipped_entry_changelogs_last = change_ts
+
+ meta_start_time = time.time()
+ # sync metadata
+ if meta_gfid:
+ meta_entries = []
+ for go in meta_gfid:
+ if len(go) > 1:
+ st = go[1]
+ else:
+ st = lstat(go[0])
+ if isinstance(st, int):
+ logging.debug(lf('file got purged in the interim',
+ file=go[0]))
+ continue
+ meta_entries.append(edct('META', go=go[0], stat=st))
+ if meta_entries:
+ self.status.inc_value("meta", len(meta_entries))
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(meta_entries))
+
+ self.batch_stats["META_SYNC_TIME"] += time.time() - meta_start_time
+
+ if self.batch_stats["DATA_START_TIME"] == 0:
+ self.batch_stats["DATA_START_TIME"] = time.time()
+
+ # sync data
+ if datas:
+ self.a_syncdata(datas)
+ self.datas_in_batch.update(datas)
+
    def process(self, changes, done=1):
        """Sync one batch of changelogs end to end.

        For each changelog in *changes*, process_change() performs the
        entry/meta ops synchronously and fires the data sync
        asynchronously; this method then waits for the data transfers
        to complete.  On failure the whole batch is retried (data-only
        on retries) up to gconf "max-rsync-retries" times.  When *done*
        is truthy, the brick stime is updated and each changelog is
        acknowledged via changelog_done_func() and archived after a
        successful (or finally abandoned) round.
        """
        tries = 0
        retry = False
        self.unlinked_gfids = set()
        self.files_in_batch = 0
        self.datas_in_batch = set()
        # Error log disabled till the last round
        self.syncer.disable_errorlog()
        # Book keeping for the "Skipping already processed entry ops" log
        self.skipped_entry_changelogs_first = None
        self.skipped_entry_changelogs_last = None
        self.num_skipped_entry_changelogs = 0
        self.batch_start_time = time.time()
        self.init_fop_batch_stats()

        while True:
            # first, fire all changelog transfers in parallel. entry and
            # metadata are performed synchronously, therefore in serial.
            # However at the end of each changelog, data is synchronized
            # with syncdata_async() - which means it is serial w.r.t
            # entries/metadata of that changelog but happens in parallel
            # with data of other changelogs.

            if retry:
                if tries == (gconf.get("max-rsync-retries") - 1):
                    # Enable Error logging if it is last retry
                    self.syncer.enable_errorlog()

                # Remove Unlinked GFIDs from Queue
                for unlinked_gfid in self.unlinked_gfids:
                    if unlinked_gfid in self.datas_in_batch:
                        self.datas_in_batch.remove(unlinked_gfid)

                # Retry only Sync. Do not retry entry ops
                if self.datas_in_batch:
                    self.a_syncdata(self.datas_in_batch)
            else:
                for change in changes:
                    logging.debug(lf('processing change',
                                     changelog=change))
                    self.process_change(change, done, retry)
                    if not retry:
                        # number of changelogs processed in the batch
                        self.turns += 1

            # Now we wait for all the data transfers fired off in the above
            # step to complete. Note that this is not ideal either. Ideally
            # we want to trigger the entry/meta-data transfer of the next
            # batch while waiting for the data transfer of the current batch
            # to finish.

            # Note that the reason to wait for the data transfer (vs doing it
            # completely in the background and call the changelog_done()
            # asynchronously) is because this waiting acts as a "backpressure"
            # and prevents a spiraling increase of wait stubs from consuming
            # unbounded memory and resources.

            # update the slave's time with the timestamp of the _last_
            # changelog file time suffix. Since, the changelog prefix time
            # is the time when the changelog was rolled over, introduce a
            # tolerance of 1 second to counter the small delta b/w the
            # marker update and gettimeofday().
            # NOTE: this is only for changelog mode, not xsync.

            # @change is the last changelog (therefore max time for this batch)
            # NOTE: 'change' is the for-loop variable leaked from the first
            # (non-retry) pass above; on retry rounds it still holds the
            # last changelog of the batch.
            if self.syncdata_wait():
                self.unlinked_gfids = set()
                if done:
                    xtl = (int(change.split('.')[-1]) - 1, 0)
                    self.upd_stime(xtl)
                    list(map(self.changelog_done_func, changes))
                    self.archive_and_purge_changelogs(changes)

                # Reset Data counter after sync
                self.status.dec_value("data", self.files_in_batch)
                self.files_in_batch = 0
                self.datas_in_batch = set()
                break

            # We do not know which changelog transfer failed, retry everything.
            retry = True
            tries += 1
            if tries == gconf.get("max-rsync-retries"):
                logging.error(lf('changelogs could not be processed '
                                 'completely - moving on...',
                                 files=list(map(os.path.basename, changes))))

                # Reset data counter on failure
                self.status.dec_value("data", self.files_in_batch)
                self.files_in_batch = 0
                self.datas_in_batch = set()

                if done:
                    xtl = (int(change.split('.')[-1]) - 1, 0)
                    self.upd_stime(xtl)
                    list(map(self.changelog_done_func, changes))
                    self.archive_and_purge_changelogs(changes)
                break
            # it's either entry_ops() or Rsync that failed to do it's
            # job. Mostly it's entry_ops() [which currently has a problem
            # of failing to create an entry but failing to return an errno]
            # Therefore we do not know if it's either Rsync or the freaking
            # entry_ops() that failed... so we retry the _whole_ changelog
            # again.
            # TODO: remove entry retries when it's gets fixed.
            logging.warn(lf('incomplete sync, retrying changelogs',
                            files=list(map(os.path.basename, changes))))

            # Reset the Data counter before Retry
            self.status.dec_value("data", self.files_in_batch)
            self.files_in_batch = 0
            self.init_fop_batch_stats()
            time.sleep(0.5)

        # Log the Skipped Entry ops range if any
        if self.skipped_entry_changelogs_first is not None and \
           self.skipped_entry_changelogs_last is not None:
            logging.info(lf("Skipping already processed entry ops",
                            from_changelog=self.skipped_entry_changelogs_first,
                            to_changelog=self.skipped_entry_changelogs_last,
                            num_changelogs=self.num_skipped_entry_changelogs))

        # Log Current batch details
        if changes:
            logging.info(
                lf("Entry Time Taken",
                   UNL=self.batch_stats["UNLINK"],
                   RMD=self.batch_stats["RMDIR"],
                   CRE=self.batch_stats["CREATE"],
                   MKN=self.batch_stats["MKNOD"],
                   MKD=self.batch_stats["MKDIR"],
                   REN=self.batch_stats["RENAME"],
                   LIN=self.batch_stats["LINK"],
                   SYM=self.batch_stats["SYMLINK"],
                   duration="%.4f" % self.batch_stats["ENTRY_SYNC_TIME"]))

            logging.info(
                lf("Data/Metadata Time Taken",
                   SETA=self.batch_stats["SETATTR"],
                   meta_duration="%.4f" % self.batch_stats["META_SYNC_TIME"],
                   SETX=self.batch_stats["SETXATTR"],
                   XATT=self.batch_stats["XATTROP"],
                   DATA=self.batch_stats["DATA"],
                   data_duration="%.4f" % (
                       time.time() - self.batch_stats["DATA_START_TIME"])))

            logging.info(
                lf("Batch Completed",
                   mode=self.name,
                   duration="%.4f" % (time.time() - self.batch_start_time),
                   changelog_start=changes[0].split(".")[-1],
                   changelog_end=changes[-1].split(".")[-1],
                   num_changelogs=len(changes),
                   stime=self.get_data_stime(),
                   entry_stime=self.get_entry_stime()))
+
+ def upd_entry_stime(self, stime):
+ self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
+ self.uuid,
+ stime)
+
+ def upd_stime(self, stime, path=None):
+ if not path:
+ path = self.FLAT_DIR_HIERARCHY
+ if not stime == URXTIME:
+ self.sendmark(path, stime)
+
+ # Update last_synced_time in status file based on stime
+ # only update stime if stime xattr set to Brick root
+ if path == self.FLAT_DIR_HIERARCHY:
+ chkpt_time = gconf.getr("checkpoint")
+ checkpoint_time = 0
+ if chkpt_time is not None:
+ checkpoint_time = int(chkpt_time)
+
+ self.status.set_last_synced(stime, checkpoint_time)
+
+ def update_worker_remote_node(self):
+ node = rconf.args.resource_remote
+ node_data = node.split("@")
+ node = node_data[-1]
+ remote_node_ip, _ = host_brick_split(node)
+ self.status.set_slave_node(remote_node_ip)
+
+ def changelogs_batch_process(self, changes):
+ changelogs_batches = []
+ current_size = 0
+ for c in changes:
+ si = os.lstat(c).st_size
+ if (si + current_size) > gconf.get("changelog-batch-size"):
+ # Create new batch if single Changelog file greater than
+ # Max Size! or current batch size exceeds Max size
+ changelogs_batches.append([c])
+ current_size = si
+ else:
+ # Append to last batch, if No batches available Create one
+ current_size += si
+ if not changelogs_batches:
+ changelogs_batches.append([c])
+ else:
+ changelogs_batches[-1].append(c)
+
+ for batch in changelogs_batches:
+ logging.debug(lf('processing changes',
+ batch=batch))
+ self.process(batch)
+
+ def crawl(self):
+ self.status.set_worker_crawl_status("Changelog Crawl")
+ changes = []
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ data_stime = self.get_data_stime()
+
+ libgfchangelog.scan()
+ self.crawls += 1
+ changes = libgfchangelog.getchanges()
+ if changes:
+ if data_stime:
+ logging.info(lf("slave's time",
+ stime=data_stime))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < data_stime[0]]
+ for pr in processed:
+ logging.debug(
+ lf('skipping already processed change',
+ changelog=os.path.basename(pr)))
+ self.changelog_done_func(pr)
+ changes.remove(pr)
+ self.archive_and_purge_changelogs(processed)
+
+ self.changelogs_batch_process(changes)
+
+ def register(self, register_time, status):
+ self.sleep_interval = gconf.get("change-interval")
+ self.changelog_done_func = libgfchangelog.done
+ self.tempdir = self.setup_working_dir()
+ self.processed_changelogs_dir = os.path.join(self.tempdir,
+ ".processed")
+ self.name = "live_changelog"
+ self.status = status
+
+
class GMasterChangeloghistoryMixin(GMasterChangelogMixin):

    """Crawl mixin that replays historical changelogs after a restart.

    Uses libgfchangelog's history API to consume the changelogs recorded
    between the brick's data stime and the registration time, then either
    finishes cleanly or raises PartialHistoryAvailable so the caller can
    fall back to xsync for the uncovered gap.
    """

    def register(self, register_time, status):
        # Remember when history consumption started; crawl() compares the
        # history end TS against this to detect an uncovered gap.
        self.changelog_register_time = register_time
        self.history_crawl_start_time = register_time
        self.changelog_done_func = libgfchangelog.history_done
        self.history_turns = 0
        self.tempdir = self.setup_working_dir()
        self.processed_changelogs_dir = os.path.join(self.tempdir,
                                                     ".history/.processed")
        self.name = "history_changelog"
        self.status = status

    def crawl(self):
        """Run one history crawl round; recurses once if history ends early.

        Raises NoStimeAvailable when no data stime exists (full sync
        needed) and PartialHistoryAvailable when history cannot cover
        the whole requested range after a retry.
        """
        self.history_turns += 1
        self.status.set_worker_crawl_status("History Crawl")
        data_stime = self.get_data_stime()

        end_time = int(time.time())

        # The start of a historical crawl marks a Geo-rep worker restart;
        # surface the ignore-deletes setting once per restart.
        if gconf.get("ignore-deletes"):
            logging.info(lf('ignore-deletes config option is set',
                            stime=data_stime))

        logging.info(lf('starting history crawl',
                        turns=self.history_turns,
                        stime=data_stime,
                        etime=end_time,
                        entry_stime=self.get_entry_stime()))

        if not data_stime or data_stime == URXTIME:
            raise NoStimeAvailable()

        # Changelogs backend path is hardcoded as
        # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different
        # location then consuming history will not work(Known issue as of now)
        changelog_path = os.path.join(rconf.args.local_path,
                                      ".glusterfs/changelogs")
        ret, actual_end = libgfchangelog.history_changelog(
            changelog_path,
            data_stime[0],
            end_time,
            gconf.get("sync-jobs"))

        # scan followed by getchanges till scan returns zero.
        # history_scan() is blocking call, till it gets the number
        # of changelogs to process. Returns zero when no changelogs
        # to be processed. returns positive value as number of changelogs
        # to be processed, which will be fetched using
        # history_getchanges()
        while libgfchangelog.history_scan() > 0:
            self.crawls += 1

            changes = libgfchangelog.history_getchanges()
            if changes:
                if data_stime:
                    logging.info(lf("slave's time",
                                    stime=data_stime))
                    # Changelogs older than the stime were synced before
                    # the restart: ack them without reprocessing.
                    processed = [x for x in changes
                                 if int(x.split('.')[-1]) < data_stime[0]]
                    for pr in processed:
                        logging.debug(lf('skipping already processed change',
                                         changelog=os.path.basename(pr)))
                        self.changelog_done_func(pr)
                        changes.remove(pr)

            self.changelogs_batch_process(changes)

        history_turn_time = int(time.time()) - self.history_crawl_start_time

        logging.info(lf('finished history crawl',
                        endtime=actual_end,
                        stime=self.get_data_stime(),
                        entry_stime=self.get_entry_stime()))

        # If TS returned from history_changelog is < register_time
        # then FS crawl may be required, since history is only available
        # till TS returned from history_changelog
        if actual_end < self.changelog_register_time:
            if self.history_turns < 2:
                # Retry once: wait out the remainder of a changelog
                # rollover period so fresh history becomes available.
                sleep_time = 1
                if history_turn_time < CHANGELOG_ROLLOVER_TIME:
                    sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time
                time.sleep(sleep_time)
                self.history_crawl_start_time = int(time.time())
                self.crawl()
            else:
                # This exception will be caught in resource.py and
                # fallback to xsync for the small gap.
                raise PartialHistoryAvailable(str(actual_end))
+
+
class GMasterXsyncMixin(GMasterChangelogMixin):

    """
    This crawl needs to be xtime based (as of now
    it's not. this is because we generate CHANGELOG
    file during each crawl which is then processed
    by process_change()).
    For now it's used as a one-shot initial sync
    mechanism and only syncs directories, regular
    files, hardlinks and symlinks.
    """

    # Flush the generated changelog after this many entries
    XSYNC_MAX_ENTRIES = 1 << 13

    def register(self, register_time=None, status=None):
        """Prepare the xsync working directory and crawl state."""
        self.status = status
        self.counter = 0
        # comlist is the dispatch queue consumed by crawl(): tuples of
        # ('xsync', changelog), ('stime', (path, stime)) or ('finale', None)
        self.comlist = []
        self.stimes = []
        self.sleep_interval = 60
        self.tempdir = self.setup_working_dir()
        logging.info(lf('Working dir',
                        path=self.tempdir))
        self.tempdir = os.path.join(self.tempdir, 'xsync')
        self.processed_changelogs_dir = self.tempdir
        self.name = "xsync"
        try:
            os.makedirs(self.tempdir)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == EEXIST and os.path.isdir(self.tempdir):
                pass
            else:
                raise
        # Purge stale unprocessed xsync changelogs
        for f in os.listdir(self.tempdir):
            if f.startswith("XSYNC-CHANGELOG"):
                os.remove(os.path.join(self.tempdir, f))

    def crawl(self):
        """
        event dispatcher thread

        this thread dispatches either changelog or synchronizes stime.
        additionally terminates itself on receiving a 'finale' event
        """
        def Xsyncer():
            self.Xcrawl()
        t = Thread(target=Xsyncer)
        t.start()
        logging.info(lf('starting hybrid crawl',
                        stime=self.get_data_stime()))
        self.status.set_worker_crawl_status("Hybrid Crawl")
        while True:
            try:
                item = self.comlist.pop(0)
                if item[0] == 'finale':
                    logging.info(lf('finished hybrid crawl',
                                    stime=self.get_data_stime()))
                    break
                elif item[0] == 'xsync':
                    logging.info(lf('processing xsync changelog',
                                    path=item[1]))
                    self.process([item[1]], 0)
                    self.archive_and_purge_changelogs([item[1]])
                elif item[0] == 'stime':
                    logging.debug(lf('setting slave time',
                                     time=item[1]))
                    self.upd_stime(item[1][1], item[1][0])
                else:
                    logging.warn(lf('unknown tuple in comlist',
                                    entry=item))
            except IndexError:
                time.sleep(1)

    def write_entry_change(self, prefix, data=()):
        """Append one record (prefix + space-joined fields) to the changelog.

        NOTE: default changed from the mutable [] to an immutable () to
        avoid the shared-mutable-default pitfall; join() accepts both.
        """
        if not getattr(self, "fh", None):
            self.open()

        self.fh.write("%s %s\n" % (prefix, ' '.join(data)))

    def open(self):
        """Open a fresh XSYNC-CHANGELOG file in the working dir."""
        try:
            self.xsync_change = os.path.join(
                self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
            self.fh = open(self.xsync_change, 'w')
        except IOError:
            raise

    def close(self):
        """Flush, fsync and close the current changelog, if open."""
        if getattr(self, "fh", None):
            self.fh.flush()
            os.fsync(self.fh.fileno())
            self.fh.close()
            self.fh = None

    def fname(self):
        """Return the path of the current xsync changelog."""
        return self.xsync_change

    def put(self, mark, item):
        """Queue an event for the dispatcher loop in crawl()."""
        self.comlist.append((mark, item))

    def sync_xsync(self, last):
        """schedule a processing of changelog"""
        self.close()
        if self.counter > 0:
            self.put('xsync', self.fname())
        self.counter = 0
        if not last:
            time.sleep(1)  # make sure changelogs are 1 second apart

    def sync_stime(self, stime=None, last=False):
        """schedule a stime synchronization"""
        if stime:
            self.put('stime', stime)
        if last:
            self.put('finale', None)

    def sync_done(self, stime=(), last=False):
        """Flush the pending changelog and schedule stime updates.

        NOTE: default changed from the mutable [] to an immutable ()
        to avoid the shared-mutable-default pitfall.
        """
        self.sync_xsync(last)
        if stime:
            # Send last as True only for last stime entry
            for st in stime[:-1]:
                self.sync_stime(st, False)

        if stime and stime[-1]:
            self.sync_stime(stime[-1], last)

    def is_sticky(self, path, mo):
        """check for DHTs linkto sticky bit file"""
        sticky = False
        if mo & 0o1000:
            sticky = self.master.server.linkto_check(path)
        return sticky

    def Xcrawl(self, path='.', xtr_root=None):
        """
        generate a CHANGELOG file consumable by process_change.

        slave's xtime (stime) is _cached_ for comparisons across
        the filesystem tree, but set after directory synchronization.
        """
        if path == '.':
            self.crawls += 1
        if not xtr_root:
            # get the root stime and use it for all comparisons
            xtr_root = self.xtime('.', self.slave)
            if isinstance(xtr_root, int):
                if xtr_root != ENOENT:
                    logging.warn(lf("slave cluster not returning the "
                                    "xtime for root",
                                    error=xtr_root))
                xtr_root = self.minus_infinity
        xtl = self.xtime(path)
        if isinstance(xtl, int):
            logging.warn("master cluster's xtime not found")
        xtr = self.xtime(path, self.slave)
        if isinstance(xtr, int):
            if xtr != ENOENT:
                logging.warn(lf("slave cluster not returning the "
                                "xtime for dir",
                                path=path,
                                error=xtr))
            xtr = self.minus_infinity
        xtr = max(xtr, xtr_root)
        zero_zero = (0, 0)
        if xtr_root == zero_zero:
            xtr = self.minus_infinity
        if not self.need_sync(path, xtl, xtr):
            if path == '.':
                self.sync_done([(path, xtl)], True)
            return
        self.xtime_reversion_hook(path, xtl, xtr)
        logging.debug("entering " + path)
        dem = self.master.server.entries(path)
        pargfid = self.master.server.gfid(path)
        if isinstance(pargfid, int):
            logging.warn(lf('skipping directory',
                            path=path))
        for e in dem:
            bname = e
            e = os.path.join(path, e)
            xte = self.xtime(e)
            if isinstance(xte, int):
                logging.warn(lf("irregular xtime",
                                path=e,
                                error=errno.errorcode[xte]))
                continue
            if not self.need_sync(e, xte, xtr):
                continue
            st = self.master.server.lstat(e)
            if isinstance(st, int):
                logging.warn(lf('got purged in the interim',
                                path=e))
                continue
            if self.is_sticky(e, st.st_mode):
                logging.debug(lf('ignoring sticky bit file',
                                 path=e))
                continue
            gfid = self.master.server.gfid(e)
            if isinstance(gfid, int):
                logging.warn(lf('skipping entry',
                                path=e))
                continue
            mo = st.st_mode
            # Only dirs, symlinks and regular files count towards the
            # per-changelog entry budget.
            self.counter += 1 if ((stat.S_ISDIR(mo) or
                                   stat.S_ISLNK(mo) or
                                   stat.S_ISREG(mo))) else 0
            if self.counter == self.XSYNC_MAX_ENTRIES:
                self.sync_done(self.stimes, False)
                self.stimes = []
            if stat.S_ISDIR(mo):
                self.write_entry_change("E",
                                        [gfid, 'MKDIR', str(mo),
                                         str(0), str(0), escape_space_newline(
                                             os.path.join(pargfid, bname))])
                self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid),
                                              str(st.st_gid), str(st.st_mode),
                                              str(st.st_atime),
                                              str(st.st_mtime)])
                self.Xcrawl(e, xtr_root)
                stime_to_update = xte
                # Live Changelog Start time indicates that from that time
                # onwards Live changelogs are available. If we update stime
                # greater than live_changelog_start time then Geo-rep will
                # skip those changelogs as already processed. But Xsync
                # actually failed to sync the deletes and Renames. Update
                # stime as min(Live_changelogs_time, Actual_stime) When it
                # switches to Changelog mode, it syncs Deletes and Renames.
                if self.live_changelog_start_time:
                    stime_to_update = min(self.live_changelog_start_time, xte)
                self.stimes.append((e, stime_to_update))
            elif stat.S_ISLNK(mo):
                self.write_entry_change(
                    "E", [gfid, 'SYMLINK', escape_space_newline(
                        os.path.join(pargfid, bname))])
            elif stat.S_ISREG(mo):
                nlink = st.st_nlink
                nlink -= 1  # fixup backend stat link count
                # if a file has a hardlink, create a Changelog entry as
                # 'LINK' so the slave side will decide if to create the
                # new entry, or to create link.
                if nlink == 1:
                    self.write_entry_change("E",
                                            [gfid, 'MKNOD', str(mo),
                                             str(0), str(0),
                                             escape_space_newline(
                                                 os.path.join(
                                                     pargfid, bname))])
                else:
                    self.write_entry_change(
                        "E", [gfid, 'LINK', escape_space_newline(
                            os.path.join(pargfid, bname))])
                self.write_entry_change("D", [gfid])
        if path == '.':
            stime_to_update = xtl
            if self.live_changelog_start_time:
                stime_to_update = min(self.live_changelog_start_time, xtl)
            self.stimes.append((path, stime_to_update))
            self.sync_done(self.stimes, True)
+
+
class BoxClosedErr(Exception):
    """Raised when a request is posted to a PostBox after close()."""
+
+
class PostBox(list):

    """synchronized collection for storing things thought of as "requests" """

    def __init__(self, *a):
        """Create an open, unfinished PostBox (list contents from *a*)."""
        list.__init__(self, *a)
        # too bad Python stdlib does not have read/write locks...
        # it would suffice to grab the lock in .append as reader, in .close
        # as writer.  A single Condition guards membership changes and the
        # open/done flags.
        self.lever = Condition()
        # open: new requests may still be posted via append()
        self.open = True
        # done: wakeup() has published .result
        self.done = False

    def wait(self):
        """Block until wakeup() publishes a result, then return it."""
        with self.lever:
            # Loop (not 'if'): Condition.wait() may return spuriously,
            # so re-check the predicate each time before proceeding.
            while not self.done:
                self.lever.wait()
        return self.result

    def wakeup(self, data):
        """wake up requestors with the result"""
        self.result = data
        with self.lever:
            self.done = True
            # notify_all: notifyAll is a deprecated alias
            self.lever.notify_all()

    def append(self, e):
        """post a request; raises BoxClosedErr once close() was called"""
        with self.lever:
            if not self.open:
                raise BoxClosedErr
            list.append(self, e)

    def close(self):
        """prohibit the posting of further requests"""
        with self.lever:
            self.open = False
+
+
class Syncer(object):

    """a staged queue to relay rsync requests to rsync workers

    By "staged queue" its meant that when a consumer comes to the
    queue, it takes _all_ entries, leaving the queue empty.
    (I don't know if there is an official term for this pattern.)

    The queue uses a PostBox to accumulate incoming items.
    When a consumer (rsync worker) comes, a new PostBox is
    set up and the old one is passed on to the consumer.

    Instead of the simplistic scheme of having one big lock
    which synchronizes both the addition of new items and
    PostBox exchanges, use a separate lock to arbitrate consumers,
    and rely on PostBox's synchronization mechanisms take
    care about additions.

    There is a corner case racy situation, producers vs. consumers,
    which is not handled by this scheme: namely, when the PostBox
    exchange occurs in between being passed to the producer for posting
    and the post placement. But that's what Postbox.close is for:
    such a posting will find the PostBox closed, in which case
    the producer can re-try posting against the actual PostBox of
    the queue.

    To aid accumlation of items in the PostBoxen before grabbed
    by an rsync worker, the worker goes to sleep a bit after
    each completed syncjob.
    """

    def __init__(self, slave, sync_engine, resilient_errnos=()):
        """spawn worker threads

        :param slave: slave resource the sync engine talks to
        :param sync_engine: callable(postbox, log_err) returning a process
            object with a 'returncode' attribute
        :param resilient_errnos: engine return codes treated as retriable.
            NOTE: default changed from a mutable [] to an immutable ()
            (shared-mutable-default pitfall); membership tests work on both.
        """
        self.log_err = False
        self.slave = slave
        self.lock = Lock()
        self.pb = PostBox()
        self.sync_engine = sync_engine
        self.errnos_ok = resilient_errnos
        for i in range(gconf.get("sync-jobs")):
            t = Thread(target=self.syncjob, args=(i + 1, ))
            t.start()

    def syncjob(self, job_id):
        """the life of a worker"""
        while True:
            pb = None
            while True:
                # Swap in a fresh PostBox and take the filled one, if any;
                # otherwise poll twice a second.
                self.lock.acquire()
                if self.pb:
                    pb, self.pb = self.pb, PostBox()
                self.lock.release()
                if pb:
                    break
                time.sleep(0.5)
            pb.close()
            start = time.time()
            po = self.sync_engine(pb, self.log_err)
            logging.info(lf("Sync Time Taken",
                            job=job_id,
                            num_files=len(pb),
                            return_code=po.returncode,
                            duration="%.4f" % (time.time() - start)))

            if po.returncode == 0:
                ret = (True, 0)
            elif po.returncode in self.errnos_ok:
                ret = (False, po.returncode)
            else:
                # errfail() presumably does not return (logs the failure
                # and terminates) -- otherwise 'ret' would be unbound.
                po.errfail()
            pb.wakeup(ret)

    def add(self, e):
        """Post *e*; retry if the current PostBox was just swapped/closed."""
        while True:
            pb = self.pb
            try:
                pb.append(e)
                return pb
            except BoxClosedErr:
                pass

    def enable_errorlog(self):
        """Ask the sync engine to log errors (used on the final retry)."""
        self.log_err = True

    def disable_errorlog(self):
        """Suppress sync-engine error logging (default state)."""
        self.log_err = False
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
new file mode 100644
index 00000000000..6aa7b9dfc99
--- /dev/null
+++ b/geo-replication/syncdaemon/monitor.py
@@ -0,0 +1,395 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import time
+import signal
+import logging
+import xml.etree.ElementTree as XET
+from threading import Lock
+from errno import ECHILD, ESRCH
+import random
+
+from resource import SSH
+import gsyncdconfig as gconf
+import libgfchangelog
+from rconf import rconf
+from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
+ set_term_handler, GsyncdError,
+ Thread, finalize, Volinfo, VolinfoFromGconf,
+ gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
+ unshare_propagation_supported)
+from gsyncdstatus import GeorepStatus, set_monitor_status
+import py2py3
+from py2py3 import pipe
+
+ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
+
+
def get_subvol_num(brick_idx, vol, hot):
    """Return the subvolume label ("N", "hot_N" or "cold_N") that the
    brick at *brick_idx* belongs to, and record the master's distribute
    count in the session config."""
    tier = vol.is_tier()
    disperse_count = vol.disperse_count(tier, hot)
    replica_count = vol.replica_count(tier, hot)
    distribute_count = vol.distribution_count(tier, hot)
    gconf.setconfig("master-distribution-count", distribute_count)

    # brick indices of a tiered volume are global; shift cold-tier
    # indices past the hot bricks
    if tier and not hot:
        brick_idx -= vol.get_hot_bricks_count(tier)

    # a subvolume spans disperse_count bricks (replica_count when the
    # volume is not dispersed); the subvol number is the 1-based
    # ceiling of (brick_idx + 1) / subvol_size
    subvol_size = disperse_count if disperse_count > 0 else replica_count
    cnt, rem = divmod(brick_idx + 1, subvol_size)
    if rem:
        cnt += 1

    if tier and hot:
        return "hot_" + str(cnt)
    if tier and not hot:
        return "cold_" + str(cnt)
    return str(cnt)
+
+
class Monitor(object):

    """class which spawns and manages gsyncd workers"""

    # worker status labels, ordered in _ST_ORD
    ST_INIT = 'Initializing...'
    ST_STARTED = 'Started'
    ST_STABLE = 'Active'
    ST_FAULTY = 'Faulty'
    ST_INCON = 'inconsistent'
    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]

    def __init__(self):
        # lock guards worker spawning (fork + pipe) and status updates
        self.lock = Lock()
        self.state = {}
        # per-brick GeorepStatus objects, keyed by brick directory
        self.status = {}

    @staticmethod
    def terminate():
        # relax one SIGTERM by setting a handler that sets back
        # standard handler
        set_term_handler(lambda *a: set_term_handler())
        # give a chance to graceful exit
        errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

    def monitor(self, w, argv, cpids, slave_vol, slave_host, master,
                suuid, slavenodes):
        """the monitor loop

        Basic logic is a blatantly simple blunt heuristics:
        if spawned client survives 60 secs, it's considered OK.
        This serves us pretty well as it's not vulnerable to
        any kind of irregular behavior of the child...

        ... well, except for one: if the child is hung up on
        waiting for some event, it can survive aeons, still
        will be defunct. So we tweak the above logic to
        expect the worker to send us a signal within 60 secs
        (in the form of closing its end of a pipe). The worker
        does this when it's done with the setup stage
        ready to enter the service loop (note it's the setup
        stage which is vulnerable to hangs -- the full
        blown worker blows up on EPIPE if the net goes down,
        due to the keep-alive thread)
        """
        if not self.status.get(w[0]['dir'], None):
            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
                                                    w[0]['host'],
                                                    w[0]['dir'],
                                                    w[0]['uuid'],
                                                    master,
                                                    "%s::%s" % (slave_host,
                                                                slave_vol))
        ret = 0

        def nwait(p, o=0):
            # non-raising waitpid wrapper: None while still running,
            # -1 if already reaped, otherwise the raw wait status
            try:
                p2, r = waitpid(p, o)
                if not p2:
                    return
                return r
            except OSError as e:
                # no child process, this happens if the child process
                # already died and has been cleaned up
                if e.errno == ECHILD:
                    return -1
                else:
                    raise

        def exit_signalled(s):
            """ child terminated due to receipt of SIGUSR1 """
            return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))

        def exit_status(s):
            if os.WIFEXITED(s):
                return os.WEXITSTATUS(s)
            return 1

        conn_timeout = gconf.get("connection-timeout")
        while ret in (0, 1):
            remote_user, remote_host = w[1][0].split("@")
            remote_id = w[1][1]
            # Check the status of the connected slave node
            # If the connected slave node is down then try to connect to
            # different up node.
            current_slave_host = remote_host
            slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port"))

            if (current_slave_host, remote_id) not in slave_up_hosts:
                if len(slave_up_hosts) > 0:
                    remote_new = random.choice(slave_up_hosts)
                    remote_host = "%s@%s" % (remote_user, remote_new[0])
                    remote_id = remote_new[1]

            # Spawn the worker in lock to avoid fd leak
            self.lock.acquire()

            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
            logging.info(lf('starting gsyncd worker',
                            brick=w[0]['dir'],
                            slave_node=remote_host))

            # the worker signals setup completion by closing its end (pw)
            pr, pw = pipe()
            cpid = os.fork()
            if cpid == 0:
                os.close(pr)

                args_to_worker = argv + [
                    'worker',
                    rconf.args.master,
                    rconf.args.slave,
                    '--feedback-fd', str(pw),
                    '--local-path', w[0]['dir'],
                    '--local-node', w[0]['host'],
                    '--local-node-id', w[0]['uuid'],
                    '--slave-id', suuid,
                    '--subvol-num', str(w[2]),
                    '--resource-remote', remote_host,
                    '--resource-remote-id', remote_id
                ]

                if rconf.args.config_file is not None:
                    args_to_worker += ['-c', rconf.args.config_file]

                if w[3]:
                    args_to_worker.append("--is-hottier")

                if rconf.args.debug:
                    args_to_worker.append("--debug")

                access_mount = gconf.get("access-mount")
                if access_mount:
                    os.execv(sys.executable, args_to_worker)
                else:
                    if unshare_propagation_supported():
                        logging.debug("Worker would mount volume privately")
                        unshare_cmd = ['unshare', '-m', '--propagation',
                                       'private']
                        cmd = unshare_cmd + args_to_worker
                        os.execvp("unshare", cmd)
                    else:
                        logging.debug("Mount is not private. It would be lazy"
                                      " umounted")
                        os.execv(sys.executable, args_to_worker)

            cpids.add(cpid)
            os.close(pw)

            self.lock.release()

            t0 = time.time()
            # wait for the worker to close its end of the pipe (or timeout)
            so = select((pr,), (), (), conn_timeout)[0]
            os.close(pr)

            if so:
                ret = nwait(cpid, os.WNOHANG)

                if ret is not None:
                    logging.info(lf("worker died before establishing "
                                    "connection",
                                    brick=w[0]['dir']))
                else:
                    logging.debug("worker(%s) connected" % w[0]['dir'])
                    # keep polling the worker for the rest of the startup
                    # window in case it dies right after connecting
                    while time.time() < t0 + conn_timeout:
                        ret = nwait(cpid, os.WNOHANG)

                        if ret is not None:
                            logging.info(lf("worker died in startup phase",
                                            brick=w[0]['dir']))
                            break

                        time.sleep(1)
            else:
                logging.info(
                    lf("Worker not confirmed after wait, aborting it. "
                       "Gsyncd invocation on remote slave via SSH or "
                       "gluster master mount might have hung. Please "
                       "check the above logs for exact issue and check "
                       "master or slave volume for errors. Restarting "
                       "master/slave volume accordingly might help.",
                       brick=w[0]['dir'],
                       timeout=conn_timeout))
                errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                ret = nwait(cpid)
            if ret is None:
                ret = nwait(cpid)
            if exit_signalled(ret):
                # SIGUSR1 exit is treated as a clean (restartable) exit
                ret = 0
            else:
                ret = exit_status(ret)
            if ret in (0, 1):
                self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
                gf_event(EVENT_GEOREP_FAULTY,
                         master_volume=master.volume,
                         master_node=w[0]['host'],
                         master_node_id=w[0]['uuid'],
                         slave_host=slave_host,
                         slave_volume=slave_vol,
                         current_slave_host=current_slave_host,
                         brick_path=w[0]['dir'])
            time.sleep(10)
        self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
        return ret

    def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes):
        argv = [os.path.basename(sys.executable), sys.argv[0]]

        cpids = set()
        ta = []
        for wx in wspx:
            def wmon(w):
                # NOTE(review): self.monitor() returns a plain int, so this
                # tuple unpacking raises TypeError once monitor() returns;
                # the cleanup below then appears to be reached only via the
                # Thread wrapper's exception handling -- confirm intended.
                cpid, _ = self.monitor(w, argv, cpids, slave_vol,
                                       slave_host, master, suuid, slavenodes)
                time.sleep(1)
                self.lock.acquire()
                for cpid in cpids:
                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                self.lock.release()
                finalize(exval=1)
            t = Thread(target=wmon, args=[wx])
            t.start()
            ta.append(t)

        # monitor status was being updated in each monitor thread. It
        # should not be done as it can cause deadlock for a worker start.
        # set_monitor_status uses flock to synchronize multiple instances
        # updating the file. Since each monitor thread forks worker,
        # these processes can hold the reference to fd of status
        # file causing deadlock to workers which starts later as flock
        # will not be released until all references to same fd is closed.
        # It will also cause fd leaks.

        self.lock.acquire()
        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
        self.lock.release()
        for t in ta:
            t.join()
+
+
def distribute(master, slave):
    """Build the per-brick worker specifications for this node.

    Collects master/slave volume info (from gconf or via gluster CLI over
    SSH), persists the session's stime xattr prefix, and pairs every
    local master brick with a slave endpoint (round-robin).

    Returns the argument tuple consumed by Monitor.multiplex().
    """
    if rconf.args.use_gconf_volinfo:
        mvol = VolinfoFromGconf(master.volume, master=True)
    else:
        mvol = Volinfo(master.volume, master.host, master=True)
    logging.debug('master bricks: ' + repr(mvol.bricks))
    prelude = []
    slave_host = None
    slave_vol = None

    # ssh command prefix used to query volume info on the slave side
    prelude = [gconf.get("ssh-command")] + \
        gconf.get("ssh-options").split() + \
        ["-p", str(gconf.get("ssh-port"))] + \
        [slave.remote_addr]

    logging.debug('slave SSH gateway: ' + slave.remote_addr)

    if rconf.args.use_gconf_volinfo:
        svol = VolinfoFromGconf(slave.volume, master=False)
    else:
        svol = Volinfo(slave.volume, "localhost", prelude, master=False)

    sbricks = svol.bricks
    suuid = svol.uuid
    slave_host = slave.remote_addr.split('@')[-1]
    slave_vol = slave.volume

    # save this xattr for the session delete command
    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
        svol.uuid
    if not old_stime_xattr_prefix or \
       old_stime_xattr_prefix != new_stime_xattr_prefix:
        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)

    logging.debug('slave bricks: ' + repr(sbricks))

    slavenodes = set((b['host'], b["uuid"]) for b in sbricks)
    rap = SSH.parse_ssh_address(slave)
    slaves = [(rap['user'] + '@' + h[0], h[1]) for h in slavenodes]

    workerspex = []
    for idx, brick in enumerate(mvol.bricks):
        # only bricks hosted on this node get a worker spawned here
        if rconf.args.local_node_id == brick['uuid']:
            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
            workerspex.append((brick,
                               slaves[idx % len(slaves)],
                               get_subvol_num(idx, mvol, is_hot),
                               is_hot))
    logging.debug('worker specs: ' + repr(workerspex))
    return workerspex, suuid, slave_vol, slave_host, master, slavenodes
+
+
def monitor(local, remote):
    """Entry point for the monitor subcommand: build the worker specs and
    run the (singleton) Monitor over them.

    If gsyncd was restarted while the session was paused, SIGSTOP the
    whole process group first so we go back to the paused state.
    """
    # Check if gsyncd restarted in pause state. If
    # yes, send SIGSTOP to negative of monitor pid
    # to go back to pause state.
    if rconf.args.pause_on_start:
        errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])

    # the original placed this remark as a stray (no-op) string statement
    # here rather than as the function docstring:
    # "oh yeah, actually Monitor is used as singleton, too"
    return Monitor().multiplex(*distribute(local, remote))
+
+
def startup(go_daemon=True):
    """set up logging, pidfile grabbing, daemonization"""
    pid_file = gconf.get("pid-file")
    if not grabpidfile():
        sys.stderr.write("pidfile is taken, exiting.\n")
        sys.exit(2)
    rconf.pid_file_owned = True

    if not go_daemon:
        return

    # daemonize: fork once, start a new session and point the standard
    # streams at /dev/null
    x, y = pipe()
    cpid = os.fork()
    if cpid:
        os.close(x)
        sys.exit()
    os.close(y)
    os.setsid()
    dn = os.open(os.devnull, os.O_RDWR)
    for f in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(dn, f.fileno())

    # re-grab the pidfile as the daemon child, then atomically move it
    # over the one the (exiting) parent holds
    if not grabpidfile(pid_file + '.tmp'):
        raise GsyncdError("cannot grab temporary pidfile")

    os.rename(pid_file + '.tmp', pid_file)

    # wait for parent to terminate
    # so we can start up with
    # no messing from the dirty
    # ol' bustard
    select((x,), (), ())
    os.close(x)
diff --git a/geo-replication/syncdaemon/py2py3.py b/geo-replication/syncdaemon/py2py3.py
new file mode 100644
index 00000000000..f9c76e1b50a
--- /dev/null
+++ b/geo-replication/syncdaemon/py2py3.py
@@ -0,0 +1,184 @@
+#
+# Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+# All python2/python3 compatibility routines
+
+import sys
+import os
+import stat
+import struct
+from syncdutils import umask
+from ctypes import create_string_buffer
+
+if sys.version_info >= (3,):
+ def pipe():
+ (r, w) = os.pipe()
+ os.set_inheritable(r, True)
+ os.set_inheritable(w, True)
+ return (r, w)
+
+ # Raw conversion of bytearray to string. Used in the cases where
+ # buffer is created by create_string_buffer which is a 8-bit char
+ # array and passed to syscalls to fetch results. Using encode/decode
+ # doesn't work as it converts to string altering the size.
+ def bytearray_to_str(byte_arr):
+ return ''.join([chr(b) for b in byte_arr])
+
+ # Raw conversion of string to bytes. This is required to convert
+ # back the string into bytearray(c char array) to use in struc
+ # pack/unpacking. Again encode/decode can't be used as it
+ # converts it alters size.
+ def str_to_bytearray(string):
+ return bytes([ord(c) for c in string])
+
    def gr_create_string_buffer(size):
        """Return a zero-initialized ctypes char buffer of *size* bytes
        (py3 requires the NUL initializer to be bytes)."""
        return create_string_buffer(b'\0', size)
+
+ def gr_query_xattr(cls, path, size, syscall, attr=None):
+ if attr:
+ return cls._query_xattr(path.encode(), size, syscall,
+ attr.encode())
+ else:
+ return cls._query_xattr(path.encode(), size, syscall)
+
+ def gr_lsetxattr(cls, path, attr, val):
+ return cls.libc.lsetxattr(path.encode(), attr.encode(), val,
+ len(val), 0)
+
+ def gr_lremovexattr(cls, path, attr):
+ return cls.libc.lremovexattr(path.encode(), attr.encode())
+
+ def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
+ return libgfapi.gf_changelog_register(brick.encode(),
+ path.encode(),
+ log_file.encode(),
+ log_level, retries)
+
    def gr_cl_done(libgfapi, clfile):
        """gf_changelog_done with the changelog file name encoded."""
        return libgfapi.gf_changelog_done(clfile.encode())
+
+ def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
+ actual_end):
+ return libgfapi.gf_history_changelog(changelog_path.encode(),
+ start, end, num_parallel,
+ actual_end)
+
    def gr_cl_history_done(libgfapi, clfile):
        """gf_history_changelog_done with the file name encoded."""
        return libgfapi.gf_history_changelog_done(clfile.encode())
+
    # regular file

    def entry_pack_reg(cls, gf, bn, mo, uid, gid):
        """Pack a MKNOD entry blob (cls._fmt_mknod layout); str fields
        are encoded to bytes first."""
        bn_encoded = bn.encode()
        blen = len(bn_encoded)
        return struct.pack(cls._fmt_mknod(blen),
                           uid, gid, gf.encode(), mo, bn_encoded,
                           stat.S_IMODE(mo), 0, umask())
+
    def entry_pack_reg_stat(cls, gf, bn, st):
        """Pack a MKNOD entry blob taking uid/gid/mode from the stat
        dict *st*; str fields are encoded to bytes first."""
        bn_encoded = bn.encode()
        blen = len(bn_encoded)
        mo = st['mode']
        return struct.pack(cls._fmt_mknod(blen),
                           st['uid'], st['gid'],
                           gf.encode(), mo, bn_encoded,
                           stat.S_IMODE(mo), 0, umask())
    # mkdir

    def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
        """Pack a MKDIR entry blob (cls._fmt_mkdir layout); str fields
        are encoded to bytes first."""
        bn_encoded = bn.encode()
        blen = len(bn_encoded)
        return struct.pack(cls._fmt_mkdir(blen),
                           uid, gid, gf.encode(), mo, bn_encoded,
                           stat.S_IMODE(mo), umask())
    # symlink

    def entry_pack_symlink(cls, gf, bn, lnk, st):
        """Pack a SYMLINK entry blob (cls._fmt_symlink layout) with link
        target *lnk*; str fields are encoded to bytes first."""
        bn_encoded = bn.encode()
        blen = len(bn_encoded)
        lnk_encoded = lnk.encode()
        llen = len(lnk_encoded)
        return struct.pack(cls._fmt_symlink(blen, llen),
                           st['uid'], st['gid'],
                           gf.encode(), st['mode'], bn_encoded,
                           lnk_encoded)
+else:
    def pipe():
        """Plain os.pipe(); py2 fds are inheritable by default."""
        (r, w) = os.pipe()
        return (r, w)

    # Raw conversion of bytearray to string
    def bytearray_to_str(byte_arr):
        # py2 str already is a byte string -- nothing to convert
        return byte_arr

    # Raw conversion of string to bytearray
    def str_to_bytearray(string):
        # py2 str already is a byte string -- nothing to convert
        return string

    def gr_create_string_buffer(size):
        # py2 '\0' is a byte string, usable directly by ctypes
        return create_string_buffer('\0', size)

    def gr_query_xattr(cls, path, size, syscall, attr=None):
        # py2 strings are bytes already -- pass through unencoded
        if attr:
            return cls._query_xattr(path, size, syscall, attr)
        else:
            return cls._query_xattr(path, size, syscall)

    def gr_lsetxattr(cls, path, attr, val):
        return cls.libc.lsetxattr(path, attr, val, len(val), 0)

    def gr_lremovexattr(cls, path, attr):
        return cls.libc.lremovexattr(path, attr)

    def gr_cl_register(libgfapi, brick, path, log_file, log_level, retries):
        return libgfapi.gf_changelog_register(brick, path, log_file,
                                              log_level, retries)

    def gr_cl_done(libgfapi, clfile):
        return libgfapi.gf_changelog_done(clfile)

    def gr_cl_history_changelog(libgfapi, changelog_path, start, end, num_parallel,
                                actual_end):
        return libgfapi.gf_history_changelog(changelog_path, start, end,
                                             num_parallel, actual_end)

    def gr_cl_history_done(libgfapi, clfile):
        return libgfapi.gf_history_changelog_done(clfile)
+
    # regular file

    def entry_pack_reg(cls, gf, bn, mo, uid, gid):
        """Pack a MKNOD entry blob (py2: names are byte strings already)."""
        blen = len(bn)
        return struct.pack(cls._fmt_mknod(blen),
                           uid, gid, gf, mo, bn,
                           stat.S_IMODE(mo), 0, umask())

    def entry_pack_reg_stat(cls, gf, bn, st):
        """Pack a MKNOD entry blob taking uid/gid/mode from stat dict *st*."""
        blen = len(bn)
        mo = st['mode']
        return struct.pack(cls._fmt_mknod(blen),
                           st['uid'], st['gid'],
                           gf, mo, bn,
                           stat.S_IMODE(mo), 0, umask())
    # mkdir

    def entry_pack_mkdir(cls, gf, bn, mo, uid, gid):
        """Pack a MKDIR entry blob (cls._fmt_mkdir layout)."""
        blen = len(bn)
        return struct.pack(cls._fmt_mkdir(blen),
                           uid, gid, gf, mo, bn,
                           stat.S_IMODE(mo), umask())
    # symlink

    def entry_pack_symlink(cls, gf, bn, lnk, st):
        """Pack a SYMLINK entry blob with link target *lnk*."""
        blen = len(bn)
        llen = len(lnk)
        return struct.pack(cls._fmt_symlink(blen, llen),
                           st['uid'], st['gid'],
                           gf, st['mode'], bn, lnk)
diff --git a/geo-replication/syncdaemon/rconf.py b/geo-replication/syncdaemon/rconf.py
new file mode 100644
index 00000000000..ff716ee4d6d
--- /dev/null
+++ b/geo-replication/syncdaemon/rconf.py
@@ -0,0 +1,31 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+
class RConf(object):

    """singleton class to store runtime globals
    shared between gsyncd modules"""

    # ssh control directory/arguments (presumably ControlPersist bits --
    # set elsewhere; confirm against resource.SSH)
    ssh_ctl_dir = None
    ssh_ctl_args = None
    # pid of a forked child process, if any
    cpid = None
    # True once this process owns the pidfile and must clean it up
    pid_file_owned = False
    log_exit = False
    permanent_handles = []
    # extra key/values attached to log records
    log_metadata = {}
    mgmt_lock_fd = None
    # parsed command-line arguments namespace (e.g. rconf.args.master)
    args = None
    turns = 0
    # mountbroker-mediated mount state and its umount command
    mountbroker = False
    mount_point = None
    mbr_umount_cmd = []

rconf = RConf()
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
new file mode 100644
index 00000000000..c622afa6373
--- /dev/null
+++ b/geo-replication/syncdaemon/repce.py
@@ -0,0 +1,253 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import time
+import logging
+from threading import Condition
+try:
+ import _thread as thread
+except ImportError:
+ import thread
+try:
+ from queue import Queue
+except ImportError:
+ from Queue import Queue
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+from syncdutils import Thread, select, lf
+
+pickle_proto = 2
+repce_version = 1.0
+
+
def ioparse(i, o):
    """Normalize an (input, output) pair: an integer input fd is wrapped
    in a binary file object, while an output stream is reduced to its
    raw file descriptor."""
    if isinstance(i, int):
        i = os.fdopen(i, 'rb')
    # rely on duck typing (works uniformly in py2 and py3) to recognize
    # stream objects on the output side
    out = o.fileno() if hasattr(o, 'fileno') else o
    return (i, out)
+
+
def send(out, *args):
    """pickle args and write out wholly in one syscall

    ie. not use the ability of pickle to dump directly to
    a stream, as that would potentially mess up messages
    by interleaving them
    """
    payload = pickle.dumps(args, pickle_proto)
    os.write(out, payload)
+
+
def recv(inf):
    """Unpickle and return one object from stream *inf*.

    py2/py3 compatibility: sys.stdin is a text stream on py3, so pickle
    must read from its underlying binary ``.buffer``; a stream without
    that attribute is used directly.
    """
    stream = getattr(inf, "buffer", inf)
    return pickle.load(stream)
+
+
class RepceServer(object):

    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce

    ... also our homebrewed RPC backend where the transport layer is
    reduced to a pair of filehandles.

    This is the server component.
    """

    def __init__(self, obj, i, o, wnum=6):
        """register a backend object .obj to which incoming messages
        are dispatched, also incoming/outcoming streams
        """
        self.obj = obj
        self.inf, self.out = ioparse(i, o)
        # number of worker threads serving requests concurrently
        self.wnum = wnum
        # queue feeding received requests to the worker threads
        self.q = Queue()

    def service_loop(self):
        """fire up worker threads, get messages and dispatch among them"""
        for i in range(self.wnum):
            t = Thread(target=self.worker)
            t.start()
        try:
            while True:
                self.q.put(recv(self.inf))
        except EOFError:
            # peer closed the connection -- normal shutdown path
            logging.info("terminating on reaching EOF.")

    def worker(self):
        """life of a worker

        Get message, extract its id, method name and arguments
        (kwargs not supported), call method on .obj.
        Send back message id + return value.
        If method call throws an exception, rescue it, and send
        back the exception as result (with flag marking it as
        exception).
        """
        while True:
            in_data = self.q.get(True)
            rid = in_data[0]
            rmeth = in_data[1]
            exc = False
            if rmeth == '__repce_version__':
                # version handshake is answered locally, not dispatched
                res = repce_version
            else:
                try:
                    res = getattr(self.obj, rmeth)(*in_data[2:])
                except:
                    # deliberately broad: any failure is shipped back to
                    # the client as the result, flagged as an exception
                    res = sys.exc_info()[1]
                    exc = True
                    logging.exception("call failed: ")
            send(self.out, rid, exc, res)
+
+
class RepceJob(object):

    """Status record for one outstanding RePCe message; lets a caller
    block until the matching reply arrives."""

    def __init__(self, cbk):
        """
        - .rid: (process-wise) unique id
        - .cbk: what we do upon receiving reply
        """
        self.rid = (os.getpid(), thread.get_ident(), time.time())
        self.cbk = cbk
        self.lever = Condition()
        self.done = False

    def __repr__(self):
        return ':'.join(str(part) for part in self.rid)

    def wait(self):
        """Block until wakeup() has delivered a result, then return it."""
        with self.lever:
            if not self.done:
                self.lever.wait()
        return self.result

    def wakeup(self, data):
        """Store *data* as the result and release any waiter."""
        self.result = data
        with self.lever:
            self.done = True
            self.lever.notify()
+
+
class RepceClient(object):

    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce

    ... also our homebrewed RPC backend where the transport layer is
    reduced to a pair of filehandles.

    This is the client component.
    """

    def __init__(self, i, o):
        self.inf, self.out = ioparse(i, o)
        # table of in-flight requests: rid -> RepceJob
        self.jtab = {}
        t = Thread(target=self.listen)
        t.start()

    def listen(self):
        # reader thread: match each incoming reply to its pending job
        # and fire that job's callback
        while True:
            select((self.inf,), (), ())
            rid, exc, res = recv(self.inf)
            rjob = self.jtab.pop(rid)
            if rjob.cbk:
                rjob.cbk(rjob, [exc, res])

    def push(self, meth, *args, **kw):
        """wrap arguments in a RepceJob, send them to server
        and return the RepceJob

        @cbk to pass on RepceJob can be given as kwarg.
        """
        cbk = kw.get('cbk')
        if not cbk:
            # default callback: just re-raise a remote exception locally
            def cbk(rj, res):
                if res[0]:
                    raise res[1]
        rjob = RepceJob(cbk)
        self.jtab[rjob.rid] = rjob
        logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
        send(self.out, rjob.rid, meth, *args)
        return rjob

    def __call__(self, meth, *args):
        """RePCe client is callable, calling it implements a synchronous
        remote call.

        We do a .push with a cbk which does a wakeup upon receiving answer,
        then wait on the RepceJob.
        """
        rjob = self.push(
            meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
        exc, res = rjob.wait()
        if exc:
            logging.error(lf('call failed',
                             call=repr(rjob),
                             method=meth,
                             error=str(type(res).__name__)))
            raise res
        logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
        return res

    class mprx(object):

        """method proxy, standard trick to implement rubyesque
        method_missing in Python

        A class is a closure factory, you know what I mean, or go read
        some SICP.
        """

        def __init__(self, ins, meth):
            self.ins = ins
            self.meth = meth

        def __call__(self, *a):
            return self.ins(self.meth, *a)

    def __getattr__(self, meth):
        """this implements transparent method dispatch to remote object,
        so that you don't need to call the RepceClient instance like

        rclient('how_old_are_you_if_born_in', 1979)

        but you can make it into an ordinary method call like

        rclient.how_old_are_you_if_born_in(1979)
        """
        return self.mprx(self, meth)

    def __version__(self):
        """used in handshake to verify compatibility"""
        d = {'proto': self('__repce_version__')}
        try:
            d['object'] = self('version')
        except AttributeError:
            pass
        return d
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
new file mode 100644
index 00000000000..f12c7ceaa36
--- /dev/null
+++ b/geo-replication/syncdaemon/resource.py
@@ -0,0 +1,1583 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import re
+import os
+import sys
+import stat
+import time
+import fcntl
+import types
+import struct
+import logging
+import tempfile
+import subprocess
+from errno import (EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES,
+ EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM)
+import errno
+
+from rconf import rconf
+import gsyncdconfig as gconf
+import libgfchangelog
+
+import repce
+from repce import RepceServer, RepceClient
+from master import gmaster_builder
+import syncdutils
+from syncdutils import (GsyncdError, select, privileged, funcode,
+ entry2pb, gauxpfx, errno_wrap, lstat,
+ NoStimeAvailable, PartialHistoryAvailable,
+ ChangelogException, ChangelogHistoryNotAvailable,
+ get_changelog_log_level, get_rsync_version,
+ GX_GFID_CANONICAL_LEN,
+ gf_mount_ready, lf, Popen, sup,
+ Xattr, matching_disk_gfid, get_gfid_from_mnt,
+ unshare_propagation_supported, get_slv_dir_path)
+from gsyncdstatus import GeorepStatus
+from py2py3 import (pipe, str_to_bytearray, entry_pack_reg,
+ entry_pack_reg_stat, entry_pack_mkdir,
+ entry_pack_symlink)
+
+
# NOTE(review): the fallback here is the *string* 'EOPNOTSUPP', not
# errno.EOPNOTSUPP -- confirm intended (comparisons would never match)
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')

# slave volume/host, populated elsewhere in this module
slv_volume = None
slv_host = None
+
+
+class Server(object):
+
+ """singleton implemening those filesystem access primitives
+ which are needed for geo-replication functionality
+
+ (Singleton in the sense it's a class which has only static
+ and classmethods and is used directly, without instantiation.)
+ """
+
    # xattr namespace prefix: privileged processes use "trusted",
    # unprivileged ones "system"
    GX_NSPACE_PFX = (privileged() and "trusted" or "system")
    GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
    # network-order struct formats: 19 bytes + 2 uints (native), the
    # foreign variant appends one extra uint
    NTV_FMTSTR = "!" + "B" * 19 + "II"
    FRGN_XTRA_FMT = "I"
    FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT

    # for backend gfid fetch, do not use GX_NSPACE_PFX
    GFID_XATTR = 'trusted.gfid'
    # gfid value is 16 raw bytes
    GFID_FMTSTR = "!" + "B" * 16

    # root of the managed tree; _pathguard rebases paths onto this
    local_path = ''
+
    @classmethod
    def _fmt_mknod(cls, l):
        # MKNOD entry layout (see py2py3.entry_pack_reg): uid, gid, gfid,
        # mode, basename (l+1 incl. NUL), imode, rdev, umask
        return "!II%dsI%dsIII" % (GX_GFID_CANONICAL_LEN, l + 1)
+
    @classmethod
    def _fmt_mkdir(cls, l):
        # MKDIR entry layout (see py2py3.entry_pack_mkdir): uid, gid,
        # gfid, mode, basename (l+1 incl. NUL), imode, umask
        return "!II%dsI%dsII" % (GX_GFID_CANONICAL_LEN, l + 1)
+
    @classmethod
    def _fmt_symlink(cls, l1, l2):
        # SYMLINK entry layout (see py2py3.entry_pack_symlink): uid, gid,
        # gfid, mode, basename and link target (both NUL-terminated)
        return "!II%dsI%ds%ds" % (GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
+
    def _pathguard(f):
        """decorator method that checks
        the path argument of the decorated
        functions to make sure it does not
        point out of the managed tree
        """

        fc = funcode(f)
        # position of the 'path' argument in the wrapped function
        pi = list(fc.co_varnames).index('path')

        def ff(*args):
            path = args[pi]
            ps = path.split('/')
            # reject absolute paths and any '..' traversal component
            if path[0] == '/' or '..' in ps:
                raise ValueError('unsafe path')
            args = list(args)
            # rebase the relative path onto the managed tree root
            args[pi] = os.path.join(args[0].local_path, path)
            return f(*args)
        return ff
+
    @classmethod
    @_pathguard
    def entries(cls, path):
        """directory entries in an array

        Raises OSError(ENOTDIR) if @path is not a directory.
        """
        # prevent symlinks being followed
        if not stat.S_ISDIR(os.lstat(path).st_mode):
            raise OSError(ENOTDIR, os.strerror(ENOTDIR))
        return os.listdir(path)
+
    @classmethod
    @_pathguard
    def lstat(cls, path):
        """os.lstat that returns the errno ENOENT instead of raising when
        @path does not exist; other errors propagate."""
        try:
            return os.lstat(path)
        except (IOError, OSError):
            ex = sys.exc_info()[1]
            if ex.errno == ENOENT:
                return ex.errno
            else:
                raise
+
    @classmethod
    @_pathguard
    def linkto_check(cls, path):
        """True iff @path carries a non-empty DHT linkto xattr; a missing
        file or missing xattr counts as False."""
        try:
            return not (
                Xattr.lgetxattr_buf(path,
                                    'trusted.glusterfs.dht.linkto') == '')
        except (IOError, OSError):
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA):
                return False
            else:
                raise
+
    @classmethod
    @_pathguard
    def gfid(cls, path):
        """Return the backend gfid of @path as a canonical uuid string,
        or the errno ENOENT if the file is gone."""
        buf = errno_wrap(Xattr.lgetxattr, [path, cls.GFID_XATTR, 16],
                         [ENOENT], [ESTALE, ENODATA])
        if buf == ENOENT:
            return buf
        else:
            buf = str_to_bytearray(buf)
            # format the 16 raw bytes into 8-4-4-4-12 hex groups
            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
                ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
            return '-'.join(m.groups())
+
    @classmethod
    @_pathguard
    def purge(cls, path, entries=None):
        """force-delete subtrees

        If @entries is not specified, delete
        the whole subtree under @path (including
        @path).

        Otherwise, @entries should be a
        a sequence of children of @path, and
        the effect is identical with a joint
        @entries-less purge on them, ie.

        for e in entries:
            cls.purge(os.path.join(path, e))
        """
        me_also = entries is None
        if not entries:
            try:
                # if it's a symlink, prevent
                # following it
                try:
                    os.unlink(path)
                    return
                except OSError:
                    ex = sys.exc_info()[1]
                    if ex.errno == EISDIR:
                        # a directory: fall through to recursive delete
                        entries = os.listdir(path)
                    else:
                        raise
            except OSError:
                ex = sys.exc_info()[1]
                if ex.errno in (ENOTDIR, ENOENT, ELOOP):
                    try:
                        # retry the unlink; ENOENT now means it vanished
                        # concurrently, which is fine
                        os.unlink(path)
                        return
                    except OSError:
                        ex = sys.exc_info()[1]
                        if ex.errno == ENOENT:
                            return
                        raise
                else:
                    raise
        for e in entries:
            cls.purge(os.path.join(path, e))
        if me_also:
            os.rmdir(path)
+
    @classmethod
    @_pathguard
    def _create(cls, path, ctor):
        """path creation backend routine"""
        try:
            ctor(path)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == EEXIST:
                # something already sits there: clear it and retry once
                cls.purge(path)
                return ctor(path)
            raise
+
    @classmethod
    @_pathguard
    def mkdir(cls, path):
        """Create directory @path, replacing any existing entry there."""
        cls._create(path, os.mkdir)
+
    @classmethod
    @_pathguard
    def symlink(cls, lnk, path):
        """Create symlink @path -> @lnk, replacing any existing entry."""
        cls._create(path, lambda p: os.symlink(lnk, p))
+
    @classmethod
    @_pathguard
    def xtime(cls, path, uuid):
        """query xtime extended attribute

        Return xtime of @path for @uuid as a pair of integers.
        "Normal" errors due to non-existent @path or extended attribute
        are tolerated and errno is returned in such a case.
        """

        try:
            # 8 bytes = two packed network-order 32-bit ints
            val = Xattr.lgetxattr(path,
                                  '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
                                  8)
            val = str_to_bytearray(val)
            return struct.unpack('!II', val)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA, ENOTDIR):
                return ex.errno
            else:
                raise
+
    @classmethod
    @_pathguard
    def stime_mnt(cls, path, uuid):
        """query stime extended attribute

        Return stime of @path for @uuid as a pair of integers.
        "Normal" errors due to non-existent @path or extended attribute
        are tolerated and errno is returned in such a case.
        """

        try:
            # 8 bytes = two packed network-order 32-bit ints
            val = Xattr.lgetxattr(path,
                                  '.'.join([cls.GX_NSPACE, uuid, 'stime']),
                                  8)
            val = str_to_bytearray(val)
            return struct.unpack('!II', val)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA, ENOTDIR):
                return ex.errno
            else:
                raise
+
    @classmethod
    @_pathguard
    def stime(cls, path, uuid):
        """query stime extended attribute

        Return stime of @path for @uuid as a pair of integers.
        "Normal" errors due to non-existent @path or extended attribute
        are tolerated and errno is returned in such a case.
        """

        try:
            # 8 bytes = two packed network-order 32-bit ints
            val = Xattr.lgetxattr(path,
                                  '.'.join([cls.GX_NSPACE, uuid, 'stime']),
                                  8)
            val = str_to_bytearray(val)
            return struct.unpack('!II', val)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA, ENOTDIR):
                return ex.errno
            else:
                raise
+
    @classmethod
    @_pathguard
    def entry_stime(cls, path, uuid):
        """
        entry_stime xattr to reduce the number of retry of Entry changes when
        Geo-rep worker crashes and restarts. entry_stime is updated after
        processing every changelog file. On failure and restart, worker only
        have to reprocess the last changelog for Entry ops.
        Xattr Key: <PFX>.<MASTERVOL_UUID>.<SLAVEVOL_UUID>.entry_stime
        """
        try:
            # 8 bytes = two packed network-order 32-bit ints
            val = Xattr.lgetxattr(path,
                                  '.'.join([cls.GX_NSPACE, uuid,
                                            'entry_stime']),
                                  8)
            val = str_to_bytearray(val)
            return struct.unpack('!II', val)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA, ENOTDIR):
                return ex.errno
            else:
                raise
+
+ @classmethod
+ def node_uuid(cls, path='.'):
+ try:
+ uuid_l = Xattr.lgetxattr_buf(
+ path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
+ return uuid_l[:-1].split(' ')
+ except OSError:
+ raise
+
+ @classmethod
+ @_pathguard
+ def set_stime(cls, path, uuid, mark):
+ """set @mark as stime for @uuid on @path"""
+ errno_wrap(Xattr.lsetxattr,
+ [path,
+ '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ struct.pack('!II', *mark)],
+ [ENOENT],
+ [ESTALE, EINVAL])
+
    @classmethod
    @_pathguard
    def set_entry_stime(cls, path, uuid, mark):
        """set @mark as entry_stime for @uuid on @path"""
        errno_wrap(Xattr.lsetxattr,
                   [path,
                    '.'.join([cls.GX_NSPACE, uuid, 'entry_stime']),
                    struct.pack('!II', *mark)],
                   [ENOENT],
                   [ESTALE, EINVAL])
+
+ @classmethod
+ @_pathguard
+ def set_xtime(cls, path, uuid, mark):
+ """set @mark as xtime for @uuid on @path"""
+ errno_wrap(Xattr.lsetxattr,
+ [path,
+ '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ struct.pack('!II', *mark)],
+ [ENOENT],
+ [ESTALE, EINVAL])
+
    @classmethod
    @_pathguard
    def set_xtime_remote(cls, path, uuid, mark):
        """
        set @mark as xtime for @uuid on @path

        the difference b/w this and set_xtime() being
        set_xtime() being overloaded to set the xtime
        on the brick (this method sets xtime on the
        remote slave); also note this variant does not
        go through errno_wrap, so OSError propagates
        """
        Xattr.lsetxattr(
            path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
            struct.pack('!II', *mark))
+
+ @classmethod
+ def entry_ops(cls, entries):
+ pfx = gauxpfx()
+ logging.debug('entries: %s' % repr(entries))
+ dist_count = rconf.args.master_dist_count
+
+ def entry_purge(op, entry, gfid, e, uid, gid):
+ # This is an extremely racy code and needs to be fixed ASAP.
+ # The GFID check here is to be sure that the pargfid/bname
+ # to be purged is the GFID gotten from the changelog.
+ # (a stat(changelog_gfid) would also be valid here)
+ # The race here is between the GFID check and the purge.
+
+ # If the entry or the gfid of the file to be deleted is not present
+ # on slave, we can ignore the unlink/rmdir
+ if isinstance(lstat(entry), int) or \
+ isinstance(lstat(os.path.join(pfx, gfid)), int):
+ return
+
+ if not matching_disk_gfid(gfid, entry):
+ collect_failure(e, EEXIST, uid, gid)
+ return
+
+ if op == 'UNLINK':
+ er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY])
+ # EISDIR is safe error, ignore. This can only happen when
+ # unlink is sent from master while fixing gfid conflicts.
+ if er != EISDIR:
+ return er
+
+ elif op == 'RMDIR':
+ er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE,
+ ENOTEMPTY], [EBUSY])
+ if er == ENOTEMPTY:
+ return er
+
+ def collect_failure(e, cmd_ret, uid, gid, dst=False):
+ slv_entry_info = {}
+ slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = False
+ slv_entry_info['dst'] = dst
+ slv_entry_info['slave_isdir'] = False
+ slv_entry_info['slave_name'] = None
+ slv_entry_info['slave_gfid'] = None
+ # We do this for failing fops on Slave
+ # Master should be logging this
+ if cmd_ret is None:
+ return False
+
+ if e.get("stat", {}):
+ # Copy actual UID/GID value back to entry stat
+ e['stat']['uid'] = uid
+ e['stat']['gid'] = gid
+
+ if cmd_ret in [EEXIST, ESTALE]:
+ if dst:
+ en = e['entry1']
+ else:
+ en = e['entry']
+ disk_gfid = get_gfid_from_mnt(en)
+ if isinstance(disk_gfid, str) and \
+ e['gfid'] != disk_gfid:
+ slv_entry_info['gfid_mismatch'] = True
+ st = lstat(en)
+ if not isinstance(st, int):
+ if st and stat.S_ISDIR(st.st_mode):
+ slv_entry_info['slave_isdir'] = True
+ dir_name = get_slv_dir_path(slv_host, slv_volume,
+ disk_gfid)
+ slv_entry_info['slave_name'] = dir_name
+ else:
+ slv_entry_info['slave_isdir'] = False
+ slv_entry_info['slave_gfid'] = disk_gfid
+ failures.append((e, cmd_ret, slv_entry_info))
+ else:
+ return False
+ else:
+ failures.append((e, cmd_ret, slv_entry_info))
+
+ return True
+
+ failures = []
+
+ def recursive_rmdir(gfid, entry, path):
+ """disk_gfid check added for original path for which
+ recursive_delete is called. This disk gfid check executed
+ before every Unlink/Rmdir. If disk gfid is not matching
+ with GFID from Changelog, that means other worker
+ deleted the directory. Even if the subdir/file present,
+ it belongs to different parent. Exit without performing
+ further deletes.
+ """
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ names = []
+ names = errno_wrap(os.listdir, [path], [ENOENT], [ESTALE, ENOTSUP])
+ if isinstance(names, int):
+ return
+
+ for name in names:
+ fullname = os.path.join(path, name)
+ if not matching_disk_gfid(gfid, entry):
+ return
+ er = errno_wrap(os.remove, [fullname], [ENOENT, ESTALE,
+ EISDIR], [EBUSY])
+
+ if er == EISDIR:
+ recursive_rmdir(gfid, entry, fullname)
+
+ if not matching_disk_gfid(gfid, entry):
+ return
+
+ errno_wrap(os.rmdir, [path], [ENOENT, ESTALE], [EBUSY])
+
+ def rename_with_disk_gfid_confirmation(gfid, entry, en, uid, gid):
+ if not matching_disk_gfid(gfid, entry):
+ logging.error(lf("RENAME ignored: source entry does not match "
+ "with on-disk gfid",
+ source=entry,
+ gfid=gfid,
+ disk_gfid=get_gfid_from_mnt(entry),
+ target=en))
+ collect_failure(e, EEXIST, uid, gid)
+ return
+
+ cmd_ret = errno_wrap(os.rename,
+ [entry, en],
+ [ENOENT, EEXIST], [ESTALE, EBUSY])
+ collect_failure(e, cmd_ret, uid, gid)
+
+ for e in entries:
+ blob = None
+ op = e['op']
+ gfid = e['gfid']
+ entry = e['entry']
+ uid = 0
+ gid = 0
+
+ # Skip entry processing if it's marked true during gfid
+ # conflict resolution
+ if e['skip_entry']:
+ continue
+
+ if e.get("stat", {}):
+ # Copy UID/GID value and then reset to zero. Copied UID/GID
+ # will be used to run chown once entry is created.
+ uid = e['stat']['uid']
+ gid = e['stat']['gid']
+ e['stat']['uid'] = 0
+ e['stat']['gid'] = 0
+
+ (pg, bname) = entry2pb(entry)
+ if op in ['RMDIR', 'UNLINK']:
+ # Try once, if rmdir failed with ENOTEMPTY
+ # then delete recursively.
+ er = entry_purge(op, entry, gfid, e, uid, gid)
+ if isinstance(er, int):
+ if er == ENOTEMPTY and op == 'RMDIR':
+ # Retry if ENOTEMPTY, ESTALE
+ er1 = errno_wrap(recursive_rmdir,
+ [gfid, entry,
+ os.path.join(pg, bname)],
+ [], [ENOTEMPTY, ESTALE, ENODATA])
+ if not isinstance(er1, int):
+ logging.debug("Removed %s => %s/%s recursively" %
+ (gfid, pg, bname))
+ else:
+ logging.warn(lf("Recursive remove failed",
+ gfid=gfid,
+ pgfid=pg,
+ bname=bname,
+ error=os.strerror(er1)))
+ else:
+ logging.warn(lf("Failed to remove",
+ gfid=gfid,
+ pgfid=pg,
+ bname=bname,
+ error=os.strerror(er)))
+ elif op in ['CREATE', 'MKNOD']:
+ slink = os.path.join(pfx, gfid)
+ st = lstat(slink)
+ # don't create multiple entries with same gfid
+ if isinstance(st, int):
+ blob = entry_pack_reg(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
+ # Self healed hardlinks are recorded as MKNOD.
+ # So if the gfid already exists, it should be
+ # processed as hard link not mknod.
+ elif op in ['MKNOD']:
+ cmd_ret = errno_wrap(os.link,
+ [slink, entry],
+ [ENOENT, EEXIST], [ESTALE])
+ collect_failure(e, cmd_ret, uid, gid)
+ elif op == 'MKDIR':
+ en = e['entry']
+ slink = os.path.join(pfx, gfid)
+ st = lstat(slink)
+ # don't create multiple entries with same gfid
+ if isinstance(st, int):
+ blob = entry_pack_mkdir(cls, gfid, bname,
+ e['mode'], e['uid'], e['gid'])
+ elif (isinstance(lstat(en), int) or
+ not matching_disk_gfid(gfid, en)):
+ # If gfid of a directory exists on slave but path based
+ # create is getting EEXIST. This means the directory is
+ # renamed in master but recorded as MKDIR during hybrid
+ # crawl. Get the directory path by reading the backend
+ # symlink and trying to rename to new name as said by
+ # master.
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ src_entry = get_slv_dir_path(slv_host, slv_volume, gfid)
+ if src_entry is None:
+ collect_failure(e, ENOENT, uid, gid)
+ if src_entry is not None and src_entry != entry:
+ slv_entry_info = {}
+ slv_entry_info['gfid_mismatch'] = False
+ slv_entry_info['name_mismatch'] = True
+ slv_entry_info['dst'] = False
+ slv_entry_info['slave_isdir'] = True
+ slv_entry_info['slave_gfid'] = gfid
+ slv_entry_info['slave_entry'] = src_entry
+
+ failures.append((e, EEXIST, slv_entry_info))
+ elif op == 'LINK':
+ slink = os.path.join(pfx, gfid)
+ st = lstat(slink)
+ if isinstance(st, int):
+ (pg, bname) = entry2pb(entry)
+ if stat.S_ISREG(e['stat']['mode']):
+ blob = entry_pack_reg_stat(cls, gfid, bname, e['stat'])
+ elif stat.S_ISLNK(e['stat']['mode']):
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
+ e['stat'])
+ else:
+ cmd_ret = errno_wrap(os.link,
+ [slink, entry],
+ [ENOENT, EEXIST], [ESTALE])
+ collect_failure(e, cmd_ret, uid, gid)
+ elif op == 'SYMLINK':
+ en = e['entry']
+ st = lstat(entry)
+ if isinstance(st, int):
+ blob = entry_pack_symlink(cls, gfid, bname, e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid)
+ elif op == 'RENAME':
+ en = e['entry1']
+ # The matching disk gfid check validates two things
+ # 1. Validates name is present, return false otherwise
+ # 2. Validates gfid is same, returns false otherwise
+ # So both validations are necessary to decide src doesn't
+ # exist. We can't rely on only gfid stat as hardlink could
+ # be present and we can't rely only on name as name could
+ # exist with different gfid.
+ if not matching_disk_gfid(gfid, entry):
+ if e['stat'] and not stat.S_ISDIR(e['stat']['mode']):
+ if stat.S_ISLNK(e['stat']['mode']):
+ # src is not present, so don't sync symlink as
+ # we don't know target. It's ok to ignore. If
+ # it's unliked, it's fine. If it's renamed to
+ # something else, it will be synced then.
+ if e['link'] is not None:
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_symlink(cls, gfid, bname,
+ e['link'],
+ e['stat'])
+ elif not matching_disk_gfid(gfid, en):
+ collect_failure(e, EEXIST, uid, gid, True)
+ else:
+ slink = os.path.join(pfx, gfid)
+ st = lstat(slink)
+ # don't create multiple entries with same gfid
+ if isinstance(st, int):
+ (pg, bname) = entry2pb(en)
+ blob = entry_pack_reg_stat(cls, gfid, bname,
+ e['stat'])
+ else:
+ cmd_ret = errno_wrap(os.link, [slink, en],
+ [ENOENT, EEXIST], [ESTALE])
+ collect_failure(e, cmd_ret, uid, gid)
+ else:
+ st = lstat(entry)
+ st1 = lstat(en)
+ if isinstance(st1, int):
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
+ else:
+ if st.st_ino == st1.st_ino:
+ # we have a hard link, we can now unlink source
+ try:
+ errno_wrap(os.unlink, [entry],
+ [ENOENT, ESTALE], [EBUSY])
+ except OSError as e:
+ if e.errno == EISDIR:
+ try:
+ errno_wrap(os.rmdir, [entry],
+ [ENOENT, ESTALE], [EBUSY])
+ except OSError as e:
+ if e.errno == ENOTEMPTY:
+ logging.error(
+ lf("Directory Rename failed. "
+ "Both Old and New"
+ " directories exists",
+ old=entry,
+ new=en))
+ else:
+ raise
+ else:
+ raise
+ elif not matching_disk_gfid(gfid, en) and dist_count > 1:
+ collect_failure(e, EEXIST, uid, gid, True)
+ else:
+ # We are here which means matching_disk_gfid for
+ # both source and destination has returned false
+ # and distribution count for master vol is greater
+ # then one. Which basically says both the source and
+ # destination exist and not hardlinks.
+ # So we are safe to go ahead with rename here.
+ rename_with_disk_gfid_confirmation(gfid, entry, en,
+ uid, gid)
+ if blob:
+ cmd_ret = errno_wrap(Xattr.lsetxattr,
+ [pg, 'glusterfs.gfid.newfile', blob],
+ [EEXIST, ENOENT, ESTALE],
+ [ESTALE, EINVAL, EBUSY])
+ collect_failure(e, cmd_ret, uid, gid)
+
+ # If UID/GID is different than zero that means we are trying
+ # create Entry with different UID/GID. Create Entry with
+ # UID:0 and GID:0, and then call chown to set UID/GID
+ if uid != 0 or gid != 0:
+ path = os.path.join(pfx, gfid)
+ cmd_ret = errno_wrap(os.lchown, [path, uid, gid], [ENOENT],
+ [ESTALE, EINVAL])
+ collect_failure(e, cmd_ret, uid, gid)
+
+ return failures
+
    @classmethod
    def meta_ops(cls, meta_entries):
        """apply metadata changes (ownership, mode, times) on the slave

        Each item of @meta_entries carries a 'go' path (gfid access
        path) and a 'stat' dict with mode/uid/gid/atime/mtime.
        Returns a list of (entry, errno, op-name) failure tuples.
        """
        logging.debug('Meta-entries: %s' % repr(meta_entries))
        failures = []
        for e in meta_entries:
            mode = e['stat']['mode']
            uid = e['stat']['uid']
            gid = e['stat']['gid']
            atime = e['stat']['atime']
            mtime = e['stat']['mtime']
            go = e['go']
            # Linux doesn't support chmod on symlink itself.
            # It is always applied to the target file. So
            # changelog would record target file's gfid
            # and we are good. But 'chown' is supported on
            # symlink file. So changelog would record symlink
            # gfid in such cases. Since we do 'chown' 'chmod'
            # 'utime' for each gfid recorded for metadata, and
            # we know from changelog the metadata is on symlink's
            # gfid or target file's gfid, we should be doing
            # 'lchown' 'lchmod' 'utime with no-deference' blindly.
            # But since 'lchmod' and 'utime with no de-reference' is
            # not supported in python3, we have to rely on 'chmod'
            # and 'utime with de-reference'. Hence avoiding 'chmod'
            # and 'utime' if it's symlink file.

            is_symlink = False
            cmd_ret = errno_wrap(os.lchown, [go, uid, gid], [ENOENT],
                                 [ESTALE, EINVAL])
            # an int return means lchown hit a soft error (e.g. the
            # file is gone): skip the remaining ops for this entry
            # without recording a failure
            if isinstance(cmd_ret, int):
                continue

            is_symlink = os.path.islink(go)

            if not is_symlink:
                cmd_ret = errno_wrap(os.chmod, [go, mode],
                                     [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
                if isinstance(cmd_ret, int):
                    failures.append((e, cmd_ret, "chmod"))

                cmd_ret = errno_wrap(os.utime, [go, (atime, mtime)],
                                     [ENOENT, EACCES, EPERM], [ESTALE, EINVAL])
                if isinstance(cmd_ret, int):
                    failures.append((e, cmd_ret, "utime"))
        return failures
+
+ @classmethod
+ @_pathguard
+ def setattr(cls, path, adct):
+ """set file attributes
+
+ @adct is a dict, where 'own', 'mode' and 'times'
+ keys are looked for and values used to perform
+ chown, chmod or utimes on @path.
+ """
+ own = adct.get('own')
+ if own:
+ os.lchown(path, *own)
+ mode = adct.get('mode')
+ if mode:
+ os.chmod(path, stat.S_IMODE(mode))
+ times = adct.get('times')
+ if times:
+ os.utime(path, times)
+
+ @staticmethod
+ def pid():
+ return os.getpid()
+
    # running count of keep-alive messages received so far
    last_keep_alive = 0

    @classmethod
    def keep_alive(cls, dct):
        """process keepalive messages.

        Return keep-alive counter (number of received keep-alive
        messages).

        Now the "keep-alive" message can also have a payload which is
        used to set a foreign volume-mark on the underlying file system.
        """
        if dct:
            # pack version, the 16 uuid bytes (parsed pairwise from the
            # hex uuid string), retval, the volume_mark pair and the
            # timeout into the foreign volume-mark xattr value
            key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
            val = struct.pack(cls.FRGN_FMTSTR,
                              *(dct['version'] +
                                tuple(int(x, 16)
                                      for x in re.findall('(?:[\da-f]){2}',
                                                          dct['uuid'])) +
                                (dct['retval'],) + dct['volume_mark'][0:2] + (
                                    dct['timeout'],)))
            Xattr.lsetxattr('.', key, val)
        cls.last_keep_alive += 1
        return cls.last_keep_alive
+
+ @staticmethod
+ def version():
+ """version used in handshake"""
+ return 1.0
+
+
class Mounter(object):

    """Abstract base class for mounter backends"""

    def __init__(self, params):
        # mount parameters supplied by the caller; mntpt and
        # umount_cmd are filled in while mounting
        self.params = params
        self.mntpt = None
        self.umount_cmd = []

    @classmethod
    def get_glusterprog(cls):
        """full path of the gluster program this backend invokes"""
        gluster_cmd_dir = gconf.get("gluster-command-dir")
        if rconf.args.subcmd == "slave":
            gluster_cmd_dir = gconf.get("slave-gluster-command-dir")
        return os.path.join(gluster_cmd_dir, cls.glusterprog)

    def umount_l(self, d):
        """perform lazy umount"""
        po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE,
                   universal_newlines=True)
        po.wait()
        return po

    @classmethod
    def make_umount_argv(cls, d):
        # umount command argv; supplied by concrete backends
        raise NotImplementedError

    def make_mount_argv(self, label=None):
        # mount command argv; supplied by concrete backends
        raise NotImplementedError

    def cleanup_mntpt(self, *a):
        # remove auxiliary mountpoint, if the backend created one
        pass

    def handle_mounter(self, po):
        # wait for the mount helper; backends may also parse its output
        po.wait()

    def inhibit(self, label):
        """inhibit a gluster filesystem

        Mount glusterfs over a temporary mountpoint,
        change into the mount, and lazy unmount the
        filesystem.

        The parent performs the mount and chdirs into it; a forked
        child (running in its own session) receives the mountpoint
        over a pipe and — depending on subcmd/config — lazily
        unmounts it and removes the mountpoint directory.
        """
        mpi, mpo = pipe()
        mh = Popen.fork()
        if mh:
            # Parent
            os.close(mpi)
            fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
            d = None
            margv = self.make_mount_argv(label)
            if self.mntpt:
                # mntpt is determined pre-mount
                d = self.mntpt
                mnt_msg = d + '\0'
                encoded_msg = mnt_msg.encode()
                os.write(mpo, encoded_msg)
            po = Popen(margv, **self.mountkw)
            self.handle_mounter(po)
            po.terminate_geterr()
            logging.debug('auxiliary glusterfs mount in place')
            if not d:
                # mntpt is determined during mount
                d = self.mntpt
                mnt_msg = d + '\0'
                encoded_msg = mnt_msg.encode()
                os.write(mpo, encoded_msg)
            # 'M' marker tells the child that the mount succeeded
            encoded_msg = 'M'.encode()
            os.write(mpo, encoded_msg)
            # chdir in a helper thread so a hung mount can be detected
            # against the connection timeout below
            t = syncdutils.Thread(target=lambda: os.chdir(d))
            t.start()
            tlim = rconf.starttime + gconf.get("connection-timeout")
            while True:
                if not t.isAlive():
                    break

                if time.time() >= tlim:
                    syncdutils.finalize(exval=1)
                time.sleep(1)
            os.close(mpo)
            _, rv = syncdutils.waitpid(mh, 0)
            if rv:
                rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
                    (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
                logging.warn(lf('stale mount possibly left behind',
                                path=d))
                raise GsyncdError("cleaning up temp mountpoint %s "
                                  "failed with status %d" %
                                  (d, rv))
        else:
            rv = 0
            try:
                os.setsid()
                os.close(mpo)
                # read the NUL-terminated mountpoint (and optional 'M'
                # marker) from the parent, byte by byte until EOF
                mntdata = ''
                while True:
                    c = os.read(mpi, 1)
                    c = c.decode()
                    if not c:
                        break
                    mntdata += c
                if mntdata:
                    mounted = False
                    if mntdata[-1] == 'M':
                        mntdata = mntdata[:-1]
                        assert(mntdata)
                        mounted = True
                    assert(mntdata[-1] == '\0')
                    mntpt = mntdata[:-1]
                    assert(mntpt)

                    umount_master = False
                    umount_slave = False
                    if rconf.args.subcmd == "worker" \
                       and not unshare_propagation_supported() \
                       and not gconf.get("access-mount"):
                        umount_master = True
                    if rconf.args.subcmd == "slave" \
                       and not gconf.get("slave-access-mount"):
                        umount_slave = True

                    if mounted and (umount_master or umount_slave):
                        po = self.umount_l(mntpt)
                        po.terminate_geterr(fail_on_err=False)
                        if po.returncode != 0:
                            po.errlog()
                            rv = po.returncode
                        logging.debug("Lazy umount done: %s" % mntpt)
                    if umount_master or umount_slave:
                        self.cleanup_mntpt(mntpt)
            except:
                logging.exception('mount cleanup failure:')
                rv = 200
            os._exit(rv)

        # Poll the dht.subvol.status value until subvols come up,
        # giving up after 10 tries (~2s).
        RETRIES = 10
        while not gf_mount_ready():
            if RETRIES < 0:
                logging.error('Subvols are not up')
                break
            RETRIES -= 1
            time.sleep(0.2)

        logging.debug('auxiliary glusterfs mount prepared')
+
+
class DirectMounter(Mounter):

    """mounter backend calling mount(8)/umount(8) directly"""

    mountkw = {'stderr': subprocess.PIPE, 'universal_newlines': True}
    glusterprog = 'glusterfs'

    @staticmethod
    def make_umount_argv(d):
        # lazy unmount, equivalent of "umount -l <dir>"
        return ['umount', '-l', d]

    def make_mount_argv(self, label=None):
        mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
        self.mntpt = mntpt
        rconf.mount_point = mntpt
        argv = [self.get_glusterprog()]
        argv.extend('--' + p for p in self.params)
        argv.append(mntpt)
        return argv

    def cleanup_mntpt(self, mntpt=None):
        target = mntpt or self.mntpt
        errno_wrap(os.rmdir, [target], [ENOENT, EBUSY])
+
+
class MountbrokerMounter(Mounter):

    """mounter backend using the mountbroker gluster service"""

    mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE,
               'universal_newlines': True}
    glusterprog = 'gluster'

    @classmethod
    def make_cli_argv(cls):
        # base gluster CLI invocation for mountbroker ("system::")
        # commands against the local glusterd
        return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \
            gconf.get("gluster-cli-options").split() + ['system::']

    @classmethod
    def make_umount_argv(cls, d):
        return cls.make_cli_argv() + ['umount', d, 'lazy']

    def make_mount_argv(self, label):
        return self.make_cli_argv() + \
            ['mount', label, 'user-map-root=' +
             syncdutils.getusername()] + self.params

    def handle_mounter(self, po):
        # the mountbroker CLI prints the chosen mountpoint on stdout
        self.mntpt = po.stdout.readline()[:-1]
        rconf.mount_point = self.mntpt
        rconf.mountbroker = True
        self.umount_cmd = self.make_cli_argv() + ['umount']
        rconf.mbr_umount_cmd = self.umount_cmd
        po.stdout.close()
        sup(self, po)
        if po.returncode != 0:
            # if cli terminated with error due to being
            # refused by glusterd, what it put
            # out on stdout is a diagnostic message
            logging.error(lf('glusterd answered', mnt=self.mntpt))
+
+
class GLUSTERServer(Server):

    """server enhancements for a glusterfs backend"""

    @classmethod
    def _attr_unpack_dict(cls, xattr, extra_fields=''):
        """generic volume mark fetching/parsing backend

        Read @xattr from the mount root and unpack it per the native
        volume-mark format (NTV_FMTSTR) plus @extra_fields.  Returns a
        volinfo dict (and the extra fields, when requested).
        """
        fmt_string = cls.NTV_FMTSTR + extra_fields
        buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
        buf = str_to_bytearray(buf)
        vm = struct.unpack(fmt_string, buf)
        # elements 2..17 are the 16 uuid bytes; reassemble the
        # canonical 8-4-4-4-12 textual uuid
        m = re.match(
            '(.{8})(.{4})(.{4})(.{4})(.{12})',
            "".join(['%02x' % x for x in vm[2:18]]))
        uuid = '-'.join(m.groups())
        volinfo = {'version': vm[0:2],
                   'uuid': uuid,
                   'retval': vm[18],
                   'volume_mark': vm[19:21],
                   }
        if extra_fields:
            return volinfo, vm[-len(extra_fields):]
        else:
            return volinfo

    @classmethod
    def foreign_volume_infos(cls):
        """return list of valid (not expired) foreign volume marks"""
        dict_list = []
        xattr_list = Xattr.llistxattr_buf('.')
        for ele in xattr_list:
            if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
                d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
                now = int(time.time())
                if x[0] > now:
                    logging.debug("volinfo[%s] expires: %d "
                                  "(%d sec later)" %
                                  (d['uuid'], x[0], x[0] - now))
                    d['timeout'] = x[0]
                    dict_list.append(d)
                else:
                    # expired mark: remove it best-effort
                    try:
                        Xattr.lremovexattr('.', ele)
                    except OSError:
                        pass
        return dict_list

    @classmethod
    def native_volume_info(cls):
        """get the native volume mark of the underlying gluster volume

        Returns None (implicitly) when the mark is absent (ENODATA).
        """
        try:
            return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
                                                   'volume-mark']))
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno != ENODATA:
                raise
+
+
class GLUSTER(object):

    """scheme class for gluster:// urls

    can be used to represent a gluster slave server
    on slave side, or interface to a remote gluster
    slave on master side, or to represent master
    (slave-ish features come from the mixins, master
    functionality is outsourced to GMaster from master)
    """
    server = GLUSTERServer

    def __init__(self, host, volume):
        self.path = "%s:%s" % (host, volume)
        self.host = host
        self.volume = volume

        # publish the slave endpoint at module level; consumed by
        # helpers such as get_slv_dir_path during entry replay
        global slv_volume
        global slv_host
        slv_volume = self.volume
        slv_host = self.host

    def connect(self):
        """inhibit the resource beyond

        Choose mounting backend (direct or mountbroker),
        set up glusterfs parameters and perform the mount
        with given backend
        """

        logging.info("Mounting gluster volume locally...")
        t0 = time.time()
        label = gconf.get('mountbroker', None)
        if not label and not privileged():
            # unprivileged gsyncd must go through the mountbroker
            label = syncdutils.getusername()
        mounter = label and MountbrokerMounter or DirectMounter

        log_file = gconf.get("gluster-log-file")
        if rconf.args.subcmd == "slave":
            log_file = gconf.get("slave-gluster-log-file")

        log_level = gconf.get("gluster-log-level")
        if rconf.args.subcmd == "slave":
            log_level = gconf.get("slave-gluster-log-level")

        params = gconf.get("gluster-params").split() + \
            ['log-level=' + log_level] + \
            ['log-file=' + log_file, 'volfile-server=' + self.host] + \
            ['volfile-id=' + self.volume, 'client-pid=-1']

        self.mounter = mounter(params)
        self.mounter.inhibit(label)
        logging.info(lf("Mounted gluster volume",
                        duration="%.4f" % (time.time() - t0)))

    def gmaster_instantiate_tuple(self, slave):
        """return a tuple of the 'one shot' and the 'main crawl'
        class instance (xsync, change detector, changelog history)"""
        return (gmaster_builder('xsync')(self, slave),
                gmaster_builder()(self, slave),
                gmaster_builder('changeloghistory')(self, slave))

    def service_loop(self, slave=None):
        """enter service loop

        - if slave given, instantiate GMaster and
          pass control to that instance, which implements
          master behavior
        - else do that's what's inherited
        """
        if rconf.args.subcmd == "slave":
            if gconf.get("use-rsync-xattrs") and not privileged():
                raise GsyncdError(
                    "using rsync for extended attributes is not supported")

            # slave mode: serve RePCe requests from the master over the
            # stdin/stdout pipe; stop once keep-alives go quiet longer
            # than slave-timeout
            repce = RepceServer(
                self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs"))
            t = syncdutils.Thread(target=lambda: (repce.service_loop(),
                                                  syncdutils.finalize()))
            t.start()
            logging.info("slave listening")
            if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0:
                while True:
                    lp = self.server.last_keep_alive
                    time.sleep(gconf.get("slave-timeout"))
                    if lp == self.server.last_keep_alive:
                        logging.info(
                            lf("connection inactive, stopping",
                               timeout=gconf.get("slave-timeout")))
                        break
            else:
                # no timeout configured: block forever
                select((), (), ())

            return

        # master/worker mode below: a Server subclass bound to the
        # local brick path
        class brickserver(Server):
            local_path = rconf.args.local_path
            aggregated = self.server

            @classmethod
            def entries(cls, path):
                e = super(brickserver, cls).entries(path)
                # on the brick don't mess with /.glusterfs
                if path == '.':
                    try:
                        e.remove('.glusterfs')
                        e.remove('.trashcan')
                    except ValueError:
                        pass
                return e

            @classmethod
            def lstat(cls, e):
                """ path based backend stat """
                return super(brickserver, cls).lstat(e)

            @classmethod
            def gfid(cls, e):
                """ path based backend gfid fetch """
                return super(brickserver, cls).gfid(e)

            @classmethod
            def linkto_check(cls, e):
                return super(brickserver, cls).linkto_check(e)

        # define {,set_}xtime in slave, thus preempting
        # the call to remote, so that it takes data from
        # the local brick
        slave.server.xtime = types.MethodType(
            lambda _self, path, uuid: (
                brickserver.xtime(path,
                                  uuid + '.' + rconf.args.slave_id)
            ),
            slave.server)
        slave.server.stime = types.MethodType(
            lambda _self, path, uuid: (
                brickserver.stime(path,
                                  uuid + '.' + rconf.args.slave_id)
            ),
            slave.server)
        slave.server.entry_stime = types.MethodType(
            lambda _self, path, uuid: (
                brickserver.entry_stime(
                    path,
                    uuid + '.' + rconf.args.slave_id)
            ),
            slave.server)
        slave.server.set_stime = types.MethodType(
            lambda _self, path, uuid, mark: (
                brickserver.set_stime(path,
                                      uuid + '.' + rconf.args.slave_id,
                                      mark)
            ),
            slave.server)
        slave.server.set_entry_stime = types.MethodType(
            lambda _self, path, uuid, mark: (
                brickserver.set_entry_stime(
                    path,
                    uuid + '.' + rconf.args.slave_id,
                    mark)
            ),
            slave.server)

        (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
        g1.master.server = brickserver
        g2.master.server = brickserver
        g3.master.server = brickserver

        # bad bad bad: bad way to do things like this
        # need to make this elegant
        # register the crawlers and start crawling
        # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default)
        # g3 ==> changelog History
        status = GeorepStatus(gconf.get("state-file"),
                              rconf.args.local_node,
                              rconf.args.local_path,
                              rconf.args.local_node_id,
                              rconf.args.master,
                              rconf.args.slave)
        status.reset_on_worker_start()

        try:
            workdir = g2.setup_working_dir()
            # Register only when change_detector is not set to
            # xsync, else agent will generate changelog files
            # in .processing directory of working dir
            if gconf.get("change-detector") != 'xsync':
                # register with the changelog library
                # 9 == log level (DEBUG)
                # 5 == connection retries
                libgfchangelog.register(rconf.args.local_path,
                                        workdir,
                                        gconf.get("changelog-log-file"),
                                        get_changelog_log_level(
                                            gconf.get("changelog-log-level")),
                                        g2.CHANGELOG_CONN_RETRIES)

            register_time = int(time.time())
            g2.register(register_time, status)
            g3.register(register_time, status)
        except ChangelogException as e:
            logging.error(lf("Changelog register failed", error=e))
            sys.exit(1)

        g1.register(status=status)
        logging.info(lf("Register time",
                        time=register_time))
        # oneshot: Try to use changelog history api, if not
        # available switch to FS crawl
        # Note: if config.change_detector is xsync then
        # it will not use changelog history api
        try:
            g3.crawlwrap(oneshot=True)
        except PartialHistoryAvailable as e:
            logging.info(lf('Partial history available, using xsync crawl'
                            ' after consuming history',
                            till=e))
            g1.crawlwrap(oneshot=True, register_time=register_time)
        except ChangelogHistoryNotAvailable:
            logging.info('Changelog history not available, using xsync')
            g1.crawlwrap(oneshot=True, register_time=register_time)
        except NoStimeAvailable:
            logging.info('No stime available, using xsync crawl')
            g1.crawlwrap(oneshot=True, register_time=register_time)
        except ChangelogException as e:
            logging.error(lf("Changelog History Crawl failed",
                             error=e))
            sys.exit(1)

        # steady state: run the configured change detector forever
        try:
            g2.crawlwrap()
        except ChangelogException as e:
            logging.error(lf("Changelog crawl failed", error=e))
            sys.exit(1)
+
+
+class SSH(object):
+
+ """scheme class for ssh:// urls
+
+ interface to remote slave on master side
+ implementing an ssh based proxy
+ """
+
+ def __init__(self, host, volume):
+ self.remote_addr = host
+ self.volume = volume
+
+ @staticmethod
+ def parse_ssh_address(self):
+ m = re.match('([^@]+)@(.+)', self.remote_addr)
+ if m:
+ u, h = m.groups()
+ else:
+ u, h = syncdutils.getusername(), self.remote_addr
+ self.remotehost = h
+ return {'user': u, 'host': h}
+
+ def start_fd_client(self, i, o):
+ """set up RePCe client, handshake with server
+
+ It's cut out as a separate method to let
+ subclasses hook into client startup
+ """
+ self.server = RepceClient(i, o)
+ rv = self.server.__version__()
+ exrv = {'proto': repce.repce_version, 'object': Server.version()}
+ da0 = (rv, exrv)
+ da1 = ({}, {})
+ for i in range(2):
+ for k, v in da0[i].items():
+ da1[i][k] = int(v)
+ if da1[0] != da1[1]:
+ raise GsyncdError(
+ "RePCe major version mismatch: local %s, remote %s" %
+ (exrv, rv))
+ slavepath = "/proc/%d/cwd" % self.server.pid()
+ self.slaveurl = ':'.join([self.remote_addr, slavepath])
+
+ def connect_remote(self):
+ """connect to inner slave url through outer ssh url
+
+ Wrap the connecting utility in ssh.
+
+ Much care is put into daemonizing: in that case
+ ssh is started before daemonization, but
+ RePCe client is to be created after that (as ssh
+ interactive password auth would be defeated by
+ a daemonized ssh, while client should be present
+ only in the final process). In that case the action
+ is taken apart to two parts, this method is ivoked
+ once pre-daemon, once post-daemon. Use @go_daemon
+ to deiced what part to perform.
+
+ [NB. ATM gluster product does not makes use of interactive
+ authentication.]
+ """
+ syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'),
+ self.remote_addr,
+ self.volume)
+
+ logging.info("Initializing SSH connection between master and slave...")
+ t0 = time.time()
+
+ extra_opts = []
+ remote_gsyncd = gconf.get("remote-gsyncd")
+ if remote_gsyncd == "":
+ remote_gsyncd = "/nonexistent/gsyncd"
+
+ if gconf.get("use-rsync-xattrs"):
+ extra_opts.append('--use-rsync-xattrs')
+
+ args_to_slave = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ rconf.ssh_ctl_args + [self.remote_addr] + \
+ [remote_gsyncd, "slave"] + \
+ extra_opts + \
+ [rconf.args.master, rconf.args.slave] + \
+ [
+ '--master-node', rconf.args.local_node,
+ '--master-node-id', rconf.args.local_node_id,
+ '--master-brick', rconf.args.local_path,
+ '--local-node', rconf.args.resource_remote,
+ '--local-node-id', rconf.args.resource_remote_id] + \
+ [
+ # Add all config arguments here, slave gsyncd will not use
+ # config file in slave side, so all overriding options should
+ # be sent as arguments
+ '--slave-timeout', str(gconf.get("slave-timeout")),
+ '--slave-log-level', gconf.get("slave-log-level"),
+ '--slave-gluster-log-level',
+ gconf.get("slave-gluster-log-level"),
+ '--slave-gluster-command-dir',
+ gconf.get("slave-gluster-command-dir"),
+ '--master-dist-count',
+ str(gconf.get("master-distribution-count"))]
+
+ if gconf.get("slave-access-mount"):
+ args_to_slave.append('--slave-access-mount')
+
+ if rconf.args.debug:
+ args_to_slave.append('--debug')
+
+ po = Popen(args_to_slave,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ rconf.transport = po
+ self.start_fd_client(po.stdout, po.stdin)
+ logging.info(lf("SSH connection between master and slave established.",
+ duration="%.4f" % (time.time() - t0)))
+
+ def rsync(self, files, *args, **kw):
+ """invoke rsync"""
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+
+ extra_rsync_flags = []
+ # Performance flag, --ignore-missing-args, if rsync version is
+ # greater than 3.1.0 then include this flag.
+ if gconf.get("rsync-opt-ignore-missing-args") and \
+ get_rsync_version(gconf.get("rsync-command")) >= "3.1.0":
+ extra_rsync_flags = ["--ignore-missing-args"]
+
+ rsync_ssh_opts = [gconf.get("ssh-command")] + \
+ gconf.get("ssh-options").split() + \
+ ["-p", str(gconf.get("ssh-port"))] + \
+ rconf.ssh_ctl_args + \
+ gconf.get("rsync-ssh-options").split()
+
+ argv = [
+ gconf.get("rsync-command"),
+ '-aR0',
+ '--inplace',
+ '--files-from=-',
+ '--super',
+ '--stats',
+ '--numeric-ids',
+ '--no-implied-dirs'
+ ]
+
+ if gconf.get("rsync-opt-existing"):
+ argv += ["--existing"]
+
+ if gconf.get("sync-xattrs"):
+ argv += ['--xattrs']
+
+ if gconf.get("sync-acls"):
+ argv += ['--acls']
+
+ argv = argv + \
+ gconf.get("rsync-options").split() + \
+ extra_rsync_flags + ['.'] + \
+ ["-e", " ".join(rsync_ssh_opts)] + \
+ [self.slaveurl]
+
+ log_rsync_performance = gconf.getr("log-rsync-performance", False)
+
+ if log_rsync_performance:
+ # use stdout=PIPE only when log_rsync_performance enabled
+ # Else rsync will write to stdout and nobody is there
+ # to consume. If PIPE is full rsync hangs.
+ po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, universal_newlines=True)
+ else:
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
+
+ for f in files:
+ po.stdin.write(f)
+ po.stdin.write('\0')
+
+ stdout, stderr = po.communicate()
+
+ if kw.get("log_err", False):
+ for errline in stderr.strip().split("\n")[:-1]:
+ logging.error(lf("SYNC Error",
+ sync_engine="Rsync",
+ error=errline))
+
+ if log_rsync_performance:
+ rsync_msg = []
+ for line in stdout.split("\n"):
+ if line.startswith("Number of files:") or \
+ line.startswith("Number of regular files transferred:") or \
+ line.startswith("Total file size:") or \
+ line.startswith("Total transferred file size:") or \
+ line.startswith("Literal data:") or \
+ line.startswith("Matched data:") or \
+ line.startswith("Total bytes sent:") or \
+ line.startswith("Total bytes received:") or \
+ line.startswith("sent "):
+ rsync_msg.append(line)
+ logging.info(lf("rsync performance",
+ data=", ".join(rsync_msg)))
+
+ return po
+
    def tarssh(self, files, log_err=False):
        """Sync @files with a tar-over-ssh pipeline:
        `tar -cf - | ssh ... tar -xf -`.

        -z (compress) can be used if needed, but omitting it now
        as it results in a weird error (tar+ssh errors out, errcode: 2).

        Returns the ssh-side Popen object; raises GsyncdError when
        @files is empty.
        """
        if not files:
            raise GsyncdError("no files to sync")
        logging.debug("files: " + ", ".join(files))
        (host, rdir) = self.slaveurl.split(':')

        tar_cmd = ["tar"] + \
            ["--sparse", "-cf", "-", "--files-from", "-"]
        ssh_cmd = gconf.get("ssh-command").split() + \
            gconf.get("ssh-options-tar").split() + \
            ["-p", str(gconf.get("ssh-port"))] + \
            [host, "tar"] + \
            ["--overwrite", "-xf", "-", "-C", rdir]
        p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
                   stdin=subprocess.PIPE, stderr=subprocess.PIPE,
                   universal_newlines=True)
        p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE,
                   universal_newlines=True)
        # newline-separated file list for tar's --files-from -
        for f in files:
            p0.stdin.write(f)
            p0.stdin.write('\n')

        p0.stdin.close()
        p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits.

        # stdin and stdout of p0 is already closed, Reset to None and
        # wait for child process to complete
        p0.stdin = None
        p0.stdout = None

        def wait_for_tar(p0):
            # collect tar's stderr; "No such file" is expected when a
            # file disappeared between changelog and sync, so skip it
            _, stderr = p0.communicate()
            if log_err:
                for errline in stderr.strip().split("\n")[:-1]:
                    if "No such file or directory" not in errline:
                        logging.error(lf("SYNC Error",
                                         sync_engine="Tarssh",
                                         error=errline))

        t = syncdutils.Thread(target=wait_for_tar, args=(p0, ))
        # wait for tar to terminate, collecting any errors, further
        # waiting for transfer to complete
        t.start()

        # wait for ssh process
        _, stderr1 = p1.communicate()
        t.join()

        if log_err:
            for errline in stderr1.strip().split("\n")[:-1]:
                logging.error(lf("SYNC Error",
                                 sync_engine="Tarssh",
                                 error=errline))

        return p1
diff --git a/geo-replication/syncdaemon/subcmds.py b/geo-replication/syncdaemon/subcmds.py
new file mode 100644
index 00000000000..b8508532e30
--- /dev/null
+++ b/geo-replication/syncdaemon/subcmds.py
@@ -0,0 +1,335 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+from __future__ import print_function
+from syncdutils import lf
+import logging
+import gsyncdconfig as gconf
+
+
# Exit codes returned by the config subcommands so callers (glusterd)
# can distinguish the failure mode.
ERROR_CONFIG_INVALID = 2             # unknown config name
ERROR_CONFIG_INVALID_VALUE = 3       # value failed validation
ERROR_CONFIG_NOT_CONFIGURABLE = 4    # name exists but is not configurable
+
+
def subcmd_monitor_status(args):
    """Persist the given monitor status and log the transition."""
    from gsyncdstatus import set_monitor_status
    from rconf import rconf

    new_status = args.status
    set_monitor_status(gconf.get("state-file"), new_status)
    # Routine transition: suppress the "exiting." message at shutdown.
    rconf.log_exit = False
    logging.info(lf("Monitor Status Change", status=new_status))
+
+
def subcmd_status(args):
    """Print the geo-rep status of one brick (optionally as JSON)."""
    from gsyncdstatus import GeorepStatus

    master_name = args.master.replace(":", "")
    slave_data = args.slave.replace("ssh://", "")

    status = GeorepStatus(gconf.get("state-file"),
                          "",
                          args.local_path,
                          "",
                          master_name,
                          slave_data,
                          gconf.get("pid-file"))
    status.print_status(checkpoint_time=gconf.get("checkpoint", 0),
                        json_output=args.json)
+
+
def subcmd_monitor(args):
    """Start the monitor process that spawns and supervises workers."""
    import monitor
    from resource import GLUSTER, SSH, Popen

    # stay in the foreground when debugging
    go_daemon = not args.debug

    monitor.startup(go_daemon)
    Popen.init_errhandler()
    master = GLUSTER("localhost", args.master)
    slavehost, slavevol = args.slave.split("::")
    slave = SSH(slavehost, slavevol)
    return monitor.monitor(master, slave)
+
+
def subcmd_verify_spawning(args):
    # Invoked to verify that gsyncd.py can be spawned at all; the log
    # line below is the success marker the caller looks for.
    logging.info("Able to spawn gsyncd.py")
+
+
def subcmd_worker(args):
    """Run one worker: connect master brick and remote slave,
    acknowledge the monitor, then enter the service loop."""
    import os
    import fcntl

    from resource import GLUSTER, SSH, Popen

    Popen.init_errhandler()
    # the feedback fd must not leak into children spawned by the worker
    fcntl.fcntl(args.feedback_fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
    master = GLUSTER("localhost", args.master)
    slave_url, slavevol = args.slave.split("::")
    if "@" in slave_url:
        # preserve the remote user part of the slave url
        slavehost = "%s@%s" % (slave_url.split("@")[0],
                               args.resource_remote)
    else:
        slavehost = args.resource_remote
    slave = SSH(slavehost, slavevol)
    slave.connect_remote()
    master.connect()
    logging.info("Worker spawn successful. Acknowledging back to monitor")
    os.close(args.feedback_fd)
    master.service_loop(slave)
+
+
def subcmd_slave(args):
    """Serve slave-side gsyncd requests on the local volume."""
    from resource import GLUSTER, Popen

    Popen.init_errhandler()
    volname = args.slave.split("::")[-1]
    endpoint = GLUSTER("localhost", volname)
    endpoint.connect()
    endpoint.service_loop()
+
+
def subcmd_voluuidget(args):
    """Print the UUID of a volume (nothing is printed on failure).

    Runs `gluster volume info --xml` against the given remote host
    and extracts the volume id from the XML output.
    """
    import xml.etree.ElementTree as XET
    from subprocess import Popen, PIPE

    # older ElementTree versions raise SyntaxError instead of ParseError
    ParseError = getattr(XET, 'ParseError', SyntaxError)

    cmd = ['gluster', '--xml', '--remote-host=' + args.host,
           'volume', 'info', args.volname]
    if args.inet6:
        cmd.append("--inet6")

    po = Popen(cmd, bufsize=0,
               stdin=None, stdout=PIPE, stderr=PIPE,
               universal_newlines=True)

    vix, err = po.communicate()
    if po.returncode != 0:
        logging.info(lf("Volume info failed, unable to get "
                        "volume uuid of slavevol, "
                        "returning empty string",
                        slavevol=args.volname,
                        slavehost=args.host,
                        error=po.returncode))
        return ""
    vi = XET.fromstring(vix)
    if vi.find('opRet').text != '0':
        logging.info(lf("Unable to get volume uuid of slavevol, "
                        "returning empty string",
                        slavevol=args.volname,
                        slavehost=args.host,
                        error=vi.find('opErrstr').text))
        return ""

    try:
        voluuid = vi.find("volInfo/volumes/volume/id").text
    except (ParseError, AttributeError, ValueError) as e:
        logging.info(lf("Parsing failed to volume uuid of slavevol, "
                        "returning empty string",
                        slavevol=args.volname,
                        slavehost=args.host,
                        error=e))
        voluuid = ""

    print(voluuid)
+
+
def _unlink(path):
    """Remove @path, tolerating its absence; wrap other failures."""
    import os
    from errno import ENOENT
    from syncdutils import GsyncdError

    try:
        os.unlink(path)
    except (OSError, IOError) as e:
        if e.errno != ENOENT:
            raise GsyncdError('Unlink error: %s' % path)
+
+
def subcmd_delete(args):
    """Delete the residues of a geo-rep session.

    Removes the pid/status/socket files and the session and changelog
    working directories; when --reset-sync-time is given, also resets
    the stime xattrs on the bricks so a recreated session starts a
    full resync.
    """
    import logging
    import shutil
    import glob
    from errno import ENOENT, ENODATA
    import struct

    from syncdutils import GsyncdError, Xattr, errno_wrap
    import gsyncdconfig as gconf

    logging.info('geo-replication delete')
    # remove the stime xattr from all the brick paths so that
    # a re-create of a session will start sync all over again
    stime_xattr_prefix = gconf.get('stime-xattr-prefix', None)

    # Delete pid file, status file, socket file
    cleanup_paths = [gconf.get("pid-file")]

    def rmtree_ignore_missing(path):
        # Remove a directory tree; an already-missing tree is fine.
        try:
            shutil.rmtree(path)
        except (IOError, OSError) as e:
            if e.errno != ENOENT:
                raise GsyncdError(
                    'Error while removing working dir: %s' % path)

    # Cleanup Session dir and changelog working dirs (previously two
    # copy-pasted try/except blocks)
    rmtree_ignore_missing(gconf.get("georep-session-working-dir"))
    rmtree_ignore_missing(gconf.get("working-dir"))

    for path in cleanup_paths:
        # To delete temp files
        for f in glob.glob(path + "*"):
            _unlink(f)

    if args.reset_sync_time and stime_xattr_prefix:
        for p in args.paths:
            if p != "":
                # set stime to (0,0) to trigger full volume content resync
                # to slave on session recreation
                # look at master.py::Xcrawl hint: zero_zero
                errno_wrap(Xattr.lsetxattr,
                           (p, stime_xattr_prefix + ".stime",
                            struct.pack("!II", 0, 0)),
                           [ENOENT, ENODATA])
                errno_wrap(Xattr.lremovexattr,
                           (p, stime_xattr_prefix + ".entry_stime"),
                           [ENOENT, ENODATA])

    return
+
+
def print_config(name, value, only_value=False, use_underscore=False):
    """Print one config entry as "name:value" (or the bare value).

    Booleans are rendered lowercase; with use_underscore the dashes
    in the name become underscores.
    """
    rendered = str(value).lower() if isinstance(value, bool) else value

    if only_value:
        print(rendered)
        return

    if use_underscore:
        name = name.replace("-", "_")
    print(("%s:%s" % (name, rendered)))
+
+
def config_name_format(val):
    """Normalize a CLI config name: underscores become dashes."""
    return "-".join(val.split("_"))
+
+
def subcmd_config_get(args):
    """Print configuration: one named option, or the full table."""
    import sys
    import json

    all_config = gconf.getall(show_defaults=args.show_defaults,
                              show_non_configurable=True)

    if args.name is not None:
        # single option lookup
        val = all_config.get(config_name_format(args.name), None)
        if val is None:
            sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
            sys.exit(ERROR_CONFIG_INVALID)
        print_config(args.name, val["value"], only_value=args.only_value,
                     use_underscore=args.use_underscore)
        return

    if args.json:
        # Convert all values as string
        out = [{
            "name": k,
            "value": str(all_config[k]["value"]),
            "default": str(all_config[k]["default"]),
            "configurable": all_config[k]["configurable"],
            "modified": all_config[k]["modified"],
        } for k in sorted(all_config)]
        print((json.dumps(out)))
        return

    for k in sorted(all_config):
        print_config(k, all_config[k]["value"],
                     use_underscore=args.use_underscore)
+
+
def subcmd_config_check(args):
    """Validate a config name/value pair without applying it."""
    import sys

    name = config_name_format(args.name)
    try:
        gconf.check(name, value=args.value, with_conffile=False)
    except gconf.GconfNotConfigurable:
        if gconf.get(name, None) is None:
            sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
            sys.exit(ERROR_CONFIG_INVALID)
        sys.stderr.write("Not configurable \"%s\"\n" % args.name)
        sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)
    except gconf.GconfInvalidValue:
        sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name,
                                                               args.value))
        sys.exit(ERROR_CONFIG_INVALID_VALUE)
+
+
def subcmd_config_set(args):
    """Apply a config option value."""
    import sys

    name = config_name_format(args.name)
    try:
        gconf.setconfig(name, args.value)
    except gconf.GconfNotConfigurable:
        if gconf.get(name, None) is None:
            sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
            sys.exit(ERROR_CONFIG_INVALID)
        sys.stderr.write("Not configurable \"%s\"\n" % args.name)
        sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)
    except gconf.GconfInvalidValue:
        sys.stderr.write("Invalid config value \"%s=%s\"\n" % (args.name,
                                                               args.value))
        sys.exit(ERROR_CONFIG_INVALID_VALUE)
+
+
def subcmd_config_reset(args):
    """Reset a config option to its default value."""
    import sys

    name = config_name_format(args.name)
    try:
        gconf.resetconfig(name)
    except gconf.GconfNotConfigurable:
        if gconf.get(name, None) is None:
            sys.stderr.write("Invalid config name \"%s\"\n" % args.name)
            sys.exit(ERROR_CONFIG_INVALID)
        sys.stderr.write("Not configurable \"%s\"\n" % args.name)
        sys.exit(ERROR_CONFIG_NOT_CONFIGURABLE)
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
new file mode 100644
index 00000000000..a3df103e76c
--- /dev/null
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -0,0 +1,1115 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import sys
+import pwd
+import time
+import fcntl
+import shutil
+import logging
+import errno
+import threading
+import subprocess
+import socket
+from subprocess import PIPE
+from threading import Lock, Thread as baseThread
+from errno import (EACCES, EAGAIN, EPIPE, ENOTCONN, ENOMEM, ECONNABORTED,
+ EINTR, ENOENT, ESTALE, EBUSY, ENODATA, errorcode, EIO)
+from signal import signal, SIGTERM
+import select as oselect
+from os import waitpid as owaitpid
+import xml.etree.ElementTree as XET
+from select import error as SelectError
+try:
+ from cPickle import PickleError
+except ImportError:
+ from pickle import PickleError
+
+from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
+sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
+EVENTS_ENABLED = True
+try:
+ from gfevents.eventtypes import GEOREP_FAULTY as EVENT_GEOREP_FAULTY
+ from gfevents.eventtypes import GEOREP_ACTIVE as EVENT_GEOREP_ACTIVE
+ from gfevents.eventtypes import GEOREP_PASSIVE as EVENT_GEOREP_PASSIVE
+ from gfevents.eventtypes import GEOREP_CHECKPOINT_COMPLETED \
+ as EVENT_GEOREP_CHECKPOINT_COMPLETED
+except ImportError:
+ # Events APIs not installed, dummy eventtypes with None
+ EVENTS_ENABLED = False
+ EVENT_GEOREP_FAULTY = None
+ EVENT_GEOREP_ACTIVE = None
+ EVENT_GEOREP_PASSIVE = None
+ EVENT_GEOREP_CHECKPOINT_COMPLETED = None
+
+import gsyncdconfig as gconf
+from rconf import rconf
+
+from hashlib import sha256 as sha256
+
+ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
+
+# auxiliary gfid based access prefix
+_CL_AUX_GFID_PFX = ".gfid/"
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+GF_OP_RETRIES = 10
+
+GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+
+NodeID = None
+rsync_version = None
+unshare_mnt_propagation = None
+slv_bricks = None
+SPACE_ESCAPE_CHAR = "%20"
+NEWLINE_ESCAPE_CHAR = "%0A"
+PERCENTAGE_ESCAPE_CHAR = "%25"
+
+final_lock = Lock()
+
def sup(x, *a, **kw):
    """a rubyesque "super" for python ;)

    Invoke the calling method's namesake on the parent class of @x
    with the given arguments (the method name is taken from the
    caller's frame).
    """
    meth_name = sys._getframe(1).f_code.co_name
    parent = super(type(x), x)
    return getattr(parent, meth_name)(*a, **kw)
+
+
def escape(s):
    """the chosen flavor of string escaping, used all over to turn
    whatever data into a creatable representation: slashes become
    dashes, and dashes at the ends are dropped"""
    dashed = "-".join(s.split("/"))
    return dashed.strip("-")
+
+
def escape_space_newline(s):
    """Escape '%', space and newline for safe embedding in names.

    '%' is handled first so the escape sequences produced here
    survive a later unescape round-trip.
    """
    out = s.replace("%", PERCENTAGE_ESCAPE_CHAR)
    out = out.replace(" ", SPACE_ESCAPE_CHAR)
    return out.replace("\n", NEWLINE_ESCAPE_CHAR)


def unescape_space_newline(s):
    """Inverse of escape_space_newline() ('%' decoded last)."""
    out = s.replace(SPACE_ESCAPE_CHAR, " ")
    out = out.replace(NEWLINE_ESCAPE_CHAR, "\n")
    return out.replace(PERCENTAGE_ESCAPE_CHAR, "%")
+
# gf_mount_ready() returns 1 if all subvols are up, else 0
def gf_mount_ready():
    """Probe the dht.subvol.status virtual xattr on the mount cwd."""
    status = errno_wrap(Xattr.lgetxattr,
                        ['.', 'dht.subvol.status', 16],
                        [ENOENT, ENOTSUP, ENODATA], [ENOMEM])

    # errno_wrap returns an int (the errno) on tolerated failures;
    # report ready in that case, matching the original behavior
    if isinstance(status, int):
        logging.error("failed to get the xattr value")
        return 1

    return 1 if status.rstrip('\x00') == "1" else 0
+
def norm(s):
    """Return @s with dashes turned to underscores (None for falsy input)."""
    return s.replace('-', '_') if s else None
+
+
def update_file(path, updater, merger=lambda f: True):
    """update a file in a transaction-like manner

    Locks @path exclusively, lets @merger inspect the current content
    (returning False aborts the update), then has @updater write the
    new content into a temp file which is fsynced and atomically
    rename()d over @path.
    """

    fr = fw = None
    try:
        fd = os.open(path, os.O_CREAT | os.O_RDWR)
        try:
            fr = os.fdopen(fd, 'r+b')
        except:
            os.close(fd)
            raise
        # hold the exclusive lock for the whole read-modify-write cycle
        fcntl.lockf(fr, fcntl.LOCK_EX)
        if not merger(fr):
            return

        # pid-suffixed temp file, O_EXCL so concurrent writers collide
        tmpp = path + '.tmp.' + str(os.getpid())
        fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        try:
            fw = os.fdopen(fd, 'wb', 0)
        except:
            os.close(fd)
            raise
        updater(fw)
        os.fsync(fd)
        os.rename(tmpp, path)
    finally:
        for fx in (fr, fw):
            if fx:
                fx.close()
+
+
def create_manifest(fname, content):
    """
    Create manifest file for SSH Control Path.

    @content must be bytes.  The fd is closed exactly once: the
    original version closed it both in the inner except handler and
    again in the finally block when os.write failed, risking EBADF or
    closing an unrelated, recycled descriptor.
    """
    fd = None
    try:
        fd = os.open(fname, os.O_CREAT | os.O_RDWR)
        os.write(fd, content)
    finally:
        if fd is not None:
            os.close(fd)
+
+
def setup_ssh_ctl(ctld, remote_addr, resource_url):
    """
    Setup GConf ssh control path parameters
    """
    rconf.ssh_ctl_dir = ctld
    content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
                                                        resource_url)
    encoded_content = content.encode()
    # ssh refuses a ControlPath longer than 108 bytes (and rsync
    # piping over ssh tolerates even less, ~90), so only the first 32
    # hex chars of the digest are used.  A collision is harmless:
    # only one sock file is created per directory.
    digest = sha256hex(encoded_content)[:32]
    manifest = os.path.join(rconf.ssh_ctl_dir, "%s.mft" % digest)
    create_manifest(manifest, encoded_content)
    sock_path = os.path.join(rconf.ssh_ctl_dir, "%s.sock" % digest)
    rconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", sock_path]
+
+
def grabfile(fname, content=None):
    """open @fname and contend for its fcntl lock

    @content: if given, set the file content to it

    Returns the open file object while holding the lock, or None when
    another process already holds it.  The handle is stashed in
    rconf.permanent_handles so the lock lives as long as the process.
    """
    # damn those messy open() mode codes
    fd = os.open(fname, os.O_CREAT | os.O_RDWR)
    f = os.fdopen(fd, 'r+')
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except:
        ex = sys.exc_info()[1]
        f.close()
        if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
            # cannot grab, it's taken
            return
        raise
    if content:
        try:
            f.truncate()
            f.write(content)
            f.flush()
        except:
            f.close()
            raise
    rconf.permanent_handles.append(f)
    return f
+
+
def grabpidfile(fname=None, setpid=True):
    """grabfile() customization for pid files."""
    target = fname or gconf.get("pid-file")
    content = ("%d\n" % os.getpid()) if setpid else None
    return grabfile(target, content=content)
+
+
def finalize(*args, **kwargs):
    """all those messy final steps we go through upon termination

    Do away with pidfile, ssh control dir, aux mount and logging,
    then exit the process via os._exit(kwargs.get('exval', 0)).
    """

    final_lock.acquire()
    # NOTE(review): other call sites use the dashed key "pid-file"
    # (grabpidfile, subcmd_delete); confirm gconf.get() treats
    # 'pid_file' the same way, otherwise this branch may never run.
    if gconf.get('pid_file'):
        rm_pidf = rconf.pid_file_owned
        if rconf.cpid:
            # exit path from parent branch of daemonization
            rm_pidf = False
            while True:
                f = grabpidfile(setpid=False)
                if not f:
                    # child has already taken over pidfile
                    break
                if os.waitpid(rconf.cpid, os.WNOHANG)[0] == rconf.cpid:
                    # child has terminated
                    rm_pidf = True
                    break
                time.sleep(0.1)
        if rm_pidf:
            try:
                os.unlink(rconf.pid_file)
            except:
                ex = sys.exc_info()[1]
                if ex.errno == ENOENT:
                    pass
                else:
                    raise
    if rconf.ssh_ctl_dir and not rconf.cpid:
        def handle_rm_error(func, path, exc_info):
            # tolerate entries that vanished during cleanup
            if exc_info[1].errno == ENOENT:
                return
            raise exc_info[1]

        shutil.rmtree(rconf.ssh_ctl_dir, onerror=handle_rm_error)

    """ Unmount if not done """
    if rconf.mount_point:
        if rconf.mountbroker:
            umount_cmd = rconf.mbr_umount_cmd + [rconf.mount_point, 'lazy']
        else:
            umount_cmd = ['umount', '-l', rconf.mount_point]
        p0 = subprocess.Popen(umount_cmd, stderr=subprocess.PIPE,
                              universal_newlines=True)
        _, errdata = p0.communicate()
        if p0.returncode == 0:
            try:
                os.rmdir(rconf.mount_point)
            except OSError:
                # mount dir removal is best-effort
                pass
        else:
            pass

    if rconf.log_exit:
        logging.info("exiting.")
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(kwargs.get('exval', 0))
+
+
def log_raise_exception(excont):
    """top-level exception handler

    Tries to cover up gracefully when we face an error: translates
    some weird-sounding but well-understood exceptions into
    human-friendly lingo, records the exit value in excont.exval and
    leaves via sys.exit().
    """

    # full tracebacks go to file logs only; keep ttys terse
    is_filelog = False
    for h in logging.getLogger().handlers:
        fno = getattr(getattr(h, 'stream', None), 'fileno', None)
        if fno and not os.isatty(fno()):
            is_filelog = True

    exc = sys.exc_info()[1]
    if isinstance(exc, SystemExit):
        excont.exval = exc.code or 0
        raise
    else:
        logtag = None
        if isinstance(exc, GsyncdError):
            if is_filelog:
                logging.error(exc.args[0])
            sys.stderr.write('failure: ' + exc.args[0] + '\n')
        elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \
            ((isinstance(exc, OSError) or isinstance(exc, IOError)) and
             exc.errno == EPIPE):
            # broken repce channel to the peer gsyncd
            logging.error('connection to peer is broken')
            if hasattr(rconf, 'transport'):
                rconf.transport.wait()
                # 127: the remote shell could not find the gsyncd binary
                if rconf.transport.returncode == 127:
                    logging.error("getting \"No such file or directory\""
                                  "errors is most likely due to "
                                  "MISCONFIGURATION, please remove all "
                                  "the public keys added by geo-replication "
                                  "from authorized_keys file in slave nodes "
                                  "and run Geo-replication create "
                                  "command again.")
                    logging.error("If `gsec_create container` was used, then "
                                  "run `gluster volume geo-replication "
                                  "<MASTERVOL> [<SLAVEUSER>@]<SLAVEHOST>::"
                                  "<SLAVEVOL> config remote-gsyncd "
                                  "<GSYNCD_PATH> (Example GSYNCD_PATH: "
                                  "`/usr/libexec/glusterfs/gsyncd`)")
                rconf.transport.terminate_geterr()
        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
                                                        ECONNABORTED):
            logging.error(lf('Gluster Mount process exited',
                             error=errorcode[exc.errno]))
        elif isinstance(exc, OSError) and exc.errno == EIO:
            logging.error("Getting \"Input/Output error\" "
                          "is most likely due to "
                          "a. Brick is down or "
                          "b. Split brain issue.")
            logging.error("This is expected as per design to "
                          "keep the consistency of the file system. "
                          "Once the above issue is resolved "
                          "geo-replication would automatically "
                          "proceed further.")
            logtag = "FAIL"
        else:
            logtag = "FAIL"
        if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
            logtag = "FULL EXCEPTION TRACE"
        if logtag:
            logging.exception(logtag + ": ")
        sys.stderr.write("failed with %s: %s.\n" % (type(exc).__name__, exc))
        excont.exval = 1
        sys.exit(excont.exval)
+
+
class FreeObject(object):

    """wildcard class: arbitrary keyword args become attributes"""

    def __init__(self, **kw):
        self.__dict__.update(kw)
+
+
class Thread(baseThread):

    """thread class flavor for gsyncd

    - always a daemon thread
    - force exit for whole program if the thread
      function coughs up an exception
    """

    def __init__(self, *args, **kwargs):
        tf = kwargs.get('target')
        if tf:
            # wrap the target so an uncaught exception tears down the
            # whole process instead of leaving it half-alive
            def twrap(*aargs):
                excont = FreeObject(exval=0)
                try:
                    tf(*aargs)
                except:
                    try:
                        log_raise_exception(excont)
                    finally:
                        finalize(exval=excont.exval)
            kwargs['target'] = twrap
        baseThread.__init__(self, *args, **kwargs)
        # setDaemon() is deprecated (and removed in recent Pythons);
        # assign the daemon attribute directly instead
        self.daemon = True
+
+
class GsyncdError(Exception):
    """Base error for gsyncd; its message is reported on stderr as
    'failure: <msg>' by the top-level exception handler."""
    pass


class _MetaXattr(object):

    """singleton class, a lazy wrapper around the
    libcxattr module

    libcxattr (a heavy import due to ctypes) is
    loaded only when the single
    instance is tried to be used.

    This reduces runtime for those invocations
    which do not need filesystem manipulation
    (eg. for config, url parsing)
    """

    def __getattr__(self, meth):
        from libcxattr import Xattr as LXattr
        # copy every public method of the real Xattr onto self; after
        # the first access, lookups bypass __getattr__ entirely
        xmeth = [m for m in dir(LXattr) if m[0] != '_']
        if meth not in xmeth:
            return
        for m in xmeth:
            setattr(self, m, getattr(LXattr, m))
        return getattr(self, meth)


# module-level singleton used throughout like a module
Xattr = _MetaXattr()
+
+
def getusername(uid=None):
    """Map @uid (default: effective uid) to its account name."""
    effective = os.geteuid() if uid is None else uid
    return pwd.getpwuid(effective).pw_name
+
+
def privileged():
    """Return True when running with root privileges (euid == 0)."""
    euid = os.geteuid()
    return euid == 0
+
+
def boolify(s):
    """
    Generic string to boolean converter

    return
    - Quick return if string 's' is of type bool
    - True if it's in true_list
    - False if it's in false_list
    - Warn if it's not present in either and return False
    """
    true_list = frozenset(['true', 'yes', '1', 'on'])
    false_list = frozenset(['false', 'no', '0', 'off'])

    if isinstance(s, bool):
        return s

    lstr = s.lower()
    if lstr in true_list:
        return True
    if lstr not in false_list:
        # logging.warn() is deprecated in favour of logging.warning()
        logging.warning(lf("Unknown string in \"string to boolean\" "
                           "conversion, defaulting to False",
                           str=s))
    return False
+
+
def eintr_wrap(func, exc, *args):
    """
    Retry @func(*args) for as long as it fails with EINTR
    (syscall interrupted by a signal); re-raise anything else.
    """
    while True:
        try:
            return func(*args)
        except exc as e:
            if e.args[0] != EINTR:
                raise
+
+
def select(*args):
    """EINTR-resilient wrapper around select.select()."""
    return eintr_wrap(oselect.select, oselect.error, *args)


def waitpid(*args):
    """EINTR-resilient wrapper around os.waitpid()."""
    return eintr_wrap(owaitpid, OSError, *args)
+
+
def term_handler_default_hook(signum, frame):
    # default SIGTERM hook: run full cleanup and exit with status 1
    finalize(signum, frame, exval=1)


def set_term_handler(hook=term_handler_default_hook):
    """Install @hook as the SIGTERM handler."""
    signal(SIGTERM, hook)
+
+
def get_node_uuid():
    """Return this node's gluster host UUID, read once from
    glusterd's UUID_FILE and cached in the NodeID global."""
    global NodeID
    if NodeID is not None:
        return NodeID

    NodeID = ""
    with open(UUID_FILE) as f:
        for line in f:
            if not line.startswith("UUID="):
                continue
            NodeID = line.strip().split("=")[-1]
            break

    if NodeID == "":
        raise GsyncdError("Failed to get Host UUID from %s" % UUID_FILE)
    return NodeID
+
+
def is_host_local(host_id):
    # a brick is local when its host uuid matches this node's uuid
    return host_id == get_node_uuid()
+
+
def funcode(f):
    """Return the code object of @f (py2 func_code / py3 __code__)."""
    return getattr(f, 'func_code', None) or f.__code__
+
+
def memoize(f):
    """Cache the result of method @f on the instance, under the
    attribute '_<funcname>'; later calls return the cached value."""
    code = getattr(f, 'func_code', None)
    if code is None:
        code = f.__code__
    attr = '_' + code.co_name

    def wrapper(self, *a, **kw):
        cached = getattr(self, attr, None)
        if cached is None:
            cached = f(self, *a, **kw)
            setattr(self, attr, cached)
        return cached
    return wrapper
+
+
def umask():
    # set the process umask to 0 and return the previous value
    return os.umask(0)


def entry2pb(e):
    # split an entry into its [parent-dir, basename] pair
    # (a path without '/' yields a single-element list)
    return e.rsplit('/', 1)


def gauxpfx():
    """Return the auxiliary gfid access prefix (".gfid/")."""
    return _CL_AUX_GFID_PFX
+
+
def sha256hex(s):
    """Return the hex-encoded sha256 digest of @s (bytes)."""
    digest = sha256(s)
    return digest.hexdigest()
+
+
def selfkill(sig=SIGTERM):
    """Deliver @sig (default SIGTERM) to our own process."""
    os.kill(os.getpid(), sig)
+
+
def errno_wrap(call, arg=(), errnos=(), retry_errnos=()):
    """Wrapper around calls resilient to errnos.

    Calls @call(*arg); when it fails with an errno listed in @errnos
    the errno is returned instead of raising.  Errnos in
    @retry_errnos are retried (with a short sleep) up to
    GF_OP_RETRIES times before re-raising.

    Defaults are now immutable tuples (mutable-default-argument
    hazard removed); callers passing lists are unaffected.
    """
    nr_tries = 0
    while True:
        try:
            return call(*arg)
        except OSError as ex:
            if ex.errno in errnos:
                return ex.errno
            if ex.errno not in retry_errnos:
                raise
            nr_tries += 1
            if nr_tries == GF_OP_RETRIES:
                # probably a screwed state, cannot do much...
                # (logging.warn() is deprecated; use warning())
                logging.warning(lf('reached maximum retries',
                                   args=repr(arg),
                                   error=ex))
                raise
            time.sleep(0.250)  # retry the call
+
+
def lstat(e):
    # errno-tolerant lstat: returns ENOENT (an int) for a missing
    # path instead of raising; retries on ESTALE/EBUSY
    return errno_wrap(os.lstat, [e], [ENOENT], [ESTALE, EBUSY])

def get_gfid_from_mnt(gfidpath):
    # read the canonical gfid string of @gfidpath via the
    # glusterfs.gfid.string virtual xattr (an int return means the
    # lookup failed with a tolerated errno)
    return errno_wrap(Xattr.lgetxattr,
                      [gfidpath, 'glusterfs.gfid.string',
                       GX_GFID_CANONICAL_LEN], [ENOENT], [ESTALE])
+
+
def matching_disk_gfid(gfid, entry):
    """True when the gfid stored on disk for @entry equals @gfid."""
    ondisk = get_gfid_from_mnt(entry)
    # an int return means the xattr lookup failed
    return (not isinstance(ondisk, int)) and ondisk == gfid
+
+
# Exceptions raised by the crawl/changelog logic elsewhere in the
# syncdaemon (master.py / resource.py).  NOTE(review): descriptions
# are inferred from the names — confirm at the raise sites.
class NoStimeAvailable(Exception):
    """Stime is not (yet) available for the requested path."""
    pass


class PartialHistoryAvailable(Exception):
    """Changelog history covers only part of the requested range."""
    pass


class ChangelogHistoryNotAvailable(Exception):
    """Changelog history is not available for the requested range."""
    pass


class ChangelogException(OSError):
    """OSError flavor raised by changelog consumption."""
    pass


def gf_event(event_type, **kwargs):
    # emit a gluster event when the events framework is installed
    # (EVENTS_ENABLED is False otherwise and this is a no-op)
    if EVENTS_ENABLED:
        from gfevents.gf_event import gf_event as gfevent
        gfevent(event_type, **kwargs)
+
+
class GlusterLogLevel(object):
    """Numeric gluster log levels; used by get_changelog_log_level()
    to translate textual level names."""
    NONE = 0
    EMERG = 1
    ALERT = 2
    CRITICAL = 3
    ERROR = 4
    WARNING = 5
    NOTICE = 6
    INFO = 7
    DEBUG = 8
    TRACE = 9


def get_changelog_log_level(lvl):
    # map a textual level (e.g. "DEBUG") to its numeric value,
    # defaulting to INFO for unknown names
    return getattr(GlusterLogLevel, lvl, GlusterLogLevel.INFO)
+
+
def get_master_and_slave_data_from_args(args):
    """Extract (master volume name, slave url) from raw CLI args.

    The master is the last arg starting with ":", the slave the last
    one containing "::"; either may be absent (None).
    """
    masters = [a.replace(":", "") for a in args if a.startswith(":")]
    slaves = [a.replace("ssh://", "") for a in args if "::" in a]
    master_name = masters[-1] if masters else None
    slave_data = slaves[-1] if slaves else None
    return (master_name, slave_data)
+
def unshare_propagation_supported():
    """Detect (once, cached globally) whether unshare(1) advertises
    the --propagation option in its help output."""
    global unshare_mnt_propagation
    if unshare_mnt_propagation is not None:
        return unshare_mnt_propagation

    unshare_mnt_propagation = False
    p = subprocess.Popen(["unshare", "--help"],
                         stderr=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         universal_newlines=True)
    out, _ = p.communicate()
    if p.returncode == 0 and "propagation" in out:
        unshare_mnt_propagation = True
    return unshare_mnt_propagation
+
+
def get_rsync_version(rsync_cmd):
    """Return the rsync version string (e.g. "3.1.3"), cached
    globally; "0" when the probe fails."""
    global rsync_version
    if rsync_version is not None:
        return rsync_version

    rsync_version = "0"
    p = subprocess.Popen([rsync_cmd, "--version"],
                         stderr=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         universal_newlines=True)
    out, _ = p.communicate()
    if p.returncode == 0:
        # first line looks like "rsync  version X.Y.Z  protocol ..."
        rsync_version = out.split(" ", 4)[3]
    return rsync_version
+
+
def get_slv_dir_path(slv_host, slv_volume, gfid):
    """Resolve @gfid to an aux-gfid entry (".gfid/<pargfid>/<name>")
    by scanning the slave volume's brick backends.

    Tries os.readlink on the .glusterfs backend path first; if that
    fails with OSError, falls back to the glusterfs.gfid2path xattr.
    Returns None when the gfid cannot be resolved.
    """
    global slv_bricks

    dir_path = ENOENT
    pfx = gauxpfx()

    # brick list is fetched once and cached in the module global
    if not slv_bricks:
        slv_info = Volinfo(slv_volume, slv_host, master=False)
        slv_bricks = slv_info.bricks
    # Result of readlink would be of format as below.
    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
    for brick in slv_bricks:
        dir_path = errno_wrap(os.path.join,
                              [brick['dir'],
                               ".glusterfs", gfid[0:2],
                               gfid[2:4],
                               gfid], [ENOENT], [ESTALE])
        if dir_path != ENOENT:
            try:
                realpath = errno_wrap(os.readlink, [dir_path],
                                      [ENOENT], [ESTALE])
                if not isinstance(realpath, int):
                    realpath_parts = realpath.split('/')
                    pargfid = realpath_parts[-2]
                    basename = realpath_parts[-1]
                    dir_entry = os.path.join(pfx, pargfid, basename)
                    return dir_entry
            except OSError:
                # .gfid/GFID
                gfidpath = unescape_space_newline(os.path.join(pfx, gfid))
                realpath = errno_wrap(Xattr.lgetxattr_buf,
                                      [gfidpath, 'glusterfs.gfid2path'], [ENOENT], [ESTALE])
                if not isinstance(realpath, int):
                    basename = os.path.basename(realpath).rstrip('\x00')
                    dirpath = os.path.dirname(realpath)
                    if dirpath == "/":
                        pargfid = ROOT_GFID
                    else:
                        dirpath = dirpath.strip("/")
                        pargfid = get_gfid_from_mnt(dirpath)
                        if isinstance(pargfid, int):
                            return None
                    dir_entry = os.path.join(pfx, pargfid, basename)
                    return dir_entry

    return None
+
+
def lf(event, **kwargs):
    """
    Log Format helper: renders "Event [{k1=v1}, {k2=v2}]" so that log
    messages can easily be converted to a structured format later.
    """
    fields = ", ".join("{%s=%s}" % kv for kv in kwargs.items())
    return "%s [%s]" % (event, fields)
+
+
+class Popen(subprocess.Popen):
+
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
    @classmethod
    def init_errhandler(cls):
        """start the thread which handles children's error output

        A single daemon thread select()s on the stderr pipes of every
        registered child and appends what it reads to the child's
        entry in cls.errstore, capped at 1 MiB per child.
        """
        cls.errstore = {}

        def tailer():
            while True:
                # iterate over a snapshot; registrations may race us
                errstore = cls.errstore.copy()
                try:
                    poe, _, _ = select(
                        [po.stderr for po in errstore], [], [], 1)
                except (ValueError, SelectError):
                    # stderr is already closed wait for some time before
                    # checking next error
                    time.sleep(0.5)
                    continue
                for po in errstore:
                    if po.stderr not in poe:
                        continue
                    po.lock.acquire()
                    try:
                        if po.on_death_row:
                            continue
                        la = errstore[po]
                        try:
                            fd = po.stderr.fileno()
                        except ValueError:  # file is already closed
                            time.sleep(0.5)
                            continue

                        try:
                            l = os.read(fd, 1024)
                        except OSError:
                            time.sleep(0.5)
                            continue

                        if not l:
                            continue
                        # cap the buffered stderr at 1 MiB, dropping
                        # the oldest fragments first
                        tots = len(l)
                        for lx in la:
                            tots += len(lx)
                        while tots > 1 << 20 and la:
                            tots -= len(la.pop(0))
                        la.append(l)
                    finally:
                        po.lock.release()
        t = Thread(target=tailer)
        t.start()
        cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ self.elines = []
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
+ lp = ''
+
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ finalize(exval=1)
+
+ def terminate_geterr(self, fail_on_err=True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() is None:
+ self.terminate()
+ if self.poll() is None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr], [], [], 0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b.decode())
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
def host_brick_split(value):
    """
    IPv6 compatible way to split and get the host
    and brick information. Example inputs:
    node1.example.com:/exports/bricks/brick1/brick
    fe80::af0f:df82:844f:ef66%utun0:/exports/bricks/brick1/brick
    """
    # Only the last ":" separates host from brick path; everything before
    # it (which may itself contain ":" for IPv6) is the host part.
    host, _sep, brick = value.rpartition(":")
    return (host, brick)
+
+
class Volinfo(object):

    """Volume topology obtained live from ``gluster --xml volume info``.

    Parses the CLI's XML output and exposes bricks, uuid and the various
    replica/disperse/distribute counts that geo-rep needs.
    """

    def __init__(self, vol, host='localhost', prelude=None, master=True):
        """Run ``gluster --xml --remote-host=<host> volume info <vol>``.

        :param vol:     volume name
        :param host:    host to query via --remote-host
        :param prelude: optional command prefix (e.g. an ssh wrapper),
                        list of argv items
        :param master:  choose gluster binary dir from master or slave config
        :raises GsyncdError: when the CLI reports a non-zero opRet
        """
        # Use None as default instead of a shared mutable list literal;
        # callers see identical behavior.
        if prelude is None:
            prelude = []
        if master:
            gluster_cmd_dir = gconf.get("gluster-command-dir")
        else:
            gluster_cmd_dir = gconf.get("slave-gluster-command-dir")

        gluster_cmd = os.path.join(gluster_cmd_dir, 'gluster')
        po = Popen(prelude + [gluster_cmd, '--xml', '--remote-host=' + host,
                              'volume', 'info', vol],
                   stdout=PIPE, stderr=PIPE, universal_newlines=True)
        vix = po.stdout.read()
        po.wait()
        po.terminate_geterr()
        vi = XET.fromstring(vix)
        if vi.find('opRet').text != '0':
            if prelude:
                # Fix: the original called prelude.join(' '), which raises
                # AttributeError (prelude is a list) and masked the real
                # CLI error; str.join over the list is what was intended.
                via = '(via %s) ' % ' '.join(prelude)
            else:
                via = ' '
            raise GsyncdError('getting volume info of %s%s '
                              'failed with errorcode %s' %
                              (vol, via, vi.find('opErrno').text))
        self.tree = vi
        self.volume = vol
        self.host = host

    def get(self, elem):
        """Return all elements matching ``elem`` anywhere in the XML tree."""
        return self.tree.findall('.//' + elem)

    def is_tier(self):
        """True when the volume type string is 'Tier'."""
        return (self.get('typeStr')[0].text == 'Tier')

    def is_hot(self, brickpath):
        """True when ``brickpath`` belongs to the hot tier."""
        logging.debug('brickpath: ' + repr(brickpath))
        return brickpath in self.hot_bricks

    @property
    @memoize
    def bricks(self):
        """List of {'host', 'dir', 'uuid'} dicts, one per brick."""
        def bparse(b):
            # host_brick_split is IPv6-safe (host may contain ':').
            host, dirp = host_brick_split(b.find("name").text)
            return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
        return [bparse(b) for b in self.get('brick')]

    @property
    @memoize
    def uuid(self):
        """Volume uuid from the XML; must be exactly one <id> element.

        :raises GsyncdError: when zero or multiple ids are present
        """
        ids = self.get('id')
        if len(ids) != 1:
            raise GsyncdError("volume info of %s obtained from %s: "
                              "ambiguous uuid" % (self.volume, self.host))
        return ids[0].text

    def replica_count(self, tier, hot):
        """Replica count for the selected (tier, hot) brick set."""
        if (tier and hot):
            return int(self.get('hotBricks/hotreplicaCount')[0].text)
        elif (tier and not hot):
            return int(self.get('coldBricks/coldreplicaCount')[0].text)
        else:
            return int(self.get('replicaCount')[0].text)

    def disperse_count(self, tier, hot):
        """Disperse count for the selected (tier, hot) brick set."""
        if (tier and hot):
            # Tiering doesn't support disperse volume as hot brick,
            # hence no xml output, so returning 0. In case, if it's
            # supported later, we should change here.
            return 0
        elif (tier and not hot):
            return int(self.get('coldBricks/colddisperseCount')[0].text)
        else:
            return int(self.get('disperseCount')[0].text)

    def distribution_count(self, tier, hot):
        """Distribute count for the selected (tier, hot) brick set."""
        if (tier and hot):
            return int(self.get('hotBricks/hotdistCount')[0].text)
        elif (tier and not hot):
            return int(self.get('coldBricks/colddistCount')[0].text)
        else:
            return int(self.get('distCount')[0].text)

    @property
    @memoize
    def hot_bricks(self):
        """Brick name strings (host:path) belonging to the hot tier."""
        return [b.text for b in self.get('hotBricks/brick')]

    def get_hot_bricks_count(self, tier):
        """Number of hot-tier bricks; 0 for non-tiered volumes."""
        if (tier):
            return int(self.get('hotBricks/hotbrickCount')[0].text)
        else:
            return 0
+
+
class VolinfoFromGconf(object):
    # Glusterd will generate following config items before Geo-rep start
    # So that Geo-rep need not run gluster commands from inside
    # Volinfo object API/interface kept as is so that caller need not
    # change anything except calling this instead of Volinfo()
    #
    # master-bricks=
    # master-bricks=NODEID:HOSTNAME:PATH,..
    # slave-bricks=NODEID:HOSTNAME,..
    # master-volume-id=
    # slave-volume-id=
    # master-replica-count=
    # master-disperse_count=
    def __init__(self, vol, host='localhost', master=True):
        self.volume = vol
        self.host = host
        self.master = master

    def is_tier(self):
        # Config-driven volinfo never describes a tiered volume.
        return False

    def is_hot(self, brickpath):
        # No hot tier, so no brick can be hot.
        return False

    def is_uuid(self, value):
        """Return True when value parses as a UUID."""
        try:
            uuid.UUID(value)
        except ValueError:
            return False
        return True

    def possible_path(self, value):
        """Heuristic: a field containing '/' is treated as a brick path."""
        return "/" in value

    @property
    @memoize
    def bricks(self):
        """Parse the glusterd-generated "<side>-bricks" config value into
        a list of {'host', 'dir', 'uuid'} dicts (same shape as Volinfo)."""
        prefix = "master-" if self.master else "slave-"
        raw = gconf.get(prefix + "bricks")
        if raw is None:
            return []

        result = []
        for entry in (item.strip() for item in raw.split(",")):
            fields = entry.split(":")
            node_uuid = None
            # Leading field may be the node uuid; peel it off if so.
            if self.is_uuid(fields[0]):
                node_uuid = fields[0]
                fields = fields[1:]

            # Trailing field may be the brick path; peel it off if so.
            # NOTE(review): when it is not a path, bpath from the previous
            # entry is reused — presumably the glusterd-generated format
            # always includes a path; verify against glusterd.
            if self.possible_path(fields[-1]):
                bpath = fields[-1]
                fields = fields[:-1]

            result.append({
                "host": ":".join(fields),  # if remaining parts are IPv6 name
                "dir": bpath,
                "uuid": node_uuid
            })

        return result

    @property
    @memoize
    def uuid(self):
        key = "master-volume-id" if self.master else "slave-volume-id"
        return gconf.get(key)

    def replica_count(self, tier, hot):
        return gconf.get("master-replica-count")

    def disperse_count(self, tier, hot):
        return gconf.get("master-disperse-count")

    def distribution_count(self, tier, hot):
        return gconf.get("master-distribution-count")

    @property
    @memoize
    def hot_bricks(self):
        # No hot tier in config-driven mode.
        return []

    def get_hot_bricks_count(self, tier):
        return 0
+
+
def can_ssh(host, port=22, timeout=None):
    """Return True when a TCP connection to host:port succeeds.

    Fix: the original hard-coded socket.AF_INET, so IPv6 hosts (which the
    rest of this module explicitly supports, see host_brick_split) always
    appeared down. socket.create_connection resolves the host and tries
    every address family returned by getaddrinfo.

    :param host:    hostname or IP (IPv4 or IPv6)
    :param port:    TCP port, default 22 (ssh)
    :param timeout: optional connect timeout in seconds; default None
                    preserves the original blocking behavior
    """
    try:
        s = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        return False
    s.close()
    return True
+
+
def get_up_nodes(hosts, port):
    """Filter ``hosts`` down to the reachable ones.

    Each entry is a sequence whose first element is the hostname/IP
    (the rest, e.g. a UUID, is carried through untouched); reachability
    is probed with can_ssh() on the given port.
    """
    return [node for node in hosts if can_ssh(node[0], port)]