author     Jeff Darcy <jdarcy@redhat.com>  2014-04-22 15:37:09 +0000
committer  Jeff Darcy <jdarcy@redhat.com>  2014-04-22 15:37:09 +0000
commit     a827c5eab32a43ade5551259ea56a6a1af7e861b (patch)
tree       e6707df68f72baa8645210ba931272285116ad85 /geo-replication/syncdaemon/resource.py
parent     46d333783a968ab39e0beade9c7a1eec8035f8b1 (diff)
parent     99bfc2a2a1689da1e173cb2f8ef54d2b09ef3a5d (diff)
Merge branch 'upstream'
Conflicts:
	glusterfs.spec.in
	xlators/mgmt/glusterd/src/Makefile.am
	xlators/mgmt/glusterd/src/glusterd-utils.c
	xlators/mgmt/glusterd/src/glusterd.h

Change-Id: I27bdcf42b003cfc42d6ad981bd2bf8180176806d
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--  geo-replication/syncdaemon/resource.py  309
1 file changed, 215 insertions(+), 94 deletions(-)
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 41add6fb2..e3cf33ffd 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import re
import os
import sys
@@ -12,27 +22,31 @@ import logging
import tempfile
import threading
import subprocess
-from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY, ESTALE, EINVAL
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP
+from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL
from select import error as SelectError
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
-from master import gmaster_builder
+from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
-UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
+UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
+
def sup(x, *a, **kw):
"""a rubyesque "super" for python ;)
invoke caller method in parent class with given args.
"""
- return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
+
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -57,15 +71,17 @@ def desugar(ustr):
ap = ap[1:]
return "file://" + ap
+
def gethostbyname(hnam):
"""gethostbyname wrapper"""
try:
return socket.gethostbyname(hnam)
except socket.gaierror:
ex = sys.exc_info()[1]
- raise GsyncdError("failed to resolve %s: %s" % \
+ raise GsyncdError("failed to resolve %s: %s" %
(hnam, ex.strerror))
+
def parse_url(ustr):
"""instantiate an url object by scheme-to-class dispatch
@@ -86,6 +102,7 @@ def parse_url(ustr):
class _MetaXattr(object):
+
"""singleton class, a lazy wrapper around the
libcxattr module
@@ -100,17 +117,19 @@ class _MetaXattr(object):
def __getattr__(self, meth):
from libcxattr import Xattr as LXattr
- xmeth = [ m for m in dir(LXattr) if m[0] != '_' ]
+ xmeth = [m for m in dir(LXattr) if m[0] != '_']
if not meth in xmeth:
return
for m in xmeth:
setattr(self, m, getattr(LXattr, m))
return getattr(self, meth)
+
class _MetaChangelog(object):
+
def __getattr__(self, meth):
from libgfchangelog import Changes as LChanges
- xmeth = [ m for m in dir(LChanges) if m[0] != '_' ]
+ xmeth = [m for m in dir(LChanges) if m[0] != '_']
if not meth in xmeth:
return
for m in xmeth:
@@ -122,6 +141,7 @@ Changes = _MetaChangelog()
class Popen(subprocess.Popen):
+
"""customized subclass of subprocess.Popen with a ring
buffer for children error output"""
@@ -129,11 +149,13 @@ class Popen(subprocess.Popen):
def init_errhandler(cls):
"""start the thread which handles children's error output"""
cls.errstore = {}
+
def tailer():
while True:
errstore = cls.errstore.copy()
try:
- poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1)
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
except (ValueError, SelectError):
continue
for po in errstore:
@@ -154,12 +176,12 @@ class Popen(subprocess.Popen):
tots = len(l)
for lx in la:
tots += len(lx)
- while tots > 1<<20 and la:
+ while tots > 1 << 20 and la:
tots -= len(la.pop(0))
la.append(l)
finally:
po.lock.release()
- t = syncdutils.Thread(target = tailer)
+ t = syncdutils.Thread(target=tailer)
t.start()
cls.errhandler = t
@@ -189,8 +211,9 @@ class Popen(subprocess.Popen):
ex = sys.exc_info()[1]
if not isinstance(ex, OSError):
raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \
- (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno)))
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
if kw.get('stderr') == subprocess.PIPE:
assert(getattr(self, 'errhandler', None))
self.errstore[self] = []
@@ -200,9 +223,10 @@ class Popen(subprocess.Popen):
filling = ""
if self.elines:
filling = ", saying:"
- logging.error("""command "%s" returned with %s%s""" % \
+ logging.error("""command "%s" returned with %s%s""" %
(" ".join(self.args), repr(self.returncode), filling))
lp = ''
+
def logerr(l):
logging.error(self.args[0] + "> " + l)
for l in self.elines:
@@ -217,9 +241,9 @@ class Popen(subprocess.Popen):
def errfail(self):
"""fail nicely if child did not terminate with success"""
self.errlog()
- syncdutils.finalize(exval = 1)
+ syncdutils.finalize(exval=1)
- def terminate_geterr(self, fail_on_err = True):
+ def terminate_geterr(self, fail_on_err=True):
"""kill child, finalize stderr harvesting (unregister
from errhandler, set up .elines), fail on error if
asked for
@@ -230,14 +254,14 @@ class Popen(subprocess.Popen):
finally:
self.lock.release()
elines = self.errstore.pop(self)
- if self.poll() == None:
+ if self.poll() is None:
self.terminate()
- if self.poll() == None:
+ if self.poll() is None:
time.sleep(0.1)
self.kill()
self.wait()
while True:
- if not select([self.stderr],[],[],0.1)[0]:
+ if not select([self.stderr], [], [], 0.1)[0]:
break
b = os.read(self.stderr.fileno(), 1024)
if b:
@@ -251,6 +275,7 @@ class Popen(subprocess.Popen):
class Server(object):
+
"""singleton implemening those filesystem access primitives
which are needed for geo-replication functionality
@@ -260,25 +285,28 @@ class Server(object):
GX_NSPACE_PFX = (privileged() and "trusted" or "system")
GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"
- NTV_FMTSTR = "!" + "B"*19 + "II"
+ NTV_FMTSTR = "!" + "B" * 19 + "II"
FRGN_XTRA_FMT = "I"
FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
- GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
+ GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0'
- GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX
- GFID_FMTSTR = "!" + "B"*16
+ # for backend gfid fetch, do not use GX_NSPACE_PFX
+ GFID_XATTR = 'trusted.gfid'
+ GFID_FMTSTR = "!" + "B" * 16
local_path = ''
@classmethod
def _fmt_mknod(cls, l):
- return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+
@classmethod
def _fmt_mkdir(cls, l):
- return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1)
+ return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1)
+
@classmethod
def _fmt_symlink(cls, l1, l2):
- return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)
+ return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1)
def _pathguard(f):
"""decorator method that checks
@@ -289,6 +317,7 @@ class Server(object):
fc = funcode(f)
pi = list(fc.co_varnames).index('path')
+
def ff(*a):
path = a[pi]
ps = path.split('/')
@@ -308,7 +337,6 @@ class Server(object):
raise OSError(ENOTDIR, os.strerror(ENOTDIR))
return os.listdir(path)
-
@classmethod
@_pathguard
def lstat(cls, path):
@@ -325,7 +353,9 @@ class Server(object):
@_pathguard
def linkto_check(cls, path):
try:
- return not (Xattr.lgetxattr_buf(path, 'trusted.glusterfs.dht.linkto') == '')
+ return not (
+ Xattr.lgetxattr_buf(path,
+ 'trusted.glusterfs.dht.linkto') == '')
except (IOError, OSError):
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA):
@@ -333,13 +363,13 @@ class Server(object):
else:
raise
-
@classmethod
@_pathguard
def gfid(cls, path):
try:
buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16)
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(
+ ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)]))
return '-'.join(m.groups())
except (IOError, OSError):
ex = sys.exc_info()[1]
@@ -350,7 +380,9 @@ class Server(object):
@classmethod
def gfid_mnt(cls, gfidpath):
- return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT])
+ return errno_wrap(Xattr.lgetxattr,
+ [gfidpath, 'glusterfs.gfid.string',
+ cls.GX_GFID_CANONICAL_LEN], [ENOENT])
@classmethod
@_pathguard
@@ -369,7 +401,7 @@ class Server(object):
for e in entries:
cls.purge(os.path.join(path, e))
"""
- me_also = entries == None
+ me_also = entries is None
if not entries:
try:
# if it's a symlink, prevent
@@ -435,7 +467,10 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -454,7 +489,10 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -473,7 +511,10 @@ class Server(object):
"""
try:
- return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8))
+ val = Xattr.lgetxattr(path,
+ '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ 8)
+ return struct.unpack('!II', val)
except OSError:
ex = sys.exc_info()[1]
if ex.errno in (ENOENT, ENODATA, ENOTDIR):
@@ -484,7 +525,8 @@ class Server(object):
@classmethod
def node_uuid(cls, path='.'):
try:
- uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
+ uuid_l = Xattr.lgetxattr_buf(
+ path, '.'.join([cls.GX_NSPACE, 'node-uuid']))
return uuid_l[:-1].split(' ')
except OSError:
raise
@@ -493,13 +535,17 @@ class Server(object):
@_pathguard
def set_stime(cls, path, uuid, mark):
"""set @mark as stime for @uuid on @path"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'stime']),
+ struct.pack('!II', *mark))
@classmethod
@_pathguard
def set_xtime(cls, path, uuid, mark):
"""set @mark as xtime for @uuid on @path"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ struct.pack('!II', *mark))
@classmethod
@_pathguard
@@ -511,18 +557,22 @@ class Server(object):
on the brick (this method sets xtime on the
remote slave)
"""
- Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+ Xattr.lsetxattr(
+ path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']),
+ struct.pack('!II', *mark))
@classmethod
def entry_ops(cls, entries):
pfx = gauxpfx()
logging.debug('entries: %s' % repr(entries))
# regular file
+
def entry_pack_reg(gf, bn, mo, uid, gid):
blen = len(bn)
return struct.pack(cls._fmt_mknod(blen),
uid, gid, gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
+
def entry_pack_reg_stat(gf, bn, st):
blen = len(bn)
mo = st['mode']
@@ -531,18 +581,21 @@ class Server(object):
gf, mo, bn,
stat.S_IMODE(mo), 0, umask())
# mkdir
+
def entry_pack_mkdir(gf, bn, mo, uid, gid):
blen = len(bn)
return struct.pack(cls._fmt_mkdir(blen),
uid, gid, gf, mo, bn,
stat.S_IMODE(mo), umask())
- #symlink
+ # symlink
+
def entry_pack_symlink(gf, bn, lnk, st):
blen = len(bn)
llen = len(lnk)
return struct.pack(cls._fmt_symlink(blen, llen),
st['uid'], st['gid'],
gf, st['mode'], bn, lnk)
+
def entry_purge(entry, gfid):
# This is an extremely racy code and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
@@ -574,9 +627,11 @@ class Server(object):
else:
break
elif op in ['CREATE', 'MKNOD']:
- blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid'])
+ blob = entry_pack_reg(
+ gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'MKDIR':
- blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid'])
+ blob = entry_pack_mkdir(
+ gfid, bname, e['mode'], e['uid'], e['uid'])
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -596,21 +651,23 @@ class Server(object):
else:
errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST])
if blob:
- errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL])
+ errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile',
+ blob],
+ [EEXIST], [ENOENT, ESTALE, EINVAL])
@classmethod
def meta_ops(cls, meta_entries):
logging.debug('Meta-entries: %s' % repr(meta_entries))
for e in meta_entries:
mode = e['stat']['mode']
- uid = e['stat']['uid']
- gid = e['stat']['gid']
- go = e['go']
+ uid = e['stat']['uid']
+ gid = e['stat']['gid']
+ go = e['go']
errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL])
errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL])
@classmethod
- def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0):
+ def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0):
Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
@classmethod
@@ -649,6 +706,7 @@ class Server(object):
return os.getpid()
last_keep_alive = 0
+
@classmethod
def keep_alive(cls, dct):
"""process keepalive messages.
@@ -662,9 +720,12 @@ class Server(object):
if dct:
key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
val = struct.pack(cls.FRGN_FMTSTR,
- *(dct['version'] +
- tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
- (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
+ *(dct['version'] +
+ tuple(int(x, 16)
+ for x in re.findall('(?:[\da-f]){2}',
+ dct['uuid'])) +
+ (dct['retval'],) + dct['volume_mark'][0:2] + (
+ dct['timeout'],)))
Xattr.lsetxattr('.', key, val)
cls.last_keep_alive += 1
return cls.last_keep_alive
@@ -676,6 +737,7 @@ class Server(object):
class SlaveLocal(object):
+
"""mix-in class to implement some factes of a slave server
("mix-in" is sort of like "abstract class", ie. it's not
@@ -697,9 +759,11 @@ class SlaveLocal(object):
"""
if boolify(gconf.use_rsync_xattrs) and not privileged():
- raise GsyncdError("using rsync for extended attributes is not supported")
+ raise GsyncdError(
+ "using rsync for extended attributes is not supported")
- repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
+ repce = RepceServer(
+ self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
syncdutils.finalize()))
t.start()
@@ -709,12 +773,16 @@ class SlaveLocal(object):
lp = self.server.last_keep_alive
time.sleep(int(gconf.timeout))
if lp == self.server.last_keep_alive:
- logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
+ logging.info(
+ "connection inactive for %d seconds, stopping" %
+ int(gconf.timeout))
break
else:
select((), (), ())
+
class SlaveRemote(object):
+
"""mix-in class to implement an interface to a remote slave"""
def connect_remote(self, rargs=[], **opts):
@@ -731,9 +799,11 @@ class SlaveRemote(object):
extra_opts += ['--session-owner', so]
if boolify(gconf.use_rsync_xattrs):
extra_opts.append('--use-rsync-xattrs')
- po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \
- ['-N', '--listen', '--timeout', str(gconf.timeout), slave],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts +
+ ['-N', '--listen', '--timeout', str(gconf.timeout),
+ slave],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
gconf.transport = po
return self.start_fd_client(po.stdout, po.stdin, **opts)
@@ -752,7 +822,9 @@ class SlaveRemote(object):
for k, v in da0[i].iteritems():
da1[i][k] = int(v)
if da1[0] != da1[1]:
- raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
+ raise GsyncdError(
+ "RePCe major version mismatch: local %s, remote %s" %
+ (exrv, rv))
def rsync(self, files, *args):
"""invoke rsync"""
@@ -760,17 +832,19 @@ class SlaveRemote(object):
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
argv = gconf.rsync_command.split() + \
- ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
- gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
- ['.'] + list(args)
- po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)
+ ['-avR0', '--inplace', '--files-from=-', '--super',
+ '--stats', '--numeric-ids', '--no-implied-dirs'] + \
+ gconf.rsync_options.split() + \
+ (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
+ ['.'] + list(args)
+ po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
for f in files:
po.stdin.write(f)
po.stdin.write('\0')
po.stdin.close()
po.wait()
- po.terminate_geterr(fail_on_err = False)
+ po.terminate_geterr(fail_on_err=False)
return po
@@ -784,8 +858,10 @@ class SlaveRemote(object):
logging.debug("files: " + ", ".join(files))
(host, rdir) = slaveurl.split(':')
tar_cmd = ["tar", "-cf", "-", "--files-from", "-"]
- ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
- p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ ssh_cmd = gconf.ssh_command_tar.split() + \
+ [host, "tar", "--overwrite", "-xf", "-", "-C", rdir]
+ p0 = Popen(tar_cmd, stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE)
p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE)
for f in files:
p0.stdin.write(f)
@@ -795,14 +871,16 @@ class SlaveRemote(object):
# wait() for tar to terminate, collecting any errors, further
# waiting for transfer to complete
p0.wait()
- p0.terminate_geterr(fail_on_err = False)
+ p0.terminate_geterr(fail_on_err=False)
p1.wait()
- p1.terminate_geterr(fail_on_err = False)
+ p1.terminate_geterr(fail_on_err=False)
return p1
+
class AbstractUrl(object):
+
"""abstract base class for url scheme classes"""
def __init__(self, path, pattern):
@@ -839,6 +917,7 @@ class AbstractUrl(object):
class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+
"""scheme class for file:// urls
can be used to represent a file slave server
@@ -847,6 +926,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
"""
class FILEServer(Server):
+
"""included server flavor"""
pass
@@ -864,6 +944,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+
"""scheme class for gluster:// urls
can be used to represent a gluster slave server
@@ -874,21 +955,24 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"""
class GLUSTERServer(Server):
+
"server enhancements for a glusterfs backend"""
@classmethod
- def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ def _attr_unpack_dict(cls, xattr, extra_fields=''):
"""generic volume mark fetching/parsing backed"""
fmt_string = cls.NTV_FMTSTR + extra_fields
buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
vm = struct.unpack(fmt_string, buf)
- m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
+ m = re.match(
+ '(.{8})(.{4})(.{4})(.{4})(.{12})',
+ "".join(['%02x' % x for x in vm[2:18]]))
uuid = '-'.join(m.groups())
- volinfo = { 'version': vm[0:2],
- 'uuid' : uuid,
- 'retval' : vm[18],
- 'volume_mark': vm[19:21],
- }
+ volinfo = {'version': vm[0:2],
+ 'uuid': uuid,
+ 'retval': vm[18],
+ 'volume_mark': vm[19:21],
+ }
if extra_fields:
return volinfo, vm[-len(extra_fields):]
else:
@@ -904,7 +988,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
now = int(time.time())
if x[0] > now:
- logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
+ logging.debug("volinfo[%s] expires: %d "
+ "(%d sec later)" %
(d['uuid'], x[0], x[0] - now))
d['timeout'] = x[0]
dict_list.append(d)
@@ -919,7 +1004,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def native_volume_info(cls):
"""get the native volume mark of the underlying gluster volume"""
try:
- return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE,
+ 'volume-mark']))
except OSError:
ex = sys.exc_info()[1]
if ex.errno != ENODATA:
@@ -936,9 +1022,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def can_connect_to(self, remote):
"""determine our position in the connectibility matrix"""
return not remote or \
- (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
+ (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))
class Mounter(object):
+
"""Abstract base class for mounter backends"""
def __init__(self, params):
@@ -1003,7 +1090,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
if not t.isAlive():
break
if time.time() >= tlim:
- syncdutils.finalize(exval = 1)
+ syncdutils.finalize(exval=1)
time.sleep(1)
os.close(mpo)
_, rv = syncdutils.waitpid(mh, 0)
@@ -1011,7 +1098,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
(os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
logging.warn('stale mount possibly left behind on ' + d)
- raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \
+ raise GsyncdError("cleaning up temp mountpoint %s "
+ "failed with status %d" %
(d, rv))
else:
rv = 0
@@ -1035,7 +1123,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
assert(mntpt)
if mounted:
po = self.umount_l(mntpt)
- po.terminate_geterr(fail_on_err = False)
+ po.terminate_geterr(fail_on_err=False)
if po.returncode != 0:
po.errlog()
rv = po.returncode
@@ -1047,6 +1135,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
logging.debug('auxiliary glusterfs mount prepared')
class DirectMounter(Mounter):
+
"""mounter backend which calls mount(8), umount(8) directly"""
mountkw = {'stderr': subprocess.PIPE}
@@ -1057,15 +1146,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
return ['umount', '-l', d]
def make_mount_argv(self):
- self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-')
- return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt]
+ self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
+ return [self.get_glusterprog()] + \
+ ['--' + p for p in self.params] + [self.mntpt]
- def cleanup_mntpt(self, mntpt = None):
+ def cleanup_mntpt(self, mntpt=None):
if not mntpt:
mntpt = self.mntpt
os.rmdir(mntpt)
class MountbrokerMounter(Mounter):
+
"""mounter backend using the mountbroker gluster service"""
mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
@@ -1073,7 +1164,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
@classmethod
def make_cli_argv(cls):
- return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::']
+ return [cls.get_glusterprog()] + \
+ gconf.gluster_cli_options.split() + ['system::']
@classmethod
def make_umount_argv(cls, d):
@@ -1081,7 +1173,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def make_mount_argv(self, label):
return self.make_cli_argv() + \
- ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params
+ ['mount', label, 'user-map-root=' +
+ syncdutils.getusername()] + self.params
def handle_mounter(self, po):
self.mntpt = po.stdout.readline()[:-1]
@@ -1106,9 +1199,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
label = syncdutils.getusername()
mounter = label and self.MountbrokerMounter or self.DirectMounter
params = gconf.gluster_params.split() + \
- (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \
- ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host,
- 'volfile-id=' + self.volume, 'client-pid=-1']
+ (gconf.gluster_log_level and ['log-level=' +
+ gconf.gluster_log_level] or []) + \
+ ['log-file=' + gconf.gluster_log_file, 'volfile-server=' +
+ self.host, 'volfile-id=' + self.volume, 'client-pid=-1']
mounter(params).inhibit(*[l for l in [label] if l])
def connect_remote(self, *a, **kw):
@@ -1116,8 +1210,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
self.slavedir = "/proc/%d/cwd" % self.server.pid()
def gmaster_instantiate_tuple(self, slave):
- """return a tuple of the 'one shot' and the 'main crawl' class instance"""
- return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave))
+ """return a tuple of the 'one shot' and the 'main crawl'
+ class instance"""
+ return (gmaster_builder('xsync')(self, slave),
+ gmaster_builder()(self, slave))
def service_loop(self, *args):
"""enter service loop
@@ -1133,6 +1229,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class brickserver(FILE.FILEServer):
local_path = gconf.local_path
aggregated = self.server
+
@classmethod
def entries(cls, path):
e = super(brickserver, cls).entries(path)
@@ -1143,14 +1240,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
except ValueError:
pass
return e
+
@classmethod
def lstat(cls, e):
""" path based backend stat """
return super(brickserver, cls).lstat(e)
+
@classmethod
def gfid(cls, e):
""" path based backend gfid fetch """
return super(brickserver, cls).gfid(e)
+
@classmethod
def linkto_check(cls, e):
return super(brickserver, cls).linkto_check(e)
@@ -1158,9 +1258,25 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# define {,set_}xtime in slave, thus preempting
# the call to remote, so that it takes data from
# the local brick
- slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server)
- slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server)
+ slave.server.xtime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.xtime(path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
+ slave.server.stime = types.MethodType(
+ lambda _self, path, uuid: (
+ brickserver.stime(path,
+ uuid + '.' + gconf.slave_id)
+ ),
+ slave.server)
+ slave.server.set_stime = types.MethodType(
+ lambda _self, path, uuid, mark: (
+ brickserver.set_stime(path,
+ uuid + '.' + gconf.slave_id,
+ mark)
+ ),
+ slave.server)
(g1, g2) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
@@ -1186,6 +1302,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
class SSH(AbstractUrl, SlaveRemote):
+
"""scheme class for ssh:// urls
interface to remote slave on master side
@@ -1194,7 +1311,9 @@ class SSH(AbstractUrl, SlaveRemote):
def __init__(self, path):
self.remote_addr, inner_url = sup(self, path,
- '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
+ '^((?:%s@)?%s):(.+)' %
+ tuple([r.pattern
+ for r in (UserRX, HostRX)]))
self.inner_rsc = parse_url(inner_url)
self.volume = inner_url[1:]
@@ -1262,7 +1381,8 @@ class SSH(AbstractUrl, SlaveRemote):
self.inner_rsc.url)
deferred = go_daemon == 'postconn'
- ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr],
+ ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args +
+ [self.remote_addr],
slave=self.inner_rsc.url, deferred=deferred)
if deferred:
@@ -1285,7 +1405,8 @@ class SSH(AbstractUrl, SlaveRemote):
return 'should'
def rsync(self, files):
- return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
+ return sup(self, files, '-e',
+ " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
*(gconf.rsync_ssh_options.split() + [self.slaveurl]))
def tarssh(self, files):