summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/utils/syncdaemon
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon')
-rw-r--r--xlators/features/marker/utils/syncdaemon/Makefile.am5
-rw-r--r--xlators/features/marker/utils/syncdaemon/README.md81
-rw-r--r--xlators/features/marker/utils/syncdaemon/__init__.py0
-rw-r--r--xlators/features/marker/utils/syncdaemon/configinterface.py185
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py15
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py302
-rw-r--r--xlators/features/marker/utils/syncdaemon/libcxattr.py62
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py366
-rw-r--r--xlators/features/marker/utils/syncdaemon/monitor.py80
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py162
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py467
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py160
12 files changed, 0 insertions, 1885 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/Makefile.am b/xlators/features/marker/utils/syncdaemon/Makefile.am
deleted file mode 100644
index ef2dc9aea2c..00000000000
--- a/xlators/features/marker/utils/syncdaemon/Makefile.am
+++ /dev/null
@@ -1,5 +0,0 @@
-syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
-
-syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py resource.py configinterface.py syncdutils.py monitor.py libcxattr.py
-
-CLEANFILES =
diff --git a/xlators/features/marker/utils/syncdaemon/README.md b/xlators/features/marker/utils/syncdaemon/README.md
deleted file mode 100644
index d45006932d1..00000000000
--- a/xlators/features/marker/utils/syncdaemon/README.md
+++ /dev/null
@@ -1,81 +0,0 @@
-gsycnd, the Gluster Syncdaemon
-==============================
-
-REQUIREMENTS
-------------
-
-_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode.
-Requirements are categorized according to this.
-
-* supported OS is GNU/Linux
-* Python >= 2.5, or 2.4 with Ctypes (see below) (both)
-* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave)
-* rsync (both)
-* glusterfs with marker support (master); glusterfs (optional on slave)
-* FUSE; for supported versions consult glusterfs
-
-INSTALLATION
-------------
-
-As of now, the supported way of operation is running from the source directory.
-
-If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/).
-
-CONFIGURATION
--------------
-
-gsyncd tunables are a subset of the long command-line options; for listing them,
-type
-
- gsyncd.py --help
-
-and see the long options up to "--config-file". (The leading double dash should be omitted;
-interim underscores and dashes are interchangeable.) The set of options bear some resemblance
-to those of glusterfs and rsync.
-
-The config file format matches the following syntax:
-
- <option1>: <value1>
- <option2>: <value2>
- # comment
-
-By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd.conf_
-in the source tree.
-
-USAGE
------
-
-gsyncd is a utilitly for continous mirroring, ie. it mirrors master to slave incrementally.
-Assume we have a gluster volume _pop_ at localhost. We try to set up the following mirrors
-for it with gysncd:
-
-1. _/data/mirror_
-2. local gluster volume _yow_
-3. _/data/far_mirror_ at example.com
-4. gluster volume _moz_ at example.com
-
-The respective gsyncd invocations are (demoing some syntax sugaring):
-
-1.
-
- gsyncd.py gluster://localhost:pop file:///data/mirror
-
- or short form
-
- gsyncd.py :pop /data/mirror
-
-2. `gsyncd :pop :yow`
-3.
-
- gsyncd.py :pop ssh://example.com:/data/far_mirror
-
- or short form
-
- gsyncd.py :pop example.com:/data/far_mirror
-
-4. `gsyncd.py :pop example.com::moz`
-
-gsyncd has to be available on both sides; it's location on the remote side has to be specified
-via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be
-used for setting options on the remote side, although the suggested mode of operation is to
-set parameters like log file / pid file in the configuration file.)
diff --git a/xlators/features/marker/utils/syncdaemon/__init__.py b/xlators/features/marker/utils/syncdaemon/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/xlators/features/marker/utils/syncdaemon/__init__.py
+++ /dev/null
diff --git a/xlators/features/marker/utils/syncdaemon/configinterface.py b/xlators/features/marker/utils/syncdaemon/configinterface.py
deleted file mode 100644
index a170b2236cb..00000000000
--- a/xlators/features/marker/utils/syncdaemon/configinterface.py
+++ /dev/null
@@ -1,185 +0,0 @@
-try:
- import ConfigParser
-except ImportError:
- # py 3
- import configparser as ConfigParser
-import re
-from string import Template
-
-from syncdutils import escape, unescape, norm, update_file
-
-SECT_ORD = '__section_order__'
-SECT_META = '__meta__'
-config_version = 2.0
-
-re_type = type(re.compile(''))
-
-
-class MultiDict(object):
-
- def __init__(self, *dd):
- self.dicts = dd
-
- def __getitem__(self, key):
- val = None
- for d in self.dicts:
- if d.get(key):
- val = d[key]
- if not val:
- raise KeyError(key)
- return val
-
-
-class GConffile(object):
-
- def _normconfig(self):
- for n, s in self.config._sections.items():
- if n.find('__') == 0:
- continue
- s2 = type(s)()
- for k, v in s.items():
- if k.find('__') != 0:
- k = norm(k)
- s2[k] = v
- self.config._sections[n] = s2
-
- def __init__(self, path, peers, *dd):
- self.peers = peers
- self.path = path
- self.auxdicts = dd
- self.config = ConfigParser.RawConfigParser()
- self.config.read(path)
- self._normconfig()
-
- def section(self, rx=False):
- peers = self.peers
- if not peers:
- peers = ['.', '.']
- rx = True
- if rx:
- st = 'peersrx'
- else:
- st = 'peers'
- return ' '.join([st] + [escape(u) for u in peers])
-
- @staticmethod
- def parse_section(section):
- sl = section.split()
- st = sl.pop(0)
- sl = [unescape(u) for u in sl]
- if st == 'peersrx':
- sl = [re.compile(u) for u in sl]
- return sl
-
- def ord_sections(self):
- """Return an ordered list of sections.
-
- Ordering happens based on the auxiliary
- SECT_ORD section storing indices for each
- section added through the config API.
-
- To not to go corrupt in case of manually
- written config files, we take care to append
- also those sections which are not registered
- in SECT_ORD.
-
- Needed for python 2.{4,5} where ConfigParser
- cannot yet order sections/options internally.
- """
- so = {}
- if self.config.has_section(SECT_ORD):
- so = self.config._sections[SECT_ORD]
- so2 = {}
- for k, v in so.items():
- if k != '__name__':
- so2[k] = int(v)
- tv = 0
- if so2:
- tv = max(so2.values()) + 1
- ss = [s for s in self.config.sections() if s.find('__') != 0]
- for s in ss:
- if s in so.keys():
- continue
- so2[s] = tv
- tv += 1
- def scmp(x, y):
- return cmp(*(so2[s] for s in (x, y)))
- ss.sort(scmp)
- return ss
-
- def update_to(self, dct, allow_unresolved=False):
- if not self.peers:
- raise RuntimeError('no peers given, cannot select matching options')
- def update_from_sect(sect, mud):
- for k, v in self.config._sections[sect].items():
- if k == '__name__':
- continue
- if allow_unresolved:
- dct[k] = Template(v).safe_substitute(mud)
- else:
- dct[k] = Template(v).substitute(mud)
- for sect in self.ord_sections():
- sp = self.parse_section(sect)
- if isinstance(sp[0], re_type) and len(sp) == len(self.peers):
- match = True
- mad = {}
- for i in range(len(sp)):
- m = sp[i].search(self.peers[i])
- if not m:
- match = False
- break
- for j in range(len(m.groups())):
- mad['match%d_%d' % (i+1, j+1)] = m.groups()[j]
- if match:
- update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts))
- if self.config.has_section(self.section()):
- update_from_sect(self.section(), MultiDict(dct, mad, *self.auxdicts))
-
- def get(self, opt=None):
- d = {}
- self.update_to(d, allow_unresolved = True)
- if opt:
- opt = norm(opt)
- v = d.get(opt)
- if v:
- print v
- else:
- for k, v in d.iteritems():
- if k == '__name__':
- continue
- print("%s: %s" % (k, v))
-
- def write(self, trfn, opt, *a, **kw):
- def mergeconf(f):
- self.config = ConfigParser.RawConfigParser()
- self.config.readfp(f)
- self._normconfig()
- if not self.config.has_section(SECT_META):
- self.config.add_section(SECT_META)
- self.config.set(SECT_META, 'version', config_version)
- return trfn(norm(opt), *a, **kw)
- def updateconf(f):
- self.config.write(f)
- update_file(self.path, updateconf, mergeconf)
-
- def _set(self, opt, val, rx=False):
- sect = self.section(rx)
- if not self.config.has_section(sect):
- self.config.add_section(sect)
- # regarding SECT_ORD, cf. ord_sections
- if not self.config.has_section(SECT_ORD):
- self.config.add_section(SECT_ORD)
- self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD]))
- self.config.set(sect, opt, val)
- return True
-
- def set(self, opt, *a, **kw):
- self.write(self._set, opt, *a, **kw)
-
- def _delete(self, opt, rx=False):
- sect = self.section(rx)
- if self.config.has_section(sect):
- return self.config.remove_option(sect, opt)
-
- def delete(self, opt, *a, **kw):
- self.write(self._delete, opt, *a, **kw)
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
deleted file mode 100644
index 24165b6191a..00000000000
--- a/xlators/features/marker/utils/syncdaemon/gconf.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import os
-
-class GConf(object):
- ssh_ctl_dir = None
- ssh_ctl_args = None
- cpid = None
- pid_file_owned = False
- permanent_handles = []
-
- @classmethod
- def setup_ssh_ctl(cls, ctld):
- cls.ssh_ctl_dir = ctld
- cls.ssh_ctl_args = ["-oControlMaster=auto", "-S", os.path.join(ctld, "gsycnd-ssh-%r@%h:%p")]
-
-gconf = GConf()
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
deleted file mode 100644
index 193af9d5f37..00000000000
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ /dev/null
@@ -1,302 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import os.path
-import sys
-import time
-import logging
-import signal
-import select
-import optparse
-import fcntl
-from optparse import OptionParser, SUPPRESS_HELP
-from logging import Logger
-from errno import EEXIST, ENOENT
-
-from gconf import gconf
-from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
-from configinterface import GConffile
-import resource
-from monitor import monitor
-
-class GLogger(Logger):
-
- def makeRecord(self, name, level, *a):
- rv = Logger.makeRecord(self, name, level, *a)
- rv.nsecs = (rv.created - int(rv.created)) * 1000000
- fr = sys._getframe(4)
- callee = fr.f_locals.get('self')
- if callee:
- ctx = str(type(callee)).split("'")[1].split('.')[-1]
- else:
- ctx = '<top>'
- if not hasattr(rv, 'funcName'):
- rv.funcName = fr.f_code.co_name
- rv.lvlnam = logging.getLevelName(level)[0]
- rv.ctx = ctx
- return rv
-
- @classmethod
- def setup(cls, **kw):
- lbl = kw.get('label', "")
- if lbl:
- lbl = '(' + lbl + ')'
- lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
- 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
- lprm.update(kw)
- lvl = kw.get('level', logging.INFO)
- lprm['level'] = lvl
- logging.root = cls("root", lvl)
- logging.setLoggerClass(cls)
- logging.getLogger().handlers = []
- logging.basicConfig(**lprm)
-
-
-def startup(**kw):
- if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
- if not grabpidfile():
- sys.stderr.write("pidfile is taken, exiting.\n")
- sys.exit(2)
- gconf.pid_file_owned = True
-
- if kw.get('go_daemon') == 'should':
- x, y = os.pipe()
- gconf.cpid = os.fork()
- if gconf.cpid:
- os.close(x)
- sys.exit()
- os.close(y)
- os.setsid()
- dn = os.open(os.devnull, os.O_RDWR)
- for f in (sys.stdin, sys.stdout, sys.stderr):
- os.dup2(dn, f.fileno())
- if getattr(gconf, 'pid_file', None):
- if not grabpidfile(gconf.pid_file + '.tmp'):
- raise RuntimeError("cannot grap temporary pidfile")
- os.rename(gconf.pid_file + '.tmp', gconf.pid_file)
- # wait for parent to terminate
- # so we can start up with
- # no messing from the dirty
- # ol' bustard
- select.select((x,), (), ())
- os.close(x)
-
- lkw = {}
- if gconf.log_level:
- lkw['level'] = gconf.log_level
- if kw.get('log_file'):
- if kw['log_file'] in ('-', '/dev/stderr'):
- lkw['stream'] = sys.stderr
- elif kw['log_file'] == '/dev/stdout':
- lkw['stream'] = sys.stdout
- else:
- lkw['filename'] = kw['log_file']
- GLogger.setup(label=kw.get('label'), **lkw)
-
-def main():
- signal.signal(signal.SIGTERM, lambda *a: finalize(*a, **{'exval': 1}))
- GLogger.setup()
- excont = FreeObject(exval = 0)
- try:
- try:
- main_i()
- except:
- log_raise_exception(excont)
- finally:
- finalize(exval = excont.exval)
-
-def main_i():
- rconf = {'go_daemon': 'should'}
-
- def store_abs(opt, optstr, val, parser):
- if val and val != '-':
- val = os.path.abspath(val)
- setattr(parser.values, opt.dest, val)
- def store_local(opt, optstr, val, parser):
- rconf[opt.dest] = val
- def store_local_curry(val):
- return lambda o, oo, vx, p: store_local(o, oo, val, p)
- def store_local_obj(op, dmake):
- return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p)
-
- op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
- op.add_option('--gluster-command', metavar='CMD', default='glusterfs')
- op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
- op.add_option('--gluster-log-level', metavar='LVL')
- op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
- op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
- op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
- op.add_option('-L', '--log-level', metavar='LVL')
- op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
- op.add_option('--volume-id', metavar='UUID')
- op.add_option('--session-owner', metavar='ID')
- op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
- op.add_option('--rsync-command', metavar='CMD', default='rsync')
- op.add_option('--rsync-extra', metavar='ARGS', default='-sS', help=SUPPRESS_HELP)
- op.add_option('--timeout', metavar='SEC', type=int, default=120)
- op.add_option('--sync-jobs', metavar='N', type=int, default=3)
- op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
-
- op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
- # duh. need to specify dest or value will be mapped to None :S
- op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
- op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
- op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
- op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
- op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
- setattr(a[-1].values, 'log_file', '-'),
- setattr(a[-1].values, 'log_level', 'DEBUG'))),
-
- for a in ('check', 'get'):
- op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
- callback=store_local_obj(a, lambda vx: {'opt': vx}))
- op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None}))
- for m in ('', '-rx'):
- # call this code 'Pythonic' eh?
- # have to define a one-shot local function to be able to inject (a value depending on the)
- # iteration variable into the inner lambda
- def conf_mod_opt_regex_variant(rx):
- op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback',
- callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx}))
- op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback',
- callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx}))
- conf_mod_opt_regex_variant(not not m)
-
- op.add_option('--canonicalize-url', dest='do_canon', action='callback', callback=store_local_curry('raw'))
- op.add_option('--canonicalize-escape-url', dest='do_canon', action='callback', callback=store_local_curry('escaped'))
-
- tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, None) and o.get_opt_string() not in ('--version', '--help') ]
-
- # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
- # -- for this to work out we need to tell apart defaults from explicitly set
- # options... so churn out the defaults here and call the parser with virgin
- # values container.
- defaults = op.get_default_values()
- opts, args = op.parse_args(values=optparse.Values())
- confdata = rconf.get('config')
- if not (len(args) == 2 or \
- (len(args) == 1 and rconf.get('listen')) or \
- (len(args) <= 2 and confdata) or \
- rconf.get('do_canon')):
- sys.stderr.write("error: incorrect number of arguments\n\n")
- sys.stderr.write(op.get_usage() + "\n")
- sys.exit(1)
-
- if getattr(confdata, 'rx', None):
- # peers are regexen, don't try to parse them
- canon_peers = args
- namedict = {}
- else:
- rscs = [resource.parse_url(u) for u in args]
- dc = rconf.get('do_canon')
- if dc:
- for r in rscs:
- print(r.get_url(canonical=True, escaped=(dc=='escaped')))
- return
- local = remote = None
- if rscs:
- local = rscs[0]
- if len(rscs) > 1:
- remote = rscs[1]
- if not local.can_connect_to(remote):
- raise RuntimeError("%s cannot work with %s" % (local.path, remote and remote.path))
- pa = ([], [], [])
- urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
- for x in rscs:
- for i in range(len(pa)):
- pa[i].append(x.get_url(**urlprms[i]))
- peers, canon_peers, canon_esc_peers = pa
- # creating the namedict, a dict representing various ways of referring to / repreenting
- # peers to be fillable in config templates
- mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
- if remote:
- rmap = { local: ('local', 'master'), remote: ('remote', 'slave') }
- else:
- rmap = { local: ('local', 'slave') }
- namedict = {}
- for i in range(len(rscs)):
- x = rscs[i]
- for name in rmap[x]:
- for j in range(3):
- namedict[mods[j](name)] = pa[j][i]
- if x.scheme == 'gluster':
- namedict[name + 'vol'] = x.volume
- if not 'config_file' in rconf:
- rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd.conf")
- gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict)
-
- if confdata:
- opt_ok = norm(confdata.opt) in tunables + [None]
- if confdata.op == 'check':
- if opt_ok:
- sys.exit(0)
- else:
- sys.exit(1)
- elif not opt_ok:
- raise RuntimeError("not a valid option: " + confdata.opt)
- if confdata.op == 'get':
- gcnf.get(confdata.opt)
- elif confdata.op == 'set':
- gcnf.set(confdata.opt, confdata.val, confdata.rx)
- elif confdata.op == 'del':
- gcnf.delete(confdata.opt, confdata.rx)
- return
-
- gconf.__dict__.update(defaults.__dict__)
- gcnf.update_to(gconf.__dict__)
- gconf.__dict__.update(opts.__dict__)
- gconf.configinterface = gcnf
-
- ffd = rconf.get('feedback_fd')
- if ffd:
- fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
-
- #normalize loglevel
- lvl0 = gconf.log_level
- if isinstance(lvl0, str):
- lvl1 = lvl0.upper()
- lvl2 = logging.getLevelName(lvl1)
- # I have _never_ _ever_ seen such an utterly braindead
- # error condition
- if lvl2 == "Level " + lvl1:
- raise RuntimeError('cannot recognize log level "%s"' % lvl0)
- gconf.log_level = lvl2
-
- go_daemon = rconf['go_daemon']
- be_monitor = rconf.get('monitor')
-
- if not be_monitor and isinstance(remote, resource.SSH) and \
- go_daemon == 'should':
- go_daemon = 'postconn'
- log_file = None
- else:
- log_file = gconf.log_file
- if be_monitor:
- label = 'monitor'
- elif remote:
- #master
- label = ''
- else:
- label = 'slave'
- startup(go_daemon=go_daemon, log_file=log_file, label=label)
-
- if be_monitor:
- return monitor()
-
- logging.info("syncing: %s" % " -> ".join(peers))
- if remote:
- go_daemon = remote.connect_remote(go_daemon=go_daemon)
- if go_daemon:
- startup(go_daemon=go_daemon, log_file=gconf.log_file)
- # complete remote connection in child
- remote.connect_remote(go_daemon='done')
- local.connect()
- if ffd:
- os.close(ffd)
- local.service_loop(*[r for r in [remote] if r])
-
- logging.info("exiting.")
-
-
-if __name__ == "__main__":
- main()
diff --git a/xlators/features/marker/utils/syncdaemon/libcxattr.py b/xlators/features/marker/utils/syncdaemon/libcxattr.py
deleted file mode 100644
index fdc016c47ce..00000000000
--- a/xlators/features/marker/utils/syncdaemon/libcxattr.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import os
-from ctypes import *
-from ctypes.util import find_library
-
-class Xattr(object):
-
- libc = CDLL(find_library("libc"))
-
- @classmethod
- def geterrno(cls):
- return c_int.in_dll(cls.libc, 'errno').value
-
- @classmethod
- def raise_oserr(cls):
- errn = cls.geterrno()
- raise OSError(errn, os.strerror(errn))
-
- @classmethod
- def _query_xattr(cls, path, siz, syscall, *a):
- if siz:
- buf = create_string_buffer('\0' * siz)
- else:
- buf = None
- ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz)))
- if ret == -1:
- cls.raise_oserr()
- if siz:
- return buf.raw[:ret]
- else:
- return ret
-
- @classmethod
- def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr( path, siz, 'lgetxattr', attr)
-
- @classmethod
- def llistxattr(cls, path, siz=0):
- ret = cls._query_xattr(path, siz, 'llistxattr')
- if isinstance(ret, str):
- ret = ret.split('\0')
- return ret
-
- @classmethod
- def lsetxattr(cls, path, attr, val):
- ret = cls.libc.lsetxattr(path, attr, val, len(val), 0)
- if ret == -1:
- cls.raise_oserr()
-
- @classmethod
- def lremovexattr(cls, path, attr):
- ret = cls.libc.lremovexattr(path, attr)
- if ret == -1:
- cls.raise_oserr()
-
- @classmethod
- def llistxattr_buf(cls, path):
- size = cls.llistxattr(path)
- if size == -1:
- cls.raise_oserr()
- if size == 0:
- return []
- return cls.llistxattr(path, size)
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
deleted file mode 100644
index 35dc4ee06aa..00000000000
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ /dev/null
@@ -1,366 +0,0 @@
-import os
-import sys
-import time
-import stat
-import signal
-import logging
-import errno
-from errno import ENOENT, ENODATA
-from threading import currentThread, Condition, Lock
-
-from gconf import gconf
-from syncdutils import FreeObject, Thread
-
-URXTIME = (-1, 0)
-
-class GMaster(object):
-
- KFGN = 0
- KNAT = 1
-
- def get_sys_volinfo(self):
- fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
- self.master.server.native_volume_info()
- fgn_vi = None
- if fgn_vis:
- if len(fgn_vis) > 1:
- raise RuntimeError("cannot work with multiple foreign masters")
- fgn_vi = fgn_vis[0]
- return fgn_vi, nat_vi
-
- @property
- def uuid(self):
- if self.volinfo:
- return self.volinfo['uuid']
-
- @property
- def volmark(self):
- if self.volinfo:
- return self.volinfo['volume_mark']
-
- @property
- def inter_master(self):
- return self.volinfo_state[self.KFGN] and True or False
-
- def xtime(self, path, *a, **opts):
- if a:
- rsc = a[0]
- else:
- rsc = self.master
- if not 'create' in opts:
- opts['create'] = (rsc == self.master and not self.inter_master)
- if not 'default_xtime' in opts:
- if rsc == self.master and self.inter_master:
- opts['default_xtime'] = ENODATA
- else:
- opts['default_xtime'] = URXTIME
- xt = rsc.server.xtime(path, self.uuid)
- if isinstance(xt, int) and xt != ENODATA:
- return xt
- invalid_xtime = (xt == ENODATA or xt < self.volmark)
- if invalid_xtime and opts['create']:
- t = time.time()
- sec = int(t)
- nsec = int((t - sec) * 1000000)
- xt = (sec, nsec)
- rsc.server.set_xtime(path, self.uuid, xt)
- if invalid_xtime:
- xt = opts['default_xtime']
- return xt
-
- def __init__(self, master, slave):
- self.master = master
- self.slave = slave
- self.jobtab = {}
- self.syncer = Syncer(slave)
- self.total_turns = int(gconf.turns)
- self.turns = 0
- self.start = None
- self.change_seen = None
- # the authorative (foreign, native) volinfo pair
- # which lets us deduce what to do when we refetch
- # the volinfos from system
- uuid_preset = getattr(gconf, 'volume_id', None)
- self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
- # the actual volinfo we make use of
- self.volinfo = None
- self.terminate = False
-
- def crawl_loop(self):
- timo = int(gconf.timeout or 0)
- if timo > 0:
- def keep_alive():
- while True:
- gap = timo * 0.5
- # first grab a reference as self.volinfo
- # can be changed in main thread
- vi = self.volinfo
- if vi:
- # then have a private copy which we can mod
- vi = vi.copy()
- vi['timeout'] = int(time.time()) + timo
- else:
- # send keep-alives more frequently to
- # avoid a delay in announcing our volume info
- # to slave if it becomes established in the
- # meantime
- gap = min(10, gap)
- self.slave.server.keep_alive(vi)
- time.sleep(gap)
- t = Thread(target=keep_alive)
- t.start()
- while not self.terminate:
- self.crawl()
-
- def add_job(self, path, label, job, *a, **kw):
- if self.jobtab.get(path) == None:
- self.jobtab[path] = []
- self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
-
- def add_failjob(self, path, label):
- logging.debug('salvaged: ' + label)
- self.add_job(path, label, lambda: False)
-
- def wait(self, path, *args):
- jobs = self.jobtab.pop(path, [])
- succeed = True
- for j in jobs:
- ret = j[-1]()
- if not ret:
- succeed = False
- if succeed:
- self.sendmark(path, *args)
- return succeed
-
- def sendmark(self, path, mark, adct=None):
- if adct:
- self.slave.server.setattr(path, adct)
- self.slave.server.set_xtime(path, self.uuid, mark)
-
- @staticmethod
- def volinfo_state_machine(volinfo_state, volinfo_sys):
- # store the value below "boxed" to emulate proper closures
- # (variables of the enclosing scope are available inner functions
- # provided they are no reassigned; mutation is OK).
- param = FreeObject(relax_mismatch = False, state_change = None, index=-1)
- def select_vi(vi0, vi):
- param.index += 1
- if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
- if not vi0 and not param.relax_mismatch:
- param.state_change = param.index
- # valid new value found; for the rest, we are graceful about
- # uuid mismatch
- param.relax_mismatch = True
- return vi
- if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
- # uuid mismatch for master candidate, bail out
- raise RuntimeError("aborting on uuid change from %s to %s" % \
- (vi0['uuid'], vi['uuid']))
- # fall back to old
- return vi0
- newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
- srep = lambda vi: vi and vi['uuid'][0:8]
- logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
- tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
- return newstate, param.state_change
-
- def crawl(self, path='.', xtl=None):
- if path == '.':
- if self.start:
- logging.info("... done, took %.6f seconds" % (time.time() - self.start))
- time.sleep(1)
- self.start = time.time()
- volinfo_sys = self.get_sys_volinfo()
- self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
- volinfo_sys)
- if self.inter_master:
- self.volinfo = volinfo_sys[self.KFGN]
- else:
- self.volinfo = volinfo_sys[self.KNAT]
- if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
- logging.info('new master is %s', self.uuid)
- if state_change == self.KFGN:
- gconf.configinterface.set('volume_id', self.uuid)
- if self.volinfo:
- if self.volinfo['retval']:
- raise RuntimeError ("master is corrupt")
- logging.info("%s master with volume id %s ..." % \
- (self.inter_master and "intermediate" or "primary", self.uuid))
- else:
- if self.inter_master:
- logging.info("waiting for being synced from %s ..." % \
- self.volinfo_state[self.KFGN]['uuid'])
- else:
- logging.info("waiting for volume info ...")
- return
- logging.debug("entering " + path)
- if not xtl:
- xtl = self.xtime(path)
- if isinstance(xtl, int):
- self.add_failjob(path, 'no-local-node')
- return
- xtr0 = self.xtime(path, self.slave)
- if isinstance(xtr0, int):
- if xtr0 != ENOENT:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- except OSError:
- self.add_failjob(path, 'no-remote-node')
- return
- xtr = URXTIME
- else:
- xtr = xtr0
- if xtr > xtl:
- raise RuntimeError("timestamp corruption for " + path)
- if xtl == xtr:
- if path == '.' and self.total_turns and self.change_seen:
- self.turns += 1
- self.change_seen = False
- logging.info("finished turn #%s/%s" % (self.turns, self.total_turns))
- if self.turns == self.total_turns:
- logging.info("reached turn limit")
- self.terminate = True
- return
- if path == '.':
- self.change_seen = True
- try:
- dem = self.master.server.entries(path)
- except OSError:
- self.add_failjob(path, 'local-entries-fail')
- return
- try:
- des = self.slave.server.entries(path)
- except OSError:
- self.slave.server.purge(path)
- try:
- self.slave.server.mkdir(path)
- des = self.slave.server.entries(path)
- except OSError:
- self.add_failjob(path, 'remote-entries-fail')
- return
- dd = set(des) - set(dem)
- if dd:
- self.slave.server.purge(path, dd)
- chld = []
- for e in dem:
- e = os.path.join(path, e)
- xte = self.xtime(e)
- if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
- elif xte > xtr:
- chld.append((e, xte))
- def indulgently(e, fnc, blame=None):
- if not blame:
- blame = path
- try:
- return fnc(e)
- except (IOError, OSError):
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- logging.warn("salvaged ENOENT for" + e)
- self.add_failjob(blame, 'by-indulgently')
- return False
- else:
- raise
- for e, xte in chld:
- st = indulgently(e, lambda e: os.lstat(e))
- if st == False:
- continue
- mo = st.st_mode
- adct = {'own': (st.st_uid, st.st_gid)}
- if stat.S_ISLNK(mo):
- if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
- continue
- self.sendmark(e, xte, adct)
- elif stat.S_ISREG(mo):
- logging.debug("syncing %s ..." % e)
- pb = self.syncer.add(e)
- def regjob(e, xte, pb):
- if pb.wait():
- logging.debug("synced " + e)
- self.sendmark(e, xte)
- return True
- else:
- logging.error("failed to sync " + e)
- self.add_job(path, 'reg', regjob, e, xte, pb)
- elif stat.S_ISDIR(mo):
- adct['mode'] = mo
- if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
- self.crawl(e, xte),
- True)[-1], blame=e) == False:
- continue
- else:
- # ignore fifos, sockets and special files
- pass
- if path == '.':
- self.wait(path, xtl)
-
-class BoxClosedErr(Exception):
- pass
-
-class PostBox(list):
-
- def __init__(self, *a):
- list.__init__(self, *a)
- self.lever = Condition()
- self.open = True
- self.done = False
-
- def wait(self):
- self.lever.acquire()
- if not self.done:
- self.lever.wait()
- self.lever.release()
- return self.result
-
- def wakeup(self, data):
- self.result = data
- self.lever.acquire()
- self.done = True
- self.lever.notifyAll()
- self.lever.release()
-
- def append(self, e):
- self.lever.acquire()
- if not self.open:
- raise BoxClosedErr
- list.append(self, e)
- self.lever.release()
-
- def close(self):
- self.lever.acquire()
- self.open = False
- self.lever.release()
-
-class Syncer(object):
-
- def __init__(self, slave):
- self.slave = slave
- self.lock = Lock()
- self.pb = PostBox()
- for i in range(int(gconf.sync_jobs)):
- t = Thread(target=self.syncjob)
- t.start()
-
- def syncjob(self):
- while True:
- pb = None
- while True:
- self.lock.acquire()
- if self.pb:
- pb, self.pb = self.pb, PostBox()
- self.lock.release()
- if pb:
- break
- time.sleep(0.5)
- pb.close()
- pb.wakeup(self.slave.rsync(pb))
-
- def add(self, e):
- while True:
- try:
- self.pb.append(e)
- return self.pb
- except BoxClosedErr:
- pass
diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py
deleted file mode 100644
index 365e91435fd..00000000000
--- a/xlators/features/marker/utils/syncdaemon/monitor.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import os
-import sys
-import time
-import logging
-import select
-from signal import SIGKILL
-from gconf import gconf
-from syncdutils import update_file
-
-class Monitor(object):
-
- def __init__(self):
- self.state = None
-
- def set_state(self, state):
- if state == self.state:
- return
- self.state = state
- logging.info('new state: %s' % state)
- if getattr(gconf, 'state_file', None):
- update_file(gconf.state_file, lambda f: f.write(state + '\n'))
-
- def monitor(self):
- argv = sys.argv[:]
- for o in ('-N', '--no-daemon', '--monitor'):
- while o in argv:
- argv.remove(o)
- argv.extend(('-N', '-p', ''))
- argv.insert(0, os.path.basename(sys.executable))
-
- self.set_state('starting...')
- ret = 0
- def nwait(p, o=0):
- p2, r = os.waitpid(p, o)
- if not p2:
- return
- if os.WIFEXITED(r):
- return os.WEXITSTATUS(r)
- return 1
- conn_timeout = 60
- while ret in (0, 1):
- logging.info('-' * conn_timeout)
- logging.info('starting gsyncd worker')
- pr, pw = os.pipe()
- cpid = os.fork()
- if cpid == 0:
- os.close(pr)
- os.execv(sys.executable, argv + ['--feedback-fd', str(pw)])
- os.close(pw)
- t0 = time.time()
- so = select.select((pr,), (), (), conn_timeout)[0]
- os.close(pr)
- if so:
- ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.debug("worker died before establishing connection")
- else:
- logging.debug("worker seems to be connected (?? racy check)")
- while time.time() < t0 + conn_timeout:
- ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.debug("worker died in startup phase")
- break
- time.sleep(1)
- else:
- logging.debug("worker not confirmed in %d sec, aborting it" % \
- conn_timeout)
- os.kill(cpid, SIGKILL)
- ret = nwait(cpid)
- if ret == None:
- self.set_state('OK')
- ret = nwait(cpid)
- elif ret in (0, 1):
- self.set_state('faulty')
- time.sleep(10)
- self.set_state('inconsistent')
- return ret
-
-def monitor():
- return Monitor().monitor()
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
deleted file mode 100644
index 47691301e29..00000000000
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ /dev/null
@@ -1,162 +0,0 @@
-import os
-import sys
-import select
-import time
-import logging
-from threading import Condition
-try:
- import thread
-except ImportError:
- # py 3
- import _thread as thread
-try:
- from Queue import Queue
-except ImportError:
- # py 3
- from queue import Queue
-try:
- import cPickle as pickle
-except ImportError:
- # py 3
- import pickle
-
-from syncdutils import Thread
-
-pickle_proto = -1
-repce_version = 1.0
-
-def ioparse(i, o):
- if isinstance(i, int):
- i = os.fdopen(i)
- # rely on duck typing for recognizing
- # streams as that works uniformly
- # in py2 and py3
- if hasattr(o, 'fileno'):
- o = o.fileno()
- return (i, o)
-
-def send(out, *args):
- os.write(out, pickle.dumps(args, pickle_proto))
-
-def recv(inf):
- return pickle.load(inf)
-
-
-class RepceServer(object):
-
- def __init__(self, obj, i, o, wnum=6):
- self.obj = obj
- self.inf, self.out = ioparse(i, o)
- self.wnum = wnum
- self.q = Queue()
-
- def service_loop(self):
- for i in range(self.wnum):
- t = Thread(target=self.worker)
- t.start()
- try:
- while True:
- self.q.put(recv(self.inf))
- except EOFError:
- logging.info("terminating on reaching EOF.")
-
- def worker(self):
- while True:
- in_data = self.q.get(True)
- rid = in_data[0]
- rmeth = in_data[1]
- exc = False
- if rmeth == '__repce_version__':
- res = repce_version
- else:
- try:
- res = getattr(self.obj, rmeth)(*in_data[2:])
- except:
- res = sys.exc_info()[1]
- exc = True
- logging.exception("call failed: ")
- send(self.out, rid, exc, res)
-
-
-class RepceJob(object):
-
- def __init__(self, cbk):
- self.rid = (os.getpid(), thread.get_ident(), time.time())
- self.cbk = cbk
- self.lever = Condition()
- self.done = False
-
- def __repr__(self):
- return ':'.join([str(x) for x in self.rid])
-
- def wait(self):
- self.lever.acquire()
- if not self.done:
- self.lever.wait()
- self.lever.release()
- return self.result
-
- def wakeup(self, data):
- self.result = data
- self.lever.acquire()
- self.done = True
- self.lever.notify()
- self.lever.release()
-
-
-class RepceClient(object):
-
- def __init__(self, i, o):
- self.inf, self.out = ioparse(i, o)
- self.jtab = {}
- t = Thread(target = self.listen)
- t.start()
-
- def listen(self):
- while True:
- select.select((self.inf,), (), ())
- rid, exc, res = recv(self.inf)
- rjob = self.jtab.pop(rid)
- if rjob.cbk:
- rjob.cbk(rjob, [exc, res])
-
- def push(self, meth, *args, **kw):
- cbk = kw.get('cbk')
- if not cbk:
- def cbk(rj, res):
- if res[0]:
- raise res[1]
- rjob = RepceJob(cbk)
- self.jtab[rjob.rid] = rjob
- logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
- send(self.out, rjob.rid, meth, *args)
- return rjob
-
- def __call__(self, meth, *args):
- rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
- exc, res = rjob.wait()
- if exc:
- logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
- raise res
- logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
- return res
-
- class mprx(object):
-
- def __init__(self, ins, meth):
- self.ins = ins
- self.meth = meth
-
- def __call__(self, *a):
- return self.ins(self.meth, *a)
-
- def __getattr__(self, meth):
- return self.mprx(self, meth)
-
- def __version__(self):
- d = {'proto': self('__repce_version__')}
- try:
- d['object'] = self('version')
- except AttributeError:
- pass
- return d
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
deleted file mode 100644
index 800d297bacd..00000000000
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ /dev/null
@@ -1,467 +0,0 @@
-import re
-import os
-import sys
-import pwd
-import stat
-import time
-import errno
-import struct
-import select
-import socket
-import logging
-import tempfile
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
-
-from gconf import gconf
-import repce
-from repce import RepceServer, RepceClient
-from master import GMaster
-import syncdutils
-
-UrlRX = re.compile('\A(\w+)://(.*)')
-HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
-UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
-
-def sup(x, *a, **kw):
- return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
-
-def desugar(ustr):
- m = re.match('([^:]*):(.*)', ustr)
- if m:
- if not m.groups()[0]:
- return "gluster://localhost" + ustr
- elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
- return "ssh://" + ustr
- else:
- return "gluster://" + ustr
- else:
- if ustr[0] != '/':
- raise RuntimeError("cannot resolve sugared url '%s'" % ustr)
- ap = os.path.normpath(ustr)
- if ap.startswith('//'):
- ap = ap[1:]
- return "file://" + ap
-
-def parse_url(ustr):
- m = UrlRX.match(ustr)
- if not m:
- ustr = desugar(ustr)
- m = UrlRX.match(ustr)
- if not m:
- raise RuntimeError("malformed url")
- sch, path = m.groups()
- this = sys.modules[__name__]
- if not hasattr(this, sch.upper()):
- raise RuntimeError("unknown url scheme " + sch)
- return getattr(this, sch.upper())(path)
-
-
-class _MetaXattr(object):
-
- # load Xattr stuff on-demand
-
- def __getattr__(self, meth):
- from libcxattr import Xattr as LXattr
- xmeth = [ m for m in dir(LXattr) if m[0] != '_' ]
- if not meth in xmeth:
- return
- for m in xmeth:
- setattr(self, m, getattr(LXattr, m))
- return getattr(self, meth)
-
-Xattr = _MetaXattr()
-
-
-class Server(object):
-
- GX_NSPACE = "trusted.glusterfs"
- NTV_FMTSTR = "!" + "B"*19 + "II"
- FRGN_XTRA_FMT = "I"
- FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
-
- @staticmethod
- def entries(path):
- # prevent symlinks being followed
- if not stat.S_ISDIR(os.lstat(path).st_mode):
- raise OSError(ENOTDIR, os.strerror(ENOTDIR))
- return os.listdir(path)
-
- @classmethod
- def purge(cls, path, entries=None):
- me_also = entries == None
- if not entries:
- try:
- # if it's a symlink, prevent
- # following it
- try:
- os.unlink(path)
- return
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno == EISDIR:
- entries = os.listdir(path)
- else:
- raise
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno in (ENOTDIR, ENOENT, ELOOP):
- try:
- os.unlink(path)
- return
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- return
- raise
- else:
- raise
- for e in entries:
- cls.purge(os.path.join(path, e))
- if me_also:
- os.rmdir(path)
-
- @classmethod
- def _create(cls, path, ctor):
- try:
- ctor(path)
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno == EEXIST:
- cls.purge(path)
- return ctor(path)
- raise
-
- @classmethod
- def mkdir(cls, path):
- cls._create(path, os.mkdir)
-
- @classmethod
- def symlink(cls, lnk, path):
- cls._create(path, lambda p: os.symlink(lnk, p))
-
- @classmethod
- def xtime(cls, path, uuid):
- try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno in (ENOENT, ENODATA, ENOTDIR):
- return ex.errno
- else:
- raise
-
- @classmethod
- def set_xtime(cls, path, uuid, mark):
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
-
- @staticmethod
- def setattr(path, adct):
- own = adct.get('own')
- if own:
- os.lchown(path, *own)
- mode = adct.get('mode')
- if mode:
- os.chmod(path, stat.S_IMODE(mode))
- times = adct.get('times')
- if times:
- os.utime(path, times)
-
- @staticmethod
- def pid():
- return os.getpid()
-
- last_keep_alive = 0
- @classmethod
- def keep_alive(cls, dct):
- if dct:
- key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
- val = struct.pack(cls.FRGN_FMTSTR,
- *(dct['version'] +
- tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
- (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
- Xattr.lsetxattr('.', key, val)
- cls.last_keep_alive += 1
- return cls.last_keep_alive
-
- @staticmethod
- def version():
- return 1.0
-
-
-class SlaveLocal(object):
-
- def can_connect_to(self, remote):
- return not remote
-
- def service_loop(self):
- repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
- t = syncdutils.Thread(target=repce.service_loop)
- t.start()
- logging.info("slave listening")
- if gconf.timeout and int(gconf.timeout) > 0:
- while True:
- lp = self.server.last_keep_alive
- time.sleep(int(gconf.timeout))
- if lp == self.server.last_keep_alive:
- logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
- break
- else:
- select.select((), (), ())
-
-class SlaveRemote(object):
-
- def connect_remote(self, rargs=[], **opts):
- slave = opts.get('slave', self.url)
- ix, ox = os.pipe()
- iy, oy = os.pipe()
- pid = os.fork()
- if not pid:
- os.close(ox)
- os.dup2(ix, sys.stdin.fileno())
- os.close(iy)
- os.dup2(oy, sys.stdout.fileno())
- so = getattr(gconf, 'session_owner', None)
- if so:
- so_args = ['--session-owner', so]
- else:
- so_args = []
- argv = rargs + gconf.remote_gsyncd.split() + so_args + \
- ['-N', '--listen', '--timeout', str(gconf.timeout), slave]
- os.execvp(argv[0], argv)
- os.close(ix)
- os.close(oy)
- return self.start_fd_client(iy, ox, **opts)
-
- def start_fd_client(self, i, o, **opts):
- self.server = RepceClient(i, o)
- rv = self.server.__version__()
- exrv = {'proto': repce.repce_version, 'object': Server.version()}
- da0 = (rv, exrv)
- da1 = ({}, {})
- for i in range(2):
- for k, v in da0[i].iteritems():
- da1[i][k] = int(v)
- if da1[0] != da1[1]:
- raise RuntimeError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
-
- def rsync(self, files, *args):
- if not files:
- raise RuntimeError("no files to sync")
- logging.debug("files: " + ", ".join(files))
- argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args)
- return os.spawnvp(os.P_WAIT, argv[0], argv) == 0
-
-
-class AbstractUrl(object):
-
- def __init__(self, path, pattern):
- m = re.search(pattern, path)
- if not m:
- raise RuntimeError("malformed path")
- self.path = path
- return m.groups()
-
- @property
- def scheme(self):
- return type(self).__name__.lower()
-
- def canonical_path(self):
- return self.path
-
- def get_url(self, canonical=False, escaped=False):
- if canonical:
- pa = self.canonical_path()
- else:
- pa = self.path
- u = "://".join((self.scheme, pa))
- if escaped:
- u = syncdutils.escape(u)
- return u
-
- @property
- def url(self):
- return self.get_url()
-
-
- ### Concrete resource classes ###
-
-
-class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
-
- class FILEServer(Server):
- pass
-
- server = FILEServer
-
- def __init__(self, path):
- sup(self, path, '^/')
-
- def connect(self):
- os.chdir(self.path)
-
- def rsync(self, files):
- return sup(self, files, self.path)
-
-
-class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
-
- class GLUSTERServer(Server):
-
- @classmethod
- def _attr_unpack_dict(cls, xattr, extra_fields = ''):
- fmt_string = cls.NTV_FMTSTR + extra_fields
- buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
- vm = struct.unpack(fmt_string, buf)
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
- uuid = '-'.join(m.groups())
- volinfo = { 'version': vm[0:2],
- 'uuid' : uuid,
- 'retval' : vm[18],
- 'volume_mark': vm[19:21],
- }
- if extra_fields:
- return volinfo, vm[-len(extra_fields):]
- else:
- return volinfo
-
- @classmethod
- def foreign_volume_infos(cls):
- dict_list = []
- xattr_list = Xattr.llistxattr_buf('.')
- for ele in xattr_list:
- if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
- d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
- now = int(time.time())
- if x[0] > now:
- logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
- (d['uuid'], x[0], x[0] - now))
- dict_list.append(d)
- else:
- try:
- Xattr.lremovexattr('.', ele)
- except OSError:
- pass
- return dict_list
-
- @classmethod
- def native_volume_info(cls):
- try:
- return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
- except OSError:
- ex = sys.exc_info()[1]
- if ex.errno != ENODATA:
- raise
-
- server = GLUSTERServer
-
- def __init__(self, path):
- self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
-
- def canonical_path(self):
- return ':'.join([socket.gethostbyname(self.host), self.volume])
-
- def can_connect_to(self, remote):
- return True
-
- def connect(self):
- def umount_l(d):
- time.sleep(0.2) # XXX temporary workaround
- argv = ['umount', '-l', d]
- return os.spawnvp(os.P_WAIT, argv[0], argv)
- d = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
- mounted = False
- try:
- argv = gconf.gluster_command.split() + \
- (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \
- ['-l', gconf.gluster_log_file, '-s', self.host,
- '--volfile-id', self.volume, '--client-pid=-1', d]
- if os.spawnvp(os.P_WAIT, argv[0], argv):
- raise RuntimeError("command failed: " + " ".join(argv))
- mounted = True
- logging.debug('auxiliary glusterfs mount in place')
- os.chdir(d)
- if umount_l(d) != 0:
- raise RuntimeError("umounting %s failed" % d)
- mounted = False
- finally:
- try:
- if mounted:
- umount_l(d)
- os.rmdir(d)
- except:
- logging.warn('stale mount possibly left behind on ' + d)
- logging.debug('auxiliary glusterfs mount prepared')
-
- def connect_remote(self, *a, **kw):
- sup(self, *a, **kw)
- self.slavedir = "/proc/%d/cwd" % self.server.pid()
-
- def service_loop(self, *args):
- if args:
- GMaster(self, args[0]).crawl_loop()
- else:
- sup(self, *args)
-
- def rsync(self, files):
- return sup(self, files, self.slavedir)
-
-
-class SSH(AbstractUrl, SlaveRemote):
-
- def __init__(self, path):
- self.remote_addr, inner_url = sup(self, path,
- '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
- self.inner_rsc = parse_url(inner_url)
-
- def canonical_path(self):
- m = re.match('([^@]+)@(.+)', self.remote_addr)
- if m:
- u, h = m.groups()
- else:
- u, h = pwd.getpwuid(os.geteuid()).pw_name, self.remote_addr
- remote_addr = '@'.join([u, socket.gethostbyname(h)])
- return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
-
- def can_connect_to(self, remote):
- return False
-
- def start_fd_client(self, *a, **opts):
- if opts.get('deferred'):
- return a
- sup(self, *a)
- ityp = type(self.inner_rsc)
- if ityp == FILE:
- slavepath = self.inner_rsc.path
- elif ityp == GLUSTER:
- slavepath = "/proc/%d/cwd" % self.server.pid()
- else:
- raise NotImplementedError
- self.slaveurl = ':'.join([self.remote_addr, slavepath])
-
- def connect_remote(self, go_daemon=None):
- if go_daemon == 'done':
- return self.start_fd_client(*self.fd_pair)
- gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))
- deferred = go_daemon == 'postconn'
- ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
- if deferred:
- # send a message to peer so that we can wait for
- # the answer from which we know connection is
- # established and we can proceed with daemonization
- # (doing that too early robs the ssh passwd prompt...)
- # However, we'd better not start the RepceClient
- # before daemonization (that's not preserved properly
- # in daemon), we just do a an ad-hoc linear put/get.
- i, o = ret
- inf = os.fdopen(i)
- repce.send(o, None, '__repce_version__')
- select.select((inf,), (), ())
- repce.recv(inf)
- # hack hack hack: store a global reference to the file
- # to save it from getting GC'd which implies closing it
- gconf.permanent_handles.append(inf)
- self.fd_pair = (i, o)
- return 'should'
-
- def rsync(self, files):
- return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl)
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
deleted file mode 100644
index 4bf51da746e..00000000000
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ /dev/null
@@ -1,160 +0,0 @@
-import os
-import sys
-import time
-import fcntl
-import shutil
-import logging
-from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN
-from signal import SIGTERM, SIGKILL
-from time import sleep
-
-from gconf import gconf
-
-try:
- # py 3
- from urllib import parse as urllib
-except ImportError:
- import urllib
-
-def escape(s):
- return urllib.quote_plus(s)
-
-def unescape(s):
- return urllib.unquote_plus(s)
-
-def norm(s):
- if s:
- return s.replace('-', '_')
-
-def update_file(path, updater, merger = lambda f: True):
- """update a file in a transaction-like manner"""
-
- fr = fw = None
- try:
- fd = os.open(path, os.O_CREAT|os.O_RDWR)
- try:
- fr = os.fdopen(fd, 'r+b')
- except:
- os.close(fd)
- raise
- fcntl.lockf(fr, fcntl.LOCK_EX)
- if not merger(fr):
- return
-
- tmpp = path + '.tmp.' + str(os.getpid())
- fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY)
- try:
- fw = os.fdopen(fd, 'wb', 0)
- except:
- os.close(fd)
- raise
- updater(fw)
- os.fsync(fd)
- os.rename(tmpp, path)
- finally:
- for fx in (fr, fw):
- if fx:
- fx.close()
-
-def grabfile(fname, content=None):
- # damn those messy open() mode codes
- fd = os.open(fname, os.O_CREAT|os.O_RDWR)
- f = os.fdopen(fd, 'r+b', 0)
- try:
- fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB)
- except:
- ex = sys.exc_info()[1]
- f.close()
- if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
- # cannot grab, it's taken
- return
- raise
- if content:
- try:
- f.truncate()
- f.write(content)
- except:
- f.close()
- raise
- gconf.permanent_handles.append(f)
- return f
-
-def grabpidfile(fname=None, setpid=True):
- if not fname:
- fname = gconf.pid_file
- content = None
- if setpid:
- content = str(os.getpid()) + '\n'
- return grabfile(fname, content=content)
-
-final_lock = Lock()
-
-def finalize(*a, **kw):
- final_lock.acquire()
- if getattr(gconf, 'pid_file', None):
- rm_pidf = gconf.pid_file_owned
- if gconf.cpid:
- # exit path from parent branch of daemonization
- rm_pidf = False
- while True:
- f = grabpidfile(setpid=False)
- if not f:
- # child has already taken over pidfile
- break
- if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
- # child has terminated
- rm_pidf = True
- break;
- time.sleep(0.1)
- if rm_pidf:
- try:
- os.unlink(gconf.pid_file)
- except:
- ex = sys.exc_info()[1]
- if ex.errno == ENOENT:
- pass
- else:
- raise
- if gconf.ssh_ctl_dir and not gconf.cpid:
- shutil.rmtree(gconf.ssh_ctl_dir)
- sys.stdout.flush()
- sys.stderr.flush()
- os._exit(kw.get('exval', 0))
-
-def log_raise_exception(excont):
- exc = sys.exc_info()[1]
- if isinstance(exc, SystemExit):
- excont.exval = exc.code or 0
- raise
- else:
- logging.exception("FAIL: ")
- sys.stderr.write("failed with %s.\n" % type(exc).__name__)
- excont.exval = 1
- sys.exit(excont.exval)
-
-
-class FreeObject(object):
- """wildcard class for which any attribute can be set"""
-
- def __init__(self, **kw):
- for k,v in kw.iteritems():
- setattr(self, k, v)
-
-class Thread(baseThread):
-
- def __init__(self, *a, **kw):
- tf = kw.get('target')
- if tf:
- def twrap(*aa):
- excont = FreeObject(exval = 0)
- try:
- tf(*aa)
- except:
- try:
- log_raise_exception(excont)
- finally:
- finalize(exval = excont.exval)
- kw['target'] = twrap
- baseThread.__init__(self, *a, **kw)
- self.setDaemon(True)