diff options
author | Csaba Henk <csaba@gluster.com> | 2011-01-27 05:23:35 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2011-01-27 03:17:20 -0800 |
commit | 85300e25f2d47e33b169d14fa9eb0b7cfe39011b (patch) | |
tree | 6a00e8790358f1321855122d90a78e669168c1d1 /xlators/features/marker/utils/syncdaemon/resource.py | |
parent | 7d883898c5225df3f7c38e67274b74ff8ac396c0 (diff) |
adding syncdaemon
Signed-off-by: Csaba Henk <csaba@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 2310 (georeplication)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2310
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/resource.py')
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py new file mode 100644 index 00000000000..bbf7459bb55 --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -0,0 +1,372 @@ +import re +import os +import sys +import time +import errno +import struct +import select +import logging +import tempfile +import threading +from ctypes import * +from ctypes.util import find_library +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR + +from gconf import gconf +import repce +from repce import RepceServer, RepceClient +from master import GMaster + +UrlRX = re.compile('\A(\w+)://(.*)') +HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) +UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") + +def sup(x, *a, **kw): + return getattr(super(type(x), x), sys._getframe(1).f_code.co_name)(*a, **kw) + +def desugar(ustr): + m = re.match('([^:]*):(.*)', ustr) + if m: + if not m.groups()[0]: + return "gluster://localhost" + ustr + elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): + return "ssh://" + ustr + else: + return "gluster://#{str}" + else: + return "file://" + os.path.abspath(ustr) + +def parse_url(ustr): + m = UrlRX.match(ustr) + if not m: + ustr = desugar(ustr) + m = UrlRX.match(ustr) + if not m: + raise RuntimeError("malformed url") + sch, path = m.groups() + this = sys.modules[__name__] + if not hasattr(this, sch.upper()): + raise RuntimeError("unknown url scheme " + sch) + return getattr(this, sch.upper())(path) + + +class Xattr(object): + + libc = CDLL(find_library("libc")) + + @classmethod + def geterrno(cls): + return c_int.in_dll(cls.libc, 'errno').value + + @classmethod + def raise_oserr(cls): + errn = cls.geterrno() + raise OSError(errn, os.strerror(errn)) + + @classmethod + def lgetxattr(cls, path, attr, siz=0): + if siz: + buf = create_string_buffer('\0' * siz) + else: + buf = None + ret = cls.libc.lgetxattr(path, attr, buf, siz) + if ret == -1: + cls.raise_oserr() + if siz: + return buf.raw[:ret] + else: + return ret + + @classmethod + def lsetxattr(cls, path, attr, val): + ret = cls.libc.lsetxattr(path, attr, val, len(val), 0) + if ret == -1: + cls.raise_oserr() + + +class Server(object): + + GX_NSPACE = "trusted.glusterfs" + + @staticmethod + def entries(path): + try: + return os.listdir(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == ENOTDIR: + return [] + else: + raise + + @classmethod + def purge(cls, path, entries=None): + me_also = entries == None + if not entries: + try: + entries = os.listdir(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOTDIR, ENOENT): + try: + os.unlink(path) + return + except OSError: + ex = sys.exc_info()[1] + if ex.errno != ENOENT: + raise + else: + raise + for e in entries: + cls.purge(os.path.join(path, e)) + if me_also: + os.rmdir(path) + + @classmethod + def _create(cls, path, ctor): + try: + ctor(path) + except OSError: + ex = sys.exc_info()[1] + if ex.errno == EEXIST: + cls.purge(path) + return ctor(path) + raise + + @classmethod + def mkdir(cls, path): + cls._create(path, os.mkdir) + + @classmethod + def symlink(cls, lnk, path): + cls._create(path, lambda p: os.symlink(lnk, p)) + + @classmethod + def xtime(cls, path, uuid): + try: + return struct.unpack('!II', Xattr.lgetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), 8)) + except OSError: + ex = sys.exc_info()[1] + if ex.errno in (ENOENT, ENODATA): + return ex.errno + else: + raise + + @classmethod + def set_xtime(cls, path, uuid, mark): + Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + + @staticmethod + def pid(): + return os.getpid() + + lastping = 0 + @classmethod + def ping(cls): + cls.lastping += 1 + return cls.lastping + + +class SlaveLocal(object): + + def can_connect_to(self, remote): + return not remote + + def service_loop(self): + repce = RepceServer(self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) + t = threading.Thread(target=repce.service_loop) + t.setDaemon(True) + t.start() + logging.info("slave listening") + if gconf.timeout and int(gconf.timeout) > 0: + while True: + lp = self.server.lastping + time.sleep(int(gconf.timeout)) + if lp == self.server.lastping: + logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout)) + break + else: + select.select((), (), ()) + +class SlaveRemote(object): + + def connect_remote(self, rargs=[], **opts): + slave = opts.get('slave', self.url) + ix, ox = os.pipe() + iy, oy = os.pipe() + pid = os.fork() + if not pid: + os.close(ox) + os.dup2(ix, sys.stdin.fileno()) + os.close(iy) + os.dup2(oy, sys.stdout.fileno()) + argv = rargs + gconf.remote_gsyncd.split() + ['-N', '--listen', '--timeout', str(gconf.timeout), slave] + os.execvp(argv[0], argv) + os.close(ix) + os.close(oy) + return self.start_fd_client(iy, ox, **opts) + + def start_fd_client(self, i, o, **opts): + self.server = RepceClient(i, o) + if gconf.timeout and int(gconf.timeout) > 0: + def pinger(): + while True: + self.server.ping() + time.sleep(int(gconf.timeout) * 0.5) + t = threading.Thread(target=pinger) + t.setDaemon(True) + t.start() + + def rsync(self, files, *args): + if not files: + raise RuntimeError("no files to sync") + logging.debug("files: " + ", ".join(files)) + argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args) + return os.spawnvp(os.P_WAIT, argv[0], argv) == 0 + + +class AbstractUrl(object): + + def __init__(self, path, pattern): + m = re.search(pattern, path) + if not m: + raise RuntimeError("malformed path") + self.path = path + return m.groups() + + def scheme(self): + return type(self).__name__.lower() + + @property + def url(self): + return "://".join((self.scheme(), self.path)) + + + ### Concrete resource classes ### + + +class FILE(AbstractUrl, SlaveLocal, SlaveRemote): + + class FILEServer(Server): + pass + + server = FILEServer + + def __init__(self, path): + sup(self, path, '^/') + + def connect(self): + os.chdir(self.path) + + def rsync(self, files): + return sup(self, files, self.path) + + +class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): + + class GLUSTERServer(Server): + + @classmethod + def volume_info(cls): + vm = struct.unpack('!' + 'B'*19 + 'II', + Xattr.lgetxattr('.', '.'.join([cls.GX_NSPACE, 'volume-mark']), 27)) + m = re.match('(.{8})(.{4})(.{4})(.{4})(.{12})', "".join(['%02x' % x for x in vm[2:18]])) + uuid = '-'.join(m.groups()) + return { 'version': vm[0:2], + 'uuid' : uuid, + 'retval' : vm[18], + 'volume_mark': vm[-2:] } + + server = GLUSTERServer + + def __init__(self, path): + self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + + def can_connect_to(self, remote): + return True + + def connect(self): + d = tempfile.mkdtemp() + try: + argv = [gconf.gluster_command] + \ + (gconf.gluster_log_level and ['-L', gConf.gluster_log_level] or []) + \ + ['-l', gconf.gluster_log_file, '-s', self.host, + '--volfile-id', self.volume, '--client-pid=-1', d] + if os.spawnvp(os.P_WAIT, argv[0], argv): + raise RuntimeError("command failed: " + " ".join(argv)) + logging.debug('auxiliary glusterfs mount in place') + os.chdir(d) + argv = ['umount', '-l', d] + if os.spawnvp(os.P_WAIT, argv[0], argv): + raise RuntimeError("command failed: " + " ".join(argv)) + finally: + try: + os.rmdir(d) + except: + logging.warn('stale mount left behind on ' + d) + logging.debug('auxiliary glusterfs mount prepared') + + def connect_remote(self, *a, **kw): + sup(self, *a, **kw) + self.slavedir = "/proc/%d/cwd" % self.server.pid() + + def service_loop(self, *args): + if args: + GMaster(self, args[0]).crawl() + else: + sup(self, *args) + + def rsync(self, files): + return sup(self, files, self.slavedir) + + +class SSH(AbstractUrl, SlaveRemote): + + def __init__(self, path): + self.remote_addr, inner_url = sup(self, path, + '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ])) + self.inner_rsc = parse_url(inner_url) + + def can_connect_to(self, remote): + return False + + def start_fd_client(self, *a, **opts): + if opts['deferred']: + return a + sup(self, *a) + ityp = type(self.inner_rsc) + if ityp == FILE: + slavepath = self.inner_rsc.path + elif ityp == GLUSTER: + slavepath = "/proc/%d/cwd" % self.server.pid() + else: + raise NotImplementedError + self.slaveurl = ':'.join([self.remote_addr, slavepath]) + + def connect_remote(self, go_daemon=None): + if go_daemon == 'done': + return self.start_fd_client(*self.fd_pair) + gconf.setup_ssh_ctl(tempfile.mkdtemp()) + deferred = go_daemon == 'postconn' + ret = sup(self, gconf.ssh_command.split() + gconf.ssh_ctl_args + [self.remote_addr], slave=self.inner_rsc.url, deferred=deferred) + if deferred: + # send a ping to peer so that we can wait for + # the answer from which we know connection is + # established and we can proceed with daemonization + # (doing that too early robs the ssh passwd prompt...) + # However, we'd better not start the RepceClient + # before daemonization (that's not preserved properly + # in daemon), we just do a an ad-hoc linear put/get. + i, o = ret + inf = os.fdopen(i) + repce.send(o, None, 'ping') + select.select((inf,), (), ()) + repce.recv(inf) + # hack hack hack: store a global reference to the file + # to save it from getting GC'd which implies closing it + gconf._in_fd_reference = inf + self.fd_pair = (i, o) + return 'should' + + def rsync(self, files): + return sup(self, files, '-ze', " ".join(gconf.ssh_command.split() + gconf.ssh_ctl_args), self.slaveurl) |