diff options
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/gsyncd.py | 8 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/master.py | 368 | ||||
| -rw-r--r-- | xlators/features/marker/utils/syncdaemon/resource.py | 31 | 
3 files changed, 341 insertions, 66 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index d68cea672..165aebda1 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -172,6 +172,14 @@ def main_i():      op.add_option('--allow-network',       metavar='IPS',   default='')      op.add_option('--state-socket-unencoded', metavar='SOCKF', type=str, action='callback', callback=store_abs)      op.add_option('--checkpoint',          metavar='LABEL', default='') +    # tunables for failover/failback mechanism: +    # None   - gsyncd behaves as normal +    # blind  - gsyncd works with xtime pairs to identify +    #          candidates for synchronization +    # wrapup - same as normal mode but does not assign +    #          xtimes to orphaned files +    # see crawl() for usage of the above tunables +    op.add_option('--special-sync-mode', type=str, help=SUPPRESS_HELP)      op.add_option('-c', '--config-file',   metavar='CONF',  type=str, action='callback', callback=store_local)      # duh. need to specify dest or value will be mapped to None :S diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 4826037f1..945ebb75d 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -22,8 +22,281 @@ from syncdutils import FreeObject, Thread, GsyncdError, boolify, \  URXTIME = (-1, 0) -class GMaster(object): -    """class impementling master role""" +# Utility functions to help us to get to closer proximity +# of the DRY principle (no, don't look for elevated or +# perspectivistic things here) + +def _xtime_now(): +    t = time.time() +    sec = int(t) +    nsec = int((t - sec) * 1000000) +    return (sec, nsec) + +def _volinfo_hook_relax_foreign(self): +    volinfo_sys = self.get_sys_volinfo() +    fgn_vi = volinfo_sys[self.KFGN] +    if fgn_vi: +        expiry = fgn_vi['timeout'] - int(time.time()) + 1 +        logging.info('foreign volume info found, waiting %d sec for expiry' % \ +                     expiry) +        time.sleep(expiry) +        volinfo_sys = self.get_sys_volinfo() +    self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, +                                                                  volinfo_sys) +    if self.inter_master: +        raise GsyncdError("cannot be intermediate master in special mode") +    return (volinfo_sys, state_change) + + +# The API! + +def gmaster_builder(): +    """produce the GMaster class variant corresponding +       to sync mode""" +    this = sys.modules[__name__] +    mixin = gconf.special_sync_mode +    if not mixin: +        mixin = 'normal' +    logging.info('setting up master for %s sync mode' % mixin) +    mixin = getattr(this, mixin.capitalize() + 'Mixin') +    class _GMaster(GMasterBase, mixin): +        pass +    return _GMaster + + +# Mixin classes that implement the data format +# and logic particularities of the certain +# sync modes + +class NormalMixin(object): +    """normal geo-rep behavior""" + +    minus_infinity = URXTIME + +    # following staticmethods ideally would be +    # methods of an xtime object (in particular, +    # implementing the hooks needed for comparison +    # operators), but at this point we don't yet +    # have a dedicated xtime class + +    @staticmethod +    def serialize_xtime(xt): +        return "%d.%d" % tuple(xt) + +    @staticmethod +    def deserialize_xtime(xt): +        return tuple(int(x) for x in xt.split(".")) + +    @staticmethod +    def native_xtime(xt): +        return xt + +    @staticmethod +    def xtime_geq(xt0, xt1): +        return xt0 >= xt1 + +    def make_xtime_opts(self, is_master, opts): +        if not 'create' in opts: +            opts['create'] = is_master and not self.inter_master +        if not 'default_xtime' in opts: +            if is_master and self.inter_master: +                opts['default_xtime'] = ENODATA +            else: +                opts['default_xtime'] = URXTIME + +    def xtime_low(self, server, path, **opts): +        xt = server.xtime(path, self.uuid) +        if isinstance(xt, int) and xt != ENODATA: +            return xt +        if xt == ENODATA or xt < self.volmark: +            if opts['create']: +                xt = _xtime_now() +                server.set_xtime(path, self.uuid, xt) +            else: +                xt = opts['default_xtime'] +        return xt + +    def keepalive_payload_hook(self, timo, gap): +        # first grab a reference as self.volinfo +        # can be changed in main thread +        vi = self.volinfo +        if vi: +            # then have a private copy which we can mod +            vi = vi.copy() +            vi['timeout'] = int(time.time()) + timo +        else: +            # send keep-alives more frequently to +            # avoid a delay in announcing our volume info +            # to slave if it becomes established in the +            # meantime +            gap = min(10, gap) +        return (vi, gap) + +    def volinfo_hook(self): +        volinfo_sys = self.get_sys_volinfo() +        self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, +                                                                      volinfo_sys) +        return (volinfo_sys, state_change) + +    def xtime_reversion_hook(self, path, xtl, xtr): +        if xtr > xtl: +            raise GsyncdError("timestamp corruption for " + path) + +    def need_sync(self, e, xte, xtrd): +        return xte > xtrd + +    def set_slave_xtime(self, path, mark): +        self.slave.server.set_xtime(path, self.uuid, mark) + +class WrapupMixin(NormalMixin): +    """a variant that differs from normal in terms +       of ignoring non-indexed files""" + +    @staticmethod +    def make_xtime_opts(is_master, opts): +        if not 'create' in opts: +            opts['create'] = False +        if not 'default_xtime' in opts: +            opts['default_xtime'] = URXTIME + +    @staticmethod +    def keepalive_payload_hook(timo, gap): +        return (None, gap) + +    def volinfo_hook(self): +        return _volinfo_hook_relax_foreign(self) + +class BlindMixin(object): +    """Geo-rep flavor using vectored xtime. + +    Coordinates are the master, slave uuid pair; +    in master coordinate behavior is normal, +    in slave coordinate we force synchronization +    on any value difference (these are in disjunctive +    relation, ie. if either orders the entry to be +    synced, it shall be synced. +    """ + +    minus_infinity = (URXTIME, None) + +    @staticmethod +    def serialize_xtime(xt): +        a = [] +        for x in xt: +            if not x: +                x = ('None', '') +            a.extend(x) +        return '.'.join(str(n) for n in a) + +    @staticmethod +    def deserialize_xtime(xt): +        a = xt.split(".") +        a = (tuple(a[0:2]), tuple(a[3:4])) +        b = [] +        for p in a: +            if p[0] == 'None': +                p = None +            else: +                p = tuple(int(x) for x in p) +            b.append(p) +        return tuple(b) + +    @staticmethod +    def native_xtime(xt): +        return xt[0] + +    @staticmethod +    def xtime_geq(xt0, xt1): +        return (not xt1[0] or xt0[0] >= xt1[0]) and \ +               (not xt1[1] or xt0[1] >= xt1[1]) + +    @property +    def ruuid(self): +        if self.volinfo_r: +            return self.volinfo_r['uuid'] + +    @staticmethod +    def make_xtime_opts(is_master, opts): +        if not 'create' in opts: +            opts['create'] = is_master +        if not 'default_xtime' in opts: +            opts['default_xtime'] = URXTIME + +    def xtime_low(self, server, path, **opts): +        xtd = server.xtime_vec(path, self.uuid, self.ruuid) +        if isinstance(xtd, int): +            return xtd +        xt = (xtd[self.uuid], xtd[self.ruuid]) +        if not xt[1] and (not xt[0] or xt[0] < self.volmark): +            if opts['create']: +                # not expected, but can happen if file originates +                # from interrupted gsyncd transfer +                logging.warn('have to fix up missing xtime on ' + path) +                xt0 = _xtime_now() +                server.set_xtime(path, self.uuid, xt0) +            else: +                xt0 = opts['default_xtime'] +            xt = (xt0, xt[1]) +        return xt + +    @staticmethod +    def keepalive_payload_hook(timo, gap): +        return (None, gap) + +    def volinfo_hook(self): +        res = _volinfo_hook_relax_foreign(self) +        volinfo_r_new = self.slave.server.native_volume_info() +        if volinfo_r_new['retval']: +            raise GsyncdError("slave is corrupt") +        if getattr(self, 'volinfo_r', None): +            if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: +                raise GsyncdError("uuid mismatch on slave") +        self.volinfo_r = volinfo_r_new +        return res + +    def xtime_reversion_hook(self, path, xtl, xtr): +        if not isinstance(xtr[0], int) and \ +          (isinstance(xtl[0], int) or xtr[0] > xtl[0]): +            raise GsyncdError("timestamp corruption for " + path) + +    def need_sync(self, e, xte, xtrd): +        if xte[0]: +            if not xtrd[0] or xte[0] > xtrd[0]: +                # there is outstanding diff at 0th pos, +                # we can short-cut to true +                return True +        # we arrived to this point by either of these +        # two possiblilites: +        # - no outstanding difference at 0th pos, +        #   wanna see 1st pos if he raises veto +        #   against "no need to sync" proposal +        # - no data at 0th pos, 1st pos will have +        #   to decide (due to xtime assignment, +        #   in this case 1st pos does carry data +        #   -- iow, if 1st pos did not have data, +        #   and 0th neither, 0th would have been +        #   force-feeded) +        if not xte[1]: +            # no data, no veto +            return False +        # the hard work: for 1st pos, +        # the conduct is fetch corresponding +        # slave data and do a "blind" comparison +        # (ie. do not care who is newer, we trigger +        # sync on non-identical xitmes) +        xtr = self.xtime(e, self.slave) +        return isinstance(xtr, int) or xte[1] != xtr[1] + +    def set_slave_xtime(self, path, mark): +        xtd = {} +        for (u, t) in zip((self.uuid, self.ruuid), mark): +            if t: +                xtd[u] = t +        self.slave.server.set_xtime_vec(path, xtd) + + +class GMasterBase(object): +    """abstract class impementling master role"""      KFGN = 0      KNAT = 1 @@ -72,27 +345,8 @@ class GMaster(object):              rsc = a[0]          else:              rsc = self.master -        if not 'create' in opts: -            opts['create'] = (rsc == self.master and not self.inter_master) -        if not 'default_xtime' in opts: -            if rsc == self.master and self.inter_master: -                opts['default_xtime'] = ENODATA -            else: -                opts['default_xtime'] = URXTIME -        xt = rsc.server.xtime(path, self.uuid) -        if isinstance(xt, int) and xt != ENODATA: -            return xt -        invalid_xtime = (xt == ENODATA or xt < self.volmark) -        if invalid_xtime: -            if opts['create']: -                t = time.time() -                sec = int(t) -                nsec = int((t - sec) * 1000000) -                xt = (sec, nsec) -                rsc.server.set_xtime(path, self.uuid, xt) -            else: -                xt = opts['default_xtime'] -        return xt +        self.make_xtime_opts(rsc == self.master, opts) +        return self.xtime_low(rsc.server, path, **opts)      def __init__(self, master, slave):          self.master = master @@ -123,8 +377,8 @@ class GMaster(object):          self.terminate = False          self.checkpoint_thread = None -    @staticmethod -    def _checkpt_param(chkpt, prm, timish=True): +    @classmethod +    def _checkpt_param(cls, chkpt, prm, xtimish=True):          """use config backend to lookup a parameter belonging to             checkpoint @chkpt"""          cprm = getattr(gconf, 'checkpoint_' + prm, None) @@ -133,16 +387,16 @@ class GMaster(object):          chkpt_mapped, val = cprm.split(':', 1)          if unescape(chkpt_mapped) != chkpt:              return -        if timish: -            val = tuple(int(x) for x in val.split(".")) +        if xtimish: +            val = cls.deserialize_xtime(val)          return val -    @staticmethod -    def _set_checkpt_param(chkpt, prm, val, timish=True): +    @classmethod +    def _set_checkpt_param(cls, chkpt, prm, val, xtimish=True):          """use config backend to store a parameter associated             with checkpoint @chkpt""" -        if timish: -            val = "%d.%d" % tuple(val) +        if xtimish: +            val = cls.serialize_xtime(val)          gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))      @staticmethod @@ -167,12 +421,14 @@ class GMaster(object):                  conn, _ = chan.accept()                  conn.send('\0')                  conn.close() -        completed = self._checkpt_param(chkpt, 'completed') +        completed = self._checkpt_param(chkpt, 'completed', xtimish=False) +        if completed: +            completed = tuple(int(x) for x in completed.split('.'))          while True:              s,_,_ = select([chan], [], [], (not completed) and 5 or None)              # either request made and we re-check to not              # give back stale data, or we still hunting for completion -            if tgt < self.volmark: +            if self.native_xtime(tgt) and self.native_xtime(tgt) < self.volmark:                  # indexing has been reset since setting the checkpoint                  status = "is invalid"              else: @@ -180,15 +436,16 @@ class GMaster(object):                  if isinstance(xtr, int):                      raise GsyncdError("slave root directory is unaccessible (%s)",                                        os.strerror(xtr)) -                ncompleted = (xtr >= tgt) +                ncompleted = self.xtime_geq(xtr, tgt)                  if completed and not ncompleted: # stale data                      logging.warn("completion time %s for checkpoint %s became stale" % \                                   (self.humantime(*completed), chkpt))                      completed = None                      gconf.confdata.delete('checkpoint-completed')                  if ncompleted and not completed: # just reaching completion -                    completed = [ int(x) for x in ("%.6f" % time.time()).split('.') ] -                    self._set_checkpt_param(chkpt, 'completed', completed) +                    completed = "%.6f" % time.time() +                    self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False) +                    completed = tuple(int(x) for x in completed.split('.'))                      logging.info("checkpoint %s completed" % chkpt)                  status = completed and \                    "completed at " + self.humantime(completed[0]) or \ @@ -232,8 +489,8 @@ class GMaster(object):                      raise GsyncdError("master root directory is unaccessible (%s)",                                        os.strerror(checkpt_tgt))                  self._set_checkpt_param(gconf.checkpoint, 'target', checkpt_tgt) -            logging.debug("checkpoint target %d.%d has been determined for checkpoint %s" % \ -                          (checkpt_tgt[0], checkpt_tgt[1], gconf.checkpoint)) +            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \ +                          (repr(checkpt_tgt), gconf.checkpoint))          t = Thread(target=self.checkpt_service, args=(chan, gconf.checkpoint, checkpt_tgt))          t.start()          self.checkpoint_thread = t @@ -244,20 +501,7 @@ class GMaster(object):          if timo > 0:              def keep_alive():                  while True: -                    gap = timo * 0.5 -                    # first grab a reference as self.volinfo -                    # can be changed in main thread -                    vi = self.volinfo -                    if vi: -                        # then have a private copy which we can mod -                        vi = vi.copy() -                        vi['timeout'] = int(time.time()) + timo -                    else: -                        # send keep-alives more frequently to -                        # avoid a delay in announcing our volume info -                        # to slave if it becomes established in the -                        # meantime -                        gap = min(10, gap) +                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)                      self.slave.server.keep_alive(vi)                      time.sleep(gap)              t = Thread(target=keep_alive) @@ -302,7 +546,7 @@ class GMaster(object):          """          if adct:              self.slave.server.setattr(path, adct) -        self.slave.server.set_xtime(path, self.uuid, mark) +        self.set_slave_xtime(path, mark)      @staticmethod      def volinfo_state_machine(volinfo_state, volinfo_sys): @@ -397,9 +641,7 @@ class GMaster(object):                  self.lastreport.update(crawls = self.crawls,                                         turns = self.turns,                                         time = self.start) -            volinfo_sys = self.get_sys_volinfo() -            self.volinfo_state, state_change = self.volinfo_state_machine(self.volinfo_state, -                                                                          volinfo_sys) +            volinfo_sys, state_change = self.volinfo_hook()              if self.inter_master:                  self.volinfo = volinfo_sys[self.KFGN]              else: @@ -430,20 +672,18 @@ class GMaster(object):              if isinstance(xtl, int):                  self.add_failjob(path, 'no-local-node')                  return -        xtr0 = self.xtime(path, self.slave) -        if isinstance(xtr0, int): -            if xtr0 != ENOENT: +        xtr = self.xtime(path, self.slave) +        if isinstance(xtr, int): +            if xtr != ENOENT:                  self.slave.server.purge(path)              try:                  self.slave.server.mkdir(path)              except OSError:                  self.add_failjob(path, 'no-remote-node')                  return -            xtr = URXTIME +            xtr = self.minus_infinity          else: -            xtr = xtr0 -            if xtr > xtl: -                raise GsyncdError("timestamp corruption for " + path) +            self.xtime_reversion_hook(path, xtl, xtr)              if xtl == xtr:                  if path == '.' and self.change_seen:                      self.turns += 1 @@ -482,7 +722,7 @@ class GMaster(object):              xte = self.xtime(e)              if isinstance(xte, int):                  logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) -            elif xte > xtr: +            elif self.need_sync(e, xte, xtr):                  chld.append((e, xte))          def indulgently(e, fnc, blame=None):              if not blame: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index c4cd19c9f..7e62fd48c 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -17,7 +17,7 @@ from select import error as selecterror  from gconf import gconf  import repce  from repce import RepceServer, RepceClient -from master import GMaster +from  master import gmaster_builder  import syncdutils  from syncdutils import GsyncdError, select, privileged @@ -359,11 +359,37 @@ class Server(object):                  raise      @classmethod +    def xtime_vec(cls, path, *uuids): +        """vectored version of @xtime + +        accepts a list of uuids and returns a dictionary +        with uuid as key(s) and xtime as value(s) +        """ +        xt = {} +        for uuid in uuids: +            xtu = cls.xtime(path, uuid) +            if xtu == ENODATA: +                xtu = None +            if isinstance(xtu, int): +                return xtu +            xt[uuid] = xtu +        return xt + +    @classmethod      @_pathguard      def set_xtime(cls, path, uuid, mark):          """set @mark as xtime for @uuid on @path"""          Xattr.lsetxattr(path, '.'.join([cls.GX_NSPACE, uuid, 'xtime']), struct.pack('!II', *mark)) +    @classmethod +    def set_xtime_vec(cls, path, mark_dct): +        """vectored (or dictered) version of set_xtime + +        ignore values that match @ignore +        """ +        for u,t in mark_dct.items(): +            cls.set_xtime(path, u, t) +      @staticmethod      @_pathguard      def setattr(path, adct): @@ -604,6 +630,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                      if x[0] > now:                          logging.debug("volinfo[%s] expires: %d (%d sec later)" % \                                        (d['uuid'], x[0], x[0] - now)) +                        d['timeout'] = x[0]                          dict_list.append(d)                      else:                          try: @@ -820,7 +847,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          - else do that's what's inherited          """          if args: -            GMaster(self, args[0]).crawl_loop() +            gmaster_builder()(self, args[0]).crawl_loop()          else:              sup(self, *args)  | 
