diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-03-21 12:33:10 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-04-07 21:56:55 -0700 |
commit | 238d101e55e067e5afcd43c728884e9ab8d36549 (patch) | |
tree | 60498b107335c0ae526bfa034bd56303406710ab | |
parent | 0c20b17c09b2eca82f3c79013fd3fe1c72a957fd (diff) |
geo-rep: code pep8/flake8 fixes
pep8 is a style guide for python.
http://legacy.python.org/dev/peps/pep-0008/
pep8 can be installed using, `pip install pep8`
Usage: `pep8 <python file>`, For example, `pep8 master.py`
will display all the coding standard errors.
flake8 is used to identify unused imports and other issues
in code.
pip install flake8
cd $GLUSTER_REPO/geo-replication/
flake8 syncdaemon
Updated license headers to each source file.
Change-Id: I01c7d0a6091d21bfa48720e9fb5624b77fa3db4a
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7311
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
-rw-r--r-- | geo-replication/syncdaemon/__codecheck.py | 17 | ||||
-rw-r--r-- | geo-replication/syncdaemon/__init__.py | 9 | ||||
-rw-r--r-- | geo-replication/syncdaemon/configinterface.py | 57 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gconf.py | 12 | ||||
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 292 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libcxattr.py | 16 | ||||
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 22 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 384 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 78 | ||||
-rw-r--r-- | geo-replication/syncdaemon/repce.py | 52 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 306 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 114 |
12 files changed, 912 insertions, 447 deletions
diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py index e3386afba8b..45dbd26bb64 100644 --- a/geo-replication/syncdaemon/__codecheck.py +++ b/geo-replication/syncdaemon/__codecheck.py @@ -1,10 +1,20 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import os.path import sys import tempfile import shutil -ipd = tempfile.mkdtemp(prefix = 'codecheck-aux') +ipd = tempfile.mkdtemp(prefix='codecheck-aux') try: # add a fake ipaddr module, we don't want to @@ -25,7 +35,7 @@ class IPNetwork(list): if f[-3:] != '.py' or f[0] == '_': continue m = f[:-3] - sys.stdout.write('importing %s ...' % m) + sys.stdout.write('importing %s ...' % m) __import__(m) print(' OK.') @@ -33,7 +43,8 @@ class IPNetwork(list): sys.argv = sys.argv[:1] + a gsyncd = sys.modules['gsyncd'] - for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]: + for a in [['--help'], ['--version'], + ['--canonicalize-escape-url', '/foo']]: print('>>> invoking program with args: %s' % ' '.join(a)) pid = os.fork() if not pid: diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py index e69de29bb2d..b4648b69645 100644 --- a/geo-replication/syncdaemon/__init__.py +++ b/geo-replication/syncdaemon/__init__.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py index 35f754c98c9..c4d47b5dbda 100644 --- a/geo-replication/syncdaemon/configinterface.py +++ b/geo-replication/syncdaemon/configinterface.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + try: import ConfigParser except ImportError: @@ -21,14 +31,25 @@ config_version = 2.0 re_type = type(re.compile('')) - # (SECTION, OPTION, OLD VALUE, NEW VALUE) CONFIGS = ( - ("peersrx . .", "georep_session_working_dir", "", "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_${slavevol}/"), - ("peersrx .", "gluster_params", "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", "aux-gfid-mount"), - ("peersrx . .", "ssh_command_tar", "", "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no -i /var/lib/glusterd/geo-replication/tar_ssh.pem"), + ("peersrx . .", + "georep_session_working_dir", + "", + "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_" + "${slavevol}/"), + ("peersrx .", + "gluster_params", + "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", + "aux-gfid-mount"), + ("peersrx . .", + "ssh_command_tar", + "", + "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no " + "-i /var/lib/glusterd/geo-replication/tar_ssh.pem"), ) + def upgrade_config_file(path): config_change = False config = ConfigParser.RawConfigParser() @@ -72,7 +93,9 @@ def upgrade_config_file(path): class MultiDict(object): - """a virtual dict-like class which functions as the union of underlying dicts""" + + """a virtual dict-like class which functions as the union + of underlying dicts""" def __init__(self, *dd): self.dicts = dd @@ -80,14 +103,15 @@ class MultiDict(object): def __getitem__(self, key): val = None for d in self.dicts: - if d.get(key) != None: + if d.get(key) is not None: val = d[key] - if val == None: + if val is None: raise KeyError(key) return val class GConffile(object): + """A high-level interface to ConfigParser which flattens the two-tiered config layout by implenting automatic section dispatch based on initial parameters. @@ -155,7 +179,8 @@ class GConffile(object): return self.get(opt, printValue=False) def section(self, rx=False): - """get the section name of the section representing .peers in .config""" + """get the section name of the section representing .peers + in .config""" peers = self.peers if not peers: peers = ['.', '.'] @@ -209,6 +234,7 @@ class GConffile(object): continue so2[s] = tv tv += 1 + def scmp(x, y): return cmp(*(so2[s] for s in (x, y))) ss.sort(scmp) @@ -218,12 +244,13 @@ class GConffile(object): """update @dct from key/values of ours. key/values are collected from .config by filtering the regexp sections - according to match, and from .section. The values are treated as templates, - which are substituted from .auxdicts and (in case of regexp sections) - match groups. + according to match, and from .section. The values are treated as + templates, which are substituted from .auxdicts and (in case of regexp + sections) match groups. """ if not self.peers: raise GsyncdError('no peers given, cannot select matching options') + def update_from_sect(sect, mud): for k, v in self.config._sections[sect].items(): if k == '__name__': @@ -243,7 +270,7 @@ class GConffile(object): match = False break for j in range(len(m.groups())): - mad['match%d_%d' % (i+1, j+1)] = m.groups()[j] + mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j] if match: update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts)) if self.config.has_section(self.section()): @@ -255,7 +282,7 @@ class GConffile(object): logic described in .update_to) """ d = {} - self.update_to(d, allow_unresolved = True) + self.update_to(d, allow_unresolved=True) if opt: opt = norm(opt) v = d.get(opt) @@ -283,6 +310,7 @@ class GConffile(object): self.config.add_section(SECT_META) self.config.set(SECT_META, 'version', config_version) return trfn(norm(opt), *a, **kw) + def updateconf(f): self.config.write(f) update_file(self.path, updateconf, mergeconf) @@ -295,7 +323,8 @@ class GConffile(object): # regarding SECT_ORD, cf. ord_sections if not self.config.has_section(SECT_ORD): self.config.add_section(SECT_ORD) - self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD])) + self.config.set( + SECT_ORD, sect, len(self.config._sections[SECT_ORD])) self.config.set(sect, opt, val) return True diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py index fe5795f16e2..1fc7c381bc4 100644 --- a/geo-replication/syncdaemon/gconf.py +++ b/geo-replication/syncdaemon/gconf.py @@ -1,6 +1,16 @@ -import os +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + class GConf(object): + """singleton class to store globals shared between gsyncd modules""" diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 6eb62c6b076..426d964de95 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -1,4 +1,13 @@ #!/usr/bin/env python +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# import os import os.path @@ -6,25 +15,27 @@ import glob import sys import time import logging -import signal import shutil import optparse import fcntl import fnmatch from optparse import OptionParser, SUPPRESS_HELP from logging import Logger, handlers -from errno import EEXIST, ENOENT +from errno import ENOENT from ipaddr import IPAddress, IPNetwork from gconf import gconf -from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception -from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file +from syncdutils import FreeObject, norm, grabpidfile, finalize +from syncdutils import log_raise_exception, privileged, update_file +from syncdutils import GsyncdError, select, set_term_handler from configinterface import GConffile, upgrade_config_file import resource from monitor import monitor + class GLogger(Logger): + """Logger customizations for gsyncd. It implements a log format similar to that of glusterfs. @@ -51,7 +62,8 @@ class GLogger(Logger): if lbl: lbl = '(' + lbl + ')' lprm = {'datefmt': "%Y-%m-%d %H:%M:%S", - 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} + 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"} lprm.update(kw) lvl = kw.get('level', logging.INFO) lprm['level'] = lvl @@ -64,7 +76,7 @@ class GLogger(Logger): try: logging_handler = handlers.WatchedFileHandler(lprm['filename']) formatter = logging.Formatter(fmt=lprm['format'], - datefmt=lprm['datefmt']) + datefmt=lprm['datefmt']) logging_handler.setFormatter(formatter) logging.getLogger().addHandler(logging_handler) except AttributeError: @@ -96,6 +108,7 @@ class GLogger(Logger): gconf.log_metadata = lkw gconf.log_exit = True + def startup(**kw): """set up logging, pidfile grabbing, daemonization""" if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn': @@ -144,14 +157,15 @@ def main(): gconf.starttime = time.time() set_term_handler() GLogger.setup() - excont = FreeObject(exval = 0) + excont = FreeObject(exval=0) try: try: main_i() except: log_raise_exception(excont) finally: - finalize(exval = excont.exval) + finalize(exval=excont.exval) + def main_i(): """internal main routine @@ -171,50 +185,71 @@ def main_i(): if val and val != '-': val = os.path.abspath(val) setattr(parser.values, opt.dest, val) + def store_local(opt, optstr, val, parser): rconf[opt.dest] = val + def store_local_curry(val): return lambda o, oo, vx, p: store_local(o, oo, val, p) + def store_local_obj(op, dmake): - return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p) - - op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") - op.add_option('--gluster-command-dir', metavar='DIR', default='') - op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs) - op.add_option('--gluster-log-level', metavar='LVL') - op.add_option('--gluster-params', metavar='PRMS', default='') - op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP) - op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-') - op.add_option('--mountbroker', metavar='LABEL') - op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs) - op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs) - op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs) - op.add_option('--ignore-deletes', default=False, action='store_true') - op.add_option('--isolated-slave', default=False, action='store_true') - op.add_option('--use-rsync-xattrs', default=False, action='store_true') - op.add_option('-L', '--log-level', metavar='LVL') - op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0])) - op.add_option('--volume-id', metavar='UUID') - op.add_option('--slave-id', metavar='ID') - op.add_option('--session-owner', metavar='ID') - op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') - op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') - op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') - op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') - op.add_option('--rsync-command', metavar='CMD', default='rsync') - op.add_option('--rsync-options', metavar='OPTS', default='') - op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') - op.add_option('--timeout', metavar='SEC', type=int, default=120) - op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP) - op.add_option('--sync-jobs', metavar='N', type=int, default=3) - op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) - op.add_option('--allow-network', metavar='IPS', default='') - op.add_option('--socketdir', metavar='DIR') - op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) - op.add_option('--checkpoint', metavar='LABEL', default='') + return lambda o, oo, vx, p: store_local( + o, oo, FreeObject(op=op, **dmake(vx)), p) + + op = OptionParser( + usage="%prog [options...] <master> <slave>", version="%prog 0.0.1") + op.add_option('--gluster-command-dir', metavar='DIR', default='') + op.add_option('--gluster-log-file', metavar='LOGF', + default=os.devnull, type=str, action='callback', + callback=store_abs) + op.add_option('--gluster-log-level', metavar='LVL') + op.add_option('--gluster-params', metavar='PRMS', default='') + op.add_option( + '--glusterd-uuid', metavar='UUID', type=str, default='', + help=SUPPRESS_HELP) + op.add_option( + '--gluster-cli-options', metavar='OPTS', default='--log-file=-') + op.add_option('--mountbroker', metavar='LABEL') + op.add_option('-p', '--pid-file', metavar='PIDF', type=str, + action='callback', callback=store_abs) + op.add_option('-l', '--log-file', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--log-file-mbr', metavar='LOGF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-file', metavar='STATF', type=str, + action='callback', callback=store_abs) + op.add_option('--state-detail-file', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--georep-session-working-dir', metavar='STATF', + type=str, action='callback', callback=store_abs) + op.add_option('--ignore-deletes', default=False, action='store_true') + op.add_option('--isolated-slave', default=False, action='store_true') + op.add_option('--use-rsync-xattrs', default=False, action='store_true') + op.add_option('-L', '--log-level', metavar='LVL') + op.add_option('-r', '--remote-gsyncd', metavar='CMD', + default=os.path.abspath(sys.argv[0])) + op.add_option('--volume-id', metavar='UUID') + op.add_option('--slave-id', metavar='ID') + op.add_option('--session-owner', metavar='ID') + op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='') + op.add_option( + '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='') + op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh') + op.add_option('--ssh-command-tar', metavar='CMD', default='ssh') + op.add_option('--rsync-command', metavar='CMD', default='rsync') + op.add_option('--rsync-options', metavar='OPTS', default='') + op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress') + op.add_option('--timeout', metavar='SEC', type=int, default=120) + op.add_option('--connection-timeout', metavar='SEC', + type=int, default=60, help=SUPPRESS_HELP) + op.add_option('--sync-jobs', metavar='N', type=int, default=3) + op.add_option( + '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) + op.add_option('--allow-network', metavar='IPS', default='') + op.add_option('--socketdir', metavar='DIR') + op.add_option('--state-socket-unencoded', metavar='SOCKF', + type=str, action='callback', callback=store_abs) + op.add_option('--checkpoint', metavar='LABEL', default='') # tunables for failover/failback mechanism: # None - gsyncd behaves as normal # blind - gsyncd works with xtime pairs to identify @@ -225,56 +260,86 @@ def main_i(): op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) # changelog or xtime? (TODO: Change the default) - op.add_option('--change-detector', metavar='MODE', type=str, default='xtime') - # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time) + op.add_option( + '--change-detector', metavar='MODE', type=str, default='xtime') + # sleep interval for change detection (xtime crawl uses a hardcoded 1 + # second sleep time) op.add_option('--change-interval', metavar='SEC', type=int, default=3) # working directory for changelog based mechanism - op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs) + op.add_option('--working-dir', metavar='DIR', type=str, + action='callback', callback=store_abs) op.add_option('--use-tarssh', default=False, action='store_true') - op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) + op.add_option('-c', '--config-file', metavar='CONF', + type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S - op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True)) - op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local) - op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local) - op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local) - op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True)) - op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont')) - op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local) - op.add_option('--create', type=str, dest="create", action='callback', callback=store_local) - op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True)) - op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a), - setattr(a[-1].values, 'log_file', '-'), - setattr(a[-1].values, 'log_level', 'DEBUG'))), + op.add_option('--monitor', dest='monitor', action='callback', + callback=store_local_curry(True)) + op.add_option('--resource-local', dest='resource_local', + type=str, action='callback', callback=store_local) + op.add_option('--resource-remote', dest='resource_remote', + type=str, action='callback', callback=store_local) + op.add_option('--feedback-fd', dest='feedback_fd', type=int, + help=SUPPRESS_HELP, action='callback', callback=store_local) + op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, + action='callback', callback=store_local_curry(True)) + op.add_option('-N', '--no-daemon', dest="go_daemon", + action='callback', callback=store_local_curry('dont')) + op.add_option('--verify', type=str, dest="verify", + action='callback', callback=store_local) + op.add_option('--create', type=str, dest="create", + action='callback', callback=store_local) + op.add_option('--delete', dest='delete', action='callback', + callback=store_local_curry(True)) + op.add_option('--debug', dest="go_daemon", action='callback', + callback=lambda *a: (store_local_curry('dont')(*a), + setattr( + a[-1].values, 'log_file', '-'), + setattr(a[-1].values, 'log_level', + 'DEBUG'))), op.add_option('--path', type=str, action='append') for a in ('check', 'get'): - op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback', + op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', + action='callback', callback=store_local_obj(a, lambda vx: {'opt': vx})) - op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None})) + op.add_option('--config-get-all', dest='config', action='callback', + callback=store_local_obj('get', lambda vx: {'opt': None})) for m in ('', '-rx', '-glob'): # call this code 'Pythonic' eh? - # have to define a one-shot local function to be able to inject (a value depending on the) + # have to define a one-shot local function to be able + # to inject (a value depending on the) # iteration variable into the inner lambda def conf_mod_opt_regex_variant(rx): - op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback', - callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx})) - op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback', - callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx})) + op.add_option('--config-set' + m, metavar='OPT VAL', type=str, + nargs=2, dest='config', action='callback', + callback=store_local_obj('set', lambda vx: { + 'opt': vx[0], 'val': vx[1], 'rx': rx})) + op.add_option('--config-del' + m, metavar='OPT', type=str, + dest='config', action='callback', + callback=store_local_obj('del', lambda vx: { + 'opt': vx, 'rx': rx})) conf_mod_opt_regex_variant(m and m[1:] or False) - op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal')) - op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon')) - op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc')) - - tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ] - remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ] - rq_remote_tunables = { 'listen': True } - - # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults - # -- for this to work out we need to tell apart defaults from explicitly set - # options... so churn out the defaults here and call the parser with virgin - # values container. + op.add_option('--normalize-url', dest='url_print', + action='callback', callback=store_local_curry('normal')) + op.add_option('--canonicalize-url', dest='url_print', + action='callback', callback=store_local_curry('canon')) + op.add_option('--canonicalize-escape-url', dest='url_print', + action='callback', callback=store_local_curry('canon_esc')) + + tunables = [norm(o.get_opt_string()[2:]) + for o in op.option_list + if (o.callback in (store_abs, 'store_true', None) and + o.get_opt_string() not in ('--version', '--help'))] + remote_tunables = ['listen', 'go_daemon', 'timeout', + 'session_owner', 'config_file', 'use_rsync_xattrs'] + rq_remote_tunables = {'listen': True} + + # precedence for sources of values: 1) commandline, 2) cfg file, 3) + # defaults for this to work out we need to tell apart defaults from + # explicitly set options... so churn out the defaults here and call + # the parser with virgin values container. defaults = op.get_default_values() opts, args = op.parse_args(values=optparse.Values()) args_orig = args[:] @@ -291,9 +356,9 @@ def main_i(): args.append(None) args[1] = r confdata = rconf.get('config') - if not (len(args) == 2 or \ - (len(args) == 1 and rconf.get('listen')) or \ - (len(args) <= 2 and confdata) or \ + if not (len(args) == 2 or + (len(args) == 1 and rconf.get('listen')) or + (len(args) <= 2 and confdata) or rconf.get('url_print')): sys.stderr.write("error: incorrect number of arguments\n\n") sys.stderr.write(op.get_usage() + "\n") @@ -301,8 +366,8 @@ def main_i(): verify = rconf.get('verify') if verify: - logging.info (verify) - logging.info ("Able to spawn gsyncd.py") + logging.info(verify) + logging.info("Able to spawn gsyncd.py") return restricted = os.getenv('_GSYNCD_RESTRICTED_') @@ -313,14 +378,17 @@ def main_i(): allopts.update(rconf) bannedtuns = set(allopts.keys()) - set(remote_tunables) if bannedtuns: - raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \ + raise GsyncdError('following tunables cannot be set with ' + 'restricted SSH invocaton: ' + ', '.join(bannedtuns)) for k, v in rq_remote_tunables.items(): if not k in allopts or allopts[k] != v: - raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \ + raise GsyncdError('tunable %s is not set to value %s required ' + 'for restricted SSH invocaton' % (k, v)) confrx = getattr(confdata, 'rx', None) + def makersc(aa, check=True): if not aa: return ([], None, None) @@ -330,12 +398,13 @@ def main_i(): if len(ra) > 1: remote = ra[1] if check and not local.can_connect_to(remote): - raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path)) + raise GsyncdError("%s cannot work with %s" % + (local.path, remote and remote.path)) return (ra, local, remote) if confrx: # peers are regexen, don't try to parse them if confrx == 'glob': - args = [ '\A' + fnmatch.translate(a) for a in args ] + args = ['\A' + fnmatch.translate(a) for a in args] canon_peers = args namedict = {} else: @@ -345,21 +414,24 @@ def main_i(): for r in rscs: print(r.get_url(**{'normal': {}, 'canon': {'canonical': True}, - 'canon_esc': {'canonical': True, 'escaped': True}}[dc])) + 'canon_esc': {'canonical': True, + 'escaped': True}}[dc])) return pa = ([], [], []) - urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True}) + urlprms = ( + {}, {'canonical': True}, {'canonical': True, 'escaped': True}) for x in rscs: for i in range(len(pa)): pa[i].append(x.get_url(**urlprms[i])) _, canon_peers, canon_esc_peers = pa - # creating the namedict, a dict representing various ways of referring to / repreenting - # peers to be fillable in config templates - mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) + # creating the namedict, a dict representing various ways of referring + # to / repreenting peers to be fillable in config templates + mods = (lambda x: x, lambda x: x[ + 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:]) if remote: - rmap = { local: ('local', 'master'), remote: ('remote', 'slave') } + rmap = {local: ('local', 'master'), remote: ('remote', 'slave')} else: - rmap = { local: ('local', 'slave') } + rmap = {local: ('local', 'slave')} namedict = {} for i in range(len(rscs)): x = rscs[i] @@ -370,10 +442,13 @@ def main_i(): if name == 'remote': namedict['remotehost'] = x.remotehost if not 'config_file' in rconf: - rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") + rconf['config_file'] = os.path.join( + os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf") upgrade_config_file(rconf['config_file']) - gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict) + gcnf = GConffile( + rconf['config_file'], canon_peers, + defaults.__dict__, opts.__dict__, namedict) checkpoint_change = False if confdata: @@ -407,7 +482,7 @@ def main_i(): delete = rconf.get('delete') if delete: - logging.info ('geo-replication delete') + logging.info('geo-replication delete') # Delete pid file, status file, socket file cleanup_paths = [] if getattr(gconf, 'pid_file', None): @@ -422,7 +497,7 @@ def main_i(): if getattr(gconf, 'state_socket_unencoded', None): cleanup_paths.append(gconf.state_socket_unencoded) - cleanup_paths.append(rconf['config_file'][:-11] + "*"); + cleanup_paths.append(rconf['config_file'][:-11] + "*") # Cleanup changelog working dirs if getattr(gconf, 'working_dir', None): @@ -432,7 +507,9 @@ def main_i(): if sys.exc_info()[1].errno == ENOENT: pass else: - raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir) + raise GsyncdError( + 'Error while removing working dir: %s' % + gconf.working_dir) for path in cleanup_paths: # To delete temp files @@ -443,10 +520,11 @@ def main_i(): if restricted and gconf.allow_network: ssh_conn = os.getenv('SSH_CONNECTION') if not ssh_conn: - #legacy env var + # legacy env var ssh_conn = os.getenv('SSH_CLIENT') if ssh_conn: - allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ] + allowed_networks = [IPNetwork(a) + for a in gconf.allow_network.split(',')] client_ip = IPAddress(ssh_conn.split()[0]) allowed = False for nw in allowed_networks: @@ -460,7 +538,7 @@ def main_i(): if ffd: fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC) - #normalize loglevel + # normalize loglevel lvl0 = gconf.log_level if isinstance(lvl0, str): lvl1 = lvl0.upper() @@ -519,7 +597,7 @@ def main_i(): if be_monitor: label = 'monitor' elif remote: - #master + # master label = gconf.local_path else: label = 'slave' diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py index b5b6956aea6..e6035e26b43 100644 --- a/geo-replication/syncdaemon/libcxattr.py +++ b/geo-replication/syncdaemon/libcxattr.py @@ -1,8 +1,20 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os -from ctypes import * +from ctypes import CDLL, c_int, create_string_buffer from ctypes.util import find_library + class Xattr(object): + """singleton that wraps the extended attribues system interface for python using ctypes @@ -40,7 +52,7 @@ class Xattr(object): @classmethod def lgetxattr(cls, path, attr, siz=0): - return cls._query_xattr( path, siz, 'lgetxattr', attr) + return cls._query_xattr(path, siz, 'lgetxattr', attr) @classmethod def lgetxattr_buf(cls, path, attr): diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index 68ec3baf144..ec563b36f29 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -1,7 +1,18 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os -from ctypes import * +from ctypes import CDLL, create_string_buffer, get_errno from ctypes.util import find_library + class Changes(object): libgfc = CDLL(find_library("gfchangelog"), use_errno=True) @@ -19,9 +30,10 @@ class Changes(object): return getattr(cls.libgfc, call) @classmethod - def cl_register(cls, brick, path, log_file, log_level, retries = 0): + def cl_register(cls, brick, path, log_file, log_level, retries=0): ret = cls._get_api('gf_changelog_register')(brick, path, - log_file, log_level, retries) + log_file, + log_level, retries) if ret == -1: cls.raise_oserr() @@ -49,8 +61,8 @@ class Changes(object): while True: ret = call(buf, 4096) if ret in (0, -1): - break; - changes.append(buf.raw[:ret-1]) + break + changes.append(buf.raw[:ret - 1]) if ret == -1: cls.raise_oserr() # cleanup tracker diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 98a61bc1d75..4301396f9f4 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -1,25 +1,30 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import time import stat -import random -import signal import json import logging import socket import string import errno -from shutil import copyfileobj -from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode -from threading import currentThread, Condition, Lock +from errno import ENOENT, ENODATA, EPIPE, EEXIST +from threading import Condition, Lock from datetime import datetime -from libcxattr import Xattr - from gconf import gconf -from tempfile import mkdtemp, NamedTemporaryFile -from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \ - unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \ - lstat, errno_wrap, update_file +from tempfile import NamedTemporaryFile +from syncdutils import Thread, GsyncdError, boolify, escape +from syncdutils import unescape, select, gauxpfx, md5hex, selfkill +from syncdutils import lstat, errno_wrap URXTIME = (-1, 0) @@ -27,18 +32,20 @@ URXTIME = (-1, 0) # of the DRY principle (no, don't look for elevated or # perspectivistic things here) + def _xtime_now(): t = time.time() sec = int(t) nsec = int((t - sec) * 1000000) return (sec, nsec) + def _volinfo_hook_relax_foreign(self): volinfo_sys = self.get_sys_volinfo() fgn_vi = volinfo_sys[self.KFGN] if fgn_vi: expiry = fgn_vi['timeout'] - int(time.time()) + 1 - logging.info('foreign volume info found, waiting %d sec for expiry' % \ + logging.info('foreign volume info found, waiting %d sec for expiry' % expiry) time.sleep(expiry) volinfo_sys = self.get_sys_volinfo() @@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None): logging.info('setting up %s change detection mode' % changemixin) modemixin = getattr(this, modemixin.capitalize() + 'Mixin') crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin') - sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin - purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin + sendmarkmixin = boolify( + gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin + purgemixin = boolify( + gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine - class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine): + + class _GMaster(crawlmixin, modemixin, sendmarkmixin, + purgemixin, syncengine): pass return _GMaster @@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None): # sync modes class NormalMixin(object): + """normal geo-rep behavior""" minus_infinity = URXTIME @@ -152,14 +164,18 @@ class NormalMixin(object): self.slave.server.set_stime(path, self.uuid, mark) # self.slave.server.set_xtime_remote(path, self.uuid, mark) + class PartialMixin(NormalMixin): + """a variant tuned towards operation with a master that has partial info of the slave (brick typically)""" def xtime_reversion_hook(self, path, xtl, xtr): pass + class RecoverMixin(NormalMixin): + """a variant that differs from normal in terms of ignoring non-indexed files""" @@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin): # Further mixins for certain tunable behaviors + class SendmarkNormalMixin(object): def sendmark_regular(self, *a, **kw): return self.sendmark(*a, **kw) + class SendmarkRsyncMixin(object): def sendmark_regular(self, *a, **kw): @@ -194,19 +212,24 @@ class PurgeNormalMixin(object): def purge_missing(self, path, names): self.slave.server.purge(path, names) + class PurgeNoopMixin(object): def purge_missing(self, path, names): pass + class TarSSHEngine(object): + """Sync engine that uses tar(1) piped over ssh(1) for data transfers. Good for lots of small files. """ + def a_syncdata(self, files): logging.debug('files: %s' % (files)) for f in files: pb = self.syncer.add(f) + def regjob(se, xte, pb): rv = pb.wait() if rv[0]: @@ -228,13 +251,17 @@ class TarSSHEngine(object): self.a_syncdata(files) self.syncdata_wait() + class RsyncEngine(object): + """Sync engine that uses rsync(1) for data transfers""" + def a_syncdata(self, files): logging.debug('files: %s' % (files)) for f in files: logging.debug('candidate for syncing %s' % f) pb = self.syncer.add(f) + def regjob(se, xte, pb): rv = pb.wait() if rv[0]: @@ -258,7 +285,9 @@ class RsyncEngine(object): self.a_syncdata(files) self.syncdata_wait() + class GMasterCommon(object): + """abstract class impementling master role""" KFGN = 0 @@ -269,8 +298,9 @@ class GMasterCommon(object): err out on multiple foreign masters """ - fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \ - self.master.server.aggregated.native_volume_info() + fgn_vis, nat_vi = ( + self.master.server.aggregated.foreign_volume_infos(), + self.master.server.aggregated.native_volume_info()) fgn_vi = None if fgn_vis: if len(fgn_vis) > 1: @@ -316,15 +346,14 @@ class GMasterCommon(object): if getattr(gconf, 'state_detail_file', None): try: with open(gconf.state_detail_file, 'r+') as f: - loaded_data= json.load(f) - diff_data = set(default_data) - set (loaded_data) + loaded_data = json.load(f) + diff_data = set(default_data) - set(loaded_data) if len(diff_data): for i in diff_data: loaded_data[i] = default_data[i] return loaded_data - except (IOError): - ex = sys.exc_info()[1] - logging.warn ('Creating new gconf.state_detail_file.') + except IOError: + logging.warn('Creating new gconf.state_detail_file.') # Create file with initial data try: with open(gconf.state_detail_file, 'wb') as f: @@ -364,7 +393,8 @@ class GMasterCommon(object): # - self.turns is the number of turns since start # - self.total_turns is a limit so that if self.turns reaches it, then # we exit (for diagnostic purposes) - # so, eg., if the master fs changes unceasingly, self.turns will remain 0. + # so, eg., if the master fs changes unceasingly, self.turns will remain + # 0. self.crawls = 0 self.turns = 0 self.total_turns = int(gconf.turns) @@ -394,7 +424,7 @@ class GMasterCommon(object): t.start() def should_crawl(cls): - return (gconf.glusterd_uuid in cls.master.server.node_uuid()) + return gconf.glusterd_uuid in cls.master.server.node_uuid() def register(self): self.register() @@ -416,18 +446,18 @@ class GMasterCommon(object): volinfo_sys = self.volinfo_hook() self.volinfo = volinfo_sys[self.KNAT] inter_master = volinfo_sys[self.KFGN] - logging.info("%s master with volume id %s ..." % \ - (inter_master and "intermediate" or "primary", - self.uuid)) + logging.info("%s master with volume id %s ..." % + (inter_master and "intermediate" or "primary", + self.uuid)) gconf.configinterface.set('volume_id', self.uuid) if self.volinfo: if self.volinfo['retval']: - logging.warn("master cluster's info may not be valid %d" % \ + logging.warn("master cluster's info may not be valid %d" % self.volinfo['retval']) self.start_checkpoint_thread() else: raise GsyncdError("master volinfo unavailable") - self.total_crawl_stats = self.get_initial_crawl_data() + self.total_crawl_stats = self.get_initial_crawl_data() self.lastreport['time'] = time.time() logging.info('crawl interval: %d seconds' % self.sleep_interval) @@ -435,7 +465,7 @@ class GMasterCommon(object): crawl = self.should_crawl() while not self.terminate: if self.start: - logging.debug("... crawl #%d done, took %.6f seconds" % \ + logging.debug("... crawl #%d done, took %.6f seconds" % (self.crawls, time.time() - self.start)) self.start = time.time() should_display_info = self.start - self.lastreport['time'] >= 60 @@ -443,11 +473,11 @@ class GMasterCommon(object): logging.info("%d crawls, %d turns", self.crawls - self.lastreport['crawls'], self.turns - self.lastreport['turns']) - self.lastreport.update(crawls = self.crawls, - turns = self.turns, - time = self.start) + self.lastreport.update(crawls=self.crawls, + turns=self.turns, + time=self.start) t1 = time.time() - if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds + if int(t1 - t0) >= 60: # lets hardcode this check to 60 seconds crawl = self.should_crawl() t0 = t1 self.update_worker_remote_node() @@ -456,11 +486,14 @@ class GMasterCommon(object): # bring up _this_ brick to the cluster stime # which is min of cluster (but max of the replicas) brick_stime = self.xtime('.', self.slave) - cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)])) - logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime))) + cluster_stime = self.master.server.aggregated.stime_mnt( + '.', '.'.join([str(self.uuid), str(gconf.slave_id)])) + logging.debug("Cluster stime: %s | Brick stime: %s" % + (repr(cluster_stime), repr(brick_stime))) if not isinstance(cluster_stime, int): if brick_stime < cluster_stime: - self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) + self.slave.server.set_stime( + self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) time.sleep(5) continue self.update_worker_health("Active") @@ -489,13 +522,14 @@ class GMasterCommon(object): with checkpoint @chkpt""" if xtimish: val = cls.serialize_xtime(val) - gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) + gconf.configinterface.set( + 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) @staticmethod def humantime(*tpair): """format xtime-like (sec, nsec) pair to human readable format""" ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\ - strftime("%Y-%m-%d %H:%M:%S") + strftime("%Y-%m-%d %H:%M:%S") if len(tpair) > 1: ts += '.' + str(tpair[1]) return ts @@ -506,7 +540,7 @@ class GMasterCommon(object): years = int(years) days = int(days) - date="" + date = "" m, s = divmod(crawl_time.seconds, 60) h, m = divmod(m, 60) @@ -515,7 +549,8 @@ class GMasterCommon(object): if days != 0: date += "%s %s " % (days, "day" if days == 1 else "days") - date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2)) + date += "%s:%s:%s" % (string.zfill(h, 2), + string.zfill(m, 2), string.zfill(s, 2)) return date def checkpt_service(self, chan, chkpt): @@ -540,16 +575,18 @@ class GMasterCommon(object): if not checkpt_tgt: checkpt_tgt = self.xtime('.') if isinstance(checkpt_tgt, int): - raise GsyncdError("master root directory is unaccessible (%s)", + raise GsyncdError("master root directory is " + "unaccessible (%s)", os.strerror(checkpt_tgt)) self._set_checkpt_param(chkpt, 'target', checkpt_tgt) - logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + logging.debug("checkpoint target %s has been determined " + "for checkpoint %s" % (repr(checkpt_tgt), chkpt)) # check if the label is 'now' chkpt_lbl = chkpt try: - x1,x2 = chkpt.split(':') + x1, x2 = chkpt.split(':') if x1 == 'now': chkpt_lbl = "as of " + self.humantime(x2) except: @@ -557,41 +594,46 @@ class GMasterCommon(object): completed = self._checkpt_param(chkpt, 'completed', xtimish=False) if completed: completed = tuple(int(x) for x in completed.split('.')) - s,_,_ = select([chan], [], [], (not completed) and 5 or None) + s, _, _ = select([chan], [], [], (not completed) and 5 or None) # either request made and we re-check to not # give back stale data, or we still hunting for completion - if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark: + if (self.native_xtime(checkpt_tgt) and ( + self.native_xtime(checkpt_tgt) < self.volmark)): # indexing has been reset since setting the checkpoint status = "is invalid" else: xtr = self.xtime('.', self.slave) if isinstance(xtr, int): - raise GsyncdError("slave root directory is unaccessible (%s)", + raise GsyncdError("slave root directory is " + "unaccessible (%s)", os.strerror(xtr)) ncompleted = self.xtime_geq(xtr, checkpt_tgt) - if completed and not ncompleted: # stale data - logging.warn("completion time %s for checkpoint %s became stale" % \ + if completed and not ncompleted: # stale data + logging.warn("completion time %s for checkpoint %s " + "became stale" % (self.humantime(*completed), chkpt)) completed = None gconf.configinterface.delete('checkpoint_completed') - if ncompleted and not completed: # just reaching completion + if ncompleted and not completed: # just reaching completion completed = "%.6f" % time.time() - self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) + self._set_checkpt_param( + chkpt, 'completed', completed, xtimish=False) completed = tuple(int(x) for x in completed.split('.')) logging.info("checkpoint %s completed" % chkpt) status = completed and \ - "completed at " + self.humantime(completed[0]) or \ - "not reached yet" + "completed at " + self.humantime(completed[0]) or \ + "not reached yet" if s: conn = None try: conn, _ = chan.accept() try: - conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status)) + conn.send("checkpoint %s is %s\0" % + (chkpt_lbl, status)) except: exc = sys.exc_info()[1] - if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \ - exc.errno == EPIPE: + if ((isinstance(exc, OSError) or isinstance( + exc, IOError)) and exc.errno == EPIPE): logging.debug('checkpoint client disconnected') else: raise @@ -602,11 +644,13 @@ class GMasterCommon(object): def start_checkpoint_thread(self): """prepare and start checkpoint service""" if self.checkpoint_thread or not ( - getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None) + getattr(gconf, 'state_socket_unencoded', None) and getattr( + gconf, 'socketdir', None) ): return chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") + state_socket = os.path.join( + gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket") try: os.unlink(state_socket) except: @@ -621,9 +665,9 @@ class GMasterCommon(object): def add_job(self, path, label, job, *a, **kw): """insert @job function to job table at @path with @label""" - if self.jobtab.get(path) == None: + if self.jobtab.get(path) is None: self.jobtab[path] = [] - self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + self.jobtab[path].append((label, a, lambda: job(*a, **kw))) def add_failjob(self, path, label): """invoke .add_job with a job that does nothing just fails""" @@ -644,7 +688,7 @@ class GMasterCommon(object): ret = j[-1]() if not ret: succeed = False - if succeed and not args[0] == None: + if succeed and not args[0] is None: self.sendmark(path, *args) return succeed @@ -657,19 +701,21 @@ class GMasterCommon(object): self.slave.server.setattr(path, adct) self.set_slave_xtime(path, mark) + class GMasterChangelogMixin(GMasterCommon): + """ changelog based change detection and syncing """ # index for change type and entry IDX_START = 0 - IDX_END = 2 + IDX_END = 2 - POS_GFID = 0 - POS_TYPE = 1 + POS_GFID = 0 + POS_TYPE = 1 POS_ENTRY1 = -1 - TYPE_META = "M " - TYPE_GFID = "D " + TYPE_META = "M " + TYPE_GFID = "D " TYPE_ENTRY = "E " # flat directory heirarchy for gfid based access @@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon): def setup_working_dir(self): workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path)) logfile = os.path.join(workdir, 'changes.log') - logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile)) + logging.debug('changelog working dir %s (log: %s)' % + (workdir, logfile)) return (workdir, logfile) def process_change(self, change, done, retry): pfx = gauxpfx() - clist = [] + clist = [] entries = [] meta_gfid = set() datas = set() # basic crawl stats: files and bytes - files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} + files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []} try: f = open(change, "r") clist = f.readlines() @@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon): elif ty in ['CREATE', 'MKDIR', 'MKNOD']: entry_update() # stat information present in the changelog itself - entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\ + entries.append(edct(ty, gfid=gfid, entry=en, + mode=int(ec[2]), uid=int(ec[3]), gid=int(ec[4]))) else: # stat() to get mode and other information go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - if ty == 'RENAME': # special hack for renames... + if ty == 'RENAME': # special hack for renames... entries.append(edct('UNLINK', gfid=gfid, entry=en)) else: - logging.debug('file %s got purged in the interim' % go) + logging.debug( + 'file %s got purged in the interim' % go) continue if ty == 'LINK': @@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon): if isinstance(rl, int): continue entry_update() - entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) + entries.append( + edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) elif ty == 'RENAME': entry_update() - e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) - entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) + e1 = unescape( + os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append( + edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: - if ec[1] == 'SETATTR': # only setattr's for now... + if ec[1] == 'SETATTR': # only setattr's for now... meta_gfid.add(os.path.join(pfx, ec[0])) else: logging.warn('got invalid changelog type: %s' % (et)) @@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon): if not retry: self.update_worker_cumilitive_status(files_pending) # sync namespace - if (entries): + if entries: self.slave.server.entry_ops(entries) # sync metadata - if (meta_gfid): + if meta_gfid: meta_entries = [] for go in meta_gfid: st = lstat(go) @@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon): self.skipped_gfid_list = [] self.current_files_skipped_count = 0 - # first, fire all changelog transfers in parallel. entry and metadata - # are performed synchronously, therefore in serial. However at the end - # of each changelog, data is synchronized with syncdata_async() - which - # means it is serial w.r.t entries/metadata of that changelog but - # happens in parallel with data of other changelogs. + # first, fire all changelog transfers in parallel. entry and + # metadata are performed synchronously, therefore in serial. + # However at the end of each changelog, data is synchronized + # with syncdata_async() - which means it is serial w.r.t + # entries/metadata of that changelog but happens in parallel + # with data of other changelogs. for change in changes: logging.debug('processing change %s' % change) self.process_change(change, done, retry) if not retry: - self.turns += 1 # number of changelogs processed in the batch + # number of changelogs processed in the batch + self.turns += 1 - # Now we wait for all the data transfers fired off in the above step - # to complete. Note that this is not ideal either. Ideally we want to - # trigger the entry/meta-data transfer of the next batch while waiting - # for the data transfer of the current batch to finish. + # Now we wait for all the data transfers fired off in the above + # step to complete. Note that this is not ideal either. Ideally + # we want to trigger the entry/meta-data transfer of the next + # batch while waiting for the data transfer of the current batch + # to finish. # Note that the reason to wait for the data transfer (vs doing it # completely in the background and call the changelog_done() @@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon): # and prevents a spiraling increase of wait stubs from consuming # unbounded memory and resources. - # update the slave's time with the timestamp of the _last_ changelog - # file time suffix. Since, the changelog prefix time is the time when - # the changelog was rolled over, introduce a tolerence of 1 second to - # counter the small delta b/w the marker update and gettimeofday(). + # update the slave's time with the timestamp of the _last_ + # changelog file time suffix. Since, the changelog prefix time + # is the time when the changelog was rolled over, introduce a + # tolerence of 1 second to counter the small delta b/w the + # marker update and gettimeofday(). # NOTE: this is only for changelog mode, not xsync. # @change is the last changelog (therefore max time for this batch) @@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon): retry = True tries += 1 if tries == self.MAX_RETRIES: - logging.warn('changelogs %s could not be processed - moving on...' % \ + logging.warn('changelogs %s could not be processed - ' + 'moving on...' % ' '.join(map(os.path.basename, changes))) - self.update_worker_total_files_skipped(self.current_files_skipped_count) - logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list)) + self.update_worker_total_files_skipped( + self.current_files_skipped_count) + logging.warn('SKIPPED GFID = %s' % + ','.join(self.skipped_gfid_list)) self.update_worker_files_syncd() if done: xtl = (int(change.split('.')[-1]) - 1, 0) @@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon): # entry_ops() that failed... so we retry the _whole_ changelog # again. # TODO: remove entry retries when it's gets fixed. - logging.warn('incomplete sync, retrying changelogs: %s' % \ + logging.warn('incomplete sync, retrying changelogs: %s' % ' '.join(map(os.path.basename, changes))) time.sleep(0.5) @@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon): self.sendmark(path, stime) def get_worker_status_file(self): - file_name = gconf.local_path+'.status' + file_name = gconf.local_path + '.status' file_name = file_name.replace("/", "_") - worker_status_file = gconf.georep_session_working_dir+file_name + worker_status_file = gconf.georep_session_working_dir + file_name return worker_status_file def update_worker_status(self, key, value): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: default_data[key] = value @@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon): raise def update_worker_cumilitive_status(self, files_pending): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon): try: with open(worker_status_file, 'r+') as f: loaded_data = json.load(f) - loaded_data['files_remaining'] = files_pending['count'] - loaded_data['bytes_remaining'] = files_pending['bytes'] + loaded_data['files_remaining'] = files_pending['count'] + loaded_data['bytes_remaining'] = files_pending['bytes'] loaded_data['purges_remaining'] = files_pending['purge'] os.ftruncate(f.fileno(), 0) os.lseek(f.fileno(), 0, os.SEEK_SET) @@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: - default_data['files_remaining'] = files_pending['count'] - default_data['bytes_remaining'] = files_pending['bytes'] + default_data['files_remaining'] = files_pending['count'] + default_data['bytes_remaining'] = files_pending['bytes'] default_data['purges_remaining'] = files_pending['purge'] json.dump(default_data, f) f.flush() @@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon): except: raise - def update_worker_remote_node (self): + def update_worker_remote_node(self): node = sys.argv[-1] node = node.split("@")[-1] remote_node_ip = node.split(":")[0] remote_node_vol = node.split(":")[3] remote_node = remote_node_ip + '::' + remote_node_vol - self.update_worker_status ('remote_node', remote_node) + self.update_worker_status('remote_node', remote_node) - def update_worker_health (self, state): - self.update_worker_status ('worker status', state) + def update_worker_health(self, state): + self.update_worker_status('worker status', state) - def update_worker_crawl_status (self, state): - self.update_worker_status ('crawl status', state) + def update_worker_crawl_status(self, state): + self.update_worker_status('crawl status', state) - def update_worker_files_syncd (self): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + def update_worker_files_syncd(self): + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon): with open(worker_status_file, 'r+') as f: loaded_data = json.load(f) loaded_data['files_syncd'] += loaded_data['files_remaining'] - loaded_data['files_remaining'] = 0 - loaded_data['bytes_remaining'] = 0 + loaded_data['files_remaining'] = 0 + loaded_data['bytes_remaining'] = 0 loaded_data['purges_remaining'] = 0 os.ftruncate(f.fileno(), 0) os.lseek(f.fileno(), 0, os.SEEK_SET) @@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: json.dump(default_data, f) @@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon): except: raise - def update_worker_files_remaining (self, state): - self.update_worker_status ('files_remaining', state) + def update_worker_files_remaining(self, state): + self.update_worker_status('files_remaining', state) - def update_worker_bytes_remaining (self, state): - self.update_worker_status ('bytes_remaining', state) + def update_worker_bytes_remaining(self, state): + self.update_worker_status('bytes_remaining', state) - def update_worker_purges_remaining (self, state): - self.update_worker_status ('purges_remaining', state) + def update_worker_purges_remaining(self, state): + self.update_worker_status('purges_remaining', state) - def update_worker_total_files_skipped (self, value): - default_data = {"remote_node":"N/A", - "worker status":"Not Started", - "crawl status":"N/A", + def update_worker_total_files_skipped(self, value): + default_data = {"remote_node": "N/A", + "worker status": "Not Started", + "crawl status": "N/A", "files_syncd": 0, "files_remaining": 0, "bytes_remaining": 0, @@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon): f.flush() os.fsync(f.fileno()) except (IOError, OSError, ValueError): - logging.info ('Creating new %s' % worker_status_file) + logging.info('Creating new %s' % worker_status_file) try: with open(worker_status_file, 'wb') as f: default_data['total_files_skipped'] = value @@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon): if changes: if purge_time: logging.info("slave's time: %s" % repr(purge_time)) - processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]] + processed = [x for x in changes + if int(x.split('.')[-1]) < purge_time[0]] for pr in processed: - logging.info('skipping already processed change: %s...' % os.path.basename(pr)) + logging.info( + 'skipping already processed change: %s...' % + os.path.basename(pr)) self.master.server.changelog_done(pr) changes.remove(pr) logging.debug('processing changes %s' % repr(changes)) @@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon): # control should not reach here raise + class GMasterXsyncMixin(GMasterChangelogMixin): + """ This crawl needs to be xtime based (as of now it's not. this is beacuse we generate CHANGELOG @@ -1091,7 +1155,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): files, hardlinks and symlinks. """ - XSYNC_MAX_ENTRIES = 1<<13 + XSYNC_MAX_ENTRIES = 1 << 13 def register(self): self.counter = 0 @@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def open(self): try: - self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) + self.xsync_change = os.path.join( + self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time()))) self.fh = open(self.xsync_change, 'w') except IOError: raise @@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.put('xsync', self.fname()) self.counter = 0 if not last: - time.sleep(1) # make sure changelogs are 1 second apart + time.sleep(1) # make sure changelogs are 1 second apart self.open() def sync_stime(self, stime=None, last=False): @@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr_root = self.xtime('.', self.slave) if isinstance(xtr_root, int): if xtr_root != ENOENT: - logging.warn("slave cluster not returning the " \ + logging.warn("slave cluster not returning the " "correct xtime for root (%d)" % xtr_root) xtr_root = self.minus_infinity xtl = self.xtime(path) @@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): xtr = self.xtime(path, self.slave) if isinstance(xtr, int): if xtr != ENOENT: - logging.warn("slave cluster not returning the " \ + logging.warn("slave cluster not returning the " "correct xtime for %s (%d)" % (path, xtr)) xtr = self.minus_infinity xtr = max(xtr, xtr_root) @@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin): e = os.path.join(path, e) xte = self.xtime(e) if isinstance(xte, int): - logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + logging.warn("irregular xtime for %s: %s" % + (e, errno.errorcode[xte])) continue if not self.need_sync(e, xte, xtr): continue @@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.sync_done(self.stimes, False) self.stimes = [] if stat.S_ISDIR(mo): - self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str( + st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, + bname))]) self.Xcrawl(e, xtr_root) self.stimes.append((e, xte)) elif stat.S_ISLNK(mo): - self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))]) + self.write_entry_change( + "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, + bname))]) elif stat.S_ISREG(mo): nlink = st.st_nlink - nlink -= 1 # fixup backend stat link count - # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave - # side will decide if to create the new entry, or to create link. + nlink -= 1 # fixup backend stat link count + # if a file has a hardlink, create a Changelog entry as + # 'LINK' so the slave side will decide if to create the + # new entry, or to create link. if nlink == 1: - self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))]) + self.write_entry_change("E", + [gfid, 'MKNOD', str(mo), + str(st.st_uid), + str(st.st_gid), + escape(os.path.join( + pargfid, bname))]) else: - self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))]) + self.write_entry_change( + "E", [gfid, 'LINK', escape(os.path.join(pargfid, + bname))]) self.write_entry_change("D", [gfid]) if path == '.': self.stimes.append((path, xtl)) self.sync_done(self.stimes, True) + class BoxClosedErr(Exception): pass + class PostBox(list): + """synchronized collection for storing things thought of as "requests" """ def __init__(self, *a): list.__init__(self, *a) # too bad Python stdlib does not have read/write locks... - # it would suffivce to grab the lock in .append as reader, in .close as writer + # it would suffivce to grab the lock in .append as reader, in .close as + # writer self.lever = Condition() self.open = True self.done = False @@ -1319,7 +1401,9 @@ class PostBox(list): self.open = False self.lever.release() + class Syncer(object): + """a staged queue to relay rsync requests to rsync workers By "staged queue" its meant that when a consumer comes to the diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index b0262ee30a8..8ed6f832618 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import time @@ -9,12 +19,16 @@ from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock from gconf import gconf -from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError +from syncdutils import update_file, select, waitpid +from syncdutils import set_term_handler, is_host_local, GsyncdError from syncdutils import escape, Thread, finalize, memoize + class Volinfo(object): + def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol], + po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + 'volume', 'info', vol], stdout=PIPE, stderr=PIPE) vix = po.stdout.read() po.wait() @@ -25,7 +39,8 @@ class Volinfo(object): via = '(via %s) ' % prelude.join(' ') else: via = ' ' - raise GsyncdError('getting volume info of %s%s failed with errorcode %s', + raise GsyncdError('getting volume info of %s%s ' + 'failed with errorcode %s', (vol, via, vi.find('opErrno').text)) self.tree = vi self.volume = vol @@ -40,25 +55,27 @@ class Volinfo(object): def bparse(b): host, dirp = b.text.split(':', 2) return {'host': host, 'dir': dirp} - return [ bparse(b) for b in self.get('brick') ] + return [bparse(b) for b in self.get('brick')] @property @memoize def uuid(self): ids = self.get('id') if len(ids) != 1: - raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid", + raise GsyncdError("volume info of %s obtained from %s: " + "ambiguous uuid", self.volume, self.host) return ids[0].text class Monitor(object): + """class which spawns and manages gsyncd workers""" - ST_INIT = 'Initializing...' - ST_STABLE = 'Stable' - ST_FAULTY = 'faulty' - ST_INCON = 'inconsistent' + ST_INIT = 'Initializing...' + ST_STABLE = 'Stable' + ST_FAULTY = 'faulty' + ST_INCON = 'inconsistent' _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON] def __init__(self): @@ -68,7 +85,8 @@ class Monitor(object): def set_state(self, state, w=None): """set the state that can be used by external agents like glusterd for status reporting""" - computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())] + computestate = lambda: self.state and self._ST_ORD[ + max(self._ST_ORD.index(s) for s in self.state.values())] if w: self.lock.acquire() old_state = computestate() @@ -112,14 +130,17 @@ class Monitor(object): self.set_state(self.ST_INIT, w) ret = 0 + def nwait(p, o=0): p2, r = waitpid(p, o) if not p2: return return r + def exit_signalled(s): """ child teminated due to receipt of SIGUSR1 """ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1)) + def exit_status(s): if os.WIFEXITED(s): return os.WEXITSTATUS(s) @@ -134,7 +155,8 @@ class Monitor(object): os.close(pr) os.execv(sys.executable, argv + ['--feedback-fd', str(pw), '--local-path', w[0], - '--local-id', '.' + escape(w[0]), + '--local-id', + '.' + escape(w[0]), '--resource-remote', w[1]]) self.lock.acquire() cpids.add(cpid) @@ -145,31 +167,31 @@ class Monitor(object): os.close(pr) if so: ret = nwait(cpid, os.WNOHANG) - if ret != None: - logging.info("worker(%s) died before establishing " \ + if ret is not None: + logging.info("worker(%s) died before establishing " "connection" % w[0]) else: logging.debug("worker(%s) connected" % w[0]) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) - if ret != None: - logging.info("worker(%s) died in startup " \ + if ret is not None: + logging.info("worker(%s) died in startup " "phase" % w[0]) break time.sleep(1) else: - logging.info("worker(%s) not confirmed in %d sec, " \ + logging.info("worker(%s) not confirmed in %d sec, " "aborting it" % (w[0], conn_timeout)) os.kill(cpid, signal.SIGKILL) ret = nwait(cpid) - if ret == None: + if ret is None: self.set_state(self.ST_STABLE, w) ret = nwait(cpid) if exit_signalled(ret): ret = 0 else: ret = exit_status(ret) - if ret in (0,1): + if ret in (0, 1): self.set_state(self.ST_FAULTY, w) time.sleep(10) self.set_state(self.ST_INCON, w) @@ -194,17 +216,18 @@ class Monitor(object): os.kill(cpid, signal.SIGKILL) self.lock.release() finalize(exval=1) - t = Thread(target = wmon, args=[wx]) + t = Thread(target=wmon, args=[wx]) t.start() ta.append(t) for t in ta: t.join() + def distribute(*resources): master, slave = resources mvol = Volinfo(master.volume, master.host) logging.debug('master bricks: ' + repr(mvol.bricks)) - prelude = [] + prelude = [] si = slave if isinstance(slave, SSH): prelude = gconf.ssh_command.split() + [slave.remote_addr] @@ -221,23 +244,28 @@ def distribute(*resources): raise GsyncdError("unkown slave type " + slave.url) logging.info('slave bricks: ' + repr(sbricks)) if isinstance(si, FILE): - slaves = [ slave.url ] + slaves = [slave.url] else: slavenodes = set(b['host'] for b in sbricks) if isinstance(slave, SSH) and not gconf.isolated_slave: rap = SSH.parse_ssh_address(slave) - slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ] + slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url + for h in slavenodes] else: - slavevols = [ h + ':' + si.volume for h in slavenodes ] + slavevols = [h + ':' + si.volume for h in slavenodes] if isinstance(slave, SSH): - slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ] + slaves = ['ssh://' + rap.remote_addr + ':' + v + for v in slavevols] else: slaves = slavevols - workerspex = [ (brick['dir'], slaves[idx % len(slaves)]) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host']) ] + workerspex = [(brick['dir'], slaves[idx % len(slaves)]) + for idx, brick in enumerate(mvol.bricks) + if is_host_local(brick['host'])] logging.info('worker specs: ' + repr(workerspex)) return workerspex, suuid + def monitor(*resources): """oh yeah, actually Monitor is used as singleton, too""" return Monitor().multiplex(*distribute(*resources)) diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py index 755fb61df48..d7b17dda796 100644 --- a/geo-replication/syncdaemon/repce.py +++ b/geo-replication/syncdaemon/repce.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import time @@ -24,6 +34,7 @@ from syncdutils import Thread, select pickle_proto = -1 repce_version = 1.0 + def ioparse(i, o): if isinstance(i, int): i = os.fdopen(i) @@ -34,6 +45,7 @@ def ioparse(i, o): o = o.fileno() return (i, o) + def send(out, *args): """pickle args and write out wholly in one syscall @@ -43,12 +55,14 @@ def send(out, *args): """ os.write(out, pickle.dumps(args, pickle_proto)) + def recv(inf): """load an object from input stream""" return pickle.load(inf) class RepceServer(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is @@ -95,16 +109,17 @@ class RepceServer(object): if rmeth == '__repce_version__': res = repce_version else: - try: - res = getattr(self.obj, rmeth)(*in_data[2:]) - except: - res = sys.exc_info()[1] - exc = True - logging.exception("call failed: ") + try: + res = getattr(self.obj, rmeth)(*in_data[2:]) + except: + res = sys.exc_info()[1] + exc = True + logging.exception("call failed: ") send(self.out, rid, exc, res) class RepceJob(object): + """class representing message status we can use for waiting on reply""" @@ -137,6 +152,7 @@ class RepceJob(object): class RepceClient(object): + """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce ... also our homebrewed RPC backend where the transport layer is @@ -148,7 +164,7 @@ class RepceClient(object): def __init__(self, i, o): self.inf, self.out = ioparse(i, o) self.jtab = {} - t = Thread(target = self.listen) + t = Thread(target=self.listen) t.start() def listen(self): @@ -177,25 +193,31 @@ class RepceClient(object): return rjob def __call__(self, meth, *args): - """RePCe client is callabe, calling it implements a synchronous remote call + """RePCe client is callabe, calling it implements a synchronous + remote call. - We do a .push with a cbk which does a wakeup upon receiving anwser, then wait - on the RepceJob. + We do a .push with a cbk which does a wakeup upon receiving anwser, + then wait on the RepceJob. """ - rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) + rjob = self.push( + meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)}) exc, res = rjob.wait() if exc: - logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__))) + logging.error('call %s (%s) failed on peer with %s' % + (repr(rjob), meth, str(type(res).__name__))) raise res logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res))) return res class mprx(object): - """method proxy, standard trick to implement rubyesque method_missing - in Python - A class is a closure factory, you know what I mean, or go read some SICP. + """method proxy, standard trick to implement rubyesque + method_missing in Python + + A class is a closure factory, you know what I mean, or go read + some SICP. """ + def __init__(self, ins, meth): self.ins = ins self.meth = meth diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 41add6fb287..2fb6b3078d8 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import re import os import sys @@ -12,27 +22,31 @@ import logging import tempfile import threading import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY, ESTALE, EINVAL +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP +from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL from select import error as SelectError from gconf import gconf import repce from repce import RepceServer, RepceClient -from master import gmaster_builder +from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') +UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + def sup(x, *a, **kw): """a rubyesque "super" for python ;) invoke caller method in parent class with given args. """ - return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + return getattr(super(type(x), x), + sys._getframe(1).f_code.co_name)(*a, **kw) + def desugar(ustr): """transform sugared url strings to standard <scheme>://<urlbody> form @@ -57,15 +71,17 @@ def desugar(ustr): ap = ap[1:] return "file://" + ap + def gethostbyname(hnam): """gethostbyname wrapper""" try: return socket.gethostbyname(hnam) except socket.gaierror: ex = sys.exc_info()[1] - raise GsyncdError("failed to resolve %s: %s" % \ + raise GsyncdError("failed to resolve %s: %s" % (hnam, ex.strerror)) + def parse_url(ustr): """instantiate an url object by scheme-to-class dispatch @@ -86,6 +102,7 @@ def parse_url(ustr): class _MetaXattr(object): + """singleton class, a lazy wrapper around the libcxattr module @@ -100,17 +117,19 @@ class _MetaXattr(object): def __getattr__(self, meth): from libcxattr import Xattr as LXattr - xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] + xmeth = [m for m in dir(LXattr) if m[0] != '_'] if not meth in xmeth: return for m in xmeth: setattr(self, m, getattr(LXattr, m)) return getattr(self, meth) + class _MetaChangelog(object): + def __getattr__(self, meth): from libgfchangelog import Changes as LChanges - xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] + xmeth = [m for m in dir(LChanges) if m[0] != '_'] if not meth in xmeth: return for m in xmeth: @@ -122,6 +141,7 @@ Changes = _MetaChangelog() class Popen(subprocess.Popen): + """customized subclass of subprocess.Popen with a ring buffer for children error output""" @@ -129,11 +149,13 @@ class Popen(subprocess.Popen): def init_errhandler(cls): """start the thread which handles children's error output""" cls.errstore = {} + def tailer(): while True: errstore = cls.errstore.copy() try: - poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) + poe, _, _ = select( + [po.stderr for po in errstore], [], [], 1) except (ValueError, SelectError): continue for po in errstore: @@ -154,12 +176,12 @@ class Popen(subprocess.Popen): tots = len(l) for lx in la: tots += len(lx) - while tots > 1<<20 and la: + while tots > 1 << 20 and la: tots -= len(la.pop(0)) la.append(l) finally: po.lock.release() - t = syncdutils.Thread(target = tailer) + t = syncdutils.Thread(target=tailer) t.start() cls.errhandler = t @@ -189,8 +211,9 @@ class Popen(subprocess.Popen): ex = sys.exc_info()[1] if not isinstance(ex, OSError): raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ - (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % + (args[0], errno.errorcode[ex.errno], + os.strerror(ex.errno))) if kw.get('stderr') == subprocess.PIPE: assert(getattr(self, 'errhandler', None)) self.errstore[self] = [] @@ -200,9 +223,10 @@ class Popen(subprocess.Popen): filling = "" if self.elines: filling = ", saying:" - logging.error("""command "%s" returned with %s%s""" % \ + logging.error("""command "%s" returned with %s%s""" % (" ".join(self.args), repr(self.returncode), filling)) lp = '' + def logerr(l): logging.error(self.args[0] + "> " + l) for l in self.elines: @@ -217,9 +241,9 @@ class Popen(subprocess.Popen): def errfail(self): """fail nicely if child did not terminate with success""" self.errlog() - syncdutils.finalize(exval = 1) + syncdutils.finalize(exval=1) - def terminate_geterr(self, fail_on_err = True): + def terminate_geterr(self, fail_on_err=True): """kill child, finalize stderr harvesting (unregister from errhandler, set up .elines), fail on error if asked for @@ -230,14 +254,14 @@ class Popen(subprocess.Popen): finally: self.lock.release() elines = self.errstore.pop(self) - if self.poll() == None: + if self.poll() is None: self.terminate() - if self.poll() == None: + if self.poll() is None: time.sleep(0.1) self.kill() self.wait() while True: - if not select([self.stderr],[],[],0.1)[0]: + if not select([self.stderr], [], [], 0.1)[0]: break b = os.read(self.stderr.fileno(), 1024) if b: @@ -251,6 +275,7 @@ class Popen(subprocess.Popen): class Server(object): + """singleton implemening those filesystem access primitives which are needed for geo-replication functionality @@ -260,25 +285,28 @@ class Server(object): GX_NSPACE_PFX = (privileged() and "trusted" or "system") GX_NSPACE = GX_NSPACE_PFX + ".glusterfs" - NTV_FMTSTR = "!" + "B"*19 + "II" + NTV_FMTSTR = "!" + "B" * 19 + "II" FRGN_XTRA_FMT = "I" FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT - GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' - GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX - GFID_FMTSTR = "!" + "B"*16 + # for backend gfid fetch, do not use GX_NSPACE_PFX + GFID_XATTR = 'trusted.gfid' + GFID_FMTSTR = "!" + "B" * 16 local_path = '' @classmethod def _fmt_mknod(cls, l): - return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + @classmethod def _fmt_mkdir(cls, l): - return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + @classmethod def _fmt_symlink(cls, l1, l2): - return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1) + return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1) def _pathguard(f): """decorator method that checks @@ -289,6 +317,7 @@ class Server(object): fc = funcode(f) pi = list(fc.co_varnames).index('path') + def ff(*a): path = a[pi] ps = path.split('/') @@ -308,7 +337,6 @@ class Server(object): raise OSError(ENOTDIR, os.strerror(ENOTDIR)) return os.listdir(path) - @classmethod @_pathguard def lstat(cls, path): @@ -325,7 +353,9 @@ class Server(object): @_pathguard def linkto_check(cls, path): try: - return not (Xattr.lgetxattr_buf(path, 'trusted.glusterfs.dht.linkto') == '') + return not ( + Xattr.lgetxattr_buf(path, + 'trusted.glusterfs.dht.linkto') == '') except (IOError, OSError): ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA): @@ -333,13 +363,13 @@ class Server(object): else: raise - @classmethod @_pathguard def gfid(cls, path): try: buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( + ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) return '-'.join(m.groups()) except (IOError, OSError): ex = sys.exc_info()[1] @@ -350,7 +380,9 @@ class Server(object): @classmethod def gfid_mnt(cls, gfidpath): - return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + return errno_wrap(Xattr.lgetxattr, + [gfidpath, 'glusterfs.gfid.string', + cls.GX_GFID_CANONICAL_LEN], [ENOENT]) @classmethod @_pathguard @@ -369,7 +401,7 @@ class Server(object): for e in entries: cls.purge(os.path.join(path, e)) """ - me_also = entries == None + me_also = entries is None if not entries: try: # if it's a symlink, prevent @@ -435,7 +467,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'xtime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -454,7 +488,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'stime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -473,7 +509,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'stime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -484,7 +522,8 @@ class Server(object): @classmethod def node_uuid(cls, path='.'): try: - uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) + uuid_l = Xattr.lgetxattr_buf( + path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) return uuid_l[:-1].split(' ') except OSError: raise @@ -493,13 +532,17 @@ class Server(object): @_pathguard def set_stime(cls, path, uuid, mark): """set @mark as stime for @uuid on @path""" - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), + struct.pack('!II', *mark)) @classmethod @_pathguard def set_xtime(cls, path, uuid, mark): """set @mark as xtime for @uuid on @path""" - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), + struct.pack('!II', *mark)) @classmethod @_pathguard @@ -511,18 +554,22 @@ class Server(object): on the brick (this method sets xtime on the remote slave) """ - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), + struct.pack('!II', *mark)) @classmethod def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) # regular file + def entry_pack_reg(gf, bn, mo, uid, gid): blen = len(bn) return struct.pack(cls._fmt_mknod(blen), uid, gid, gf, mo, bn, stat.S_IMODE(mo), 0, umask()) + def entry_pack_reg_stat(gf, bn, st): blen = len(bn) mo = st['mode'] @@ -531,18 +578,21 @@ class Server(object): gf, mo, bn, stat.S_IMODE(mo), 0, umask()) # mkdir + def entry_pack_mkdir(gf, bn, mo, uid, gid): blen = len(bn) return struct.pack(cls._fmt_mkdir(blen), uid, gid, gf, mo, bn, stat.S_IMODE(mo), umask()) - #symlink + # symlink + def entry_pack_symlink(gf, bn, lnk, st): blen = len(bn) llen = len(lnk) return struct.pack(cls._fmt_symlink(blen, llen), st['uid'], st['gid'], gf, st['mode'], bn, lnk) + def entry_purge(entry, gfid): # This is an extremely racy code and needs to be fixed ASAP. # The GFID check here is to be sure that the pargfid/bname @@ -574,9 +624,11 @@ class Server(object): else: break elif op in ['CREATE', 'MKNOD']: - blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid']) + blob = entry_pack_reg( + gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'MKDIR': - blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid']) + blob = entry_pack_mkdir( + gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -596,21 +648,23 @@ class Server(object): else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: - errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL]) + errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', + blob], + [EEXIST], [ENOENT, ESTALE, EINVAL]) @classmethod def meta_ops(cls, meta_entries): logging.debug('Meta-entries: %s' % repr(meta_entries)) for e in meta_entries: mode = e['stat']['mode'] - uid = e['stat']['uid'] - gid = e['stat']['gid'] - go = e['go'] + uid = e['stat']['uid'] + gid = e['stat']['gid'] + go = e['go'] errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) @classmethod - def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): + def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0): Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @classmethod @@ -649,6 +703,7 @@ class Server(object): return os.getpid() last_keep_alive = 0 + @classmethod def keep_alive(cls, dct): """process keepalive messages. @@ -662,9 +717,12 @@ class Server(object): if dct: key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) val = struct.pack(cls.FRGN_FMTSTR, - *(dct['version'] + - tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + - (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) + *(dct['version'] + + tuple(int(x, 16) + for x in re.findall('(?:[\da-f]){2}', + dct['uuid'])) + + (dct['retval'],) + dct['volume_mark'][0:2] + ( + dct['timeout'],))) Xattr.lsetxattr('.', key, val) cls.last_keep_alive += 1 return cls.last_keep_alive @@ -676,6 +734,7 @@ class Server(object): class SlaveLocal(object): + """mix-in class to implement some factes of a slave server ("mix-in" is sort of like "abstract class", ie. it's not @@ -697,9 +756,11 @@ class SlaveLocal(object): """ if boolify(gconf.use_rsync_xattrs) and not privileged(): - raise GsyncdError("using rsync for extended attributes is not supported") + raise GsyncdError( + "using rsync for extended attributes is not supported") - repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) + repce = RepceServer( + self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) t.start() @@ -709,12 +770,16 @@ class SlaveLocal(object): lp = self.server.last_keep_alive time.sleep(int(gconf.timeout)) if lp == self.server.last_keep_alive: - logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) + logging.info( + "connection inactive for %d seconds, stopping" % + int(gconf.timeout)) break else: select((), (), ()) + class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" def connect_remote(self, rargs=[], **opts): @@ -731,9 +796,11 @@ class SlaveRemote(object): extra_opts += ['--session-owner', so] if boolify(gconf.use_rsync_xattrs): extra_opts.append('--use-rsync-xattrs') - po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ - ['-N', '--listen', '--timeout', str(gconf.timeout), slave], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + + ['-N', '--listen', '--timeout', str(gconf.timeout), + slave], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) gconf.transport = po return self.start_fd_client(po.stdout, po.stdin, **opts) @@ -752,7 +819,9 @@ class SlaveRemote(object): for k, v in da0[i].iteritems(): da1[i][k] = int(v) if da1[0] != da1[1]: - raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) + raise GsyncdError( + "RePCe major version mismatch: local %s, remote %s" % + (exrv, rv)) def rsync(self, files, *args): """invoke rsync""" @@ -760,17 +829,19 @@ class SlaveRemote(object): raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) argv = gconf.rsync_command.split() + \ - ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ - gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ - ['.'] + list(args) - po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE) + ['-avR0', '--inplace', '--files-from=-', '--super', + '--stats', '--numeric-ids', '--no-implied-dirs'] + \ + gconf.rsync_options.split() + \ + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ + ['.'] + list(args) + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) for f in files: po.stdin.write(f) po.stdin.write('\0') po.stdin.close() po.wait() - po.terminate_geterr(fail_on_err = False) + po.terminate_geterr(fail_on_err=False) return po @@ -784,8 +855,10 @@ class SlaveRemote(object): logging.debug("files: " + ", ".join(files)) (host, rdir) = slaveurl.split(':') tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] - p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + ssh_cmd = gconf.ssh_command_tar.split() + \ + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, stderr=subprocess.PIPE) p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) for f in files: p0.stdin.write(f) @@ -795,14 +868,16 @@ class SlaveRemote(object): # wait() for tar to terminate, collecting any errors, further # waiting for transfer to complete p0.wait() - p0.terminate_geterr(fail_on_err = False) + p0.terminate_geterr(fail_on_err=False) p1.wait() - p1.terminate_geterr(fail_on_err = False) + p1.terminate_geterr(fail_on_err=False) return p1 + class AbstractUrl(object): + """abstract base class for url scheme classes""" def __init__(self, path, pattern): @@ -839,6 +914,7 @@ class AbstractUrl(object): class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls can be used to represent a file slave server @@ -847,6 +923,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): """ class FILEServer(Server): + """included server flavor""" pass @@ -864,6 +941,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls can be used to represent a gluster slave server @@ -874,21 +952,24 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): """ class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" @classmethod - def _attr_unpack_dict(cls, xattr, extra_fields = ''): + def _attr_unpack_dict(cls, xattr, extra_fields=''): """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) vm = struct.unpack(fmt_string, buf) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + m = re.match( + '(.{8})(.{4})(.{4})(.{4})(.{12})', + "".join(['%02x' % x for x in vm[2:18]])) uuid = '-'.join(m.groups()) - volinfo = { 'version': vm[0:2], - 'uuid' : uuid, - 'retval' : vm[18], - 'volume_mark': vm[19:21], - } + volinfo = {'version': vm[0:2], + 'uuid': uuid, + 'retval': vm[18], + 'volume_mark': vm[19:21], + } if extra_fields: return volinfo, vm[-len(extra_fields):] else: @@ -904,7 +985,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) now = int(time.time()) if x[0] > now: - logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ + logging.debug("volinfo[%s] expires: %d " + "(%d sec later)" % (d['uuid'], x[0], x[0] - now)) d['timeout'] = x[0] dict_list.append(d) @@ -919,7 +1001,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def native_volume_info(cls): """get the native volume mark of the underlying gluster volume""" try: - return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, + 'volume-mark'])) except OSError: ex = sys.exc_info()[1] if ex.errno != ENODATA: @@ -936,9 +1019,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def can_connect_to(self, remote): """determine our position in the connectibility matrix""" return not remote or \ - (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) + (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) class Mounter(object): + """Abstract base class for mounter backends""" def __init__(self, params): @@ -1003,7 +1087,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): if not t.isAlive(): break if time.time() >= tlim: - syncdutils.finalize(exval = 1) + syncdutils.finalize(exval=1) time.sleep(1) os.close(mpo) _, rv = syncdutils.waitpid(mh, 0) @@ -1011,7 +1095,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) logging.warn('stale mount possibly left behind on ' + d) - raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ + raise GsyncdError("cleaning up temp mountpoint %s " + "failed with status %d" % (d, rv)) else: rv = 0 @@ -1035,7 +1120,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): assert(mntpt) if mounted: po = self.umount_l(mntpt) - po.terminate_geterr(fail_on_err = False) + po.terminate_geterr(fail_on_err=False) if po.returncode != 0: po.errlog() rv = po.returncode @@ -1047,6 +1132,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.debug('auxiliary glusterfs mount prepared') class DirectMounter(Mounter): + """mounter backend which calls mount(8), umount(8) directly""" mountkw = {'stderr': subprocess.PIPE} @@ -1057,15 +1143,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): return ['umount', '-l', d] def make_mount_argv(self): - self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') - return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] + self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') + return [self.get_glusterprog()] + \ + ['--' + p for p in self.params] + [self.mntpt] - def cleanup_mntpt(self, mntpt = None): + def cleanup_mntpt(self, mntpt=None): if not mntpt: mntpt = self.mntpt os.rmdir(mntpt) class MountbrokerMounter(Mounter): + """mounter backend using the mountbroker gluster service""" mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} @@ -1073,7 +1161,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def make_cli_argv(cls): - return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] + return [cls.get_glusterprog()] + \ + gconf.gluster_cli_options.split() + ['system::'] @classmethod def make_umount_argv(cls, d): @@ -1081,7 +1170,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def make_mount_argv(self, label): return self.make_cli_argv() + \ - ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params + ['mount', label, 'user-map-root=' + + syncdutils.getusername()] + self.params def handle_mounter(self, po): self.mntpt = po.stdout.readline()[:-1] @@ -1106,9 +1196,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): label = syncdutils.getusername() mounter = label and self.MountbrokerMounter or self.DirectMounter params = gconf.gluster_params.split() + \ - (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ - ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, - 'volfile-id=' + self.volume, 'client-pid=-1'] + (gconf.gluster_log_level and ['log-level=' + + gconf.gluster_log_level] or []) + \ + ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + + self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] mounter(params).inhibit(*[l for l in [label] if l]) def connect_remote(self, *a, **kw): @@ -1116,8 +1207,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): self.slavedir = "/proc/%d/cwd" % self.server.pid() def gmaster_instantiate_tuple(self, slave): - """return a tuple of the 'one shot' and the 'main crawl' class instance""" - return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) + """return a tuple of the 'one shot' and the 'main crawl' + class instance""" + return (gmaster_builder('xsync')(self, slave), + gmaster_builder()(self, slave)) def service_loop(self, *args): """enter service loop @@ -1133,6 +1226,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class brickserver(FILE.FILEServer): local_path = gconf.local_path aggregated = self.server + @classmethod def entries(cls, path): e = super(brickserver, cls).entries(path) @@ -1143,14 +1237,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ValueError: pass return e + @classmethod def lstat(cls, e): """ path based backend stat """ return super(brickserver, cls).lstat(e) + @classmethod def gfid(cls, e): """ path based backend gfid fetch """ return super(brickserver, cls).gfid(e) + @classmethod def linkto_check(cls, e): return super(brickserver, cls).linkto_check(e) @@ -1158,9 +1255,25 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # define {,set_}xtime in slave, thus preempting # the call to remote, so that it takes data from # the local brick - slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + slave.server.xtime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.xtime(path, + uuid + '.' + gconf.slave_id) + ), + slave.server) + slave.server.stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.stime(path, + uuid + '.' + gconf.slave_id) + ), + slave.server) + slave.server.set_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_stime(path, + uuid + '.' + gconf.slave_id, + mark) + ), + slave.server) (g1, g2) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1186,6 +1299,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class SSH(AbstractUrl, SlaveRemote): + """scheme class for ssh:// urls interface to remote slave on master side @@ -1194,7 +1308,9 @@ class SSH(AbstractUrl, SlaveRemote): def __init__(self, path): self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + '^((?:%s@)?%s):(.+)' % + tuple([r.pattern + for r in (UserRX, HostRX)])) self.inner_rsc = parse_url(inner_url) self.volume = inner_url[1:] @@ -1262,7 +1378,8 @@ class SSH(AbstractUrl, SlaveRemote): self.inner_rsc.url) deferred = go_daemon == 'postconn' - ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) if deferred: @@ -1285,7 +1402,8 @@ class SSH(AbstractUrl, SlaveRemote): return 'should' def rsync(self, files): - return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), + return sup(self, files, '-e', + " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), *(gconf.rsync_ssh_options.split() + [self.slaveurl])) def tarssh(self, files): diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 1b5684c6d0c..822d919ecb1 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import os import sys import pwd @@ -7,9 +17,9 @@ import shutil import logging import socket from threading import Lock, Thread as baseThread -from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode -from signal import signal, SIGTERM, SIGKILL -from time import sleep +from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED +from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode +from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid @@ -37,25 +47,29 @@ except ImportError: _CL_AUX_GFID_PFX = ".gfid/" GF_OP_RETRIES = 20 + def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" return urllib.quote_plus(s) + def unescape(s): """inverse of .escape""" return urllib.unquote_plus(s) + def norm(s): if s: return s.replace('-', '_') -def update_file(path, updater, merger = lambda f: True): + +def update_file(path, updater, merger=lambda f: True): """update a file in a transaction-like manner""" fr = fw = None try: - fd = os.open(path, os.O_CREAT|os.O_RDWR) + fd = os.open(path, os.O_CREAT | os.O_RDWR) try: fr = os.fdopen(fd, 'r+b') except: @@ -66,7 +80,7 @@ def update_file(path, updater, merger = lambda f: True): return tmpp = path + '.tmp.' + str(os.getpid()) - fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY) + fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY) try: fw = os.fdopen(fd, 'wb', 0) except: @@ -80,29 +94,31 @@ def update_file(path, updater, merger = lambda f: True): if fx: fx.close() + def create_manifest(fname, content): """ Create manifest file for SSH Control Path """ fd = None try: - fd = os.open(fname, os.O_CREAT|os.O_RDWR) + fd = os.open(fname, os.O_CREAT | os.O_RDWR) try: os.write(fd, content) except: os.close(fd) raise finally: - if fd != None: + if fd is not None: os.close(fd) + def setup_ssh_ctl(ctld, remote_addr, resource_url): """ Setup GConf ssh control path parameters """ gconf.ssh_ctl_dir = ctld content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr, - resource_url) + resource_url) content_md5 = md5hex(content) fname = os.path.join(gconf.ssh_ctl_dir, "%s.mft" % content_md5) @@ -112,16 +128,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url): "%s.sock" % content_md5) gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path] + def grabfile(fname, content=None): """open @fname + contest for its fcntl lock @content: if given, set the file content to it """ # damn those messy open() mode codes - fd = os.open(fname, os.O_CREAT|os.O_RDWR) + fd = os.open(fname, os.O_CREAT | os.O_RDWR) f = os.fdopen(fd, 'r+b', 0) try: - fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) + fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB) except: ex = sys.exc_info()[1] f.close() @@ -139,6 +156,7 @@ def grabfile(fname, content=None): gconf.permanent_handles.append(f) return f + def grabpidfile(fname=None, setpid=True): """.grabfile customization for pid files""" if not fname: @@ -150,6 +168,7 @@ def grabpidfile(fname=None, setpid=True): final_lock = Lock() + def finalize(*a, **kw): """all those messy final steps we go trough upon termination @@ -169,7 +188,7 @@ def finalize(*a, **kw): if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid: # child has terminated rm_pidf = True - break; + break time.sleep(0.1) if rm_pidf: try: @@ -194,6 +213,7 @@ def finalize(*a, **kw): sys.stderr.flush() os._exit(kw.get('exval', 0)) + def log_raise_exception(excont): """top-level exception handler @@ -218,20 +238,27 @@ def log_raise_exception(excont): logging.error(exc.args[0]) sys.stderr.write('failure: ' + exc.args[0] + '\n') elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \ - ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ - exc.errno == EPIPE): + ((isinstance(exc, OSError) or isinstance(exc, IOError)) and + exc.errno == EPIPE): logging.error('connection to peer is broken') if hasattr(gconf, 'transport'): gconf.transport.wait() if gconf.transport.returncode == 127: logging.warn("!!!!!!!!!!!!!") - logging.warn('!!! getting "No such file or directory" errors ' - "is most likely due to MISCONFIGURATION, please consult " - "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html") + logging.warn('!!! getting "No such file or directory" ' + "errors is most likely due to " + "MISCONFIGURATION" + ", please consult https://access.redhat.com" + "/site/documentation/en-US/Red_Hat_Storage" + "/2.1/html/Administration_Guide" + "/chap-User_Guide-Geo_Rep-Preparation-" + "Settingup_Environment.html") logging.warn("!!!!!!!!!!!!!") gconf.transport.terminate_geterr() - elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED): - logging.error('glusterfs session went down [%s]', errorcode[exc.errno]) + elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, + ECONNABORTED): + logging.error('glusterfs session went down [%s]', + errorcode[exc.errno]) else: logtag = "FAIL" if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG): @@ -244,46 +271,54 @@ def log_raise_exception(excont): class FreeObject(object): + """wildcard class for which any attribute can be set""" def __init__(self, **kw): - for k,v in kw.items(): + for k, v in kw.items(): setattr(self, k, v) + class Thread(baseThread): + """thread class flavor for gsyncd - always a daemon thread - force exit for whole program if thread function coughs up an exception """ + def __init__(self, *a, **kw): tf = kw.get('target') if tf: def twrap(*aa): - excont = FreeObject(exval = 0) + excont = FreeObject(exval=0) try: tf(*aa) except: try: log_raise_exception(excont) finally: - finalize(exval = excont.exval) + finalize(exval=excont.exval) kw['target'] = twrap baseThread.__init__(self, *a, **kw) self.setDaemon(True) + class GsyncdError(Exception): pass -def getusername(uid = None): - if uid == None: + +def getusername(uid=None): + if uid is None: uid = os.geteuid() return pwd.getpwuid(uid).pw_name + def privileged(): return os.geteuid() == 0 + def boolify(s): """ Generic string to boolean converter @@ -294,7 +329,7 @@ def boolify(s): - False if it's in false_list - Warn if it's not present in either and return False """ - true_list = ['true', 'yes', '1', 'on'] + true_list = ['true', 'yes', '1', 'on'] false_list = ['false', 'no', '0', 'off'] if isinstance(s, bool): @@ -305,10 +340,12 @@ def boolify(s): if lstr in true_list: rv = True elif not lstr in false_list: - logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s)) + logging.warn("Unknown string (%s) in string to boolean conversion " + "defaulting to False\n" % (s)) return rv + def eintr_wrap(func, exc, *a): """ wrapper around syscalls resilient to interrupt caused @@ -322,19 +359,24 @@ def eintr_wrap(func, exc, *a): if not ex.args[0] == EINTR: raise + def select(*a): return eintr_wrap(oselect.select, oselect.error, *a) -def waitpid (*a): + +def waitpid(*a): return eintr_wrap(owaitpid, OSError, *a) + def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})): signal(SIGTERM, hook) + def is_host_local(host): locaddr = False for ai in socket.getaddrinfo(host, None): - # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125 + # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators + # /mgmt/glusterd/src/glusterd-utils.c#L125 if ai[0] == socket.AF_INET: if ai[-1][0].split(".")[0] == "127": locaddr = True @@ -358,8 +400,8 @@ def is_host_local(host): f = open("/proc/sys/net/ipv4/ip_nonlocal_bind") if int(f.read()) != 0: raise GsyncdError( - "non-local bind is set and not allowed to create raw sockets, " - "cannot determine if %s is local" % host) + "non-local bind is set and not allowed to create " + "raw sockets, cannot determine if %s is local" % host) s = socket.socket(ai[0], socket.SOCK_DGRAM) finally: if f: @@ -373,6 +415,7 @@ def is_host_local(host): s.close() return locaddr + def funcode(f): fc = getattr(f, 'func_code', None) if not fc: @@ -380,32 +423,40 @@ def funcode(f): fc = f.__code__ return fc + def memoize(f): fc = funcode(f) fn = fc.co_name + def ff(self, *a, **kw): rv = getattr(self, '_' + fn, None) - if rv == None: + if rv is None: rv = f(self, *a, **kw) setattr(self, '_' + fn, rv) return rv return ff + def umask(): return os.umask(0) + def entry2pb(e): return e.rsplit('/', 1) + def gauxpfx(): return _CL_AUX_GFID_PFX + def md5hex(s): return md5(s).hexdigest() + def selfkill(sig=SIGTERM): os.kill(os.getpid(), sig) + def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): """ wrapper around calls resilient to errnos. retry in case of ESTALE by default. @@ -427,6 +478,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): return time.sleep(0.250) # retry the call + def lstat(e): try: return os.lstat(e) |