diff options
Diffstat (limited to 'xlators/features/marker/utils/syncdaemon/master.py')
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py new file mode 100644 index 000000000..a2f9f718e --- /dev/null +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -0,0 +1,240 @@ +import os +import sys +import time +import stat +import signal +import logging +import errno +from errno import ENOENT, ENODATA +from threading import Thread, currentThread, Condition, Lock + +from gconf import gconf + +URXTIME = (-1, 0) + +class GMaster(object): + + def get_volinfo(self): + self.volume_info = self.master.server.volume_info() + if self.volume_info['retval']: + raise RuntimeError("master is corrupt") + return self.volume_info + + @property + def uuid(self): + if not getattr(self, '_uuid', None): + self._uuid = self.volume_info['uuid'] + return self._uuid + + @property + def volmark(self): + return self.volume_info['volume_mark'] + + def xtime(self, path, *a, **opts): + if a: + rsc = a[0] + else: + rsc = self.master + if not 'create' in opts: + opts['create'] = rsc == self.master + xt = rsc.server.xtime(path, self.uuid) + if (isinstance(xt, int) or xt < self.volmark) and opts['create']: + t = time.time() + sec = int(t) + nsec = int((t - sec) * 1000000) + xt = (sec, nsec) + rsc.server.set_xtime(path, self.uuid, xt) + if xt == ENODATA: + xt = URXTIME + return xt + + def __init__(self, master, slave): + self.master = master + self.slave = slave + self.get_volinfo() + self.jobtab = {} + self.syncer = Syncer(slave) + self.total_turns = int(gconf.turns) + self.turns = 0 + self.start = None + self.change_seen = None + logging.info('master started on ' + self.uuid) + while True: + self.crawl() + + def add_job(self, path, label, job, *a, **kw): + if self.jobtab.get(path) == None: + self.jobtab[path] = [] + self.jobtab[path].append((label, a, lambda : job(*a, **kw))) + + def wait(self, path, mark): + jobs = self.jobtab.pop(path, []) + succeed = True + for j in jobs: + ret = j[-1]() + if not ret: + succeed = False + if succeed: + self.sendmark(path, mark) + return succeed + + def sendmark(self, path, mark): + self.slave.server.set_xtime(path, self.uuid, mark) + + def crawl(self, path='.', xtl=None): + if path == '.': + if self.start: + logging.info("crawl took %.6f" % (time.time() - self.start)) + time.sleep(1) + self.start = time.time() + logging.info("crawling...") + self.get_volinfo() + if self.volume_info['uuid'] != self.uuid: + raise RuntimeError("master uuid mismatch") + logging.debug("entering " + path) + if not xtl: + xtl = self.xtime(path) + xtr0 = self.xtime(path, self.slave) + if isinstance(xtr0, int): + xtr = URXTIME + else: + xtr = xtr0 + if xtr0 == ENOENT: + self.slave.server.mkdir(path) + else: + if xtr > xtl: + raise RuntimeError("timestamp corruption for " + path) + if xtl == xtr: + if path == '.' and self.total_turns and self.change_seen: + self.turns += 1 + self.change_seen = False + logging.info("finished turn #%s/%s" % (self.turns, self.total_turns)) + if self.turns == self.total_turns: + logging.info("reached turn limit, terminating.") + os.kill(os.getpid(), signal.SIGTERM) + return + if path == '.': + self.change_seen = True + dem, des = ( x.server.entries(path) for x in (self.master, self.slave) ) + dd = set(des) - set(dem) + if dd: + self.slave.server.purge(path, dd) + chld = [] + for e in dem: + e = os.path.join(path, e) + xte = self.xtime(e) + if isinstance(xte, int): + logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) + elif xte > xtr: + chld.append((e, xte)) + def indulgently(e, fnc, blame=None): + if not blame: + blame = path + try: + return fnc(e) + except (IOError, OSError): + ex = sys.exc_info()[1] + if ex.errno == ENOENT: + logging.warn("salvaged ENOENT for" + e) + self.add_job(blame, 'salvage', lambda: False) + return False + else: + raise + for e, xte in chld: + mo = indulgently(e, lambda e: os.lstat(e).st_mode) + if mo == False: + continue + if stat.S_ISLNK(mo): + self.slave.server.symlink(os.readlink(e), e) + self.sendmark(e, xte) + elif stat.S_ISREG(mo): + logging.debug("syncing %s ..." % e) + pb = self.syncer.add(e) + def regjob(e, xte, pb): + if pb.wait(): + logging.debug("synced " + e) + self.sendmark(e, xte) + return True + else: + logging.error("failed to sync " + e) + self.add_job(path, 'reg', regjob, e, xte, pb) + elif stat.S_ISDIR(mo): + if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte), + self.crawl(e, xte), + True)[-1], blame=e) == False: + continue + else: + # ignore fifos, sockets and special files + pass + if path == '.': + self.wait(path, xtl) + +class BoxClosedErr(Exception): + pass + +class PostBox(list): + + def __init__(self, *a): + list.__init__(self, *a) + self.lever = Condition() + self.open = True + self.done = False + + def wait(self): + self.lever.acquire() + if not self.done: + self.lever.wait() + self.lever.release() + return self.result + + def wakeup(self, data): + self.result = data + self.lever.acquire() + self.done = True + self.lever.notifyAll() + self.lever.release() + + def append(self, e): + self.lever.acquire() + if not self.open: + raise BoxClosedErr + list.append(self, e) + self.lever.release() + + def close(self): + self.lever.acquire() + self.open = False + self.lever.release() + +class Syncer(object): + + def __init__(self, slave): + self.slave = slave + self.lock = Lock() + self.pb = PostBox() + for i in range(int(gconf.sync_jobs)): + t = Thread(target=self.syncjob) + t.setDaemon = True + t.start() + + def syncjob(self): + while True: + pb = None + while True: + self.lock.acquire() + if self.pb: + pb, self.pb = self.pb, PostBox() + self.lock.release() + if pb: + break + time.sleep(0.5) + pb.close() + pb.wakeup(self.slave.rsync(pb)) + + def add(self, e): + while True: + try: + self.pb.append(e) + return self.pb + except BoxClosedErr: + pass |