Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--  geo-replication/syncdaemon/__codecheck.py        17
-rw-r--r--  geo-replication/syncdaemon/__init__.py             9
-rw-r--r--  geo-replication/syncdaemon/configinterface.py     57
-rw-r--r--  geo-replication/syncdaemon/gconf.py               12
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py             292
-rw-r--r--  geo-replication/syncdaemon/libcxattr.py           16
-rw-r--r--  geo-replication/syncdaemon/libgfchangelog.py      22
-rw-r--r--  geo-replication/syncdaemon/master.py             384
-rw-r--r--  geo-replication/syncdaemon/monitor.py             78
-rw-r--r--  geo-replication/syncdaemon/repce.py               52
-rw-r--r--  geo-replication/syncdaemon/resource.py           306
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py         114
12 files changed, 912 insertions(+), 447 deletions(-)
diff --git a/geo-replication/syncdaemon/__codecheck.py b/geo-replication/syncdaemon/__codecheck.py
index e3386afba8b..45dbd26bb64 100644
--- a/geo-replication/syncdaemon/__codecheck.py
+++ b/geo-replication/syncdaemon/__codecheck.py
@@ -1,10 +1,20 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import os.path
import sys
import tempfile
import shutil
-ipd = tempfile.mkdtemp(prefix = 'codecheck-aux')
+ipd = tempfile.mkdtemp(prefix='codecheck-aux')
try:
# add a fake ipaddr module, we don't want to
@@ -25,7 +35,7 @@ class IPNetwork(list):
if f[-3:] != '.py' or f[0] == '_':
continue
m = f[:-3]
- sys.stdout.write('importing %s ...' % m)
+ sys.stdout.write('importing %s ...' % m)
__import__(m)
print(' OK.')
@@ -33,7 +43,8 @@ class IPNetwork(list):
sys.argv = sys.argv[:1] + a
gsyncd = sys.modules['gsyncd']
- for a in [['--help'], ['--version'], ['--canonicalize-escape-url', '/foo']]:
+ for a in [['--help'], ['--version'],
+ ['--canonicalize-escape-url', '/foo']]:
print('>>> invoking program with args: %s' % ' '.join(a))
pid = os.fork()
if not pid:
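
For context on what __codecheck.py does: it is an import smoke test. It drops a stub ipaddr module into a throwaway directory so the syncdaemon modules import without the real dependency, imports each module, then forks and re-invokes gsyncd with a few canned argument vectors. A minimal sketch of the stub-and-import half (the stub's contents here are an assumption, just enough surface to satisfy importers):

    import os
    import shutil
    import sys
    import tempfile

    ipd = tempfile.mkdtemp(prefix='codecheck-aux')
    try:
        # fake 'ipaddr' module so imports succeed without the dependency
        with open(os.path.join(ipd, 'ipaddr.py'), 'w') as f:
            f.write("class IPAddress(object):\n    pass\n"
                    "class IPNetwork(list):\n    pass\n")
        sys.path.insert(0, ipd)

        for fn in os.listdir('.'):
            if fn[-3:] != '.py' or fn[0] == '_':
                continue
            m = fn[:-3]
            sys.stdout.write('importing %s ...' % m)
            __import__(m)   # any syntax/import error aborts the check
            print(' OK.')
    finally:
        shutil.rmtree(ipd)
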
diff --git a/geo-replication/syncdaemon/__init__.py b/geo-replication/syncdaemon/__init__.py
index e69de29bb2d..b4648b69645 100644
--- a/geo-replication/syncdaemon/__init__.py
+++ b/geo-replication/syncdaemon/__init__.py
@@ -0,0 +1,9 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
diff --git a/geo-replication/syncdaemon/configinterface.py b/geo-replication/syncdaemon/configinterface.py
index 35f754c98c9..c4d47b5dbda 100644
--- a/geo-replication/syncdaemon/configinterface.py
+++ b/geo-replication/syncdaemon/configinterface.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
try:
import ConfigParser
except ImportError:
@@ -21,14 +31,25 @@ config_version = 2.0
re_type = type(re.compile(''))
-
# (SECTION, OPTION, OLD VALUE, NEW VALUE)
CONFIGS = (
- ("peersrx . .", "georep_session_working_dir", "", "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_${slavevol}/"),
- ("peersrx .", "gluster_params", "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true", "aux-gfid-mount"),
- ("peersrx . .", "ssh_command_tar", "", "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no -i /var/lib/glusterd/geo-replication/tar_ssh.pem"),
+ ("peersrx . .",
+ "georep_session_working_dir",
+ "",
+ "/var/lib/glusterd/geo-replication/${mastervol}_${remotehost}_"
+ "${slavevol}/"),
+ ("peersrx .",
+ "gluster_params",
+ "aux-gfid-mount xlator-option=\*-dht.assert-no-child-down=true",
+ "aux-gfid-mount"),
+ ("peersrx . .",
+ "ssh_command_tar",
+ "",
+ "ssh -oPasswordAuthentication=no -oStrictHostKeyChecking=no "
+ "-i /var/lib/glusterd/geo-replication/tar_ssh.pem"),
)
+
def upgrade_config_file(path):
config_change = False
config = ConfigParser.RawConfigParser()
@@ -72,7 +93,9 @@ def upgrade_config_file(path):
class MultiDict(object):
- """a virtual dict-like class which functions as the union of underlying dicts"""
+
+ """a virtual dict-like class which functions as the union
+ of underlying dicts"""
def __init__(self, *dd):
self.dicts = dd
@@ -80,14 +103,15 @@ class MultiDict(object):
def __getitem__(self, key):
val = None
for d in self.dicts:
- if d.get(key) != None:
+ if d.get(key) is not None:
val = d[key]
- if val == None:
+ if val is None:
raise KeyError(key)
return val
class GConffile(object):
+
"""A high-level interface to ConfigParser which flattens the two-tiered
config layout by implenting automatic section dispatch based on initial
parameters.
@@ -155,7 +179,8 @@ class GConffile(object):
return self.get(opt, printValue=False)
def section(self, rx=False):
- """get the section name of the section representing .peers in .config"""
+ """get the section name of the section representing .peers
+ in .config"""
peers = self.peers
if not peers:
peers = ['.', '.']
@@ -209,6 +234,7 @@ class GConffile(object):
continue
so2[s] = tv
tv += 1
+
def scmp(x, y):
return cmp(*(so2[s] for s in (x, y)))
ss.sort(scmp)
@@ -218,12 +244,13 @@ class GConffile(object):
"""update @dct from key/values of ours.
key/values are collected from .config by filtering the regexp sections
- according to match, and from .section. The values are treated as templates,
- which are substituted from .auxdicts and (in case of regexp sections)
- match groups.
+ according to match, and from .section. The values are treated as
+ templates, which are substituted from .auxdicts and (in case of regexp
+ sections) match groups.
"""
if not self.peers:
raise GsyncdError('no peers given, cannot select matching options')
+
def update_from_sect(sect, mud):
for k, v in self.config._sections[sect].items():
if k == '__name__':
@@ -243,7 +270,7 @@ class GConffile(object):
match = False
break
for j in range(len(m.groups())):
- mad['match%d_%d' % (i+1, j+1)] = m.groups()[j]
+ mad['match%d_%d' % (i + 1, j + 1)] = m.groups()[j]
if match:
update_from_sect(sect, MultiDict(dct, mad, *self.auxdicts))
if self.config.has_section(self.section()):
@@ -255,7 +282,7 @@ class GConffile(object):
logic described in .update_to)
"""
d = {}
- self.update_to(d, allow_unresolved = True)
+ self.update_to(d, allow_unresolved=True)
if opt:
opt = norm(opt)
v = d.get(opt)
@@ -283,6 +310,7 @@ class GConffile(object):
self.config.add_section(SECT_META)
self.config.set(SECT_META, 'version', config_version)
return trfn(norm(opt), *a, **kw)
+
def updateconf(f):
self.config.write(f)
update_file(self.path, updateconf, mergeconf)
@@ -295,7 +323,8 @@ class GConffile(object):
# regarding SECT_ORD, cf. ord_sections
if not self.config.has_section(SECT_ORD):
self.config.add_section(SECT_ORD)
- self.config.set(SECT_ORD, sect, len(self.config._sections[SECT_ORD]))
+ self.config.set(
+ SECT_ORD, sect, len(self.config._sections[SECT_ORD]))
self.config.set(sect, opt, val)
return True
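
The `is not None` fixes above are more than style: MultiDict must treat falsy-but-present values (0, '') as real hits, which `if d.get(key):` or `!= None` comparisons can get wrong. A self-contained sketch of the union-view behavior:

    class MultiDict(object):
        """read-only union view over several dicts; last dict wins"""
        def __init__(self, *dd):
            self.dicts = dd

        def __getitem__(self, key):
            val = None
            for d in self.dicts:
                if d.get(key) is not None:  # falsy values still count
                    val = d[key]
            if val is None:
                raise KeyError(key)
            return val

    defaults = {'log_level': 'INFO', 'sync_jobs': 0}
    overrides = {'log_level': 'DEBUG'}
    md = MultiDict(defaults, overrides)
    print(md['log_level'])   # DEBUG -- override shadows the default
    print(md['sync_jobs'])   # 0     -- present-but-falsy is still found

Because the last dict holding a key wins, callers can layer defaults under match-group and auxiliary dicts, which is exactly how update_to() uses it.
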
diff --git a/geo-replication/syncdaemon/gconf.py b/geo-replication/syncdaemon/gconf.py
index fe5795f16e2..1fc7c381bc4 100644
--- a/geo-replication/syncdaemon/gconf.py
+++ b/geo-replication/syncdaemon/gconf.py
@@ -1,6 +1,16 @@
-import os
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
class GConf(object):
+
"""singleton class to store globals
shared between gsyncd modules"""
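
gconf's pattern is a plain module-level singleton: one GConf instance created at import time, onto which other modules hang attributes dynamically (the gsyncd.py hunks below set gconf.starttime, gconf.log_metadata, and so on). A minimal sketch:

    class GConf(object):
        """singleton class to store globals shared between gsyncd modules"""
        pass

    gconf = GConf()

    # importers do `from gconf import gconf` and attach attributes freely,
    # e.g. gconf.starttime = time.time() as gsyncd.py's main() does
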
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 6eb62c6b076..426d964de95 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -1,4 +1,13 @@
#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
import os
import os.path
@@ -6,25 +15,27 @@ import glob
import sys
import time
import logging
-import signal
import shutil
import optparse
import fcntl
import fnmatch
from optparse import OptionParser, SUPPRESS_HELP
from logging import Logger, handlers
-from errno import EEXIST, ENOENT
+from errno import ENOENT
from ipaddr import IPAddress, IPNetwork
from gconf import gconf
-from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
-from syncdutils import GsyncdError, select, set_term_handler, privileged, update_file
+from syncdutils import FreeObject, norm, grabpidfile, finalize
+from syncdutils import log_raise_exception, privileged, update_file
+from syncdutils import GsyncdError, select, set_term_handler
from configinterface import GConffile, upgrade_config_file
import resource
from monitor import monitor
+
class GLogger(Logger):
+
"""Logger customizations for gsyncd.
It implements a log format similar to that of glusterfs.
@@ -51,7 +62,8 @@ class GLogger(Logger):
if lbl:
lbl = '(' + lbl + ')'
lprm = {'datefmt': "%Y-%m-%d %H:%M:%S",
- 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" + lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
+ 'format': "[%(asctime)s.%(nsecs)d] %(lvlnam)s [%(module)s" +
+ lbl + ":%(lineno)s:%(funcName)s] %(ctx)s: %(message)s"}
lprm.update(kw)
lvl = kw.get('level', logging.INFO)
lprm['level'] = lvl
@@ -64,7 +76,7 @@ class GLogger(Logger):
try:
logging_handler = handlers.WatchedFileHandler(lprm['filename'])
formatter = logging.Formatter(fmt=lprm['format'],
- datefmt=lprm['datefmt'])
+ datefmt=lprm['datefmt'])
logging_handler.setFormatter(formatter)
logging.getLogger().addHandler(logging_handler)
except AttributeError:
@@ -96,6 +108,7 @@ class GLogger(Logger):
gconf.log_metadata = lkw
gconf.log_exit = True
+
def startup(**kw):
"""set up logging, pidfile grabbing, daemonization"""
if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
@@ -144,14 +157,15 @@ def main():
gconf.starttime = time.time()
set_term_handler()
GLogger.setup()
- excont = FreeObject(exval = 0)
+ excont = FreeObject(exval=0)
try:
try:
main_i()
except:
log_raise_exception(excont)
finally:
- finalize(exval = excont.exval)
+ finalize(exval=excont.exval)
+
def main_i():
"""internal main routine
@@ -171,50 +185,71 @@ def main_i():
if val and val != '-':
val = os.path.abspath(val)
setattr(parser.values, opt.dest, val)
+
def store_local(opt, optstr, val, parser):
rconf[opt.dest] = val
+
def store_local_curry(val):
return lambda o, oo, vx, p: store_local(o, oo, val, p)
+
def store_local_obj(op, dmake):
- return lambda o, oo, vx, p: store_local(o, oo, FreeObject(op=op, **dmake(vx)), p)
-
- op = OptionParser(usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
- op.add_option('--gluster-command-dir', metavar='DIR', default='')
- op.add_option('--gluster-log-file', metavar='LOGF', default=os.devnull, type=str, action='callback', callback=store_abs)
- op.add_option('--gluster-log-level', metavar='LVL')
- op.add_option('--gluster-params', metavar='PRMS', default='')
- op.add_option('--glusterd-uuid', metavar='UUID', type=str, default='', help=SUPPRESS_HELP)
- op.add_option('--gluster-cli-options', metavar='OPTS', default='--log-file=-')
- op.add_option('--mountbroker', metavar='LABEL')
- op.add_option('-p', '--pid-file', metavar='PIDF', type=str, action='callback', callback=store_abs)
- op.add_option('-l', '--log-file', metavar='LOGF', type=str, action='callback', callback=store_abs)
- op.add_option('--log-file-mbr', metavar='LOGF', type=str, action='callback', callback=store_abs)
- op.add_option('--state-file', metavar='STATF', type=str, action='callback', callback=store_abs)
- op.add_option('--state-detail-file', metavar='STATF', type=str, action='callback', callback=store_abs)
- op.add_option('--georep-session-working-dir', metavar='STATF', type=str, action='callback', callback=store_abs)
- op.add_option('--ignore-deletes', default=False, action='store_true')
- op.add_option('--isolated-slave', default=False, action='store_true')
- op.add_option('--use-rsync-xattrs', default=False, action='store_true')
- op.add_option('-L', '--log-level', metavar='LVL')
- op.add_option('-r', '--remote-gsyncd', metavar='CMD', default=os.path.abspath(sys.argv[0]))
- op.add_option('--volume-id', metavar='UUID')
- op.add_option('--slave-id', metavar='ID')
- op.add_option('--session-owner', metavar='ID')
- op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
- op.add_option('--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
- op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
- op.add_option('--ssh-command-tar', metavar='CMD', default='ssh')
- op.add_option('--rsync-command', metavar='CMD', default='rsync')
- op.add_option('--rsync-options', metavar='OPTS', default='')
- op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
- op.add_option('--timeout', metavar='SEC', type=int, default=120)
- op.add_option('--connection-timeout', metavar='SEC', type=int, default=60, help=SUPPRESS_HELP)
- op.add_option('--sync-jobs', metavar='N', type=int, default=3)
- op.add_option('--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
- op.add_option('--allow-network', metavar='IPS', default='')
- op.add_option('--socketdir', metavar='DIR')
- op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)
- op.add_option('--checkpoint', metavar='LABEL', default='')
+ return lambda o, oo, vx, p: store_local(
+ o, oo, FreeObject(op=op, **dmake(vx)), p)
+
+ op = OptionParser(
+ usage="%prog [options...] <master> <slave>", version="%prog 0.0.1")
+ op.add_option('--gluster-command-dir', metavar='DIR', default='')
+ op.add_option('--gluster-log-file', metavar='LOGF',
+ default=os.devnull, type=str, action='callback',
+ callback=store_abs)
+ op.add_option('--gluster-log-level', metavar='LVL')
+ op.add_option('--gluster-params', metavar='PRMS', default='')
+ op.add_option(
+ '--glusterd-uuid', metavar='UUID', type=str, default='',
+ help=SUPPRESS_HELP)
+ op.add_option(
+ '--gluster-cli-options', metavar='OPTS', default='--log-file=-')
+ op.add_option('--mountbroker', metavar='LABEL')
+ op.add_option('-p', '--pid-file', metavar='PIDF', type=str,
+ action='callback', callback=store_abs)
+ op.add_option('-l', '--log-file', metavar='LOGF', type=str,
+ action='callback', callback=store_abs)
+ op.add_option('--log-file-mbr', metavar='LOGF', type=str,
+ action='callback', callback=store_abs)
+ op.add_option('--state-file', metavar='STATF', type=str,
+ action='callback', callback=store_abs)
+ op.add_option('--state-detail-file', metavar='STATF',
+ type=str, action='callback', callback=store_abs)
+ op.add_option('--georep-session-working-dir', metavar='STATF',
+ type=str, action='callback', callback=store_abs)
+ op.add_option('--ignore-deletes', default=False, action='store_true')
+ op.add_option('--isolated-slave', default=False, action='store_true')
+ op.add_option('--use-rsync-xattrs', default=False, action='store_true')
+ op.add_option('-L', '--log-level', metavar='LVL')
+ op.add_option('-r', '--remote-gsyncd', metavar='CMD',
+ default=os.path.abspath(sys.argv[0]))
+ op.add_option('--volume-id', metavar='UUID')
+ op.add_option('--slave-id', metavar='ID')
+ op.add_option('--session-owner', metavar='ID')
+ op.add_option('--local-id', metavar='ID', help=SUPPRESS_HELP, default='')
+ op.add_option(
+ '--local-path', metavar='PATH', help=SUPPRESS_HELP, default='')
+ op.add_option('-s', '--ssh-command', metavar='CMD', default='ssh')
+ op.add_option('--ssh-command-tar', metavar='CMD', default='ssh')
+ op.add_option('--rsync-command', metavar='CMD', default='rsync')
+ op.add_option('--rsync-options', metavar='OPTS', default='')
+ op.add_option('--rsync-ssh-options', metavar='OPTS', default='--compress')
+ op.add_option('--timeout', metavar='SEC', type=int, default=120)
+ op.add_option('--connection-timeout', metavar='SEC',
+ type=int, default=60, help=SUPPRESS_HELP)
+ op.add_option('--sync-jobs', metavar='N', type=int, default=3)
+ op.add_option(
+ '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP)
+ op.add_option('--allow-network', metavar='IPS', default='')
+ op.add_option('--socketdir', metavar='DIR')
+ op.add_option('--state-socket-unencoded', metavar='SOCKF',
+ type=str, action='callback', callback=store_abs)
+ op.add_option('--checkpoint', metavar='LABEL', default='')
# tunables for failover/failback mechanism:
# None - gsyncd behaves as normal
# blind - gsyncd works with xtime pairs to identify
@@ -225,56 +260,86 @@ def main_i():
op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)
# changelog or xtime? (TODO: Change the default)
- op.add_option('--change-detector', metavar='MODE', type=str, default='xtime')
- # sleep interval for change detection (xtime crawl uses a hardcoded 1 second sleep time)
+ op.add_option(
+ '--change-detector', metavar='MODE', type=str, default='xtime')
+ # sleep interval for change detection (xtime crawl uses a hardcoded 1
+ # second sleep time)
op.add_option('--change-interval', metavar='SEC', type=int, default=3)
# working directory for changelog based mechanism
- op.add_option('--working-dir', metavar='DIR', type=str, action='callback', callback=store_abs)
+ op.add_option('--working-dir', metavar='DIR', type=str,
+ action='callback', callback=store_abs)
op.add_option('--use-tarssh', default=False, action='store_true')
- op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local)
+ op.add_option('-c', '--config-file', metavar='CONF',
+ type=str, action='callback', callback=store_local)
# duh. need to specify dest or value will be mapped to None :S
- op.add_option('--monitor', dest='monitor', action='callback', callback=store_local_curry(True))
- op.add_option('--resource-local', dest='resource_local', type=str, action='callback', callback=store_local)
- op.add_option('--resource-remote', dest='resource_remote', type=str, action='callback', callback=store_local)
- op.add_option('--feedback-fd', dest='feedback_fd', type=int, help=SUPPRESS_HELP, action='callback', callback=store_local)
- op.add_option('--listen', dest='listen', help=SUPPRESS_HELP, action='callback', callback=store_local_curry(True))
- op.add_option('-N', '--no-daemon', dest="go_daemon", action='callback', callback=store_local_curry('dont'))
- op.add_option('--verify', type=str, dest="verify", action='callback', callback=store_local)
- op.add_option('--create', type=str, dest="create", action='callback', callback=store_local)
- op.add_option('--delete', dest='delete', action='callback', callback=store_local_curry(True))
- op.add_option('--debug', dest="go_daemon", action='callback', callback=lambda *a: (store_local_curry('dont')(*a),
- setattr(a[-1].values, 'log_file', '-'),
- setattr(a[-1].values, 'log_level', 'DEBUG'))),
+ op.add_option('--monitor', dest='monitor', action='callback',
+ callback=store_local_curry(True))
+ op.add_option('--resource-local', dest='resource_local',
+ type=str, action='callback', callback=store_local)
+ op.add_option('--resource-remote', dest='resource_remote',
+ type=str, action='callback', callback=store_local)
+ op.add_option('--feedback-fd', dest='feedback_fd', type=int,
+ help=SUPPRESS_HELP, action='callback', callback=store_local)
+ op.add_option('--listen', dest='listen', help=SUPPRESS_HELP,
+ action='callback', callback=store_local_curry(True))
+ op.add_option('-N', '--no-daemon', dest="go_daemon",
+ action='callback', callback=store_local_curry('dont'))
+ op.add_option('--verify', type=str, dest="verify",
+ action='callback', callback=store_local)
+ op.add_option('--create', type=str, dest="create",
+ action='callback', callback=store_local)
+ op.add_option('--delete', dest='delete', action='callback',
+ callback=store_local_curry(True))
+ op.add_option('--debug', dest="go_daemon", action='callback',
+ callback=lambda *a: (store_local_curry('dont')(*a),
+ setattr(
+ a[-1].values, 'log_file', '-'),
+ setattr(a[-1].values, 'log_level',
+ 'DEBUG'))),
op.add_option('--path', type=str, action='append')
for a in ('check', 'get'):
- op.add_option('--config-' + a, metavar='OPT', type=str, dest='config', action='callback',
+ op.add_option('--config-' + a, metavar='OPT', type=str, dest='config',
+ action='callback',
callback=store_local_obj(a, lambda vx: {'opt': vx}))
- op.add_option('--config-get-all', dest='config', action='callback', callback=store_local_obj('get', lambda vx: {'opt': None}))
+ op.add_option('--config-get-all', dest='config', action='callback',
+ callback=store_local_obj('get', lambda vx: {'opt': None}))
for m in ('', '-rx', '-glob'):
# call this code 'Pythonic' eh?
- # have to define a one-shot local function to be able to inject (a value depending on the)
+ # have to define a one-shot local function to be able
+ # to inject (a value depending on the)
# iteration variable into the inner lambda
def conf_mod_opt_regex_variant(rx):
- op.add_option('--config-set' + m, metavar='OPT VAL', type=str, nargs=2, dest='config', action='callback',
- callback=store_local_obj('set', lambda vx: {'opt': vx[0], 'val': vx[1], 'rx': rx}))
- op.add_option('--config-del' + m, metavar='OPT', type=str, dest='config', action='callback',
- callback=store_local_obj('del', lambda vx: {'opt': vx, 'rx': rx}))
+ op.add_option('--config-set' + m, metavar='OPT VAL', type=str,
+ nargs=2, dest='config', action='callback',
+ callback=store_local_obj('set', lambda vx: {
+ 'opt': vx[0], 'val': vx[1], 'rx': rx}))
+ op.add_option('--config-del' + m, metavar='OPT', type=str,
+ dest='config', action='callback',
+ callback=store_local_obj('del', lambda vx: {
+ 'opt': vx, 'rx': rx}))
conf_mod_opt_regex_variant(m and m[1:] or False)
- op.add_option('--normalize-url', dest='url_print', action='callback', callback=store_local_curry('normal'))
- op.add_option('--canonicalize-url', dest='url_print', action='callback', callback=store_local_curry('canon'))
- op.add_option('--canonicalize-escape-url', dest='url_print', action='callback', callback=store_local_curry('canon_esc'))
-
- tunables = [ norm(o.get_opt_string()[2:]) for o in op.option_list if o.callback in (store_abs, 'store_true', None) and o.get_opt_string() not in ('--version', '--help') ]
- remote_tunables = [ 'listen', 'go_daemon', 'timeout', 'session_owner', 'config_file', 'use_rsync_xattrs' ]
- rq_remote_tunables = { 'listen': True }
-
- # precedence for sources of values: 1) commandline, 2) cfg file, 3) defaults
- # -- for this to work out we need to tell apart defaults from explicitly set
- # options... so churn out the defaults here and call the parser with virgin
- # values container.
+ op.add_option('--normalize-url', dest='url_print',
+ action='callback', callback=store_local_curry('normal'))
+ op.add_option('--canonicalize-url', dest='url_print',
+ action='callback', callback=store_local_curry('canon'))
+ op.add_option('--canonicalize-escape-url', dest='url_print',
+ action='callback', callback=store_local_curry('canon_esc'))
+
+ tunables = [norm(o.get_opt_string()[2:])
+ for o in op.option_list
+ if (o.callback in (store_abs, 'store_true', None) and
+ o.get_opt_string() not in ('--version', '--help'))]
+ remote_tunables = ['listen', 'go_daemon', 'timeout',
+ 'session_owner', 'config_file', 'use_rsync_xattrs']
+ rq_remote_tunables = {'listen': True}
+
+ # precedence for sources of values: 1) commandline, 2) cfg file, 3)
+ # defaults for this to work out we need to tell apart defaults from
+ # explicitly set options... so churn out the defaults here and call
+ # the parser with virgin values container.
defaults = op.get_default_values()
opts, args = op.parse_args(values=optparse.Values())
args_orig = args[:]
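
The reflowed comment above describes a real optparse subtlety: passing a fresh optparse.Values() to parse_args() suppresses default processing, so the returned object holds only options the user explicitly set, while op.get_default_values() yields the defaults separately. That separation is what lets gsyncd layer command line over config file over defaults. A runnable sketch:

    import optparse

    op = optparse.OptionParser()
    op.add_option('--log-level', default='INFO')
    op.add_option('--timeout', type=int, default=120)

    defaults = op.get_default_values()
    # a virgin Values() keeps defaults out of `opts`: only explicit
    # options land there
    opts, args = op.parse_args(['--timeout', '30'],
                               values=optparse.Values())

    print(vars(opts))                            # {'timeout': 30}
    print(defaults.timeout, defaults.log_level)  # 120 INFO
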
@@ -291,9 +356,9 @@ def main_i():
args.append(None)
args[1] = r
confdata = rconf.get('config')
- if not (len(args) == 2 or \
- (len(args) == 1 and rconf.get('listen')) or \
- (len(args) <= 2 and confdata) or \
+ if not (len(args) == 2 or
+ (len(args) == 1 and rconf.get('listen')) or
+ (len(args) <= 2 and confdata) or
rconf.get('url_print')):
sys.stderr.write("error: incorrect number of arguments\n\n")
sys.stderr.write(op.get_usage() + "\n")
@@ -301,8 +366,8 @@ def main_i():
verify = rconf.get('verify')
if verify:
- logging.info (verify)
- logging.info ("Able to spawn gsyncd.py")
+ logging.info(verify)
+ logging.info("Able to spawn gsyncd.py")
return
restricted = os.getenv('_GSYNCD_RESTRICTED_')
@@ -313,14 +378,17 @@ def main_i():
allopts.update(rconf)
bannedtuns = set(allopts.keys()) - set(remote_tunables)
if bannedtuns:
- raise GsyncdError('following tunables cannot be set with restricted SSH invocaton: ' + \
+ raise GsyncdError('following tunables cannot be set with '
+ 'restricted SSH invocaton: ' +
', '.join(bannedtuns))
for k, v in rq_remote_tunables.items():
if not k in allopts or allopts[k] != v:
- raise GsyncdError('tunable %s is not set to value %s required for restricted SSH invocaton' % \
+ raise GsyncdError('tunable %s is not set to value %s required '
+ 'for restricted SSH invocaton' %
(k, v))
confrx = getattr(confdata, 'rx', None)
+
def makersc(aa, check=True):
if not aa:
return ([], None, None)
@@ -330,12 +398,13 @@ def main_i():
if len(ra) > 1:
remote = ra[1]
if check and not local.can_connect_to(remote):
- raise GsyncdError("%s cannot work with %s" % (local.path, remote and remote.path))
+ raise GsyncdError("%s cannot work with %s" %
+ (local.path, remote and remote.path))
return (ra, local, remote)
if confrx:
# peers are regexen, don't try to parse them
if confrx == 'glob':
- args = [ '\A' + fnmatch.translate(a) for a in args ]
+ args = ['\A' + fnmatch.translate(a) for a in args]
canon_peers = args
namedict = {}
else:
@@ -345,21 +414,24 @@ def main_i():
for r in rscs:
print(r.get_url(**{'normal': {},
'canon': {'canonical': True},
- 'canon_esc': {'canonical': True, 'escaped': True}}[dc]))
+ 'canon_esc': {'canonical': True,
+ 'escaped': True}}[dc]))
return
pa = ([], [], [])
- urlprms = ({}, {'canonical': True}, {'canonical': True, 'escaped': True})
+ urlprms = (
+ {}, {'canonical': True}, {'canonical': True, 'escaped': True})
for x in rscs:
for i in range(len(pa)):
pa[i].append(x.get_url(**urlprms[i]))
_, canon_peers, canon_esc_peers = pa
- # creating the namedict, a dict representing various ways of referring to / repreenting
- # peers to be fillable in config templates
- mods = (lambda x: x, lambda x: x[0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
+ # creating the namedict, a dict representing various ways of referring
+ # to / repreenting peers to be fillable in config templates
+ mods = (lambda x: x, lambda x: x[
+ 0].upper() + x[1:], lambda x: 'e' + x[0].upper() + x[1:])
if remote:
- rmap = { local: ('local', 'master'), remote: ('remote', 'slave') }
+ rmap = {local: ('local', 'master'), remote: ('remote', 'slave')}
else:
- rmap = { local: ('local', 'slave') }
+ rmap = {local: ('local', 'slave')}
namedict = {}
for i in range(len(rscs)):
x = rscs[i]
@@ -370,10 +442,13 @@ def main_i():
if name == 'remote':
namedict['remotehost'] = x.remotehost
if not 'config_file' in rconf:
- rconf['config_file'] = os.path.join(os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf")
+ rconf['config_file'] = os.path.join(
+ os.path.dirname(sys.argv[0]), "conf/gsyncd_template.conf")
upgrade_config_file(rconf['config_file'])
- gcnf = GConffile(rconf['config_file'], canon_peers, defaults.__dict__, opts.__dict__, namedict)
+ gcnf = GConffile(
+ rconf['config_file'], canon_peers,
+ defaults.__dict__, opts.__dict__, namedict)
checkpoint_change = False
if confdata:
@@ -407,7 +482,7 @@ def main_i():
delete = rconf.get('delete')
if delete:
- logging.info ('geo-replication delete')
+ logging.info('geo-replication delete')
# Delete pid file, status file, socket file
cleanup_paths = []
if getattr(gconf, 'pid_file', None):
@@ -422,7 +497,7 @@ def main_i():
if getattr(gconf, 'state_socket_unencoded', None):
cleanup_paths.append(gconf.state_socket_unencoded)
- cleanup_paths.append(rconf['config_file'][:-11] + "*");
+ cleanup_paths.append(rconf['config_file'][:-11] + "*")
# Cleanup changelog working dirs
if getattr(gconf, 'working_dir', None):
@@ -432,7 +507,9 @@ def main_i():
if sys.exc_info()[1].errno == ENOENT:
pass
else:
- raise GsyncdError('Error while removing working dir: %s' % gconf.working_dir)
+ raise GsyncdError(
+ 'Error while removing working dir: %s' %
+ gconf.working_dir)
for path in cleanup_paths:
# To delete temp files
@@ -443,10 +520,11 @@ def main_i():
if restricted and gconf.allow_network:
ssh_conn = os.getenv('SSH_CONNECTION')
if not ssh_conn:
- #legacy env var
+ # legacy env var
ssh_conn = os.getenv('SSH_CLIENT')
if ssh_conn:
- allowed_networks = [ IPNetwork(a) for a in gconf.allow_network.split(',') ]
+ allowed_networks = [IPNetwork(a)
+ for a in gconf.allow_network.split(',')]
client_ip = IPAddress(ssh_conn.split()[0])
allowed = False
for nw in allowed_networks:
@@ -460,7 +538,7 @@ def main_i():
if ffd:
fcntl.fcntl(ffd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
- #normalize loglevel
+ # normalize loglevel
lvl0 = gconf.log_level
if isinstance(lvl0, str):
lvl1 = lvl0.upper()
@@ -519,7 +597,7 @@ def main_i():
if be_monitor:
label = 'monitor'
elif remote:
- #master
+ # master
label = gconf.local_path
else:
label = 'slave'
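
Much of the churn in this file is around the store_local family of optparse callbacks, which divert "run configuration" options into the rconf dict instead of the parser's values object; store_local_curry binds a constant for flag-style options. A reduced sketch of that mechanism, using two of the option names from the hunks above:

    import optparse

    rconf = {}

    def store_local(opt, optstr, val, parser):
        rconf[opt.dest] = val

    def store_local_curry(val):
        return lambda o, oo, vx, p: store_local(o, oo, val, p)

    op = optparse.OptionParser()
    op.add_option('-c', '--config-file', metavar='CONF', type=str,
                  action='callback', callback=store_local)
    op.add_option('--monitor', dest='monitor', action='callback',
                  callback=store_local_curry(True))

    opts, args = op.parse_args(['-c', '/tmp/gsyncd.conf', '--monitor'])
    print(rconf)  # {'config_file': '/tmp/gsyncd.conf', 'monitor': True}
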
diff --git a/geo-replication/syncdaemon/libcxattr.py b/geo-replication/syncdaemon/libcxattr.py
index b5b6956aea6..e6035e26b43 100644
--- a/geo-replication/syncdaemon/libcxattr.py
+++ b/geo-replication/syncdaemon/libcxattr.py
@@ -1,8 +1,20 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
-from ctypes import *
+from ctypes import CDLL, c_int, create_string_buffer
from ctypes.util import find_library
+
class Xattr(object):
+
"""singleton that wraps the extended attribues system
interface for python using ctypes
@@ -40,7 +52,7 @@ class Xattr(object):
@classmethod
def lgetxattr(cls, path, attr, siz=0):
- return cls._query_xattr( path, siz, 'lgetxattr', attr)
+ return cls._query_xattr(path, siz, 'lgetxattr', attr)
@classmethod
def lgetxattr_buf(cls, path, attr):
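
The explicit ctypes imports replacing the star import cover what the xattr wrapper actually needs. The technique behind lgetxattr/lgetxattr_buf is the usual two-call dance against lgetxattr(2): probe with size 0 to learn the value length, then read into a sized buffer. A Python 3 flavored sketch (the original is Python 2 and passes str paths directly); Linux-only, error handling trimmed:

    import os
    from ctypes import CDLL, create_string_buffer, get_errno
    from ctypes.util import find_library

    libc = CDLL(find_library("c"), use_errno=True)

    def lgetxattr(path, attr):
        # probe call: size 0 returns the required buffer length
        size = libc.lgetxattr(path.encode(), attr.encode(), None, 0)
        if size == -1:
            raise OSError(get_errno(), os.strerror(get_errno()), path)
        buf = create_string_buffer(size)
        ret = libc.lgetxattr(path.encode(), attr.encode(), buf, size)
        if ret == -1:
            raise OSError(get_errno(), os.strerror(get_errno()), path)
        return buf.raw[:ret]
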
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index 68ec3baf144..ec563b36f29 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -1,7 +1,18 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
-from ctypes import *
+from ctypes import CDLL, create_string_buffer, get_errno
from ctypes.util import find_library
+
class Changes(object):
libgfc = CDLL(find_library("gfchangelog"), use_errno=True)
@@ -19,9 +30,10 @@ class Changes(object):
return getattr(cls.libgfc, call)
@classmethod
- def cl_register(cls, brick, path, log_file, log_level, retries = 0):
+ def cl_register(cls, brick, path, log_file, log_level, retries=0):
ret = cls._get_api('gf_changelog_register')(brick, path,
- log_file, log_level, retries)
+ log_file,
+ log_level, retries)
if ret == -1:
cls.raise_oserr()
@@ -49,8 +61,8 @@ class Changes(object):
while True:
ret = call(buf, 4096)
if ret in (0, -1):
- break;
- changes.append(buf.raw[:ret-1])
+ break
+ changes.append(buf.raw[:ret - 1])
if ret == -1:
cls.raise_oserr()
# cleanup tracker
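
The break/slicing fix above sits inside a standard draining loop for this C API: call into a fixed buffer until the return code says done (0) or error (-1), trimming the trailing terminator byte from each result. A sketch, with `call` standing in for the ctypes-bound gf_changelog_next_change:

    from ctypes import create_string_buffer

    def drain(call, bufsize=4096):
        """collect results from a C call that fills `buf` and returns
        the filled length, 0 when exhausted, or -1 on error"""
        changes = []
        buf = create_string_buffer(bufsize)
        while True:
            ret = call(buf, bufsize)
            if ret in (0, -1):
                break
            changes.append(buf.raw[:ret - 1])  # strip trailing terminator
        if ret == -1:
            raise OSError('gf_changelog_next_change failed')
        return changes
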
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 98a61bc1d75..4301396f9f4 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1,25 +1,30 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
import stat
-import random
-import signal
import json
import logging
import socket
import string
import errno
-from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode
-from threading import currentThread, Condition, Lock
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from threading import Condition, Lock
from datetime import datetime
-from libcxattr import Xattr
-
from gconf import gconf
-from tempfile import mkdtemp, NamedTemporaryFile
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
- unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
- lstat, errno_wrap, update_file
+from tempfile import NamedTemporaryFile
+from syncdutils import Thread, GsyncdError, boolify, escape
+from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import lstat, errno_wrap
URXTIME = (-1, 0)
@@ -27,18 +32,20 @@ URXTIME = (-1, 0)
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
+
def _xtime_now():
t = time.time()
sec = int(t)
nsec = int((t - sec) * 1000000)
return (sec, nsec)
+
def _volinfo_hook_relax_foreign(self):
volinfo_sys = self.get_sys_volinfo()
fgn_vi = volinfo_sys[self.KFGN]
if fgn_vi:
expiry = fgn_vi['timeout'] - int(time.time()) + 1
- logging.info('foreign volume info found, waiting %d sec for expiry' % \
+ logging.info('foreign volume info found, waiting %d sec for expiry' %
expiry)
time.sleep(expiry)
volinfo_sys = self.get_sys_volinfo()
@@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None):
logging.info('setting up %s change detection mode' % changemixin)
modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
- sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
- purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+ sendmarkmixin = boolify(
+ gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+ purgemixin = boolify(
+ gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
- class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):
+
+ class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+ purgemixin, syncengine):
pass
return _GMaster
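
gmaster_builder() mints the concrete master class at runtime from one mixin per behavior axis (change detection, sync mode, sendmark policy, purge policy, sync engine); the wrapped lines above are just that class statement. A reduced sketch of the trick with stand-in classes for two of the axes:

    class RsyncEngine(object):
        def sync(self):
            return 'rsync'

    class TarSSHEngine(object):
        def sync(self):
            return 'tar+ssh'

    class PurgeNormalMixin(object):
        def purge_missing(self, path, names):
            print('purging', names, 'under', path)

    class PurgeNoopMixin(object):
        def purge_missing(self, path, names):
            pass

    def gmaster_builder(use_tarssh, ignore_deletes):
        syncengine = TarSSHEngine if use_tarssh else RsyncEngine
        purgemixin = PurgeNoopMixin if ignore_deletes else PurgeNormalMixin

        class _GMaster(syncengine, purgemixin):
            pass
        return _GMaster

    master = gmaster_builder(use_tarssh=True, ignore_deletes=True)()
    print(master.sync())              # tar+ssh
    master.purge_missing('/', ['f'])  # no-op variant selected
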
@@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None):
# sync modes
class NormalMixin(object):
+
"""normal geo-rep behavior"""
minus_infinity = URXTIME
@@ -152,14 +164,18 @@ class NormalMixin(object):
self.slave.server.set_stime(path, self.uuid, mark)
# self.slave.server.set_xtime_remote(path, self.uuid, mark)
+
class PartialMixin(NormalMixin):
+
"""a variant tuned towards operation with a master
that has partial info of the slave (brick typically)"""
def xtime_reversion_hook(self, path, xtl, xtr):
pass
+
class RecoverMixin(NormalMixin):
+
"""a variant that differs from normal in terms
of ignoring non-indexed files"""
@@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin):
# Further mixins for certain tunable behaviors
+
class SendmarkNormalMixin(object):
def sendmark_regular(self, *a, **kw):
return self.sendmark(*a, **kw)
+
class SendmarkRsyncMixin(object):
def sendmark_regular(self, *a, **kw):
@@ -194,19 +212,24 @@ class PurgeNormalMixin(object):
def purge_missing(self, path, names):
self.slave.server.purge(path, names)
+
class PurgeNoopMixin(object):
def purge_missing(self, path, names):
pass
+
class TarSSHEngine(object):
+
"""Sync engine that uses tar(1) piped over ssh(1)
for data transfers. Good for lots of small files.
"""
+
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
for f in files:
pb = self.syncer.add(f)
+
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
@@ -228,13 +251,17 @@ class TarSSHEngine(object):
self.a_syncdata(files)
self.syncdata_wait()
+
class RsyncEngine(object):
+
"""Sync engine that uses rsync(1) for data transfers"""
+
def a_syncdata(self, files):
logging.debug('files: %s' % (files))
for f in files:
logging.debug('candidate for syncing %s' % f)
pb = self.syncer.add(f)
+
def regjob(se, xte, pb):
rv = pb.wait()
if rv[0]:
@@ -258,7 +285,9 @@ class RsyncEngine(object):
self.a_syncdata(files)
self.syncdata_wait()
+
class GMasterCommon(object):
+
"""abstract class impementling master role"""
KFGN = 0
@@ -269,8 +298,9 @@ class GMasterCommon(object):
err out on multiple foreign masters
"""
- fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
- self.master.server.aggregated.native_volume_info()
+ fgn_vis, nat_vi = (
+ self.master.server.aggregated.foreign_volume_infos(),
+ self.master.server.aggregated.native_volume_info())
fgn_vi = None
if fgn_vis:
if len(fgn_vis) > 1:
@@ -316,15 +346,14 @@ class GMasterCommon(object):
if getattr(gconf, 'state_detail_file', None):
try:
with open(gconf.state_detail_file, 'r+') as f:
- loaded_data= json.load(f)
- diff_data = set(default_data) - set (loaded_data)
+ loaded_data = json.load(f)
+ diff_data = set(default_data) - set(loaded_data)
if len(diff_data):
for i in diff_data:
loaded_data[i] = default_data[i]
return loaded_data
- except (IOError):
- ex = sys.exc_info()[1]
- logging.warn ('Creating new gconf.state_detail_file.')
+ except IOError:
+ logging.warn('Creating new gconf.state_detail_file.')
# Create file with initial data
try:
with open(gconf.state_detail_file, 'wb') as f:
@@ -364,7 +393,8 @@ class GMasterCommon(object):
# - self.turns is the number of turns since start
# - self.total_turns is a limit so that if self.turns reaches it, then
# we exit (for diagnostic purposes)
- # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+ # so, eg., if the master fs changes unceasingly, self.turns will remain
+ # 0.
self.crawls = 0
self.turns = 0
self.total_turns = int(gconf.turns)
@@ -394,7 +424,7 @@ class GMasterCommon(object):
t.start()
def should_crawl(cls):
- return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+ return gconf.glusterd_uuid in cls.master.server.node_uuid()
def register(self):
self.register()
@@ -416,18 +446,18 @@ class GMasterCommon(object):
volinfo_sys = self.volinfo_hook()
self.volinfo = volinfo_sys[self.KNAT]
inter_master = volinfo_sys[self.KFGN]
- logging.info("%s master with volume id %s ..." % \
- (inter_master and "intermediate" or "primary",
- self.uuid))
+ logging.info("%s master with volume id %s ..." %
+ (inter_master and "intermediate" or "primary",
+ self.uuid))
gconf.configinterface.set('volume_id', self.uuid)
if self.volinfo:
if self.volinfo['retval']:
- logging.warn("master cluster's info may not be valid %d" % \
+ logging.warn("master cluster's info may not be valid %d" %
self.volinfo['retval'])
self.start_checkpoint_thread()
else:
raise GsyncdError("master volinfo unavailable")
- self.total_crawl_stats = self.get_initial_crawl_data()
+ self.total_crawl_stats = self.get_initial_crawl_data()
self.lastreport['time'] = time.time()
logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -435,7 +465,7 @@ class GMasterCommon(object):
crawl = self.should_crawl()
while not self.terminate:
if self.start:
- logging.debug("... crawl #%d done, took %.6f seconds" % \
+ logging.debug("... crawl #%d done, took %.6f seconds" %
(self.crawls, time.time() - self.start))
self.start = time.time()
should_display_info = self.start - self.lastreport['time'] >= 60
@@ -443,11 +473,11 @@ class GMasterCommon(object):
logging.info("%d crawls, %d turns",
self.crawls - self.lastreport['crawls'],
self.turns - self.lastreport['turns'])
- self.lastreport.update(crawls = self.crawls,
- turns = self.turns,
- time = self.start)
+ self.lastreport.update(crawls=self.crawls,
+ turns=self.turns,
+ time=self.start)
t1 = time.time()
- if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+ if int(t1 - t0) >= 60: # lets hardcode this check to 60 seconds
crawl = self.should_crawl()
t0 = t1
self.update_worker_remote_node()
@@ -456,11 +486,14 @@ class GMasterCommon(object):
# bring up _this_ brick to the cluster stime
# which is min of cluster (but max of the replicas)
brick_stime = self.xtime('.', self.slave)
- cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
- logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime)))
+ cluster_stime = self.master.server.aggregated.stime_mnt(
+ '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+ logging.debug("Cluster stime: %s | Brick stime: %s" %
+ (repr(cluster_stime), repr(brick_stime)))
if not isinstance(cluster_stime, int):
if brick_stime < cluster_stime:
- self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+ self.slave.server.set_stime(
+ self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
time.sleep(5)
continue
self.update_worker_health("Active")
@@ -489,13 +522,14 @@ class GMasterCommon(object):
with checkpoint @chkpt"""
if xtimish:
val = cls.serialize_xtime(val)
- gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+ gconf.configinterface.set(
+ 'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
@staticmethod
def humantime(*tpair):
"""format xtime-like (sec, nsec) pair to human readable format"""
ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
- strftime("%Y-%m-%d %H:%M:%S")
+ strftime("%Y-%m-%d %H:%M:%S")
if len(tpair) > 1:
ts += '.' + str(tpair[1])
return ts
@@ -506,7 +540,7 @@ class GMasterCommon(object):
years = int(years)
days = int(days)
- date=""
+ date = ""
m, s = divmod(crawl_time.seconds, 60)
h, m = divmod(m, 60)
@@ -515,7 +549,8 @@ class GMasterCommon(object):
if days != 0:
date += "%s %s " % (days, "day" if days == 1 else "days")
- date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+ date += "%s:%s:%s" % (string.zfill(h, 2),
+ string.zfill(m, 2), string.zfill(s, 2))
return date
def checkpt_service(self, chan, chkpt):
@@ -540,16 +575,18 @@ class GMasterCommon(object):
if not checkpt_tgt:
checkpt_tgt = self.xtime('.')
if isinstance(checkpt_tgt, int):
- raise GsyncdError("master root directory is unaccessible (%s)",
+ raise GsyncdError("master root directory is "
+ "unaccessible (%s)",
os.strerror(checkpt_tgt))
self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
- logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+ logging.debug("checkpoint target %s has been determined "
+ "for checkpoint %s" %
(repr(checkpt_tgt), chkpt))
# check if the label is 'now'
chkpt_lbl = chkpt
try:
- x1,x2 = chkpt.split(':')
+ x1, x2 = chkpt.split(':')
if x1 == 'now':
chkpt_lbl = "as of " + self.humantime(x2)
except:
@@ -557,41 +594,46 @@ class GMasterCommon(object):
completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
if completed:
completed = tuple(int(x) for x in completed.split('.'))
- s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+ s, _, _ = select([chan], [], [], (not completed) and 5 or None)
# either request made and we re-check to not
# give back stale data, or we still hunting for completion
- if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:
+ if (self.native_xtime(checkpt_tgt) and (
+ self.native_xtime(checkpt_tgt) < self.volmark)):
# indexing has been reset since setting the checkpoint
status = "is invalid"
else:
xtr = self.xtime('.', self.slave)
if isinstance(xtr, int):
- raise GsyncdError("slave root directory is unaccessible (%s)",
+ raise GsyncdError("slave root directory is "
+ "unaccessible (%s)",
os.strerror(xtr))
ncompleted = self.xtime_geq(xtr, checkpt_tgt)
- if completed and not ncompleted: # stale data
- logging.warn("completion time %s for checkpoint %s became stale" % \
+ if completed and not ncompleted: # stale data
+ logging.warn("completion time %s for checkpoint %s "
+ "became stale" %
(self.humantime(*completed), chkpt))
completed = None
gconf.configinterface.delete('checkpoint_completed')
- if ncompleted and not completed: # just reaching completion
+ if ncompleted and not completed: # just reaching completion
completed = "%.6f" % time.time()
- self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+ self._set_checkpt_param(
+ chkpt, 'completed', completed, xtimish=False)
completed = tuple(int(x) for x in completed.split('.'))
logging.info("checkpoint %s completed" % chkpt)
status = completed and \
- "completed at " + self.humantime(completed[0]) or \
- "not reached yet"
+ "completed at " + self.humantime(completed[0]) or \
+ "not reached yet"
if s:
conn = None
try:
conn, _ = chan.accept()
try:
- conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))
+ conn.send("checkpoint %s is %s\0" %
+ (chkpt_lbl, status))
except:
exc = sys.exc_info()[1]
- if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
- exc.errno == EPIPE:
+ if ((isinstance(exc, OSError) or isinstance(
+ exc, IOError)) and exc.errno == EPIPE):
logging.debug('checkpoint client disconnected')
else:
raise
@@ -602,11 +644,13 @@ class GMasterCommon(object):
def start_checkpoint_thread(self):
"""prepare and start checkpoint service"""
if self.checkpoint_thread or not (
- getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
+ getattr(gconf, 'state_socket_unencoded', None) and getattr(
+ gconf, 'socketdir', None)
):
return
chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
+ state_socket = os.path.join(
+ gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
try:
os.unlink(state_socket)
except:
@@ -621,9 +665,9 @@ class GMasterCommon(object):
def add_job(self, path, label, job, *a, **kw):
"""insert @job function to job table at @path with @label"""
- if self.jobtab.get(path) == None:
+ if self.jobtab.get(path) is None:
self.jobtab[path] = []
- self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
+ self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
def add_failjob(self, path, label):
"""invoke .add_job with a job that does nothing just fails"""
@@ -644,7 +688,7 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
- if succeed and not args[0] == None:
+ if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
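
add_job() queues closures in a per-path job table; the lambda freezes the job's arguments at registration time, and the runner reports aggregate success (the `is None` fixes above are in exactly this code). A self-contained sketch of the pattern:

    jobtab = {}

    def add_job(path, label, job, *a, **kw):
        if jobtab.get(path) is None:
            jobtab[path] = []
        # the lambda captures *a/**kw for deferred execution
        jobtab[path].append((label, a, lambda: job(*a, **kw)))

    def run_jobs(path):
        succeed = True
        for label, args, thunk in jobtab.pop(path, []):
            if not thunk():
                succeed = False
        return succeed

    add_job('/d1', 'sync', lambda f: True, 'file-a')
    add_job('/d1', 'sync', lambda f: False, 'file-b')
    print(run_jobs('/d1'))   # False -- one registered job failed
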
@@ -657,19 +701,21 @@ class GMasterCommon(object):
self.slave.server.setattr(path, adct)
self.set_slave_xtime(path, mark)
+
class GMasterChangelogMixin(GMasterCommon):
+
""" changelog based change detection and syncing """
# index for change type and entry
IDX_START = 0
- IDX_END = 2
+ IDX_END = 2
- POS_GFID = 0
- POS_TYPE = 1
+ POS_GFID = 0
+ POS_TYPE = 1
POS_ENTRY1 = -1
- TYPE_META = "M "
- TYPE_GFID = "D "
+ TYPE_META = "M "
+ TYPE_GFID = "D "
TYPE_ENTRY = "E "
# flat directory heirarchy for gfid based access
@@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon):
def setup_working_dir(self):
workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
logfile = os.path.join(workdir, 'changes.log')
- logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
+ logging.debug('changelog working dir %s (log: %s)' %
+ (workdir, logfile))
return (workdir, logfile)
def process_change(self, change, done, retry):
pfx = gauxpfx()
- clist = []
+ clist = []
entries = []
meta_gfid = set()
datas = set()
# basic crawl stats: files and bytes
- files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
+ files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
try:
f = open(change, "r")
clist = f.readlines()
@@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon):
elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
entry_update()
# stat information present in the changelog itself
- entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\
+ entries.append(edct(ty, gfid=gfid, entry=en,
+ mode=int(ec[2]),
uid=int(ec[3]), gid=int(ec[4])))
else:
# stat() to get mode and other information
go = os.path.join(pfx, gfid)
st = lstat(go)
if isinstance(st, int):
- if ty == 'RENAME': # special hack for renames...
+ if ty == 'RENAME': # special hack for renames...
entries.append(edct('UNLINK', gfid=gfid, entry=en))
else:
- logging.debug('file %s got purged in the interim' % go)
+ logging.debug(
+ 'file %s got purged in the interim' % go)
continue
if ty == 'LINK':
@@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon):
if isinstance(rl, int):
continue
entry_update()
- entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+ entries.append(
+ edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
elif ty == 'RENAME':
entry_update()
- e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
- entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+ e1 = unescape(
+ os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+ entries.append(
+ edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
- if ec[1] == 'SETATTR': # only setattr's for now...
+ if ec[1] == 'SETATTR': # only setattr's for now...
meta_gfid.add(os.path.join(pfx, ec[0]))
else:
logging.warn('got invalid changelog type: %s' % (et))
@@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon):
if not retry:
self.update_worker_cumilitive_status(files_pending)
# sync namespace
- if (entries):
+ if entries:
self.slave.server.entry_ops(entries)
# sync metadata
- if (meta_gfid):
+ if meta_gfid:
meta_entries = []
for go in meta_gfid:
st = lstat(go)
@@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon):
self.skipped_gfid_list = []
self.current_files_skipped_count = 0
- # first, fire all changelog transfers in parallel. entry and metadata
- # are performed synchronously, therefore in serial. However at the end
- # of each changelog, data is synchronized with syncdata_async() - which
- # means it is serial w.r.t entries/metadata of that changelog but
- # happens in parallel with data of other changelogs.
+ # first, fire all changelog transfers in parallel. entry and
+ # metadata are performed synchronously, therefore in serial.
+ # However at the end of each changelog, data is synchronized
+ # with syncdata_async() - which means it is serial w.r.t
+ # entries/metadata of that changelog but happens in parallel
+ # with data of other changelogs.
for change in changes:
logging.debug('processing change %s' % change)
self.process_change(change, done, retry)
if not retry:
- self.turns += 1 # number of changelogs processed in the batch
+ # number of changelogs processed in the batch
+ self.turns += 1
- # Now we wait for all the data transfers fired off in the above step
- # to complete. Note that this is not ideal either. Ideally we want to
- # trigger the entry/meta-data transfer of the next batch while waiting
- # for the data transfer of the current batch to finish.
+ # Now we wait for all the data transfers fired off in the above
+ # step to complete. Note that this is not ideal either. Ideally
+ # we want to trigger the entry/meta-data transfer of the next
+ # batch while waiting for the data transfer of the current batch
+ # to finish.
# Note that the reason to wait for the data transfer (vs doing it
# completely in the background and call the changelog_done()
@@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon):
# and prevents a spiraling increase of wait stubs from consuming
# unbounded memory and resources.
- # update the slave's time with the timestamp of the _last_ changelog
- # file time suffix. Since, the changelog prefix time is the time when
- # the changelog was rolled over, introduce a tolerence of 1 second to
- # counter the small delta b/w the marker update and gettimeofday().
+ # update the slave's time with the timestamp of the _last_
+ # changelog file time suffix. Since, the changelog prefix time
+ # is the time when the changelog was rolled over, introduce a
+ # tolerence of 1 second to counter the small delta b/w the
+ # marker update and gettimeofday().
# NOTE: this is only for changelog mode, not xsync.
# @change is the last changelog (therefore max time for this batch)
@@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon):
retry = True
tries += 1
if tries == self.MAX_RETRIES:
- logging.warn('changelogs %s could not be processed - moving on...' % \
+ logging.warn('changelogs %s could not be processed - '
+ 'moving on...' %
' '.join(map(os.path.basename, changes)))
- self.update_worker_total_files_skipped(self.current_files_skipped_count)
- logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list))
+ self.update_worker_total_files_skipped(
+ self.current_files_skipped_count)
+ logging.warn('SKIPPED GFID = %s' %
+ ','.join(self.skipped_gfid_list))
self.update_worker_files_syncd()
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon):
# entry_ops() that failed... so we retry the _whole_ changelog
# again.
# TODO: remove entry retries when it's gets fixed.
- logging.warn('incomplete sync, retrying changelogs: %s' % \
+ logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
time.sleep(0.5)
@@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon):
self.sendmark(path, stime)
def get_worker_status_file(self):
- file_name = gconf.local_path+'.status'
+ file_name = gconf.local_path + '.status'
file_name = file_name.replace("/", "_")
- worker_status_file = gconf.georep_session_working_dir+file_name
+ worker_status_file = gconf.georep_session_working_dir + file_name
return worker_status_file
def update_worker_status(self, key, value):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
default_data[key] = value
@@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon):
raise
def update_worker_cumilitive_status(self, files_pending):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon):
try:
with open(worker_status_file, 'r+') as f:
loaded_data = json.load(f)
- loaded_data['files_remaining'] = files_pending['count']
- loaded_data['bytes_remaining'] = files_pending['bytes']
+ loaded_data['files_remaining'] = files_pending['count']
+ loaded_data['bytes_remaining'] = files_pending['bytes']
loaded_data['purges_remaining'] = files_pending['purge']
os.ftruncate(f.fileno(), 0)
os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
- default_data['files_remaining'] = files_pending['count']
- default_data['bytes_remaining'] = files_pending['bytes']
+ default_data['files_remaining'] = files_pending['count']
+ default_data['bytes_remaining'] = files_pending['bytes']
default_data['purges_remaining'] = files_pending['purge']
json.dump(default_data, f)
f.flush()
@@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def update_worker_remote_node (self):
+ def update_worker_remote_node(self):
node = sys.argv[-1]
node = node.split("@")[-1]
remote_node_ip = node.split(":")[0]
remote_node_vol = node.split(":")[3]
remote_node = remote_node_ip + '::' + remote_node_vol
- self.update_worker_status ('remote_node', remote_node)
+ self.update_worker_status('remote_node', remote_node)
- def update_worker_health (self, state):
- self.update_worker_status ('worker status', state)
+ def update_worker_health(self, state):
+ self.update_worker_status('worker status', state)
- def update_worker_crawl_status (self, state):
- self.update_worker_status ('crawl status', state)
+ def update_worker_crawl_status(self, state):
+ self.update_worker_status('crawl status', state)
- def update_worker_files_syncd (self):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ def update_worker_files_syncd(self):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon):
with open(worker_status_file, 'r+') as f:
loaded_data = json.load(f)
loaded_data['files_syncd'] += loaded_data['files_remaining']
- loaded_data['files_remaining'] = 0
- loaded_data['bytes_remaining'] = 0
+ loaded_data['files_remaining'] = 0
+ loaded_data['bytes_remaining'] = 0
loaded_data['purges_remaining'] = 0
os.ftruncate(f.fileno(), 0)
os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
json.dump(default_data, f)
@@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon):
except:
raise
- def update_worker_files_remaining (self, state):
- self.update_worker_status ('files_remaining', state)
+ def update_worker_files_remaining(self, state):
+ self.update_worker_status('files_remaining', state)
- def update_worker_bytes_remaining (self, state):
- self.update_worker_status ('bytes_remaining', state)
+ def update_worker_bytes_remaining(self, state):
+ self.update_worker_status('bytes_remaining', state)
- def update_worker_purges_remaining (self, state):
- self.update_worker_status ('purges_remaining', state)
+ def update_worker_purges_remaining(self, state):
+ self.update_worker_status('purges_remaining', state)
- def update_worker_total_files_skipped (self, value):
- default_data = {"remote_node":"N/A",
- "worker status":"Not Started",
- "crawl status":"N/A",
+ def update_worker_total_files_skipped(self, value):
+ default_data = {"remote_node": "N/A",
+ "worker status": "Not Started",
+ "crawl status": "N/A",
"files_syncd": 0,
"files_remaining": 0,
"bytes_remaining": 0,
@@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):
f.flush()
os.fsync(f.fileno())
except (IOError, OSError, ValueError):
- logging.info ('Creating new %s' % worker_status_file)
+ logging.info('Creating new %s' % worker_status_file)
try:
with open(worker_status_file, 'wb') as f:
default_data['total_files_skipped'] = value
@@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon):
if changes:
if purge_time:
logging.info("slave's time: %s" % repr(purge_time))
- processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]]
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < purge_time[0]]
for pr in processed:
- logging.info('skipping already processed change: %s...' % os.path.basename(pr))
+ logging.info(
+ 'skipping already processed change: %s...' %
+ os.path.basename(pr))
self.master.server.changelog_done(pr)
changes.remove(pr)
logging.debug('processing changes %s' % repr(changes))
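
As context for the filter above: changelog file names end in a Unix timestamp (CHANGELOG.&lt;ts&gt;), so anything strictly older than the slave's recorded time has already been synced and only needs acknowledging. A small sketch of the same filter, with changelog_done() standing in for self.master.server.changelog_done:

    import os
    import logging

    def drop_processed(changes, purge_time, changelog_done):
        # keep only changelogs the slave has not seen yet; ack the rest
        processed = [x for x in changes
                     if int(x.split('.')[-1]) < purge_time[0]]
        for pr in processed:
            logging.info('skipping already processed change: %s...' %
                         os.path.basename(pr))
            changelog_done(pr)
            changes.remove(pr)
        return changes
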
@@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon):
# control should not reach here
raise
+
class GMasterXsyncMixin(GMasterChangelogMixin):
+
"""
This crawl needs to be xtime based (as of now
it's not. this is because we generate CHANGELOG
@@ -1091,7 +1155,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
files, hardlinks and symlinks.
"""
- XSYNC_MAX_ENTRIES = 1<<13
+ XSYNC_MAX_ENTRIES = 1 << 13
def register(self):
self.counter = 0
@@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
def open(self):
try:
- self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+ self.xsync_change = os.path.join(
+ self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
self.fh = open(self.xsync_change, 'w')
except IOError:
raise
@@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.put('xsync', self.fname())
self.counter = 0
if not last:
- time.sleep(1) # make sure changelogs are 1 second apart
+ time.sleep(1) # make sure changelogs are 1 second apart
self.open()
def sync_stime(self, stime=None, last=False):
@@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr_root = self.xtime('.', self.slave)
if isinstance(xtr_root, int):
if xtr_root != ENOENT:
- logging.warn("slave cluster not returning the " \
+ logging.warn("slave cluster not returning the "
"correct xtime for root (%d)" % xtr_root)
xtr_root = self.minus_infinity
xtl = self.xtime(path)
@@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
xtr = self.xtime(path, self.slave)
if isinstance(xtr, int):
if xtr != ENOENT:
- logging.warn("slave cluster not returning the " \
+ logging.warn("slave cluster not returning the "
"correct xtime for %s (%d)" % (path, xtr))
xtr = self.minus_infinity
xtr = max(xtr, xtr_root)
@@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
e = os.path.join(path, e)
xte = self.xtime(e)
if isinstance(xte, int):
- logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+ logging.warn("irregular xtime for %s: %s" %
+ (e, errno.errorcode[xte]))
continue
if not self.need_sync(e, xte, xtr):
continue
@@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
self.sync_done(self.stimes, False)
self.stimes = []
if stat.S_ISDIR(mo):
- self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(
+ st.st_uid), str(st.st_gid), escape(os.path.join(pargfid,
+ bname))])
self.Xcrawl(e, xtr_root)
self.stimes.append((e, xte))
elif stat.S_ISLNK(mo):
- self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+ self.write_entry_change(
+ "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
+ bname))])
elif stat.S_ISREG(mo):
nlink = st.st_nlink
- nlink -= 1 # fixup backend stat link count
- # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
- # side will decide if to create the new entry, or to create link.
+ nlink -= 1 # fixup backend stat link count
+ # if a file has a hardlink, create a Changelog entry as
+ # 'LINK' so the slave side will decide whether to create
+ # a new entry or a link.
if nlink == 1:
- self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+ self.write_entry_change("E",
+ [gfid, 'MKNOD', str(mo),
+ str(st.st_uid),
+ str(st.st_gid),
+ escape(os.path.join(
+ pargfid, bname))])
else:
- self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
+ self.write_entry_change(
+ "E", [gfid, 'LINK', escape(os.path.join(pargfid,
+ bname))])
self.write_entry_change("D", [gfid])
if path == '.':
self.stimes.append((path, xtl))
self.sync_done(self.stimes, True)
+
class BoxClosedErr(Exception):
pass
+
class PostBox(list):
+
"""synchronized collection for storing things thought of as "requests" """
def __init__(self, *a):
list.__init__(self, *a)
# too bad Python stdlib does not have read/write locks...
- # it would suffivce to grab the lock in .append as reader, in .close as writer
+ # it would suffice to grab the lock in .append as reader, in .close as
+ # writer
self.lever = Condition()
self.open = True
self.done = False
@@ -1319,7 +1401,9 @@ class PostBox(list):
self.open = False
self.lever.release()
+
class Syncer(object):
+
"""a staged queue to relay rsync requests to rsync workers
By "staged queue" its meant that when a consumer comes to the
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index b0262ee30a8..8ed6f832618 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
@@ -9,12 +19,16 @@ from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
from gconf import gconf
-from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError
+from syncdutils import update_file, select, waitpid
+from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+
class Volinfo(object):
+
def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol],
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
stdout=PIPE, stderr=PIPE)
vix = po.stdout.read()
po.wait()
@@ -25,7 +39,8 @@ class Volinfo(object):
via = '(via %s) ' % ' '.join(prelude)
else:
via = ' '
- raise GsyncdError('getting volume info of %s%s failed with errorcode %s',
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s' %
(vol, via, vi.find('opErrno').text))
self.tree = vi
self.volume = vol
@@ -40,25 +55,27 @@ class Volinfo(object):
def bparse(b):
host, dirp = b.text.split(':', 2)
return {'host': host, 'dir': dirp}
- return [ bparse(b) for b in self.get('brick') ]
+ return [bparse(b) for b in self.get('brick')]
@property
@memoize
def uuid(self):
ids = self.get('id')
if len(ids) != 1:
- raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid",
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid",
self.volume, self.host)
return ids[0].text
class Monitor(object):
+
"""class which spawns and manages gsyncd workers"""
- ST_INIT = 'Initializing...'
- ST_STABLE = 'Stable'
- ST_FAULTY = 'faulty'
- ST_INCON = 'inconsistent'
+ ST_INIT = 'Initializing...'
+ ST_STABLE = 'Stable'
+ ST_FAULTY = 'faulty'
+ ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def __init__(self):
@@ -68,7 +85,8 @@ class Monitor(object):
def set_state(self, state, w=None):
"""set the state that can be used by external agents
like glusterd for status reporting"""
- computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())]
+ computestate = lambda: self.state and self._ST_ORD[
+ max(self._ST_ORD.index(s) for s in self.state.values())]
if w:
self.lock.acquire()
old_state = computestate()
@@ -112,14 +130,17 @@ class Monitor(object):
self.set_state(self.ST_INIT, w)
ret = 0
+
def nwait(p, o=0):
p2, r = waitpid(p, o)
if not p2:
return
return r
+
def exit_signalled(s):
""" child teminated due to receipt of SIGUSR1 """
return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+
def exit_status(s):
if os.WIFEXITED(s):
return os.WEXITSTATUS(s)
@@ -134,7 +155,8 @@ class Monitor(object):
os.close(pr)
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
'--local-path', w[0],
- '--local-id', '.' + escape(w[0]),
+ '--local-id',
+ '.' + escape(w[0]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
@@ -145,31 +167,31 @@ class Monitor(object):
os.close(pr)
if so:
ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.info("worker(%s) died before establishing " \
+ if ret is not None:
+ logging.info("worker(%s) died before establishing "
"connection" % w[0])
else:
logging.debug("worker(%s) connected" % w[0])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.info("worker(%s) died in startup " \
+ if ret is not None:
+ logging.info("worker(%s) died in startup "
"phase" % w[0])
break
time.sleep(1)
else:
- logging.info("worker(%s) not confirmed in %d sec, " \
+ logging.info("worker(%s) not confirmed in %d sec, "
"aborting it" % (w[0], conn_timeout))
os.kill(cpid, signal.SIGKILL)
ret = nwait(cpid)
- if ret == None:
+ if ret is None:
self.set_state(self.ST_STABLE, w)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
else:
ret = exit_status(ret)
- if ret in (0,1):
+ if ret in (0, 1):
self.set_state(self.ST_FAULTY, w)
time.sleep(10)
self.set_state(self.ST_INCON, w)
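
The loop being touched here implements the monitor's spawn-and-confirm cycle: fork the worker, poll it with WNOHANG until it either connects, dies, or exceeds conn_timeout (in which case it is SIGKILLed), then translate the exit status. A reduced sketch, with connected() and nwait() as hypothetical stand-ins for the feedback-fd check and the waitpid wrapper above:

    import os
    import time
    import signal
    import logging

    def watch_worker(cpid, connected, nwait, conn_timeout=60):
        t0 = time.time()
        while time.time() < t0 + conn_timeout:
            if connected():              # feedback fd reported readiness
                logging.debug("worker(%d) connected" % cpid)
                return nwait(cpid)       # now just wait for it to exit
            ret = nwait(cpid, os.WNOHANG)
            if ret is not None:          # died during startup
                logging.info("worker(%d) died in startup phase" % cpid)
                return ret
            time.sleep(1)
        logging.info("worker(%d) not confirmed in %d sec, aborting it" %
                     (cpid, conn_timeout))
        os.kill(cpid, signal.SIGKILL)
        return nwait(cpid)
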
@@ -194,17 +216,18 @@ class Monitor(object):
os.kill(cpid, signal.SIGKILL)
self.lock.release()
finalize(exval=1)
- t = Thread(target = wmon, args=[wx])
+ t = Thread(target=wmon, args=[wx])
t.start()
ta.append(t)
for t in ta:
t.join()
+
def distribute(*resources):
master, slave = resources
mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
- prelude = []
+ prelude = []
si = slave
if isinstance(slave, SSH):
prelude = gconf.ssh_command.split() + [slave.remote_addr]
@@ -221,23 +244,28 @@ def distribute(*resources):
raise GsyncdError("unkown slave type " + slave.url)
logging.info('slave bricks: ' + repr(sbricks))
if isinstance(si, FILE):
- slaves = [ slave.url ]
+ slaves = [slave.url]
else:
slavenodes = set(b['host'] for b in sbricks)
if isinstance(slave, SSH) and not gconf.isolated_slave:
rap = SSH.parse_ssh_address(slave)
- slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ]
+ slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
+ for h in slavenodes]
else:
- slavevols = [ h + ':' + si.volume for h in slavenodes ]
+ slavevols = [h + ':' + si.volume for h in slavenodes]
if isinstance(slave, SSH):
- slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ]
+ slaves = ['ssh://' + rap.remote_addr + ':' + v
+ for v in slavevols]
else:
slaves = slavevols
- workerspex = [ (brick['dir'], slaves[idx % len(slaves)]) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host']) ]
+ workerspex = [(brick['dir'], slaves[idx % len(slaves)])
+ for idx, brick in enumerate(mvol.bricks)
+ if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
return workerspex, suuid
+
def monitor(*resources):
"""oh yeah, actually Monitor is used as singleton, too"""
return Monitor().multiplex(*distribute(*resources))
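
The Monitor's set_state() logic above reduces per-worker states to a single session state by severity: workers report individually, and the worst state in the _ST_ORD ordering wins. In isolation:

    ST_INIT = 'Initializing...'
    ST_STABLE = 'Stable'
    ST_FAULTY = 'faulty'
    ST_INCON = 'inconsistent'
    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]

    def computestate(state):
        # state: dict mapping worker -> its last reported state
        return state and _ST_ORD[max(_ST_ORD.index(s)
                                     for s in state.values())]

    print(computestate({'w0': ST_STABLE, 'w1': ST_FAULTY}))  # -> faulty
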
diff --git a/geo-replication/syncdaemon/repce.py b/geo-replication/syncdaemon/repce.py
index 755fb61df48..d7b17dda796 100644
--- a/geo-replication/syncdaemon/repce.py
+++ b/geo-replication/syncdaemon/repce.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
@@ -24,6 +34,7 @@ from syncdutils import Thread, select
pickle_proto = -1
repce_version = 1.0
+
def ioparse(i, o):
if isinstance(i, int):
i = os.fdopen(i)
@@ -34,6 +45,7 @@ def ioparse(i, o):
o = o.fileno()
return (i, o)
+
def send(out, *args):
"""pickle args and write out wholly in one syscall
@@ -43,12 +55,14 @@ def send(out, *args):
"""
os.write(out, pickle.dumps(args, pickle_proto))
+
def recv(inf):
"""load an object from input stream"""
return pickle.load(inf)
class RepceServer(object):
+
"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
... also our homebrewed RPC backend where the transport layer is
@@ -95,16 +109,17 @@ class RepceServer(object):
if rmeth == '__repce_version__':
res = repce_version
else:
- try:
- res = getattr(self.obj, rmeth)(*in_data[2:])
- except:
- res = sys.exc_info()[1]
- exc = True
- logging.exception("call failed: ")
+ try:
+ res = getattr(self.obj, rmeth)(*in_data[2:])
+ except:
+ res = sys.exc_info()[1]
+ exc = True
+ logging.exception("call failed: ")
send(self.out, rid, exc, res)
class RepceJob(object):
+
"""class representing message status we can use
for waiting on reply"""
@@ -137,6 +152,7 @@ class RepceJob(object):
class RepceClient(object):
+
"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
... also our homebrewed RPC backend where the transport layer is
@@ -148,7 +164,7 @@ class RepceClient(object):
def __init__(self, i, o):
self.inf, self.out = ioparse(i, o)
self.jtab = {}
- t = Thread(target = self.listen)
+ t = Thread(target=self.listen)
t.start()
def listen(self):
@@ -177,25 +193,31 @@ class RepceClient(object):
return rjob
def __call__(self, meth, *args):
- """RePCe client is callabe, calling it implements a synchronous remote call
+ """RePCe client is callabe, calling it implements a synchronous
+ remote call.
- We do a .push with a cbk which does a wakeup upon receiving anwser, then wait
- on the RepceJob.
+ We do a .push with a cbk which does a wakeup upon receiving the answer,
+ then wait on the RepceJob.
"""
- rjob = self.push(meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
+ rjob = self.push(
+ meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
exc, res = rjob.wait()
if exc:
- logging.error('call %s (%s) failed on peer with %s' % (repr(rjob), meth, str(type(res).__name__)))
+ logging.error('call %s (%s) failed on peer with %s' %
+ (repr(rjob), meth, str(type(res).__name__)))
raise res
logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
return res
class mprx(object):
- """method proxy, standard trick to implement rubyesque method_missing
- in Python
- A class is a closure factory, you know what I mean, or go read some SICP.
+ """method proxy, standard trick to implement rubyesque
+ method_missing in Python
+
+ A class is a closure factory, you know what I mean, or go read
+ some SICP.
"""
+
def __init__(self, ins, meth):
self.ins = ins
self.meth = meth
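
The transport underneath RepceServer/RepceClient is as plain as the hunks suggest: a request is a pickled tuple written wholly in one os.write(), and a reply is read back with pickle.load(). A self-contained sketch over a local pipe:

    import os
    import pickle

    pickle_proto = -1  # as in the module header above

    def send(out_fd, *args):
        # pickle args and write them out wholly in one syscall
        os.write(out_fd, pickle.dumps(args, pickle_proto))

    def recv(inf):
        # load one object off the input stream
        return pickle.load(inf)

    r, w = os.pipe()
    send(w, 'rid-1', '__repce_version__')
    print(recv(os.fdopen(r, 'rb')))  # -> ('rid-1', '__repce_version__')
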
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 41add6fb287..2fb6b3078d8 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import re
import os
import sys
@@ -12,27 +22,31 @@ import logging
import tempfile
import threading
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY, ESTALE, EINVAL
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP
+from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL
from select import error as SelectError
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
-from master import gmaster_builder
+from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
+UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
+
def sup(x, *a, **kw):
"""a rubyesque "super" for python ;)
invoke caller method in parent class with given args.
"""
- return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
+
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -57,15 +71,17 @@ def desugar(ustr):
ap = ap[1:]
return "file://" + ap
+
def gethostbyname(hnam):
"""gethostbyname wrapper"""
try:
return socket.gethostbyname(hnam)
except socket.gaierror:
ex = sys.exc_info()[1]
- raise GsyncdError("failed to resolve %s: %s" % \
+ raise GsyncdError("failed to resolve %s: %s" %
(hnam, ex.strerror))
+
def parse_url(ustr):
"""instantiate an url object by scheme-to-class dispatch
@@ -86,6 +102,7 @@ def parse_url(ustr):
class _MetaXattr(object):
+
"""singleton class, a lazy wrapper around the
libcxattr module
@@ -100,17 +117,19 @@ class _MetaXattr(object):
def __getattr__(self, meth):
from libcxattr import Xattr as LXattr
- xmeth = [ m for m in dir(LXattr) if m[0] != '_' ]
+ xmeth = [m for m in dir(LXattr) if m[0] != '_']
if not meth in xmeth:
return
for m in xmeth:
setattr(self, m, getattr(LXattr, m))
return getattr(self, meth)
+
class _MetaChangelog(object):
+
def __getattr__(self, meth):
from libgfchangelog import Changes as LChanges
- xmeth = [ m for m in dir(LChanges) if m[0] != '_' ]
+ xmeth = [m for m in dir(LChanges) if m[0] != '_']
if not meth in xmeth:
return
for m in xmeth:
@@ -122,6 +141,7 @@ Changes = _MetaChangelog()
class Popen(subprocess.Popen):
+
"""customized subclass of subprocess.Popen with a ring
buffer for children error output"""
@@ -129,11 +149,13 @@ class Popen(subprocess.Popen):
def init_errhandler(cls):
"""start the thread which handles children's error output"""
cls.errstore = {}
+
def tailer():
while True:
errstore = cls.errstore.copy()
try:
- poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1)
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
except (ValueError, SelectError):
continue
for po in errstore:
@@ -154,12 +176,12 @@ class Popen(subprocess.Popen):
tots = len(l)
for lx in la:
tots += len(lx)
- while tots > 1<<20 and la:
+ while tots > 1 << 20 and la:
tots -= len(la.pop(0))
la.append(l)
finally:
po.lock.release()
- t = syncdutils.Thread(target = tailer)
+ t = syncdutils.Thread(target=tailer)
t.start()
cls.errhandler = t
@@ -189,8 +211,9 @@ class Popen(subprocess.Popen):
ex = sys.exc_info()[1]
if not isinstance(ex, OSError):
raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \
- (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno)))
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
if kw.get('stderr') == subprocess.PIPE:
assert(getattr(self, 'errhandler', None))
self.errstore[self] = []
@@ -200,9 +223,10 @@ class Popen(subprocess.Popen):
filling = ""
if self.elines:
filling = ", saying:"
- logging.error("""command "%s" returned with %s%s""" % \
+ logging.error("""command "%s" returned with %s%s""" %
(" ".join(self.args), repr(self.returncode), filling))
lp = ''
+
def logerr(l):
logging.error(self.args[0] + "> " + l)
for l in self.elines:
@@ -217,9 +241,9 @@ class Popen(subprocess.Popen):
def errfail(self):
"""fail nicely if child did not terminate with success"""
self.errlog()
- syncdutils.finalize(exval = 1)
+ syncdutils.finalize(exval=1)
- def terminate_geterr(self, fail_on_err = True):
+ def terminate_geterr(self, fail_on_err=True):
"""kill child, finalize stderr harvesting (unregister
from errhandler, set up .elines), fail on error if
asked for
@@ -230,14 +254,14 @@ class Popen(subprocess.Popen):
finally:
self.lock.release()
elines = self.errstore.pop(self)
- if self.poll() == None:
+ if self.poll() is None:
self.terminate()
- if self.poll() == None:
+ if self.poll() is None:
time.sleep(0.1)
self.kill()
self.wait()
while True:
- if not select([self.stderr],[],[],0.1)[0]:
+ if not select([self.stderr], [], [], 0.1)[0]:
break
b = os.read(self.stderr.fileno(), 1024)
if b:
@@ -251,6 +275,7 @@ class Popen(subprocess.Popen):
class Server(object):
+
"""singleton implemening those filesystem access primitives
which are needed for geo-replication functionality
@@ -260,25 +285,28 @@ class Server(object):
GX_NSPACE_PFX = (privileged() and "trusted" or "system")
GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
- NTV_FMTSTR = "!" + "B"*19 + "II"
+ NTV_FMTSTR = "!" + "B" * 19 + "II"
FRGN_XTRA_FMT = "I"
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
- GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+ GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
- GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX
- GFID_FMTSTR = "!" + "B"*16
+ # for backend gfid fetch, do not use GX_NSPACE_PFX
+ GFID_XATTR = 'trusted.gfid'
+ GFID_FMTSTR = "!" + "B" * 16
local_path = ''
@classmethod
def _fmt_mknod(cls, l):
- return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+
@classmethod
def _fmt_mkdir(cls, l):
- return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+
@classmethod
def _fmt_symlink(cls, l1, l2):
- return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)
+ return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
def _pathguard(f):
"""decorator method that checks
@@ -289,6 +317,7 @@ class Server(object):
fc = funcode(f)
pi = list(fc.co_varnames).index('path')
+
def ff(*a):
path = a[pi]
ps = path.split('/')
@@ -308,7 +337,6 @@ class Server(object):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
return os.listdir(path)
-
@classmethod
@_pathguard
def lstat(cls, path):
@@ -325,7 +353,9 @@ class Server(object):
@_pathguard
def linkto_check(cls, path):
try:
- return not (Xattr.lgetxattr_buf(path, 'trusted.glusterfs.dht.linkto') == '')
+ return not (
+ Xattr.lgetxattr_buf(path,
+ 'trusted.glusterfs.dht.linkto') == '')
except (IOError, OSError):
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA):
@@ -333,13 +363,13 @@ class Server(object):
else:
raise
-
@classmethod
@_pathguard
def gfid(cls, path):
try:
buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16)
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
+ ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
return '-'.join(m.groups())
except (IOError, OSError):
ex = sys.exc_info()[1]
@@ -350,7 +380,9 @@ class Server(object):
@classmethod
def gfid_mnt(cls, gfidpath):
- return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+ return errno_wrap(Xattr.lgetxattr,
+ [gfidpath, 'glusterfs.gfid.string',
+ cls.GX_GFID_CANONICAL_LEN], [ENOENT])
@classmethod
@_pathguard
@@ -369,7 +401,7 @@ class Server(object):
for e in entries:
cls.purge(os.path.join(path, e))
"""
- me_also = entries == None
+ me_also = entries is None
if not entries:
try:
# if it's a symlink, prevent
@@ -435,7 +467,9 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -454,7 +488,9 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -473,7 +509,9 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -484,7 +522,8 @@ class Server(object):
@classmethod
def node_uuid(cls, path='.'):
try:
- uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
+ uuid_l = Xattr.lgetxattr_buf(
+ path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
return uuid_l[:-1].split(' ')
except OSError:
raise
@@ -493,13 +532,17 @@ class Server(object):
@_pathguard
def set_stime(cls, path, uuid, mark):
"""set @mark as stime for @uuid on @path"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ struct.pack('!II', *mark))
@classmethod
@_pathguard
def set_xtime(cls, path, uuid, mark):
"""set @mark as xtime for @uuid on @path"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ struct.pack('!II', *mark))
@classmethod
@_pathguard
@@ -511,18 +554,22 @@ class Server(object):
on the brick (this method sets xtime on the
remote slave)
"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ struct.pack('!II', *mark))
@classmethod
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
# regular file
+
def entry_pack_reg(gf, bn, mo, uid, gid):
blen = len(bn)
return struct.pack(cls._fmt_mknod(blen),
uid, gid, gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
+
def entry_pack_reg_stat(gf, bn, st):
blen = len(bn)
mo = st['mode']
@@ -531,18 +578,21 @@ class Server(object):
gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
# mkdir
+
def entry_pack_mkdir(gf, bn, mo, uid, gid):
blen = len(bn)
return struct.pack(cls._fmt_mkdir(blen),
uid, gid, gf, mo, bn,
stat.S_IMODE(mo), umask())
- #symlink
+ # symlink
+
def entry_pack_symlink(gf, bn, lnk, st):
blen = len(bn)
llen = len(lnk)
return struct.pack(cls._fmt_symlink(blen, llen),
st['uid'], st['gid'],
gf, st['mode'], bn, lnk)
+
def entry_purge(entry, gfid):
# This is an extremely racy code and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
@@ -574,9 +624,11 @@ class Server(object):
else:
break
elif op in ['CREATE', 'MKNOD']:
- blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])
+ blob = entry_pack_reg(
+ gfid, bname, e['mode'], e['uid'], e['gid'])
elif op == 'MKDIR':
- blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])
+ blob = entry_pack_mkdir(
+ gfid, bname, e['mode'], e['uid'], e['gid'])
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -596,21 +648,23 @@ class Server(object):
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
- errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])
+ errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile',
+ blob],
+ [EEXIST], [ENOENT, ESTALE, EINVAL])
@classmethod
def meta_ops(cls, meta_entries):
logging.debug('Meta-entries: %s' % repr(meta_entries))
for e in meta_entries:
mode = e['stat']['mode']
- uid = e['stat']['uid']
- gid = e['stat']['gid']
- go = e['go']
+ uid = e['stat']['uid']
+ gid = e['stat']['gid']
+ go = e['go']
errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])
errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
@classmethod
- def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
+ def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):
Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
@classmethod
@@ -649,6 +703,7 @@ class Server(object):
return os.getpid()
last_keep_alive = 0
+
@classmethod
def keep_alive(cls, dct):
"""process keepalive messages.
@@ -662,9 +717,12 @@ class Server(object):
if dct:
key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
val = struct.pack(cls.FRGN_FMTSTR,
- *(dct['version'] +
- tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
- (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
+ *(dct['version'] +
+ tuple(int(x, 16)
+ for x in re.findall('(?:[\da-f]){2}',
+ dct['uuid'])) +
+ (dct['retval'],) + dct['volume_mark'][0:2] + (
+ dct['timeout'],)))
Xattr.lsetxattr('.', key, val)
cls.last_keep_alive += 1
return cls.last_keep_alive
@@ -676,6 +734,7 @@ class Server(object):
class SlaveLocal(object):
+
"""mix-in class to implement some factes of a slave server
("mix-in" is sort of like "abstract class", ie. it's not
@@ -697,9 +756,11 @@ class SlaveLocal(object):
"""
if boolify(gconf.use_rsync_xattrs) and not privileged():
- raise GsyncdError("using rsync for extended attributes is not supported")
+ raise GsyncdError(
+ "using rsync for extended attributes is not supported")
- repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
+ repce = RepceServer(
+ self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
syncdutils.finalize()))
t.start()
@@ -709,12 +770,16 @@ class SlaveLocal(object):
lp = self.server.last_keep_alive
time.sleep(int(gconf.timeout))
if lp == self.server.last_keep_alive:
- logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
+ logging.info(
+ "connection inactive for %d seconds, stopping" %
+ int(gconf.timeout))
break
else:
select((), (), ())
+
class SlaveRemote(object):
+
"""mix-in class to implement an interface to a remote slave"""
def connect_remote(self, rargs=[], **opts):
@@ -731,9 +796,11 @@ class SlaveRemote(object):
extra_opts += ['--session-owner', so]
if boolify(gconf.use_rsync_xattrs):
extra_opts.append('--use-rsync-xattrs')
- po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \
- ['-N', '--listen', '--timeout', str(gconf.timeout), slave],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts +
+ ['-N', '--listen', '--timeout', str(gconf.timeout),
+ slave],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
gconf.transport = po
return self.start_fd_client(po.stdout, po.stdin, **opts)
@@ -752,7 +819,9 @@ class SlaveRemote(object):
for k, v in da0[i].iteritems():
da1[i][k] = int(v)
if da1[0] != da1[1]:
- raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
+ raise GsyncdError(
+ "RePCe major version mismatch: local %s, remote %s" %
+ (exrv, rv))
def rsync(self, files, *args):
"""invoke rsync"""
@@ -760,17 +829,19 @@ class SlaveRemote(object):
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
argv = gconf.rsync_command.split() + \
- ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
- gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
- ['.'] + list(args)
- po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)
+ ['-avR0', '--inplace', '--files-from=-', '--super',
+ '--stats', '--numeric-ids', '--no-implied-dirs'] + \
+ gconf.rsync_options.split() + \
+ (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
+ ['.'] + list(args)
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
for f in files:
po.stdin.write(f)
po.stdin.write('\0')
po.stdin.close()
po.wait()
- po.terminate_geterr(fail_on_err = False)
+ po.terminate_geterr(fail_on_err=False)
return po
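
For context, the rewrapped rsync() above drives rsync entirely through stdin: -0 plus --files-from=- make rsync read a NUL-delimited file list from the pipe. A trimmed sketch of the same invocation (destination and option set illustrative):

    import subprocess

    def rsync_files(files, dest):
        argv = ['rsync', '-avR0', '--inplace', '--files-from=-',
                '--numeric-ids', '--no-implied-dirs', '.', dest]
        po = subprocess.Popen(argv, stdin=subprocess.PIPE)
        for f in files:
            po.stdin.write(f.encode() + b'\0')  # NUL-delimited list
        po.stdin.close()
        po.wait()
        return po.returncode

    # rsync_files(['dir/a', 'dir/b'], 'slavehost:/dest')
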
@@ -784,8 +855,10 @@ class SlaveRemote(object):
logging.debug("files: " + ", ".join(files))
(host, rdir) = slaveurl.split(':')
tar_cmd = ["tar", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
- p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ ssh_cmd = gconf.ssh_command_tar.split() + \
+ [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
+ p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE)
p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
for f in files:
p0.stdin.write(f)
@@ -795,14 +868,16 @@ class SlaveRemote(object):
# wait() for tar to terminate, collecting any errors, further
# waiting for transfer to complete
p0.wait()
- p0.terminate_geterr(fail_on_err = False)
+ p0.terminate_geterr(fail_on_err=False)
p1.wait()
- p1.terminate_geterr(fail_on_err = False)
+ p1.terminate_geterr(fail_on_err=False)
return p1
+
class AbstractUrl(object):
+
"""abstract base class for url scheme classes"""
def __init__(self, path, pattern):
@@ -839,6 +914,7 @@ class AbstractUrl(object):
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+
"""scheme class for file:// urls
can be used to represent a file slave server
@@ -847,6 +923,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""
class FILEServer(Server):
+
"""included server flavor"""
pass
@@ -864,6 +941,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+
"""scheme class for gluster:// urls
can be used to represent a gluster slave server
@@ -874,21 +952,24 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"""
class GLUSTERServer(Server):
+
"server enhancements for a glusterfs backend"""
@classmethod
- def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ def _attr_unpack_dict(cls, xattr, extra_fields=''):
"""generic volume mark fetching/parsing backed"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
vm = struct.unpack(fmt_string, buf)
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
+ m = re.match(
+ '(.{8})(.{4})(.{4})(.{4})(.{12})',
+ "".join(['%02x' % x for x in vm[2:18]]))
uuid = '-'.join(m.groups())
- volinfo = { 'version': vm[0:2],
- 'uuid' : uuid,
- 'retval' : vm[18],
- 'volume_mark': vm[19:21],
- }
+ volinfo = {'version': vm[0:2],
+ 'uuid': uuid,
+ 'retval': vm[18],
+ 'volume_mark': vm[19:21],
+ }
if extra_fields:
return volinfo, vm[-len(extra_fields):]
else:
@@ -904,7 +985,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
now = int(time.time())
if x[0] > now:
- logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
+ logging.debug("volinfo[%s] expires: %d "
+ "(%d sec later)" %
(d['uuid'], x[0], x[0] - now))
d['timeout'] = x[0]
dict_list.append(d)
@@ -919,7 +1001,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def native_volume_info(cls):
"""get the native volume mark of the underlying gluster volume"""
try:
- return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
+ 'volume-mark']))
except OSError:
ex = sys.exc_info()[1]
if ex.errno != ENODATA:
@@ -936,9 +1019,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def can_connect_to(self, remote):
"""determine our position in the connectibility matrix"""
return not remote or \
- (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
+ (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
class Mounter(object):
+
"""Abstract base class for mounter backends"""
def __init__(self, params):
@@ -1003,7 +1087,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
if not t.isAlive():
break
if time.time() >= tlim:
- syncdutils.finalize(exval = 1)
+ syncdutils.finalize(exval=1)
time.sleep(1)
os.close(mpo)
_, rv = syncdutils.waitpid(mh, 0)
@@ -1011,7 +1095,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
(os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
logging.warn('stale mount possibly left behind on ' + d)
- raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \
+ raise GsyncdError("cleaning up temp mountpoint %s "
+ "failed with status %d" %
(d, rv))
else:
rv = 0
@@ -1035,7 +1120,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
assert(mntpt)
if mounted:
po = self.umount_l(mntpt)
- po.terminate_geterr(fail_on_err = False)
+ po.terminate_geterr(fail_on_err=False)
if po.returncode != 0:
po.errlog()
rv = po.returncode
@@ -1047,6 +1132,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.debug('auxiliary glusterfs mount prepared')
class DirectMounter(Mounter):
+
"""mounter backend which calls mount(8), umount(8) directly"""
mountkw = {'stderr': subprocess.PIPE}
@@ -1057,15 +1143,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
return ['umount', '-l', d]
def make_mount_argv(self):
- self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-')
- return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt]
+ self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
+ return [self.get_glusterprog()] + \
+ ['--' + p for p in self.params] + [self.mntpt]
- def cleanup_mntpt(self, mntpt = None):
+ def cleanup_mntpt(self, mntpt=None):
if not mntpt:
mntpt = self.mntpt
os.rmdir(mntpt)
class MountbrokerMounter(Mounter):
+
"""mounter backend using the mountbroker gluster service"""
mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
@@ -1073,7 +1161,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def make_cli_argv(cls):
- return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::']
+ return [cls.get_glusterprog()] + \
+ gconf.gluster_cli_options.split() + ['system::']
@classmethod
def make_umount_argv(cls, d):
@@ -1081,7 +1170,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def make_mount_argv(self, label):
return self.make_cli_argv() + \
- ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params
+ ['mount', label, 'user-map-root=' +
+ syncdutils.getusername()] + self.params
def handle_mounter(self, po):
self.mntpt = po.stdout.readline()[:-1]
@@ -1106,9 +1196,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
label = syncdutils.getusername()
mounter = label and self.MountbrokerMounter or self.DirectMounter
params = gconf.gluster_params.split() + \
- (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \
- ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host,
- 'volfile-id=' + self.volume, 'client-pid=-1']
+ (gconf.gluster_log_level and ['log-level=' +
+ gconf.gluster_log_level] or []) + \
+ ['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
+ self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
mounter(params).inhibit(*[l for l in [label] if l])
def connect_remote(self, *a, **kw):
@@ -1116,8 +1207,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
self.slavedir = "/proc/%d/cwd" % self.server.pid()
def gmaster_instantiate_tuple(self, slave):
- """return a tuple of the 'one shot' and the 'main crawl' class instance"""
- return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave))
+ """return a tuple of the 'one shot' and the 'main crawl'
+ class instance"""
+ return (gmaster_builder('xsync')(self, slave),
+ gmaster_builder()(self, slave))
def service_loop(self, *args):
"""enter service loop
@@ -1133,6 +1226,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class brickserver(FILE.FILEServer):
local_path = gconf.local_path
aggregated = self.server
+
@classmethod
def entries(cls, path):
e = super(brickserver, cls).entries(path)
@@ -1143,14 +1237,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ValueError:
pass
return e
+
@classmethod
def lstat(cls, e):
""" path based backend stat """
return super(brickserver, cls).lstat(e)
+
@classmethod
def gfid(cls, e):
""" path based backend gfid fetch """
return super(brickserver, cls).gfid(e)
+
@classmethod
def linkto_check(cls, e):
return super(brickserver, cls).linkto_check(e)
@@ -1158,9 +1255,25 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# define {,set_}xtime in slave, thus preempting
# the call to remote, so that it takes data from
# the local brick
- slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
+ slave.server.xtime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.xtime(path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
+ slave.server.stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.stime(path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
+ slave.server.set_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_stime(path,
+ uuid + '.' + gconf.slave_id,
+ mark)
+ ),
+ slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1186,6 +1299,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class SSH(AbstractUrl, SlaveRemote):
+
"""scheme class for ssh:// urls
interface to remote slave on master side
@@ -1194,7 +1308,9 @@ class SSH(AbstractUrl, SlaveRemote):
def __init__(self, path):
self.remote_addr, inner_url = sup(self, path,
- '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
+ '^((?:%s@)?%s):(.+)' %
+ tuple([r.pattern
+ for r in (UserRX, HostRX)]))
self.inner_rsc = parse_url(inner_url)
self.volume = inner_url[1:]
@@ -1262,7 +1378,8 @@ class SSH(AbstractUrl, SlaveRemote):
self.inner_rsc.url)
deferred = go_daemon == 'postconn'
- ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr],
+ ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args +
+ [self.remote_addr],
slave=self.inner_rsc.url, deferred=deferred)
if deferred:
@@ -1285,7 +1402,8 @@ class SSH(AbstractUrl, SlaveRemote):
return 'should'
def rsync(self, files):
- return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
+ return sup(self, files, '-e',
+ " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
*(gconf.rsync_ssh_options.split() + [self.slaveurl]))
def tarssh(self, files):
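
A note on the marker encoding that the xtime/stime hunks in this file keep handling: a marker is a (seconds, subseconds) pair packed with struct format '!II' into two network-order unsigned 32-bit integers, which is why lgetxattr is asked for exactly 8 bytes. In isolation:

    import struct

    def pack_mark(mark):
        # mark is a (sec, subsec) pair -> 8-byte big-endian blob
        return struct.pack('!II', *mark)

    def unpack_mark(buf):
        return struct.unpack('!II', buf)

    blob = pack_mark((1405626893, 42))
    assert len(blob) == 8
    assert unpack_mark(blob) == (1405626893, 42)
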
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 1b5684c6d0c..822d919ecb1 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import pwd
@@ -7,9 +17,9 @@ import shutil
import logging
import socket
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode
-from signal import signal, SIGTERM, SIGKILL
-from time import sleep
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
+from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode
+from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
@@ -37,25 +47,29 @@ except ImportError:
_CL_AUX_GFID_PFX = ".gfid/"
GF_OP_RETRIES = 20
+
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data into a creatable representation"""
return urllib.quote_plus(s)
+
def unescape(s):
"""inverse of .escape"""
return urllib.unquote_plus(s)
+
def norm(s):
if s:
return s.replace('-', '_')
-def update_file(path, updater, merger = lambda f: True):
+
+def update_file(path, updater, merger=lambda f: True):
"""update a file in a transaction-like manner"""
fr = fw = None
try:
- fd = os.open(path, os.O_CREAT|os.O_RDWR)
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
try:
fr = os.fdopen(fd, 'r+b')
except:
@@ -66,7 +80,7 @@ def update_file(path, updater, merger = lambda f: True):
return
tmpp = path + '.tmp.' + str(os.getpid())
- fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY)
+ fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
fw = os.fdopen(fd, 'wb', 0)
except:
@@ -80,29 +94,31 @@ def update_file(path, updater, merger = lambda f: True):
if fx:
fx.close()
+
def create_manifest(fname, content):
"""
Create manifest file for SSH Control Path
"""
fd = None
try:
- fd = os.open(fname, os.O_CREAT|os.O_RDWR)
+ fd = os.open(fname, os.O_CREAT | os.O_RDWR)
try:
os.write(fd, content)
except:
os.close(fd)
raise
finally:
- if fd != None:
+ if fd is not None:
os.close(fd)
+
def setup_ssh_ctl(ctld, remote_addr, resource_url):
"""
Setup GConf ssh control path parameters
"""
gconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
- resource_url)
+ resource_url)
content_md5 = md5hex(content)
fname = os.path.join(gconf.ssh_ctl_dir,
"%s.mft" % content_md5)
@@ -112,16 +128,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
"%s.sock" % content_md5)
gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
+
def grabfile(fname, content=None):
"""open @fname + contest for its fcntl lock
@content: if given, set the file content to it
"""
# damn those messy open() mode codes
- fd = os.open(fname, os.O_CREAT|os.O_RDWR)
+ fd = os.open(fname, os.O_CREAT | os.O_RDWR)
f = os.fdopen(fd, 'r+b', 0)
try:
- fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB)
+ fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
ex = sys.exc_info()[1]
f.close()
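
grabfile(), reformatted above, is the mutual-exclusion primitive for gsyncd instances: create-or-open the file and take a non-blocking exclusive fcntl lock, failing fast if another holder exists. The core of it:

    import os
    import fcntl

    def grab(fname):
        fd = os.open(fname, os.O_CREAT | os.O_RDWR)
        f = os.fdopen(fd, 'r+b', 0)            # unbuffered
        try:
            fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:                        # lock is held elsewhere
            f.close()
            return None
        return f   # keep the handle open as long as the lock is needed
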
@@ -139,6 +156,7 @@ def grabfile(fname, content=None):
gconf.permanent_handles.append(f)
return f
+
def grabpidfile(fname=None, setpid=True):
""".grabfile customization for pid files"""
if not fname:
@@ -150,6 +168,7 @@ def grabpidfile(fname=None, setpid=True):
final_lock = Lock()
+
def finalize(*a, **kw):
"""all those messy final steps we go trough upon termination
@@ -169,7 +188,7 @@ def finalize(*a, **kw):
if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
# child has terminated
rm_pidf = True
- break;
+ break
time.sleep(0.1)
if rm_pidf:
try:
@@ -194,6 +213,7 @@ def finalize(*a, **kw):
sys.stderr.flush()
os._exit(kw.get('exval', 0))
+
def log_raise_exception(excont):
"""top-level exception handler
@@ -218,20 +238,27 @@ def log_raise_exception(excont):
logging.error(exc.args[0])
sys.stderr.write('failure: ' + exc.args[0] + '\n')
elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \
- ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \
- exc.errno == EPIPE):
+ ((isinstance(exc, OSError) or isinstance(exc, IOError)) and
+ exc.errno == EPIPE):
logging.error('connection to peer is broken')
if hasattr(gconf, 'transport'):
gconf.transport.wait()
if gconf.transport.returncode == 127:
logging.warn("!!!!!!!!!!!!!")
- logging.warn('!!! getting "No such file or directory" errors '
- "is most likely due to MISCONFIGURATION, please consult "
- "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")
+ logging.warn('!!! getting "No such file or directory" '
+ "errors is most likely due to "
+ "MISCONFIGURATION"
+ ", please consult https://access.redhat.com"
+ "/site/documentation/en-US/Red_Hat_Storage"
+ "/2.1/html/Administration_Guide"
+ "/chap-User_Guide-Geo_Rep-Preparation-"
+ "Settingup_Environment.html")
logging.warn("!!!!!!!!!!!!!")
gconf.transport.terminate_geterr()
- elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED):
- logging.error('glusterfs session went down [%s]', errorcode[exc.errno])
+ elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
+ ECONNABORTED):
+ logging.error('glusterfs session went down [%s]',
+ errorcode[exc.errno])
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
@@ -244,46 +271,54 @@ def log_raise_exception(excont):
class FreeObject(object):
+
"""wildcard class for which any attribute can be set"""
def __init__(self, **kw):
- for k,v in kw.items():
+ for k, v in kw.items():
setattr(self, k, v)
+
class Thread(baseThread):
+
"""thread class flavor for gsyncd
- always a daemon thread
- force exit for whole program if thread
function coughs up an exception
"""
+
def __init__(self, *a, **kw):
tf = kw.get('target')
if tf:
def twrap(*aa):
- excont = FreeObject(exval = 0)
+ excont = FreeObject(exval=0)
try:
tf(*aa)
except:
try:
log_raise_exception(excont)
finally:
- finalize(exval = excont.exval)
+ finalize(exval=excont.exval)
kw['target'] = twrap
baseThread.__init__(self, *a, **kw)
self.setDaemon(True)
+
class GsyncdError(Exception):
pass
-def getusername(uid = None):
- if uid == None:
+
+def getusername(uid=None):
+ if uid is None:
uid = os.geteuid()
return pwd.getpwuid(uid).pw_name
+
def privileged():
return os.geteuid() == 0
+
def boolify(s):
"""
Generic string to boolean converter
@@ -294,7 +329,7 @@ def boolify(s):
- False if it's in false_list
- Warn if it's not present in either and return False
"""
- true_list = ['true', 'yes', '1', 'on']
+ true_list = ['true', 'yes', '1', 'on']
false_list = ['false', 'no', '0', 'off']
if isinstance(s, bool):
@@ -305,10 +340,12 @@ def boolify(s):
if lstr in true_list:
rv = True
elif not lstr in false_list:
- logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s))
+ logging.warn("Unknown string (%s) in string to boolean conversion "
+ "defaulting to False\n" % (s))
return rv
+
def eintr_wrap(func, exc, *a):
"""
wrapper around syscalls resilient to interrupt caused
@@ -322,19 +359,24 @@ def eintr_wrap(func, exc, *a):
if not ex.args[0] == EINTR:
raise
+
def select(*a):
return eintr_wrap(oselect.select, oselect.error, *a)
-def waitpid (*a):
+
+def waitpid(*a):
return eintr_wrap(owaitpid, OSError, *a)
+
def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
signal(SIGTERM, hook)
+
def is_host_local(host):
locaddr = False
for ai in socket.getaddrinfo(host, None):
- # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125
+ # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
+ # /mgmt/glusterd/src/glusterd-utils.c#L125
if ai[0] == socket.AF_INET:
if ai[-1][0].split(".")[0] == "127":
locaddr = True
@@ -358,8 +400,8 @@ def is_host_local(host):
f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
if int(f.read()) != 0:
raise GsyncdError(
- "non-local bind is set and not allowed to create raw sockets, "
- "cannot determine if %s is local" % host)
+ "non-local bind is set and not allowed to create "
+ "raw sockets, cannot determine if %s is local" % host)
s = socket.socket(ai[0], socket.SOCK_DGRAM)
finally:
if f:
@@ -373,6 +415,7 @@ def is_host_local(host):
s.close()
return locaddr
+
def funcode(f):
fc = getattr(f, 'func_code', None)
if not fc:
@@ -380,32 +423,40 @@ def funcode(f):
fc = f.__code__
return fc
+
def memoize(f):
fc = funcode(f)
fn = fc.co_name
+
def ff(self, *a, **kw):
rv = getattr(self, '_' + fn, None)
- if rv == None:
+ if rv is None:
rv = f(self, *a, **kw)
setattr(self, '_' + fn, rv)
return rv
return ff
+
def umask():
return os.umask(0)
+
def entry2pb(e):
return e.rsplit('/', 1)
+
def gauxpfx():
return _CL_AUX_GFID_PFX
+
def md5hex(s):
return md5(s).hexdigest()
+
def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
+
def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
""" wrapper around calls resilient to errnos.
retry in case of ESTALE by default.
@@ -427,6 +478,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
return
time.sleep(0.250) # retry the call
+
def lstat(e):
try:
return os.lstat(e)
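
Finally, the eintr_wrap() defined in this file is the idiom nearly every blocking call here goes through: retry the syscall transparently whenever a signal interrupts it. Standalone:

    import sys
    from errno import EINTR

    def eintr_wrap(func, exc, *a):
        while True:
            try:
                return func(*a)
            except exc:
                ex = sys.exc_info()[1]
                if ex.args[0] != EINTR:
                    raise

    # e.g. a signal-safe waitpid:
    # import os
    # eintr_wrap(os.waitpid, OSError, pid, 0)
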