summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCsaba Henk <csaba@gluster.com>2011-01-27 05:23:35 +0000
committerAnand V. Avati <avati@dev.gluster.com>2011-01-27 03:17:20 -0800
commit85300e25f2d47e33b169d14fa9eb0b7cfe39011b (patch)
tree6a00e8790358f1321855122d90a78e669168c1d1
parent7d883898c5225df3f7c38e67274b74ff8ac396c0 (diff)
adding syncdaemon
Signed-off-by: Csaba Henk <csaba@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 2310 (georeplication) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2310
-rw-r--r--configure.ac34
-rw-r--r--xlators/features/marker/Makefile.am2
-rw-r--r--xlators/features/marker/utils/Makefile.am7
-rwxr-xr-xxlators/features/marker/utils/gsyncd.in7
-rw-r--r--xlators/features/marker/utils/syncdaemon/Makefile.am5
-rw-r--r--xlators/features/marker/utils/syncdaemon/README.md81
-rw-r--r--xlators/features/marker/utils/syncdaemon/__init__.py0
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py13
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py230
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py240
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py150
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py372
-rw-r--r--xlators/features/marker/utils/syncdaemon/simplecfg.py77
13 files changed, 1217 insertions, 1 deletions
diff --git a/configure.ac b/configure.ac
index e8d3c07a897..3ce78028f58 100644
--- a/configure.ac
+++ b/configure.ac
@@ -93,6 +93,9 @@ AC_CONFIG_FILES([Makefile
xlators/features/quota/src/Makefile
xlators/features/marker/Makefile
xlators/features/marker/src/Makefile
+ xlators/features/marker/utils/Makefile
+ xlators/features/marker/utils/gsyncd
+ xlators/features/marker/utils/syncdaemon/Makefile
xlators/features/read-only/Makefile
xlators/features/read-only/src/Makefile
xlators/features/mac-compat/Makefile
@@ -281,6 +284,36 @@ AC_SUBST(RDMA_SUBDIR)
# end IBVERBS section
+# SYNCDAEMON section
+AC_ARG_ENABLE([georeplication],
+ AC_HELP_STRING([--disable-georeplication],
+ [Do not install georeplication components]))
+
+BUILD_SYNCDAEMON=no
+if test "x$enable_georeplication" != "xno"; then
+ SYNCDAEMON_SUBDIR=utils
+ BUILD_SYNCDAEMON="yes"
+ AM_PATH_PYTHON([2.4])
+ echo -n "checking if python is python 2.x... "
+ if echo $PYTHON_VERSION | grep ^2; then
+ :
+ else
+ echo no
+ AC_MSG_ERROR([only python 2.x is supported])
+ fi
+ echo -n "checking if python has ctypes support... "
+ if "$PYTHON" -c 'import ctypes' 2>/dev/null; then
+ echo yes
+ else
+ echo no
+ AC_MSG_ERROR([python does not have ctypes support])
+ fi
+fi
+
+AC_SUBST(SYNCDAEMON_SUBDIR)
+# end SYNCDAEMON section
+
+
dnl FreeBSD > 5 has execinfo as a Ported library for giving a workaround
dnl solution to GCC backtrace functionality
@@ -430,4 +463,5 @@ echo "epoll IO multiplex : $BUILD_EPOLL"
echo "argp-standalone : $BUILD_ARGP_STANDALONE"
echo "fusermount : $BUILD_FUSERMOUNT"
echo "readline : $BUILD_READLINE"
+echo "georeplication : $BUILD_SYNCDAEMON"
echo
diff --git a/xlators/features/marker/Makefile.am b/xlators/features/marker/Makefile.am
index a985f42a877..a6ba2de16ae 100644
--- a/xlators/features/marker/Makefile.am
+++ b/xlators/features/marker/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = src
+SUBDIRS = src @SYNCDAEMON_SUBDIR@
CLEANFILES =
diff --git a/xlators/features/marker/utils/Makefile.am b/xlators/features/marker/utils/Makefile.am
new file mode 100644
index 00000000000..8aefea4011b
--- /dev/null
+++ b/xlators/features/marker/utils/Makefile.am
@@ -0,0 +1,7 @@
+SUBDIRS = syncdaemon
+
+gsyncddir = $(libexecdir)
+
+gsyncd_SCRIPTS = gsyncd
+
+CLEANFILES =
diff --git a/xlators/features/marker/utils/gsyncd.in b/xlators/features/marker/utils/gsyncd.in
new file mode 100755
index 00000000000..9bbf8041f36
--- /dev/null
+++ b/xlators/features/marker/utils/gsyncd.in
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+prefix="@prefix@"
+exec_prefix="@exec_prefix@"
+libexecdir=`eval echo "@libexecdir@"`
+
+PYTHONPATH="$libexecdir"/python exec @PYTHON@ -c "from syncdaemon import gsyncd; gsyncd.main()" -c @sysconfdir@/glusterfs/gsyncd.conf "$@"
diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am
new file mode 100644
index 00000000000..1d5014a9eb3
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/Makefile.am
@@ -0,0 +1,5 @@
+syncdaemondir = $(libexecdir)/python/syncdaemon
+
+syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py simplecfg.py
+
+CLEANFILES =
diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md
new file mode 100644
index 00000000000..d45006932d1
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/README.md
@@ -0,0 +1,81 @@
+gsyncd, the Gluster Syncdaemon
+==============================
+
+REQUIREMENTS
+------------
+
+_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode.
+Requirements are categorized according to this.
+
+* supported OS is GNU/Linux
+* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
+* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)
+* rsync (both)
+* glusterfs with marker support (master); glusterfs (optional on slave)
+* FUSE; for supported versions consult glusterfs
+
+INSTALLATION
+------------
+
+As of now, the supported way of operation is running from the source directory.
+
+If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
+
+CONFIGURATION
+-------------
+
+gsyncd tunables are a subset of the long command-line options; for listing them,
+type
+
+ gsyncd.py --help
+
+and see the long options up to "--config-file". (The leading double dash should be omitted;
+interim underscores and dashes are interchangeable.) The set of options bear some resemblance
+to those of glusterfs and rsync.
+
+The config file format matches the following syntax:
+
+ <option1>: <value1>
+ <option2>: <value2>
+ # comment
+
+By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_
+in the source tree.
+
+USAGE
+-----
+
+gsyncd is a utility for continuous mirroring, i.e. it mirrors master to slave incrementally.
+Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors
+for it with gsyncd:
+
+1. _/data/mirror_
+2. local gluster volume _yow_
+3. _/data/far_mirror_ at example.com
+4. gluster volume _moz_ at example.com
+
+The respective gsyncd invocations are (demoing some syntax sugaring):
+
+1.
+
+ gsyncd.py gluster://localhost:pop file:///data/mirror
+
+ or short form
+
+ gsyncd.py :pop /data/mirror
+
+2. `gsyncd :pop :yow`
+3.
+
+ gsyncd.py :pop ssh://example.com:/data/far_mirror
+
+ or short form
+
+ gsyncd.py :pop example.com:/data/far_mirror
+
+4. `gsyncd.py :pop example.com::moz`
+
+gsyncd has to be available on both sides; its location on the remote side has to be specified
+via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be
+used for setting options on the remote side, although the suggested mode of operation is to
+set parameters like log file / pid file in the configuration file.)
diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/__init__.py
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
new file mode 100644
index 00000000000..7bedce5148a
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/gconf.py
@@ -0,0 +1,13 @@
+import os
+
class GConf(object):
    """Process-wide configuration bag.

    Most attributes are injected dynamically (defaults, config file,
    command line); only the ssh-control bookkeeping fields are
    declared up front.
    """

    # directory holding the ssh control socket, if one was set up
    ssh_ctl_dir = None
    # extra ssh arguments enabling connection multiplexing
    ssh_ctl_args = None
    # pid of the daemonized child (seen from the parent after forking)
    cpid = None

    @classmethod
    def setup_ssh_ctl(cls, ctld):
        """Record *ctld* as the ssh control dir and derive ssh options."""
        cls.ssh_ctl_dir = ctld
        sock = os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")
        cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", sock]

gconf = GConf()
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
new file mode 100644
index 00000000000..f84df502185
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python
+
+import os
+import os.path
+import sys
+import time
+import logging
+import signal
+import select
+import shutil
+import optparse
+from optparse import OptionParser, SUPPRESS_HELP
+from logging import Logger
+from errno import EEXIST, ENOENT
+
+from gconf import gconf
+import resource
+from simplecfg import SimpleCfg
+
class GLogger(Logger):
    """Logger subclass that enriches records with microseconds, a
    one-letter level name and a short tag naming the calling class."""

    def makeRecord(self, name, level, *a):
        rv = Logger.makeRecord(self, name, level, *a)
        # sub-second part of the timestamp, used by the "%(nsecs)d" format field
        rv.nsecs = (rv.created - int(rv.created)) * 1000000
        # reach up the stack to the logging call site; the fixed depth of 4
        # presumably matches the logging module's internal call chain --
        # fragile across logging-module versions (NOTE: verify on upgrade)
        fr = sys._getframe(4)
        callee = fr.f_locals.get('self')
        if callee:
            # context tag: the caller's class name, without module prefix
            ctx = str(type(callee)).split("'")[1].split('.')[-1]
        else:
            ctx = '<top>'
        if not hasattr(rv, 'funcName'):
            # funcName is only supplied by newer logging modules
            rv.funcName = fr.f_code.co_name
        rv.lvlnam = logging.getLevelName(level)[0]
        rv.ctx = ctx
        return rv

    @classmethod
    def setup(cls, **kw):
        """Install this class as the logger class and (re)configure the
        root logger; *kw* may carry 'level' (name or number) and any
        other basicConfig parameter (e.g. 'filename')."""
        lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
                'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s:%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
        lprm.update(kw)
        lvl = kw.get('level', logging.INFO)
        if isinstance(lvl, str):
            lvl = logging.getLevelName(lvl)
        lprm['level'] = lvl
        logging.root = cls("root", lvl)
        logging.setLoggerClass(cls)
        # drop handlers a previous setup() call may have added
        logging.getLogger().handlers = []
        logging.basicConfig(**lprm)
+
+
def startup(**kw):
    """Prepare the process environment: pidfile, daemonization, logging.

    Recognized keywords: go_daemon ('should' triggers daemonization),
    log_file (path to log to instead of stderr).
    """
    def write_pid(fn):
        fd = None
        try:
            # O_EXCL: creating the pidfile must fail if one exists already
            fd = os.open(fn, os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL)
            os.write(fd, str(os.getpid()) + '\n')
        finally:
            # fix: 0 is a valid descriptor; the original 'if fd:' would
            # leak the file when os.open returned fd 0
            if fd is not None:
                os.close(fd)

    if kw.get('go_daemon') == 'should':
        x, y = os.pipe()
        gconf.cpid = os.fork()
        if gconf.cpid:
            os.close(x)
            sys.exit()
        os.close(y)
        # wait for parent to terminate
        # so we can start up with
        # no messing from the dirty
        # ol' bustard
        select.select((x,), (), ())
        os.close(x)
        if getattr(gconf, 'pid_file', None):
            # write a temp pidfile first, then rename over the real one
            write_pid(gconf.pid_file + '.tmp')
            os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
        os.setsid()
        # detach stdio from the terminal
        dn = os.open(os.devnull, os.O_RDWR)
        for f in (sys.stdin, sys.stdout, sys.stderr):
            os.dup2(dn, f.fileno())
    elif getattr(gconf, 'pid_file', None):
        try:
            write_pid(gconf.pid_file)
        except OSError:
            gconf.pid_file = None
            ex = sys.exc_info()[1]
            if ex.errno == EEXIST:
                sys.stderr.write("pidfile is taken, exiting.\n")
                # fix: use sys.exit -- the 'exit' builtin is a site.py
                # convenience and is absent under python -S
                sys.exit(2)
            raise

    lkw = {'level': gconf.log_level}
    if kw.get('log_file'):
        lkw['filename'] = kw['log_file']
    GLogger.setup(**lkw)
+
def finalize(*a):
    """Cleanup handler run on exit: reap/validate the pidfile and
    remove the ssh control dir (parent vs. daemonized child is told
    apart via gconf.cpid)."""
    if getattr(gconf, 'pid_file', None):
        if gconf.cpid:
            # parent: poll until the daemon child has taken over the
            # pidfile (or died), in 0.1s steps
            while True:
                f = open(gconf.pid_file)
                pid = f.read()
                f.close()
                pid = int(pid.strip())
                if pid == gconf.cpid:
                    break
                if pid != os.getpid():
                    raise RuntimeError("corrupt pidfile")
                if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
                    break
                time.sleep(0.1)
        else:
            try:
                os.unlink(gconf.pid_file)
            except OSError:
                # fix: the original bare 'except:' would hit AttributeError
                # on .errno for non-OSError exceptions; only a missing
                # pidfile is tolerable here
                ex = sys.exc_info()[1]
                if ex.errno == ENOENT:
                    pass
                else:
                    raise
    if gconf.ssh_ctl_dir and not gconf.cpid:
        shutil.rmtree(gconf.ssh_ctl_dir)
+
def main():
    """Entry point: run main_i() under a configured logger, report any
    failure, and always perform cleanup."""
    # ??? "finally" clause does not take effect with SIGTERM...
    # but this handler neither does
    # signal.signal(signal.SIGTERM, finalize)
    GLogger.setup()
    try:
        try:
            main_i()
        except:
            exc_type = sys.exc_info()[0]
            if exc_type == SystemExit:
                pass
            else:
                logging.exception("FAIL: ")
                sys.stderr.write("failed with %s.\n" % exc_type.__name__)
                exit(1)
    finally:
        finalize()
+
def main_i():
    """The guts of main(): parse the command line, merge in config-file
    values and defaults, resolve master/slave resources and enter the
    service loop."""
    # options steering the startup procedure itself (not config data)
    rconf = {'go_daemon': 'should'}

    def store_abs(opt, optstr, val, parser):
        # store path-valued options in absolute form
        setattr(parser.values, opt.dest, os.path.abspath(val))
    def store_local(opt, optstr, val, parser):
        # divert the option into rconf instead of the parser's values
        rconf[opt.dest] = val
    def store_local_curry(val):
        # store_local with a fixed value, ignoring the parsed one
        return lambda o, oo, vx, p: store_local(o, oo, val, p)

    op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
    op.add_option('--gluster-command', metavar='CMD', default='glusterfs')
    op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
    op.add_option('--gluster-log-level', metavar='LVL')
    op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
    op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
    op.add_option('-L', '--log-level', metavar='LVL')
    op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
    op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
    op.add_option('--rsync-command', metavar='CMD', default='rsync')
    op.add_option('--rsync-extra', metavar='ARGS', default='-sS', help=SUPPRESS_HELP)
    op.add_option('--timeout', metavar='SEC', type=int, default=30)
    op.add_option('--sync-jobs', metavar='N', type=int, default=3)
    op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)

    op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
    # duh. need to specify dest or value will be mapped to None :S
    op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
    op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
    op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
                                                                                       a[-1].values.__dict__.get('log_level') or \
                                                                                       a[-1].values.__dict__.update(log_level='DEBUG')))
    # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
    # -- for this to work out we need to tell apart defaults from explicitly set
    # options... so churn out the defaults here and call the parser with virgin
    # values container.
    defaults = op.get_default_values()
    opts, args = op.parse_args(values=optparse.Values())
    if not (len(args) == 2 or (len(args) == 1 and rconf.get('listen'))):
        sys.stderr.write("error: incorrect number of arguments\n\n")
        sys.stderr.write(op.get_usage() + "\n")
        sys.exit(1)

    # layer the three value sources onto gconf in precedence order
    gconf.__dict__.update(defaults.__dict__)
    # XXX add global config support
    if not 'config_file' in rconf:
        rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
    try:
        cfg = SimpleCfg()
        cfg.read(rconf['config_file'])
        gconf.__dict__.update(cfg)
    except IOError:
        # a missing config file is fine; anything else is not
        ex = sys.exc_info()[1]
        if ex.errno != ENOENT:
            raise
    gconf.__dict__.update(opts.__dict__)

    local = resource.parse_url(args[0])
    remote = None
    if len(args) > 1:
        remote = resource.parse_url(args[1])

    if not local.can_connect_to(remote):
        raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path))

    go_daemon = rconf['go_daemon']

    # for ssh slaves, daemonization is deferred until the connection is
    # established (see SSH.connect_remote)
    if isinstance(remote, resource.SSH) and go_daemon == 'should':
        go_daemon = 'postconn'
        log_file = None
    else:
        log_file = gconf.log_file
    startup(go_daemon=go_daemon, log_file=log_file)

    logging.info("syncing: %s" % " -> ".join([x.url for x in [local, remote] if x]))
    if remote:
        go_daemon = remote.connect_remote(go_daemon=go_daemon)
        if go_daemon:
            startup(go_daemon=go_daemon, log_file=gconf.log_file)
            # complete remote connection in child
            remote.connect_remote(go_daemon='done')
    local.connect()
    local.service_loop(*[r for r in [remote] if r])

    logging.info("exiting.")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
new file mode 100644
index 00000000000..a2f9f718eb4
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -0,0 +1,240 @@
+import os
+import sys
+import time
+import stat
+import signal
+import logging
+import errno
+from errno import ENOENT, ENODATA
+from threading import Thread, currentThread, Condition, Lock
+
+from gconf import gconf
+
+URXTIME = (-1, 0)
+
class GMaster(object):
    """Master-side crawler: repeatedly walks the master tree and
    propagates changed entries to the slave, driven by per-entry
    "xtime" extended-attribute timestamps."""

    def get_volinfo(self):
        """Refresh self.volume_info from the master's volume-mark."""
        self.volume_info = self.master.server.volume_info()
        if self.volume_info['retval']:
            raise RuntimeError("master is corrupt")
        return self.volume_info

    @property
    def uuid(self):
        # cached volume uuid (fetched once from volume_info)
        if not getattr(self, '_uuid', None):
            self._uuid = self.volume_info['uuid']
        return self._uuid

    @property
    def volmark(self):
        # volume-level mark, the lower bound for valid entry xtimes
        return self.volume_info['volume_mark']

    def xtime(self, path, *a, **opts):
        """Return the xtime of *path* on the given resource (optional
        positional arg; defaults to the master). With 'create' in effect
        (the default on the master only), a missing or pre-volmark xtime
        is reinitialized to the current time. An ENODATA result is
        normalized to URXTIME; other errno ints pass through."""
        if a:
            rsc = a[0]
        else:
            rsc = self.master
        if not 'create' in opts:
            opts['create'] = rsc == self.master
        xt = rsc.server.xtime(path, self.uuid)
        if (isinstance(xt, int) or xt < self.volmark) and opts['create']:
            t = time.time()
            sec = int(t)
            nsec = int((t - sec) * 1000000)
            xt = (sec, nsec)
            rsc.server.set_xtime(path, self.uuid, xt)
        if xt == ENODATA:
            xt = URXTIME
        return xt

    def __init__(self, master, slave):
        # NOTE: the constructor never returns -- it enters the crawl loop
        self.master = master
        self.slave = slave
        self.get_volinfo()
        self.jobtab = {}
        self.syncer = Syncer(slave)
        self.total_turns = int(gconf.turns)
        self.turns = 0
        self.start = None
        self.change_seen = None
        logging.info('master started on ' + self.uuid)
        while True:
            self.crawl()

    def add_job(self, path, label, job, *a, **kw):
        """Queue deferred work (a closure over *job*) under *path*."""
        if self.jobtab.get(path) == None:
            self.jobtab[path] = []
        self.jobtab[path].append((label, a, lambda : job(*a, **kw)))

    def wait(self, path, mark):
        """Run all jobs queued for *path*; if every one succeeded,
        propagate *mark* to the slave. Returns overall success."""
        jobs = self.jobtab.pop(path, [])
        succeed = True
        for j in jobs:
            ret = j[-1]()
            if not ret:
                succeed = False
        if succeed:
            self.sendmark(path, mark)
        return succeed

    def sendmark(self, path, mark):
        """Stamp *path* on the slave with xtime *mark*."""
        self.slave.server.set_xtime(path, self.uuid, mark)

    def crawl(self, path='.', xtl=None):
        """One crawl pass over *path* (recursing into changed subdirs).

        Compares master ('xtl') and slave ('xtr') xtimes: equal means
        unchanged subtree (prune); otherwise purge slave-only entries,
        sync changed symlinks/files/dirs, then stamp the slave when the
        queued jobs for *path* have all succeeded."""
        if path == '.':
            # top of a pass: throttle, log timing, re-check volume identity
            if self.start:
                logging.info("crawl took %.6f" % (time.time() - self.start))
            time.sleep(1)
            self.start = time.time()
            logging.info("crawling...")
            self.get_volinfo()
            if self.volume_info['uuid'] != self.uuid:
                raise RuntimeError("master uuid mismatch")
        logging.debug("entering " + path)
        if not xtl:
            xtl = self.xtime(path)
        xtr0 = self.xtime(path, self.slave)
        if isinstance(xtr0, int):
            # errno from the slave: treat as "never synced"
            xtr = URXTIME
        else:
            xtr = xtr0
        if xtr0 == ENOENT:
            self.slave.server.mkdir(path)
        else:
            if xtr > xtl:
                raise RuntimeError("timestamp corruption for " + path)
            if xtl == xtr:
                # subtree unchanged; at top level this also accounts for
                # the --turns termination logic
                if path == '.' and self.total_turns and self.change_seen:
                    self.turns += 1
                    self.change_seen = False
                    logging.info("finished turn #%s/%s" % (self.turns, self.total_turns))
                    if self.turns == self.total_turns:
                        logging.info("reached turn limit, terminating.")
                        os.kill(os.getpid(), signal.SIGTERM)
                return
        if path == '.':
            self.change_seen = True
        # entry lists of master and slave; slave-only entries get purged
        dem, des = ( x.server.entries(path) for x in (self.master, self.slave) )
        dd = set(des) - set(dem)
        if dd:
            self.slave.server.purge(path, dd)
        # collect master entries changed since the slave's xtime
        chld = []
        for e in dem:
            e = os.path.join(path, e)
            xte = self.xtime(e)
            if isinstance(xte, int):
                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
            elif xte > xtr:
                chld.append((e, xte))
        def indulgently(e, fnc, blame=None):
            # run fnc(e), tolerating a concurrent disappearance (ENOENT):
            # record a failing 'salvage' job so the parent's mark is withheld
            if not blame:
                blame = path
            try:
                return fnc(e)
            except (IOError, OSError):
                ex = sys.exc_info()[1]
                if ex.errno == ENOENT:
                    logging.warn("salvaged ENOENT for" + e)
                    self.add_job(blame, 'salvage', lambda: False)
                    return False
                else:
                    raise
        for e, xte in chld:
            mo = indulgently(e, lambda e: os.lstat(e).st_mode)
            if mo == False:
                continue
            if stat.S_ISLNK(mo):
                self.slave.server.symlink(os.readlink(e), e)
                self.sendmark(e, xte)
            elif stat.S_ISREG(mo):
                # hand the file to the background syncer; completion is
                # checked later via the queued 'reg' job
                logging.debug("syncing %s ..." % e)
                pb = self.syncer.add(e)
                def regjob(e, xte, pb):
                    if pb.wait():
                        logging.debug("synced " + e)
                        self.sendmark(e, xte)
                        return True
                    else:
                        logging.error("failed to sync " + e)
                self.add_job(path, 'reg', regjob, e, xte, pb)
            elif stat.S_ISDIR(mo):
                if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte),
                                             self.crawl(e, xte),
                                             True)[-1], blame=e) == False:
                    continue
            else:
                # ignore fifos, sockets and special files
                pass
        if path == '.':
            self.wait(path, xtl)
+
class BoxClosedErr(Exception):
    """Raised when appending to a postbox that has already been closed."""
+
class PostBox(list):
    """A synchronized list of items plus a result latch.

    Producers append entries; a consumer closes the box, processes the
    batch and publishes the outcome via wakeup(), releasing wait()-ers.
    """

    def __init__(self, *a):
        list.__init__(self, *a)
        # 'lever' both serializes access and signals completion
        self.lever = Condition()
        self.open = True
        self.done = False

    def wait(self):
        """Block until wakeup() was called; return the published result."""
        self.lever.acquire()
        try:
            # fix: loop instead of a single 'if' check -- guards against
            # spurious wakeups and waiters arriving after completion
            while not self.done:
                self.lever.wait()
        finally:
            self.lever.release()
        return self.result

    def wakeup(self, data):
        """Publish *data* as the result and release all waiters."""
        self.result = data
        self.lever.acquire()
        try:
            self.done = True
            self.lever.notifyAll()
        finally:
            self.lever.release()

    def append(self, e):
        """Add *e*; raises BoxClosedErr if the box is closed."""
        self.lever.acquire()
        try:
            if not self.open:
                raise BoxClosedErr
            list.append(self, e)
        finally:
            self.lever.release()

    def close(self):
        """Refuse any further append()s."""
        self.lever.acquire()
        try:
            self.open = False
        finally:
            self.lever.release()
+
class Syncer(object):
    """Background rsync scheduler.

    add() drops a file into the current PostBox; a pool of worker
    threads periodically swaps out the accumulated batch, closes it,
    rsyncs it to the slave and publishes the result on the box.
    """

    def __init__(self, slave):
        self.slave = slave
        self.lock = Lock()
        self.pb = PostBox()
        for i in range(int(gconf.sync_jobs)):
            t = Thread(target=self.syncjob)
            # fix: the original did 't.setDaemon = True', which shadows
            # the method with an attribute and leaves the thread
            # NON-daemonic; it must be called
            t.setDaemon(True)
            t.start()

    def syncjob(self):
        """Worker loop: grab the shared PostBox once it is non-empty,
        close it and run the batch through slave.rsync()."""
        while True:
            pb = None
            while True:
                self.lock.acquire()
                if self.pb:
                    pb, self.pb = self.pb, PostBox()
                self.lock.release()
                if pb:
                    break
                time.sleep(0.5)
            pb.close()
            pb.wakeup(self.slave.rsync(pb))

    def add(self, e):
        """Enqueue file *e*; returns the PostBox it ended up in,
        retrying if a worker closed the box underneath us."""
        while True:
            try:
                self.pb.append(e)
                return self.pb
            except BoxClosedErr:
                pass
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
new file mode 100644
index 00000000000..f878d481a6c
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -0,0 +1,150 @@
+import os
+import sys
+import select
+import time
+import logging
+from threading import Thread, Condition
+try:
+ import thread
+except ImportError:
+ # py 3
+ import _thread as thread
+try:
+ from Queue import Queue
+except ImportError:
+ # py 3
+ from queue import Queue
+try:
+ import cPickle as pickle
+except ImportError:
+ # py 3
+ import pickle
+
# use the newest pickle protocol available
pickle_proto = -1

def ioparse(i, o):
    """Normalize an I/O pair: input side becomes a stream, output side
    a raw file descriptor."""
    if isinstance(i, int):
        # integer input: promote to a file object
        i = os.fdopen(i)
    # rely on duck typing for recognizing
    # streams as that works uniformly
    # in py2 and py3
    if hasattr(o, 'fileno'):
        o = o.fileno()
    return (i, o)

def send(out, *args):
    """Pickle *args* as one tuple and write it to descriptor *out*."""
    payload = pickle.dumps(args, pickle_proto)
    os.write(out, payload)

def recv(inf):
    """Read one pickled tuple back from stream *inf*."""
    return pickle.load(inf)
+
+
class RepceServer(object):
    """RePCe server: reads pickled (request-id, method, args...) tuples
    from its input, dispatches them as method calls on the wrapped
    object via a pool of worker threads, and writes back
    (request-id, is-exception, result) replies."""

    def __init__(self, obj, i, o, wnum=6):
        # obj: the object serving the calls; i/o: input/output channel;
        # wnum: number of worker threads
        self.obj = obj
        self.inf, self.out = ioparse(i, o)
        self.wnum = wnum
        self.q = Queue()

    def service_loop(self):
        """Spawn the workers, then feed incoming requests to them until
        the input channel reaches EOF."""
        for i in range(self.wnum):
            t = Thread(target=self.worker)
            t.setDaemon(True)
            t.start()
        try:
            while True:
                self.q.put(recv(self.inf))
        except EOFError:
            logging.info("terminating on reaching EOF.")

    def worker(self):
        """Take requests off the queue, perform the named method call
        and send the outcome back; exceptions are returned as values,
        flagged via the 'exc' field."""
        while True:
            in_data = self.q.get(True)
            rid = in_data[0]
            rmeth = in_data[1]
            exc = False
            try:
                res = getattr(self.obj, rmeth)(*in_data[2:])
            except:
                res = sys.exc_info()[1]
                exc = True
                logging.exception("call failed: ")
            send(self.out, rid, exc, res)
+
+
class RepceJob(object):
    """An in-flight RePCe request.

    Identified by a (pid, thread id, timestamp) triple; carries a
    callback invoked on completion and a latch for synchronous waiters.
    """

    def __init__(self, cbk):
        self.rid = (os.getpid(), thread.get_ident(), time.time())
        self.cbk = cbk
        self.lever = Condition()
        self.done = False

    def __repr__(self):
        return ':'.join([str(x) for x in self.rid])

    def wait(self):
        """Block until wakeup() published a result, then return it."""
        self.lever.acquire()
        # fix: loop instead of a single 'if' check -- immune to spurious
        # wakeups and to waiters arriving after completion
        while not self.done:
            self.lever.wait()
        self.lever.release()
        return self.result

    def wakeup(self, data):
        """Publish *data* and release the waiter."""
        self.result = data
        self.lever.acquire()
        self.done = True
        self.lever.notify()
        self.lever.release()
+
+
class RepceClient(object):
    """RePCe client: sends (request-id, method, args...) tuples and
    matches incoming replies to pending RepceJobs via a listener
    thread; remote methods are reachable as proxy attributes."""

    def __init__(self, i, o):
        self.inf, self.out = ioparse(i, o)
        # pending requests, keyed by request id
        self.jtab = {}
        t = Thread(target = self.listen)
        t.setDaemon(True)
        t.start()

    def listen(self):
        """Reply dispatcher loop: pop the job a reply belongs to and
        fire its callback."""
        while True:
            select.select((self.inf,), (), ())
            rid, exc, res = recv(self.inf)
            rjob = self.jtab.pop(rid)
            if rjob.cbk:
                rjob.cbk(rjob, [exc, res])

    def push(self, meth, *args, **kw):
        """Send an asynchronous call; returns the pending RepceJob.
        kw['cbk'] is invoked with the reply (default behavior: re-raise
        remote exceptions, discard plain results)."""
        cbk = kw.get('cbk')
        if not cbk:
            def cbk(rj, res):
                if res[0]:
                    raise res[1]
        rjob = RepceJob(cbk)
        self.jtab[rjob.rid] = rjob
        logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
        send(self.out, rjob.rid, meth, *args)
        return rjob

    def __call__(self, meth, *args):
        """Synchronous call: push the request, wait for its reply and
        re-raise remote exceptions locally."""
        rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
        exc, res = rjob.wait()
        if exc:
            logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
            raise res
        logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
        return res

    class mprx(object):
        """Proxy bound to one remote method name, so that
        client.foo(x) means client('foo', x)."""

        def __init__(self, ins, meth):
            self.ins = ins
            self.meth = meth

        def __call__(self, *a):
            return self.ins(self.meth, *a)

    def __getattr__(self, meth):
        # unknown attributes resolve to remote-method proxies
        return self.mprx(self, meth)
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
new file mode 100644
index 00000000000..bbf7459bb55
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -0,0 +1,372 @@
+import re
+import os
+import sys
+import time
+import errno
+import struct
+import select
+import logging
+import tempfile
+import threading
+from ctypes import *
+from ctypes.util import find_library
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR
+
+from gconf import gconf
+import repce
+from repce import RepceServer, RepceClient
+from master import GMaster
+
# url scheme splitter and hostname / username token patterns
UrlRX = re.compile('\A(\w+)://(.*)')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")

def sup(x, *a, **kw):
    """Call the superclass method named like the caller: sup(self, ...)
    inside a method 'bar' stands for super(type(x), x).bar(...), with
    the method name discovered via frame inspection."""
    return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
+
def desugar(ustr):
    """Expand shorthand resource notations to full urls.

    - ":vol"              -> gluster://localhost:vol
    - "host:vol"          -> gluster://host:vol
    - "host:/path",
      "user@host:<any>"   -> ssh:// + the string as given
    - anything without ':' -> file:// + absolute path
    """
    m = re.match('([^:]*):(.*)', ustr)
    if m:
        if not m.groups()[0]:
            # empty host part: a volume on localhost
            return "gluster://localhost" + ustr
        elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
            return "ssh://" + ustr
        else:
            # fix: the original returned the literal "gluster://#{str}"
            # (a stray Ruby interpolation), dropping the host:volume part
            return "gluster://" + ustr
    else:
        return "file://" + os.path.abspath(ustr)
+
def parse_url(ustr):
    """Instantiate the resource class matching *ustr*'s url scheme,
    after desugaring shorthand notations.

    Raises RuntimeError on malformed urls or unknown schemes.
    """
    m = UrlRX.match(ustr)
    if not m:
        ustr = desugar(ustr)
        m = UrlRX.match(ustr)
    if not m:
        raise RuntimeError("malformed url")
    sch, path = m.groups()
    # scheme names map to same-named (uppercased) classes in this module
    this = sys.modules[__name__]
    if not hasattr(this, sch.upper()):
        raise RuntimeError("unknown url scheme " + sch)
    return getattr(this, sch.upper())(path)
+
+
class Xattr(object):
    """Minimal extended-attribute access via ctypes/libc.

    Only the l*xattr variants (which do not follow symlinks) are
    wrapped -- that is all the syncdaemon needs.
    """

    # fix: find_library wants the unprefixed name -- "c" resolves to
    # libc.so.6, whereas "libc" finds nothing and the resulting
    # CDLL(None) only worked on Linux by accident
    libc = CDLL(find_library("c"))

    @classmethod
    def geterrno(cls):
        """Fetch errno as left behind by the last libc call."""
        return c_int.in_dll(cls.libc, 'errno').value

    @classmethod
    def raise_oserr(cls):
        """Convert the pending errno into a raised OSError."""
        errn = cls.geterrno()
        raise OSError(errn, os.strerror(errn))

    @classmethod
    def lgetxattr(cls, path, attr, siz=0):
        """Read xattr *attr* of *path* into a buffer of *siz* bytes;
        with siz=0, only the attribute size is returned."""
        if siz:
            buf = create_string_buffer('\0' * siz)
        else:
            buf = None
        ret = cls.libc.lgetxattr(path, attr, buf, siz)
        if ret == -1:
            cls.raise_oserr()
        if siz:
            return buf.raw[:ret]
        else:
            return ret

    @classmethod
    def lsetxattr(cls, path, attr, val):
        """Set xattr *attr* of *path* to *val*."""
        ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
        if ret == -1:
            cls.raise_oserr()
+
+
class Server(object):
    """Filesystem operations exposed to the RePCe peer.

    All methods are class/static level (no instance state) and operate
    relative to the working directory set up by the resource's
    connect() method.
    """

    # xattr namespace under which gluster marks live
    GX_NSPACE = "trusted.glusterfs"

    @staticmethod
    def entries(path):
        """Directory listing of *path*; a non-directory yields []."""
        try:
            return os.listdir(path)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == ENOTDIR:
                return []
            else:
                raise

    @classmethod
    def purge(cls, path, entries=None):
        """Recursively remove *entries* under *path*; with no explicit
        entries, remove *path* itself (whatever its type), tolerating
        concurrent disappearance."""
        me_also = entries == None
        if not entries:
            try:
                entries = os.listdir(path)
            except OSError:
                ex = sys.exc_info()[1]
                if ex.errno in (ENOTDIR, ENOENT):
                    # not a directory (or already gone): just unlink
                    try:
                        os.unlink(path)
                        return
                    except OSError:
                        ex = sys.exc_info()[1]
                        if ex.errno != ENOENT:
                            raise
                else:
                    raise
        for e in entries:
            cls.purge(os.path.join(path, e))
        if me_also:
            os.rmdir(path)

    @classmethod
    def _create(cls, path, ctor):
        """Create an fs entry at *path* via *ctor*, replacing whatever
        is in the way upon EEXIST."""
        try:
            ctor(path)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno == EEXIST:
                cls.purge(path)
                return ctor(path)
            raise

    @classmethod
    def mkdir(cls, path):
        cls._create(path, os.mkdir)

    @classmethod
    def symlink(cls, lnk, path):
        cls._create(path, lambda p: os.symlink(lnk, p))

    @classmethod
    def xtime(cls, path, uuid):
        """Read the per-uuid xtime xattr of *path* as a (sec, nsec)
        pair; a missing file or attribute is returned as the errno
        value instead of raising."""
        try:
            return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in (ENOENT, ENODATA):
                return ex.errno
            else:
                raise

    @classmethod
    def set_xtime(cls, path, uuid, mark):
        """Store the (sec, nsec) pair *mark* as the per-uuid xtime of *path*."""
        Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))

    @staticmethod
    def pid():
        return os.getpid()

    # monotonically growing counter, bumped by each ping: lets the
    # slave side detect a dead connection by the counter standing still
    lastping = 0
    @classmethod
    def ping(cls):
        cls.lastping += 1
        return cls.lastping
+
+
class SlaveLocal(object):
    """Mixin for resources able to act as a locally running slave,
    i.e. the serving end of the RePCe channel."""

    def can_connect_to(self, remote):
        # a local slave is a terminal endpoint: nothing further downstream
        return not remote

    def service_loop(self):
        """Serve RePCe requests on stdin/stdout; with a timeout
        configured, exit once no ping has arrived for that long."""
        repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
        t = threading.Thread(target=repce.service_loop)
        t.setDaemon(True)
        t.start()
        logging.info("slave listening")
        if gconf.timeout and int(gconf.timeout) > 0:
            # liveness watchdog: the ping counter must keep moving
            while True:
                lp = self.server.lastping
                time.sleep(int(gconf.timeout))
                if lp == self.server.lastping:
                    logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
                    break
        else:
            # no timeout: sleep forever, serving happens on the thread
            select.select((), (), ())
+
class SlaveRemote(object):
    """Mixin for driving a remotely running slave: a gsyncd spawned
    through some transport command, spoken to via RePCe over pipes."""

    def connect_remote(self, rargs=[], **opts):
        """Launch a remote gsyncd in --listen mode behind the transport
        command line *rargs* (e.g. ssh) and connect to it."""
        slave = opts.get('slave', self.url)
        ix, ox = os.pipe()
        iy, oy = os.pipe()
        pid = os.fork()
        if not pid:
            # child: wire the pipe ends to stdin/stdout and exec the
            # transport + remote gsyncd command line
            os.close(ox)
            os.dup2(ix, sys.stdin.fileno())
            os.close(iy)
            os.dup2(oy, sys.stdout.fileno())
            argv = rargs + gconf.remote_gsyncd.split() + ['-N', '--listen', '--timeout', str(gconf.timeout), slave]
            os.execvp(argv[0], argv)
        os.close(ix)
        os.close(oy)
        return self.start_fd_client(iy, ox, **opts)

    def start_fd_client(self, i, o, **opts):
        """Set up the RePCe client over the fd pair i/o and, when a
        timeout is configured, a pinger thread keeping the link alive."""
        self.server = RepceClient(i, o)
        if gconf.timeout and int(gconf.timeout) > 0:
            def pinger():
                while True:
                    self.server.ping()
                    time.sleep(int(gconf.timeout) * 0.5)
            t = threading.Thread(target=pinger)
            t.setDaemon(True)
            t.start()

    def rsync(self, files, *args):
        """Push *files* to the slave via rsync; extra *args* go onto
        the rsync command line. Returns True on rsync success."""
        if not files:
            raise RuntimeError("no files to sync")
        logging.debug("files: " + ", ".join(files))
        argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args)
        return os.spawnvp(os.P_WAIT, argv[0], argv) == 0
+
+
class AbstractUrl(object):
    """Common base of the url-addressed resource classes.

    Subclass names double as url schemes; __init__ validates the path
    against a scheme-specific pattern and hands the captured groups
    back to the subclass constructor.
    """

    def __init__(self, path, pattern):
        match = re.search(pattern, path)
        if not match:
            raise RuntimeError("malformed path")
        self.path = path
        # returning a value from __init__ is unusual but deliberate:
        # subclasses invoke this directly (via sup) and use the groups
        return match.groups()

    def scheme(self):
        """Url scheme of this resource: the lowercased class name."""
        return type(self).__name__.lower()

    @property
    def url(self):
        """The canonical url, scheme://path, reassembled."""
        return "://".join((self.scheme(), self.path))
+
+
+ ### Concrete resource classes ###
+
+
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
    """file:// resource: a plain local directory, usable as slave."""

    class FILEServer(Server):
        """Plain filesystem server -- the generic Server suffices."""
        pass

    server = FILEServer

    def __init__(self, path):
        # a file url's path must be absolute
        sup(self, path, '^/')

    def connect(self):
        """Using the resource amounts to cd-ing into it."""
        os.chdir(self.path)

    def rsync(self, files):
        """Sync *files* into this directory."""
        return sup(self, files, self.path)
+
+
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
    """gluster:// resource: a glusterfs volume, accessed through an
    auxiliary fuse mount set up by connect()."""

    class GLUSTERServer(Server):

        @classmethod
        def volume_info(cls):
            """Decode the volume-mark xattr of the mount root into
            version / uuid / retval / volume_mark fields."""
            vm = struct.unpack('!' + 'B'*19 + 'II',
                               Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27))
            # bytes 2..17 hold the uuid; re-group its hex dump 8-4-4-4-12
            m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
            uuid = '-'.join(m.groups())
            return { 'version': vm[0:2],
                     'uuid'   : uuid,
                     'retval' : vm[18],
                     'volume_mark': vm[-2:] }

    server = GLUSTERServer

    def __init__(self, path):
        self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)

    def can_connect_to(self, remote):
        # a gluster resource can act as master towards any slave
        return True

    def connect(self):
        """Set up an auxiliary glusterfs mount and chdir into it.

        The mount is lazy-umounted right away: our cwd keeps it alive
        for this process, and it goes away when we exit.
        """
        d = tempfile.mkdtemp()
        try:
            # fix: the original referenced the undefined name 'gConf'
            # for the log level argument (NameError whenever
            # --gluster-log-level was set); it is 'gconf'
            argv = [gconf.gluster_command] + \
                   (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \
                   ['-l', gconf.gluster_log_file, '-s', self.host,
                    '--volfile-id', self.volume, '--client-pid=-1', d]
            if os.spawnvp(os.P_WAIT, argv[0], argv):
                raise RuntimeError("command failed: " + " ".join(argv))
            logging.debug('auxiliary glusterfs mount in place')
            os.chdir(d)
            argv = ['umount', '-l', d]
            if os.spawnvp(os.P_WAIT, argv[0], argv):
                raise RuntimeError("command failed: " + " ".join(argv))
        finally:
            try:
                os.rmdir(d)
            except:
                logging.warn('stale mount left behind on ' + d)
        logging.debug('auxiliary glusterfs mount prepared')

    def connect_remote(self, *a, **kw):
        sup(self, *a, **kw)
        # address the remote slave's cwd (its aux mount) via procfs
        self.slavedir = "/proc/%d/cwd" % self.server.pid()

    def service_loop(self, *args):
        """With a peer argument act as master (crawl), else as slave."""
        if args:
            GMaster(self, args[0]).crawl()
        else:
            sup(self, *args)

    def rsync(self, files):
        return sup(self, files, self.slavedir)
+
+
class SSH(AbstractUrl, SlaveRemote):
    """ssh:// resource: wraps an inner resource (file or gluster)
    living on a remote host reached through ssh."""

    def __init__(self, path):
        self.remote_addr, inner_url = sup(self, path,
                                          '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
        self.inner_rsc = parse_url(inner_url)

    def can_connect_to(self, remote):
        # ssh resources are always the slave side, never a master
        return False

    def start_fd_client(self, *a, **opts):
        """Finalize the RePCe client; in deferred mode just hand the fd
        pair back for completion after daemonization."""
        if opts['deferred']:
            return a
        sup(self, *a)
        # compute the rsync target url from the inner resource's kind
        ityp = type(self.inner_rsc)
        if ityp == FILE:
            slavepath = self.inner_rsc.path
        elif ityp == GLUSTER:
            slavepath = "/proc/%d/cwd" % self.server.pid()
        else:
            raise NotImplementedError
        self.slaveurl = ':'.join([self.remote_addr, slavepath])

    def connect_remote(self, go_daemon=None):
        """Connect to the remote slave over ssh (with connection
        multiplexing via a control socket). In 'postconn' mode only
        establish the link and return 'should', so the caller can
        daemonize before the connection is completed via 'done'."""
        if go_daemon == 'done':
            return self.start_fd_client(*self.fd_pair)
        gconf.setup_ssh_ctl(tempfile.mkdtemp())
        deferred = go_daemon == 'postconn'
        ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
        if deferred:
            # send a ping to peer so that we can wait for
            # the answer from which we know connection is
            # established and we can proceed with daemonization
            # (doing that too early robs the ssh passwd prompt...)
            # However, we'd better not start the RepceClient
            # before daemonization (that's not preserved properly
            # in daemon), we just do a an ad-hoc linear put/get.
            i, o = ret
            inf = os.fdopen(i)
            repce.send(o, None, 'ping')
            select.select((inf,), (), ())
            repce.recv(inf)
            # hack hack hack: store a global reference to the file
            # to save it from getting GC'd which implies closing it
            gconf._in_fd_reference = inf
            self.fd_pair = (i, o)
            return 'should'

    def rsync(self, files):
        # tunnel rsync through the multiplexed ssh connection
        return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl)
diff --git a/xlators/features/marker/utils/syncdaemon/simplecfg.py b/xlators/features/marker/utils/syncdaemon/simplecfg.py
new file mode 100644
index 00000000000..fc3863ef4e7
--- /dev/null
+++ b/xlators/features/marker/utils/syncdaemon/simplecfg.py
@@ -0,0 +1,77 @@
+import re
+import tempfile
+import os
+
+CommentRe = re.compile('\s*(#|$)')
+ParseRe = re.compile('\s*(\S+):\s+(.*\S)\s+$')
+
class SimpleCfgError(Exception):
    """Raised upon a malformed line in a config file."""
+
class SimpleCfg(dict):
    """
    Read/write support for a simple config file format.
    Entries can be of the form "key: value".
    "#" comments are supported. Whitespace-only lines are ignored.

    Key order is preserved across read/write cycles (klist tracks
    insertion order); dashes in keys are normalized to underscores.
    """

    def __init__(self, *a, **kw):
        dict.__init__(self, *a, **kw)
        # fix: materialize a real list -- dict.keys() is a view in py3,
        # which supports neither append nor remove
        self.klist = list(dict.keys(self))

    def __setitem__(self, k, v):
        k = k.replace('-', '_')
        if not k in self:
            self.klist.append(k)
        dict.__setitem__(self, k, v)

    def __iter__(self):
        return self.klist.__iter__()

    def keys(self):
        return self.klist

    def pop(self, key, *a):
        e = dict.pop(self, key, *a)
        # fix: the key may be absent when a default was supplied --
        # unconditional remove() would raise ValueError then
        if key in self.klist:
            self.klist.remove(key)
        return e

    def readstream(self, s):
        """Parse config entries from stream *s* into self."""
        while True:
            l = s.readline()
            if not l:
                break
            m = ParseRe.match(l)
            if m:
                k, v = m.groups()
                self[k] = v
            elif not CommentRe.match(l):
                raise SimpleCfgError('syntax error')

    def writestream(self, s):
        """Serialize self to stream *s*, preserving key order."""
        for k in self:
            s.write('%s: %s\n' % (k, self[k]))

    def read(self, file):
        """Load entries from the config file at path *file*."""
        f = None
        try:
            f = open(file)
            self.readstream(f)
        finally:
            if f:
                f.close()

    def write(self, file):
        """Save self to *file* atomically (write to temp + rename)."""
        tfd = None
        tfil = None
        try:
            tfd, tname = tempfile.mkstemp(dir=os.path.dirname(file))
            tfil, tfd = os.fdopen(tfd, 'w'), None
            self.writestream(tfil)
            # fix: flush user-space buffers before fsync, otherwise the
            # durability barrier applies to a possibly empty file
            tfil.flush()
            os.fsync(tfil.fileno())
            os.rename(tname, file)
        finally:
            if tfd != None:
                os.close(tfd)
            if tfil != None:
                tfil.close()