diff options
Diffstat (limited to 'geo-replication/syncdaemon')
| -rw-r--r-- | geo-replication/syncdaemon/Makefile.am | 7 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/README.md | 58 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/__codecheck.py | 57 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/__init__.py | 9 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 343 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gconf.py | 25 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 624 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/libcxattr.py | 99 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 76 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 1476 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/monitor.py | 271 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/repce.py | 247 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 1413 | ||||
| -rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 490 |
14 files changed, 5195 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am new file mode 100644 index 000000000..83f969639 --- /dev/null +++ b/geo-replication/syncdaemon/Makefile.am @@ -0,0 +1,7 @@ +syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon + +syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \ + resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \ + $(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py + +CLEANFILES = diff --git a/geo-replication/syncdaemon/README.md b/geo-replication/syncdaemon/README.md new file mode 100644 index 000000000..67f346ace --- /dev/null +++ b/geo-replication/syncdaemon/README.md @@ -0,0 +1,58 @@ +gsyncd, the Gluster Syncdaemon +============================== + +REQUIREMENTS +------------ + +_gsyncd_ is a program which can operate either in _master_ or in _slave_ mode. +Requirements are categorized according to this. + +* supported OS is GNU/Linux +* Python >= 2.5, or 2.4 with Ctypes (see below) (both) +* OpenSSH >= 4.0 (master) / SSH2 compliant sshd (eg. openssh) (slave) +* rsync (both) +* glusterfs: with marker and changelog support (master & slave); +* FUSE: glusterfs fuse module with auxiliary gfid based access support + +INSTALLATION +------------ + +As of now, the supported way of operation is running from the source directory or using the RPMs given. + +If you use Python 2.4.x, you need to install the [Ctypes module](http://python.net/crew/theller/ctypes/). + +CONFIGURATION +------------- + +gsyncd tunables are a subset of the long command-line options; for listing them, +type + + gsyncd.py --help + +and see the long options up to "--config-file". (The leading double dash should be omitted; +interim underscores and dashes are interchangeable.) The set of options bears some resemblance +to those of glusterfs and rsync. 
+ +The config file format matches the following syntax: + + <option1>: <value1> + <option2>: <value2> + # comment + +By default (unless specified by the option `-c`), gsyncd looks for config file at _conf/gsyncd_template.conf_ +in the source tree. + +USAGE +----- + +gsyncd is a utility for continuous mirroring, i.e. it mirrors master to slave incrementally. +Assume we have a gluster volume _pop_ at localhost. We try to set up the mirroring for volume +_pop_ using gsyncd for gluster volume _moz_ on remote machine/cluster @ example.com. The +respective gsyncd invocations are (demoing some syntax sugaring): + +`gsyncd.py :pop example.com::moz` + +gsyncd has to be available on both sides; its location on the remote side has to be specified +via the "--remote-gsyncd" option (or "remote-gsyncd" config file parameter). (This option can also be +used for setting options on the remote side, although the suggested mode of operation is to +set parameters like log file / pid file in the configuration file.) diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py new file mode 100644 index 000000000..45dbd26bb --- /dev/null +++ b/geo-replication/syncdaemon/__codecheck.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
+# + +import os +import os.path +import sys +import tempfile +import shutil + +ipd = tempfile.mkdtemp(prefix='codecheck-aux') + +try: + # add a fake ipaddr module, we don't want to + # deal with the real one (just test our code) + f = open(os.path.join(ipd, 'ipaddr.py'), 'w') + f.write(""" +class IPAddress(object): + pass +class IPNetwork(list): + pass +""") + f.close() + sys.path.append(ipd) + + fl = os.listdir(os.path.dirname(sys.argv[0]) or '.') + fl.sort() + for f in fl: + if f[-3:] != '.py' or f[0] == '_': + continue + m = f[:-3] + sys.stdout.write('importing %s ...' % m) + __import__(m) + print(' OK.') + + def sys_argv_set(a): + sys.argv = sys.argv[:1] + a + + gsyncd = sys.modules['gsyncd'] + for a in [['--help'], ['--version'], + ['--canonicalize-escape-url', '/foo']]: + print('>>> invoking program with args: %s' % ' '.join(a)) + pid = os.fork() + if not pid: + sys_argv_set(a) + gsyncd.main() + _, r = os.waitpid(pid, 0) + if r: + raise RuntimeError('invocation failed') +finally: + shutil.rmtree(ipd) diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py new file mode 100644 index 000000000..b4648b696 --- /dev/null +++ b/geo-replication/syncdaemon/__init__.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py new file mode 100644 index 000000000..c4d47b5db --- /dev/null +++ b/geo-replication/syncdaemon/configinterface.py @@ -0,0 +1,343 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. 
+ +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +try: + import ConfigParser +except ImportError: + # py 3 + import configparser as ConfigParser +import re +from string import Template +import os +import errno +import sys +from stat import ST_DEV, ST_INO, ST_MTIME +import tempfile +import shutil + +from syncdutils import escape, unescape, norm, update_file, GsyncdError + +SECT_ORD = '__section_order__' +SECT_META = '__meta__' +config_version = 2.0 + +re_type = type(re.compile('')) + + +# (SECTION, OPTION, OLD VALUE, NEW VALUE) +CONFIGS = ( + ("peersrx . .", + "georep_session_working_dir", + "", + "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/"), + ("peersrx .", + "gluster_params", + "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", + "aux-gfid-mount"), + ("peersrx . .", + "ssh_command_tar", + "", + "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " + "-i /var/lib/glusterd/geo-replication/tar_ssh.pem"), +) + + +def upgrade_config_file(path): + config_change = False + config = ConfigParser.RawConfigParser() + config.read(path) + + for sec, opt, oldval, newval in CONFIGS: + try: + val = config.get(sec, opt) + except ConfigParser.NoOptionError: + # if new config opt not exists + config_change = True + config.set(sec, opt, newval) + continue + except ConfigParser.Error: + """ + When gsyncd invoked at the time of create, config file + will not be their. 
Ignore any ConfigParser errors + """ + continue + + if val == newval: + # value is same as new val + continue + + if val == oldval: + # config value needs update + config_change = True + config.set(sec, opt, newval) + + if config_change: + tempConfigFile = tempfile.NamedTemporaryFile(mode="wb", delete=False) + with open(tempConfigFile.name, 'wb') as configFile: + config.write(configFile) + + # If src and dst are two different file system, then os.rename + # fails, In this case if temp file created in /tmp and if /tmp is + # seperate fs then os.rename gives following error, so use shutil + # OSError: [Errno 18] Invalid cross-device link + # mail.python.org/pipermail/python-list/2005-February/342893.html + shutil.move(tempConfigFile.name, path) + + +class MultiDict(object): + + """a virtual dict-like class which functions as the union + of underlying dicts""" + + def __init__(self, *dd): + self.dicts = dd + + def __getitem__(self, key): + val = None + for d in self.dicts: + if d.get(key) is not None: + val = d[key] + if val is None: + raise KeyError(key) + return val + + +class GConffile(object): + + """A high-level interface to ConfigParser which flattens the two-tiered + config layout by implenting automatic section dispatch based on initial + parameters. + + Also ensure section ordering in terms of their time of addition -- a compat + hack for Python < 2.7. 
+ """ + + def _normconfig(self): + """normalize config keys by s/-/_/g""" + for n, s in self.config._sections.items(): + if n.find('__') == 0: + continue + s2 = type(s)() + for k, v in s.items(): + if k.find('__') != 0: + k = norm(k) + s2[k] = v + self.config._sections[n] = s2 + + def __init__(self, path, peers, *dd): + """ + - .path: location of config file + - .config: underlying ConfigParser instance + - .peers: on behalf of whom we flatten .config + (master, or master-slave url pair) + - .auxdicts: template subtituents + """ + self.peers = peers + self.path = path + self.auxdicts = dd + self.config = ConfigParser.RawConfigParser() + self.config.read(path) + self.dev, self.ino, self.mtime = -1, -1, -1 + self._normconfig() + + def _load(self): + try: + sres = os.stat(self.path) + self.dev = sres[ST_DEV] + self.ino = sres[ST_INO] + self.mtime = sres[ST_MTIME] + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + + self.config = ConfigParser.RawConfigParser() + self.config.read(self.path) + self._normconfig() + + def get_realtime(self, opt): + try: + sres = os.stat(self.path) + except (OSError, IOError): + if sys.exc_info()[1].errno == errno.ENOENT: + sres = None + else: + raise + + # compare file system stat with that of our stream file handle + if not sres or sres[ST_DEV] != self.dev or \ + sres[ST_INO] != self.ino or self.mtime != sres[ST_MTIME]: + self._load() + + return self.get(opt, printValue=False) + + def section(self, rx=False): + """get the section name of the section representing .peers + in .config""" + peers = self.peers + if not peers: + peers = ['.', '.'] + rx = True + if rx: + st = 'peersrx' + else: + st = 'peers' + return ' '.join([st] + [escape(u) for u in peers]) + + @staticmethod + def parse_section(section): + """retrieve peers sequence encoded by section name + (as urls or regexen, depending on section type) + """ + sl = section.split() + st = sl.pop(0) + sl = [unescape(u) for u in sl] + if st == 
'peersrx': + sl = [re.compile(u) for u in sl] + return sl + + def ord_sections(self): + """Return an ordered list of sections. + + Ordering happens based on the auxiliary + SECT_ORD section storing indices for each + section added through the config API. + + To not to go corrupt in case of manually + written config files, we take care to append + also those sections which are not registered + in SECT_ORD. + + Needed for python 2.{4,5,6} where ConfigParser + cannot yet order sections/options internally. + """ + so = {} + if self.config.has_section(SECT_ORD): + so = self.config._sections[SECT_ORD] + so2 = {} + for k, v in so.items(): + if k != '__name__': + so2[k] = int(v) + tv = 0 + if so2: + tv = max(so2.values()) + 1 + ss = [s for s in self.config.sections() if s.find('__') != 0] + for s in ss: + if s in so.keys(): + continue + so2[s] = tv + tv += 1 + + def scmp(x, y): + return cmp(*(so2[s] for s in (x, y))) + ss.sort(scmp) + return ss + + def update_to(self, dct, allow_unresolved=False): + """update @dct from key/values of ours. + + key/values are collected from .config by filtering the regexp sections + according to match, and from .section. The values are treated as + templates, which are substituted from .auxdicts and (in case of regexp + sections) match groups. 
+ """ + if not self.peers: + raise GsyncdError('no peers given, cannot select matching options') + + def update_from_sect(sect, mud): + for k, v in self.config._sections[sect].items(): + if k == '__name__': + continue + if allow_unresolved: + dct[k] = Template(v).safe_substitute(mud) + else: + dct[k] = Template(v).substitute(mud) + for sect in self.ord_sections(): + sp = self.parse_section(sect) + if isinstance(sp[0], re_type) and len(sp) == len(self.peers): + match = True + mad = {} + for i in range(len(sp)): + m = sp[i].search(self.peers[i]) + if not m: + match = False + break + for j in range(len(m.groups())): + mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j] + if match: + update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) + if self.config.has_section(self.section()): + update_from_sect(self.section(), MultiDict(dct, *self.auxdicts)) + + def get(self, opt=None, printValue=True): + """print the matching key/value pairs from .config, + or if @opt given, the value for @opt (according to the + logic described in .update_to) + """ + d = {} + self.update_to(d, allow_unresolved=True) + if opt: + opt = norm(opt) + v = d.get(opt) + if v: + if printValue: + print(v) + else: + return v + else: + for k, v in d.iteritems(): + if k == '__name__': + continue + print("%s: %s" % (k, v)) + + def write(self, trfn, opt, *a, **kw): + """update on-disk config transactionally + + @trfn is the transaction function + """ + def mergeconf(f): + self.config = ConfigParser.RawConfigParser() + self.config.readfp(f) + self._normconfig() + if not self.config.has_section(SECT_META): + self.config.add_section(SECT_META) + self.config.set(SECT_META, 'version', config_version) + return trfn(norm(opt), *a, **kw) + + def updateconf(f): + self.config.write(f) + update_file(self.path, updateconf, mergeconf) + + def _set(self, opt, val, rx=False): + """set @opt to @val in .section""" + sect = self.section(rx) + if not self.config.has_section(sect): + self.config.add_section(sect) + # 
regarding SECT_ORD, cf. ord_sections + if not self.config.has_section(SECT_ORD): + self.config.add_section(SECT_ORD) + self.config.set( + SECT_ORD, sect, len(self.config._sections[SECT_ORD])) + self.config.set(sect, opt, val) + return True + + def set(self, opt, *a, **kw): + """perform ._set transactionally""" + self.write(self._set, opt, *a, **kw) + + def _delete(self, opt, rx=False): + """delete @opt from .section""" + sect = self.section(rx) + if self.config.has_section(sect): + return self.config.remove_option(sect, opt) + + def delete(self, opt, *a, **kw): + """perform ._delete transactionally""" + self.write(self._delete, opt, *a, **kw) diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py new file mode 100644 index 000000000..1fc7c381b --- /dev/null +++ b/geo-replication/syncdaemon/gconf.py @@ -0,0 +1,25 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + + +class GConf(object): + + """singleton class to store globals + shared between gsyncd modules""" + + ssh_ctl_dir = None + ssh_ctl_args = None + cpid = None + pid_file_owned = False + log_exit = False + permanent_handles = [] + log_metadata = {} + +gconf = GConf() diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py new file mode 100644 index 000000000..426d964de --- /dev/null +++ b/geo-replication/syncdaemon/gsyncd.py @@ -0,0 +1,624 @@ +#!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. 
+ +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import os.path +import glob +import sys +import time +import logging +import shutil +import optparse +import fcntl +import fnmatch +from optparse import OptionParser, SUPPRESS_HELP +from logging import Logger, handlers +from errno import ENOENT + +from ipaddr import IPAddress, IPNetwork + +from gconf import gconf +from syncdutils import FreeObject, norm, grabpidfile, finalize +from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import GsyncdError, select, set_term_handler +from configinterface import GConffile, upgrade_config_file +import resource +from monitor import monitor + + +class GLogger(Logger): + + """Logger customizations for gsyncd. + + It implements a log format similar to that of glusterfs. 
+ """ + + def makeRecord(self, name, level, *a): + rv = Logger.makeRecord(self, name, level, *a) + rv.nsecs = (rv.created - int(rv.created)) * 1000000 + fr = sys._getframe(4) + callee = fr.f_locals.get('self') + if callee: + ctx = str(type(callee)).split("'")[1].split('.')[-1] + else: + ctx = '<top>' + if not hasattr(rv, 'funcName'): + rv.funcName = fr.f_code.co_name + rv.lvlnam = logging.getLevelName(level)[0] + rv.ctx = ctx + return rv + + @classmethod + def setup(cls, **kw): + lbl = kw.get('label', "") + if lbl: + lbl = '(' + lbl + ')' + lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", + 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} + lprm.update(kw) + lvl = kw.get('level', logging.INFO) + lprm['level'] = lvl + logging.root = cls("root", lvl) + logging.setLoggerClass(cls) + logging.getLogger().handlers = [] + logging.getLogger().setLevel(lprm['level']) + + if 'filename' in lprm: + try: + logging_handler = handlers.WatchedFileHandler(lprm['filename']) + formatter = logging.Formatter(fmt=lprm['format'], + datefmt=lprm['datefmt']) + logging_handler.setFormatter(formatter) + logging.getLogger().addHandler(logging_handler) + except AttributeError: + # Python version < 2.6 will not have WatchedFileHandler + # so fallback to logging without any handler. 
+ # Note: logrotate will not work if Python version is < 2.6 + logging.basicConfig(**lprm) + else: + # If filename not passed(not available in lprm) then it may be + # streaming.(Ex: {"stream": "/dev/stdout"}) + logging.basicConfig(**lprm) + + @classmethod + def _gsyncd_loginit(cls, **kw): + lkw = {} + if gconf.log_level: + lkw['level'] = gconf.log_level + if kw.get('log_file'): + if kw['log_file'] in ('-', '/dev/stderr'): + lkw['stream'] = sys.stderr + elif kw['log_file'] == '/dev/stdout': + lkw['stream'] = sys.stdout + else: + lkw['filename'] = kw['log_file'] + + cls.setup(label=kw.get('label'), **lkw) + + lkw.update({'saved_label': kw.get('label')}) + gconf.log_metadata = lkw + gconf.log_exit = True + + +def startup(**kw): + """set up logging, pidfile grabbing, daemonization""" + if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': + if not grabpidfile(): + sys.stderr.write("pidfile is taken, exiting.\n") + sys.exit(2) + gconf.pid_file_owned = True + + if kw.get('go_daemon') == 'should': + x, y = os.pipe() + gconf.cpid = os.fork() + if gconf.cpid: + os.close(x) + sys.exit() + os.close(y) + os.setsid() + dn = os.open(os.devnull, os.O_RDWR) + for f in (sys.stdin, sys.stdout, sys.stderr): + os.dup2(dn, f.fileno()) + if getattr(gconf, 'pid_file', None): + if not grabpidfile(gconf.pid_file + '.tmp'): + raise GsyncdError("cannot grab temporary pidfile") + os.rename(gconf.pid_file + '.tmp', gconf.pid_file) + # wait for parent to terminate + # so we can start up with + # no messing from the dirty + # ol' bustard + select((x,), (), ()) + os.close(x) + + GLogger._gsyncd_loginit(**kw) + + +def _unlink(path): + try: + os.unlink(path) + except (OSError, IOError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError('Unlink error: %s' % path) + + +def main(): + """main routine, signal/exception handling boilerplates""" + gconf.starttime = time.time() + set_term_handler() + GLogger.setup() + excont = FreeObject(exval=0) + try: + try: 
+ main_i() + except: + log_raise_exception(excont) + finally: + finalize(exval=excont.exval) + + +def main_i(): + """internal main routine + + parse command line, decide what action will be taken; + we can either: + - query/manipulate configuration + - format gsyncd urls using gsyncd's url parsing engine + - start service in following modes, in given stages: + - monitor: startup(), monitor() + - master: startup(), connect_remote(), connect(), service_loop() + - slave: startup(), connect(), service_loop() + """ + rconf = {'go_daemon': 'should'} + + def store_abs(opt, optstr, val, parser): + if val and val != '-': + val = os.path.abspath(val) + setattr(parser.values, opt.dest, val) + + def store_local(opt, optstr, val, parser): + rconf[opt.dest] = val + + def store_local_curry(val): + return lambda o, oo, vx, p: store_local(o, oo, val, p) + + def store_local_obj(op, dmake): + return lambda o, oo, vx, p: store_local( + o, oo, FreeObject(op=op, **dmake(vx)), p) + + op = OptionParser( + usage="%prog [options...] 
<master> <slave>", version="%prog 0.0.1") + op.add_option('--gluster-command-dir', metavar='DIR', default='') + op.add_option('--gluster-log-file', metavar='LOGF', + default=os.devnull, type=str, action='callback', + callback=store_abs) + op.add_option('--gluster-log-level', metavar='LVL') + op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option( + '--glusterd-uuid', metavar='UUID', type=str, default='', + help=SUPPRESS_HELP) + op.add_option( + '--gluster-cli-options', metavar='OPTS', default='--log-file=-') + op.add_option('--mountbroker', metavar='LABEL') + op.add_option('-p', '--pid-file', metavar='PIDF', type=str, + action='callback', callback=store_abs) + op.add_option('-l', '--log-file', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--log-file-mbr', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-file', metavar='STATF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-detail-file', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--georep-session-working-dir', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--isolated-slave', default=False, action='store_true') + op.add_option('--use-rsync-xattrs', default=False, action='store_true') + op.add_option('-L', '--log-level', metavar='LVL') + op.add_option('-r', '--remote-gsyncd', metavar='CMD', + default=os.path.abspath(sys.argv[0])) + op.add_option('--volume-id', metavar='UUID') + op.add_option('--slave-id', metavar='ID') + op.add_option('--session-owner', metavar='ID') + op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') + op.add_option( + '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') + op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') + op.add_option('--ssh-command-tar', 
metavar='CMD', default='ssh') + op.add_option('--rsync-command', metavar='CMD', default='rsync') + op.add_option('--rsync-options', metavar='OPTS', default='') + op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') + op.add_option('--timeout', metavar='SEC', type=int, default=120) + op.add_option('--connection-timeout', metavar='SEC', + type=int, default=60, help=SUPPRESS_HELP) + op.add_option('--sync-jobs', metavar='N', type=int, default=3) + op.add_option( + '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) + op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--socketdir', metavar='DIR') + op.add_option('--state-socket-unencoded', metavar='SOCKF', + type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') + # tunables for failover/failback mechanism: + # None - gsyncd behaves as normal + # blind - gsyncd works with xtime pairs to identify + # candidates for synchronization + # wrapup - same as normal mode but does not assign + # xtimes to orphaned files + # see crawl() for usage of the above tunables + op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) + + # changelog or xtime? (TODO: Change the default) + op.add_option( + '--change-detector', metavar='MODE', type=str, default='xtime') + # sleep interval for change detection (xtime crawl uses a hardcoded 1 + # second sleep time) + op.add_option('--change-interval', metavar='SEC', type=int, default=3) + # working directory for changelog based mechanism + op.add_option('--working-dir', metavar='DIR', type=str, + action='callback', callback=store_abs) + op.add_option('--use-tarssh', default=False, action='store_true') + + op.add_option('-c', '--config-file', metavar='CONF', + type=str, action='callback', callback=store_local) + # duh. 
need to specify dest or value will be mapped to None :S + op.add_option('--monitor', dest='monitor', action='callback', + callback=store_local_curry(True)) + op.add_option('--resource-local', dest='resource_local', + type=str, action='callback', callback=store_local) + op.add_option('--resource-remote', dest='resource_remote', + type=str, action='callback', callback=store_local) + op.add_option('--feedback-fd', dest='feedback_fd', type=int, + help=SUPPRESS_HELP, action='callback', callback=store_local) + op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, + action='callback', callback=store_local_curry(True)) + op.add_option('-N', '--no-daemon', dest="go_daemon", + action='callback', callback=store_local_curry('dont')) + op.add_option('--verify', type=str, dest="verify", + action='callback', callback=store_local) + op.add_option('--create', type=str, dest="create", + action='callback', callback=store_local) + op.add_option('--delete', dest='delete', action='callback', + callback=store_local_curry(True)) + op.add_option('--debug', dest="go_daemon", action='callback', + callback=lambda *a: (store_local_curry('dont')(*a), + setattr( + a[-1].values, 'log_file', '-'), + setattr(a[-1].values, 'log_level', + 'DEBUG'))), + op.add_option('--path', type=str, action='append') + + for a in ('check', 'get'): + op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', + action='callback', + callback=store_local_obj(a, lambda vx: {'opt': vx})) + op.add_option('--config-get-all', dest='config', action='callback', + callback=store_local_obj('get', lambda vx: {'opt': None})) + for m in ('', '-rx', '-glob'): + # call this code 'Pythonic' eh? 
+ # have to define a one-shot local function to be able + # to inject (a value depending on the) + # iteration variable into the inner lambda + def conf_mod_opt_regex_variant(rx): + op.add_option('--config-set' + m, metavar='OPT VAL', type=str, + nargs=2, dest='config', action='callback', + callback=store_local_obj('set', lambda vx: { + 'opt': vx[0], 'val': vx[1], 'rx': rx})) + op.add_option('--config-del' + m, metavar='OPT', type=str, + dest='config', action='callback', + callback=store_local_obj('del', lambda vx: { + 'opt': vx, 'rx': rx})) + conf_mod_opt_regex_variant(m and m[1:] or False) + + op.add_option('--normalize-url', dest='url_print', + action='callback', callback=store_local_curry('normal')) + op.add_option('--canonicalize-url', dest='url_print', + action='callback', callback=store_local_curry('canon')) + op.add_option('--canonicalize-escape-url', dest='url_print', + action='callback', callback=store_local_curry('canon_esc')) + + tunables = [norm(o.get_opt_string()[2:]) + for o in op.option_list + if (o.callback in (store_abs, 'store_true', None) and + o.get_opt_string() not in ('--version', '--help'))] + remote_tunables = ['listen', 'go_daemon', 'timeout', + 'session_owner', 'config_file', 'use_rsync_xattrs'] + rq_remote_tunables = {'listen': True} + + # precedence for sources of values: 1) commandline, 2) cfg file, 3) + # defaults for this to work out we need to tell apart defaults from + # explicitly set options... so churn out the defaults here and call + # the parser with virgin values container. 
+ defaults = op.get_default_values() + opts, args = op.parse_args(values=optparse.Values()) + args_orig = args[:] + r = rconf.get('resource_local') + if r: + if len(args) == 0: + args.append(None) + args[0] = r + r = rconf.get('resource_remote') + if r: + if len(args) == 0: + raise GsyncdError('local resource unspecfied') + elif len(args) == 1: + args.append(None) + args[1] = r + confdata = rconf.get('config') + if not (len(args) == 2 or + (len(args) == 1 and rconf.get('listen')) or + (len(args) <= 2 and confdata) or + rconf.get('url_print')): + sys.stderr.write("error: incorrect number of arguments\n\n") + sys.stderr.write(op.get_usage() + "\n") + sys.exit(1) + + verify = rconf.get('verify') + if verify: + logging.info(verify) + logging.info("Able to spawn gsyncd.py") + return + + restricted = os.getenv('_GSYNCD_RESTRICTED_') + + if restricted: + allopts = {} + allopts.update(opts.__dict__) + allopts.update(rconf) + bannedtuns = set(allopts.keys()) - set(remote_tunables) + if bannedtuns: + raise GsyncdError('following tunables cannot be set with ' + 'restricted SSH invocaton: ' + + ', '.join(bannedtuns)) + for k, v in rq_remote_tunables.items(): + if not k in allopts or allopts[k] != v: + raise GsyncdError('tunable %s is not set to value %s required ' + 'for restricted SSH invocaton' % + (k, v)) + + confrx = getattr(confdata, 'rx', None) + + def makersc(aa, check=True): + if not aa: + return ([], None, None) + ra = [resource.parse_url(u) for u in aa] + local = ra[0] + remote = None + if len(ra) > 1: + remote = ra[1] + if check and not local.can_connect_to(remote): + raise GsyncdError("%s cannot work with %s" % + (local.path, remote and remote.path)) + return (ra, local, remote) + if confrx: + # peers are regexen, don't try to parse them + if confrx == 'glob': + args = ['\A' + fnmatch.translate(a) for a in args] + canon_peers = args + namedict = {} + else: + dc = rconf.get('url_print') + rscs, local, remote = makersc(args_orig, not dc) + if dc: + for r in rscs: + 
print(r.get_url(**{'normal': {}, + 'canon': {'canonical': True}, + 'canon_esc': {'canonical': True, + 'escaped': True}}[dc])) + return + pa = ([], [], []) + urlprms = ( + {}, {'canonical': True}, {'canonical': True, 'escaped': True}) + for x in rscs: + for i in range(len(pa)): + pa[i].append(x.get_url(**urlprms[i])) + _, canon_peers, canon_esc_peers = pa + # creating the namedict, a dict representing various ways of referring + # to / repreenting peers to be fillable in config templates + mods = (lambda x: x, lambda x: x[ + 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) + if remote: + rmap = {local: ('local', 'master'), remote: ('remote', 'slave')} + else: + rmap = {local: ('local', 'slave')} + namedict = {} + for i in range(len(rscs)): + x = rscs[i] + for name in rmap[x]: + for j in range(3): + namedict[mods[j](name)] = pa[j][i] + namedict[name + 'vol'] = x.volume + if name == 'remote': + namedict['remotehost'] = x.remotehost + if not 'config_file' in rconf: + rconf['config_file'] = os.path.join( + os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") + + upgrade_config_file(rconf['config_file']) + gcnf = GConffile( + rconf['config_file'], canon_peers, + defaults.__dict__, opts.__dict__, namedict) + + checkpoint_change = False + if confdata: + opt_ok = norm(confdata.opt) in tunables + [None] + if confdata.op == 'check': + if opt_ok: + sys.exit(0) + else: + sys.exit(1) + elif not opt_ok: + raise GsyncdError("not a valid option: " + confdata.opt) + if confdata.op == 'get': + gcnf.get(confdata.opt) + elif confdata.op == 'set': + gcnf.set(confdata.opt, confdata.val, confdata.rx) + elif confdata.op == 'del': + gcnf.delete(confdata.opt, confdata.rx) + # when modifying checkpoint, it's important to make a log + # of that, so in that case we go on to set up logging even + # if its just config invocation + if confdata.opt == 'checkpoint' and confdata.op in ('set', 'del') and \ + not confdata.rx: + checkpoint_change = True + if not checkpoint_change: + 
return + + gconf.__dict__.update(defaults.__dict__) + gcnf.update_to(gconf.__dict__) + gconf.__dict__.update(opts.__dict__) + gconf.configinterface = gcnf + + delete = rconf.get('delete') + if delete: + logging.info('geo-replication delete') + # Delete pid file, status file, socket file + cleanup_paths = [] + if getattr(gconf, 'pid_file', None): + cleanup_paths.append(gconf.pid_file) + + if getattr(gconf, 'state_file', None): + cleanup_paths.append(gconf.state_file) + + if getattr(gconf, 'state_detail_file', None): + cleanup_paths.append(gconf.state_detail_file) + + if getattr(gconf, 'state_socket_unencoded', None): + cleanup_paths.append(gconf.state_socket_unencoded) + + cleanup_paths.append(rconf['config_file'][:-11] + "*") + + # Cleanup changelog working dirs + if getattr(gconf, 'working_dir', None): + try: + shutil.rmtree(gconf.working_dir) + except (IOError, OSError): + if sys.exc_info()[1].errno == ENOENT: + pass + else: + raise GsyncdError( + 'Error while removing working dir: %s' % + gconf.working_dir) + + for path in cleanup_paths: + # To delete temp files + for f in glob.glob(path + "*"): + _unlink(f) + return + + if restricted and gconf.allow_network: + ssh_conn = os.getenv('SSH_CONNECTION') + if not ssh_conn: + # legacy env var + ssh_conn = os.getenv('SSH_CLIENT') + if ssh_conn: + allowed_networks = [IPNetwork(a) + for a in gconf.allow_network.split(',')] + client_ip = IPAddress(ssh_conn.split()[0]) + allowed = False + for nw in allowed_networks: + if client_ip in nw: + allowed = True + break + if not allowed: + raise GsyncdError("client IP address is not allowed") + + ffd = rconf.get('feedback_fd') + if ffd: + fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) + + # normalize loglevel + lvl0 = gconf.log_level + if isinstance(lvl0, str): + lvl1 = lvl0.upper() + lvl2 = logging.getLevelName(lvl1) + # I have _never_ _ever_ seen such an utterly braindead + # error condition + if lvl2 == "Level " + lvl1: + raise GsyncdError('cannot recognize log level "%s"' % 
lvl0) + gconf.log_level = lvl2 + + if not privileged() and gconf.log_file_mbr: + gconf.log_file = gconf.log_file_mbr + + if checkpoint_change: + try: + GLogger._gsyncd_loginit(log_file=gconf.log_file, label='conf') + if confdata.op == 'set': + logging.info('checkpoint %s set' % confdata.val) + gcnf.delete('checkpoint_completed') + gcnf.delete('checkpoint_target') + elif confdata.op == 'del': + logging.info('checkpoint info was reset') + # if it is removing 'checkpoint' then we need + # to remove 'checkpoint_completed' and 'checkpoint_target' too + gcnf.delete('checkpoint_completed') + gcnf.delete('checkpoint_target') + + except IOError: + if sys.exc_info()[1].errno == ENOENT: + # directory of log path is not present, + # which happens if we get here from + # a peer-multiplexed "config-set checkpoint" + # (as that directory is created only on the + # original node) + pass + else: + raise + return + + create = rconf.get('create') + if create: + if getattr(gconf, 'state_file', None): + update_file(gconf.state_file, lambda f: f.write(create + '\n')) + return + + go_daemon = rconf['go_daemon'] + be_monitor = rconf.get('monitor') + + rscs, local, remote = makersc(args) + if not be_monitor and isinstance(remote, resource.SSH) and \ + go_daemon == 'should': + go_daemon = 'postconn' + log_file = None + else: + log_file = gconf.log_file + if be_monitor: + label = 'monitor' + elif remote: + # master + label = gconf.local_path + else: + label = 'slave' + startup(go_daemon=go_daemon, log_file=log_file, label=label) + resource.Popen.init_errhandler() + + if be_monitor: + return monitor(*rscs) + + logging.info("syncing: %s" % " -> ".join(r.url for r in rscs)) + if remote: + go_daemon = remote.connect_remote(go_daemon=go_daemon) + if go_daemon: + startup(go_daemon=go_daemon, log_file=gconf.log_file) + # complete remote connection in child + remote.connect_remote(go_daemon='done') + local.connect() + if ffd: + os.close(ffd) + local.service_loop(*[r for r in [remote] if r]) + + +if 
__name__ == "__main__": + main() diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py new file mode 100644 index 000000000..e6035e26b --- /dev/null +++ b/geo-replication/syncdaemon/libcxattr.py @@ -0,0 +1,99 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +from ctypes import CDLL, c_int, create_string_buffer +from ctypes.util import find_library + + +class Xattr(object): + + """singleton that wraps the extended attribues system + interface for python using ctypes + + Just implement it to the degree we need it, in particular + - we need just the l*xattr variants, ie. we never want symlinks to be + followed + - don't need size discovery for getxattr, as we always know the exact + sizes we expect + """ + + libc = CDLL(find_library("libc")) + + @classmethod + def geterrno(cls): + return c_int.in_dll(cls.libc, 'errno').value + + @classmethod + def raise_oserr(cls): + errn = cls.geterrno() + raise OSError(errn, os.strerror(errn)) + + @classmethod + def _query_xattr(cls, path, siz, syscall, *a): + if siz: + buf = create_string_buffer('\0' * siz) + else: + buf = None + ret = getattr(cls.libc, syscall)(*((path,) + a + (buf, siz))) + if ret == -1: + cls.raise_oserr() + if siz: + return buf.raw[:ret] + else: + return ret + + @classmethod + def lgetxattr(cls, path, attr, siz=0): + return cls._query_xattr(path, siz, 'lgetxattr', attr) + + @classmethod + def lgetxattr_buf(cls, path, attr): + """lgetxattr variant with size discovery""" + size = cls.lgetxattr(path, attr) + if size == -1: + cls.raise_oserr() + if size == 0: + return '' + return cls.lgetxattr(path, attr, size) + + @classmethod + def 
llistxattr(cls, path, siz=0): + ret = cls._query_xattr(path, siz, 'llistxattr') + if isinstance(ret, str): + ret = ret.split('\0') + return ret + + @classmethod + def lsetxattr(cls, path, attr, val): + ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) + if ret == -1: + cls.raise_oserr() + + @classmethod + def lsetxattr_l(cls, path, attr, val): + """ lazy lsetxattr(): caller handles errno """ + cls.libc.lsetxattr(path, attr, val, len(val), 0) + + @classmethod + def lremovexattr(cls, path, attr): + ret = cls.libc.lremovexattr(path, attr) + if ret == -1: + cls.raise_oserr() + + @classmethod + def llistxattr_buf(cls, path): + """listxattr variant with size discovery""" + size = cls.llistxattr(path) + if size == -1: + cls.raise_oserr() + if size == 0: + return [] + return cls.llistxattr(path, size) diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py new file mode 100644 index 000000000..ec563b36f --- /dev/null +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
class Changes(object):

    """ctypes wrapper around the libgfchangelog consumer calls

    Every method is a classmethod working on the shared library
    handle @libgfc. A return value of -1 from the library is turned
    into an OSError built from the errno ctypes saved for the call.
    """

    # use_errno=True makes ctypes keep its own copy of errno around
    # foreign calls; geterrno() reads that copy back via get_errno()
    libgfc = CDLL(find_library("gfchangelog"), use_errno=True)

    # size of the buffer handed to gf_changelog_next_change() to
    # receive a single changelog path
    # TODO: remove hardcoding for path name length
    PATH_BUF_SIZE = 4096

    @classmethod
    def geterrno(cls):
        """return the errno value saved by ctypes"""
        return get_errno()

    @classmethod
    def raise_oserr(cls):
        """raise OSError carrying the saved errno and its message"""
        errn = cls.geterrno()
        raise OSError(errn, os.strerror(errn))

    @classmethod
    def _get_api(cls, call):
        """look up the library function named @call"""
        return getattr(cls.libgfc, call)

    @classmethod
    def cl_register(cls, brick, path, log_file, log_level, retries=0):
        """call gf_changelog_register(); raise OSError if it returns -1"""
        ret = cls._get_api('gf_changelog_register')(brick, path,
                                                    log_file,
                                                    log_level, retries)
        if ret == -1:
            cls.raise_oserr()

    @classmethod
    def cl_scan(cls):
        """call gf_changelog_scan(); raise OSError if it returns -1"""
        ret = cls._get_api('gf_changelog_scan')()
        if ret == -1:
            cls.raise_oserr()

    @classmethod
    def cl_startfresh(cls):
        """call gf_changelog_start_fresh(); raise OSError if it returns -1"""
        ret = cls._get_api('gf_changelog_start_fresh')()
        if ret == -1:
            cls.raise_oserr()

    @classmethod
    def cl_getchanges(cls):
        """return the pending changelog paths, sorted by name suffix

        gf_changelog_next_change() is called repeatedly until it
        returns 0 or -1. On -1 an OSError is raised; otherwise
        cl_startfresh() is called and the collected paths are
        returned sorted by the text after their last '.'.
        """
        def clsort(f):
            return f.split('.')[-1]
        changes = []
        size = cls.PATH_BUF_SIZE
        # passing just a size yields a zero-filled buffer of that length
        buf = create_string_buffer(size)
        call = cls._get_api('gf_changelog_next_change')

        while True:
            ret = call(buf, size)
            if ret in (0, -1):
                break
            # the last of the @ret bytes is dropped -- presumably the
            # terminating NUL; TODO confirm against the library
            changes.append(buf.raw[:ret - 1])
        if ret == -1:
            cls.raise_oserr()
        # cleanup tracker
        cls.cl_startfresh()
        return sorted(changes, key=clsort)

    @classmethod
    def cl_done(cls, clfile):
        """call gf_changelog_done() on @clfile; raise OSError on -1"""
        ret = cls._get_api('gf_changelog_done')(clfile)
        if ret == -1:
            cls.raise_oserr()
+ +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + +import os +import sys +import time +import stat +import json +import logging +import socket +import string +import errno +from errno import ENOENT, ENODATA, EPIPE, EEXIST +from threading import Condition, Lock +from datetime import datetime +from gconf import gconf +from tempfile import NamedTemporaryFile +from syncdutils import Thread, GsyncdError, boolify, escape +from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import lstat, errno_wrap + +URXTIME = (-1, 0) + +# Utility functions to help us to get to closer proximity +# of the DRY principle (no, don't look for elevated or +# perspectivistic things here) + + +def _xtime_now(): + t = time.time() + sec = int(t) + nsec = int((t - sec) * 1000000) + return (sec, nsec) + + +def _volinfo_hook_relax_foreign(self): + volinfo_sys = self.get_sys_volinfo() + fgn_vi = volinfo_sys[self.KFGN] + if fgn_vi: + expiry = fgn_vi['timeout'] - int(time.time()) + 1 + logging.info('foreign volume info found, waiting %d sec for expiry' % + expiry) + time.sleep(expiry) + volinfo_sys = self.get_sys_volinfo() + return volinfo_sys + + +# The API! 
+ +def gmaster_builder(excrawl=None): + """produce the GMaster class variant corresponding + to sync mode""" + this = sys.modules[__name__] + modemixin = gconf.special_sync_mode + if not modemixin: + modemixin = 'normal' + changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector + logging.info('setting up %s change detection mode' % changemixin) + modemixin = getattr(this, modemixin.capitalize() + 'Mixin') + crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') + sendmarkmixin = boolify( + gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin + purgemixin = boolify( + gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin + syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine + + class _GMaster(crawlmixin, modemixin, sendmarkmixin, + purgemixin, syncengine): + pass + return _GMaster + + +# Mixin classes that implement the data format +# and logic particularities of the certain +# sync modes + +class NormalMixin(object): + + """normal geo-rep behavior""" + + minus_infinity = URXTIME + + # following staticmethods ideally would be + # methods of an xtime object (in particular, + # implementing the hooks needed for comparison + # operators), but at this point we don't yet + # have a dedicated xtime class + + @staticmethod + def serialize_xtime(xt): + return "%d.%d" % tuple(xt) + + @staticmethod + def deserialize_xtime(xt): + return tuple(int(x) for x in xt.split(".")) + + @staticmethod + def native_xtime(xt): + return xt + + @staticmethod + def xtime_geq(xt0, xt1): + return xt0 >= xt1 + + def make_xtime_opts(self, is_master, opts): + if not 'create' in opts: + opts['create'] = is_master + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + def xtime_low(self, rsc, path, **opts): + if rsc == self.master: + xt = rsc.server.xtime(path, self.uuid) + else: + xt = rsc.server.stime(path, self.uuid) + if isinstance(xt, int) and xt == ENODATA: + xt = rsc.server.xtime(path, 
self.uuid) + if not isinstance(xt, int): + self.slave.server.set_stime(path, self.uuid, xt) + if isinstance(xt, int) and xt != ENODATA: + return xt + if xt == ENODATA or xt < self.volmark: + if opts['create']: + xt = _xtime_now() + rsc.server.aggregated.set_xtime(path, self.uuid, xt) + else: + xt = opts['default_xtime'] + return xt + + def keepalive_payload_hook(self, timo, gap): + # first grab a reference as self.volinfo + # can be changed in main thread + vi = self.volinfo + if vi: + # then have a private copy which we can mod + vi = vi.copy() + vi['timeout'] = int(time.time()) + timo + else: + # send keep-alives more frequently to + # avoid a delay in announcing our volume info + # to slave if it becomes established in the + # meantime + gap = min(10, gap) + return (vi, gap) + + def volinfo_hook(self): + return self.get_sys_volinfo() + + def xtime_reversion_hook(self, path, xtl, xtr): + if xtr > xtl: + raise GsyncdError("timestamp corruption for " + path) + + def need_sync(self, e, xte, xtrd): + return xte > xtrd + + def set_slave_xtime(self, path, mark): + self.slave.server.set_stime(path, self.uuid, mark) + # self.slave.server.set_xtime_remote(path, self.uuid, mark) + + +class PartialMixin(NormalMixin): + + """a variant tuned towards operation with a master + that has partial info of the slave (brick typically)""" + + def xtime_reversion_hook(self, path, xtl, xtr): + pass + + +class RecoverMixin(NormalMixin): + + """a variant that differs from normal in terms + of ignoring non-indexed files""" + + @staticmethod + def make_xtime_opts(is_master, opts): + if not 'create' in opts: + opts['create'] = False + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + def keepalive_payload_hook(self, timo, gap): + return (None, gap) + + def volinfo_hook(self): + return _volinfo_hook_relax_foreign(self) + +# Further mixins for certain tunable behaviors + + +class SendmarkNormalMixin(object): + + def sendmark_regular(self, *a, **kw): + return 
self.sendmark(*a, **kw) + + +class SendmarkRsyncMixin(object): + + def sendmark_regular(self, *a, **kw): + pass + + +class PurgeNormalMixin(object): + + def purge_missing(self, path, names): + self.slave.server.purge(path, names) + + +class PurgeNoopMixin(object): + + def purge_missing(self, path, names): + pass + + +class TarSSHEngine(object): + + """Sync engine that uses tar(1) piped over ssh(1) + for data transfers. Good for lots of small files. + """ + + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + pb = self.syncer.add(f) + + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + # stat check for file presence + st = lstat(se) + if isinstance(st, int): + return True + logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + + +class RsyncEngine(object): + + """Sync engine that uses rsync(1) for data transfers""" + + def a_syncdata(self, files): + logging.debug('files: %s' % (files)) + for f in files: + logging.debug('candidate for syncing %s' % f) + pb = self.syncer.add(f) + + def regjob(se, xte, pb): + rv = pb.wait() + if rv[0]: + logging.debug('synced ' + se) + return True + else: + if rv[1] in [23, 24]: + # stat to check if the file exist + st = lstat(se) + if isinstance(st, int): + # file got unlinked in the interim + return True + logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) + self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) + + def syncdata_wait(self): + if self.wait(self.FLAT_DIR_HIERARCHY, None): + return True + + def syncdata(self, files): + self.a_syncdata(files) + self.syncdata_wait() + + +class GMasterCommon(object): + + """abstract class impementling master role""" + + KFGN = 0 + KNAT = 1 + + 
def get_sys_volinfo(self): + """query volume marks on fs root + + err out on multiple foreign masters + """ + fgn_vis, nat_vi = ( + self.master.server.aggregated.foreign_volume_infos(), + self.master.server.aggregated.native_volume_info()) + fgn_vi = None + if fgn_vis: + if len(fgn_vis) > 1: + raise GsyncdError("cannot work with multiple foreign masters") + fgn_vi = fgn_vis[0] + return fgn_vi, nat_vi + + @property + def uuid(self): + if self.volinfo: + return self.volinfo['uuid'] + + @property + def volmark(self): + if self.volinfo: + return self.volinfo['volume_mark'] + + def xtime(self, path, *a, **opts): + """get amended xtime + + as of amending, we can create missing xtime, or + determine a valid value if what we get is expired + (as of the volume mark expiry); way of amendig + depends on @opts and on subject of query (master + or slave). + """ + if a: + rsc = a[0] + else: + rsc = self.master + self.make_xtime_opts(rsc == self.master, opts) + return self.xtime_low(rsc, path, **opts) + + def get_initial_crawl_data(self): + # while persisting only 'files_syncd' is non-zero, rest of + # the stats are nulls. lets keep it that way in case they + # are needed to be used some day... 
+ default_data = {'files_syncd': 0, + 'files_remaining': 0, + 'bytes_remaining': 0, + 'purges_remaining': 0, + 'total_files_skipped': 0} + if getattr(gconf, 'state_detail_file', None): + try: + with open(gconf.state_detail_file, 'r+') as f: + loaded_data = json.load(f) + diff_data = set(default_data) - set(loaded_data) + if len(diff_data): + for i in diff_data: + loaded_data[i] = default_data[i] + return loaded_data + except IOError: + logging.warn('Creating new gconf.state_detail_file.') + # Create file with initial data + try: + with open(gconf.state_detail_file, 'wb') as f: + json.dump(default_data, f) + return default_data + except: + raise + return default_data + + def update_crawl_data(self): + if getattr(gconf, 'state_detail_file', None): + try: + same_dir = os.path.dirname(gconf.state_detail_file) + with NamedTemporaryFile(dir=same_dir, delete=False) as tmp: + json.dump(self.total_crawl_stats, tmp) + tmp.flush() + os.fsync(tmp.fileno()) + os.rename(tmp.name, gconf.state_detail_file) + except (IOError, OSError): + raise + + def __init__(self, master, slave): + self.master = master + self.slave = slave + self.jobtab = {} + if boolify(gconf.use_tarssh): + logging.info("using 'tar over ssh' as the sync engine") + self.syncer = Syncer(slave, self.slave.tarssh) + else: + logging.info("using 'rsync' as the sync engine") + # partial transfer (cf. rsync(1)), that's normal + self.syncer = Syncer(slave, self.slave.rsync, [23, 24]) + # crawls vs. turns: + # - self.crawls is simply the number of crawl() invocations on root + # - one turn is a maximal consecutive sequence of crawls so that each + # crawl in it detects a change to be synced + # - self.turns is the number of turns since start + # - self.total_turns is a limit so that if self.turns reaches it, then + # we exit (for diagnostic purposes) + # so, eg., if the master fs changes unceasingly, self.turns will remain + # 0. 
+ self.crawls = 0 + self.turns = 0 + self.total_turns = int(gconf.turns) + self.crawl_start = datetime.now() + self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0} + self.total_crawl_stats = None + self.start = None + self.change_seen = None + # the actual volinfo we make use of + self.volinfo = None + self.terminate = False + self.sleep_interval = 1 + self.checkpoint_thread = None + self.current_files_skipped_count = 0 + self.skipped_gfid_list = [] + + def init_keep_alive(cls): + """start the keep-alive thread """ + timo = int(gconf.timeout or 0) + if timo > 0: + def keep_alive(): + while True: + vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5) + cls.slave.server.keep_alive(vi) + time.sleep(gap) + t = Thread(target=keep_alive) + t.start() + + def should_crawl(cls): + return gconf.glusterd_uuid in cls.master.server.node_uuid() + + def register(self): + self.register() + + def crawlwrap(self, oneshot=False): + if oneshot: + # it's important to do this during the oneshot crawl as + # for a passive gsyncd (ie. in a replicate scenario) + # the keepalive thread would keep the connection alive. + self.init_keep_alive() + + # no need to maintain volinfo state machine. + # in a cascading setup, each geo-replication session is + # independent (ie. 'volume-mark' and 'xtime' are not + # propogated). This is beacuse the slave's xtime is now + # stored on the master itself. 'volume-mark' just identifies + # that we are in a cascading setup and need to enable + # 'geo-replication.ignore-pid-check' option. + volinfo_sys = self.volinfo_hook() + self.volinfo = volinfo_sys[self.KNAT] + inter_master = volinfo_sys[self.KFGN] + logging.info("%s master with volume id %s ..." 
% + (inter_master and "intermediate" or "primary", + self.uuid)) + gconf.configinterface.set('volume_id', self.uuid) + if self.volinfo: + if self.volinfo['retval']: + logging.warn("master cluster's info may not be valid %d" % + self.volinfo['retval']) + self.start_checkpoint_thread() + else: + raise GsyncdError("master volinfo unavailable") + self.total_crawl_stats = self.get_initial_crawl_data() + self.lastreport['time'] = time.time() + logging.info('crawl interval: %d seconds' % self.sleep_interval) + + t0 = time.time() + crawl = self.should_crawl() + while not self.terminate: + if self.start: + logging.debug("... crawl #%d done, took %.6f seconds" % + (self.crawls, time.time() - self.start)) + self.start = time.time() + should_display_info = self.start - self.lastreport['time'] >= 60 + if should_display_info: + logging.info("%d crawls, %d turns", + self.crawls - self.lastreport['crawls'], + self.turns - self.lastreport['turns']) + self.lastreport.update(crawls=self.crawls, + turns=self.turns, + time=self.start) + t1 = time.time() + if int(t1 - t0) >= 60: # lets hardcode this check to 60 seconds + crawl = self.should_crawl() + t0 = t1 + self.update_worker_remote_node() + if not crawl: + self.update_worker_health("Passive") + # bring up _this_ brick to the cluster stime + # which is min of cluster (but max of the replicas) + brick_stime = self.xtime('.', self.slave) + cluster_stime = self.master.server.aggregated.stime_mnt( + '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + logging.debug("Cluster stime: %s | Brick stime: %s" % + (repr(cluster_stime), repr(brick_stime))) + if not isinstance(cluster_stime, int): + if brick_stime < cluster_stime: + self.slave.server.set_stime( + self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) + time.sleep(5) + continue + self.update_worker_health("Active") + self.crawl() + if oneshot: + return + time.sleep(self.sleep_interval) + + @classmethod + def _checkpt_param(cls, chkpt, prm, xtimish=True): + """use config backend to 
lookup a parameter belonging to + checkpoint @chkpt""" + cprm = gconf.configinterface.get_realtime('checkpoint_' + prm) + if not cprm: + return + chkpt_mapped, val = cprm.split(':', 1) + if unescape(chkpt_mapped) != chkpt: + return + if xtimish: + val = cls.deserialize_xtime(val) + return val + + @classmethod + def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): + """use config backend to store a parameter associated + with checkpoint @chkpt""" + if xtimish: + val = cls.serialize_xtime(val) + gconf.configinterface.set( + 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + + @staticmethod + def humantime(*tpair): + """format xtime-like (sec, nsec) pair to human readable format""" + ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ + strftime("%Y-%m-%d %H:%M:%S") + if len(tpair) > 1: + ts += '.' + str(tpair[1]) + return ts + + def _crawl_time_format(self, crawl_time): + # Ex: 5 years, 4 days, 20:23:10 + years, days = divmod(crawl_time.days, 365.25) + years = int(years) + days = int(days) + + date = "" + m, s = divmod(crawl_time.seconds, 60) + h, m = divmod(m, 60) + + if years != 0: + date += "%s %s " % (years, "year" if years == 1 else "years") + if days != 0: + date += "%s %s " % (days, "day" if days == 1 else "days") + + date += "%s:%s:%s" % (string.zfill(h, 2), + string.zfill(m, 2), string.zfill(s, 2)) + return date + + def checkpt_service(self, chan, chkpt): + """checkpoint service loop + + monitor and verify checkpoint status for @chkpt, and listen + for incoming requests for whom we serve a pretty-formatted + status report""" + while True: + chkpt = gconf.configinterface.get_realtime("checkpoint") + if not chkpt: + gconf.configinterface.delete("checkpoint_completed") + gconf.configinterface.delete("checkpoint_target") + # dummy loop for the case when there is no checkpt set + select([chan], [], []) + conn, _ = chan.accept() + conn.send('\0') + conn.close() + continue + + checkpt_tgt = self._checkpt_param(chkpt, 'target') + if not 
checkpt_tgt: + checkpt_tgt = self.xtime('.') + if isinstance(checkpt_tgt, int): + raise GsyncdError("master root directory is " + "unaccessible (%s)", + os.strerror(checkpt_tgt)) + self._set_checkpt_param(chkpt, 'target', checkpt_tgt) + logging.debug("checkpoint target %s has been determined " + "for checkpoint %s" % + (repr(checkpt_tgt), chkpt)) + + # check if the label is 'now' + chkpt_lbl = chkpt + try: + x1, x2 = chkpt.split(':') + if x1 == 'now': + chkpt_lbl = "as of " + self.humantime(x2) + except: + pass + completed = self._checkpt_param(chkpt, 'completed', xtimish=False) + if completed: + completed = tuple(int(x) for x in completed.split('.')) + s, _, _ = select([chan], [], [], (not completed) and 5 or None) + # either request made and we re-check to not + # give back stale data, or we still hunting for completion + if (self.native_xtime(checkpt_tgt) and ( + self.native_xtime(checkpt_tgt) < self.volmark)): + # indexing has been reset since setting the checkpoint + status = "is invalid" + else: + xtr = self.xtime('.', self.slave) + if isinstance(xtr, int): + raise GsyncdError("slave root directory is " + "unaccessible (%s)", + os.strerror(xtr)) + ncompleted = self.xtime_geq(xtr, checkpt_tgt) + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s " + "became stale" % + (self.humantime(*completed), chkpt)) + completed = None + gconf.configinterface.delete('checkpoint_completed') + if ncompleted and not completed: # just reaching completion + completed = "%.6f" % time.time() + self._set_checkpt_param( + chkpt, 'completed', completed, xtimish=False) + completed = tuple(int(x) for x in completed.split('.')) + logging.info("checkpoint %s completed" % chkpt) + status = completed and \ + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" + if s: + conn = None + try: + conn, _ = chan.accept() + try: + conn.send("checkpoint %s is %s\0" % + (chkpt_lbl, status)) + except: + exc = sys.exc_info()[1] + if 
((isinstance(exc, OSError) or isinstance( + exc, IOError)) and exc.errno == EPIPE): + logging.debug('checkpoint client disconnected') + else: + raise + finally: + if conn: + conn.close() + + def start_checkpoint_thread(self): + """prepare and start checkpoint service""" + if self.checkpoint_thread or not ( + getattr(gconf, 'state_socket_unencoded', None) and getattr( + gconf, 'socketdir', None) + ): + return + chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + state_socket = os.path.join( + gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") + try: + os.unlink(state_socket) + except: + if sys.exc_info()[0] == OSError: + pass + chan.bind(state_socket) + chan.listen(1) + chkpt = gconf.configinterface.get_realtime("checkpoint") + t = Thread(target=self.checkpt_service, args=(chan, chkpt)) + t.start() + self.checkpoint_thread = t + + def add_job(self, path, label, job, *a, **kw): + """insert @job function to job table at @path with @label""" + if self.jobtab.get(path) is None: + self.jobtab[path] = [] + self.jobtab[path].append((label, a, lambda: job(*a, **kw))) + + def add_failjob(self, path, label): + """invoke .add_job with a job that does nothing just fails""" + logging.debug('salvaged: ' + label) + self.add_job(path, label, lambda: False) + + def wait(self, path, *args): + """perform jobs registered for @path + + Reset jobtab entry for @path, + determine success as the conjuction of + success of all the jobs. In case of + success, call .sendmark on @path + """ + jobs = self.jobtab.pop(path, []) + succeed = True + for j in jobs: + ret = j[-1]() + if not ret: + succeed = False + if succeed and not args[0] is None: + self.sendmark(path, *args) + return succeed + + def sendmark(self, path, mark, adct=None): + """update slave side xtime for @path to master side xtime + + also can send a setattr payload (see Server.setattr). 
def process_change(self, change, done, retry):
    """Parse one changelog file and replay its records on the slave.

    Every line is classified by its two-character type prefix
    (TYPE_ENTRY, TYPE_GFID or TYPE_META):

    - entry records are turned into dicts and sent in one batch through
      self.slave.server.entry_ops();
    - SETATTR metadata records are lstat()ed and sent through
      self.slave.server.meta_ops();
    - data records are collected and handed to self.a_syncdata().

    change -- path of the changelog file to read
    done   -- accepted for interface compatibility; not used here
    retry  -- when true, the pending counters in the worker status file
              are not written again

    Raises IOError if the changelog cannot be opened or read.
    """
    pfx = gauxpfx()
    entries = []
    meta_gfid = set()
    datas = set()

    # basic crawl stats: files and bytes
    files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}

    # the context manager closes the file even when reading fails; an
    # IOError still propagates to the caller exactly as before
    with open(change, "r") as f:
        clist = f.readlines()

    def edct(op, **ed):
        """build an entry dict; a 'stat' kwarg is reduced to uid/gid/mode"""
        dct = {}
        dct['op'] = op
        for k in ed:
            if k == 'stat':
                st = ed[k]
                dst = dct['stat'] = {}
                dst['uid'] = st.st_uid
                dst['gid'] = st.st_gid
                dst['mode'] = st.st_mode
            else:
                dct[k] = ed[k]
        return dct

    # entry counts (not purges)
    def entry_update():
        files_pending['count'] += 1

    # purge count
    def purge_update():
        files_pending['purge'] += 1

    for e in clist:
        e = e.strip()
        et = e[self.IDX_START:self.IDX_END]   # entry type
        ec = e[self.IDX_END:].split(' ')      # rest of the bits

        if et == self.TYPE_ENTRY:
            # extract information according to the type of
            # the entry operation. create(), mkdir() and mknod()
            # have mode, uid, gid information in the changelog
            # itself, so no need to stat()...
            ty = ec[self.POS_TYPE]

            # PARGFID/BNAME
            en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
            # GFID of the entry
            gfid = ec[self.POS_GFID]

            if ty in ['UNLINK', 'RMDIR']:
                purge_update()
                entries.append(edct(ty, gfid=gfid, entry=en))
            elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
                entry_update()
                # stat information present in the changelog itself
                entries.append(edct(ty, gfid=gfid, entry=en,
                                    mode=int(ec[2]),
                                    uid=int(ec[3]), gid=int(ec[4])))
            else:
                # stat() to get mode and other information
                go = os.path.join(pfx, gfid)
                st = lstat(go)
                if isinstance(st, int):
                    # lstat() handed back an errno instead of a stat result
                    if ty == 'RENAME':  # special hack for renames...
                        entries.append(edct('UNLINK', gfid=gfid, entry=en))
                    else:
                        logging.debug(
                            'file %s got purged in the interim' % go)
                    continue

                if ty == 'LINK':
                    entry_update()
                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
                elif ty == 'SYMLINK':
                    rl = errno_wrap(os.readlink, [en], [ENOENT])
                    if isinstance(rl, int):
                        continue
                    entry_update()
                    entries.append(
                        edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
                elif ty == 'RENAME':
                    entry_update()
                    e1 = unescape(
                        os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
                    entries.append(
                        edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
                else:
                    logging.warn('ignoring %s [op %s]' % (gfid, ty))
        elif et == self.TYPE_GFID:
            datas.add(os.path.join(pfx, ec[0]))
        elif et == self.TYPE_META:
            if ec[1] == 'SETATTR':  # only setattr's for now...
                meta_gfid.add(os.path.join(pfx, ec[0]))
        else:
            logging.warn('got invalid changelog type: %s' % (et))
    logging.debug('entries: %s' % repr(entries))
    if not retry:
        self.update_worker_cumilitive_status(files_pending)
    # sync namespace
    if entries:
        self.slave.server.entry_ops(entries)
    # sync metadata
    if meta_gfid:
        meta_entries = []
        for go in meta_gfid:
            st = lstat(go)
            if isinstance(st, int):
                logging.debug('file %s got purged in the interim' % go)
                continue
            meta_entries.append(edct('META', go=go, stat=st))
        if meta_entries:
            self.slave.server.meta_ops(meta_entries)
    # sync data
    if datas:
        self.a_syncdata(datas)
+ + # update the slave's time with the timestamp of the _last_ + # changelog file time suffix. Since, the changelog prefix time + # is the time when the changelog was rolled over, introduce a + # tolerence of 1 second to counter the small delta b/w the + # marker update and gettimeofday(). + # NOTE: this is only for changelog mode, not xsync. + + # @change is the last changelog (therefore max time for this batch) + if self.syncdata_wait(): + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + self.update_worker_files_syncd() + break + + # We do not know which changelog transfer failed, retry everything. + retry = True + tries += 1 + if tries == self.MAX_RETRIES: + logging.warn('changelogs %s could not be processed - ' + 'moving on...' % + ' '.join(map(os.path.basename, changes))) + self.update_worker_total_files_skipped( + self.current_files_skipped_count) + logging.warn('SKIPPED GFID = %s' % + ','.join(self.skipped_gfid_list)) + self.update_worker_files_syncd() + if done: + xtl = (int(change.split('.')[-1]) - 1, 0) + self.upd_stime(xtl) + map(self.master.server.changelog_done, changes) + break + # it's either entry_ops() or Rsync that failed to do it's + # job. Mostly it's entry_ops() [which currently has a problem + # of failing to create an entry but failing to return an errno] + # Therefore we do not know if it's either Rsync or the freaking + # entry_ops() that failed... so we retry the _whole_ changelog + # again. + # TODO: remove entry retries when it's gets fixed. 
def update_worker_status(self, key, value):
    """Set one field of the worker's JSON status file.

    The file named by self.get_worker_status_file() is loaded, `key` is
    set to `value`, and the result is written back and fsync()ed.  If the
    file cannot be opened or does not contain valid JSON, it is recreated
    from the default record with `key` set to `value`.

    Errors raised while creating the replacement file propagate to the
    caller.
    """
    default_data = {"remote_node": "N/A",
                    "worker status": "Not Started",
                    "crawl status": "N/A",
                    "files_syncd": 0,
                    "files_remaining": 0,
                    "bytes_remaining": 0,
                    "purges_remaining": 0,
                    "total_files_skipped": 0}
    worker_status_file = self.get_worker_status_file()
    try:
        with open(worker_status_file, 'r+') as f:
            loaded_data = json.load(f)
            loaded_data[key] = value
            # rewind and truncate through the file object so that its
            # buffered position and the descriptor offset stay in step
            f.seek(0)
            f.truncate()
            json.dump(loaded_data, f)
            f.flush()
            os.fsync(f.fileno())
    except (IOError, OSError, ValueError):
        logging.info('Creating new %s' % worker_status_file)
        # text mode: json.dump() emits str, which a binary-mode file
        # rejects on Python 3
        with open(worker_status_file, 'w') as f:
            default_data[key] = value
            json.dump(default_data, f)
            f.flush()
            os.fsync(f.fileno())
def update_worker_files_syncd(self):
    """Fold the pending counters of the status file into 'files_syncd'.

    'files_remaining' is added to 'files_syncd', then 'files_remaining',
    'bytes_remaining' and 'purges_remaining' are reset to 0 and the file
    is written back and fsync()ed.  If the file cannot be opened or does
    not contain valid JSON, it is recreated from the default record.

    Errors raised while creating the replacement file propagate to the
    caller.
    """
    default_data = {"remote_node": "N/A",
                    "worker status": "Not Started",
                    "crawl status": "N/A",
                    "files_syncd": 0,
                    "files_remaining": 0,
                    "bytes_remaining": 0,
                    "purges_remaining": 0,
                    "total_files_skipped": 0}
    worker_status_file = self.get_worker_status_file()
    try:
        with open(worker_status_file, 'r+') as f:
            loaded_data = json.load(f)
            loaded_data['files_syncd'] += loaded_data['files_remaining']
            loaded_data['files_remaining'] = 0
            loaded_data['bytes_remaining'] = 0
            loaded_data['purges_remaining'] = 0
            # rewind and truncate through the file object so that its
            # buffered position and the descriptor offset stay in step
            f.seek(0)
            f.truncate()
            json.dump(loaded_data, f)
            f.flush()
            os.fsync(f.fileno())
    except (IOError, OSError, ValueError):
        logging.info('Creating new %s' % worker_status_file)
        # text mode: json.dump() emits str, which a binary-mode file
        # rejects on Python 3
        with open(worker_status_file, 'w') as f:
            json.dump(default_data, f)
            f.flush()
            os.fsync(f.fileno())
def update_worker_total_files_skipped(self, value):
    """Record skipped files in the worker's JSON status file.

    'total_files_skipped' is set to `value` and `value` is subtracted
    from 'files_remaining'; the file is then written back and fsync()ed.
    If the file cannot be opened or does not contain valid JSON, it is
    recreated from the default record with 'total_files_skipped' set to
    `value`.

    Errors raised while creating the replacement file propagate to the
    caller.
    """
    default_data = {"remote_node": "N/A",
                    "worker status": "Not Started",
                    "crawl status": "N/A",
                    "files_syncd": 0,
                    "files_remaining": 0,
                    "bytes_remaining": 0,
                    "purges_remaining": 0,
                    "total_files_skipped": 0}
    worker_status_file = self.get_worker_status_file()
    try:
        with open(worker_status_file, 'r+') as f:
            loaded_data = json.load(f)
            loaded_data['total_files_skipped'] = value
            loaded_data['files_remaining'] -= value
            # rewind and truncate through the file object so that its
            # buffered position and the descriptor offset stay in step
            f.seek(0)
            f.truncate()
            json.dump(loaded_data, f)
            f.flush()
            os.fsync(f.fileno())
    except (IOError, OSError, ValueError):
        logging.info('Creating new %s' % worker_status_file)
        # text mode: json.dump() emits str, which a binary-mode file
        # rejects on Python 3
        with open(worker_status_file, 'w') as f:
            default_data['total_files_skipped'] = value
            json.dump(default_data, f)
            f.flush()
            os.fsync(f.fileno())
def register(self):
    """Reset the xsync crawl state and make sure its temp directory exists.

    The directory is the 'xsync' subdirectory of the first element
    returned by self.setup_working_dir().  An already existing directory
    is accepted; any other OSError from os.makedirs() is re-raised.
    """
    self.counter, self.comlist, self.stimes = 0, [], []
    self.sleep_interval = 60
    workdir = self.setup_working_dir()[0]
    self.tempdir = os.path.join(workdir, 'xsync')
    logging.info('xsync temp directory: %s' % self.tempdir)
    try:
        os.makedirs(self.tempdir)
    except OSError:
        err = sys.exc_info()[1]
        already_there = (err.errno == EEXIST and
                         os.path.isdir(self.tempdir))
        if not already_there:
            raise
def is_sticky(self, path, mo):
    """check for DHTs linkto sticky bit file

    path -- entry to examine
    mo   -- the entry's st_mode

    Returns the result of self.master.server.linkto_check(path) when the
    sticky bit is set in `mo`, False otherwise (the server is not asked
    in that case).
    """
    # stat.S_ISVTX is the sticky bit (octal 1000); spelling it through
    # the stat module avoids the "01000" literal, which is a syntax
    # error on Python 3
    if mo & stat.S_ISVTX:
        return self.master.server.linkto_check(path)
    return False
Xcrawl(self, path='.', xtr_root=None): + """ + generate a CHANGELOG file consumable by process_change. + + slave's xtime (stime) is _cached_ for comparisons across + the filesystem tree, but set after directory synchronization. + """ + if path == '.': + self.open() + self.crawls += 1 + if not xtr_root: + # get the root stime and use it for all comparisons + xtr_root = self.xtime('.', self.slave) + if isinstance(xtr_root, int): + if xtr_root != ENOENT: + logging.warn("slave cluster not returning the " + "correct xtime for root (%d)" % xtr_root) + xtr_root = self.minus_infinity + xtl = self.xtime(path) + if isinstance(xtl, int): + logging.warn("master cluster's xtime not found") + xtr = self.xtime(path, self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: + logging.warn("slave cluster not returning the " + "correct xtime for %s (%d)" % (path, xtr)) + xtr = self.minus_infinity + xtr = max(xtr, xtr_root) + if not self.need_sync(path, xtl, xtr): + if path == '.': + self.sync_done([(path, xtl)], True) + return + self.xtime_reversion_hook(path, xtl, xtr) + logging.debug("entering " + path) + dem = self.master.server.entries(path) + pargfid = self.master.server.gfid(path) + if isinstance(pargfid, int): + logging.warn('skipping directory %s' % (path)) + for e in dem: + bname = e + e = os.path.join(path, e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % + (e, errno.errorcode[xte])) + continue + if not self.need_sync(e, xte, xtr): + continue + st = self.master.server.lstat(e) + if isinstance(st, int): + logging.warn('%s got purged in the interim ...' % e) + continue + if self.is_sticky(e, st.st_mode): + logging.debug('ignoring sticky bit file %s' % e) + continue + gfid = self.master.server.gfid(e) + if isinstance(gfid, int): + logging.warn('skipping entry %s..' 
class BoxClosedErr(Exception):
    """Raised by PostBox.append() once the box has been closed."""


class PostBox(list):

    """synchronized collection for storing things thought of as "requests"

    Items are posted with append() until close() is called; whoever
    processes the box calls wakeup() with a result, which every thread
    blocked in wait() then receives.
    """

    def __init__(self, *a):
        list.__init__(self, *a)
        # too bad Python stdlib does not have read/write locks...
        # it would suffice to grab the lock in .append as reader,
        # in .close as writer
        self.lever = Condition()
        self.open = True
        self.done = False

    def wait(self):
        """wait on requests to be processed; returns the posted result"""
        with self.lever:
            # re-check the flag after every return from wait(): the
            # condition variable may wake up before wakeup() has run
            while not self.done:
                self.lever.wait()
        return self.result

    def wakeup(self, data):
        """wake up requestors with the result"""
        self.result = data
        with self.lever:
            self.done = True
            self.lever.notifyAll()

    def append(self, e):
        """post a request

        Raises BoxClosedErr if the box has been closed.  The lock is
        released on that path too, so a rejected post cannot block other
        threads using the same box.
        """
        with self.lever:
            if not self.open:
                raise BoxClosedErr
            list.append(self, e)

    def close(self):
        """prohibit the posting of further requests"""
        with self.lever:
            self.open = False
+ + To aid accumlation of items in the PostBoxen before grabbed + by an rsync worker, the worker goes to sleep a bit after + each completed syncjob. + """ + + def __init__(self, slave, sync_engine, resilient_errnos=[]): + """spawn worker threads""" + self.slave = slave + self.lock = Lock() + self.pb = PostBox() + self.sync_engine = sync_engine + self.errnos_ok = resilient_errnos + for i in range(int(gconf.sync_jobs)): + t = Thread(target=self.syncjob) + t.start() + + def syncjob(self): + """the life of a worker""" + while True: + pb = None + while True: + self.lock.acquire() + if self.pb: + pb, self.pb = self.pb, PostBox() + self.lock.release() + if pb: + break + time.sleep(0.5) + pb.close() + po = self.sync_engine(pb) + if po.returncode == 0: + ret = (True, 0) + elif po.returncode in self.errnos_ok: + ret = (False, po.returncode) + else: + po.errfail() + pb.wakeup(ret) + + def add(self, e): + while True: + pb = self.pb + try: + pb.append(e) + return pb + except BoxClosedErr: + pass diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py new file mode 100644 index 000000000..8ed6f8326 --- /dev/null +++ b/geo-replication/syncdaemon/monitor.py @@ -0,0 +1,271 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
class Volinfo(object):

    """Result of `gluster --xml volume info <vol>`, parsed into an XML tree.

    The command is run at construction time, optionally behind a command
    prefix (`prelude`), and the parsed reply is kept in .tree.
    """

    def __init__(self, vol, host='localhost', prelude=[]):
        """Run the gluster CLI for volume `vol` against `host`.

        `prelude` is a list of argv items put in front of the gluster
        command; it is only read, never modified.

        Raises GsyncdError if the reply's opRet is not '0'.
        """
        po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
                              'volume', 'info', vol],
                   stdout=PIPE, stderr=PIPE)
        vix = po.stdout.read()
        po.wait()
        po.terminate_geterr()
        vi = XET.fromstring(vix)
        if vi.find('opRet').text != '0':
            if prelude:
                # prelude is a list: join its items, do not call
                # .join() on the list itself
                via = '(via %s) ' % ' '.join(prelude)
            else:
                via = ' '
            # interpolate here, so the exception carries the finished
            # message rather than a template plus a tuple
            raise GsyncdError('getting volume info of %s%s '
                              'failed with errorcode %s' %
                              (vol, via, vi.find('opErrno').text))
        self.tree = vi
        self.volume = vol
        self.host = host

    def get(self, elem):
        """return all elements named `elem` anywhere in the reply"""
        return self.tree.findall('.//' + elem)

    @property
    @memoize
    def bricks(self):
        """list of {'host': ..., 'dir': ...} dicts, one per <brick>"""
        def bparse(b):
            # split on the first ':' only, so that a ':' inside the
            # directory part does not break the two-way unpacking
            host, dirp = b.text.split(':', 1)
            return {'host': host, 'dir': dirp}
        return [bparse(b) for b in self.get('brick')]

    @property
    @memoize
    def uuid(self):
        """text of the single <id> element

        Raises GsyncdError unless exactly one <id> element is present.
        """
        ids = self.get('id')
        if len(ids) != 1:
            raise GsyncdError("volume info of %s obtained from %s: "
                              "ambiguous uuid" %
                              (self.volume, self.host))
        return ids[0].text
def set_state(self, state, w=None):
    """set the state that can be used by external agents
    like glusterd for status reporting

    With a worker `w`, the worker's state is recorded and the aggregate
    state (the recorded state that ranks highest in _ST_ORD) is
    published only if it changed.  Without `w`, `state` is logged and,
    when gconf has a state_file, written to it.
    """
    def aggregate():
        # an empty table is returned as is, mirroring `self.state and ...`
        if not self.state:
            return self.state
        worst = max(self._ST_ORD.index(s) for s in self.state.values())
        return self._ST_ORD[worst]

    if not w:
        logging.info('new state: %s' % state)
        if getattr(gconf, 'state_file', None):
            update_file(gconf.state_file, lambda f: f.write(state + '\n'))
        return

    self.lock.acquire()
    before = aggregate()
    self.state[w] = state
    after = aggregate()
    self.lock.release()
    if after != before:
        self.set_state(after)
def multiplex(self, wspx, suuid):
    """Run one monitor thread per worker spec and wait for all of them.

    wspx  -- iterable of worker specs, each passed to self.monitor()
    suuid -- slave id, passed to the workers as --slave-id

    The worker command line is this process' argv with '-N',
    '--no-daemon' and '--monitor' removed, '-N -p "" --slave-id <suuid>'
    appended and the interpreter's basename put in front.  When a
    monitor loop returns, all recorded child pids are sent SIGKILL and
    finalize(exval=1) is called.
    """
    argv = sys.argv[:]
    for o in ('-N', '--no-daemon', '--monitor'):
        while o in argv:
            argv.remove(o)
    # suuid may be a uuid.UUID object; exec arguments must be strings
    argv.extend(('-N', '-p', '', '--slave-id', str(suuid)))
    argv.insert(0, os.path.basename(sys.executable))

    cpids = set()
    ta = []
    for wx in wspx:
        def wmon(w):
            # monitor() returns a plain exit status, which is not needed
            # here (and cannot be unpacked into two names)
            self.monitor(w, argv, cpids)
            time.sleep(1)
            self.lock.acquire()
            try:
                for pid in cpids:
                    try:
                        os.kill(pid, signal.SIGKILL)
                    except OSError:
                        # cpids also holds children that have already
                        # been waited for; failing to signal one of
                        # those must not stop the cleanup
                        logging.debug('could not kill worker %d: %s' %
                                      (pid, sys.exc_info()[1]))
            finally:
                self.lock.release()
            finalize(exval=1)
        t = Thread(target=wmon, args=[wx])
        t.start()
        ta.append(t)
    for t in ta:
        t.join()
def ioparse(i, o):
    """Normalize an input/output pair to (file object, file descriptor).

    An integer `i` is wrapped with os.fdopen(); anything else is passed
    through.  An `o` that has a fileno attribute is replaced by its
    descriptor number; anything else is passed through.
    """
    inf = os.fdopen(i) if isinstance(i, int) else i
    # rely on duck typing for recognizing streams as that works
    # uniformly in py2 and py3
    out = o.fileno() if hasattr(o, 'fileno') else o
    return (inf, out)
class RepceJob(object):

    """class representing message status we can use
    for waiting on reply"""

    def __init__(self, cbk):
        """
        - .rid: (process-wise) unique id
        - .cbk: what we do upon receiving reply
        """
        self.rid = (os.getpid(), thread.get_ident(), time.time())
        self.cbk = cbk
        self.lever = Condition()
        self.done = False

    def __repr__(self):
        return ':'.join([str(x) for x in self.rid])

    def wait(self):
        """block until wakeup() has been called, then return its data"""
        self.lever.acquire()
        try:
            # re-check the flag after every return from wait(): the
            # condition variable may wake up before wakeup() has stored
            # a result
            while not self.done:
                self.lever.wait()
        finally:
            self.lever.release()
        return self.result

    def wakeup(self, data):
        """store `data` as the result and release a thread in wait()"""
        self.result = data
        self.lever.acquire()
        try:
            self.done = True
            self.lever.notify()
        finally:
            self.lever.release()
class mprx(object):

    """method proxy: binds a method name to a callable instance

    Calling the proxy calls the instance with the stored method name as
    first argument, followed by the proxy's own positional arguments.
    This is the usual way of getting a rubyesque method_missing in
    Python.
    """

    def __init__(self, ins, meth):
        self.ins, self.meth = ins, meth

    def __call__(self, *a):
        target = self.ins
        return target(self.meth, *a)
def desugar(ustr):
    """transform sugared url strings to standard <scheme>://<urlbody> form

    parsing logic enforces the constraint that sugared forms should contain
    a ':' or a '/', which ensures that sugared urls do not conflict with
    gluster volume names.

    Mapping, as implemented below:

    - ":<vol>"                      -> gluster://localhost:<vol>
    - "<user>@<host>:<rest>" or
      "<host>:<rest>" where <rest>
      contains ':' or '/'           -> ssh://<ustr>
    - "<host>:<vol>"                -> gluster://<ustr>
    - "/<abs path>" (no ':')        -> file://<normalized path>

    Anything else (including the empty string) raises GsyncdError.
    """
    m = re.match('([^:]*):(.*)', ustr)
    if m:
        head, tail = m.groups()
        if not head:
            return "gluster://localhost" + ustr
        elif '@' in head or re.search('[:/]', tail):
            return "ssh://" + ustr
        else:
            return "gluster://" + ustr
    else:
        # startswith() instead of ustr[0], so that an empty string is
        # reported as an unresolvable url rather than an IndexError
        if not ustr.startswith('/'):
            raise GsyncdError("cannot resolve sugared url '%s'" % ustr)
        ap = os.path.normpath(ustr)
        # normpath keeps exactly two leading slashes; collapse them to one
        if ap.startswith('//'):
            ap = ap[1:]
        return "file://" + ap
for config, url parsing) + """ + + def __getattr__(self, meth): + from libcxattr import Xattr as LXattr + xmeth = [m for m in dir(LXattr) if m[0] != '_'] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LXattr, m)) + return getattr(self, meth) + + +class _MetaChangelog(object): + + def __getattr__(self, meth): + from libgfchangelog import Changes as LChanges + xmeth = [m for m in dir(LChanges) if m[0] != '_'] + if not meth in xmeth: + return + for m in xmeth: + setattr(self, m, getattr(LChanges, m)) + return getattr(self, meth) + +Xattr = _MetaXattr() +Changes = _MetaChangelog() + + +class Popen(subprocess.Popen): + + """customized subclass of subprocess.Popen with a ring + buffer for children error output""" + + @classmethod + def init_errhandler(cls): + """start the thread which handles children's error output""" + cls.errstore = {} + + def tailer(): + while True: + errstore = cls.errstore.copy() + try: + poe, _, _ = select( + [po.stderr for po in errstore], [], [], 1) + except (ValueError, SelectError): + continue + for po in errstore: + if po.stderr not in poe: + continue + po.lock.acquire() + try: + if po.on_death_row: + continue + la = errstore[po] + try: + fd = po.stderr.fileno() + except ValueError: # file is already closed + continue + l = os.read(fd, 1024) + if not l: + continue + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1 << 20 and la: + tots -= len(la.pop(0)) + la.append(l) + finally: + po.lock.release() + t = syncdutils.Thread(target=tailer) + t.start() + cls.errhandler = t + + @classmethod + def fork(cls): + """fork wrapper that restarts errhandler thread in child""" + pid = os.fork() + if not pid: + cls.init_errhandler() + return pid + + def __init__(self, args, *a, **kw): + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ + self.args = args + if 
def errlog(self):
    """log the failure of the child

    One line states the command and its return code; after that,
    whatever was harvested from the child's stderr (.elines) is
    logged line by line, prefixed with the program name.
    """
    suffix = self.elines and ", saying:" or ""
    logging.error("""command "%s" returned with %s%s""" %
                  (" ".join(self.args), repr(self.returncode), suffix))

    def emit(text):
        logging.error(self.args[0] + "> " + text)

    # .elines holds raw read chunks, so a line may be spread over
    # several of them: carry the unterminated tail to the next chunk
    carry = ''
    for chunk in self.elines:
        pieces = (carry + chunk).split('\n')
        carry = pieces.pop()
        for piece in pieces:
            emit(piece)
    if carry:
        emit(carry)
def _pathguard(f):
    """decorator confining the 'path' argument to the managed tree

    The position of the parameter called 'path' is looked up once,
    at decoration time. On each call the value at that position is
    rejected with ValueError if it is absolute or has a '..'
    component; otherwise it is replaced by its join with the
    .local_path attribute of the first positional argument before
    the decorated function is invoked.
    """

    path_pos = list(funcode(f).co_varnames).index('path')

    def guarded(*a):
        candidate = a[path_pos]
        components = candidate.split('/')
        absolute = candidate[0] == '/'
        if absolute or '..' in components:
            raise ValueError('unsafe path')
        argv = list(a)
        # a[0] is the object (class) the decorated method is bound to
        argv[path_pos] = os.path.join(a[0].local_path, candidate)
        return f(*argv)
    return guarded
+ + for e in entries: + cls.purge(os.path.join(path, e)) + """ + me_also = entries is None + if not entries: + try: + # if it's a symlink, prevent + # following it + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EISDIR: + entries = os.listdir(path) + else: + raise + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOTDIR, ENOENT, ELOOP): + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + return + raise + else: + raise + for e in entries: + cls.purge(os.path.join(path, e)) + if me_also: + os.rmdir(path) + + @classmethod + @_pathguard + def _create(cls, path, ctor): + """path creation backend routine""" + try: + ctor(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST: + cls.purge(path) + return ctor(path) + raise + + @classmethod + @_pathguard + def mkdir(cls, path): + cls._create(path, os.mkdir) + + @classmethod + @_pathguard + def symlink(cls, lnk, path): + cls._create(path, lambda p: os.symlink(lnk, p)) + + @classmethod + @_pathguard + def xtime(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. + """ + + try: + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'xtime']), + 8) + return struct.unpack('!II', val) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA, ENOTDIR): + return ex.errno + else: + raise + + @classmethod + @_pathguard + def stime_mnt(cls, path, uuid): + """query xtime extended attribute + + Return xtime of @path for @uuid as a pair of integers. + "Normal" errors due to non-existent @path or extended attribute + are tolerated and errno is returned in such a case. 
@classmethod
def entry_ops(cls, entries):
    """replay a batch of entry (namespace) operations

    @entries is a sequence of dicts, each having at least the keys
    'op', 'gfid' and 'entry'; depending on 'op' the keys 'mode',
    'uid', 'gid', 'stat', 'link' and 'entry1' are used too.

    - RMDIR, UNLINK: purge 'entry' if it still has the given gfid,
      retrying every second while the directory is not empty
    - CREATE, MKNOD, MKDIR, SYMLINK: pack a blob describing the new
      entry and set it as 'glusterfs.gfid.newfile' xattr on the
      parent
    - LINK, RENAME: perform the link/rename if the source exists,
      otherwise create the target from the 'stat' data via the
      same xattr
    """
    pfx = gauxpfx()
    logging.debug('entries: %s' % repr(entries))
    # regular file

    def entry_pack_reg(gf, bn, mo, uid, gid):
        blen = len(bn)
        return struct.pack(cls._fmt_mknod(blen),
                           uid, gid, gf, mo, bn,
                           stat.S_IMODE(mo), 0, umask())

    def entry_pack_reg_stat(gf, bn, st):
        blen = len(bn)
        mo = st['mode']
        return struct.pack(cls._fmt_mknod(blen),
                           st['uid'], st['gid'],
                           gf, mo, bn,
                           stat.S_IMODE(mo), 0, umask())
    # mkdir

    def entry_pack_mkdir(gf, bn, mo, uid, gid):
        blen = len(bn)
        return struct.pack(cls._fmt_mkdir(blen),
                           uid, gid, gf, mo, bn,
                           stat.S_IMODE(mo), umask())
    # symlink

    def entry_pack_symlink(gf, bn, lnk, st):
        blen = len(bn)
        llen = len(lnk)
        return struct.pack(cls._fmt_symlink(blen, llen),
                           st['uid'], st['gid'],
                           gf, st['mode'], bn, lnk)

    def entry_purge(entry, gfid):
        # This is an extremely racy code and needs to be fixed ASAP.
        # The GFID check here is to be sure that the pargfid/bname
        # to be purged is the GFID gotten from the changelog.
        # (a stat(changelog_gfid) would also be valid here)
        # The race here is between the GFID check and the purge.
        disk_gfid = cls.gfid_mnt(entry)
        if isinstance(disk_gfid, int):
            return
        if not gfid == disk_gfid:
            return
        er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
        if isinstance(er, int):
            if er == EISDIR:
                er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY])
                if er == ENOTEMPTY:
                    # tell the caller to retry later
                    return er

    for e in entries:
        blob = None
        op = e['op']
        gfid = e['gfid']
        entry = e['entry']
        (pg, bname) = entry2pb(entry)
        if op in ['RMDIR', 'UNLINK']:
            while True:
                er = entry_purge(entry, gfid)
                if isinstance(er, int):
                    time.sleep(1)
                else:
                    break
        elif op in ['CREATE', 'MKNOD']:
            # the group must come from 'gid' (uid used to be passed
            # twice); fall back to the old value for a peer which
            # does not send 'gid'
            blob = entry_pack_reg(
                gfid, bname, e['mode'], e['uid'], e.get('gid', e['uid']))
        elif op == 'MKDIR':
            blob = entry_pack_mkdir(
                gfid, bname, e['mode'], e['uid'], e.get('gid', e['uid']))
        elif op == 'LINK':
            slink = os.path.join(pfx, gfid)
            st = lstat(slink)
            if isinstance(st, int):
                (pg, bname) = entry2pb(entry)
                blob = entry_pack_reg_stat(gfid, bname, e['stat'])
            else:
                errno_wrap(os.link, [slink, entry], [ENOENT, EEXIST])
        elif op == 'SYMLINK':
            blob = entry_pack_symlink(gfid, bname, e['link'], e['stat'])
        elif op == 'RENAME':
            en = e['entry1']
            st = lstat(entry)
            if isinstance(st, int):
                (pg, bname) = entry2pb(en)
                blob = entry_pack_reg_stat(gfid, bname, e['stat'])
            else:
                errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
        if blob:
            errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile',
                                           blob],
                       [EEXIST], [ENOENT, ESTALE, EINVAL])
class SlaveLocal(object):

    """mix-in providing the server side behaviour of a slave

    It is not meant to be instantiated on its own; it relies on
    the class it is mixed into to provide a .server attribute.
    """

    def can_connect_to(self, remote):
        """determine our position in the connectibility matrix"""
        if remote:
            return False
        return True

    def service_loop(self):
        """start a RePCe server serving self's server

        The server talks over stdin/stdout and runs in its own
        thread. If a positive timeout is configured, return as
        soon as a whole timeout interval passes without the
        keep-alive counter of the server changing; otherwise
        block in select().
        """

        if boolify(gconf.use_rsync_xattrs) and not privileged():
            raise GsyncdError(
                "using rsync for extended attributes is not supported")

        rpc_server = RepceServer(
            self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))

        def serve_then_finalize():
            return (rpc_server.service_loop(), syncdutils.finalize())

        worker = syncdutils.Thread(target=serve_then_finalize)
        worker.start()
        logging.info("slave listening")
        if not (gconf.timeout and int(gconf.timeout) > 0):
            select((), (), ())
            return
        while True:
            seen = self.server.last_keep_alive
            time.sleep(int(gconf.timeout))
            if seen != self.server.last_keep_alive:
                continue
            logging.info(
                "connection inactive for %d seconds, stopping" %
                int(gconf.timeout))
            break
def start_fd_client(self, i, o, **opts):
    """set up RePCe client, handshake with server

    The version dict reported by the peer is compared with the
    local one after truncating each value to an integer; a
    difference raises GsyncdError.

    It's cut out as a separate method to let
    subclasses hook into client startup
    """
    self.server = RepceClient(i, o)
    remote_ver = self.server.__version__()
    local_ver = {'proto': repce.repce_version, 'object': Server.version()}
    remote_major = dict([(k, int(v)) for k, v in remote_ver.iteritems()])
    local_major = dict([(k, int(v)) for k, v in local_ver.iteritems()])
    if remote_major != local_major:
        raise GsyncdError(
            "RePCe major version mismatch: local %s, remote %s" %
            (local_ver, remote_ver))
class AbstractUrl(object):

    """common base of the url scheme classes

    The scheme name is derived from the name of the concrete
    class, lowercased.
    """

    def __init__(self, path, pattern):
        """store @path after checking it against the regex @pattern

        Returns the match groups, so it is only usable by calling
        it explicitly from a subclass initializer; raises
        GsyncdError if @path does not match.
        """
        found = re.search(pattern, path)
        if found is None:
            raise GsyncdError("malformed path")
        self.path = path
        return found.groups()

    @property
    def scheme(self):
        """lowercased name of the concrete class"""
        return self.__class__.__name__.lower()

    def canonical_path(self):
        """canonical form of the path; the path itself by default"""
        return self.path

    def get_url(self, canonical=False, escaped=False):
        """format self's url in various styles"""
        if not canonical:
            body = self.path
        else:
            body = self.canonical_path()
        formatted = self.scheme + "://" + body
        if not escaped:
            return formatted
        return syncdutils.escape(formatted)

    @property
    def url(self):
        """plain (not canonical, not escaped) url"""
        return self.get_url()
sup(self, files, self.path) + + +class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + + """scheme class for gluster:// urls + + can be used to represent a gluster slave server + on slave side, or interface to a remote gluster + slave on master side, or to represent master + (slave-ish features come from the mixins, master + functionality is outsourced to GMaster from master) + """ + + class GLUSTERServer(Server): + + "server enhancements for a glusterfs backend""" + + @classmethod + def _attr_unpack_dict(cls, xattr, extra_fields=''): + """generic volume mark fetching/parsing backed""" + fmt_string = cls.NTV_FMTSTR + extra_fields + buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) + vm = struct.unpack(fmt_string, buf) + m = re.match( + '(.{8})(.{4})(.{4})(.{4})(.{12})', + "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + volinfo = {'version': vm[0:2], + 'uuid': uuid, + 'retval': vm[18], + 'volume_mark': vm[19:21], + } + if extra_fields: + return volinfo, vm[-len(extra_fields):] + else: + return volinfo + + @classmethod + def foreign_volume_infos(cls): + """return list of valid (not expired) foreign volume marks""" + dict_list = [] + xattr_list = Xattr.llistxattr_buf('.') + for ele in xattr_list: + if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: + d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) + now = int(time.time()) + if x[0] > now: + logging.debug("volinfo[%s] expires: %d " + "(%d sec later)" % + (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] + dict_list.append(d) + else: + try: + Xattr.lremovexattr('.', ele) + except OSError: + pass + return dict_list + + @classmethod + def native_volume_info(cls): + """get the native volume mark of the underlying gluster volume""" + try: + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, + 'volume-mark'])) + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENODATA: + raise + + server = GLUSTERServer + + def __init__(self, path): + self.host, 
def inhibit(self, *a):
    """inhibit a gluster filesystem

    Mount glusterfs over a temporary mountpoint,
    change into the mount, and lazy unmount the
    filesystem.

    The process forks first. The parent performs the mount and
    the chdir, and tells the child over a pipe which mountpoint
    is in use and whether the mount succeeded. The child reads
    the pipe until EOF and then does the lazy umount and the
    mountpoint cleanup, exiting with the outcome as its status.

    @a is passed on to make_mount_argv(). Raises GsyncdError if
    the cleanup child reports a non-zero status.
    """

    # mpi: read end (used by the child), mpo: write end (parent)
    mpi, mpo = os.pipe()
    mh = Popen.fork()
    if mh:
        # parent: mount and enter the mountpoint
        os.close(mpi)
        # keep the write end from leaking into the mount command
        fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        d = None
        margv = self.make_mount_argv(*a)
        if self.mntpt:
            # mntpt is determined pre-mount
            d = self.mntpt
            os.write(mpo, d + '\0')
        po = Popen(margv, **self.mountkw)
        self.handle_mounter(po)
        po.terminate_geterr()
        logging.debug('auxiliary glusterfs mount in place')
        if not d:
            # mntpt is determined during mount
            d = self.mntpt
            os.write(mpo, d + '\0')
        # 'M' after the NUL-terminated path marks "mount was done"
        os.write(mpo, 'M')
        # chdir runs in a thread so that it can be given up on after
        # the connection timeout (presumably it may block on an
        # unresponsive mount -- TODO confirm)
        t = syncdutils.Thread(target=lambda: os.chdir(d))
        t.start()
        tlim = gconf.starttime + int(gconf.connection_timeout)
        while True:
            if not t.isAlive():
                break
            if time.time() >= tlim:
                syncdutils.finalize(exval=1)
            time.sleep(1)
        # closing the write end gives EOF to the child, which then
        # starts the cleanup
        os.close(mpo)
        _, rv = syncdutils.waitpid(mh, 0)
        if rv:
            # exit status if exited, minus the signal number if signalled
            rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
                (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
            logging.warn('stale mount possibly left behind on ' + d)
            raise GsyncdError("cleaning up temp mountpoint %s "
                              "failed with status %d" %
                              (d, rv))
    else:
        # child: wait for the parent's report, then clean up
        rv = 0
        try:
            os.setsid()
            os.close(mpo)
            mntdata = ''
            # read until EOF, ie. until the parent closed its end
            # (or went away)
            while True:
                c = os.read(mpi, 1)
                if not c:
                    break
                mntdata += c
            if mntdata:
                # expected: <mountpoint> '\0' [ 'M' ]
                mounted = False
                if mntdata[-1] == 'M':
                    mntdata = mntdata[:-1]
                    assert(mntdata)
                    mounted = True
                assert(mntdata[-1] == '\0')
                mntpt = mntdata[:-1]
                assert(mntpt)
                if mounted:
                    po = self.umount_l(mntpt)
                    po.terminate_geterr(fail_on_err=False)
                    if po.returncode != 0:
                        po.errlog()
                        rv = po.returncode
                self.cleanup_mntpt(mntpt)
        except:
            logging.exception('mount cleanup failure:')
            rv = 200
        # leave without running the parent's exit handlers
        os._exit(rv)
    # only the parent gets here, the child always ends in os._exit()
    logging.debug('auxiliary glusterfs mount prepared')
def connect(self):
    """inhibit the resource beyond

    The mountbroker backend is used if a mountbroker label is
    configured or we are not privileged (the user name then
    serves as label); the direct backend otherwise. The glusterfs
    parameters are put together from the configuration and the
    host/volume of this resource, and the mount is performed by
    the chosen backend.
    """

    label = getattr(gconf, 'mountbroker', None)
    if not (label or privileged()):
        label = syncdutils.getusername()
    if label:
        mounter = self.MountbrokerMounter
    else:
        mounter = self.DirectMounter

    params = gconf.gluster_params.split()
    if gconf.gluster_log_level:
        params.append('log-level=' + gconf.gluster_log_level)
    params.extend(['log-file=' + gconf.gluster_log_file,
                   'volfile-server=' + self.host,
                   'volfile-id=' + self.volume,
                   'client-pid=-1'])

    # the label is handed over only if there is one
    mount_args = []
    if label:
        mount_args.append(label)
    mounter(params).inhibit(*mount_args)
def service_loop(self, *args):
    """enter service loop

    - if slave given, instantiate the two GMaster crawlers (one
      shot and main crawl) and pass control to them, which
      implements master behavior
    - else do what is inherited

    In the master case, if gconf.local_path is set, a brick-local
    server class is built and installed as the server of both
    crawlers; if gconf.slave_id is set too, xtime/stime/set_stime
    of the slave's server are overridden so that they operate on
    the local brick.
    """
    if args:
        slave = args[0]
        if gconf.local_path:
            class brickserver(FILE.FILEServer):
                local_path = gconf.local_path
                aggregated = self.server

                @classmethod
                def entries(cls, path):
                    e = super(brickserver, cls).entries(path)
                    # on the brick don't mess with /.glusterfs
                    if path == '.':
                        try:
                            e.remove('.glusterfs')
                        except ValueError:
                            pass
                    return e

                @classmethod
                def lstat(cls, e):
                    """ path based backend stat """
                    return super(brickserver, cls).lstat(e)

                @classmethod
                def gfid(cls, e):
                    """ path based backend gfid fetch """
                    return super(brickserver, cls).gfid(e)

                @classmethod
                def linkto_check(cls, e):
                    return super(brickserver, cls).linkto_check(e)
            if gconf.slave_id:
                # define {,set_}xtime in slave, thus preempting
                # the call to remote, so that it takes data from
                # the local brick
                slave.server.xtime = types.MethodType(
                    lambda _self, path, uuid: (
                        brickserver.xtime(path,
                                          uuid + '.' + gconf.slave_id)
                    ),
                    slave.server)
                slave.server.stime = types.MethodType(
                    lambda _self, path, uuid: (
                        brickserver.stime(path,
                                          uuid + '.' + gconf.slave_id)
                    ),
                    slave.server)
                slave.server.set_stime = types.MethodType(
                    lambda _self, path, uuid, mark: (
                        brickserver.set_stime(path,
                                              uuid + '.' + gconf.slave_id,
                                              mark)
                    ),
                    slave.server)
            (g1, g2) = self.gmaster_instantiate_tuple(slave)
            g1.master.server = brickserver
            g2.master.server = brickserver
        else:
            (g1, g2) = self.gmaster_instantiate_tuple(slave)
            # 'gmaster' is not defined in this scope (it used to
            # raise NameError here); each crawler's server gets
            # itself as the aggregated server
            # NOTE(review): intent assumed from the variable names,
            # confirm against the single-crawler version of this code
            g1.master.server.aggregated = g1.master.server
            g2.master.server.aggregated = g2.master.server
        # bad bad bad: bad way to do things like this
        # need to make this elegant
        # register the crawlers and start crawling
        g1.register()
        g2.register()
        g1.crawlwrap(oneshot=True)
        g2.crawlwrap()
    else:
        # sup() resolves the parent method by the name of the calling
        # function, so this call has to stay inside service_loop
        sup(self, *args)
= self.inner_rsc.path + elif ityp == GLUSTER: + slavepath = "/proc/%d/cwd" % self.server.pid() + else: + raise NotImplementedError + self.slaveurl = ':'.join([self.remote_addr, slavepath]) + + def connect_remote(self, go_daemon=None): + """connect to inner slave url through outer ssh url + + Wrap the connecting utility in ssh. + + Much care is put into daemonizing: in that case + ssh is started before daemonization, but + RePCe client is to be created after that (as ssh + interactive password auth would be defeated by + a daemonized ssh, while client should be present + only in the final process). In that case the action + is taken apart to two parts, this method is ivoked + once pre-daemon, once post-daemon. Use @go_daemon + to deiced what part to perform. + + [NB. ATM gluster product does not makes use of interactive + authentication.] + """ + if go_daemon == 'done': + return self.start_fd_client(*self.fd_pair) + + syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'), + self.remote_addr, + self.inner_rsc.url) + + deferred = go_daemon == 'postconn' + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + + [self.remote_addr], + slave=self.inner_rsc.url, deferred=deferred) + + if deferred: + # send a message to peer so that we can wait for + # the answer from which we know connection is + # established and we can proceed with daemonization + # (doing that too early robs the ssh passwd prompt...) + # However, we'd better not start the RepceClient + # before daemonization (that's not preserved properly + # in daemon), we just do a an ad-hoc linear put/get. 
+ i, o = ret + inf = os.fdopen(i) + repce.send(o, None, '__repce_version__') + select((inf,), (), ()) + repce.recv(inf) + # hack hack hack: store a global reference to the file + # to save it from getting GC'd which implies closing it + gconf.permanent_handles.append(inf) + self.fd_pair = (i, o) + return 'should' + + def rsync(self, files): + return sup(self, files, '-e', + " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), + *(gconf.rsync_ssh_options.split() + [self.slaveurl])) + + def tarssh(self, files): + return sup(self, files, self.slaveurl) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py new file mode 100644 index 000000000..822d919ec --- /dev/null +++ b/geo-replication/syncdaemon/syncdutils.py @@ -0,0 +1,490 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. 
#

import os
import sys
import pwd
import time
import fcntl
import shutil
import logging
import socket
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid

try:
    from cPickle import PickleError
except ImportError:
    # py 3
    from pickle import PickleError

from gconf import gconf

try:
    # py 3
    from urllib import parse as urllib
except ImportError:
    import urllib

try:
    from hashlib import md5 as md5
except ImportError:
    # py 2.4
    from md5 import new as md5

# auxiliary gfid based access prefix
_CL_AUX_GFID_PFX = ".gfid/"
# upper bound on the attempts made by errno_wrap() before giving up
GF_OP_RETRIES = 20


def escape(s):
    """the chosen flavor of string escaping, used all over
    to turn whatever data to creatable representation"""
    return urllib.quote_plus(s)


def unescape(s):
    """inverse of .escape"""
    return urllib.unquote_plus(s)


def norm(s):
    """replace dashes with underscores in @s

    Falsy input (None, empty string) yields None.
    """
    if s:
        return s.replace('-', '_')


def update_file(path, updater, merger=lambda f: True):
    """update a file in a transaction-like manner

    @path is opened (created if missing) and exclusively locked;
    @merger is called with the locked file object and the update
    is abandoned if it returns a false value. Otherwise @updater is
    called with an unbuffered file object of a freshly created
    "<path>.tmp.<pid>" file, which is then fsync'd and renamed
    over @path.
    """

    fr = fw = None
    try:
        fd = os.open(path, os.O_CREAT | os.O_RDWR)
        try:
            fr = os.fdopen(fd, 'r+b')
        except:
            # no file object took ownership of fd, close it by hand
            os.close(fd)
            raise
        fcntl.lockf(fr, fcntl.LOCK_EX)
        if not merger(fr):
            return

        tmpp = path + '.tmp.' + str(os.getpid())
        fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        try:
            fw = os.fdopen(fd, 'wb', 0)
        except:
            os.close(fd)
            raise
        updater(fw)
        os.fsync(fd)
        os.rename(tmpp, path)
    finally:
        # closing fr also drops the lock taken on it
        for fx in (fr, fw):
            if fx:
                fx.close()


def create_manifest(fname, content):
    """
    Create manifest file for SSH Control Path

    @fname is created if missing and @content is written to it.
    Errors of os.open() / os.write() propagate to the caller.
    """
    fd = os.open(fname, os.O_CREAT | os.O_RDWR)
    try:
        os.write(fd, content)
    finally:
        # close exactly once, on both the success and the failure path:
        # a second close would fail with EBADF and mask a write error
        os.close(fd)


def setup_ssh_ctl(ctld, remote_addr, resource_url):
    """
    Setup GConf ssh control path parameters

    Stores @ctld as gconf.ssh_ctl_dir, writes a manifest file named
    after the md5 of the slave host / resource url pair into it and
    sets gconf.ssh_ctl_args to use a control socket of the same stem.
    """
    gconf.ssh_ctl_dir = ctld
    content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
                                                        resource_url)
    content_md5 = md5hex(content)
    fname = os.path.join(gconf.ssh_ctl_dir,
                         "%s.mft" % content_md5)

    create_manifest(fname, content)
    ssh_ctl_path = os.path.join(gconf.ssh_ctl_dir,
                                "%s.sock" % content_md5)
    gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]


def grabfile(fname, content=None):
    """open @fname + contest for its fcntl lock

    @content: if given, set the file content to it

    Returns the locked file object, or None if the lock
    is held by someone else.
    """
    # go through os.open() to get create-if-missing + read/write
    # semantics, which no single open() mode string provides
    fd = os.open(fname, os.O_CREAT | os.O_RDWR)
    f = os.fdopen(fd, 'r+b', 0)
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except:
        ex = sys.exc_info()[1]
        f.close()
        if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN):
            # cannot grab, it's taken
            return
        raise
    if content:
        try:
            f.truncate()
            f.write(content)
        except:
            f.close()
            raise
    # keep a global reference so that the file (and thus the lock)
    # is not dropped when the caller discards the return value
    gconf.permanent_handles.append(f)
    return f


def grabpidfile(fname=None, setpid=True):
    """.grabfile customization for pid files

    @fname defaults to gconf.pid_file; with @setpid the
    pid of the current process is written to the file.
    """
    if not fname:
        fname = gconf.pid_file
    content = None
    if setpid:
        content = str(os.getpid()) + '\n'
    return grabfile(fname, content=content)

# taken by finalize() and never released
final_lock = Lock()


def finalize(*a, **kw):
    """all those messy final steps we go through upon termination

    Do away with pidfile, ssh control dir and logging.

    Terminates the process via os._exit() with the value of
    the 'exval' keyword argument (default 0).
    """
    final_lock.acquire()
    if getattr(gconf, 'pid_file', None):
        rm_pidf = gconf.pid_file_owned
        if gconf.cpid:
            # exit path from parent branch of daemonization
            rm_pidf = False
            while True:
                f = grabpidfile(setpid=False)
                if not f:
                    # child has already taken over pidfile
                    break
                if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
                    # child has terminated
                    rm_pidf = True
                    break
                time.sleep(0.1)
        if rm_pidf:
            try:
                os.unlink(gconf.pid_file)
            except OSError:
                # a pidfile that is already gone is fine,
                # anything else is a genuine failure
                ex = sys.exc_info()[1]
                if ex.errno != ENOENT:
                    raise
    if gconf.ssh_ctl_dir and not gconf.cpid:
        shutil.rmtree(gconf.ssh_ctl_dir)
    if getattr(gconf, 'state_socket', None):
        try:
            os.unlink(gconf.state_socket)
        except:
            # best effort: failure to remove the state socket
            # must not get in the way of exiting
            pass
    if gconf.log_exit:
        logging.info("exiting.")
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(kw.get('exval', 0))


def log_raise_exception(excont):
    """top-level exception handler

    To be called while an exception is being handled. Translates
    some weird sounding but well understood exceptions into
    human-friendly log messages, then exits.

    @excont: object whose 'exval' attribute is set to the exit value

    A SystemExit being handled is re-raised as is (with its code
    stored in @excont.exval); in any other case exval is set to 1
    and sys.exit() is called with it.
    """
    # check if we log to something that is not a terminal
    is_filelog = False
    for h in logging.getLogger().handlers:
        fno = getattr(getattr(h, 'stream', None), 'fileno', None)
        if fno and not os.isatty(fno()):
            is_filelog = True

    exc = sys.exc_info()[1]
    if isinstance(exc, SystemExit):
        excont.exval = exc.code or 0
        raise
    else:
        logtag = None
        if isinstance(exc, GsyncdError):
            if is_filelog:
                logging.error(exc.args[0])
            sys.stderr.write('failure: ' + exc.args[0] + '\n')
        elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \
            ((isinstance(exc, OSError) or isinstance(exc, IOError)) and
             exc.errno == EPIPE):
            logging.error('connection to peer is broken')
            if hasattr(gconf, 'transport'):
                gconf.transport.wait()
                if gconf.transport.returncode == 127:
                    logging.warning("!!!!!!!!!!!!!")
                    logging.warning('!!! getting "No such file or directory" '
                                    "errors is most likely due to "
                                    "MISCONFIGURATION"
                                    ", please consult https://access.redhat.com"
                                    "/site/documentation/en-US/Red_Hat_Storage"
                                    "/2.1/html/Administration_Guide"
                                    "/chap-User_Guide-Geo_Rep-Preparation-"
                                    "Settingup_Environment.html")
                    logging.warning("!!!!!!!!!!!!!")
                gconf.transport.terminate_geterr()
        elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
                                                         ECONNABORTED):
            logging.error('glusterfs session went down [%s]',
                          errorcode[exc.errno])
        else:
            logtag = "FAIL"
        # well understood errors get a trace only at debug level
        if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
            logtag = "FULL EXCEPTION TRACE"
        if logtag:
            logging.exception(logtag + ": ")
            sys.stderr.write("failed with %s.\n" % type(exc).__name__)
        excont.exval = 1
        sys.exit(excont.exval)


class FreeObject(object):

    """wildcard class for which any attribute can be set"""

    def __init__(self, **kw):
        for k, v in kw.items():
            setattr(self, k, v)


class Thread(baseThread):

    """thread class flavor for gsyncd

    - always a daemon thread
    - force exit for whole program if thread
      function coughs up an exception
    """

    def __init__(self, *a, **kw):
        tf = kw.get('target')
        if tf:
            def twrap(*aa):
                # run the original target; on any exception log it
                # and take down the whole process via finalize()
                excont = FreeObject(exval=0)
                try:
                    tf(*aa)
                except:
                    try:
                        log_raise_exception(excont)
                    finally:
                        finalize(exval=excont.exval)
            kw['target'] = twrap
        baseThread.__init__(self, *a, **kw)
        self.setDaemon(True)


class GsyncdError(Exception):

    """error whose first argument is a message fit for the user

    (log_raise_exception() reports it without a traceback)
    """
    pass


def getusername(uid=None):
    """return the login name of @uid (default: effective uid)"""
    if uid is None:
        uid = os.geteuid()
    return pwd.getpwuid(uid).pw_name


def privileged():
    """tell if we run with effective uid 0"""
    return os.geteuid() == 0


def boolify(s):
    """
    Generic string to boolean converter

    return
    - Quick return if string 's' is of type bool
    - True if it's in true_list
    - False if it's in false_list
    - Warn if it's not present in either and return False

    The comparison is case insensitive.
    """
    true_list = ['true', 'yes', '1', 'on']
    false_list = ['false', 'no', '0', 'off']

    if isinstance(s, bool):
        return s

    rv = False
    lstr = s.lower()
    if lstr in true_list:
        rv = True
    elif lstr not in false_list:
        logging.warning("Unknown string (%s) in string to boolean conversion "
                        "defaulting to False\n" % (s))

    return rv


def eintr_wrap(func, exc, *a):
    """
    wrapper around syscalls resilient to interrupt caused
    by signals

    Calls @func with arguments @a and returns its result; the call
    is repeated as long as it raises @exc with EINTR as first
    argument. Any other exception propagates.
    """
    while True:
        try:
            return func(*a)
        except exc:
            ex = sys.exc_info()[1]
            if not ex.args[0] == EINTR:
                raise


def select(*a):
    """select.select() restarted upon EINTR"""
    return eintr_wrap(oselect.select, oselect.error, *a)


def waitpid(*a):
    """os.waitpid() restarted upon EINTR"""
    return eintr_wrap(owaitpid, OSError, *a)


def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
    """install @hook as SIGTERM handler

    The default hook is finalize() with exit value 1.
    """
    signal(SIGTERM, hook)


def is_host_local(host):
    """tell if @host resolves to an address of this machine

    Loopback addresses are accepted right away; for other IPv4 / IPv6
    addresses we check if a socket can be bound to them.

    Raises GsyncdError if raw sockets are not permitted and
    non-local bind is enabled, as binding then proves nothing.
    """
    locaddr = False
    for ai in socket.getaddrinfo(host, None):
        # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
        # /mgmt/glusterd/src/glusterd-utils.c#L125
        if ai[0] == socket.AF_INET:
            if ai[-1][0].split(".")[0] == "127":
                locaddr = True
                break
        elif ai[0] == socket.AF_INET6:
            if ai[-1][0] == "::1":
                locaddr = True
                break
        else:
            continue
        try:
            # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
            # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
            s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
        except socket.error:
            ex = sys.exc_info()[1]
            if ex.errno != EPERM:
                raise
            f = None
            try:
                f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
                if int(f.read()) != 0:
                    raise GsyncdError(
                        "non-local bind is set and not allowed to create "
                        "raw sockets, cannot determine if %s is local" % host)
                s = socket.socket(ai[0], socket.SOCK_DGRAM)
            finally:
                if f:
                    f.close()
        try:
            try:
                s.bind(ai[-1])
                locaddr = True
                break
            except socket.error:
                # cannot bind, so this address is not ours;
                # go on with the next one
                pass
        finally:
            # release the probe socket on success, too
            # (the finally clause runs before 'break' leaves the loop)
            s.close()
    return locaddr


def funcode(f):
    """return the code object of function @f"""
    fc = getattr(f, 'func_code', None)
    if not fc:
        # python 3
        fc = f.__code__
    return fc


def memoize(f):
    """method decorator caching the result on the instance

    The value returned by @f is stored in the attribute named
    '_' + <name of f> of the instance and reused on later calls,
    regardless of the arguments. A result of None is not
    distinguished from "not computed yet", so such a method
    is called again each time.
    """
    fc = funcode(f)
    fn = fc.co_name

    def ff(self, *a, **kw):
        rv = getattr(self, '_' + fn, None)
        if rv is None:
            rv = f(self, *a, **kw)
            setattr(self, '_' + fn, rv)
        return rv
    return ff


def umask():
    """set the process umask to 0 and return the previous value"""
    return os.umask(0)


def entry2pb(e):
    """split entry @e at its last '/' (result is a list)"""
    return e.rsplit('/', 1)


def gauxpfx():
    """return the auxiliary gfid based access prefix"""
    return _CL_AUX_GFID_PFX


def md5hex(s):
    """return the md5 digest of @s as hex string"""
    return md5(s).hexdigest()


def selfkill(sig=SIGTERM):
    """send signal @sig to the current process"""
    os.kill(os.getpid(), sig)


def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
    """ wrapper around calls resilient to errnos.
    retry in case of ESTALE by default.

    @call is invoked with the positional arguments @arg and its
    result is returned. If it raises OSError:
    - with an errno in @errnos, that errno is returned;
    - with an errno in @retry_errnos, the call is repeated after
      a quarter of a second, at most GF_OP_RETRIES times altogether,
      after which a warning is logged and None is returned;
    - else the exception propagates.
    """
    nr_tries = 0
    while True:
        try:
            return call(*arg)
        except OSError:
            ex = sys.exc_info()[1]
            if ex.errno in errnos:
                return ex.errno
            if ex.errno not in retry_errnos:
                raise
            nr_tries += 1
            if nr_tries == GF_OP_RETRIES:
                # probably a screwed state, cannot do much...
                logging.warning('reached maximum retries (%s)...' % repr(arg))
                return
            time.sleep(0.250)  # retry the call


def lstat(e):
    """os.lstat() which returns ENOENT instead of raising
    if @e does not exist; other errors propagate"""
    try:
        return os.lstat(e)
    except (IOError, OSError):
        ex = sys.exc_info()[1]
        if ex.errno == ENOENT:
            return ex.errno
        else:
            raise
