diff options
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 8 | ||||
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 368 | ||||
-rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 31 |
3 files changed, 341 insertions, 66 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index d68cea6725e..165aebda1f9 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -172,6 +172,14 @@ def main_i(): op.add_option('--allow-network', metavar='IPS', default='') op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs) op.add_option('--checkpoint', metavar='LABEL', default='') + # tunables for failover/failback mechanism: + # None - gsyncd behaves as normal + # blind - gsyncd works with xtime pairs to identify + # candidates for synchronization + # wrapup - same as normal mode but does not assign + # xtimes to orphaned files + # see crawl() for usage of the above tunables + op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP) op.add_option('-c', '--config-file', metavar='CONF', type=str, action='callback', callback=store_local) # duh. need to specify dest or value will be mapped to None :S diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 4826037f134..945ebb75dd2 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -22,8 +22,281 @@ from syncdutils import FreeObject, Thread, GsyncdError, boolify, \ URXTIME = (-1, 0) -class GMaster(object): - """class impementling master role""" +# Utility functions to help us to get to closer proximity +# of the DRY principle (no, don't look for elevated or +# perspectivistic things here) + +def _xtime_now(): + t = time.time() + sec = int(t) + nsec = int((t - sec) * 1000000) + return (sec, nsec) + +def _volinfo_hook_relax_foreign(self): + volinfo_sys = self.get_sys_volinfo() + fgn_vi = volinfo_sys[self.KFGN] + if fgn_vi: + expiry = fgn_vi['timeout'] - int(time.time()) + 1 + logging.info('foreign volume info found, waiting %d sec for expiry' % \ + expiry) + time.sleep(expiry) + volinfo_sys = self.get_sys_volinfo() + self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, + volinfo_sys) + if self.inter_master: + raise GsyncdError("cannot be intermediate master in special mode") + return (volinfo_sys, state_change) + + +# The API! + +def gmaster_builder(): + """produce the GMaster class variant corresponding + to sync mode""" + this = sys.modules[__name__] + mixin = gconf.special_sync_mode + if not mixin: + mixin = 'normal' + logging.info('setting up master for %s sync mode' % mixin) + mixin = getattr(this, mixin.capitalize() + 'Mixin') + class _GMaster(GMasterBase, mixin): + pass + return _GMaster + + +# Mixin classes that implement the data format +# and logic particularities of the certain +# sync modes + +class NormalMixin(object): + """normal geo-rep behavior""" + + minus_infinity = URXTIME + + # following staticmethods ideally would be + # methods of an xtime object (in particular, + # implementing the hooks needed for comparison + # operators), but at this point we don't yet + # have a dedicated xtime class + + @staticmethod + def serialize_xtime(xt): + return "%d.%d" % tuple(xt) + + @staticmethod + def deserialize_xtime(xt): + return tuple(int(x) for x in xt.split(".")) + + @staticmethod + def native_xtime(xt): + return xt + + @staticmethod + def xtime_geq(xt0, xt1): + return xt0 >= xt1 + + def make_xtime_opts(self, is_master, opts): + if not 'create' in opts: + opts['create'] = is_master and not self.inter_master + if not 'default_xtime' in opts: + if is_master and self.inter_master: + opts['default_xtime'] = ENODATA + else: + opts['default_xtime'] = URXTIME + + def xtime_low(self, server, path, **opts): + xt = server.xtime(path, self.uuid) + if isinstance(xt, int) and xt != ENODATA: + return xt + if xt == ENODATA or xt < self.volmark: + if opts['create']: + xt = _xtime_now() + server.set_xtime(path, self.uuid, xt) + else: + xt = opts['default_xtime'] + return xt + + def keepalive_payload_hook(self, timo, gap): + # first grab a reference as self.volinfo + # can be changed in main thread + vi = self.volinfo + if vi: + # then have a private copy which we can mod + vi = vi.copy() + vi['timeout'] = int(time.time()) + timo + else: + # send keep-alives more frequently to + # avoid a delay in announcing our volume info + # to slave if it becomes established in the + # meantime + gap = min(10, gap) + return (vi, gap) + + def volinfo_hook(self): + volinfo_sys = self.get_sys_volinfo() + self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, + volinfo_sys) + return (volinfo_sys, state_change) + + def xtime_reversion_hook(self, path, xtl, xtr): + if xtr > xtl: + raise GsyncdError("timestamp corruption for " + path) + + def need_sync(self, e, xte, xtrd): + return xte > xtrd + + def set_slave_xtime(self, path, mark): + self.slave.server.set_xtime(path, self.uuid, mark) + +class WrapupMixin(NormalMixin): + """a variant that differs from normal in terms + of ignoring non-indexed files""" + + @staticmethod + def make_xtime_opts(is_master, opts): + if not 'create' in opts: + opts['create'] = False + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + @staticmethod + def keepalive_payload_hook(timo, gap): + return (None, gap) + + def volinfo_hook(self): + return _volinfo_hook_relax_foreign(self) + +class BlindMixin(object): + """Geo-rep flavor using vectored xtime. + + Coordinates are the master, slave uuid pair; + in master coordinate behavior is normal, + in slave coordinate we force synchronization + on any value difference (these are in disjunctive + relation, ie. if either orders the entry to be + synced, it shall be synced. + """ + + minus_infinity = (URXTIME, None) + + @staticmethod + def serialize_xtime(xt): + a = [] + for x in xt: + if not x: + x = ('None', '') + a.extend(x) + return '.'.join(str(n) for n in a) + + @staticmethod + def deserialize_xtime(xt): + a = xt.split(".") + a = (tuple(a[0:2]), tuple(a[3:4])) + b = [] + for p in a: + if p[0] == 'None': + p = None + else: + p = tuple(int(x) for x in p) + b.append(p) + return tuple(b) + + @staticmethod + def native_xtime(xt): + return xt[0] + + @staticmethod + def xtime_geq(xt0, xt1): + return (not xt1[0] or xt0[0] >= xt1[0]) and \ + (not xt1[1] or xt0[1] >= xt1[1]) + + @property + def ruuid(self): + if self.volinfo_r: + return self.volinfo_r['uuid'] + + @staticmethod + def make_xtime_opts(is_master, opts): + if not 'create' in opts: + opts['create'] = is_master + if not 'default_xtime' in opts: + opts['default_xtime'] = URXTIME + + def xtime_low(self, server, path, **opts): + xtd = server.xtime_vec(path, self.uuid, self.ruuid) + if isinstance(xtd, int): + return xtd + xt = (xtd[self.uuid], xtd[self.ruuid]) + if not xt[1] and (not xt[0] or xt[0] < self.volmark): + if opts['create']: + # not expected, but can happen if file originates + # from interrupted gsyncd transfer + logging.warn('have to fix up missing xtime on ' + path) + xt0 = _xtime_now() + server.set_xtime(path, self.uuid, xt0) + else: + xt0 = opts['default_xtime'] + xt = (xt0, xt[1]) + return xt + + @staticmethod + def keepalive_payload_hook(timo, gap): + return (None, gap) + + def volinfo_hook(self): + res = _volinfo_hook_relax_foreign(self) + volinfo_r_new = self.slave.server.native_volume_info() + if volinfo_r_new['retval']: + raise GsyncdError("slave is corrupt") + if getattr(self, 'volinfo_r', None): + if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: + raise GsyncdError("uuid mismatch on slave") + self.volinfo_r = volinfo_r_new + return res + + def xtime_reversion_hook(self, path, xtl, xtr): + if not isinstance(xtr[0], int) and \ + (isinstance(xtl[0], int) or xtr[0] > xtl[0]): + raise GsyncdError("timestamp corruption for " + path) + + def need_sync(self, e, xte, xtrd): + if xte[0]: + if not xtrd[0] or xte[0] > xtrd[0]: + # there is outstanding diff at 0th pos, + # we can short-cut to true + return True + # we arrived to this point by either of these + # two possiblilites: + # - no outstanding difference at 0th pos, + # wanna see 1st pos if he raises veto + # against "no need to sync" proposal + # - no data at 0th pos, 1st pos will have + # to decide (due to xtime assignment, + # in this case 1st pos does carry data + # -- iow, if 1st pos did not have data, + # and 0th neither, 0th would have been + # force-feeded) + if not xte[1]: + # no data, no veto + return False + # the hard work: for 1st pos, + # the conduct is fetch corresponding + # slave data and do a "blind" comparison + # (ie. do not care who is newer, we trigger + # sync on non-identical xitmes) + xtr = self.xtime(e, self.slave) + return isinstance(xtr, int) or xte[1] != xtr[1] + + def set_slave_xtime(self, path, mark): + xtd = {} + for (u, t) in zip((self.uuid, self.ruuid), mark): + if t: + xtd[u] = t + self.slave.server.set_xtime_vec(path, xtd) + + +class GMasterBase(object): + """abstract class impementling master role""" KFGN = 0 KNAT = 1 @@ -72,27 +345,8 @@ class GMaster(object): rsc = a[0] else: rsc = self.master - if not 'create' in opts: - opts['create'] = (rsc == self.master and not self.inter_master) - if not 'default_xtime' in opts: - if rsc == self.master and self.inter_master: - opts['default_xtime'] = ENODATA - else: - opts['default_xtime'] = URXTIME - xt = rsc.server.xtime(path, self.uuid) - if isinstance(xt, int) and xt != ENODATA: - return xt - invalid_xtime = (xt == ENODATA or xt < self.volmark) - if invalid_xtime: - if opts['create']: - t = time.time() - sec = int(t) - nsec = int((t - sec) * 1000000) - xt = (sec, nsec) - rsc.server.set_xtime(path, self.uuid, xt) - else: - xt = opts['default_xtime'] - return xt + self.make_xtime_opts(rsc == self.master, opts) + return self.xtime_low(rsc.server, path, **opts) def __init__(self, master, slave): self.master = master @@ -123,8 +377,8 @@ class GMaster(object): self.terminate = False self.checkpoint_thread = None - @staticmethod - def _checkpt_param(chkpt, prm, timish=True): + @classmethod + def _checkpt_param(cls, chkpt, prm, xtimish=True): """use config backend to lookup a parameter belonging to checkpoint @chkpt""" cprm = getattr(gconf, 'checkpoint_' + prm, None) @@ -133,16 +387,16 @@ class GMaster(object): chkpt_mapped, val = cprm.split(':', 1) if unescape(chkpt_mapped) != chkpt: return - if timish: - val = tuple(int(x) for x in val.split(".")) + if xtimish: + val = cls.deserialize_xtime(val) return val - @staticmethod - def _set_checkpt_param(chkpt, prm, val, timish=True): + @classmethod + def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True): """use config backend to store a parameter associated with checkpoint @chkpt""" - if timish: - val = "%d.%d" % tuple(val) + if xtimish: + val = cls.serialize_xtime(val) gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val)) @staticmethod @@ -167,12 +421,14 @@ class GMaster(object): conn, _ = chan.accept() conn.send('\0') conn.close() - completed = self._checkpt_param(chkpt, 'completed') + completed = self._checkpt_param(chkpt, 'completed', xtimish=False) + if completed: + completed = tuple(int(x) for x in completed.split('.')) while True: s,_,_ = select([chan], [], [], (not completed) and 5 or None) # either request made and we re-check to not # give back stale data, or we still hunting for completion - if tgt < self.volmark: + if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark: # indexing has been reset since setting the checkpoint status = "is invalid" else: @@ -180,15 +436,16 @@ class GMaster(object): if isinstance(xtr, int): raise GsyncdError("slave root directory is unaccessible (%s)", os.strerror(xtr)) - ncompleted = (xtr >= tgt) + ncompleted = self.xtime_geq(xtr, tgt) if completed and not ncompleted: # stale data logging.warn("completion time %s for checkpoint %s became stale" % \ (self.humantime(*completed), chkpt)) completed = None gconf.confdata.delete('checkpoint-completed') if ncompleted and not completed: # just reaching completion - completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ] - self._set_checkpt_param(chkpt, 'completed', completed) + completed = "%.6f" % time.time() + self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) + completed = tuple(int(x) for x in completed.split('.')) logging.info("checkpoint %s completed" % chkpt) status = completed and \ "completed at " + self.humantime(completed[0]) or \ @@ -232,8 +489,8 @@ class GMaster(object): raise GsyncdError("master root directory is unaccessible (%s)", os.strerror(checkpt_tgt)) self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) - logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \ - (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint)) + logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ + (repr(checkpt_tgt), gconf.checkpoint)) t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt)) t.start() self.checkpoint_thread = t @@ -244,20 +501,7 @@ class GMaster(object): if timo > 0: def keep_alive(): while True: - gap = timo * 0.5 - # first grab a reference as self.volinfo - # can be changed in main thread - vi = self.volinfo - if vi: - # then have a private copy which we can mod - vi = vi.copy() - vi['timeout'] = int(time.time()) + timo - else: - # send keep-alives more frequently to - # avoid a delay in announcing our volume info - # to slave if it becomes established in the - # meantime - gap = min(10, gap) + vi, gap = self.keepalive_payload_hook(timo, timo * 0.5) self.slave.server.keep_alive(vi) time.sleep(gap) t = Thread(target=keep_alive) @@ -302,7 +546,7 @@ class GMaster(object): """ if adct: self.slave.server.setattr(path, adct) - self.slave.server.set_xtime(path, self.uuid, mark) + self.set_slave_xtime(path, mark) @staticmethod def volinfo_state_machine(volinfo_state, volinfo_sys): @@ -397,9 +641,7 @@ class GMaster(object): self.lastreport.update(crawls = self.crawls, turns = self.turns, time = self.start) - volinfo_sys = self.get_sys_volinfo() - self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, - volinfo_sys) + volinfo_sys, state_change = self.volinfo_hook() if self.inter_master: self.volinfo = volinfo_sys[self.KFGN] else: @@ -430,20 +672,18 @@ class GMaster(object): if isinstance(xtl, int): self.add_failjob(path, 'no-local-node') return - xtr0 = self.xtime(path, self.slave) - if isinstance(xtr0, int): - if xtr0 != ENOENT: + xtr = self.xtime(path, self.slave) + if isinstance(xtr, int): + if xtr != ENOENT: self.slave.server.purge(path) try: self.slave.server.mkdir(path) except OSError: self.add_failjob(path, 'no-remote-node') return - xtr = URXTIME + xtr = self.minus_infinity else: - xtr = xtr0 - if xtr > xtl: - raise GsyncdError("timestamp corruption for " + path) + self.xtime_reversion_hook(path, xtl, xtr) if xtl == xtr: if path == '.' and self.change_seen: self.turns += 1 @@ -482,7 +722,7 @@ class GMaster(object): xte = self.xtime(e) if isinstance(xte, int): logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) - elif xte > xtr: + elif self.need_sync(e, xte, xtr): chld.append((e, xte)) def indulgently(e, fnc, blame=None): if not blame: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index c4cd19c9fb7..7e62fd48ca9 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -17,7 +17,7 @@ from select import error as selecterror from gconf import gconf import repce from repce import RepceServer, RepceClient -from master import GMaster +from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged @@ -359,11 +359,37 @@ class Server(object): raise @classmethod + def xtime_vec(cls, path, *uuids): + """vectored version of @xtime + + accepts a list of uuids and returns a dictionary + with uuid as key(s) and xtime as value(s) + """ + xt = {} + for uuid in uuids: + xtu = cls.xtime(path, uuid) + if xtu == ENODATA: + xtu = None + if isinstance(xtu, int): + return xtu + xt[uuid] = xtu + return xt + + @classmethod @_pathguard def set_xtime(cls, path, uuid, mark): """set @mark as xtime for @uuid on @path""" Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) + @classmethod + def set_xtime_vec(cls, path, mark_dct): + """vectored (or dictered) version of set_xtime + + ignore values that match @ignore + """ + for u,t in mark_dct.items(): + cls.set_xtime(path, u, t) + @staticmethod @_pathguard def setattr(path, adct): @@ -604,6 +630,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): if x[0] > now: logging.debug("volinfo[%s] expires: %d (%d sec later)" % \ (d['uuid'], x[0], x[0] - now)) + d['timeout'] = x[0] dict_list.append(d) else: try: @@ -820,7 +847,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): - else do that's what's inherited """ if args: - GMaster(self, args[0]).crawl_loop() + gmaster_builder()(self, args[0]).crawl_loop() else: sup(self, *args) |