diff options
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 297 | 
1 files changed, 0 insertions, 297 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index c9abd91c..3b093677 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -157,153 +157,6 @@ class PartialMixin(NormalMixin):      def xtime_reversion_hook(self, path, xtl, xtr):          pass -class WrapupMixin(NormalMixin): -    """a variant that differs from normal in terms -       of ignoring non-indexed files""" - -    @staticmethod -    def make_xtime_opts(is_master, opts): -        if not 'create' in opts: -            opts['create'] = False -        if not 'default_xtime' in opts: -            opts['default_xtime'] = URXTIME - -    @staticmethod -    def keepalive_payload_hook(self, timo, gap): -        return (None, gap) - -    def volinfo_hook(self): -        return _volinfo_hook_relax_foreign(self) - -class BlindMixin(object): -    """Geo-rep flavor using vectored xtime. - -    Coordinates are the master, slave uuid pair; -    in master coordinate behavior is normal, -    in slave coordinate we force synchronization -    on any value difference (these are in disjunctive -    relation, ie. if either orders the entry to be -    synced, it shall be synced. -    """ - -    minus_infinity = (URXTIME, None) - -    @staticmethod -    def serialize_xtime(xt): -        a = [] -        for x in xt: -            if not x: -                x = ('None', '') -            a.extend(x) -        return '.'.join(str(n) for n in a) - -    @staticmethod -    def deserialize_xtime(xt): -        a = xt.split(".") -        a = (tuple(a[0:2]), tuple(a[3:4])) -        b = [] -        for p in a: -            if p[0] == 'None': -                p = None -            else: -                p = tuple(int(x) for x in p) -            b.append(p) -        return tuple(b) - -    @staticmethod -    def native_xtime(xt): -        return xt[0] - -    @staticmethod -    def xtime_geq(xt0, xt1): -        return (not xt1[0] or xt0[0] >= xt1[0]) and \ -               (not xt1[1] or xt0[1] >= xt1[1]) - -    @property -    def ruuid(self): -        if self.volinfo_r: -            return self.volinfo_r['uuid'] - -    @staticmethod -    def make_xtime_opts(is_master, opts): -        if not 'create' in opts: -            opts['create'] = is_master -        if not 'default_xtime' in opts: -            opts['default_xtime'] = URXTIME - -    def xtime_low(self, server, path, **opts): -        xtd = server.xtime_vec(path, self.uuid, self.ruuid) -        if isinstance(xtd, int): -            return xtd -        xt = (xtd[self.uuid], xtd[self.ruuid]) -        if not xt[1] and (not xt[0] or xt[0] < self.volmark): -            if opts['create']: -                # not expected, but can happen if file originates -                # from interrupted gsyncd transfer -                logging.warn('have to fix up missing xtime on ' + path) -                xt0 = _xtime_now() -                server.aggregated.set_xtime(path, self.uuid, xt0) -            else: -                xt0 = opts['default_xtime'] -            xt = (xt0, xt[1]) -        return xt - -    @staticmethod -    def keepalive_payload_hook(self, timo, gap): -        return (None, gap) - -    def volinfo_hook(self): -        res = _volinfo_hook_relax_foreign(self) -        volinfo_r_new = self.slave.server.aggregated.native_volume_info() -        if volinfo_r_new['retval']: -            raise GsyncdError("slave is corrupt") -        if getattr(self, 'volinfo_r', None): -            if self.volinfo_r['uuid'] != volinfo_r_new['uuid']: -                raise GsyncdError("uuid mismatch on slave") -        self.volinfo_r = volinfo_r_new -        return res - -    def xtime_reversion_hook(self, path, xtl, xtr): -        if not isinstance(xtr[0], int) and \ -          (isinstance(xtl[0], int) or xtr[0] > xtl[0]): -            raise GsyncdError("timestamp corruption for " + path) - -    def need_sync(self, e, xte, xtrd): -        if xte[0]: -            if not xtrd[0] or xte[0] > xtrd[0]: -                # there is outstanding diff at 0th pos, -                # we can short-cut to true -                return True -        # we arrived to this point by either of these -        # two possiblilites: -        # - no outstanding difference at 0th pos, -        #   wanna see 1st pos if he raises veto -        #   against "no need to sync" proposal -        # - no data at 0th pos, 1st pos will have -        #   to decide (due to xtime assignment, -        #   in this case 1st pos does carry data -        #   -- iow, if 1st pos did not have data, -        #   and 0th neither, 0th would have been -        #   force-feeded) -        if not xte[1]: -            # no data, no veto -            return False -        # the hard work: for 1st pos, -        # the conduct is fetch corresponding -        # slave data and do a "blind" comparison -        # (ie. do not care who is newer, we trigger -        # sync on non-identical xitmes) -        xtr = self.xtime(e, self.slave) -        return isinstance(xtr, int) or xte[1] != xtr[1] - -    def set_slave_xtime(self, path, mark): -        xtd = {} -        for (u, t) in zip((self.uuid, self.ruuid), mark): -            if t: -                xtd[u] = t -        self.slave.server.set_xtime_vec(path, xtd) - -  # Further mixins for certain tunable behaviors  class SendmarkNormalMixin(object): @@ -1077,156 +930,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              self.process([self.fname()], done)              self.upd_stime(xtl) -class GMasterXtimeMixin(GMasterCommon): -    """ xtime based change detection and syncing """ - -    def register(self): -        pass - -    def crawl(self, path='.', xtl=None): -        """crawling... - -          Standing around -          All the right people -          Crawling -          Tennis on Tuesday -          The ladder is long -          It is your nature -          You've gotta suntan -          Football on Sunday -          Society boy - -        Recursively walk the master side tree and check if updates are -        needed due to xtime differences. One invocation of crawl checks -        children of @path and do a recursive enter only on -        those directory children where there is an update needed. - -        Way of updates depend on file type: -        - for symlinks, sync them directy and synchronously -        - for regular children, register jobs for @path (cf. .add_job) to start -          and wait on their rsync -        - for directory children, register a job for @path which waits (.wait) -          on jobs for the given child -        (other kind of filesystem nodes are not considered) - -        Those slave side children which do not exist on master are simply -        purged (see Server.purge). - -        Behavior is fault tolerant, synchronization is adaptive: if some action fails, -        just go on relentlessly, adding a fail job (see .add_failjob) which will prevent -        the .sendmark on @path, so when the next crawl will arrive to @path it will not -        see it as up-to-date and  will try to sync it again. While this semantics can be -        supported by funky design principles (http://c2.com/cgi/wiki?LazinessImpatienceHubris), -        the ultimate reason which excludes other possibilities is simply transience: we cannot -        assert that the file systems (master / slave) underneath do not change and actions -        taken upon some condition will not lose their context by the time they are performed. -        """ -        logging.debug("entering " + path) -        if not xtl: -            xtl = self.xtime(path) -            if isinstance(xtl, int): -                self.add_failjob(path, 'no-local-node') -                return -        xtr = self.xtime(path, self.slave) -        if isinstance(xtr, int): -            if xtr != ENOENT: -                self.slave.server.purge(path) -            try: -                self.slave.server.mkdir(path) -            except OSError: -                self.add_failjob(path, 'no-remote-node') -                return -            xtr = self.minus_infinity -        else: -            self.xtime_reversion_hook(path, xtl, xtr) -            if xtl == xtr: -                if path == '.' and self.change_seen: -                    self.turns += 1 -                    self.change_seen = False -                    if self.total_turns: -                        logging.info("finished turn #%s/%s" % \ -                                     (self.turns, self.total_turns)) -                        if self.turns == self.total_turns: -                            logging.info("reached turn limit") -                            self.terminate = True -                return -        if path == '.': -            self.change_seen = True -        try: -            dem = self.master.server.entries(path) -        except OSError: -            self.add_failjob(path, 'local-entries-fail') -            return -        random.shuffle(dem) -        try: -            des = self.slave.server.entries(path) -        except OSError: -            self.slave.server.purge(path) -            try: -                self.slave.server.mkdir(path) -                des = self.slave.server.entries(path) -            except OSError: -                self.add_failjob(path, 'remote-entries-fail') -                return -        dd = set(des) - set(dem) -        if dd: -            self.purge_missing(path, dd) -        chld = [] -        for e in dem: -            e = os.path.join(path, e) -            xte = self.xtime(e) -            if isinstance(xte, int): -                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte])) -            elif self.need_sync(e, xte, xtr): -                chld.append((e, xte)) -        def indulgently(e, fnc, blame=None): -            if not blame: -                blame = path -            try: -                return fnc(e) -            except (IOError, OSError): -                ex = sys.exc_info()[1] -                if ex.errno == ENOENT: -                    logging.warn("salvaged ENOENT for " + e) -                    self.add_failjob(blame, 'by-indulgently') -                    return False -                else: -                    raise -        for e, xte in chld: -            st = indulgently(e, lambda e: os.lstat(e)) -            if st == False: -                continue - -            mo = st.st_mode -            adct = {'own': (st.st_uid, st.st_gid)} -            if stat.S_ISLNK(mo): -                if indulgently(e, lambda e: self.slave.server.symlink(os.readlink(e), e)) == False: -                    continue -                self.sendmark(e, xte, adct) -            elif stat.S_ISREG(mo): -                logging.debug("syncing %s ..." % e) -                pb = self.syncer.add(e) -                def regjob(e, xte, pb): -                    if pb.wait()[0]: -                        logging.debug("synced " + e) -                        self.sendmark_regular(e, xte) -                        return True -                    else: -                        logging.warn("failed to sync " + e) -                self.add_job(path, 'reg', regjob, e, xte, pb) -            elif stat.S_ISDIR(mo): -                adct['mode'] = mo -                if indulgently(e, lambda e: (self.add_job(path, 'cwait', self.wait, e, xte, adct), -                                             self.crawl(e, xte), -                                             True)[-1], blame=e) == False: -                    continue -            else: -                # ignore fifos, sockets and special files -                pass -        if path == '.': -            self.wait(path, xtl) - -  class BoxClosedErr(Exception):      pass  | 
