diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 306 |
1 files changed, 212 insertions, 94 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 41add6fb287..2fb6b3078d8 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1,3 +1,13 @@ +# +# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com> +# This file is part of GlusterFS. + +# This file is licensed to you under your choice of the GNU Lesser +# General Public License, version 3 or any later version (LGPLv3 or +# later), or the GNU General Public License, version 2 (GPLv2), in all +# cases as published by the Free Software Foundation. +# + import re import os import sys @@ -12,27 +22,31 @@ import logging import tempfile import threading import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY, ESTALE, EINVAL +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP +from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL from select import error as SelectError from gconf import gconf import repce from repce import RepceServer, RepceClient -from master import gmaster_builder +from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') +UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + def sup(x, *a, **kw): """a rubyesque "super" for python ;) invoke caller method in parent class with given args. """ - return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + return getattr(super(type(x), x), + sys._getframe(1).f_code.co_name)(*a, **kw) + def desugar(ustr): """transform sugared url strings to standard <scheme>://<urlbody> form @@ -57,15 +71,17 @@ def desugar(ustr): ap = ap[1:] return "file://" + ap + def gethostbyname(hnam): """gethostbyname wrapper""" try: return socket.gethostbyname(hnam) except socket.gaierror: ex = sys.exc_info()[1] - raise GsyncdError("failed to resolve %s: %s" % \ + raise GsyncdError("failed to resolve %s: %s" % (hnam, ex.strerror)) + def parse_url(ustr): """instantiate an url object by scheme-to-class dispatch @@ -86,6 +102,7 @@ def parse_url(ustr): class _MetaXattr(object): + """singleton class, a lazy wrapper around the libcxattr module @@ -100,17 +117,19 @@ class _MetaXattr(object): def __getattr__(self, meth): from libcxattr import Xattr as LXattr - xmeth = [ m for m in dir(LXattr) if m[0] != '_' ] + xmeth = [m for m in dir(LXattr) if m[0] != '_'] if not meth in xmeth: return for m in xmeth: setattr(self, m, getattr(LXattr, m)) return getattr(self, meth) + class _MetaChangelog(object): + def __getattr__(self, meth): from libgfchangelog import Changes as LChanges - xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] + xmeth = [m for m in dir(LChanges) if m[0] != '_'] if not meth in xmeth: return for m in xmeth: @@ -122,6 +141,7 @@ Changes = _MetaChangelog() class Popen(subprocess.Popen): + """customized subclass of subprocess.Popen with a ring buffer for children error output""" @@ -129,11 +149,13 @@ class Popen(subprocess.Popen): def init_errhandler(cls): """start the thread which handles children's error output""" cls.errstore = {} + def tailer(): while True: errstore = cls.errstore.copy() try: - poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1) + poe, _, _ = select( + [po.stderr for po in errstore], [], [], 1) except (ValueError, SelectError): continue for po in errstore: @@ -154,12 +176,12 @@ class Popen(subprocess.Popen): tots = len(l) for lx in la: tots += len(lx) - while tots > 1<<20 and la: + while tots > 1 << 20 and la: tots -= len(la.pop(0)) la.append(l) finally: po.lock.release() - t = syncdutils.Thread(target = tailer) + t = syncdutils.Thread(target=tailer) t.start() cls.errhandler = t @@ -189,8 +211,9 @@ class Popen(subprocess.Popen): ex = sys.exc_info()[1] if not isinstance(ex, OSError): raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ - (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % + (args[0], errno.errorcode[ex.errno], + os.strerror(ex.errno))) if kw.get('stderr') == subprocess.PIPE: assert(getattr(self, 'errhandler', None)) self.errstore[self] = [] @@ -200,9 +223,10 @@ class Popen(subprocess.Popen): filling = "" if self.elines: filling = ", saying:" - logging.error("""command "%s" returned with %s%s""" % \ + logging.error("""command "%s" returned with %s%s""" % (" ".join(self.args), repr(self.returncode), filling)) lp = '' + def logerr(l): logging.error(self.args[0] + "> " + l) for l in self.elines: @@ -217,9 +241,9 @@ class Popen(subprocess.Popen): def errfail(self): """fail nicely if child did not terminate with success""" self.errlog() - syncdutils.finalize(exval = 1) + syncdutils.finalize(exval=1) - def terminate_geterr(self, fail_on_err = True): + def terminate_geterr(self, fail_on_err=True): """kill child, finalize stderr harvesting (unregister from errhandler, set up .elines), fail on error if asked for @@ -230,14 +254,14 @@ class Popen(subprocess.Popen): finally: self.lock.release() elines = self.errstore.pop(self) - if self.poll() == None: + if self.poll() is None: self.terminate() - if self.poll() == None: + if self.poll() is None: time.sleep(0.1) self.kill() self.wait() while True: - if not select([self.stderr],[],[],0.1)[0]: + if not select([self.stderr], [], [], 0.1)[0]: break b = os.read(self.stderr.fileno(), 1024) if b: @@ -251,6 +275,7 @@ class Popen(subprocess.Popen): class Server(object): + """singleton implemening those filesystem access primitives which are needed for geo-replication functionality @@ -260,25 +285,28 @@ class Server(object): GX_NSPACE_PFX = (privileged() and "trusted" or "system") GX_NSPACE = GX_NSPACE_PFX + ".glusterfs" - NTV_FMTSTR = "!" + "B"*19 + "II" + NTV_FMTSTR = "!" + "B" * 19 + "II" FRGN_XTRA_FMT = "I" FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT - GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' - GFID_XATTR = 'trusted.gfid' # for backend gfid fetch, do not use GX_NSPACE_PFX - GFID_FMTSTR = "!" + "B"*16 + # for backend gfid fetch, do not use GX_NSPACE_PFX + GFID_XATTR = 'trusted.gfid' + GFID_FMTSTR = "!" + "B" * 16 local_path = '' @classmethod def _fmt_mknod(cls, l): - return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + @classmethod def _fmt_mkdir(cls, l): - return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) + return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l + 1) + @classmethod def _fmt_symlink(cls, l1, l2): - return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1) + return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1 + 1, l2 + 1) def _pathguard(f): """decorator method that checks @@ -289,6 +317,7 @@ class Server(object): fc = funcode(f) pi = list(fc.co_varnames).index('path') + def ff(*a): path = a[pi] ps = path.split('/') @@ -308,7 +337,6 @@ class Server(object): raise OSError(ENOTDIR, os.strerror(ENOTDIR)) return os.listdir(path) - @classmethod @_pathguard def lstat(cls, path): @@ -325,7 +353,9 @@ class Server(object): @_pathguard def linkto_check(cls, path): try: - return not (Xattr.lgetxattr_buf(path, 'trusted.glusterfs.dht.linkto') == '') + return not ( + Xattr.lgetxattr_buf(path, + 'trusted.glusterfs.dht.linkto') == '') except (IOError, OSError): ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA): @@ -333,13 +363,13 @@ class Server(object): else: raise - @classmethod @_pathguard def gfid(cls, path): try: buf = Xattr.lgetxattr(path, cls.GFID_XATTR, 16) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join( + ['%02x' % x for x in struct.unpack(cls.GFID_FMTSTR, buf)])) return '-'.join(m.groups()) except (IOError, OSError): ex = sys.exc_info()[1] @@ -350,7 +380,9 @@ class Server(object): @classmethod def gfid_mnt(cls, gfidpath): - return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid.string', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + return errno_wrap(Xattr.lgetxattr, + [gfidpath, 'glusterfs.gfid.string', + cls.GX_GFID_CANONICAL_LEN], [ENOENT]) @classmethod @_pathguard @@ -369,7 +401,7 @@ class Server(object): for e in entries: cls.purge(os.path.join(path, e)) """ - me_also = entries == None + me_also = entries is None if not entries: try: # if it's a symlink, prevent @@ -435,7 +467,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'xtime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -454,7 +488,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'stime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -473,7 +509,9 @@ class Server(object): """ try: - return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), 8)) + val = Xattr.lgetxattr(path, + '.'.join([cls.GX_NSPACE, uuid, 'stime'])) + return struct.unpack('!II', val, 8) except OSError: ex = sys.exc_info()[1] if ex.errno in (ENOENT, ENODATA, ENOTDIR): @@ -484,7 +522,8 @@ class Server(object): @classmethod def node_uuid(cls, path='.'): try: - uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) + uuid_l = Xattr.lgetxattr_buf( + path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) return uuid_l[:-1].split(' ') except OSError: raise @@ -493,13 +532,17 @@ class Server(object): @_pathguard def set_stime(cls, path, uuid, mark): """set @mark as stime for @uuid on @path""" - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'stime']), + struct.pack('!II', *mark)) @classmethod @_pathguard def set_xtime(cls, path, uuid, mark): """set @mark as xtime for @uuid on @path""" - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), + struct.pack('!II', *mark)) @classmethod @_pathguard @@ -511,18 +554,22 @@ class Server(object): on the brick (this method sets xtime on the remote slave) """ - Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + Xattr.lsetxattr( + path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), + struct.pack('!II', *mark)) @classmethod def entry_ops(cls, entries): pfx = gauxpfx() logging.debug('entries: %s' % repr(entries)) # regular file + def entry_pack_reg(gf, bn, mo, uid, gid): blen = len(bn) return struct.pack(cls._fmt_mknod(blen), uid, gid, gf, mo, bn, stat.S_IMODE(mo), 0, umask()) + def entry_pack_reg_stat(gf, bn, st): blen = len(bn) mo = st['mode'] @@ -531,18 +578,21 @@ class Server(object): gf, mo, bn, stat.S_IMODE(mo), 0, umask()) # mkdir + def entry_pack_mkdir(gf, bn, mo, uid, gid): blen = len(bn) return struct.pack(cls._fmt_mkdir(blen), uid, gid, gf, mo, bn, stat.S_IMODE(mo), umask()) - #symlink + # symlink + def entry_pack_symlink(gf, bn, lnk, st): blen = len(bn) llen = len(lnk) return struct.pack(cls._fmt_symlink(blen, llen), st['uid'], st['gid'], gf, st['mode'], bn, lnk) + def entry_purge(entry, gfid): # This is an extremely racy code and needs to be fixed ASAP. # The GFID check here is to be sure that the pargfid/bname @@ -574,9 +624,11 @@ class Server(object): else: break elif op in ['CREATE', 'MKNOD']: - blob = entry_pack_reg(gfid, bname, e['mode'], e['uid'], e['uid']) + blob = entry_pack_reg( + gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'MKDIR': - blob = entry_pack_mkdir(gfid, bname, e['mode'], e['uid'], e['uid']) + blob = entry_pack_mkdir( + gfid, bname, e['mode'], e['uid'], e['uid']) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -596,21 +648,23 @@ class Server(object): else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: - errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [EEXIST], [ENOENT, ESTALE, EINVAL]) + errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', + blob], + [EEXIST], [ENOENT, ESTALE, EINVAL]) @classmethod def meta_ops(cls, meta_entries): logging.debug('Meta-entries: %s' % repr(meta_entries)) for e in meta_entries: mode = e['stat']['mode'] - uid = e['stat']['uid'] - gid = e['stat']['gid'] - go = e['go'] + uid = e['stat']['uid'] + gid = e['stat']['gid'] + go = e['go'] errno_wrap(os.chmod, [go, mode], [ENOENT], [ESTALE, EINVAL]) errno_wrap(os.chown, [go, uid, gid], [ENOENT], [ESTALE, EINVAL]) @classmethod - def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): + def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries=0): Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) @classmethod @@ -649,6 +703,7 @@ class Server(object): return os.getpid() last_keep_alive = 0 + @classmethod def keep_alive(cls, dct): """process keepalive messages. @@ -662,9 +717,12 @@ class Server(object): if dct: key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']]) val = struct.pack(cls.FRGN_FMTSTR, - *(dct['version'] + - tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) + - (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],))) + *(dct['version'] + + tuple(int(x, 16) + for x in re.findall('(?:[\da-f]){2}', + dct['uuid'])) + + (dct['retval'],) + dct['volume_mark'][0:2] + ( + dct['timeout'],))) Xattr.lsetxattr('.', key, val) cls.last_keep_alive += 1 return cls.last_keep_alive @@ -676,6 +734,7 @@ class Server(object): class SlaveLocal(object): + """mix-in class to implement some factes of a slave server ("mix-in" is sort of like "abstract class", ie. it's not @@ -697,9 +756,11 @@ class SlaveLocal(object): """ if boolify(gconf.use_rsync_xattrs) and not privileged(): - raise GsyncdError("using rsync for extended attributes is not supported") + raise GsyncdError( + "using rsync for extended attributes is not supported") - repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) + repce = RepceServer( + self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) t.start() @@ -709,12 +770,16 @@ class SlaveLocal(object): lp = self.server.last_keep_alive time.sleep(int(gconf.timeout)) if lp == self.server.last_keep_alive: - logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) + logging.info( + "connection inactive for %d seconds, stopping" % + int(gconf.timeout)) break else: select((), (), ()) + class SlaveRemote(object): + """mix-in class to implement an interface to a remote slave""" def connect_remote(self, rargs=[], **opts): @@ -731,9 +796,11 @@ class SlaveRemote(object): extra_opts += ['--session-owner', so] if boolify(gconf.use_rsync_xattrs): extra_opts.append('--use-rsync-xattrs') - po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \ - ['-N', '--listen', '--timeout', str(gconf.timeout), slave], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + + ['-N', '--listen', '--timeout', str(gconf.timeout), + slave], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) gconf.transport = po return self.start_fd_client(po.stdout, po.stdin, **opts) @@ -752,7 +819,9 @@ class SlaveRemote(object): for k, v in da0[i].iteritems(): da1[i][k] = int(v) if da1[0] != da1[1]: - raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv)) + raise GsyncdError( + "RePCe major version mismatch: local %s, remote %s" % + (exrv, rv)) def rsync(self, files, *args): """invoke rsync""" @@ -760,17 +829,19 @@ class SlaveRemote(object): raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) argv = gconf.rsync_command.split() + \ - ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ - gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ - ['.'] + list(args) - po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE) + ['-avR0', '--inplace', '--files-from=-', '--super', + '--stats', '--numeric-ids', '--no-implied-dirs'] + \ + gconf.rsync_options.split() + \ + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \ + ['.'] + list(args) + po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) for f in files: po.stdin.write(f) po.stdin.write('\0') po.stdin.close() po.wait() - po.terminate_geterr(fail_on_err = False) + po.terminate_geterr(fail_on_err=False) return po @@ -784,8 +855,10 @@ class SlaveRemote(object): logging.debug("files: " + ", ".join(files)) (host, rdir) = slaveurl.split(':') tar_cmd = ["tar", "-cf", "-", "--files-from", "-"] - ssh_cmd = gconf.ssh_command_tar.split() + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] - p0 = Popen(tar_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + ssh_cmd = gconf.ssh_command_tar.split() + \ + [host, "tar", "--overwrite", "-xf", "-", "-C", rdir] + p0 = Popen(tar_cmd, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, stderr=subprocess.PIPE) p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) for f in files: p0.stdin.write(f) @@ -795,14 +868,16 @@ class SlaveRemote(object): # wait() for tar to terminate, collecting any errors, further # waiting for transfer to complete p0.wait() - p0.terminate_geterr(fail_on_err = False) + p0.terminate_geterr(fail_on_err=False) p1.wait() - p1.terminate_geterr(fail_on_err = False) + p1.terminate_geterr(fail_on_err=False) return p1 + class AbstractUrl(object): + """abstract base class for url scheme classes""" def __init__(self, path, pattern): @@ -839,6 +914,7 @@ class AbstractUrl(object): class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for file:// urls can be used to represent a file slave server @@ -847,6 +923,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): """ class FILEServer(Server): + """included server flavor""" pass @@ -864,6 +941,7 @@ class FILE(AbstractUrl, SlaveLocal, SlaveRemote): class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + """scheme class for gluster:// urls can be used to represent a gluster slave server @@ -874,21 +952,24 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): """ class GLUSTERServer(Server): + "server enhancements for a glusterfs backend""" @classmethod - def _attr_unpack_dict(cls, xattr, extra_fields = ''): + def _attr_unpack_dict(cls, xattr, extra_fields=''): """generic volume mark fetching/parsing backed""" fmt_string = cls.NTV_FMTSTR + extra_fields buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) vm = struct.unpack(fmt_string, buf) - m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + m = re.match( + '(.{8})(.{4})(.{4})(.{4})(.{12})', + "".join(['%02x' % x for x in vm[2:18]])) uuid = '-'.join(m.groups()) - volinfo = { 'version': vm[0:2], - 'uuid' : uuid, - 'retval' : vm[18], - 'volume_mark': vm[19:21], - } + volinfo = {'version': vm[0:2], + 'uuid': uuid, + 'retval': vm[18], + 'volume_mark': vm[19:21], + } if extra_fields: return volinfo, vm[-len(extra_fields):] else: @@ -904,7 +985,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) now = int(time.time()) if x[0] > now: - logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ + logging.debug("volinfo[%s] expires: %d " + "(%d sec later)" % (d['uuid'], x[0], x[0] - now)) d['timeout'] = x[0] dict_list.append(d) @@ -919,7 +1001,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def native_volume_info(cls): """get the native volume mark of the underlying gluster volume""" try: - return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark'])) + return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, + 'volume-mark'])) except OSError: ex = sys.exc_info()[1] if ex.errno != ENODATA: @@ -936,9 +1019,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def can_connect_to(self, remote): """determine our position in the connectibility matrix""" return not remote or \ - (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) + (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) class Mounter(object): + """Abstract base class for mounter backends""" def __init__(self, params): @@ -1003,7 +1087,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): if not t.isAlive(): break if time.time() >= tlim: - syncdutils.finalize(exval = 1) + syncdutils.finalize(exval=1) time.sleep(1) os.close(mpo) _, rv = syncdutils.waitpid(mh, 0) @@ -1011,7 +1095,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) logging.warn('stale mount possibly left behind on ' + d) - raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \ + raise GsyncdError("cleaning up temp mountpoint %s " + "failed with status %d" % (d, rv)) else: rv = 0 @@ -1035,7 +1120,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): assert(mntpt) if mounted: po = self.umount_l(mntpt) - po.terminate_geterr(fail_on_err = False) + po.terminate_geterr(fail_on_err=False) if po.returncode != 0: po.errlog() rv = po.returncode @@ -1047,6 +1132,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.debug('auxiliary glusterfs mount prepared') class DirectMounter(Mounter): + """mounter backend which calls mount(8), umount(8) directly""" mountkw = {'stderr': subprocess.PIPE} @@ -1057,15 +1143,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): return ['umount', '-l', d] def make_mount_argv(self): - self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-') - return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt] + self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') + return [self.get_glusterprog()] + \ + ['--' + p for p in self.params] + [self.mntpt] - def cleanup_mntpt(self, mntpt = None): + def cleanup_mntpt(self, mntpt=None): if not mntpt: mntpt = self.mntpt os.rmdir(mntpt) class MountbrokerMounter(Mounter): + """mounter backend using the mountbroker gluster service""" mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} @@ -1073,7 +1161,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): @classmethod def make_cli_argv(cls): - return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::'] + return [cls.get_glusterprog()] + \ + gconf.gluster_cli_options.split() + ['system::'] @classmethod def make_umount_argv(cls, d): @@ -1081,7 +1170,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def make_mount_argv(self, label): return self.make_cli_argv() + \ - ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params + ['mount', label, 'user-map-root=' + + syncdutils.getusername()] + self.params def handle_mounter(self, po): self.mntpt = po.stdout.readline()[:-1] @@ -1106,9 +1196,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): label = syncdutils.getusername() mounter = label and self.MountbrokerMounter or self.DirectMounter params = gconf.gluster_params.split() + \ - (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \ - ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host, - 'volfile-id=' + self.volume, 'client-pid=-1'] + (gconf.gluster_log_level and ['log-level=' + + gconf.gluster_log_level] or []) + \ + ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + + self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] mounter(params).inhibit(*[l for l in [label] if l]) def connect_remote(self, *a, **kw): @@ -1116,8 +1207,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): self.slavedir = "/proc/%d/cwd" % self.server.pid() def gmaster_instantiate_tuple(self, slave): - """return a tuple of the 'one shot' and the 'main crawl' class instance""" - return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) + """return a tuple of the 'one shot' and the 'main crawl' + class instance""" + return (gmaster_builder('xsync')(self, slave), + gmaster_builder()(self, slave)) def service_loop(self, *args): """enter service loop @@ -1133,6 +1226,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class brickserver(FILE.FILEServer): local_path = gconf.local_path aggregated = self.server + @classmethod def entries(cls, path): e = super(brickserver, cls).entries(path) @@ -1143,14 +1237,17 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): except ValueError: pass return e + @classmethod def lstat(cls, e): """ path based backend stat """ return super(brickserver, cls).lstat(e) + @classmethod def gfid(cls, e): """ path based backend gfid fetch """ return super(brickserver, cls).gfid(e) + @classmethod def linkto_check(cls, e): return super(brickserver, cls).linkto_check(e) @@ -1158,9 +1255,25 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # define {,set_}xtime in slave, thus preempting # the call to remote, so that it takes data from # the local brick - slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.stime = types.MethodType(lambda _self, path, uuid: brickserver.stime(path, uuid + '.' + gconf.slave_id), slave.server) - slave.server.set_stime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_stime(path, uuid + '.' + gconf.slave_id, mark), slave.server) + slave.server.xtime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.xtime(path, + uuid + '.' + gconf.slave_id) + ), + slave.server) + slave.server.stime = types.MethodType( + lambda _self, path, uuid: ( + brickserver.stime(path, + uuid + '.' + gconf.slave_id) + ), + slave.server) + slave.server.set_stime = types.MethodType( + lambda _self, path, uuid, mark: ( + brickserver.set_stime(path, + uuid + '.' + gconf.slave_id, + mark) + ), + slave.server) (g1, g2) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver @@ -1186,6 +1299,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): class SSH(AbstractUrl, SlaveRemote): + """scheme class for ssh:// urls interface to remote slave on master side @@ -1194,7 +1308,9 @@ class SSH(AbstractUrl, SlaveRemote): def __init__(self, path): self.remote_addr, inner_url = sup(self, path, - '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + '^((?:%s@)?%s):(.+)' % + tuple([r.pattern + for r in (UserRX, HostRX)])) self.inner_rsc = parse_url(inner_url) self.volume = inner_url[1:] @@ -1262,7 +1378,8 @@ class SSH(AbstractUrl, SlaveRemote): self.inner_rsc.url) deferred = go_daemon == 'postconn' - ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) if deferred: @@ -1285,7 +1402,8 @@ class SSH(AbstractUrl, SlaveRemote): return 'should' def rsync(self, files): - return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), + return sup(self, files, '-e', + " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), *(gconf.rsync_ssh_options.split() + [self.slaveurl])) def tarssh(self, files): |