diff options
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon')
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/Makefile.am | 5 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/README.md | 81 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/__init__.py | 0 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/configinterface.py | 140 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gconf.py | 14 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 306 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 273 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/repce.py | 162 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 432 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/syncdutils.py | 11 |
10 files changed, 0 insertions, 1424 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am deleted file mode 100644 index 03ac97625..000000000 --- a/xlators/features/marker/utils/syncdaemon/Makefile.am +++ /dev/null @@ -1,5 +0,0 @@ -syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon - -syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py configinterface.py syncdutils.py - -CLEANFILES = diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md deleted file mode 100644 index d45006932..000000000 --- a/xlators/features/marker/utils/syncdaemon/README.md +++ /dev/null @@ -1,81 +0,0 @@ -gsycnd, the Gluster Syncdaemon -============================== - -REQUIREMENTS ------------- - -_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. -Requirements are categorized according to this. - -* supported OS is GNU/Linux -* Python >= 2.5, or 2.4 with Ctypes (see below) (both) -* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) -* rsync (both) -* glusterfs with marker support (master); glusterfs (optional on slave) -* FUSE; for supported versions consult glusterfs - -INSTALLATION ------------- - -As of now, the supported way of operation is running from the source directory. - -If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). - -CONFIGURATION -------------- - -gsyncd tunables are a subset of the long command-line options; for listing them, -type - - gsyncd.py --help - -and see the long options up to "--config-file". (The leading double dash should be omitted; -interim underscores and dashes are interchangeable.) The set of options bear some resemblance -to those of glusterfs and rsync. - -The config file format matches the following syntax: - - <option1>: <value1> - <option2>: <value2> - # comment - -By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_ -in the source tree. - -USAGE ------ - -gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally. -Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors -for it with gysncd: - -1. _/data/mirror_ -2. local gluster volume _yow_ -3. _/data/far_mirror_ at example.com -4. gluster volume _moz_ at example.com - -The respective gsyncd invocations are (demoing some syntax sugaring): - -1. - - gsyncd.py gluster://localhost:pop file:///data/mirror - - or short form - - gsyncd.py :pop /data/mirror - -2. `gsyncd :pop :yow` -3. - - gsyncd.py :pop ssh://example.com:/data/far_mirror - - or short form - - gsyncd.py :pop example.com:/data/far_mirror - -4. `gsyncd.py :pop example.com::moz` - -gsyncd has to be available on both sides; it's location on the remote side has to be specified -via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be -used for setting options on the remote side, although the suggested mode of operation is to -set parameters like log file / pid file in the configuration file.) diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/xlators/features/marker/utils/syncdaemon/__init__.py +++ /dev/null diff --git a/xlators/features/marker/utils/syncdaemon/configinterface.py b/xlators/features/marker/utils/syncdaemon/configinterface.py deleted file mode 100644 index cda7da7eb..000000000 --- a/xlators/features/marker/utils/syncdaemon/configinterface.py +++ /dev/null @@ -1,140 +0,0 @@ -try: - import ConfigParser -except ImportError: - # py 3 - import configparser as ConfigParser -import re - -import syncdutils - -SECT_ORD = '__section_order__' -SECT_META = '__meta__' -config_version = 2.0 - -re_type = type(re.compile('')) - -class GConffile(object): - - def __init__(self, path, peers): - self.peers = peers - self.path = path - self.config = ConfigParser.RawConfigParser() - self.config.read(path) - - def section(self, rx=False): - peers = self.peers - if not peers: - peers = ['.', '.'] - rx = True - if rx: - st = 'peersrx' - else: - st = 'peers' - return ' '.join([st] + [syncdutils.escape(u) for u in peers]) - - @staticmethod - def parse_section(section): - sl = section.split() - st = sl.pop(0) - sl = [syncdutils.unescape(u) for u in sl] - if st == 'peersrx': - sl = [re.compile(u) for u in sl] - return sl - - def ord_sections(self): - """Return an ordered list of sections. - - Ordering happens based on the auxiliary - SECT_ORD section storing indices for each - section added through the config API. - - To not to go corrupt in case of manually - written config files, we take care to append - also those sections which are not registered - in SECT_ORD. - - Needed for python 2.{4,5} where ConfigParser - cannot yet order sections/options internally. - """ - so = {} - if self.config.has_section(SECT_ORD): - so = self.config._sections[SECT_ORD] - so2 = {} - for k, v in so.items(): - if k != '__name__': - so2[k] = int(v) - tv = 0 - if so2: - tv = max(so2.values()) + 1 - ss = [s for s in self.config.sections() if s.find('__') != 0] - for s in ss: - if s in so.keys(): - continue - so2[s] = tv - tv += 1 - def scmp(x, y): - return cmp(*(so2[s] for s in (x, y))) - ss.sort(scmp) - return ss - - def update_to(self, dct): - if not self.peers: - raise RuntimeError('no peers given, cannot select matching options') - def update_from_sect(sect): - for k, v in self.config._sections[sect].items(): - if k == '__name__': - continue - k = k.replace('-', '_') - dct[k] = v - for sect in self.ord_sections(): - sp = self.parse_section(sect) - if isinstance(sp[0], re_type) and len(sp) == len(self.peers): - match = True - for i in range(len(sp)): - if not sp[i].search(self.peers[i]): - match = False - break - if match: - update_from_sect(sect) - if self.config.has_section(self.section()): - update_from_sect(self.section()) - - def get(self, opt=None): - d = {} - self.update_to(d) - if opt: - d = {opt: d.get(opt, "")} - for k, v in d.iteritems(): - if k == '__name__': - continue - print("%s: %s" % (k, v)) - - def write(self): - if not self.config.has_section(SECT_META): - self.config.add_section(SECT_META) - self.config.set(SECT_META, 'version', config_version) - f = None - try: - f = open(self.path, 'wb') - self.config.write(f) - finally: - if f: - f.close() - - def set(self, opt, val, rx=False): - sect = self.section(rx) - if not self.config.has_section(sect): - self.config.add_section(sect) - # regarding SECT_ORD, cf. ord_sections - if not self.config.has_section(SECT_ORD): - self.config.add_section(SECT_ORD) - self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD])) - self.config.set(sect, opt, val) - self.write() - - def delete(self, opt, rx=False): - sect = self.section(rx) - if not self.config.has_section(sect): - return - if self.config.remove_option(sect, opt): - self.write() diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py deleted file mode 100644 index cec5be078..000000000 --- a/xlators/features/marker/utils/syncdaemon/gconf.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -class GConf(object): - ssh_ctl_dir = None - ssh_ctl_args = None - cpid = None - permanent_handles = [] - - @classmethod - def setup_ssh_ctl(cls, ctld): - cls.ssh_ctl_dir = ctld - cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")] - -gconf = GConf() diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py deleted file mode 100644 index 1920a05ec..000000000 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ /dev/null @@ -1,306 +0,0 @@ -#!/usr/bin/env python - -import os -import os.path -import sys -import time -import logging -import signal -import select -import shutil -import optparse -import fcntl -from optparse import OptionParser, SUPPRESS_HELP -from logging import Logger -from errno import EEXIST, ENOENT, EACCES, EAGAIN - -from gconf import gconf -from configinterface import GConffile -import resource - -class GLogger(Logger): - - def makeRecord(self, name, level, *a): - rv = Logger.makeRecord(self, name, level, *a) - rv.nsecs = (rv.created - int(rv.created)) * 1000000 - fr = sys._getframe(4) - callee = fr.f_locals.get('self') - if callee: - ctx = str(type(callee)).split("'")[1].split('.')[-1] - else: - ctx = '<top>' - if not hasattr(rv, 'funcName'): - rv.funcName = fr.f_code.co_name - rv.lvlnam = logging.getLevelName(level)[0] - rv.ctx = ctx - return rv - - @classmethod - def setup(cls, **kw): - if kw.get('slave'): - sls = "(slave)" - else: - sls = "" - lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", - 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + sls + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} - lprm.update(kw) - lvl = kw.get('level', logging.INFO) - lprm['level'] = lvl - logging.root = cls("root", lvl) - logging.setLoggerClass(cls) - logging.getLogger().handlers = [] - logging.basicConfig(**lprm) - - -def grabfile(fname, content=None): - # damn those messy open() mode codes - fd = os.open(fname, os.O_CREAT|os.O_RDWR) - f = os.fdopen(fd, 'r+b', 0) - try: - fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) - except: - ex = sys.exc_info()[1] - f.close() - if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): - # cannot grab, it's taken - return - raise - if content: - try: - f.truncate() - f.write(content) - except: - f.close() - raise - gconf.permanent_handles.append(f) - return f - -def grabpidfile(fname=None, setpid=True): - if not fname: - fname = gconf.pid_file - content = None - if setpid: - content = str(os.getpid()) + '\n' - return grabfile(fname, content=content) - -def startup(**kw): - if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': - if not grabpidfile(): - sys.stderr.write("pidfile is taken, exiting.\n") - exit(2) - - if kw.get('go_daemon') == 'should': - x, y = os.pipe() - gconf.cpid = os.fork() - if gconf.cpid: - os.close(x) - sys.exit() - os.close(y) - os.setsid() - dn = os.open(os.devnull, os.O_RDWR) - for f in (sys.stdin, sys.stdout, sys.stderr): - os.dup2(dn, f.fileno()) - if getattr(gconf, 'pid_file', None): - if not grabpidfile(gconf.pid_file + '.tmp'): - raise RuntimeError("cannot grap temporary pidfile") - os.rename(gconf.pid_file + '.tmp', gconf.pid_file) - # wait for parent to terminate - # so we can start up with - # no messing from the dirty - # ol' bustard - select.select((x,), (), ()) - os.close(x) - - lkw = {'level': gconf.log_level} - if kw.get('log_file'): - lkw['filename'] = kw['log_file'] - GLogger.setup(slave=kw.get('slave'), **lkw) - -def finalize(*a): - if getattr(gconf, 'pid_file', None): - rm_pidf = True - if gconf.cpid: - # exit path from parent branch of daemonization - rm_pidf = False - while True: - f = grabpidfile(setpid=False) - if not f: - # child has already taken over pidfile - break - if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: - # child has terminated - rm_pidf = True - break; - time.sleep(0.1) - if rm_pidf: - try: - os.unlink(gconf.pid_file) - except: - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - pass - else: - raise - if gconf.ssh_ctl_dir and not gconf.cpid: - shutil.rmtree(gconf.ssh_ctl_dir) - sys.stdout.flush() - sys.stderr.flush() - -def main(): - # ??? "finally" clause does not take effect with SIGTERM... - # but this handler neither does - # signal.signal(signal.SIGTERM, finalize) - GLogger.setup() - try: - try: - main_i() - except: - exc = sys.exc_info()[0] - if exc != SystemExit: - logging.exception("FAIL: ") - sys.stderr.write("failed with %s.\n" % exc.__name__) - sys.exit(1) - finally: - finalize() - # force exit in non-main thread too - os._exit(1) - -def main_i(): - rconf = {'go_daemon': 'should'} - - def store_abs(opt, optstr, val, parser): - setattr(parser.values, opt.dest, os.path.abspath(val)) - def store_local(opt, optstr, val, parser): - rconf[opt.dest] = val - def store_local_curry(val): - return lambda o, oo, vx, p: store_local(o, oo, val, p) - - op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") - op.add_option('--gluster-command', metavar='CMD', default='glusterfs') - op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) - op.add_option('--gluster-log-level', metavar='LVL') - op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) - op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('-L', '--log-level', metavar='LVL') - op.add_option('-r', '--remote-gsyncd', metavar='CMD', default='/usr/libexec/gsyncd') - op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') - op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-extra', metavar='ARGS', default='-sS', help=SUPPRESS_HELP) - op.add_option('--timeout', metavar='SEC', type=int, default=120) - op.add_option('--sync-jobs', metavar='N', type=int, default=3) - op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) - - op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) - # duh. need to specify dest or value will be mapped to None :S - op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) - op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) - op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), - a[-1].values.__dict__.get('log_level') or \ - a[-1].values.__dict__.update(log_level='DEBUG'))) - op.add_option('--config-get', metavar='OPT', type=str, dest='config', action='callback', callback=store_local) - op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_curry(True)) - op.add_option('--config-set', metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', - callback=lambda o, oo, vx, p: store_local(o, oo, (vx[0], vx[1], False), p)) - op.add_option('--config-set-rx', metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', - callback=lambda o, oo, vx, p: store_local(o, oo, (vx[0], vx[1], True), p)) - op.add_option('--config-del', metavar='OPT', type=str, dest='config', action='callback', callback=lambda o, oo, vx, p: - store_local(o, oo, (vx, False, False), p)) - op.add_option('--config-del-rx', metavar='OPT', type=str, dest='config', action='callback', callback=lambda o, oo, vx, p: - store_local(o, oo, (vx, False, True), p)) - op.add_option('--canonicalize-url', dest='do_canon', action='callback', callback=store_local_curry('raw')) - op.add_option('--canonicalize-escape-url', dest='do_canon', action='callback', callback=store_local_curry('escaped')) - - # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults - # -- for this to work out we need to tell apart defaults from explicitly set - # options... so churn out the defaults here and call the parser with virgin - # values container. - defaults = op.get_default_values() - opts, args = op.parse_args(values=optparse.Values()) - confdata = rconf.get('config') - if not (len(args) == 2 or \ - (len(args) == 1 and rconf.get('listen')) or \ - (len(args) <= 2 and confdata) or \ - rconf.get('do_canon')): - sys.stderr.write("error: incorrect number of arguments\n\n") - sys.stderr.write(op.get_usage() + "\n") - sys.exit(1) - - if confdata and isinstance(confdata, tuple) and confdata[2]: - # peers are regexen, don't try to parse them - canon_peers = args - else: - rscs = [resource.parse_url(u) for u in args] - dc = rconf.get('do_canon') - if dc: - for r in rscs: - print(r.get_url(canonical=True, escaped=(dc=='escaped'))) - return - local = remote = None - if rscs: - local = rscs[0] - if len(rscs) > 1: - remote = rscs[1] - if not local.can_connect_to(remote): - raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path)) - pa = ([], []) - canon = [False, True] - for x in (local, remote): - if x: - for i in range(2): - pa[i].append(x.get_url(canonical=canon[i])) - peers, canon_peers = pa - if not 'config_file' in rconf: - rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf") - gcnf = GConffile(rconf['config_file'], canon_peers) - - if confdata: - if isinstance(confdata, tuple): - if confdata[1]: - gcnf.set(*confdata) - else: - gcnf.delete(confdata[0], confdata[1]) - else: - if confdata == True: - confdata = None - gcnf.get(confdata) - return - - gconf.__dict__.update(defaults.__dict__) - gcnf.update_to(gconf.__dict__) - gconf.__dict__.update(opts.__dict__) - - #normalize loglevel - lvl0 = gconf.log_level - if isinstance(lvl0, str): - lvl1 = lvl0.upper() - lvl2 = logging.getLevelName(lvl1) - # I have _never_ _ever_ seen such an utterly braindead - # error condition - if lvl2 == "Level " + lvl1: - raise RuntimeError('cannot recognize log level "%s"' % lvl0) - gconf.log_level = lvl2 - - go_daemon = rconf['go_daemon'] - - if isinstance(remote, resource.SSH) and go_daemon == 'should': - go_daemon = 'postconn' - log_file = None - else: - log_file = gconf.log_file - startup(go_daemon=go_daemon, log_file=log_file, slave=(not remote)) - - logging.info("syncing: %s" % " -> ".join(peers)) - if remote: - go_daemon = remote.connect_remote(go_daemon=go_daemon) - if go_daemon: - startup(go_daemon=go_daemon, log_file=gconf.log_file) - # complete remote connection in child - remote.connect_remote(go_daemon='done') - local.connect() - local.service_loop(*[r for r in [remote] if r]) - - logging.info("exiting.") - - -if __name__ == "__main__": - main() diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py deleted file mode 100644 index a275f55fb..000000000 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ /dev/null @@ -1,273 +0,0 @@ -import os -import sys -import time -import stat -import signal -import logging -import errno -from errno import ENOENT, ENODATA -from threading import Thread, currentThread, Condition, Lock - -from gconf import gconf - -URXTIME = (-1, 0) - -class GMaster(object): - - def get_volinfo(self): - self.volume_info = self.master.server.volume_info() - if self.volume_info['retval']: - raise RuntimeError("master is corrupt") - return self.volume_info - - @property - def uuid(self): - if not getattr(self, '_uuid', None): - self._uuid = self.volume_info['uuid'] - return self._uuid - - @property - def volmark(self): - return self.volume_info['volume_mark'] - - def xtime(self, path, *a, **opts): - if a: - rsc = a[0] - else: - rsc = self.master - if not 'create' in opts: - opts['create'] = rsc == self.master - xt = rsc.server.xtime(path, self.uuid) - if isinstance(xt, int) and xt != ENODATA: - return xt - if (xt == ENODATA or xt < self.volmark) and opts['create']: - t = time.time() - sec = int(t) - nsec = int((t - sec) * 1000000) - xt = (sec, nsec) - rsc.server.set_xtime(path, self.uuid, xt) - if xt == ENODATA: - xt = URXTIME - return xt - - def __init__(self, master, slave): - self.master = master - self.slave = slave - self.get_volinfo() - self.jobtab = {} - self.syncer = Syncer(slave) - self.total_turns = int(gconf.turns) - self.turns = 0 - self.start = None - self.change_seen = None - logging.info('master started on ' + self.uuid) - while True: - self.crawl() - - def add_job(self, path, label, job, *a, **kw): - if self.jobtab.get(path) == None: - self.jobtab[path] = [] - self.jobtab[path].append((label, a, lambda : job(*a, **kw))) - - def add_failjob(self, path, label): - logging.debug('salvaged: ' + label) - self.add_job(path, label, lambda: False) - - def wait(self, path, *args): - jobs = self.jobtab.pop(path, []) - succeed = True - for j in jobs: - ret = j[-1]() - if not ret: - succeed = False - if succeed: - self.sendmark(path, *args) - return succeed - - def sendmark(self, path, mark, adct=None): - if adct: - self.slave.server.setattr(path, adct) - self.slave.server.set_xtime(path, self.uuid, mark) - - def crawl(self, path='.', xtl=None): - if path == '.': - if self.start: - logging.info("crawl took %.6f" % (time.time() - self.start)) - time.sleep(1) - self.start = time.time() - logging.info("crawling...") - self.get_volinfo() - if self.volume_info['uuid'] != self.uuid: - raise RuntimeError("master uuid mismatch") - logging.debug("entering " + path) - if not xtl: - xtl = self.xtime(path) - if isinstance(xtl, int): - self.add_failjob(path, 'no-local-node') - return - xtr0 = self.xtime(path, self.slave) - if isinstance(xtr0, int): - if xtr0 != ENOENT: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - except OSError: - self.add_failjob(path, 'no-remote-node') - return - xtr = URXTIME - else: - xtr = xtr0 - if xtr > xtl: - raise RuntimeError("timestamp corruption for " + path) - if xtl == xtr: - if path == '.' and self.total_turns and self.change_seen: - self.turns += 1 - self.change_seen = False - logging.info("finished turn #%s/%s" % (self.turns, self.total_turns)) - if self.turns == self.total_turns: - logging.info("reached turn limit, terminating.") - os.kill(os.getpid(), signal.SIGTERM) - return - if path == '.': - self.change_seen = True - try: - dem = self.master.server.entries(path) - except OSError: - self.add_failjob(path, 'local-entries-fail') - return - try: - des = self.slave.server.entries(path) - except OSError: - self.slave.server.purge(path) - try: - self.slave.server.mkdir(path) - des = self.slave.server.entries(path) - except OSError: - self.add_failjob(path, 'remote-entries-fail') - return - dd = set(des) - set(dem) - if dd: - self.slave.server.purge(path, dd) - chld = [] - for e in dem: - e = os.path.join(path, e) - xte = self.xtime(e) - if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) - elif xte > xtr: - chld.append((e, xte)) - def indulgently(e, fnc, blame=None): - if not blame: - blame = path - try: - return fnc(e) - except (IOError, OSError): - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - logging.warn("salvaged ENOENT for" + e) - self.add_failjob(blame, 'by-indulgently') - return False - else: - raise - for e, xte in chld: - st = indulgently(e, lambda e: os.lstat(e)) - if st == False: - continue - mo = st.st_mode - adct = {'own': (st.st_uid, st.st_gid)} - if stat.S_ISLNK(mo): - if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False: - continue - self.sendmark(e, xte, adct) - elif stat.S_ISREG(mo): - logging.debug("syncing %s ..." % e) - pb = self.syncer.add(e) - def regjob(e, xte, pb): - if pb.wait(): - logging.debug("synced " + e) - self.sendmark(e, xte) - return True - else: - logging.error("failed to sync " + e) - self.add_job(path, 'reg', regjob, e, xte, pb) - elif stat.S_ISDIR(mo): - adct['mode'] = mo - if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct), - self.crawl(e, xte), - True)[-1], blame=e) == False: - continue - else: - # ignore fifos, sockets and special files - pass - if path == '.': - self.wait(path, xtl) - -class BoxClosedErr(Exception): - pass - -class PostBox(list): - - def __init__(self, *a): - list.__init__(self, *a) - self.lever = Condition() - self.open = True - self.done = False - - def wait(self): - self.lever.acquire() - if not self.done: - self.lever.wait() - self.lever.release() - return self.result - - def wakeup(self, data): - self.result = data - self.lever.acquire() - self.done = True - self.lever.notifyAll() - self.lever.release() - - def append(self, e): - self.lever.acquire() - if not self.open: - raise BoxClosedErr - list.append(self, e) - self.lever.release() - - def close(self): - self.lever.acquire() - self.open = False - self.lever.release() - -class Syncer(object): - - def __init__(self, slave): - self.slave = slave - self.lock = Lock() - self.pb = PostBox() - for i in range(int(gconf.sync_jobs)): - t = Thread(target=self.syncjob) - t.setDaemon = True - t.start() - - def syncjob(self): - while True: - pb = None - while True: - self.lock.acquire() - if self.pb: - pb, self.pb = self.pb, PostBox() - self.lock.release() - if pb: - break - time.sleep(0.5) - pb.close() - pb.wakeup(self.slave.rsync(pb)) - - def add(self, e): - while True: - try: - self.pb.append(e) - return self.pb - except BoxClosedErr: - pass diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py deleted file mode 100644 index 1b8d0203c..000000000 --- a/xlators/features/marker/utils/syncdaemon/repce.py +++ /dev/null @@ -1,162 +0,0 @@ -import os -import sys -import select -import time -import logging -from threading import Thread, Condition -try: - import thread -except ImportError: - # py 3 - import _thread as thread -try: - from Queue import Queue -except ImportError: - # py 3 - from queue import Queue -try: - import cPickle as pickle -except ImportError: - # py 3 - import pickle - -pickle_proto = -1 -repce_version = 1.0 - -def ioparse(i, o): - if isinstance(i, int): - i = os.fdopen(i) - # rely on duck typing for recognizing - # streams as that works uniformly - # in py2 and py3 - if hasattr(o, 'fileno'): - o = o.fileno() - return (i, o) - -def send(out, *args): - os.write(out, pickle.dumps(args, pickle_proto)) - -def recv(inf): - return pickle.load(inf) - - -class RepceServer(object): - - def __init__(self, obj, i, o, wnum=6): - self.obj = obj - self.inf, self.out = ioparse(i, o) - self.wnum = wnum - self.q = Queue() - - def service_loop(self): - for i in range(self.wnum): - t = Thread(target=self.worker) - t.setDaemon(True) - t.start() - try: - while True: - self.q.put(recv(self.inf)) - except EOFError: - logging.info("terminating on reaching EOF.") - - def worker(self): - while True: - in_data = self.q.get(True) - rid = in_data[0] - rmeth = in_data[1] - exc = False - if rmeth == '__repce_version__': - res = repce_version - else: - try: - res = getattr(self.obj, rmeth)(*in_data[2:]) - except: - res = sys.exc_info()[1] - exc = True - logging.exception("call failed: ") - send(self.out, rid, exc, res) - - -class RepceJob(object): - - def __init__(self, cbk): - self.rid = (os.getpid(), thread.get_ident(), time.time()) - self.cbk = cbk - self.lever = Condition() - self.done = False - - def __repr__(self): - return ':'.join([str(x) for x in self.rid]) - - def wait(self): - self.lever.acquire() - if not self.done: - self.lever.wait() - self.lever.release() - return self.result - - def wakeup(self, data): - self.result = data - self.lever.acquire() - self.done = True - self.lever.notify() - self.lever.release() - - -class RepceClient(object): - - def __init__(self, i, o): - self.inf, self.out = ioparse(i, o) - self.jtab = {} - t = Thread(target = self.listen) - t.setDaemon(True) - t.start() - - def listen(self): - while True: - select.select((self.inf,), (), ()) - rid, exc, res = recv(self.inf) - rjob = self.jtab.pop(rid) - if rjob.cbk: - rjob.cbk(rjob, [exc, res]) - - def push(self, meth, *args, **kw): - cbk = kw.get('cbk') - if not cbk: - def cbk(rj, res): - if res[0]: - raise res[1] - rjob = RepceJob(cbk) - self.jtab[rjob.rid] = rjob - logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args))) - send(self.out, rjob.rid, meth, *args) - return rjob - - def __call__(self, meth, *args): - rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) - exc, res = rjob.wait() - if exc: - logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) - raise res - logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) - return res - - class mprx(object): - - def __init__(self, ins, meth): - self.ins = ins - self.meth = meth - - def __call__(self, *a): - return self.ins(self.meth, *a) - - def __getattr__(self, meth): - return self.mprx(self, meth) - - def __version__(self): - d = {'proto': self('__repce_version__')} - try: - d['object'] = self('version') - except AttributeError: - pass - return d diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py deleted file mode 100644 index a56b6caa0..000000000 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ /dev/null @@ -1,432 +0,0 @@ -import re -import os -import sys -import stat -import time -import errno -import struct -import select -import socket -import logging -import tempfile -import threading -from ctypes import * -from ctypes.util import find_library -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR - -from gconf import gconf -import repce -from repce import RepceServer, RepceClient -from master import GMaster -import syncdutils - -UrlRX = re.compile('\A(\w+)://(.*)') -HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) -UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") - -def sup(x, *a, **kw): - return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) - -def desugar(ustr): - m = re.match('([^:]*):(.*)', ustr) - if m: - if not m.groups()[0]: - return "gluster://localhost" + ustr - elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): - return "ssh://" + ustr - else: - return "gluster://#{str}" - else: - return "file://" + os.path.abspath(ustr) - -def parse_url(ustr): - m = UrlRX.match(ustr) - if not m: - ustr = desugar(ustr) - m = UrlRX.match(ustr) - if not m: - raise RuntimeError("malformed url") - sch, path = m.groups() - this = sys.modules[__name__] - if not hasattr(this, sch.upper()): - raise RuntimeError("unknown url scheme " + sch) - return getattr(this, sch.upper())(path) - - -class Xattr(object): - - libc = CDLL(find_library("libc")) - - @classmethod - def geterrno(cls): - return c_int.in_dll(cls.libc, 'errno').value - - @classmethod - def raise_oserr(cls): - errn = cls.geterrno() - raise OSError(errn, os.strerror(errn)) - - @classmethod - def lgetxattr(cls, path, attr, siz=0): - if siz: - buf = create_string_buffer('\0' * siz) - else: - buf = None - ret = cls.libc.lgetxattr(path, attr, buf, siz) - if ret == -1: - cls.raise_oserr() - if siz: - return buf.raw[:ret] - else: - return ret - - @classmethod - def lsetxattr(cls, path, attr, val): - ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) - if ret == -1: - cls.raise_oserr() - - -class Server(object): - - GX_NSPACE = "trusted.glusterfs" - - @staticmethod - def entries(path): - # prevent symlinks being followed - if not stat.S_ISDIR(os.lstat(path).st_mode): - raise OSError(ENOTDIR, os.strerror(ENOTDIR)) - return os.listdir(path) - - @classmethod - def purge(cls, path, entries=None): - me_also = entries == None - if not entries: - try: - # if it's a symlink, prevent - # following it - try: - os.unlink(path) - return - except OSError: - ex = sys.exc_info()[1] - if ex.errno == EISDIR: - entries = os.listdir(path) - else: - raise - except OSError: - ex = sys.exc_info()[1] - if ex.errno in (ENOTDIR, ENOENT, ELOOP): - try: - os.unlink(path) - return - except OSError: - ex = sys.exc_info()[1] - if ex.errno == ENOENT: - return - raise - else: - raise - for e in entries: - cls.purge(os.path.join(path, e)) - if me_also: - os.rmdir(path) - - @classmethod - def _create(cls, path, ctor): - try: - ctor(path) - except OSError: - ex = sys.exc_info()[1] - if ex.errno == EEXIST: - cls.purge(path) - return ctor(path) - raise - - @classmethod - def mkdir(cls, path): - cls._create(path, os.mkdir) - - @classmethod - def symlink(cls, lnk, path): - cls._create(path, lambda p: os.symlink(lnk, p)) - - @classmethod - def xtime(cls, path, uuid): - try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) - except OSError: - ex = sys.exc_info()[1] - if ex.errno in (ENOENT, ENODATA, ENOTDIR): - return ex.errno - else: - raise - - @classmethod - def set_xtime(cls, path, uuid, mark): - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) - - @staticmethod - def setattr(path, adct): - own = adct.get('own') - if own: - os.lchown(path, *own) - mode = adct.get('mode') - if mode: - os.chmod(path, stat.S_IMODE(mode)) - times = adct.get('times') - if times: - os.utime(path, times) - - @staticmethod - def pid(): - return os.getpid() - - lastping = 0 - @classmethod - def ping(cls): - cls.lastping += 1 - return cls.lastping - - @staticmethod - def version(): - return 1.0 - - -class SlaveLocal(object): - - def can_connect_to(self, remote): - return not remote - - def service_loop(self): - repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) - t = threading.Thread(target=repce.service_loop) - t.setDaemon(True) - t.start() - logging.info("slave listening") - if gconf.timeout and int(gconf.timeout) > 0: - while True: - lp = self.server.lastping - time.sleep(int(gconf.timeout)) - if lp == self.server.lastping: - logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) - break - else: - select.select((), (), ()) - -class SlaveRemote(object): - - def connect_remote(self, rargs=[], **opts): - slave = opts.get('slave', self.url) - ix, ox = os.pipe() - iy, oy = os.pipe() - pid = os.fork() - if not pid: - os.close(ox) - os.dup2(ix, sys.stdin.fileno()) - os.close(iy) - os.dup2(oy, sys.stdout.fileno()) - argv = rargs + gconf.remote_gsyncd.split() + ['-N', '--listen', '--timeout', str(gconf.timeout), slave] - os.execvp(argv[0], argv) - os.close(ix) - os.close(oy) - return self.start_fd_client(iy, ox, **opts) - - def start_fd_client(self, i, o, **opts): - self.server = RepceClient(i, o) - rv = self.server.__version__() - exrv = {'proto': repce.repce_version, 'object': Server.version()} - da0 = (rv, exrv) - da1 = ({}, {}) - for i in range(2): - for k, v in da0[i].iteritems(): - da1[i][k] = int(v) - if da1[0] != da1[1]: - raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) - if gconf.timeout and int(gconf.timeout) > 0: - def pinger(): - while True: - self.server.ping() - time.sleep(int(gconf.timeout) * 0.5) - t = threading.Thread(target=pinger) - t.setDaemon(True) - t.start() - - def rsync(self, files, *args): - if not files: - raise RuntimeError("no files to sync") - logging.debug("files: " + ", ".join(files)) - argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args) - return os.spawnvp(os.P_WAIT, argv[0], argv) == 0 - - -class AbstractUrl(object): - - def __init__(self, path, pattern): - m = re.search(pattern, path) - if not m: - raise RuntimeError("malformed path") - self.path = path - return m.groups() - - def scheme(self): - return type(self).__name__.lower() - - def canonical_path(self): - return self.path - - def get_url(self, canonical=False, escaped=False): - if canonical: - pa = self.canonical_path() - else: - pa = self.path - u = "://".join((self.scheme(), pa)) - if escaped: - u = syncdutils.escape(u) - return u - - @property - def url(self): - return self.get_url() - - - ### Concrete resource classes ### - - -class FILE(AbstractUrl, SlaveLocal, SlaveRemote): - - class FILEServer(Server): - pass - - server = FILEServer - - def __init__(self, path): - sup(self, path, '^/') - - def connect(self): - os.chdir(self.path) - - def rsync(self, files): - return sup(self, files, self.path) - - -class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): - - class GLUSTERServer(Server): - - @classmethod - def volume_info(cls): - vm = struct.unpack('!' + 'B'*19 + 'II', - Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27)) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) - uuid = '-'.join(m.groups()) - return { 'version': vm[0:2], - 'uuid' : uuid, - 'retval' : vm[18], - 'volume_mark': vm[-2:] } - - server = GLUSTERServer - - def __init__(self, path): - self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) - - def canonical_path(self): - return ':'.join([socket.gethostbyname(self.host), self.volume]) - - def can_connect_to(self, remote): - return True - - def connect(self): - d = tempfile.mkdtemp() - try: - argv = [gconf.gluster_command] + \ - (gconf.gluster_log_level and ['-L', gConf.gluster_log_level] or []) + \ - ['-l', gconf.gluster_log_file, '-s', self.host, - '--volfile-id', self.volume, '--client-pid=-1', d] - if os.spawnvp(os.P_WAIT, argv[0], argv): - raise RuntimeError("command failed: " + " ".join(argv)) - logging.debug('auxiliary glusterfs mount in place') - os.chdir(d) - argv = ['umount', '-l', d] - if os.spawnvp(os.P_WAIT, argv[0], argv): - raise RuntimeError("command failed: " + " ".join(argv)) - finally: - try: - os.rmdir(d) - except: - logging.warn('stale mount possibly left behind on ' + d) - logging.debug('auxiliary glusterfs mount prepared') - - def connect_remote(self, *a, **kw): - sup(self, *a, **kw) - self.slavedir = "/proc/%d/cwd" % self.server.pid() - - def service_loop(self, *args): - if args: - GMaster(self, args[0]).crawl() - else: - sup(self, *args) - - def rsync(self, files): - return sup(self, files, self.slavedir) - - -class SSH(AbstractUrl, SlaveRemote): - - def __init__(self, path): - self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) - self.inner_rsc = parse_url(inner_url) - - def canonical_path(self): - m = re.match('([^@]+)@(.+)', self.remote_addr) - if m: - u, h = m.groups() - else: - u, h = os.getlogin(), self.remote_addr - remote_addr = '@'.join([u, socket.gethostbyname(h)]) - return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) - - def can_connect_to(self, remote): - return False - - def start_fd_client(self, *a, **opts): - if opts.get('deferred'): - return a - sup(self, *a) - ityp = type(self.inner_rsc) - if ityp == FILE: - slavepath = self.inner_rsc.path - elif ityp == GLUSTER: - slavepath = "/proc/%d/cwd" % self.server.pid() - else: - raise NotImplementedError - self.slaveurl = ':'.join([self.remote_addr, slavepath]) - - def connect_remote(self, go_daemon=None): - if go_daemon == 'done': - return self.start_fd_client(*self.fd_pair) - gconf.setup_ssh_ctl(tempfile.mkdtemp()) - deferred = go_daemon == 'postconn' - ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) - if deferred: - # send a message to peer so that we can wait for - # the answer from which we know connection is - # established and we can proceed with daemonization - # (doing that too early robs the ssh passwd prompt...) - # However, we'd better not start the RepceClient - # before daemonization (that's not preserved properly - # in daemon), we just do a an ad-hoc linear put/get. - i, o = ret - inf = os.fdopen(i) - repce.send(o, None, '__repce_version__') - select.select((inf,), (), ()) - repce.recv(inf) - # hack hack hack: store a global reference to the file - # to save it from getting GC'd which implies closing it - gconf.permanent_handles.append(inf) - self.fd_pair = (i, o) - return 'should' - - def rsync(self, files): - return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl) diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py deleted file mode 100644 index 52dad8c5f..000000000 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ /dev/null @@ -1,11 +0,0 @@ -try: - # py 3 - from urllib import parse as urllib -except ImportError: - import urllib - -def escape(s): - return urllib.quote_plus(s) - -def unescape(s): - return urllib.unquote_plus(s) |
