Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 961
1 files changed, 961 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
new file mode 100644
index 00000000000..f903f30595d
--- /dev/null
+++ b/geo-replication/syncdaemon/master.py
@@ -0,0 +1,961 @@
+import os
+import sys
+import time
+import stat
+import random
+import signal
+import logging
+import socket
+import errno
+import re
+from errno import ENOENT, ENODATA, EPIPE
+from threading import currentThread, Condition, Lock
+from datetime import datetime
+try:
+    from hashlib import md5 as md5
+except ImportError:
+    # py 2.4
+    from md5 import new as md5
+
+from gconf import gconf
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
+                       escape, unescape, select
+
+URXTIME = (-1, 0)
+
+# Utility functions to help us get closer to the DRY principle
+# (no, don't look for elevated or perspectivistic things here)
+
+def _xtime_now():
+    t = time.time()
+    sec = int(t)
+    nsec = int((t - sec) * 1000000)
+    return (sec, nsec)
+
+def _volinfo_hook_relax_foreign(self):
+    volinfo_sys = self.get_sys_volinfo()
+    fgn_vi = volinfo_sys[self.KFGN]
+    if fgn_vi:
+        expiry = fgn_vi['timeout'] - int(time.time()) + 1
+        logging.info('foreign volume info found, waiting %d sec for expiry' % \
+                     expiry)
+        time.sleep(expiry)
+        volinfo_sys = self.get_sys_volinfo()
+    self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+                                                                  volinfo_sys)
+    if self.inter_master:
+        raise GsyncdError("cannot be intermediate master in special mode")
+    return (volinfo_sys, state_change)
+
+
+# The API!
+
+def gmaster_builder():
+    """produce the GMaster class variant corresponding
+       to sync mode"""
+    this = sys.modules[__name__]
+    modemixin = gconf.special_sync_mode
+    if not modemixin:
+        modemixin = 'normal'
+    logging.info('setting up master for %s sync mode' % modemixin)
+    modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+    sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+    purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+    class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
+        pass
+    return _GMaster
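
The builder assembles the concrete master class at call time: GMasterBase supplies the crawl machinery and calls hooks (xtime_low, keepalive_payload_hook, ...) that only the mode mixins define. A minimal, self-contained sketch of the same hook-mixin pattern; all names below are illustrative stand-ins, not part of the module:

    class HookedBase(object):
        # calls a hook it does not define itself (cf. GMasterBase)
        def run(self):
            return "payload=%s" % (self.payload_hook(),)

    class NormalLike(object):
        def payload_hook(self):
            return "volinfo"

    class BlindLike(object):
        def payload_hook(self):
            return None

    def build(mode):
        mixin = {'normal': NormalLike, 'blind': BlindLike}[mode]
        # base class first, mixin second -- same MRO trick as _GMaster
        class _Composed(HookedBase, mixin):
            pass
        return _Composed

    print(build('blind')().run())   # -> payload=None
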
+
+
+# Mixin classes that implement the data format
+# and logic particularities of the various
+# sync modes
+
+class NormalMixin(object):
+    """normal geo-rep behavior"""
+
+    minus_infinity = URXTIME
+
+    # the following staticmethods ideally would be
+    # methods of an xtime object (in particular,
+    # implementing the hooks needed for comparison
+    # operators), but at this point we don't yet
+    # have a dedicated xtime class
+
+    @staticmethod
+    def serialize_xtime(xt):
+        return "%d.%d" % tuple(xt)
+
+    @staticmethod
+    def deserialize_xtime(xt):
+        return tuple(int(x) for x in xt.split("."))
+
+    @staticmethod
+    def native_xtime(xt):
+        return xt
+
+    @staticmethod
+    def xtime_geq(xt0, xt1):
+        return xt0 >= xt1
+
+    def make_xtime_opts(self, is_master, opts):
+        if not 'create' in opts:
+            opts['create'] = is_master and not self.inter_master
+        if not 'default_xtime' in opts:
+            if is_master and self.inter_master:
+                opts['default_xtime'] = ENODATA
+            else:
+                opts['default_xtime'] = URXTIME
+
+    def xtime_low(self, server, path, **opts):
+        xt = server.xtime(path, self.uuid)
+        if isinstance(xt, int) and xt != ENODATA:
+            return xt
+        if xt == ENODATA or xt < self.volmark:
+            if opts['create']:
+                xt = _xtime_now()
+                server.set_xtime(path, self.uuid, xt)
+            else:
+                xt = opts['default_xtime']
+        return xt
+
+    def keepalive_payload_hook(self, timo, gap):
+        # first grab a reference, as self.volinfo
+        # can be changed in the main thread
+        vi = self.volinfo
+        if vi:
+            # then make a private copy which we can modify
+            vi = vi.copy()
+            vi['timeout'] = int(time.time()) + timo
+        else:
+            # send keep-alives more frequently to
+            # avoid a delay in announcing our volume info
+            # to slave if it becomes established in the
+            # meantime
+            gap = min(10, gap)
+        return (vi, gap)
+
+    def volinfo_hook(self):
+        volinfo_sys = self.get_sys_volinfo()
+        self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state,
+                                                                      volinfo_sys)
+        return (volinfo_sys, state_change)
+
+    def xtime_reversion_hook(self, path, xtl, xtr):
+        if xtr > xtl:
+            raise GsyncdError("timestamp corruption for " + path)
+
+    def need_sync(self, e, xte, xtrd):
+        return xte > xtrd
+
+    def set_slave_xtime(self, path, mark):
+        self.slave.server.set_xtime(path, self.uuid, mark)
+
+class WrapupMixin(NormalMixin):
+    """a variant that differs from normal in terms
+       of ignoring non-indexed files"""
+
+    @staticmethod
+    def make_xtime_opts(is_master, opts):
+        if not 'create' in opts:
+            opts['create'] = False
+        if not 'default_xtime' in opts:
+            opts['default_xtime'] = URXTIME
+
+    @staticmethod
+    def keepalive_payload_hook(timo, gap):
+        return (None, gap)
+
+    def volinfo_hook(self):
+        return _volinfo_hook_relax_foreign(self)
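
NormalMixin keeps xtimes as (sec, nsec) pairs and serializes them as "sec.nsec" for the config backend; ordering is plain tuple comparison. The format can be exercised on its own:

    def serialize_xtime(xt):
        return "%d.%d" % tuple(xt)

    def deserialize_xtime(xt):
        return tuple(int(x) for x in xt.split("."))

    xt = (1321009871, 654321)
    assert deserialize_xtime(serialize_xtime(xt)) == xt
    # tuple comparison: seconds decide first, nanoseconds break ties
    assert (1321009871, 654321) < (1321009872, 0)
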
+
+class BlindMixin(object):
+    """Geo-rep flavor using vectored xtime.
+
+    Coordinates are the master, slave uuid pair;
+    in the master coordinate behavior is normal,
+    in the slave coordinate we force synchronization
+    on any value difference (these are in disjunctive
+    relation, ie. if either orders the entry to be
+    synced, it shall be synced).
+    """
+
+    minus_infinity = (URXTIME, None)
+
+    @staticmethod
+    def serialize_xtime(xt):
+        a = []
+        for x in xt:
+            if not x:
+                x = ('None', '')
+            a.extend(x)
+        return '.'.join(str(n) for n in a)
+
+    @staticmethod
+    def deserialize_xtime(xt):
+        a = xt.split(".")
+        a = (tuple(a[0:2]), tuple(a[2:4]))
+        b = []
+        for p in a:
+            if p[0] == 'None':
+                p = None
+            else:
+                p = tuple(int(x) for x in p)
+            b.append(p)
+        return tuple(b)
+
+    @staticmethod
+    def native_xtime(xt):
+        return xt[0]
+
+    @staticmethod
+    def xtime_geq(xt0, xt1):
+        return (not xt1[0] or xt0[0] >= xt1[0]) and \
+               (not xt1[1] or xt0[1] >= xt1[1])
+
+    @property
+    def ruuid(self):
+        if self.volinfo_r:
+            return self.volinfo_r['uuid']
+
+    @staticmethod
+    def make_xtime_opts(is_master, opts):
+        if not 'create' in opts:
+            opts['create'] = is_master
+        if not 'default_xtime' in opts:
+            opts['default_xtime'] = URXTIME
+
+    def xtime_low(self, server, path, **opts):
+        xtd = server.xtime_vec(path, self.uuid, self.ruuid)
+        if isinstance(xtd, int):
+            return xtd
+        xt = (xtd[self.uuid], xtd[self.ruuid])
+        if not xt[1] and (not xt[0] or xt[0] < self.volmark):
+            if opts['create']:
+                # not expected, but can happen if file originates
+                # from interrupted gsyncd transfer
+                logging.warn('have to fix up missing xtime on ' + path)
+                xt0 = _xtime_now()
+                server.set_xtime(path, self.uuid, xt0)
+            else:
+                xt0 = opts['default_xtime']
+            xt = (xt0, xt[1])
+        return xt
+
+    @staticmethod
+    def keepalive_payload_hook(timo, gap):
+        return (None, gap)
+
+    def volinfo_hook(self):
+        res = _volinfo_hook_relax_foreign(self)
+        volinfo_r_new = self.slave.server.native_volume_info()
+        if volinfo_r_new['retval']:
+            raise GsyncdError("slave is corrupt")
+        if getattr(self, 'volinfo_r', None):
+            if self.volinfo_r['uuid'] != volinfo_r_new['uuid']:
+                raise GsyncdError("uuid mismatch on slave")
+        self.volinfo_r = volinfo_r_new
+        return res
+
+    def xtime_reversion_hook(self, path, xtl, xtr):
+        if not isinstance(xtr[0], int) and \
+           (isinstance(xtl[0], int) or xtr[0] > xtl[0]):
+            raise GsyncdError("timestamp corruption for " + path)
+
+    def need_sync(self, e, xte, xtrd):
+        if xte[0]:
+            if not xtrd[0] or xte[0] > xtrd[0]:
+                # there is an outstanding diff at 0th pos,
+                # we can short-cut to True
+                return True
+        # we arrived at this point by either of these
+        # two possibilities:
+        # - no outstanding difference at 0th pos,
+        #   we want to see if 1st pos raises a veto
+        #   against the "no need to sync" proposal
+        # - no data at 0th pos, 1st pos will have
+        #   to decide (due to xtime assignment,
+        #   in this case 1st pos does carry data
+        #   -- iow, if 1st pos did not have data,
+        #   and 0th neither, 0th would have been
+        #   force-fed)
+        if not xte[1]:
+            # no data, no veto
+            return False
+        # the hard work: for 1st pos,
+        # the conduct is to fetch the corresponding
+        # slave data and do a "blind" comparison
+        # (ie. we do not care who is newer, we trigger
+        # sync on non-identical xtimes)
+        xtr = self.xtime(e, self.slave)
+        return isinstance(xtr, int) or xte[1] != xtr[1]
+
+    def set_slave_xtime(self, path, mark):
+        xtd = {}
+        for (u, t) in zip((self.uuid, self.ruuid), mark):
+            if t:
+                xtd[u] = t
+        self.slave.server.set_xtime_vec(path, xtd)
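
The vectored comparison above only declares xt0 "caught up" if every populated coordinate of xt1 is matched; an empty (None) coordinate never vetoes. Its behavior, checked in isolation:

    def xtime_geq(xt0, xt1):
        return (not xt1[0] or xt0[0] >= xt1[0]) and \
               (not xt1[1] or xt0[1] >= xt1[1])

    newer = ((5, 0), (7, 0))
    older = ((5, 0), (6, 0))
    assert xtime_geq(newer, older)            # both coordinates caught up
    assert xtime_geq(newer, ((5, 0), None))   # empty slave coordinate: no veto
    assert not xtime_geq(older, newer)        # slave coordinate lags
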
+
+
+# Further mixins for certain tunable behaviors
+
+class SendmarkNormalMixin(object):
+
+    def sendmark_regular(self, *a, **kw):
+        return self.sendmark(*a, **kw)
+
+class SendmarkRsyncMixin(object):
+
+    def sendmark_regular(self, *a, **kw):
+        pass
+
+
+class PurgeNormalMixin(object):
+
+    def purge_missing(self, path, names):
+        self.slave.server.purge(path, names)
+
+class PurgeNoopMixin(object):
+
+    def purge_missing(self, path, names):
+        pass
+
+
+
+class GMasterBase(object):
+    """abstract class implementing the master role"""
+
+    KFGN = 0
+    KNAT = 1
+
+    def get_sys_volinfo(self):
+        """query volume marks on fs root
+
+        err out on multiple foreign masters
+        """
+        fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
+                          self.master.server.native_volume_info()
+        fgn_vi = None
+        if fgn_vis:
+            if len(fgn_vis) > 1:
+                raise GsyncdError("cannot work with multiple foreign masters")
+            fgn_vi = fgn_vis[0]
+        return fgn_vi, nat_vi
+
+    @property
+    def uuid(self):
+        if self.volinfo:
+            return self.volinfo['uuid']
+
+    @property
+    def volmark(self):
+        if self.volinfo:
+            return self.volinfo['volume_mark']
+
+    @property
+    def inter_master(self):
+        """decide if we are an intermediate master
+        in a cascading setup
+        """
+        return self.volinfo_state[self.KFGN] and True or False
+
+    def xtime(self, path, *a, **opts):
+        """get amended xtime
+
+        as of amending, we can create a missing xtime, or
+        determine a valid value if what we get is expired
+        (as of the volume mark expiry); the way of amending
+        depends on @opts and on the subject of the query
+        (master or slave).
+        """
+        if a:
+            rsc = a[0]
+        else:
+            rsc = self.master
+        self.make_xtime_opts(rsc == self.master, opts)
+        return self.xtime_low(rsc.server, path, **opts)
+
+    def __init__(self, master, slave):
+        self.master = master
+        self.slave = slave
+        self.jobtab = {}
+        self.syncer = Syncer(slave)
+        # crawls vs. turns:
+        # - self.crawls is simply the number of crawl() invocations on root
+        # - one turn is a maximal consecutive sequence of crawls so that each
+        #   crawl in it detects a change to be synced
+        # - self.turns is the number of turns since start
+        # - self.total_turns is a limit so that if self.turns reaches it, then
+        #   we exit (for diagnostic purposes)
+        # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+        self.crawls = 0
+        self.turns = 0
+        self.total_turns = int(gconf.turns)
+        self.lastreport = {'crawls': 0, 'turns': 0}
+        self.start = None
+        self.change_seen = None
+        self.syncTime = 0
+        self.lastSyncTime = 0
+        self.crawlStartTime = 0
+        self.crawlTime = 0
+        self.filesSynced = 0
+        self.bytesSynced = 0
+        # the authoritative (foreign, native) volinfo pair
+        # which lets us deduce what to do when we refetch
+        # the volinfos from the system
+        uuid_preset = getattr(gconf, 'volume_id', None)
+        self.volinfo_state = (uuid_preset and {'uuid': uuid_preset}, None)
+        # the actual volinfo we make use of
+        self.volinfo = None
+        self.terminate = False
+        self.checkpoint_thread = None
+
+    @classmethod
+    def _checkpt_param(cls, chkpt, prm, xtimish=True):
+        """use the config backend to look up a parameter belonging to
+        checkpoint @chkpt"""
+        cprm = getattr(gconf, 'checkpoint_' + prm, None)
+        if not cprm:
+            return
+        chkpt_mapped, val = cprm.split(':', 1)
+        if unescape(chkpt_mapped) != chkpt:
+            return
+        if xtimish:
+            val = cls.deserialize_xtime(val)
+        return val
+
+    @classmethod
+    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):
+        """use the config backend to store a parameter associated
+        with checkpoint @chkpt"""
+        if xtimish:
+            val = cls.serialize_xtime(val)
+        gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+
+    @staticmethod
+    def humantime(*tpair):
+        """format an xtime-like (sec, nsec) pair to a human readable string"""
+        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
+             strftime("%Y-%m-%d %H:%M:%S")
+        if len(tpair) > 1:
+            ts += '.' + str(tpair[1])
+        return ts
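
humantime joins the pair into a float for strftime and then re-appends the nanosecond field verbatim; a worked run (the wall-clock rendering depends on the local timezone):

    from datetime import datetime

    def humantime(*tpair):
        ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
             strftime("%Y-%m-%d %H:%M:%S")
        if len(tpair) > 1:
            ts += '.' + str(tpair[1])
        return ts

    print(humantime(1321009871, 654321))
    # with the clock in UTC: '2011-11-11 11:11:11.654321'
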
+
+    def get_extra_info(self):
+        str_info = "\nFiles synced : %d" % (self.filesSynced)
+        str_info += "\nBytes Synced : %d KB" % (self.syncer.bytesSynced)
+        str_info += "\nSync Time : %f seconds" % (self.syncTime)
+        self.crawlTime = datetime.now() - self.crawlStartTime
+        years, days = divmod(self.crawlTime.days, 365.25)
+        years = int(years)
+        days = int(days)
+
+        date = ""
+        m, s = divmod(self.crawlTime.seconds, 60)
+        h, m = divmod(m, 60)
+
+        if years != 0:
+            date += str(years) + " year "
+        if days != 0:
+            date += str(days) + " day "
+        if h != 0:
+            date += str(h) + " H : "
+        if m != 0 or h != 0:
+            date += str(m) + " M : "
+
+        date += str(s) + " S"
+        self.crawlTime = date
+        str_info += "\nCrawl Time : %s" % (str(self.crawlTime))
+        str_info += "\n\0"
+        return str_info
+
+    def checkpt_service(self, chan, chkpt, tgt):
+        """checkpoint service loop
+
+        monitor and verify checkpoint status for @chkpt, and listen
+        for incoming requests, for which we serve a pretty-formatted
+        status report"""
+        if not chkpt:
+            # dummy loop for the case when there is no checkpoint set
+            while True:
+                select([chan], [], [])
+                conn, _ = chan.accept()
+                conn.send(self.get_extra_info())
+                conn.close()
+        completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
+        if completed:
+            completed = tuple(int(x) for x in completed.split('.'))
+        while True:
+            s, _, _ = select([chan], [], [], (not completed) and 5 or None)
+            # either a request was made and we re-check so as not to
+            # give back stale data, or we are still hunting for completion
+            if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:
+                # indexing has been reset since setting the checkpoint
+                status = "is invalid"
+            else:
+                xtr = self.xtime('.', self.slave)
+                if isinstance(xtr, int):
+                    raise GsyncdError("slave root directory is inaccessible (%s)" %
+                                      os.strerror(xtr))
+                ncompleted = self.xtime_geq(xtr, tgt)
+                if completed and not ncompleted: # stale data
+                    logging.warn("completion time %s for checkpoint %s became stale" % \
+                                 (self.humantime(*completed), chkpt))
+                    completed = None
+                    gconf.confdata.delete('checkpoint-completed')
+                if ncompleted and not completed: # just reaching completion
+                    completed = "%.6f" % time.time()
+                    self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+                    completed = tuple(int(x) for x in completed.split('.'))
+                    logging.info("checkpoint %s completed" % chkpt)
+                status = completed and \
+                         "completed at " + self.humantime(completed[0]) or \
+                         "not reached yet"
+            if s:
+                conn = None
+                try:
+                    conn, _ = chan.accept()
+                    try:
+                        conn.send(" | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
+                    except:
+                        exc = sys.exc_info()[1]
+                        if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
+                           exc.errno == EPIPE:
+                            logging.debug('checkpoint client disconnected')
+                        else:
+                            raise
+                finally:
+                    if conn:
+                        conn.close()
+
+    def start_checkpoint_thread(self):
+        """prepare and start the checkpoint service"""
+        if self.checkpoint_thread or not (
+            getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
+        ):
+            return
+        chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
+        try:
+            os.unlink(state_socket)
+        except OSError:
+            pass
+        chan.bind(state_socket)
+        chan.listen(1)
+        checkpt_tgt = None
+        if gconf.checkpoint:
+            checkpt_tgt = self._checkpt_param(gconf.checkpoint, 'target')
+            if not checkpt_tgt:
+                checkpt_tgt = self.xtime('.')
+                if isinstance(checkpt_tgt, int):
+                    raise GsyncdError("master root directory is inaccessible (%s)" %
+                                      os.strerror(checkpt_tgt))
+                self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt)
+            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+                          (repr(checkpt_tgt), gconf.checkpoint))
+        t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))
+        t.start()
+        self.checkpoint_thread = t
+
+    def crawl_loop(self):
+        """start the keep-alive thread and iterate .crawl"""
+        timo = int(gconf.timeout or 0)
+        if timo > 0:
+            def keep_alive():
+                while True:
+                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
+                    self.slave.server.keep_alive(vi)
+                    time.sleep(gap)
+            t = Thread(target=keep_alive)
+            t.start()
+        self.lastreport['time'] = time.time()
+        self.crawlStartTime = datetime.now()
+        while not self.terminate:
+            self.crawl()
+
+    def add_job(self, path, label, job, *a, **kw):
+        """insert @job function to the job table at @path with @label"""
+        if self.jobtab.get(path) is None:
+            self.jobtab[path] = []
+        self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
+
+    def add_failjob(self, path, label):
+        """invoke .add_job with a job that does nothing but fail"""
+        logging.debug('salvaged: ' + label)
+        self.add_job(path, label, lambda: False)
+
+    def wait(self, path, *args):
+        """perform jobs registered for @path
+
+        Reset the jobtab entry for @path, and
+        determine success as the conjunction of
+        success of all the jobs. In case of
+        success, call .sendmark on @path
+        """
+        jobs = self.jobtab.pop(path, [])
+        succeed = True
+        for j in jobs:
+            ret = j[-1]()
+            if not ret:
+                succeed = False
+        if succeed:
+            self.sendmark(path, *args)
+        return succeed
+
+    def sendmark(self, path, mark, adct=None):
+        """update slave side xtime for @path to master side xtime
+
+        also can send a setattr payload (see Server.setattr).
+        """
+        if adct:
+            self.slave.server.setattr(path, adct)
+        self.set_slave_xtime(path, mark)
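
The add_job/wait pair implements a per-directory job table: crawl registers closures under a path, and wait later pops and runs all of them, withholding the sendmark if any job failed. The core of the pattern, reduced to a few lines (illustrative, no gsyncd types involved):

    jobtab = {}

    def add_job(path, label, job, *a):
        jobtab.setdefault(path, []).append((label, a, lambda: job(*a)))

    def wait(path):
        jobs = jobtab.pop(path, [])
        succeed = True
        for j in jobs:
            if not j[-1]():       # run every job, even after a failure
                succeed = False
        return succeed            # the real .wait sendmarks only on True

    add_job('.', 'reg', lambda f: True, 'a.txt')
    add_job('.', 'reg', lambda f: False, 'b.txt')
    assert wait('.') == False     # one failed job vetoes the sendmark
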
+
+    @staticmethod
+    def volinfo_state_machine(volinfo_state, volinfo_sys):
+        """compute the new volinfo_state from the old one and the
+        incoming system state, also indicating if there was a
+        change regarding which volume mark is the authoritative one
+
+        @volinfo_state, @volinfo_sys are pairs of volume mark dicts
+        (foreign, native).
+
+        Note this method is marked as static, ie. the computation is
+        pure, without reliance on any excess implicit state. State
+        transitions which are deemed as ambiguous or banned will raise
+        an exception.
+
+        """
+        # store the value below "boxed" to emulate proper closures
+        # (variables of the enclosing scope are available to inner functions
+        # provided they are not reassigned; mutation is OK).
+        param = FreeObject(relax_mismatch = False, state_change = None, index = -1)
+        def select_vi(vi0, vi):
+            param.index += 1
+            if vi and (not vi0 or vi0['uuid'] == vi['uuid']):
+                if not vi0 and not param.relax_mismatch:
+                    param.state_change = param.index
+                # valid new value found; for the rest, we are graceful about
+                # uuid mismatch
+                param.relax_mismatch = True
+                return vi
+            if vi0 and vi and vi0['uuid'] != vi['uuid'] and not param.relax_mismatch:
+                # uuid mismatch for master candidate, bail out
+                raise GsyncdError("aborting on uuid change from %s to %s" % \
+                                  (vi0['uuid'], vi['uuid']))
+            # fall back to the old value
+            return vi0
+        newstate = tuple(select_vi(*vip) for vip in zip(volinfo_state, volinfo_sys))
+        srep = lambda vi: vi and vi['uuid'][0:8]
+        logging.debug('(%s, %s) << (%s, %s) -> (%s, %s)' % \
+                      tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
+        return newstate, param.state_change
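
A sample run of the transition function; this assumes the module is importable as master, which holds inside the syncdaemon package:

    from master import GMasterBase   # assumption: syncdaemon on sys.path

    sys_vi = (None, {'uuid': 'aaaa1111'})      # (foreign, native) from the fs
    state, change = GMasterBase.volinfo_state_machine((None, None), sys_vi)
    # native slot freshly populated: change == 1 (KNAT)

    state, change = GMasterBase.volinfo_state_machine(state, sys_vi)
    # same uuid again: state keeps its value, change is None
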
+
+    def crawl(self, path='.', xtl=None):
+        """crawling...
+
+          Standing around
+          All the right people
+          Crawling
+          Tennis on Tuesday
+          The ladder is long
+          It is your nature
+          You've gotta suntan
+          Football on Sunday
+          Society boy
+
+        Recursively walk the master side tree and check if updates are
+        needed due to xtime differences. One invocation of crawl checks
+        the children of @path and does a recursive descent only into
+        those directory children where an update is needed.
+
+        The way of updating depends on the file type:
+        - for symlinks, sync them directly and synchronously
+        - for regular children, register jobs for @path (cf. .add_job) to start
+          and wait on their rsync
+        - for directory children, register a job for @path which waits (.wait)
+          on jobs for the given child
+        (other kinds of filesystem nodes are not considered)
+
+        Those slave side children which do not exist on the master are simply
+        purged (see Server.purge).
+
+        Behavior is fault tolerant, synchronization is adaptive: if some action
+        fails, we just go on relentlessly, adding a fail job (see .add_failjob)
+        which will prevent the .sendmark on @path, so when the next crawl arrives
+        at @path it will not see it as up-to-date and will try to sync it again.
+        While these semantics can be supported by funky design principles
+        (http://c2.com/cgi/wiki?LazinessImpatienceHubris), the ultimate reason
+        which excludes other possibilities is simply transience: we cannot
+        assert that the file systems (master / slave) underneath do not change,
+        and actions taken upon some condition will not lose their context by
+        the time they are performed.
+        """
+        if path == '.':
+            if self.start:
+                self.crawls += 1
+                logging.debug("... crawl #%d done, took %.6f seconds" % \
+                              (self.crawls, time.time() - self.start))
+                time.sleep(1)
+            self.start = time.time()
+            should_display_info = self.start - self.lastreport['time'] >= 60
+            if should_display_info:
+                logging.info("completed %d crawls, %d turns",
+                             self.crawls - self.lastreport['crawls'],
+                             self.turns - self.lastreport['turns'])
+                self.lastreport.update(crawls = self.crawls,
+                                       turns = self.turns,
+                                       time = self.start)
+            volinfo_sys, state_change = self.volinfo_hook()
+            if self.inter_master:
+                self.volinfo = volinfo_sys[self.KFGN]
+            else:
+                self.volinfo = volinfo_sys[self.KNAT]
+            if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
+                logging.info('new master is %s', self.uuid)
+                if self.volinfo:
+                    logging.info("%s master with volume id %s ..." % \
+                                 (self.inter_master and "intermediate" or "primary",
+                                  self.uuid))
+            if state_change == self.KFGN:
+                gconf.configinterface.set('volume_id', self.uuid)
+            if self.volinfo:
+                if self.volinfo['retval']:
+                    raise GsyncdError("master is corrupt")
+                self.start_checkpoint_thread()
+            else:
+                if should_display_info or self.crawls == 0:
+                    if self.inter_master:
+                        logging.info("waiting for being synced from %s ..." % \
+                                     self.volinfo_state[self.KFGN]['uuid'])
+                    else:
+                        logging.info("waiting for volume info ...")
+                return
+        logging.debug("entering " + path)
+        if not xtl:
+            xtl = self.xtime(path)
+            if isinstance(xtl, int):
+                self.add_failjob(path, 'no-local-node')
+                return
+        xtr = self.xtime(path, self.slave)
+        if isinstance(xtr, int):
+            if xtr != ENOENT:
+                self.slave.server.purge(path)
+            try:
+                self.slave.server.mkdir(path)
+            except OSError:
+                self.add_failjob(path, 'no-remote-node')
+                return
+            xtr = self.minus_infinity
+        else:
+            self.xtime_reversion_hook(path, xtl, xtr)
+            if xtl == xtr:
+                if path == '.' and self.change_seen:
+                    self.turns += 1
+                    self.change_seen = False
+                    if self.total_turns:
+                        logging.info("finished turn #%s/%s" % \
+                                     (self.turns, self.total_turns))
+                        if self.turns == self.total_turns:
+                            logging.info("reached turn limit")
+                            self.terminate = True
+                return
+        if path == '.':
+            self.change_seen = True
+        try:
+            dem = self.master.server.entries(path)
+        except OSError:
+            self.add_failjob(path, 'local-entries-fail')
+            return
+        random.shuffle(dem)
+        try:
+            des = self.slave.server.entries(path)
+        except OSError:
+            self.slave.server.purge(path)
+            try:
+                self.slave.server.mkdir(path)
+                des = self.slave.server.entries(path)
+            except OSError:
+                self.add_failjob(path, 'remote-entries-fail')
+                return
+        dd = set(des) - set(dem)
+        if dd:
+            self.purge_missing(path, dd)
+        chld = []
+        for e in dem:
+            e = os.path.join(path, e)
+            xte = self.xtime(e)
+            if isinstance(xte, int):
+                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+            elif self.need_sync(e, xte, xtr):
+                chld.append((e, xte))
+        def indulgently(e, fnc, blame=None):
+            if not blame:
+                blame = path
+            try:
+                return fnc(e)
+            except (IOError, OSError):
+                ex = sys.exc_info()[1]
+                if ex.errno == ENOENT:
+                    logging.warn("salvaged ENOENT for " + e)
+                    self.add_failjob(blame, 'by-indulgently')
+                    return False
+                else:
+                    raise
+        for e, xte in chld:
+            st = indulgently(e, lambda e: os.lstat(e))
+            if st == False:
+                continue
+            mo = st.st_mode
+            adct = {'own': (st.st_uid, st.st_gid)}
+            if stat.S_ISLNK(mo):
+                if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False:
+                    continue
+                self.sendmark(e, xte, adct)
+            elif stat.S_ISREG(mo):
+                logging.debug("syncing %s ..." % e)
+                pb = self.syncer.add(e)
+                timeA = datetime.now()
+                def regjob(e, xte, pb):
+                    if pb.wait():
+                        logging.debug("synced " + e)
+                        self.sendmark_regular(e, xte)
+
+                        timeB = datetime.now()
+                        self.lastSyncTime = timeB - timeA
+                        self.syncTime = (self.syncTime + self.lastSyncTime.microseconds) / (10.0 ** 6)
+                        self.filesSynced = self.filesSynced + 1
+                        return True
+                    else:
+                        logging.warn("failed to sync " + e)
+                self.add_job(path, 'reg', regjob, e, xte, pb)
+            elif stat.S_ISDIR(mo):
+                adct['mode'] = mo
+                if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct),
+                                             self.crawl(e, xte),
+                                             True)[-1], blame=e) == False:
+                    continue
+            else:
+                # ignore fifos, sockets and special files
+                pass
+        if path == '.':
+            self.wait(path, xtl)
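
indulgently() above is the crawl's salvage wrapper: a racy ENOENT (the file vanished between readdir and the action) becomes a fail job and a False return instead of a crash. The same wrapper, standalone; the on_salvage callback stands in for add_failjob:

    import os, sys
    from errno import ENOENT

    def indulgently(e, fnc, on_salvage):
        try:
            return fnc(e)
        except (IOError, OSError):
            ex = sys.exc_info()[1]
            if ex.errno == ENOENT:
                on_salvage(e)     # cf. add_failjob: path is retried next crawl
                return False
            raise

    st = indulgently('/no/such/file', lambda e: os.lstat(e), lambda e: None)
    assert st == False
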
+
+class BoxClosedErr(Exception):
+    pass
+
+class PostBox(list):
+    """synchronized collection for storing things thought of as "requests" """
+
+    def __init__(self, *a):
+        list.__init__(self, *a)
+        # too bad the Python stdlib does not have read/write locks...
+        # it would suffice to grab the lock in .append as reader, in .close as writer
+        self.lever = Condition()
+        self.open = True
+        self.done = False
+
+    def wait(self):
+        """wait on requests to be processed"""
+        self.lever.acquire()
+        if not self.done:
+            self.lever.wait()
+        self.lever.release()
+        return self.result
+
+    def wakeup(self, data):
+        """wake up requestors with the result"""
+        self.result = data
+        self.lever.acquire()
+        self.done = True
+        self.lever.notifyAll()
+        self.lever.release()
+
+    def append(self, e):
+        """post a request"""
+        self.lever.acquire()
+        if not self.open:
+            raise BoxClosedErr
+        list.append(self, e)
+        self.lever.release()
+
+    def close(self):
+        """prohibit the posting of further requests"""
+        self.lever.acquire()
+        self.open = False
+        self.lever.release()
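
PostBox's lifecycle in miniature: producers append, the consumer closes the box against further posts, and one wakeup later delivers the same verdict to every waiter (a sketch reusing the class above):

    from threading import Thread

    pb = PostBox()
    pb.append('file-1')
    pb.append('file-2')
    pb.close()                    # late producers now get BoxClosedErr

    results = []
    t = Thread(target=lambda: results.append(pb.wait()))
    t.start()
    pb.wakeup(True)               # all waiters see the shared result
    t.join()
    assert results == [True]
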
+
+class Syncer(object):
+    """a staged queue to relay rsync requests to rsync workers
+
+    By "staged queue" it's meant that when a consumer comes to the
+    queue, it takes _all_ entries, leaving the queue empty.
+    (I don't know if there is an official term for this pattern.)
+
+    The queue uses a PostBox to accumulate incoming items.
+    When a consumer (rsync worker) comes, a new PostBox is
+    set up and the old one is passed on to the consumer.
+
+    Instead of the simplistic scheme of having one big lock
+    which synchronizes both the addition of new items and
+    PostBox exchanges, we use a separate lock to arbitrate consumers,
+    and rely on PostBox's synchronization mechanisms to take
+    care of additions.
+
+    There is a corner-case race between producers and consumers
+    which is not handled by this scheme: namely, when the PostBox
+    exchange occurs in between the box being passed to the producer
+    for posting and the post placement. But that's what PostBox.close
+    is for: such a posting will find the PostBox closed, in which case
+    the producer can re-try posting against the actual PostBox of
+    the queue.
+
+    To aid accumulation of items in the PostBoxen before they are
+    grabbed by an rsync worker, the worker goes to sleep a bit after
+    each completed syncjob.
+    """
+
+    def __init__(self, slave):
+        """spawn worker threads"""
+        self.slave = slave
+        self.lock = Lock()
+        self.pb = PostBox()
+        self.bytesSynced = 0
+        for i in range(int(gconf.sync_jobs)):
+            t = Thread(target=self.syncjob)
+            t.start()
+
+    def syncjob(self):
+        """the life of a worker"""
+        while True:
+            pb = None
+            while True:
+                self.lock.acquire()
+                if self.pb:
+                    pb, self.pb = self.pb, PostBox()
+                self.lock.release()
+                if pb:
+                    break
+                time.sleep(0.5)
+            pb.close()
+            po = self.slave.rsync(pb)
+            if po.returncode == 0:
+                regEx = re.search(r' *total *transferred *file *size: *(\d+) *bytes *',
+                                  po.stdout.read(), re.IGNORECASE)
+                if regEx:
+                    self.bytesSynced += (int(regEx.group(1))) / 1024
+                ret = True
+            elif po.returncode in (23, 24):
+                # partial transfer (cf. rsync(1)), that's normal
+                ret = False
+            else:
+                po.errfail()
+            pb.wakeup(ret)
+
+    def add(self, e):
+        while True:
+            pb = self.pb
+            try:
+                pb.append(e)
+                return pb
+            except BoxClosedErr:
+                pass
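
The exchange at the heart of syncjob, isolated: the consumer swaps in a fresh box under the lock and walks away with everything accumulated so far, so each request batch is handed to exactly one worker. An illustrative reduction, not the real class:

    from threading import Lock

    class StagedQueue(object):
        def __init__(self):
            self.lock = Lock()
            self.batch = []

        def add(self, item):
            with self.lock:
                self.batch.append(item)

        def take_all(self):
            # swap, don't drain: consumers leave an empty box behind
            with self.lock:
                batch, self.batch = self.batch, []
            return batch

    q = StagedQueue()
    q.add('f1'); q.add('f2')
    assert q.take_all() == ['f1', 'f2']
    assert q.take_all() == []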