summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/resource.py
diff options
context:
space:
mode:
authorAvra Sengupta <asengupt@redhat.com>2013-05-27 22:23:57 +0530
committerVijay Bellur <vbellur@redhat.com>2013-07-22 01:53:03 -0700
commit950371be29d029179ac5cd0ad2dfdbfcd4467b96 (patch)
treea979d3c710c25074088bd05cef5b6a0ede9505a9 /geo-replication/syncdaemon/resource.py
parent11f6c56f83b977a08f9d74563249cef59e22a05d (diff)
move 'xlators/marker/utils/' to 'geo-replication/' directory
Change-Id: Ibd0faefecc15b6713eda28bc96794ae58aff45aa BUG: 847839 Original Author: Amar Tumballi <amarts@redhat.com> Signed-off-by: Avra Sengupta <asengupt@redhat.com> Reviewed-on: http://review.gluster.org/5133 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r--geo-replication/syncdaemon/resource.py972
1 files changed, 972 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
new file mode 100644
index 00000000000..73102fbcb44
--- /dev/null
+++ b/geo-replication/syncdaemon/resource.py
@@ -0,0 +1,972 @@
+import re
+import os
+import sys
+import stat
+import time
+import fcntl
+import errno
+import struct
+import socket
+import logging
+import tempfile
+import threading
+import subprocess
+from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
+from select import error as SelectError
+
+from gconf import gconf
+import repce
+from repce import RepceServer, RepceClient
+from master import gmaster_builder
+import syncdutils
+from syncdutils import GsyncdError, select, privileged, boolify
+
+UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
+HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
+UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
+
+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw)
+
+def desugar(ustr):
+ """transform sugared url strings to standard <scheme>://<urlbody> form
+
+ parsing logic enforces the constraint that sugared forms should contatin
+ a ':' or a '/', which ensures that sugared urls do not conflict with
+ gluster volume names.
+ """
+ m = re.match('([^:]*):(.*)', ustr)
+ if m:
+ if not m.groups()[0]:
+ return "gluster://localhost" + ustr
+ elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]):
+ return "ssh://" + ustr
+ else:
+ return "gluster://" + ustr
+ else:
+ if ustr[0] != '/':
+ raise GsyncdError("cannot resolve sugared url '%s'" % ustr)
+ ap = os.path.normpath(ustr)
+ if ap.startswith('//'):
+ ap = ap[1:]
+ return "file://" + ap
+
+def gethostbyname(hnam):
+ """gethostbyname wrapper"""
+ try:
+ return socket.gethostbyname(hnam)
+ except socket.gaierror:
+ ex = sys.exc_info()[1]
+ raise GsyncdError("failed to resolve %s: %s" % \
+ (hnam, ex.strerror))
+
+def parse_url(ustr):
+ """instantiate an url object by scheme-to-class dispatch
+
+ The url classes taken into consideration are the ones in
+ this module whose names are full-caps.
+ """
+ m = UrlRX.match(ustr)
+ if not m:
+ ustr = desugar(ustr)
+ m = UrlRX.match(ustr)
+ if not m:
+ raise GsyncdError("malformed url")
+ sch, path = m.groups()
+ this = sys.modules[__name__]
+ if not hasattr(this, sch.upper()):
+ raise GsyncdError("unknown url scheme " + sch)
+ return getattr(this, sch.upper())(path)
+
+
+class _MetaXattr(object):
+ """singleton class, a lazy wrapper around the
+ libcxattr module
+
+ libcxattr (a heavy import due to ctypes) is
+ loaded only when when the single
+ instance is tried to be used.
+
+ This reduces runtime for those invocations
+ which do not need filesystem manipulation
+ (eg. for config, url parsing)
+ """
+
+ def __getattr__(self, meth):
+ from libcxattr import Xattr as LXattr
+ xmeth = [ m for m in dir(LXattr) if m[0] != '_' ]
+ if not meth in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LXattr, m))
+ return getattr(self, meth)
+
+Xattr = _MetaXattr()
+
+
+class Popen(subprocess.Popen):
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _ ,_ = select([po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ continue
+ l = os.read(fd, 1024)
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1<<20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = syncdutils.Thread(target = tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \
+ (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ filling = ""
+ if self.elines:
+ filling = ", saying:"
+ logging.error("""command "%s" returned with %s%s""" % \
+ (" ".join(self.args), repr(self.returncode), filling))
+ lp = ''
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ syncdutils.finalize(exval = 1)
+
+ def terminate_geterr(self, fail_on_err = True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() == None:
+ self.terminate()
+ if self.poll() == None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr],[],[],0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+class Server(object):
+ """singleton implemening those filesystem access primitives
+ which are needed for geo-replication functionality
+
+ (Singleton in the sense it's a class which has only static
+ and classmethods and is used directly, without instantiation.)
+ """
+
+ GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs"
+ NTV_FMTSTR = "!" + "B"*19 + "II"
+ FRGN_XTRA_FMT = "I"
+ FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT
+
+ def _pathguard(f):
+ """decorator method that checks
+ the path argument of the decorated
+ functions to make sure it does not
+ point out of the managed tree
+ """
+
+ fc = getattr(f, 'func_code', None)
+ if not fc:
+ # python 3
+ fc = f.__code__
+ pi = list(fc.co_varnames).index('path')
+ def ff(*a):
+ path = a[pi]
+ ps = path.split('/')
+ if path[0] == '/' or '..' in ps:
+ raise ValueError('unsafe path')
+ return f(*a)
+ return ff
+
+ @staticmethod
+ @_pathguard
+ def entries(path):
+ """directory entries in an array"""
+ # prevent symlinks being followed
+ if not stat.S_ISDIR(os.lstat(path).st_mode):
+ raise OSError(ENOTDIR, os.strerror(ENOTDIR))
+ return os.listdir(path)
+
+ @classmethod
+ @_pathguard
+ def purge(cls, path, entries=None):
+ """force-delete subtrees
+
+ If @entries is not specified, delete
+ the whole subtree under @path (including
+ @path).
+
+ Otherwise, @entries should be a
+ a sequence of children of @path, and
+ the effect is identical with a joint
+ @entries-less purge on them, ie.
+
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ """
+ me_also = entries == None
+ if not entries:
+ try:
+ # if it's a symlink, prevent
+ # following it
+ try:
+ os.unlink(path)
+ return
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EISDIR:
+ entries = os.listdir(path)
+ else:
+ raise
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOTDIR, ENOENT, ELOOP):
+ try:
+ os.unlink(path)
+ return
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == ENOENT:
+ return
+ raise
+ else:
+ raise
+ for e in entries:
+ cls.purge(os.path.join(path, e))
+ if me_also:
+ os.rmdir(path)
+
+ @classmethod
+ @_pathguard
+ def _create(cls, path, ctor):
+ """path creation backend routine"""
+ try:
+ ctor(path)
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno == EEXIST:
+ cls.purge(path)
+ return ctor(path)
+ raise
+
+ @classmethod
+ @_pathguard
+ def mkdir(cls, path):
+ cls._create(path, os.mkdir)
+
+ @classmethod
+ @_pathguard
+ def symlink(cls, lnk, path):
+ cls._create(path, lambda p: os.symlink(lnk, p))
+
+ @classmethod
+ @_pathguard
+ def xtime(cls, path, uuid):
+ """query xtime extended attribute
+
+ Return xtime of @path for @uuid as a pair of integers.
+ "Normal" errors due to non-existent @path or extended attribute
+ are tolerated and errno is returned in such a case.
+ """
+
+ try:
+ return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno in (ENOENT, ENODATA, ENOTDIR):
+ return ex.errno
+ else:
+ raise
+
+ @classmethod
+ def xtime_vec(cls, path, *uuids):
+ """vectored version of @xtime
+
+ accepts a list of uuids and returns a dictionary
+ with uuid as key(s) and xtime as value(s)
+ """
+ xt = {}
+ for uuid in uuids:
+ xtu = cls.xtime(path, uuid)
+ if xtu == ENODATA:
+ xtu = None
+ if isinstance(xtu, int):
+ return xtu
+ xt[uuid] = xtu
+ return xt
+
+ @classmethod
+ @_pathguard
+ def set_xtime(cls, path, uuid, mark):
+ """set @mark as xtime for @uuid on @path"""
+ Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark))
+
+ @classmethod
+ def set_xtime_vec(cls, path, mark_dct):
+ """vectored (or dictered) version of set_xtime
+
+ ignore values that match @ignore
+ """
+ for u,t in mark_dct.items():
+ cls.set_xtime(path, u, t)
+
+ @staticmethod
+ @_pathguard
+ def setattr(path, adct):
+ """set file attributes
+
+ @adct is a dict, where 'own', 'mode' and 'times'
+ keys are looked for and values used to perform
+ chown, chmod or utimes on @path.
+ """
+ own = adct.get('own')
+ if own:
+ os.lchown(path, *own)
+ mode = adct.get('mode')
+ if mode:
+ os.chmod(path, stat.S_IMODE(mode))
+ times = adct.get('times')
+ if times:
+ os.utime(path, times)
+
+ @staticmethod
+ def pid():
+ return os.getpid()
+
+ last_keep_alive = 0
+ @classmethod
+ def keep_alive(cls, dct):
+ """process keepalive messages.
+
+ Return keep-alive counter (number of received keep-alive
+ messages).
+
+ Now the "keep-alive" message can also have a payload which is
+ used to set a foreign volume-mark on the underlying file system.
+ """
+ if dct:
+ key = '.'.join([cls.GX_NSPACE, 'volume-mark', dct['uuid']])
+ val = struct.pack(cls.FRGN_FMTSTR,
+ *(dct['version'] +
+ tuple(int(x,16) for x in re.findall('(?:[\da-f]){2}', dct['uuid'])) +
+ (dct['retval'],) + dct['volume_mark'][0:2] + (dct['timeout'],)))
+ Xattr.lsetxattr('.', key, val)
+ cls.last_keep_alive += 1
+ return cls.last_keep_alive
+
+ @staticmethod
+ def version():
+ """version used in handshake"""
+ return 1.0
+
+
+class SlaveLocal(object):
+ """mix-in class to implement some factes of a slave server
+
+ ("mix-in" is sort of like "abstract class", ie. it's not
+ instantiated just included in the ancesty DAG. I use "mix-in"
+ to indicate that it's not used as an abstract base class,
+ rather just taken in to implement additional functionality
+ on the basis of the assumed availability of certain interfaces.)
+ """
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return not remote
+
+ def service_loop(self):
+ """start a RePCe server serving self's server
+
+ stop servicing if a timeout is configured and got no
+ keep-alime in that inteval
+ """
+
+ if boolify(gconf.use_rsync_xattrs) and not privileged():
+ raise GsyncdError("using rsync for extended attributes is not supported")
+
+ repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs))
+ t = syncdutils.Thread(target=lambda: (repce.service_loop(),
+ syncdutils.finalize()))
+ t.start()
+ logging.info("slave listening")
+ if gconf.timeout and int(gconf.timeout) > 0:
+ while True:
+ lp = self.server.last_keep_alive
+ time.sleep(int(gconf.timeout))
+ if lp == self.server.last_keep_alive:
+ logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
+ break
+ else:
+ select((), (), ())
+
+class SlaveRemote(object):
+ """mix-in class to implement an interface to a remote slave"""
+
+ def connect_remote(self, rargs=[], **opts):
+ """connects to a remote slave
+
+ Invoke an auxiliary utility (slave gsyncd, possibly wrapped)
+ which sets up the connection and set up a RePCe client to
+ communicate throuh its stdio.
+ """
+ slave = opts.get('slave', self.url)
+ extra_opts = []
+ so = getattr(gconf, 'session_owner', None)
+ if so:
+ extra_opts += ['--session-owner', so]
+ if boolify(gconf.use_rsync_xattrs):
+ extra_opts.append('--use-rsync-xattrs')
+ po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + \
+ ['-N', '--listen', '--timeout', str(gconf.timeout), slave],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gconf.transport = po
+ return self.start_fd_client(po.stdout, po.stdin, **opts)
+
+ def start_fd_client(self, i, o, **opts):
+ """set up RePCe client, handshake with server
+
+ It's cut out as a separate method to let
+ subclasses hook into client startup
+ """
+ self.server = RepceClient(i, o)
+ rv = self.server.__version__()
+ exrv = {'proto': repce.repce_version, 'object': Server.version()}
+ da0 = (rv, exrv)
+ da1 = ({}, {})
+ for i in range(2):
+ for k, v in da0[i].iteritems():
+ da1[i][k] = int(v)
+ if da1[0] != da1[1]:
+ raise GsyncdError("RePCe major version mismatch: local %s, remote %s" % (exrv, rv))
+
+ def rsync(self, files, *args):
+ """invoke rsync"""
+ if not files:
+ raise GsyncdError("no files to sync")
+ logging.debug("files: " + ", ".join(files))
+ argv = gconf.rsync_command.split() + \
+ ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \
+ gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \
+ ['.'] + list(args)
+ po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
+ for f in files:
+ po.stdin.write(f)
+ po.stdin.write('\0')
+
+ po.stdin.close()
+ po.wait()
+ po.terminate_geterr(fail_on_err = False)
+
+ return po
+
+
+class AbstractUrl(object):
+ """abstract base class for url scheme classes"""
+
+ def __init__(self, path, pattern):
+ m = re.search(pattern, path)
+ if not m:
+ raise GsyncdError("malformed path")
+ self.path = path
+ return m.groups()
+
+ @property
+ def scheme(self):
+ return type(self).__name__.lower()
+
+ def canonical_path(self):
+ return self.path
+
+ def get_url(self, canonical=False, escaped=False):
+ """format self's url in various styles"""
+ if canonical:
+ pa = self.canonical_path()
+ else:
+ pa = self.path
+ u = "://".join((self.scheme, pa))
+ if escaped:
+ u = syncdutils.escape(u)
+ return u
+
+ @property
+ def url(self):
+ return self.get_url()
+
+
+ ### Concrete resource classes ###
+
+
+class FILE(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for file:// urls
+
+ can be used to represent a file slave server
+ on slave side, or interface to a remote file
+ file server on master side
+ """
+
+ class FILEServer(Server):
+ """included server flavor"""
+ pass
+
+ server = FILEServer
+
+ def __init__(self, path):
+ sup(self, path, '^/')
+
+ def connect(self):
+ """inhibit the resource beyond"""
+ os.chdir(self.path)
+
+ def rsync(self, files):
+ return sup(self, files, self.path)
+
+
+class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
+ """scheme class for gluster:// urls
+
+ can be used to represent a gluster slave server
+ on slave side, or interface to a remote gluster
+ slave on master side, or to represent master
+ (slave-ish features come from the mixins, master
+ functionality is outsourced to GMaster from master)
+ """
+
+ class GLUSTERServer(Server):
+ "server enhancements for a glusterfs backend"""
+
+ @classmethod
+ def _attr_unpack_dict(cls, xattr, extra_fields = ''):
+ """generic volume mark fetching/parsing backed"""
+ fmt_string = cls.NTV_FMTSTR + extra_fields
+ buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string))
+ vm = struct.unpack(fmt_string, buf)
+ m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]]))
+ uuid = '-'.join(m.groups())
+ volinfo = { 'version': vm[0:2],
+ 'uuid' : uuid,
+ 'retval' : vm[18],
+ 'volume_mark': vm[19:21],
+ }
+ if extra_fields:
+ return volinfo, vm[-len(extra_fields):]
+ else:
+ return volinfo
+
+ @classmethod
+ def foreign_volume_infos(cls):
+ """return list of valid (not expired) foreign volume marks"""
+ dict_list = []
+ xattr_list = Xattr.llistxattr_buf('.')
+ for ele in xattr_list:
+ if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0:
+ d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT)
+ now = int(time.time())
+ if x[0] > now:
+ logging.debug("volinfo[%s] expires: %d (%d sec later)" % \
+ (d['uuid'], x[0], x[0] - now))
+ d['timeout'] = x[0]
+ dict_list.append(d)
+ else:
+ try:
+ Xattr.lremovexattr('.', ele)
+ except OSError:
+ pass
+ return dict_list
+
+ @classmethod
+ def native_volume_info(cls):
+ """get the native volume mark of the underlying gluster volume"""
+ try:
+ return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, 'volume-mark']))
+ except OSError:
+ ex = sys.exc_info()[1]
+ if ex.errno != ENODATA:
+ raise
+
+ server = GLUSTERServer
+
+ def __init__(self, path):
+ self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+
+ def canonical_path(self):
+ return ':'.join([gethostbyname(self.host), self.volume])
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return True
+
+ class Mounter(object):
+ """Abstract base class for mounter backends"""
+
+ def __init__(self, params):
+ self.params = params
+ self.mntpt = None
+
+ @classmethod
+ def get_glusterprog(cls):
+ return os.path.join(gconf.gluster_command_dir, cls.glusterprog)
+
+ def umount_l(self, d):
+ """perform lazy umount"""
+ po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE)
+ po.wait()
+ return po
+
+ @classmethod
+ def make_umount_argv(cls, d):
+ raise NotImplementedError
+
+ def make_mount_argv(self, *a):
+ raise NotImplementedError
+
+ def cleanup_mntpt(self, *a):
+ pass
+
+ def handle_mounter(self, po):
+ po.wait()
+
+ def inhibit(self, *a):
+ """inhibit a gluster filesystem
+
+ Mount glusterfs over a temporary mountpoint,
+ change into the mount, and lazy unmount the
+ filesystem.
+ """
+
+ mpi, mpo = os.pipe()
+ mh = Popen.fork()
+ if mh:
+ os.close(mpi)
+ fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
+ d = None
+ margv = self.make_mount_argv(*a)
+ if self.mntpt:
+ # mntpt is determined pre-mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ po = Popen(margv, **self.mountkw)
+ self.handle_mounter(po)
+ po.terminate_geterr()
+ logging.debug('auxiliary glusterfs mount in place')
+ if not d:
+ # mntpt is determined during mount
+ d = self.mntpt
+ os.write(mpo, d + '\0')
+ os.write(mpo, 'M')
+ t = syncdutils.Thread(target=lambda: os.chdir(d))
+ t.start()
+ tlim = gconf.starttime + int(gconf.connection_timeout)
+ while True:
+ if not t.isAlive():
+ break
+ if time.time() >= tlim:
+ syncdutils.finalize(exval = 1)
+ time.sleep(1)
+ os.close(mpo)
+ _, rv = syncdutils.waitpid(mh, 0)
+ if rv:
+ rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \
+ (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0)
+ logging.warn('stale mount possibly left behind on ' + d)
+ raise GsyncdError("cleaning up temp mountpoint %s failed with status %d" % \
+ (d, rv))
+ else:
+ rv = 0
+ try:
+ os.setsid()
+ os.close(mpo)
+ mntdata = ''
+ while True:
+ c = os.read(mpi, 1)
+ if not c:
+ break
+ mntdata += c
+ if mntdata:
+ mounted = False
+ if mntdata[-1] == 'M':
+ mntdata = mntdata[:-1]
+ assert(mntdata)
+ mounted = True
+ assert(mntdata[-1] == '\0')
+ mntpt = mntdata[:-1]
+ assert(mntpt)
+ if mounted:
+ po = self.umount_l(mntpt)
+ po.terminate_geterr(fail_on_err = False)
+ if po.returncode != 0:
+ po.errlog()
+ rv = po.returncode
+ self.cleanup_mntpt(mntpt)
+ except:
+ logging.exception('mount cleanup failure:')
+ rv = 200
+ os._exit(rv)
+ logging.debug('auxiliary glusterfs mount prepared')
+
+ class DirectMounter(Mounter):
+ """mounter backend which calls mount(8), umount(8) directly"""
+
+ mountkw = {'stderr': subprocess.PIPE}
+ glusterprog = 'glusterfs'
+
+ @staticmethod
+ def make_umount_argv(d):
+ return ['umount', '-l', d]
+
+ def make_mount_argv(self):
+ self.mntpt = tempfile.mkdtemp(prefix = 'gsyncd-aux-mount-')
+ return [self.get_glusterprog()] + ['--' + p for p in self.params] + [self.mntpt]
+
+ def cleanup_mntpt(self, mntpt = None):
+ if not mntpt:
+ mntpt = self.mntpt
+ os.rmdir(mntpt)
+
+ class MountbrokerMounter(Mounter):
+ """mounter backend using the mountbroker gluster service"""
+
+ mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE}
+ glusterprog = 'gluster'
+
+ @classmethod
+ def make_cli_argv(cls):
+ return [cls.get_glusterprog()] + gconf.gluster_cli_options.split() + ['system::']
+
+ @classmethod
+ def make_umount_argv(cls, d):
+ return cls.make_cli_argv() + ['umount', d, 'lazy']
+
+ def make_mount_argv(self, label):
+ return self.make_cli_argv() + \
+ ['mount', label, 'user-map-root=' + syncdutils.getusername()] + self.params
+
+ def handle_mounter(self, po):
+ self.mntpt = po.stdout.readline()[:-1]
+ po.stdout.close()
+ sup(self, po)
+ if po.returncode != 0:
+ # if cli terminated with error due to being
+ # refused by glusterd, what it put
+ # out on stdout is a diagnostic message
+ logging.error('glusterd answered: %s' % self.mntpt)
+
+ def connect(self):
+ """inhibit the resource beyond
+
+ Choose mounting backend (direct or mountbroker),
+ set up glusterfs parameters and perform the mount
+ with given backend
+ """
+
+ label = getattr(gconf, 'mountbroker', None)
+ if not label and not privileged():
+ label = syncdutils.getusername()
+ mounter = label and self.MountbrokerMounter or self.DirectMounter
+ params = gconf.gluster_params.split() + \
+ (gconf.gluster_log_level and ['log-level=' + gconf.gluster_log_level] or []) + \
+ ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + self.host,
+ 'volfile-id=' + self.volume, 'client-pid=-1']
+ mounter(params).inhibit(*[l for l in [label] if l])
+
+ def connect_remote(self, *a, **kw):
+ sup(self, *a, **kw)
+ self.slavedir = "/proc/%d/cwd" % self.server.pid()
+
+ def service_loop(self, *args):
+ """enter service loop
+
+ - if slave given, instantiate GMaster and
+ pass control to that instance, which implements
+ master behavior
+ - else do that's what's inherited
+ """
+ if args:
+ gmaster_builder()(self, args[0]).crawl_loop()
+ else:
+ sup(self, *args)
+
+ def rsync(self, files):
+ return sup(self, files, self.slavedir)
+
+
+class SSH(AbstractUrl, SlaveRemote):
+ """scheme class for ssh:// urls
+
+ interface to remote slave on master side
+ implementing an ssh based proxy
+ """
+
+ def __init__(self, path):
+ self.remote_addr, inner_url = sup(self, path,
+ '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))
+ self.inner_rsc = parse_url(inner_url)
+
+ def canonical_path(self):
+ m = re.match('([^@]+)@(.+)', self.remote_addr)
+ if m:
+ u, h = m.groups()
+ else:
+ u, h = syncdutils.getusername(), self.remote_addr
+ remote_addr = '@'.join([u, gethostbyname(h)])
+ return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])
+
+ def can_connect_to(self, remote):
+ """determine our position in the connectibility matrix"""
+ return False
+
+ def start_fd_client(self, *a, **opts):
+ """customizations for client startup
+
+ - be a no-op if we are to daemonize (client startup is deferred
+ to post-daemon stage)
+ - determine target url for rsync after consulting server
+ """
+ if opts.get('deferred'):
+ return a
+ sup(self, *a)
+ ityp = type(self.inner_rsc)
+ if ityp == FILE:
+ slavepath = self.inner_rsc.path
+ elif ityp == GLUSTER:
+ slavepath = "/proc/%d/cwd" % self.server.pid()
+ else:
+ raise NotImplementedError
+ self.slaveurl = ':'.join([self.remote_addr, slavepath])
+
+ def connect_remote(self, go_daemon=None):
+ """connect to inner slave url through outer ssh url
+
+ Wrap the connecting utility in ssh.
+
+ Much care is put into daemonizing: in that case
+ ssh is started before daemonization, but
+ RePCe client is to be created after that (as ssh
+ interactive password auth would be defeated by
+ a daemonized ssh, while client should be present
+ only in the final process). In that case the action
+ is taken apart to two parts, this method is ivoked
+ once pre-daemon, once post-daemon. Use @go_daemon
+ to deiced what part to perform.
+
+ [NB. ATM gluster product does not makes use of interactive
+ authentication.]
+ """
+ if go_daemon == 'done':
+ return self.start_fd_client(*self.fd_pair)
+ gconf.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'))
+ deferred = go_daemon == 'postconn'
+ ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred)
+ if deferred:
+ # send a message to peer so that we can wait for
+ # the answer from which we know connection is
+ # established and we can proceed with daemonization
+ # (doing that too early robs the ssh passwd prompt...)
+ # However, we'd better not start the RepceClient
+ # before daemonization (that's not preserved properly
+ # in daemon), we just do a an ad-hoc linear put/get.
+ i, o = ret
+ inf = os.fdopen(i)
+ repce.send(o, None, '__repce_version__')
+ select((inf,), (), ())
+ repce.recv(inf)
+ # hack hack hack: store a global reference to the file
+ # to save it from getting GC'd which implies closing it
+ gconf.permanent_handles.append(inf)
+ self.fd_pair = (i, o)
+ return 'should'
+
+ def rsync(self, files):
+ return sup(self, files, '-e', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args),
+ *(gconf.rsync_ssh_options.split() + [self.slaveurl]))