summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon
diff options
context:
space:
mode:
authorAvra Sengupta <asengupt@redhat.com>2013-05-27 22:23:57 +0530
committerVijay Bellur <vbellur@redhat.com>2013-07-22 01:53:03 -0700
commit950371be29d029179ac5cd0ad2dfdbfcd4467b96 (patch)
treea979d3c710c25074088bd05cef5b6a0ede9505a9 /geo-replication/syncdaemon
parent11f6c56f83b977a08f9d74563249cef59e22a05d (diff)
move 'xlators/marker/utils/' to 'geo-replication/' directory
Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa BUG: 847839 Original Author: Amar Tumballi <amarts@redhat.com> Signed-off-by: Avra Sengupta <asengupt@redhat.com> Reviewed-on: http://review.gluster.org/5133 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--geo-replication/syncdaemon/Makefile.am7
-rw-r--r--geo-replication/syncdaemon/README.md81
-rw-r--r--geo-replication/syncdaemon/__codecheck.py46
-rw-r--r--geo-replication/syncdaemon/__init__.py0
-rw-r--r--geo-replication/syncdaemon/configinterface.py224
-rw-r--r--geo-replication/syncdaemon/gconf.py20
-rw-r--r--geo-replication/syncdaemon/gsyncd.py419
-rw-r--r--geo-replication/syncdaemon/libcxattr.py72
-rw-r--r--geo-replication/syncdaemon/master.py961
-rw-r--r--geo-replication/syncdaemon/monitor.py129
-rw-r--r--geo-replication/syncdaemon/repce.py225
-rw-r--r--geo-replication/syncdaemon/resource.py972
-rw-r--r--geo-replication/syncdaemon/syncdutils.py288
13 files changed, 3444 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
new file mode 100644
index 00000000000..c19f6b45919
--- /dev/null
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -0,0 +1,7 @@
+syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
+
+syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
+ resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
+ $(top_builddir)/contrib/ipaddr-py/ipaddr.py
+
+CLEANFILES =
diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md
new file mode 100644
index 00000000000..d45006932d1
--- /dev/null
+++ b/geo-replication/syncdaemon/README.md
@@ -0,0 +1,81 @@
+gsycnd, the Gluster Syncdaemon
+==============================
+
+REQUIREMENTS
+------------
+
+_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode.
+Requirements are categorized according to this.
+
+* supported OS is GNU/Linux
+* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
+* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)
+* rsync (both)
+* glusterfs with marker support (master); glusterfs (optional on slave)
+* FUSE; for supported versions consult glusterfs
+
+INSTALLATION
+------------
+
+As of now, the supported way of operation is running from the source directory.
+
+If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
+
+CONFIGURATION
+-------------
+
+gsyncd tunables are a subset of the long command-line options; for listing them,
+type
+
+ gsyncd.py --help
+
+and see the long options up to "--config-file". (The leading double dash should be omitted;
+interim underscores and dashes are interchangeable.) The set of options bear some resemblance
+to those of glusterfs and rsync.
+
+The config file format matches the following syntax:
+
+ <option1>: <value1>
+ <option2>: <value2>
+ # comment
+
+By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_
+in the source tree.
+
+USAGE
+-----
+
+gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally.
+Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors
+for it with gysncd:
+
+1. _/data/mirror_
+2. local gluster volume _yow_
+3. _/data/far_mirror_ at example.com
+4. gluster volume _moz_ at example.com
+
+The respective gsyncd invocations are (demoing some syntax sugaring):
+
+1.
+
+ gsyncd.py gluster://localhost:pop file:///data/mirror
+
+ or short form
+
+ gsyncd.py :pop /data/mirror
+
+2. `gsyncd :pop :yow`
+3.
+
+ gsyncd.py :pop ssh://example.com:/data/far_mirror
+
+ or short form
+
+ gsyncd.py :pop example.com:/data/far_mirror
+
+4. `gsyncd.py :pop example.com::moz`
+
+gsyncd has to be available on both sides; it's location on the remote side has to be specified
+via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be
+used for setting options on the remote side, although the suggested mode of operation is to
+set parameters like log file / pid file in the configuration file.)
diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py
new file mode 100644
index 00000000000..e3386afba8b
--- /dev/null
+++ b/geo-replication/syncdaemon/__codecheck.py
@@ -0,0 +1,46 @@
+import os
+import os.path
+import sys
+import tempfile
+import shutil
+
+ipd = tempfile.mkdtemp(prefix = 'codecheck-aux')
+
+try:
+ # add a fake ipaddr module, we don't want to
+ # deal with the real one (just test our code)
+ f = open(os.path.join(ipd, 'ipaddr.py'), 'w')
+ f.write("""
+class IPAddress(object):
+ pass
+class IPNetwork(list):
+ pass
+""")
+ f.close()
+ sys.path.append(ipd)
+
+ fl = os.listdir(os.path.dirname(sys.argv[0]) or '.')
+ fl.sort()
+ for f in fl:
+ if f[-3:] != '.py' or f[0] == '_':
+ continue
+ m = f[:-3]
+ sys.stdout.write('importing %s ...' % m)
+ __import__(m)
+ print(' OK.')
+
+ def sys_argv_set(a):
+ sys.argv = sys.argv[:1] + a
+
+ gsyncd = sys.modules['gsyncd']
+ for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]:
+ print('>>> invoking program with args: %s' % ' '.join(a))
+ pid = os.fork()
+ if not pid:
+ sys_argv_set(a)
+ gsyncd.main()
+ _, r = os.waitpid(pid, 0)
+ if r:
+ raise RuntimeError('invocation failed')
+finally:
+ shutil.rmtree(ipd)
diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/geo-replication/syncdaemon/__init__.py
diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py
new file mode 100644
index 00000000000..e55bec519e9
--- /dev/null
+++ b/geo-replication/syncdaemon/configinterface.py
@@ -0,0 +1,224 @@
+try:
+ import ConfigParser
+except ImportError:
+ # py 3
+ import configparser as ConfigParser
+import re
+from string import Template
+
+from syncdutils import escape, unescape, norm, update_file, GsyncdError
+
+SECT_ORD = '__section_order__'
+SECT_META = '__meta__'
+config_version = 2.0
+
+re_type = type(re.compile(''))
+
+
+class MultiDict(object):
+ """a virtual dict-like class which functions as the union of underlying dicts"""
+
+ def __init__(self, *dd):
+ self.dicts = dd
+
+ def __getitem__(self, key):
+ val = None
+ for d in self.dicts:
+ if d.get(key):
+ val = d[key]
+ if not val:
+ raise KeyError(key)
+ return val
+
+
+class GConffile(object):
+ """A high-level interface to ConfigParser which flattens the two-tiered
+ config layout by implenting automatic section dispatch based on initial
+ parameters.
+
+ Also ensure section ordering in terms of their time of addition -- a compat
+ hack for Python < 2.7.
+ """
+
+ def _normconfig(self):
+ """normalize config keys by s/-/_/g"""
+ for n, s in self.config._sections.items():
+ if n.find('__') == 0:
+ continue
+ s2 = type(s)()
+ for k, v in s.items():
+ if k.find('__') != 0:
+ k = norm(k)
+ s2[k] = v
+ self.config._sections[n] = s2
+
+ def __init__(self, path, peers, *dd):
+ """
+ - .path: location of config file
+ - .config: underlying ConfigParser instance
+ - .peers: on behalf of whom we flatten .config
+ (master, or master-slave url pair)
+ - .auxdicts: template subtituents
+ """
+ self.peers = peers
+ self.path = path
+ self.auxdicts = dd
+ self.config = ConfigParser.RawConfigParser()
+ self.config.read(path)
+ self._normconfig()
+
+ def section(self, rx=False):
+ """get the section name of the section representing .peers in .config"""
+ peers = self.peers
+ if not peers:
+ peers = ['.', '.']
+ rx = True
+ if rx:
+ st = 'peersrx'
+ else:
+ st = 'peers'
+ return ' '.join([st] + [escape(u) for u in peers])
+
+ @staticmethod
+ def parse_section(section):
+ """retrieve peers sequence encoded by section name
+ (as urls or regexen, depending on section type)
+ """
+ sl = section.split()
+ st = sl.pop(0)
+ sl = [unescape(u) for u in sl]
+ if st == 'peersrx':
+ sl = [re.compile(u) for u in sl]
+ return sl
+
+ def ord_sections(self):
+ """Return an ordered list of sections.
+
+ Ordering happens based on the auxiliary
+ SECT_ORD section storing indices for each
+ section added through the config API.
+
+ To not to go corrupt in case of manually
+ written config files, we take care to append
+ also those sections which are not registered
+ in SECT_ORD.
+
+ Needed for python 2.{4,5,6} where ConfigParser
+ cannot yet order sections/options internally.
+ """
+ so = {}
+ if self.config.has_section(SECT_ORD):
+ so = self.config._sections[SECT_ORD]
+ so2 = {}
+ for k, v in so.items():
+ if k != '__name__':
+ so2[k] = int(v)
+ tv = 0
+ if so2:
+ tv = max(so2.values()) + 1
+ ss = [s for s in self.config.sections() if s.find('__') != 0]
+ for s in ss:
+ if s in so.keys():
+ continue
+ so2[s] = tv
+ tv += 1
+ def scmp(x, y):
+ return cmp(*(so2[s] for s in (x, y)))
+ ss.sort(scmp)
+ return ss
+
+ def update_to(self, dct, allow_unresolved=False):
+ """update @dct from key/values of ours.
+
+ key/values are collected from .config by filtering the regexp sections
+ according to match, and from .section. The values are treated as templates,
+ which are substituted from .auxdicts and (in case of regexp sections)
+ match groups.
+ """
+ if not self.peers:
+ raise GsyncdError('no peers given, cannot select matching options')
+ def update_from_sect(sect, mud):
+ for k, v in self.config._sections[sect].items():
+ if k == '__name__':
+ continue
+ if allow_unresolved:
+ dct[k] = Template(v).safe_substitute(mud)
+ else:
+ dct[k] = Template(v).substitute(mud)
+ for sect in self.ord_sections():
+ sp = self.parse_section(sect)
+ if isinstance(sp[0], re_type) and len(sp) == len(self.peers):
+ match = True
+ mad = {}
+ for i in range(len(sp)):
+ m = sp[i].search(self.peers[i])
+ if not m:
+ match = False
+ break
+ for j in range(len(m.groups())):
+ mad['match%d_%d' % (i+1, j+1)] = m.groups()[j]
+ if match:
+ update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts))
+ if self.config.has_section(self.section()):
+ update_from_sect(self.section(), MultiDict(dct, *self.auxdicts))
+
+ def get(self, opt=None):
+ """print the matching key/value pairs from .config,
+ or if @opt given, the value for @opt (according to the
+ logic described in .update_to)
+ """
+ d = {}
+ self.update_to(d, allow_unresolved = True)
+ if opt:
+ opt = norm(opt)
+ v = d.get(opt)
+ if v:
+ print(v)
+ else:
+ for k, v in d.iteritems():
+ if k == '__name__':
+ continue
+ print("%s: %s" % (k, v))
+
+ def write(self, trfn, opt, *a, **kw):
+ """update on-disk config transactionally
+
+ @trfn is the transaction function
+ """
+ def mergeconf(f):
+ self.config = ConfigParser.RawConfigParser()
+ self.config.readfp(f)
+ self._normconfig()
+ if not self.config.has_section(SECT_META):
+ self.config.add_section(SECT_META)
+ self.config.set(SECT_META, 'version', config_version)
+ return trfn(norm(opt), *a, **kw)
+ def updateconf(f):
+ self.config.write(f)
+ update_file(self.path, updateconf, mergeconf)
+
+ def _set(self, opt, val, rx=False):
+ """set @opt to @val in .section"""
+ sect = self.section(rx)
+ if not self.config.has_section(sect):
+ self.config.add_section(sect)
+ # regarding SECT_ORD, cf. ord_sections
+ if not self.config.has_section(SECT_ORD):
+ self.config.add_section(SECT_ORD)
+ self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD]))
+ self.config.set(sect, opt, val)
+ return True
+
+ def set(self, opt, *a, **kw):
+ """perform ._set transactionally"""
+ self.write(self._set, opt, *a, **kw)
+
+ def _delete(self, opt, rx=False):
+ """delete @opt from .section"""
+ sect = self.section(rx)
+ if self.config.has_section(sect):
+ return self.config.remove_option(sect, opt)
+
+ def delete(self, opt, *a, **kw):
+ """perform ._delete transactionally"""
+ self.write(self._delete, opt, *a, **kw)
diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py
new file mode 100644
index 00000000000..146c72a1825
--- /dev/null
+++ b/geo-replication/syncdaemon/gconf.py
@@ -0,0 +1,20 @@
+import os
+
+class GConf(object):
+ """singleton class to store globals
+ shared between gsyncd modules"""
+
+ ssh_ctl_dir = None
+ ssh_ctl_args = None
+ cpid = None
+ pid_file_owned = False
+ log_exit = False
+ permanent_handles = []
+ log_metadata = {}
+
+ @classmethod
+ def setup_ssh_ctl(cls, ctld):
+ cls.ssh_ctl_dir = ctld
+ cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")]
+
+gconf = GConf()
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
new file mode 100644
index 00000000000..387900e6ce8
--- /dev/null
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -0,0 +1,419 @@
+#!/usr/bin/env python
+
+import os
+import os.path
+import sys
+import time
+import logging
+import signal
+import optparse
+import fcntl
+import fnmatch
+from optparse import OptionParser, SUPPRESS_HELP
+from logging import Logger
+from errno import EEXIST, ENOENT
+
+from ipaddr import IPAddress, IPNetwork
+
+from gconf import gconf
+from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
+from syncdutils import GsyncdError, select, set_term_handler, privileged
+from configinterface import GConffile
+import resource
+from monitor import monitor
+
+class GLogger(Logger):
+ """Logger customizations for gsyncd.
+
+ It implements a log format similar to that of glusterfs.
+ """
+
+ def makeRecord(self, name, level, *a):
+ rv = Logger.makeRecord(self, name, level, *a)
+ rv.nsecs = (rv.created - int(rv.created)) * 1000000
+ fr = sys._getframe(4)
+ callee = fr.f_locals.get('self')
+ if callee:
+ ctx = str(type(callee)).split("'")[1].split('.')[-1]
+ else:
+ ctx = '<top>'
+ if not hasattr(rv, 'funcName'):
+ rv.funcName = fr.f_code.co_name
+ rv.lvlnam = logging.getLevelName(level)[0]
+ rv.ctx = ctx
+ return rv
+
+ @classmethod
+ def setup(cls, **kw):
+ lbl = kw.get('label', "")
+ if lbl:
+ lbl = '(' + lbl + ')'
+ lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
+ 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
+ lprm.update(kw)
+ lvl = kw.get('level', logging.INFO)
+ lprm['level'] = lvl
+ logging.root = cls("root", lvl)
+ logging.setLoggerClass(cls)
+ logging.getLogger().handlers = []
+ logging.basicConfig(**lprm)
+
+ @classmethod
+ def _gsyncd_loginit(cls, **kw):
+ lkw = {}
+ if gconf.log_level:
+ lkw['level'] = gconf.log_level
+ if kw.get('log_file'):
+ if kw['log_file'] in ('-', '/dev/stderr'):
+ lkw['stream'] = sys.stderr
+ elif kw['log_file'] == '/dev/stdout':
+ lkw['stream'] = sys.stdout
+ else:
+ lkw['filename'] = kw['log_file']
+
+ cls.setup(label=kw.get('label'), **lkw)
+
+ lkw.update({'saved_label': kw.get('label')})
+ gconf.log_metadata = lkw
+ gconf.log_exit = True
+
+def startup(**kw):
+ """set up logging, pidfile grabbing, daemonization"""
+ if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
+ if not grabpidfile():
+ sys.stderr.write("pidfile is taken, exiting.\n")
+ sys.exit(2)
+ gconf.pid_file_owned = True
+
+ if kw.get('go_daemon') == 'should':
+ x, y = os.pipe()
+ gconf.cpid = os.fork()
+ if gconf.cpid:
+ os.close(x)
+ sys.exit()
+ os.close(y)
+ os.setsid()
+ dn = os.open(os.devnull, os.O_RDWR)
+ for f in (sys.stdin, sys.stdout, sys.stderr):
+ os.dup2(dn, f.fileno())
+ if getattr(gconf, 'pid_file', None):
+ if not grabpidfile(gconf.pid_file + '.tmp'):
+ raise GsyncdError("cannot grab temporary pidfile")
+ os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
+ # wait for parent to terminate
+ # so we can start up with
+ # no messing from the dirty
+ # ol' bustard
+ select((x,), (), ())
+ os.close(x)
+
+ GLogger._gsyncd_loginit(**kw)
+
+def main():
+ """main routine, signal/exception handling boilerplates"""
+ gconf.starttime = time.time()
+ set_term_handler()
+ GLogger.setup()
+ excont = FreeObject(exval = 0)
+ try:
+ try:
+ main_i()
+ except:
+ log_raise_exception(excont)
+ finally:
+ finalize(exval = excont.exval)
+
+def main_i():
+ """internal main routine
+
+ parse command line, decide what action will be taken;
+ we can either:
+ - query/manipulate configuration
+ - format gsyncd urls using gsyncd's url parsing engine
+ - start service in following modes, in given stages:
+ - monitor: startup(), monitor()
+ - master: startup(), connect_remote(), connect(), service_loop()
+ - slave: startup(), connect(), service_loop()
+ """
+ rconf = {'go_daemon': 'should'}
+
+ def store_abs(opt, optstr, val, parser):
+ if val and val != '-':
+ val = os.path.abspath(val)
+ setattr(parser.values, opt.dest, val)
+ def store_local(opt, optstr, val, parser):
+ rconf[opt.dest] = val
+ def store_local_curry(val):
+ return lambda o, oo, vx, p: store_local(o, oo, val, p)
+ def store_local_obj(op, dmake):
+ return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p)
+
+ op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
+ op.add_option('--gluster-command-dir', metavar='DIR', default='')
+ op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
+ op.add_option('--gluster-log-level', metavar='LVL')
+ op.add_option('--gluster-params', metavar='PRMS', default='')
+ op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-')
+ op.add_option('--mountbroker', metavar='LABEL')
+ op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
+ op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
+ op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
+ op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
+ op.add_option('--ignore-deletes', default=False, action='store_true')
+ op.add_option('--use-rsync-xattrs', default=False, action='store_true')
+ op.add_option('-L', '--log-level', metavar='LVL')
+ op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
+ op.add_option('--volume-id', metavar='UUID')
+ op.add_option('--session-owner', metavar='ID')
+ op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
+ op.add_option('--rsync-command', metavar='CMD', default='rsync')
+ op.add_option('--rsync-options', metavar='OPTS', default='--sparse')
+ op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
+ op.add_option('--timeout', metavar='SEC', type=int, default=120)
+ op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP)
+ op.add_option('--sync-jobs', metavar='N', type=int, default=3)
+ op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
+ op.add_option('--allow-network', metavar='IPS', default='')
+ op.add_option('--socketdir', metavar='DIR')
+ op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)
+ op.add_option('--checkpoint', metavar='LABEL', default='')
+ # tunables for failover/failback mechanism:
+ # None - gsyncd behaves as normal
+ # blind - gsyncd works with xtime pairs to identify
+ # candidates for synchronization
+ # wrapup - same as normal mode but does not assign
+ # xtimes to orphaned files
+ # see crawl() for usage of the above tunables
+ op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
+
+ op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
+ # duh. need to specify dest or value will be mapped to None :S
+ op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
+ op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
+ op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
+ op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
+ op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
+ setattr(a[-1].values, 'log_file', '-'),
+ setattr(a[-1].values, 'log_level', 'DEBUG'))),
+
+ for a in ('check', 'get'):
+ op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
+ callback=store_local_obj(a, lambda vx: {'opt': vx}))
+ op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None}))
+ for m in ('', '-rx', '-glob'):
+ # call this code 'Pythonic' eh?
+ # have to define a one-shot local function to be able to inject (a value depending on the)
+ # iteration variable into the inner lambda
+ def conf_mod_opt_regex_variant(rx):
+ op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback',
+ callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx}))
+ op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback',
+ callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx}))
+ conf_mod_opt_regex_variant(m and m[1:] or False)
+
+ op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal'))
+ op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon'))
+ op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc'))
+
+ tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ]
+ remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ]
+ rq_remote_tunables = { 'listen': True }
+
+ # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
+ # -- for this to work out we need to tell apart defaults from explicitly set
+ # options... so churn out the defaults here and call the parser with virgin
+ # values container.
+ defaults = op.get_default_values()
+ opts, args = op.parse_args(values=optparse.Values())
+ confdata = rconf.get('config')
+ if not (len(args) == 2 or \
+ (len(args) == 1 and rconf.get('listen')) or \
+ (len(args) <= 2 and confdata) or \
+ rconf.get('url_print')):
+ sys.stderr.write("error: incorrect number of arguments\n\n")
+ sys.stderr.write(op.get_usage() + "\n")
+ sys.exit(1)
+
+ restricted = os.getenv('_GSYNCD_RESTRICTED_')
+
+ if restricted:
+ allopts = {}
+ allopts.update(opts.__dict__)
+ allopts.update(rconf)
+ bannedtuns = set(allopts.keys()) - set(remote_tunables)
+ if bannedtuns:
+ raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \
+ ', '.join(bannedtuns))
+ for k, v in rq_remote_tunables.items():
+ if not k in allopts or allopts[k] != v:
+ raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \
+ (k, v))
+
+ confrx = getattr(confdata, 'rx', None)
+ if confrx:
+ # peers are regexen, don't try to parse them
+ if confrx == 'glob':
+ args = [ '\A' + fnmatch.translate(a) for a in args ]
+ canon_peers = args
+ namedict = {}
+ else:
+ rscs = [resource.parse_url(u) for u in args]
+ dc = rconf.get('url_print')
+ if dc:
+ for r in rscs:
+ print(r.get_url(**{'normal': {},
+ 'canon': {'canonical': True},
+ 'canon_esc': {'canonical': True, 'escaped': True}}[dc]))
+ return
+ local = remote = None
+ if rscs:
+ local = rscs[0]
+ if len(rscs) > 1:
+ remote = rscs[1]
+ if not local.can_connect_to(remote):
+ raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
+ pa = ([], [], [])
+ urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
+ for x in rscs:
+ for i in range(len(pa)):
+ pa[i].append(x.get_url(**urlprms[i]))
+ peers, canon_peers, canon_esc_peers = pa
+ # creating the namedict, a dict representing various ways of referring to / repreenting
+ # peers to be fillable in config templates
+ mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
+ if remote:
+ rmap = { local: ('local', 'master'), remote: ('remote', 'slave') }
+ else:
+ rmap = { local: ('local', 'slave') }
+ namedict = {}
+ for i in range(len(rscs)):
+ x = rscs[i]
+ for name in rmap[x]:
+ for j in range(3):
+ namedict[mods[j](name)] = pa[j][i]
+ if x.scheme == 'gluster':
+ namedict[name + 'vol'] = x.volume
+ if not 'config_file' in rconf:
+ rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
+ gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict)
+
+ checkpoint_change = False
+ if confdata:
+ opt_ok = norm(confdata.opt) in tunables + [None]
+ if confdata.op == 'check':
+ if opt_ok:
+ sys.exit(0)
+ else:
+ sys.exit(1)
+ elif not opt_ok:
+ raise GsyncdError("not a valid option: " + confdata.opt)
+ if confdata.op == 'get':
+ gcnf.get(confdata.opt)
+ elif confdata.op == 'set':
+ gcnf.set(confdata.opt, confdata.val, confdata.rx)
+ elif confdata.op == 'del':
+ gcnf.delete(confdata.opt, confdata.rx)
+ # when modifying checkpoint, it's important to make a log
+ # of that, so in that case we go on to set up logging even
+ # if its just config invocation
+ if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \
+ not confdata.rx:
+ checkpoint_change = True
+ if not checkpoint_change:
+ return
+
+ gconf.__dict__.update(defaults.__dict__)
+ gcnf.update_to(gconf.__dict__)
+ gconf.__dict__.update(opts.__dict__)
+ gconf.configinterface = gcnf
+
+ if restricted and gconf.allow_network:
+ ssh_conn = os.getenv('SSH_CONNECTION')
+ if not ssh_conn:
+ #legacy env var
+ ssh_conn = os.getenv('SSH_CLIENT')
+ if ssh_conn:
+ allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ]
+ client_ip = IPAddress(ssh_conn.split()[0])
+ allowed = False
+ for nw in allowed_networks:
+ if client_ip in nw:
+ allowed = True
+ break
+ if not allowed:
+ raise GsyncdError("client IP address is not allowed")
+
+ ffd = rconf.get('feedback_fd')
+ if ffd:
+ fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+
+ #normalize loglevel
+ lvl0 = gconf.log_level
+ if isinstance(lvl0, str):
+ lvl1 = lvl0.upper()
+ lvl2 = logging.getLevelName(lvl1)
+ # I have _never_ _ever_ seen such an utterly braindead
+ # error condition
+ if lvl2 == "Level " + lvl1:
+ raise GsyncdError('cannot recognize log level "%s"' % lvl0)
+ gconf.log_level = lvl2
+
+ if not privileged() and gconf.log_file_mbr:
+ gconf.log_file = gconf.log_file_mbr
+
+ if checkpoint_change:
+ try:
+ GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf')
+ if confdata.op == 'set':
+ logging.info('checkpoint %s set' % confdata.val)
+ elif confdata.op == 'del':
+ logging.info('checkpoint info was reset')
+ except IOError:
+ if sys.exc_info()[1].errno == ENOENT:
+ # directory of log path is not present,
+ # which happens if we get here from
+ # a peer-multiplexed "config-set checkpoint"
+ # (as that directory is created only on the
+ # original node)
+ pass
+ else:
+ raise
+ return
+
+ go_daemon = rconf['go_daemon']
+ be_monitor = rconf.get('monitor')
+
+ if not be_monitor and isinstance(remote, resource.SSH) and \
+ go_daemon == 'should':
+ go_daemon = 'postconn'
+ log_file = None
+ else:
+ log_file = gconf.log_file
+ if be_monitor:
+ label = 'monitor'
+ elif remote:
+ #master
+ label = ''
+ else:
+ label = 'slave'
+ startup(go_daemon=go_daemon, log_file=log_file, label=label)
+
+ if be_monitor:
+ return monitor()
+
+ logging.info("syncing: %s" % " -> ".join(peers))
+ resource.Popen.init_errhandler()
+ if remote:
+ go_daemon = remote.connect_remote(go_daemon=go_daemon)
+ if go_daemon:
+ startup(go_daemon=go_daemon, log_file=gconf.log_file)
+ # complete remote connection in child
+ remote.connect_remote(go_daemon='done')
+ local.connect()
+ if ffd:
+ os.close(ffd)
+ local.service_loop(*[r for r in [remote] if r])
+
+
+if __name__ == "__main__":
+ main()
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
new file mode 100644
index 00000000000..f0a9d22920a
--- /dev/null
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -0,0 +1,72 @@
+import os
+from ctypes import *
+from ctypes.util import find_library
+
+class Xattr(object):
+ """singleton that wraps the extended attribues system
+ interface for python using ctypes
+
+ Just implement it to the degree we need it, in particular
+ - we need just the l*xattr variants, ie. we never want symlinks to be
+ followed
+ - don't need size discovery for getxattr, as we always know the exact
+ sizes we expect
+ """
+
+ libc = CDLL(find_library("libc"))
+
+ @classmethod
+ def geterrno(cls):
+ return c_int.in_dll(cls.libc, 'errno').value
+
+ @classmethod
+ def raise_oserr(cls):
+ errn = cls.geterrno()
+ raise OSError(errn, os.strerror(errn))
+
+ @classmethod
+ def _query_xattr(cls, path, siz, syscall, *a):
+ if siz:
+ buf = create_string_buffer('\0' * siz)
+ else:
+ buf = None
+ ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
+ if ret == -1:
+ cls.raise_oserr()
+ if siz:
+ return buf.raw[:ret]
+ else:
+ return ret
+
+ @classmethod
+ def lgetxattr(cls, path, attr, siz=0):
+ return cls._query_xattr( path, siz, 'lgetxattr', attr)
+
+ @classmethod
+ def llistxattr(cls, path, siz=0):
+ ret = cls._query_xattr(path, siz, 'llistxattr')
+ if isinstance(ret, str):
+ ret = ret.split('\0')
+ return ret
+
+ @classmethod
+ def lsetxattr(cls, path, attr, val):
+ ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
+ def lremovexattr(cls, path, attr):
+ ret = cls.libc.lremovexattr(path, attr)
+ if ret == -1:
+ cls.raise_oserr()
+
+ @classmethod
+ def llistxattr_buf(cls, path):
+ """listxattr variant with size discovery"""
+ size = cls.llistxattr(path)
+ if size == -1:
+ cls.raise_oserr()
+ if size == 0:
+ return []
+ return cls.llistxattr(path, size)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
new file mode 100644
index 00000000000..f903f30595d
--- /dev/null
+++ b/geo-replication/syncdaemon/master.py
@@ -0,0 +1,961 @@
+import os
+import sys
+import time
+import stat
+import random
+import signal
+import logging
+import socket
+import errno
+import re
+from errno import ENOENT, ENODATA, EPIPE
+from threading import currentThread, Condition, Lock
+from datetime import datetime
+try:
+ from hashlib import md5 as md5
+except ImportError:
+ # py 2.4
+ from md5 import new as md5
+
+from gconf import gconf
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
+ escape, unescape, select
+
+URXTIME = (-1, 0)
+
+# Utility functions to help us to get to closer proximity
+# of the DRY principle (no, don't look for elevated or
+# perspectivistic things here)
+
+def _xtime_now():
+ t = time.time()
+ sec = int(t)
+ nsec = int((t - sec) * 1000000)
+ return (sec, nsec)
+
+def _volinfo_hook_relax_foreign(self):
+ volinfo_sys = self.get_sys_volinfo()
+ fgn_vi = volinfo_sys[self.KFGN]
+ if fgn_vi:
+ expiry = fgn_vi['timeout'] - int(time.time()) + 1
+ logging.info('foreign volume info found, waiting %d sec for expiry' % \
+ expiry)
+ time.sleep(expiry)
+ volinfo_sys = self.get_sys_volinfo()
+ self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+ volinfo_sys)
+ if self.inter_master:
+ raise GsyncdError("cannot be intermediate master in special mode")
+ return (volinfo_sys, state_change)
+
+
+# The API!
+
+def gmaster_builder():
+ """produce the GMaster class variant corresponding
+ to sync mode"""
+ this = sys.modules[__name__]
+ modemixin = gconf.special_sync_mode
+ if not modemixin:
+ modemixin = 'normal'
+ logging.info('setting up master for %s sync mode' % modemixin)
+ modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+ sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+ purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+ class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
+ pass
+ return _GMaster
+
+
+# Mixin classes that implement the data format
+# and logic particularities of the certain
+# sync modes
+
+class NormalMixin(object):
+ """normal geo-rep behavior"""
+
+ minus_infinity = URXTIME
+
+ # following staticmethods ideally would be
+ # methods of an xtime object (in particular,
+ # implementing the hooks needed for comparison
+ # operators), but at this point we don't yet
+ # have a dedicated xtime class
+
+ @staticmethod
+ def serialize_xtime(xt):
+ return "%d.%d" % tuple(xt)
+
+ @staticmethod
+ def deserialize_xtime(xt):
+ return tuple(int(x) for x in xt.split("."))
+
+ @staticmethod
+ def native_xtime(xt):
+ return xt
+
+ @staticmethod
+ def xtime_geq(xt0, xt1):
+ return xt0 >= xt1
+
+ def make_xtime_opts(self, is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = is_master and not self.inter_master
+ if not 'default_xtime' in opts:
+ if is_master and self.inter_master:
+ opts['default_xtime'] = ENODATA
+ else:
+ opts['default_xtime'] = URXTIME
+
+ def xtime_low(self, server, path, **opts):
+ xt = server.xtime(path, self.uuid)
+ if isinstance(xt, int) and xt != ENODATA:
+ return xt
+ if xt == ENODATA or xt < self.volmark:
+ if opts['create']:
+ xt = _xtime_now()
+ server.set_xtime(path, self.uuid, xt)
+ else:
+ xt = opts['default_xtime']
+ return xt
+
+ def keepalive_payload_hook(self, timo, gap):
+ # first grab a reference as self.volinfo
+ # can be changed in main thread
+ vi = self.volinfo
+ if vi:
+ # then have a private copy which we can mod
+ vi = vi.copy()
+ vi['timeout'] = int(time.time()) + timo
+ else:
+ # send keep-alives more frequently to
+ # avoid a delay in announcing our volume info
+ # to slave if it becomes established in the
+ # meantime
+ gap = min(10, gap)
+ return (vi, gap)
+
+ def volinfo_hook(self):
+ volinfo_sys = self.get_sys_volinfo()
+ self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+ volinfo_sys)
+ return (volinfo_sys, state_change)
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ if xtr > xtl:
+ raise GsyncdError("timestamp corruption for " + path)
+
+ def need_sync(self, e, xte, xtrd):
+ return xte > xtrd
+
+ def set_slave_xtime(self, path, mark):
+ self.slave.server.set_xtime(path, self.uuid, mark)
+
+class WrapupMixin(NormalMixin):
+ """a variant that differs from normal in terms
+ of ignoring non-indexed files"""
+
+ @staticmethod
+ def make_xtime_opts(is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = False
+ if not 'default_xtime' in opts:
+ opts['default_xtime'] = URXTIME
+
+ @staticmethod
+ def keepalive_payload_hook(timo, gap):
+ return (None, gap)
+
+ def volinfo_hook(self):
+ return _volinfo_hook_relax_foreign(self)
+
+class BlindMixin(object):
+ """Geo-rep flavor using vectored xtime.
+
+ Coordinates are the master, slave uuid pair;
+ in master coordinate behavior is normal,
+ in slave coordinate we force synchronization
+ on any value difference (these are in disjunctive
+ relation, ie. if either orders the entry to be
+ synced, it shall be synced.
+ """
+
+ minus_infinity = (URXTIME, None)
+
+ @staticmethod
+ def serialize_xtime(xt):
+ a = []
+ for x in xt:
+ if not x:
+ x = ('None', '')
+ a.extend(x)
+ return '.'.join(str(n) for n in a)
+
+ @staticmethod
+ def deserialize_xtime(xt):
+ a = xt.split(".")
+ a = (tuple(a[0:2]), tuple(a[3:4]))
+ b = []
+ for p in a:
+ if p[0] == 'None':
+ p = None
+ else:
+ p = tuple(int(x) for x in p)
+ b.append(p)
+ return tuple(b)
+
+ @staticmethod
+ def native_xtime(xt):
+ return xt[0]
+
+ @staticmethod
+ def xtime_geq(xt0, xt1):
+ return (not xt1[0] or xt0[0] >= xt1[0]) and \
+ (not xt1[1] or xt0[1] >= xt1[1])
+
+ @property
+ def ruuid(self):
+ if self.volinfo_r:
+ return self.volinfo_r['uuid']
+
+ @staticmethod
+ def make_xtime_opts(is_master, opts):
+ if not 'create' in opts:
+ opts['create'] = is_master
+ if not 'default_xtime' in opts:
+ opts['default_xtime'] = URXTIME
+
+ def xtime_low(self, server, path, **opts):
+ xtd = server.xtime_vec(path, self.uuid, self.ruuid)
+ if isinstance(xtd, int):
+ return xtd
+ xt = (xtd[self.uuid], xtd[self.ruuid])
+ if not xt[1] and (not xt[0] or xt[0] < self.volmark):
+ if opts['create']:
+ # not expected, but can happen if file originates
+ # from interrupted gsyncd transfer
+ logging.warn('have to fix up missing xtime on ' + path)
+ xt0 = _xtime_now()
+ server.set_xtime(path, self.uuid, xt0)
+ else:
+ xt0 = opts['default_xtime']
+ xt = (xt0, xt[1])
+ return xt
+
+ @staticmethod
+ def keepalive_payload_hook(timo, gap):
+ return (None, gap)
+
+ def volinfo_hook(self):
+ res = _volinfo_hook_relax_foreign(self)
+ volinfo_r_new = self.slave.server.native_volume_info()
+ if volinfo_r_new['retval']:
+ raise GsyncdError("slave is corrupt")
+ if getattr(self, 'volinfo_r', None):
+ if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
+ raise GsyncdError("uuid mismatch on slave")
+ self.volinfo_r = volinfo_r_new
+ return res
+
+ def xtime_reversion_hook(self, path, xtl, xtr):
+ if not isinstance(xtr[0], int) and \
+ (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
+ raise GsyncdError("timestamp corruption for " + path)
+
+ def need_sync(self, e, xte, xtrd):
+ if xte[0]:
+ if not xtrd[0] or xte[0] > xtrd[0]:
+ # there is outstanding diff at 0th pos,
+ # we can short-cut to true
+ return True
+ # we arrived to this point by either of these
+ # two possiblilites:
+ # - no outstanding difference at 0th pos,
+ # wanna see 1st pos if he raises veto
+ # against "no need to sync" proposal
+ # - no data at 0th pos, 1st pos will have
+ # to decide (due to xtime assignment,
+ # in this case 1st pos does carry data
+ # -- iow, if 1st pos did not have data,
+ # and 0th neither, 0th would have been
+ # force-feeded)
+ if not xte[1]:
+ # no data, no veto
+ return False
+ # the hard work: for 1st pos,
+ # the conduct is fetch corresponding
+ # slave data and do a "blind" comparison
+ # (ie. do not care who is newer, we trigger
+ # sync on non-identical xitmes)
+ xtr = self.xtime(e, self.slave)
+ return isinstance(xtr, int) or xte[1] != xtr[1]
+
+ def set_slave_xtime(self, path, mark):
+ xtd = {}
+ for (u, t) in zip((self.uuid, self.ruuid), mark):
+ if t:
+ xtd[u] = t
+ self.slave.server.set_xtime_vec(path, xtd)
+
+
+# Further mixins for certain tunable behaviors
+
+class SendmarkNormalMixin(object):
+
+ def sendmark_regular(self, *a, **kw):
+ return self.sendmark(*a, **kw)
+
+class SendmarkRsyncMixin(object):
+
+ def sendmark_regular(self, *a, **kw):
+ pass
+
+
+class PurgeNormalMixin(object):
+
+ def purge_missing(self, path, names):
+ self.slave.server.purge(path, names)
+
+class PurgeNoopMixin(object):
+
+ def purge_missing(self, path, names):
+ pass
+
+
+
+class GMasterBase(object):
+ """abstract class impementling master role"""
+
+ KFGN = 0
+ KNAT = 1
+
+ def get_sys_volinfo(self):
+ """query volume marks on fs root
+
+ err out on multiple foreign masters
+ """
+ fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
+ self.master.server.native_volume_info()
+ fgn_vi = None
+ if fgn_vis:
+ if len(fgn_vis) > 1:
+ raise GsyncdError("cannot work with multiple foreign masters")
+ fgn_vi = fgn_vis[0]
+ return fgn_vi, nat_vi
+
+ @property
+ def uuid(self):
+ if self.volinfo:
+ return self.volinfo['uuid']
+
+ @property
+ def volmark(self):
+ if self.volinfo:
+ return self.volinfo['volume_mark']
+
+ @property
+ def inter_master(self):
+ """decide if we are an intermediate master
+ in a cascading setup
+ """
+ return self.volinfo_state[self.KFGN] and True or False
+
+ def xtime(self, path, *a, **opts):
+ """get amended xtime
+
+ as of amending, we can create missing xtime, or
+ determine a valid value if what we get is expired
+ (as of the volume mark expiry); way of amendig
+ depends on @opts and on subject of query (master
+ or slave).
+ """
+ if a:
+ rsc = a[0]
+ else:
+ rsc = self.master
+ self.make_xtime_opts(rsc == self.master, opts)
+ return self.xtime_low(rsc.server, path, **opts)
+
+ def __init__(self, master, slave):
+ self.master = master
+ self.slave = slave
+ self.jobtab = {}
+ self.syncer = Syncer(slave)
+ # crawls vs. turns:
+ # - self.crawls is simply the number of crawl() invocations on root
+ # - one turn is a maximal consecutive sequence of crawls so that each
+ # crawl in it detects a change to be synced
+ # - self.turns is the number of turns since start
+ # - self.total_turns is a limit so that if self.turns reaches it, then
+ # we exit (for diagnostic purposes)
+ # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+ self.crawls = 0
+ self.turns = 0
+ self.total_turns = int(gconf.turns)
+ self.lastreport = {'crawls': 0, 'turns': 0}
+ self.start = None
+ self.change_seen = None
+ self.syncTime=0
+ self.lastSyncTime=0
+ self.crawlStartTime=0
+ self.crawlTime=0
+ self.filesSynced=0
+ self.bytesSynced=0
+ # the authoritative (foreign, native) volinfo pair
+ # which lets us deduce what to do when we refetch
+ # the volinfos from system
+ uuid_preset = getattr(gconf, 'volume_id', None)
+ self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
+ # the actual volinfo we make use of
+ self.volinfo = None
+ self.terminate = False
+ self.checkpoint_thread = None
+
+ @classmethod
+ def _checkpt_param(cls, chkpt, prm, xtimish=True):
+ """use config backend to lookup a parameter belonging to
+ checkpoint @chkpt"""
+ cprm = getattr(gconf, 'checkpoint_' + prm, None)
+ if not cprm:
+ return
+ chkpt_mapped, val = cprm.split(':', 1)
+ if unescape(chkpt_mapped) != chkpt:
+ return
+ if xtimish:
+ val = cls.deserialize_xtime(val)
+ return val
+
+ @classmethod
+ def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
+ """use config backend to store a parameter associated
+ with checkpoint @chkpt"""
+ if xtimish:
+ val = cls.serialize_xtime(val)
+ gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+
+ @staticmethod
+ def humantime(*tpair):
+ """format xtime-like (sec, nsec) pair to human readable format"""
+ ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
+ strftime("%Y-%m-%d %H:%M:%S")
+ if len(tpair) > 1:
+ ts += '.' + str(tpair[1])
+ return ts
+
+ def get_extra_info(self):
+ str_info="\nFile synced : %d" %(self.filesSynced)
+ str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced)
+ str_info+="\nSync Time : %f seconds" %(self.syncTime)
+ self.crawlTime=datetime.now()-self.crawlStartTime
+ years , days =divmod(self.crawlTime.days,365.25)
+ years=int(years)
+ days=int(days)
+
+ date=""
+ m, s = divmod(self.crawlTime.seconds, 60)
+ h, m = divmod(m, 60)
+
+ if years!=0 :
+ date+=str(years)+" year "
+ if days!=0 :
+ date+=str(days)+" day "
+ if h!=0 :
+ date+=str(h)+" H : "
+ if m!=0 or h!=0 :
+ date+=str(m)+" M : "
+
+ date+=str(s)+" S"
+ self.crawlTime=date
+ str_info+="\nCrawl Time : %s" %(str(self.crawlTime))
+ str_info+="\n\0"
+ return str_info
+
+ def checkpt_service(self, chan, chkpt, tgt):
+ """checkpoint service loop
+
+ monitor and verify checkpoint status for @chkpt, and listen
+ for incoming requests for whom we serve a pretty-formatted
+ status report"""
+ if not chkpt:
+ # dummy loop for the case when there is no checkpt set
+ while True:
+ select([chan], [], [])
+ conn, _ = chan.accept()
+ conn.send(self.get_extra_info())
+ conn.close()
+ completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+ if completed:
+ completed = tuple(int(x) for x in completed.split('.'))
+ while True:
+ s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+ # either request made and we re-check to not
+ # give back stale data, or we still hunting for completion
+ if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
+ # indexing has been reset since setting the checkpoint
+ status = "is invalid"
+ else:
+ xtr = self.xtime('.', self.slave)
+ if isinstance(xtr, int):
+ raise GsyncdError("slave root directory is unaccessible (%s)",
+ os.strerror(xtr))
+ ncompleted = self.xtime_geq(xtr, tgt)
+ if completed and not ncompleted: # stale data
+ logging.warn("completion time %s for checkpoint %s became stale" % \
+ (self.humantime(*completed), chkpt))
+ completed = None
+ gconf.confdata.delete('checkpoint-completed')
+ if ncompleted and not completed: # just reaching completion
+ completed = "%.6f" % time.time()
+ self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+ completed = tuple(int(x) for x in completed.split('.'))
+ logging.info("checkpoint %s completed" % chkpt)
+ status = completed and \
+ "completed at " + self.humantime(completed[0]) or \
+ "not reached yet"
+ if s:
+ conn = None
+ try:
+ conn, _ = chan.accept()
+ try:
+ conn.send(" | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info()))
+ except:
+ exc = sys.exc_info()[1]
+ if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
+ exc.errno == EPIPE:
+ logging.debug('checkpoint client disconnected')
+ else:
+ raise
+ finally:
+ if conn:
+ conn.close()
+
+ def start_checkpoint_thread(self):
+ """prepare and start checkpoint service"""
+ if self.checkpoint_thread or not (
+ getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
+ ):
+ return
+ chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
+ try:
+ os.unlink(state_socket)
+ except:
+ if sys.exc_info()[0] == OSError:
+ pass
+ chan.bind(state_socket)
+ chan.listen(1)
+ checkpt_tgt = None
+ if gconf.checkpoint:
+ checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
+ if not checkpt_tgt:
+ checkpt_tgt = self.xtime('.')
+ if isinstance(checkpt_tgt, int):
+ raise GsyncdError("master root directory is unaccessible (%s)",
+ os.strerror(checkpt_tgt))
+ self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
+ logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ (repr(checkpt_tgt), gconf.checkpoint))
+ t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+ t.start()
+ self.checkpoint_thread = t
+
+ def crawl_loop(self):
+ """start the keep-alive thread and iterate .crawl"""
+ timo = int(gconf.timeout or 0)
+ if timo > 0:
+ def keep_alive():
+ while True:
+ vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
+ self.slave.server.keep_alive(vi)
+ time.sleep(gap)
+ t = Thread(target=keep_alive)
+ t.start()
+ self.lastreport['time'] = time.time()
+ self.crawlStartTime=datetime.now()
+ while not self.terminate:
+ self.crawl()
+
+ def add_job(self, path, label, job, *a, **kw):
+ """insert @job function to job table at @path with @label"""
+ if self.jobtab.get(path) == None:
+ self.jobtab[path] = []
+ self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
+
+ def add_failjob(self, path, label):
+ """invoke .add_job with a job that does nothing just fails"""
+ logging.debug('salvaged: ' + label)
+ self.add_job(path, label, lambda: False)
+
+ def wait(self, path, *args):
+ """perform jobs registered for @path
+
+ Reset jobtab entry for @path,
+ determine success as the conjuction of
+ success of all the jobs. In case of
+ success, call .sendmark on @path
+ """
+ jobs = self.jobtab.pop(path, [])
+ succeed = True
+ for j in jobs:
+ ret = j[-1]()
+ if not ret:
+ succeed = False
+ if succeed:
+ self.sendmark(path, *args)
+ return succeed
+
+ def sendmark(self, path, mark, adct=None):
+ """update slave side xtime for @path to master side xtime
+
+ also can send a setattr payload (see Server.setattr).
+ """
+ if adct:
+ self.slave.server.setattr(path, adct)
+ self.set_slave_xtime(path, mark)
+
+ @staticmethod
+ def volinfo_state_machine(volinfo_state, volinfo_sys):
+ """compute new volinfo_state from old one and incoming
+ as of current system state, also indicating if there was a
+ change regarding which volume mark is the authoritative one
+
+ @volinfo_state, @volinfo_sys are pairs of volume mark dicts
+ (foreign, native).
+
+ Note this method is marked as static, ie. the computation is
+ pure, without reliance on any excess implicit state. State
+ transitions which are deemed as ambiguous or banned will raise
+ an exception.
+
+ """
+ # store the value below "boxed" to emulate proper closures
+ # (variables of the enclosing scope are available inner functions
+ # provided they are no reassigned; mutation is OK).
+ param = FreeObject(relax_mismatch = False, state_change = None, index=-1)
+ def select_vi(vi0, vi):
+ param.index += 1
+ if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
+ if not vi0 and not param.relax_mismatch:
+ param.state_change = param.index
+ # valid new value found; for the rest, we are graceful about
+ # uuid mismatch
+ param.relax_mismatch = True
+ return vi
+ if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
+ # uuid mismatch for master candidate, bail out
+ raise GsyncdError("aborting on uuid change from %s to %s" % \
+ (vi0['uuid'], vi['uuid']))
+ # fall back to old
+ return vi0
+ newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
+ srep = lambda vi: vi and vi['uuid'][0:8]
+ logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
+ tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
+ return newstate, param.state_change
+
+ def crawl(self, path='.', xtl=None):
+ """crawling...
+
+ Standing around
+ All the right people
+ Crawling
+ Tennis on Tuesday
+ The ladder is long
+ It is your nature
+ You've gotta suntan
+ Football on Sunday
+ Society boy
+
+ Recursively walk the master side tree and check if updates are
+ needed due to xtime differences. One invocation of crawl checks
+ children of @path and do a recursive enter only on
+ those directory children where there is an update needed.
+
+ Way of updates depend on file type:
+ - for symlinks, sync them directy and synchronously
+ - for regular children, register jobs for @path (cf. .add_job) to start
+ and wait on their rsync
+ - for directory children, register a job for @path which waits (.wait)
+ on jobs for the given child
+ (other kind of filesystem nodes are not considered)
+
+ Those slave side children which do not exist on master are simply
+ purged (see Server.purge).
+
+ Behavior is fault tolerant, synchronization is adaptive: if some action fails,
+ just go on relentlessly, adding a fail job (see .add_failjob) which will prevent
+ the .sendmark on @path, so when the next crawl will arrive to @path it will not
+ see it as up-to-date and will try to sync it again. While this semantics can be
+ supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris),
+ the ultimate reason which excludes other possibilities is simply transience: we cannot
+ assert that the file systems (master / slave) underneath do not change and actions
+ taken upon some condition will not lose their context by the time they are performed.
+ """
+ if path == '.':
+ if self.start:
+ self.crawls += 1
+ logging.debug("... crawl #%d done, took %.6f seconds" % \
+ (self.crawls, time.time() - self.start))
+ time.sleep(1)
+ self.start = time.time()
+ should_display_info = self.start - self.lastreport['time'] >= 60
+ if should_display_info:
+ logging.info("completed %d crawls, %d turns",
+ self.crawls - self.lastreport['crawls'],
+ self.turns - self.lastreport['turns'])
+ self.lastreport.update(crawls = self.crawls,
+ turns = self.turns,
+ time = self.start)
+ volinfo_sys, state_change = self.volinfo_hook()
+ if self.inter_master:
+ self.volinfo = volinfo_sys[self.KFGN]
+ else:
+ self.volinfo = volinfo_sys[self.KNAT]
+ if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
+ logging.info('new master is %s', self.uuid)
+ if self.volinfo:
+ logging.info("%s master with volume id %s ..." % \
+ (self.inter_master and "intermediate" or "primary",
+ self.uuid))
+ if state_change == self.KFGN:
+ gconf.configinterface.set('volume_id', self.uuid)
+ if self.volinfo:
+ if self.volinfo['retval']:
+ raise GsyncdError ("master is corrupt")
+ self.start_checkpoint_thread()
+ else:
+ if should_display_info or self.crawls == 0:
+ if self.inter_master:
+ logging.info("waiting for being synced from %s ..." % \
+ self.volinfo_state[self.KFGN]['uuid'])
+ else:
+ logging.info("waiting for volume info ...")
+ return
+ logging.debug("entering " + path)
+ if not xtl:
+ xtl = self.xtime(path)
+ if isinstance(xtl, int):
+ self.add_failjob(path, 'no-local-node')
+ return
+ xtr = self.xtime(path, self.slave)
+ if isinstance(xtr, int):
+ if xtr != ENOENT:
+ self.slave.server.purge(path)
+ try:
+ self.slave.server.mkdir(path)
+ except OSError:
+ self.add_failjob(path, 'no-remote-node')
+ return
+ xtr = self.minus_infinity
+ else:
+ self.xtime_reversion_hook(path, xtl, xtr)
+ if xtl == xtr:
+ if path == '.' and self.change_seen:
+ self.turns += 1
+ self.change_seen = False
+ if self.total_turns:
+ logging.info("finished turn #%s/%s" % \
+ (self.turns, self.total_turns))
+ if self.turns == self.total_turns:
+ logging.info("reached turn limit")
+ self.terminate = True
+ return
+ if path == '.':
+ self.change_seen = True
+ try:
+ dem = self.master.server.entries(path)
+ except OSError:
+ self.add_failjob(path, 'local-entries-fail')
+ return
+ random.shuffle(dem)
+ try:
+ des = self.slave.server.entries(path)
+ except OSError:
+ self.slave.server.purge(path)
+ try:
+ self.slave.server.mkdir(path)
+ des = self.slave.server.entries(path)
+ except OSError:
+ self.add_failjob(path, 'remote-entries-fail')
+ return
+ dd = set(des) - set(dem)
+ if dd:
+ self.purge_missing(path, dd)
+ chld = []
+ for e in dem:
+ e = os.path.join(path, e)
+ xte = self.xtime(e)
+ if isinstance(xte, int):
+ logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+ elif self.need_sync(e, xte, xtr):
+ chld.append((e, xte))
+ def indulgently(e, fnc, blame=None):
+ if not blame:
+ blame = path
+ try:
+ return fnc(e)
+ except (IOError, OSError):
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ logging.warn("salvaged ENOENT for " + e)
+ self.add_failjob(blame, 'by-indulgently')
+ return False
+ else:
+ raise
+ for e, xte in chld:
+ st = indulgently(e, lambda e: os.lstat(e))
+ if st == False:
+ continue
+ mo = st.st_mode
+ adct = {'own': (st.st_uid, st.st_gid)}
+ if stat.S_ISLNK(mo):
+ if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
+ continue
+ self.sendmark(e, xte, adct)
+ elif stat.S_ISREG(mo):
+ logging.debug("syncing %s ..." % e)
+ pb = self.syncer.add(e)
+ timeA=datetime.now()
+ def regjob(e, xte, pb):
+ if pb.wait():
+ logging.debug("synced " + e)
+ self.sendmark_regular(e, xte)
+
+ timeB=datetime.now()
+ self.lastSyncTime=timeB-timeA
+ self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6)
+ self.filesSynced=self.filesSynced+1
+ return True
+ else:
+ logging.warn("failed to sync " + e)
+ self.add_job(path, 'reg', regjob, e, xte, pb)
+ elif stat.S_ISDIR(mo):
+ adct['mode'] = mo
+ if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
+ self.crawl(e, xte),
+ True)[-1], blame=e) == False:
+ continue
+ else:
+ # ignore fifos, sockets and special files
+ pass
+ if path == '.':
+ self.wait(path, xtl)
+
+class BoxClosedErr(Exception):
+ pass
+
+class PostBox(list):
+ """synchronized collection for storing things thought of as "requests" """
+
+ def __init__(self, *a):
+ list.__init__(self, *a)
+ # too bad Python stdlib does not have read/write locks...
+ # it would suffivce to grab the lock in .append as reader, in .close as writer
+ self.lever = Condition()
+ self.open = True
+ self.done = False
+
+ def wait(self):
+ """wait on requests to be processed"""
+ self.lever.acquire()
+ if not self.done:
+ self.lever.wait()
+ self.lever.release()
+ return self.result
+
+ def wakeup(self, data):
+ """wake up requestors with the result"""
+ self.result = data
+ self.lever.acquire()
+ self.done = True
+ self.lever.notifyAll()
+ self.lever.release()
+
+ def append(self, e):
+ """post a request"""
+ self.lever.acquire()
+ if not self.open:
+ raise BoxClosedErr
+ list.append(self, e)
+ self.lever.release()
+
+ def close(self):
+ """prohibit the posting of further requests"""
+ self.lever.acquire()
+ self.open = False
+ self.lever.release()
+
+class Syncer(object):
+ """a staged queue to relay rsync requests to rsync workers
+
+ By "staged queue" its meant that when a consumer comes to the
+ queue, it takes _all_ entries, leaving the queue empty.
+ (I don't know if there is an official term for this pattern.)
+
+ The queue uses a PostBox to accumulate incoming items.
+ When a consumer (rsync worker) comes, a new PostBox is
+ set up and the old one is passed on to the consumer.
+
+ Instead of the simplistic scheme of having one big lock
+ which synchronizes both the addition of new items and
+ PostBox exchanges, use a separate lock to arbitrate consumers,
+ and rely on PostBox's synchronization mechanisms take
+ care about additions.
+
+ There is a corner case racy situation, producers vs. consumers,
+ which is not handled by this scheme: namely, when the PostBox
+ exchange occurs in between being passed to the producer for posting
+ and the post placement. But that's what Postbox.close is for:
+ such a posting will find the PostBox closed, in which case
+ the producer can re-try posting against the actual PostBox of
+ the queue.
+
+ To aid accumlation of items in the PostBoxen before grabbed
+ by an rsync worker, the worker goes to sleep a bit after
+ each completed syncjob.
+ """
+
+ def __init__(self, slave):
+ """spawn worker threads"""
+ self.slave = slave
+ self.lock = Lock()
+ self.pb = PostBox()
+ self.bytesSynced=0
+ for i in range(int(gconf.sync_jobs)):
+ t = Thread(target=self.syncjob)
+ t.start()
+
+ def syncjob(self):
+ """the life of a worker"""
+ while True:
+ pb = None
+ while True:
+ self.lock.acquire()
+ if self.pb:
+ pb, self.pb = self.pb, PostBox()
+ self.lock.release()
+ if pb:
+ break
+ time.sleep(0.5)
+ pb.close()
+ po = self.slave.rsync(pb)
+ if po.returncode == 0:
+ regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE)
+ if regEx:
+ self.bytesSynced+=(int(regEx.group(1)))/1024
+ ret = True
+ elif po.returncode in (23, 24):
+ # partial transfer (cf. rsync(1)), that's normal
+ ret = False
+ else:
+ po.errfail()
+ pb.wakeup(ret)
+
+ def add(self, e):
+ while True:
+ pb = self.pb
+ try:
+ pb.append(e)
+ return pb
+ except BoxClosedErr:
+ pass
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
new file mode 100644
index 00000000000..b8956dcc2b9
--- /dev/null
+++ b/geo-replication/syncdaemon/monitor.py
@@ -0,0 +1,129 @@
+import os
+import sys
+import time
+import signal
+import logging
+from gconf import gconf
+from syncdutils import update_file, select, waitpid, set_term_handler
+
+class Monitor(object):
+ """class which spawns and manages gsyncd workers"""
+
+ def __init__(self):
+ self.state = None
+
+ def set_state(self, state):
+ """set the state that can be used by external agents
+ like glusterd for status reporting"""
+ if state == self.state:
+ return
+ self.state = state
+ logging.info('new state: %s' % state)
+ if getattr(gconf, 'state_file', None):
+ update_file(gconf.state_file, lambda f: f.write(state + '\n'))
+
+ def monitor(self):
+ """the monitor loop
+
+ Basic logic is a blantantly simple blunt heuristics:
+ if spawned client survives 60 secs, it's considered OK.
+ This servers us pretty well as it's not vulneralbe to
+ any kind of irregular behavior of the child...
+
+ ... well, except for one: if children is hung up on
+ waiting for some event, it can survive aeons, still
+ will be defunct. So we tweak the above logic to
+ expect the worker to send us a signal within 60 secs
+ (in the form of closing its end of a pipe). The worker
+ does this when it's done with the setup stage
+ ready to enter the service loop (note it's the setup
+ stage which is vulnerable to hangs -- the full
+ blown worker blows up on EPIPE if the net goes down,
+ due to the keep-alive thread)
+ """
+ def sigcont_handler(*a):
+ """
+ Re-init logging and send group kill signal
+ """
+ md = gconf.log_metadata
+ logging.shutdown()
+ lcls = logging.getLoggerClass()
+ lcls.setup(label=md.get('saved_label'), **md)
+ pid = os.getpid()
+ os.kill(-pid, signal.SIGUSR1)
+ signal.signal(signal.SIGUSR1, lambda *a: ())
+ signal.signal(signal.SIGCONT, sigcont_handler)
+
+ argv = sys.argv[:]
+ for o in ('-N', '--no-daemon', '--monitor'):
+ while o in argv:
+ argv.remove(o)
+ argv.extend(('-N', '-p', ''))
+ argv.insert(0, os.path.basename(sys.executable))
+
+ self.set_state('starting...')
+ ret = 0
+ def nwait(p, o=0):
+ p2, r = waitpid(p, o)
+ if not p2:
+ return
+ return r
+ def exit_signalled(s):
+ """ child teminated due to receipt of SIGUSR1 """
+ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+ def exit_status(s):
+ if os.WIFEXITED(s):
+ return os.WEXITSTATUS(s)
+ return 1
+ conn_timeout = int(gconf.connection_timeout)
+ while ret in (0, 1):
+ logging.info('-' * conn_timeout)
+ logging.info('starting gsyncd worker')
+ pr, pw = os.pipe()
+ cpid = os.fork()
+ if cpid == 0:
+ os.close(pr)
+ os.execv(sys.executable, argv + ['--feedback-fd', str(pw)])
+ os.close(pw)
+ t0 = time.time()
+ so = select((pr,), (), (), conn_timeout)[0]
+ os.close(pr)
+ if so:
+ ret = nwait(cpid, os.WNOHANG)
+ if ret != None:
+ logging.debug("worker died before establishing connection")
+ else:
+ logging.debug("worker seems to be connected (?? racy check)")
+ while time.time() < t0 + conn_timeout:
+ ret = nwait(cpid, os.WNOHANG)
+ if ret != None:
+ logging.debug("worker died in startup phase")
+ break
+ time.sleep(1)
+ else:
+ logging.debug("worker not confirmed in %d sec, aborting it" % \
+ conn_timeout)
+ # relax one SIGTERM by setting a handler that sets back
+ # standard handler
+ set_term_handler(lambda *a: set_term_handler())
+ # give a chance to graceful exit
+ os.kill(-os.getpid(), signal.SIGTERM)
+ time.sleep(1)
+ os.kill(cpid, signal.SIGKILL)
+ ret = nwait(cpid)
+ if ret == None:
+ self.set_state('OK')
+ ret = nwait(cpid)
+ if exit_signalled(ret):
+ ret = 0
+ else:
+ ret = exit_status(ret)
+ if ret in (0,1):
+ self.set_state('faulty')
+ time.sleep(10)
+ self.set_state('inconsistent')
+ return ret
+
+def monitor():
+ """oh yeah, actually Monitor is used as singleton, too"""
+ return Monitor().monitor()
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
new file mode 100644
index 00000000000..755fb61df48
--- /dev/null
+++ b/geo-replication/syncdaemon/repce.py
@@ -0,0 +1,225 @@
+import os
+import sys
+import time
+import logging
+from threading import Condition
+try:
+ import thread
+except ImportError:
+ # py 3
+ import _thread as thread
+try:
+ from Queue import Queue
+except ImportError:
+ # py 3
+ from queue import Queue
+try:
+ import cPickle as pickle
+except ImportError:
+ # py 3
+ import pickle
+
+from syncdutils import Thread, select
+
+pickle_proto = -1
+repce_version = 1.0
+
+def ioparse(i, o):
+ if isinstance(i, int):
+ i = os.fdopen(i)
+ # rely on duck typing for recognizing
+ # streams as that works uniformly
+ # in py2 and py3
+ if hasattr(o, 'fileno'):
+ o = o.fileno()
+ return (i, o)
+
+def send(out, *args):
+ """pickle args and write out wholly in one syscall
+
+ ie. not use the ability of pickle to dump directly to
+ a stream, as that would potentially mess up messages
+ by interleaving them
+ """
+ os.write(out, pickle.dumps(args, pickle_proto))
+
+def recv(inf):
+ """load an object from input stream"""
+ return pickle.load(inf)
+
+
+class RepceServer(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the server component.
+ """
+
+ def __init__(self, obj, i, o, wnum=6):
+ """register a backend object .obj to which incoming messages
+ are dispatched, also incoming/outcoming streams
+ """
+ self.obj = obj
+ self.inf, self.out = ioparse(i, o)
+ self.wnum = wnum
+ self.q = Queue()
+
+ def service_loop(self):
+ """fire up worker threads, get messages and dispatch among them"""
+ for i in range(self.wnum):
+ t = Thread(target=self.worker)
+ t.start()
+ try:
+ while True:
+ self.q.put(recv(self.inf))
+ except EOFError:
+ logging.info("terminating on reaching EOF.")
+
+ def worker(self):
+ """life of a worker
+
+ Get message, extract its id, method name and arguments
+ (kwargs not supported), call method on .obj.
+ Send back message id + return value.
+ If method call throws an exception, rescue it, and send
+ back the exception as result (with flag marking it as
+ exception).
+ """
+ while True:
+ in_data = self.q.get(True)
+ rid = in_data[0]
+ rmeth = in_data[1]
+ exc = False
+ if rmeth == '__repce_version__':
+ res = repce_version
+ else:
+ try:
+ res = getattr(self.obj, rmeth)(*in_data[2:])
+ except:
+ res = sys.exc_info()[1]
+ exc = True
+ logging.exception("call failed: ")
+ send(self.out, rid, exc, res)
+
+
+class RepceJob(object):
+ """class representing message status we can use
+ for waiting on reply"""
+
+ def __init__(self, cbk):
+ """
+ - .rid: (process-wise) unique id
+ - .cbk: what we do upon receiving reply
+ """
+ self.rid = (os.getpid(), thread.get_ident(), time.time())
+ self.cbk = cbk
+ self.lever = Condition()
+ self.done = False
+
+ def __repr__(self):
+ return ':'.join([str(x) for x in self.rid])
+
+ def wait(self):
+ self.lever.acquire()
+ if not self.done:
+ self.lever.wait()
+ self.lever.release()
+ return self.result
+
+ def wakeup(self, data):
+ self.result = data
+ self.lever.acquire()
+ self.done = True
+ self.lever.notify()
+ self.lever.release()
+
+
+class RepceClient(object):
+ """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
+
+ ... also our homebrewed RPC backend where the transport layer is
+ reduced to a pair of filehandles.
+
+ This is the client component.
+ """
+
+ def __init__(self, i, o):
+ self.inf, self.out = ioparse(i, o)
+ self.jtab = {}
+ t = Thread(target = self.listen)
+ t.start()
+
+ def listen(self):
+ while True:
+ select((self.inf,), (), ())
+ rid, exc, res = recv(self.inf)
+ rjob = self.jtab.pop(rid)
+ if rjob.cbk:
+ rjob.cbk(rjob, [exc, res])
+
+ def push(self, meth, *args, **kw):
+ """wrap arguments in a RepceJob, send them to server
+ and return the RepceJob
+
+ @cbk to pass on RepceJob can be given as kwarg.
+ """
+ cbk = kw.get('cbk')
+ if not cbk:
+ def cbk(rj, res):
+ if res[0]:
+ raise res[1]
+ rjob = RepceJob(cbk)
+ self.jtab[rjob.rid] = rjob
+ logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
+ send(self.out, rjob.rid, meth, *args)
+ return rjob
+
+ def __call__(self, meth, *args):
+ """RePCe client is callabe, calling it implements a synchronous remote call
+
+ We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
+ on the RepceJob.
+ """
+ rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
+ exc, res = rjob.wait()
+ if exc:
+ logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
+ raise res
+ logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
+ return res
+
+ class mprx(object):
+ """method proxy, standard trick to implement rubyesque method_missing
+ in Python
+
+ A class is a closure factory, you know what I mean, or go read some SICP.
+ """
+ def __init__(self, ins, meth):
+ self.ins = ins
+ self.meth = meth
+
+ def __call__(self, *a):
+ return self.ins(self.meth, *a)
+
+ def __getattr__(self, meth):
+ """this implements transparent method dispatch to remote object,
+ so that you don't need to call the RepceClient instance like
+
+ rclient('how_old_are_you_if_born_in', 1979)
+
+ but you can make it into an ordinary method call like
+
+ rclient.how_old_are_you_if_born_in(1979)
+ """
+ return self.mprx(self, meth)
+
+ def __version__(self):
+ """used in handshake to verify compatibility"""
+ d = {'proto': self('__repce_version__')}
+ try:
+ d['object'] = self('version')
+ except AttributeError:
+ pass
+ return d
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
new file mode 100644
index 00000000000..73102fbcb44
--- /dev/null
+++ b/geo-replication/syncdaemon/resource.py
@@ -0,0 +1,972 @@
+import re
+import os
+import sys
+import stat
+import time
+import fcntl
+import errno
+import struct
+import socket
+import logging
+import tempfile
+import threading
+import subprocess
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
+from select import error as SelectError
+
+from gconf import gconf
+import repce
+from repce import RepceServer, RepceClient
+from master import gmaster_builder
+import syncdutils
+from syncdutils import GsyncdError, select, privileged, boolify
+
+UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
+HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
+UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
+
+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
+
+def desugar(ustr):
+ """transform sugared url strings to standard <scheme>://<urlbody> form
+
+ parsing logic enforces the constraint that sugared forms should contatin
+ a ':' or a '/', which ensures that sugared urls do not conflict with
+ gluster volume names.
+ """
+ m = re.match('([^:]*):(.*)', ustr)
+ if m:
+ if not m.groups()[0]:
+ return "gluster://localhost" + ustr
+ elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
+ return "ssh://" + ustr
+ else:
+ return "gluster://" + ustr
+ else:
+ if ustr[0] != '/':
+ raise GsyncdError("cannot resolve sugared url '%s'" % ustr)
+ ap = os.path.normpath(ustr)
+ if ap.startswith('//'):
+ ap = ap[1:]
+ return "file://" + ap
+
+def gethostbyname(hnam):
+ """gethostbyname wrapper"""
+ try:
+ return socket.gethostbyname(hnam)
+ except socket.gaierror:
+ ex = sys.exc_info()[1]
+ raise GsyncdError("failed to resolve %s: %s" % \
+ (hnam, ex.strerror))
+
+def parse_url(ustr):
+ """instantiate an url object by scheme-to-class dispatch
+
+ The url classes taken into consideration are the ones in
+ this module whose names are full-caps.
+ """
+ m = UrlRX.match(ustr)
+ if not m:
+ ustr = desugar(ustr)
+ m = UrlRX.match(ustr)
+ if not m:
+ raise GsyncdError("malformed url")
+ sch, path = m.groups()
+ this = sys.modules[__name__]
+ if not hasattr(this, sch.upper()):
+ raise GsyncdError("unknown url scheme " + sch)
+ return getattr(this, sch.upper())(path)
+
+
+class _MetaXattr(object):
+ """singleton class, a lazy wrapper around the
+ libcxattr module
+
+ libcxattr (a heavy import due to ctypes) is
+ loaded only when when the single
+ instance is tried to be used.
+
+ This reduces runtime for those invocations
+ which do not need filesystem manipulation
+ (eg. for config, url parsing)
+ """
+
+ def __getattr__(self, meth):
+ from libcxattr import Xattr as LXattr
+ xmeth = [ m for m in dir(LXattr) if m[0] != '_' ]
+ if not meth in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LXattr, m))
+ return getattr(self, meth)
+
+Xattr = _MetaXattr()
+
+
+class Popen(subprocess.Popen):
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ continue
+ l = os.read(fd, 1024)
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1<<20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = syncdutils.Thread(target = tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \
+ (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ filling = ""
+ if self.elines:
+ filling = ", saying:"
+ logging.error("""command "%s" returned with %s%s""" % \
+ (" ".join(self.args), repr(self.returncode), filling))
+ lp = ''
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ syncdutils.finalize(exval = 1)
+
+ def terminate_geterr(self, fail_on_err = True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() == None:
+ self.terminate()
+ if self.poll() == None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr],[],[],0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+class Server(object):
+ """singleton implemening those filesystem access primitives
+ which are needed for geo-replication functionality
+
+ (Singleton in the sense it's a class which has only static
+ and classmethods and is used directly, without instantiation.)
+ """
+
+ GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs"
+ NTV_FMTSTR = "!" + "B"*19 + "II"
+ FRGN_XTRA_FMT = "I"
+ FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
+
+ def _pathguard(f):
+ """decorator method that checks
+ the path argument of the decorated
+ functions to make sure it does not
+ point out of the managed tree
+ """
+
+ fc = getattr(f, 'func_code', None)
+ if not fc:
+ # python 3
+ fc = f.__code__
+ pi = list(fc.co_varnames).index('path')
+ def ff(*a):
+ path = a[pi]
+ ps = path.split('/')
+ if path[0] == '/' or '..' in ps:
+ raise ValueError('unsafe path')
+ return f(*a)
+ return ff
+
+ @staticmethod
+ @_pathguard
+ def entries(path):
+ """directory entries in an array"""
+ # prevent symlinks being followed
+ if not stat.S_ISDIR(os.lstat(path).st_mode):
+ raise OSError(ENOTDIR, os.strerror(ENOTDIR))
+ return os.listdir(path)
+
+ @classmethod
+ @_pathguard
+ def purge(cls, path, entries=None):
+ """force-delete subtrees
+
+ If @entries is not specified, delete
+ the whole subtree under @path (including
+ @path).
+
+ Otherwise, @entries should be a
+ a sequence of children of @path, and
+ the effect is identical with a joint
+ @entries-less purge on them, ie.
+
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ """
+ me_also = entries == None
+ if not entries:
+ try:
+ # if it's a symlink, prevent
+ # following it
+ try:
+ os.unlink(path)
+ return
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EISDIR:
+ entries = os.listdir(path)
+ else:
+ raise
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOTDIR, ENOENT, ELOOP):
+ try:
+ os.unlink(path)
+ return
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return
+ raise
+ else:
+ raise
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ if me_also:
+ os.rmdir(path)
+
+ @classmethod
+ @_pathguard
+ def _create(cls, path, ctor):
+ """path creation backend routine"""
+ try:
+ ctor(path)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST:
+ cls.purge(path)
+ return ctor(path)
+ raise
+
+ @classmethod
+ @_pathguard
+ def mkdir(cls, path):
+ cls._create(path, os.mkdir)
+
+ @classmethod
+ @_pathguard
+ def symlink(cls, lnk, path):
+ cls._create(path, lambda p: os.symlink(lnk, p))
+
+ @classmethod
+ @_pathguard
+ def xtime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ def xtime_vec(cls, path, *uuids):
+ """vectored version of @xtime
+
+ accepts a list of uuids and returns a dictionary
+ with uuid as key(s) and xtime as value(s)
+ """
+ xt = {}
+ for uuid in uuids:
+ xtu = cls.xtime(path, uuid)
+ if xtu == ENODATA:
+ xtu = None
+ if isinstance(xtu, int):
+ return xtu
+ xt[uuid] = xtu
+ return xt
+
+ @classmethod
+ @_pathguard
+ def set_xtime(cls, path, uuid, mark):
+ """set @mark as xtime for @uuid on @path"""
+ Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+
+ @classmethod
+ def set_xtime_vec(cls, path, mark_dct):
+ """vectored (or dictered) version of set_xtime
+
+ ignore values that match @ignore
+ """
+ for u,t in mark_dct.items():
+ cls.set_xtime(path, u, t)
+
+ @staticmethod
+ @_pathguard
+ def setattr(path, adct):
+ """set file attributes
+
+ @adct is a dict, where 'own', 'mode' and 'times'
+ keys are looked for and values used to perform
+ chown, chmod or utimes on @path.
+ """
+ own = adct.get('own')
+ if own:
+ os.lchown(path, *own)
+ mode = adct.get('mode')
+ if mode:
+ os.chmod(path, stat.S_IMODE(mode))
+ times = adct.get('times')
+ if times:
+ os.utime(path, times)
+
+ @staticmethod
+ def pid():
+ return os.getpid()
+
+ last_keep_alive = 0
+ @classmethod
+ def keep_alive(cls, dct):
+ """process keepalive messages.
+
+ Return keep-alive counter (number of received keep-alive
+ messages).
+
+ Now the "keep-alive" message can also have a payload which is
+ used to set a foreign volume-mark on the underlying file system.
+ """
+ if dct:
+ key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
+ val = struct.pack(cls.FRGN_FMTSTR,
+ *(dct['version'] +
+ tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
+ (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
+ Xattr.lsetxattr('.', key, val)
+ cls.last_keep_alive += 1
+ return cls.last_keep_alive
+
+ @staticmethod
+ def version():
+ """version used in handshake"""
+ return 1.0
+
+
+class SlaveLocal(object):
+ """mix-in class to implement some factes of a slave server
+
+ ("mix-in" is sort of like "abstract class", ie. it's not
+ instantiated just included in the ancesty DAG. I use "mix-in"
+ to indicate that it's not used as an abstract base class,
+ rather just taken in to implement additional functionality
+ on the basis of the assumed availability of certain interfaces.)
+ """
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return not remote
+
+ def service_loop(self):
+ """start a RePCe server serving self's server
+
+ stop servicing if a timeout is configured and got no
+ keep-alime in that inteval
+ """
+
+ if boolify(gconf.use_rsync_xattrs) and not privileged():
+ raise GsyncdError("using rsync for extended attributes is not supported")
+
+ repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
+ t = syncdutils.Thread(target=lambda: (repce.service_loop(),
+ syncdutils.finalize()))
+ t.start()
+ logging.info("slave listening")
+ if gconf.timeout and int(gconf.timeout) > 0:
+ while True:
+ lp = self.server.last_keep_alive
+ time.sleep(int(gconf.timeout))
+ if lp == self.server.last_keep_alive:
+ logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
+ break
+ else:
+ select((), (), ())
+
+class SlaveRemote(object):
+ """mix-in class to implement an interface to a remote slave"""
+
+ def connect_remote(self, rargs=[], **opts):
+ """connects to a remote slave
+
+ Invoke an auxiliary utility (slave gsyncd, possibly wrapped)
+ which sets up the connection and set up a RePCe client to
+ communicate throuh its stdio.
+ """
+ slave = opts.get('slave', self.url)
+ extra_opts = []
+ so = getattr(gconf, 'session_owner', None)
+ if so:
+ extra_opts += ['--session-owner', so]
+ if boolify(gconf.use_rsync_xattrs):
+ extra_opts.append('--use-rsync-xattrs')
+ po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \
+ ['-N', '--listen', '--timeout', str(gconf.timeout), slave],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gconf.transport = po
+ return self.start_fd_client(po.stdout, po.stdin, **opts)
+
+ def start_fd_client(self, i, o, **opts):
+ """set up RePCe client, handshake with server
+
+ It's cut out as a separate method to let
+ subclasses hook into client startup
+ """
+ self.server = RepceClient(i, o)
+ rv = self.server.__version__()
+ exrv = {'proto': repce.repce_version, 'object': Server.version()}
+ da0 = (rv, exrv)
+ da1 = ({}, {})
+ for i in range(2):
+ for k, v in da0[i].iteritems():
+ da1[i][k] = int(v)
+ if da1[0] != da1[1]:
+ raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
+
+ def rsync(self, files, *args):
+ """invoke rsync"""
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+ argv = gconf.rsync_command.split() + \
+ ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
+ gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
+ ['.'] + list(args)
+ po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ for f in files:
+ po.stdin.write(f)
+ po.stdin.write('\0')
+
+ po.stdin.close()
+ po.wait()
+ po.terminate_geterr(fail_on_err = False)
+
+ return po
+
+
+class AbstractUrl(object):
+ """abstract base class for url scheme classes"""
+
+ def __init__(self, path, pattern):
+ m = re.search(pattern, path)
+ if not m:
+ raise GsyncdError("malformed path")
+ self.path = path
+ return m.groups()
+
+ @property
+ def scheme(self):
+ return type(self).__name__.lower()
+
+ def canonical_path(self):
+ return self.path
+
+ def get_url(self, canonical=False, escaped=False):
+ """format self's url in various styles"""
+ if canonical:
+ pa = self.canonical_path()
+ else:
+ pa = self.path
+ u = "://".join((self.scheme, pa))
+ if escaped:
+ u = syncdutils.escape(u)
+ return u
+
+ @property
+ def url(self):
+ return self.get_url()
+
+
+ ### Concrete resource classes ###
+
+
+class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for file:// urls
+
+ can be used to represent a file slave server
+ on slave side, or interface to a remote file
+ file server on master side
+ """
+
+ class FILEServer(Server):
+ """included server flavor"""
+ pass
+
+ server = FILEServer
+
+ def __init__(self, path):
+ sup(self, path, '^/')
+
+ def connect(self):
+ """inhibit the resource beyond"""
+ os.chdir(self.path)
+
+ def rsync(self, files):
+ return sup(self, files, self.path)
+
+
+class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for gluster:// urls
+
+ can be used to represent a gluster slave server
+ on slave side, or interface to a remote gluster
+ slave on master side, or to represent master
+ (slave-ish features come from the mixins, master
+ functionality is outsourced to GMaster from master)
+ """
+
+ class GLUSTERServer(Server):
+ "server enhancements for a glusterfs backend"""
+
+ @classmethod
+ def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ """generic volume mark fetching/parsing backed"""
+ fmt_string = cls.NTV_FMTSTR + extra_fields
+ buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ vm = struct.unpack(fmt_string, buf)
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
+ uuid = '-'.join(m.groups())
+ volinfo = { 'version': vm[0:2],
+ 'uuid' : uuid,
+ 'retval' : vm[18],
+ 'volume_mark': vm[19:21],
+ }
+ if extra_fields:
+ return volinfo, vm[-len(extra_fields):]
+ else:
+ return volinfo
+
+ @classmethod
+ def foreign_volume_infos(cls):
+ """return list of valid (not expired) foreign volume marks"""
+ dict_list = []
+ xattr_list = Xattr.llistxattr_buf('.')
+ for ele in xattr_list:
+ if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
+ d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
+ now = int(time.time())
+ if x[0] > now:
+ logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
+ (d['uuid'], x[0], x[0] - now))
+ d['timeout'] = x[0]
+ dict_list.append(d)
+ else:
+ try:
+ Xattr.lremovexattr('.', ele)
+ except OSError:
+ pass
+ return dict_list
+
+ @classmethod
+ def native_volume_info(cls):
+ """get the native volume mark of the underlying gluster volume"""
+ try:
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno != ENODATA:
+ raise
+
+ server = GLUSTERServer
+
+ def __init__(self, path):
+ self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+
+ def canonical_path(self):
+ return ':'.join([gethostbyname(self.host), self.volume])
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return True
+
+ class Mounter(object):
+ """Abstract base class for mounter backends"""
+
+ def __init__(self, params):
+ self.params = params
+ self.mntpt = None
+
+ @classmethod
+ def get_glusterprog(cls):
+ return os.path.join(gconf.gluster_command_dir, cls.glusterprog)
+
+ def umount_l(self, d):
+ """perform lazy umount"""
+ po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
+ po.wait()
+ return po
+
+ @classmethod
+ def make_umount_argv(cls, d):
+ raise NotImplementedError
+
+ def make_mount_argv(self, *a):
+ raise NotImplementedError
+
+ def cleanup_mntpt(self, *a):
+ pass
+
+ def handle_mounter(self, po):
+ po.wait()
+
+ def inhibit(self, *a):
+ """inhibit a gluster filesystem
+
+ Mount glusterfs over a temporary mountpoint,
+ change into the mount, and lazy unmount the
+ filesystem.
+ """
+
+ mpi, mpo = os.pipe()
+ mh = Popen.fork()
+ if mh:
+ os.close(mpi)
+ fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+ d = None
+ margv = self.make_mount_argv(*a)
+ if self.mntpt:
+ # mntpt is determined pre-mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ po = Popen(margv, **self.mountkw)
+ self.handle_mounter(po)
+ po.terminate_geterr()
+ logging.debug('auxiliary glusterfs mount in place')
+ if not d:
+ # mntpt is determined during mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ os.write(mpo, 'M')
+ t = syncdutils.Thread(target=lambda: os.chdir(d))
+ t.start()
+ tlim = gconf.starttime + int(gconf.connection_timeout)
+ while True:
+ if not t.isAlive():
+ break
+ if time.time() >= tlim:
+ syncdutils.finalize(exval = 1)
+ time.sleep(1)
+ os.close(mpo)
+ _, rv = syncdutils.waitpid(mh, 0)
+ if rv:
+ rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
+ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
+ logging.warn('stale mount possibly left behind on ' + d)
+ raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \
+ (d, rv))
+ else:
+ rv = 0
+ try:
+ os.setsid()
+ os.close(mpo)
+ mntdata = ''
+ while True:
+ c = os.read(mpi, 1)
+ if not c:
+ break
+ mntdata += c
+ if mntdata:
+ mounted = False
+ if mntdata[-1] == 'M':
+ mntdata = mntdata[:-1]
+ assert(mntdata)
+ mounted = True
+ assert(mntdata[-1] == '\0')
+ mntpt = mntdata[:-1]
+ assert(mntpt)
+ if mounted:
+ po = self.umount_l(mntpt)
+ po.terminate_geterr(fail_on_err = False)
+ if po.returncode != 0:
+ po.errlog()
+ rv = po.returncode
+ self.cleanup_mntpt(mntpt)
+ except:
+ logging.exception('mount cleanup failure:')
+ rv = 200
+ os._exit(rv)
+ logging.debug('auxiliary glusterfs mount prepared')
+
+ class DirectMounter(Mounter):
+ """mounter backend which calls mount(8), umount(8) directly"""
+
+ mountkw = {'stderr': subprocess.PIPE}
+ glusterprog = 'glusterfs'
+
+ @staticmethod
+ def make_umount_argv(d):
+ return ['umount', '-l', d]
+
+ def make_mount_argv(self):
+ self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-')
+ return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt]
+
+ def cleanup_mntpt(self, mntpt = None):
+ if not mntpt:
+ mntpt = self.mntpt
+ os.rmdir(mntpt)
+
+ class MountbrokerMounter(Mounter):
+ """mounter backend using the mountbroker gluster service"""
+
+ mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
+ glusterprog = 'gluster'
+
+ @classmethod
+ def make_cli_argv(cls):
+ return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::']
+
+ @classmethod
+ def make_umount_argv(cls, d):
+ return cls.make_cli_argv() + ['umount', d, 'lazy']
+
+ def make_mount_argv(self, label):
+ return self.make_cli_argv() + \
+ ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params
+
+ def handle_mounter(self, po):
+ self.mntpt = po.stdout.readline()[:-1]
+ po.stdout.close()
+ sup(self, po)
+ if po.returncode != 0:
+ # if cli terminated with error due to being
+ # refused by glusterd, what it put
+ # out on stdout is a diagnostic message
+ logging.error('glusterd answered: %s' % self.mntpt)
+
+ def connect(self):
+ """inhibit the resource beyond
+
+ Choose mounting backend (direct or mountbroker),
+ set up glusterfs parameters and perform the mount
+ with given backend
+ """
+
+ label = getattr(gconf, 'mountbroker', None)
+ if not label and not privileged():
+ label = syncdutils.getusername()
+ mounter = label and self.MountbrokerMounter or self.DirectMounter
+ params = gconf.gluster_params.split() + \
+ (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \
+ ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host,
+ 'volfile-id=' + self.volume, 'client-pid=-1']
+ mounter(params).inhibit(*[l for l in [label] if l])
+
+ def connect_remote(self, *a, **kw):
+ sup(self, *a, **kw)
+ self.slavedir = "/proc/%d/cwd" % self.server.pid()
+
+ def service_loop(self, *args):
+ """enter service loop
+
+ - if slave given, instantiate GMaster and
+ pass control to that instance, which implements
+ master behavior
+ - else do that's what's inherited
+ """
+ if args:
+ gmaster_builder()(self, args[0]).crawl_loop()
+ else:
+ sup(self, *args)
+
+ def rsync(self, files):
+ return sup(self, files, self.slavedir)
+
+
+class SSH(AbstractUrl, SlaveRemote):
+ """scheme class for ssh:// urls
+
+ interface to remote slave on master side
+ implementing an ssh based proxy
+ """
+
+ def __init__(self, path):
+ self.remote_addr, inner_url = sup(self, path,
+ '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
+ self.inner_rsc = parse_url(inner_url)
+
+ def canonical_path(self):
+ m = re.match('([^@]+)@(.+)', self.remote_addr)
+ if m:
+ u, h = m.groups()
+ else:
+ u, h = syncdutils.getusername(), self.remote_addr
+ remote_addr = '@'.join([u, gethostbyname(h)])
+ return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return False
+
+ def start_fd_client(self, *a, **opts):
+ """customizations for client startup
+
+ - be a no-op if we are to daemonize (client startup is deferred
+ to post-daemon stage)
+ - determine target url for rsync after consulting server
+ """
+ if opts.get('deferred'):
+ return a
+ sup(self, *a)
+ ityp = type(self.inner_rsc)
+ if ityp == FILE:
+ slavepath = self.inner_rsc.path
+ elif ityp == GLUSTER:
+ slavepath = "/proc/%d/cwd" % self.server.pid()
+ else:
+ raise NotImplementedError
+ self.slaveurl = ':'.join([self.remote_addr, slavepath])
+
+ def connect_remote(self, go_daemon=None):
+ """connect to inner slave url through outer ssh url
+
+ Wrap the connecting utility in ssh.
+
+ Much care is put into daemonizing: in that case
+ ssh is started before daemonization, but
+ RePCe client is to be created after that (as ssh
+ interactive password auth would be defeated by
+ a daemonized ssh, while client should be present
+ only in the final process). In that case the action
+ is taken apart to two parts, this method is ivoked
+ once pre-daemon, once post-daemon. Use @go_daemon
+ to deiced what part to perform.
+
+ [NB. ATM gluster product does not makes use of interactive
+ authentication.]
+ """
+ if go_daemon == 'done':
+ return self.start_fd_client(*self.fd_pair)
+ gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))
+ deferred = go_daemon == 'postconn'
+ ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
+ if deferred:
+ # send a message to peer so that we can wait for
+ # the answer from which we know connection is
+ # established and we can proceed with daemonization
+ # (doing that too early robs the ssh passwd prompt...)
+ # However, we'd better not start the RepceClient
+ # before daemonization (that's not preserved properly
+ # in daemon), we just do a an ad-hoc linear put/get.
+ i, o = ret
+ inf = os.fdopen(i)
+ repce.send(o, None, '__repce_version__')
+ select((inf,), (), ())
+ repce.recv(inf)
+ # hack hack hack: store a global reference to the file
+ # to save it from getting GC'd which implies closing it
+ gconf.permanent_handles.append(inf)
+ self.fd_pair = (i, o)
+ return 'should'
+
+ def rsync(self, files):
+ return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
+ *(gconf.rsync_ssh_options.split() + [self.slaveurl]))
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
new file mode 100644
index 00000000000..0764c07904d
--- /dev/null
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -0,0 +1,288 @@
+import os
+import sys
+import pwd
+import time
+import fcntl
+import shutil
+import logging
+from threading import Lock, Thread as baseThread
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, errorcode
+from signal import signal, SIGTERM, SIGKILL
+from time import sleep
+import select as oselect
+from os import waitpid as owaitpid
+try:
+ from cPickle import PickleError
+except ImportError:
+ # py 3
+ from pickle import PickleError
+
+from gconf import gconf
+
+try:
+ # py 3
+ from urllib import parse as urllib
+except ImportError:
+ import urllib
+
+def escape(s):
+ """the chosen flavor of string escaping, used all over
+ to turn whatever data to creatable representation"""
+ return urllib.quote_plus(s)
+
+def unescape(s):
+ """inverse of .escape"""
+ return urllib.unquote_plus(s)
+
+def norm(s):
+ if s:
+ return s.replace('-', '_')
+
+def update_file(path, updater, merger = lambda f: True):
+ """update a file in a transaction-like manner"""
+
+ fr = fw = None
+ try:
+ fd = os.open(path, os.O_CREAT|os.O_RDWR)
+ try:
+ fr = os.fdopen(fd, 'r+b')
+ except:
+ os.close(fd)
+ raise
+ fcntl.lockf(fr, fcntl.LOCK_EX)
+ if not merger(fr):
+ return
+
+ tmpp = path + '.tmp.' + str(os.getpid())
+ fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY)
+ try:
+ fw = os.fdopen(fd, 'wb', 0)
+ except:
+ os.close(fd)
+ raise
+ updater(fw)
+ os.fsync(fd)
+ os.rename(tmpp, path)
+ finally:
+ for fx in (fr, fw):
+ if fx:
+ fx.close()
+
+def grabfile(fname, content=None):
+ """open @fname + contest for its fcntl lock
+
+ @content: if given, set the file content to it
+ """
+ # damn those messy open() mode codes
+ fd = os.open(fname, os.O_CREAT|os.O_RDWR)
+ f = os.fdopen(fd, 'r+b', 0)
+ try:
+ fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB)
+ except:
+ ex = sys.exc_info()[1]
+ f.close()
+ if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
+ # cannot grab, it's taken
+ return
+ raise
+ if content:
+ try:
+ f.truncate()
+ f.write(content)
+ except:
+ f.close()
+ raise
+ gconf.permanent_handles.append(f)
+ return f
+
+def grabpidfile(fname=None, setpid=True):
+ """.grabfile customization for pid files"""
+ if not fname:
+ fname = gconf.pid_file
+ content = None
+ if setpid:
+ content = str(os.getpid()) + '\n'
+ return grabfile(fname, content=content)
+
+final_lock = Lock()
+
+def finalize(*a, **kw):
+ """all those messy final steps we go trough upon termination
+
+ Do away with pidfile, ssh control dir and logging.
+ """
+ final_lock.acquire()
+ if getattr(gconf, 'pid_file', None):
+ rm_pidf = gconf.pid_file_owned
+ if gconf.cpid:
+ # exit path from parent branch of daemonization
+ rm_pidf = False
+ while True:
+ f = grabpidfile(setpid=False)
+ if not f:
+ # child has already taken over pidfile
+ break
+ if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
+ # child has terminated
+ rm_pidf = True
+ break;
+ time.sleep(0.1)
+ if rm_pidf:
+ try:
+ os.unlink(gconf.pid_file)
+ except:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ pass
+ else:
+ raise
+ if gconf.ssh_ctl_dir and not gconf.cpid:
+ shutil.rmtree(gconf.ssh_ctl_dir)
+ if getattr(gconf, 'state_socket', None):
+ try:
+ os.unlink(gconf.state_socket)
+ except:
+ if sys.exc_info()[0] == OSError:
+ pass
+ if gconf.log_exit:
+ logging.info("exiting.")
+ sys.stdout.flush()
+ sys.stderr.flush()
+ os._exit(kw.get('exval', 0))
+
+def log_raise_exception(excont):
+ """top-level exception handler
+
+ Try to some fancy things to cover up we face with an error.
+ Translate some weird sounding but well understood exceptions
+ into human-friendly lingo
+ """
+ is_filelog = False
+ for h in logging.getLogger().handlers:
+ fno = getattr(getattr(h, 'stream', None), 'fileno', None)
+ if fno and not os.isatty(fno()):
+ is_filelog = True
+
+ exc = sys.exc_info()[1]
+ if isinstance(exc, SystemExit):
+ excont.exval = exc.code or 0
+ raise
+ else:
+ logtag = None
+ if isinstance(exc, GsyncdError):
+ if is_filelog:
+ logging.error(exc.args[0])
+ sys.stderr.write('failure: ' + exc.args[0] + '\n')
+ elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \
+ ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \
+ exc.errno == EPIPE):
+ logging.error('connection to peer is broken')
+ if hasattr(gconf, 'transport'):
+ gconf.transport.wait()
+ if gconf.transport.returncode == 127:
+ logging.warn("!!!!!!!!!!!!!")
+ logging.warn('!!! getting "No such file or directory" errors '
+ "is most likely due to MISCONFIGURATION, please consult "
+ "http://access.redhat.com/knowledge/docs/en-US/Red_Hat_Storage/2.0/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")
+ logging.warn("!!!!!!!!!!!!!")
+ gconf.transport.terminate_geterr()
+ elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED):
+ logging.error('glusterfs session went down [%s]', errorcode[exc.errno])
+ else:
+ logtag = "FAIL"
+ if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
+ logtag = "FULL EXCEPTION TRACE"
+ if logtag:
+ logging.exception(logtag + ": ")
+ sys.stderr.write("failed with %s.\n" % type(exc).__name__)
+ excont.exval = 1
+ sys.exit(excont.exval)
+
+
+class FreeObject(object):
+ """wildcard class for which any attribute can be set"""
+
+ def __init__(self, **kw):
+ for k,v in kw.items():
+ setattr(self, k, v)
+
+class Thread(baseThread):
+ """thread class flavor for gsyncd
+
+ - always a daemon thread
+ - force exit for whole program if thread
+ function coughs up an exception
+ """
+ def __init__(self, *a, **kw):
+ tf = kw.get('target')
+ if tf:
+ def twrap(*aa):
+ excont = FreeObject(exval = 0)
+ try:
+ tf(*aa)
+ except:
+ try:
+ log_raise_exception(excont)
+ finally:
+ finalize(exval = excont.exval)
+ kw['target'] = twrap
+ baseThread.__init__(self, *a, **kw)
+ self.setDaemon(True)
+
+class GsyncdError(Exception):
+ pass
+
+def getusername(uid = None):
+ if uid == None:
+ uid = os.geteuid()
+ return pwd.getpwuid(uid).pw_name
+
+def privileged():
+ return os.geteuid() == 0
+
+def boolify(s):
+ """
+ Generic string to boolean converter
+
+ return
+ - Quick return if string 's' is of type bool
+ - True if it's in true_list
+ - False if it's in false_list
+ - Warn if it's not present in either and return False
+ """
+ true_list = ['true', 'yes', '1', 'on']
+ false_list = ['false', 'no', '0', 'off']
+
+ if isinstance(s, bool):
+ return s
+
+ rv = False
+ lstr = s.lower()
+ if lstr in true_list:
+ rv = True
+ elif not lstr in false_list:
+ logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s))
+
+ return rv
+
+def eintr_wrap(func, exc, *a):
+ """
+ wrapper around syscalls resilient to interrupt caused
+ by signals
+ """
+ while True:
+ try:
+ return func(*a)
+ except exc:
+ ex = sys.exc_info()[1]
+ if not ex.args[0] == EINTR:
+ raise
+
+def select(*a):
+ return eintr_wrap(oselect.select, oselect.error, *a)
+
+def waitpid (*a):
+ return eintr_wrap(owaitpid, OSError, *a)
+
+def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
+ signal(SIGTERM, hook)