-rw-r--r--  geo-replication/syncdaemon/master.py   | 73
-rw-r--r--  geo-replication/syncdaemon/resource.py | 36
2 files changed, 50 insertions(+), 59 deletions(-)
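The heart of this change is the xsync upper limit: after a partial history crawl, the hybrid (xsync) crawl should only pick up entries up to the changelog register time, since live changelogs cover everything after it. Before the diff itself, here is a minimal standalone sketch of that gating, assuming simplified free functions rather than gsyncd's NormalMixin methods; xtimes are (sec, nsec) tuples as in master.py, and the 1-second padding mirrors the new crawlwrap comment about register_time carrying no nsec part.

# Standalone sketch of the upper-limit gating; not the gsyncd module
# itself. Assumes xtimes are (sec, nsec) tuples compared newest-wins.

def upper_limit(register_time):
    # register_time has whole-second resolution, so pad by 1s: a cap
    # of (register_time, 0) could drop entries stamped within the
    # registration second but with a nonzero nsec component.
    return (register_time + 1, 0) if register_time else None

def need_sync(xte, xtrd, xsync_upper_limit):
    # Mirrors the patched NormalMixin.need_sync: sync entries newer
    # than the slave's xtime, capped (inclusively) when a limit is set.
    if xsync_upper_limit:
        return xte > xtrd and xte <= xsync_upper_limit
    return xte > xtrd

# Tuple comparison is lexicographic: second 100 with any nsec is still
# inside a cap derived from register_time=100.
assert need_sync((100, 999999999), (90, 0), upper_limit(100))
assert not need_sync((102, 0), (90, 0), upper_limit(100))

Note that the first master.py hunk below also changes the bound from exclusive (<) to inclusive (<=), which pairs with the +1s padding.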
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index e3904736ba2..79c90630dca 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -168,10 +168,10 @@ class NormalMixin(object):
             raise GsyncdError("timestamp corruption for " + path)
 
     def need_sync(self, e, xte, xtrd):
-        if self.xsync_upper_limit is None:
-            return xte > xtrd
+        if self.xsync_upper_limit:
+            return xte > xtrd and xte <= self.xsync_upper_limit
         else:
-            return xte > xtrd and xte < self.xsync_upper_limit
+            return xte > xtrd
 
     def set_slave_xtime(self, path, mark):
         self.slave.server.set_stime(path, self.uuid, mark)
@@ -442,13 +442,24 @@ class GMasterCommon(object):
     def register(self):
         self.register()
 
-    def crawlwrap(self, oneshot=False, no_stime_update=False):
+    def crawlwrap(self, oneshot=False, no_stime_update=False,
+                  register_time=None):
         if oneshot:
             # it's important to do this during the oneshot crawl as
             # for a passive gsyncd (ie. in a replicate scenario)
             # the keepalive thread would keep the connection alive.
             self.init_keep_alive()
 
+        # If crawlwrap is called when partial history is available,
+        # then it sets register_time, which is the time when the geo-rep
+        # worker registered for changelog consumption. Since nsec is
+        # not considered in register time, there are chances of skipping
+        # changes detection in xsync crawl. Add 1 sec to upper_limit.
+        # This limit will be reset when crawlwrap is called again.
+        self.xsync_upper_limit = None
+        if register_time:
+            self.xsync_upper_limit = (register_time + 1, 0)
+
         # no need to maintain volinfo state machine.
         # in a cascading setup, each geo-replication session is
         # independent (ie. 'volume-mark' and 'xtime' are not
@@ -757,6 +768,12 @@ class GMasterChangelogMixin(GMasterCommon):
         logging.debug('changelog working dir %s' % workdir)
         return workdir
 
+    def get_purge_time(self):
+        purge_time = self.xtime('.', self.slave)
+        if isinstance(purge_time, int):
+            purge_time = None
+        return purge_time
+
     def process_change(self, change, done, retry):
         pfx = gauxpfx()
         clist = []
@@ -1135,9 +1152,7 @@
         changes = []
         # get stime (from the brick) and purge changelogs
         # that are _historical_ to that time.
-        purge_time = self.xtime('.', self.slave)
-        if isinstance(purge_time, int):
-            purge_time = None
+        purge_time = self.get_purge_time()
 
         self.changelog_agent.scan()
         self.crawls += 1
@@ -1175,14 +1190,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
     def crawl(self, no_stime_update=False):
         self.history_turns += 1
         self.update_worker_crawl_status("History Crawl")
-        logging.info('starting history crawl... turns: %s' %
-                     self.history_turns)
+        purge_time = self.get_purge_time()
 
-        # get stime (from the brick) and purge changelogs
-        # that are _historical_ to that time.
-        purge_time = self.xtime('.', self.slave)
-        if isinstance(purge_time, int):
-            purge_time = None
+        logging.info('starting history crawl... turns: %s, stime: %s'
+                     % (self.history_turns, repr(purge_time)))
 
         if not purge_time or purge_time == URXTIME:
             logging.info("stime not available, abandoning history crawl")
@@ -1226,8 +1237,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
 
         history_turn_time = int(time.time()) - self.history_crawl_start_time
 
-        logging.info('finished history crawl syncing between %s - %s.' %
-                     (purge_time[0], actual_end))
+        logging.info('finished history crawl syncing, endtime: %s, stime: %s'
+                     % (actual_end, repr(self.get_purge_time())))
 
         # If TS returned from history_changelog is < register_time
         # then FS crawl may be required, since history is only available
@@ -1260,7 +1271,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
 
     XSYNC_MAX_ENTRIES = 1 << 13
 
-    def register(self, register_time, changelog_agent=None):
+    def register(self, register_time=None, changelog_agent=None):
         self.counter = 0
         self.comlist = []
         self.stimes = []
@@ -1277,17 +1288,6 @@
             else:
                 raise
 
-        # After changelogs history processing completes, it switches
-        # to xsync/hibrid crawl if history actual end time is less than
-        # live changelog register time. Xsync should only run for that
-        # small gap, ie.. changelog_register_time - history_actual_end_time
-        # If we don't have upper limit to limit the XSync change detection
-        # It will keep on detecting the files even though changelogs are
-        # available for the same. Set upper limit during register
-        # and reset at the end of each crawl. Reseting at the end of
-        # crawl is required if change_detector is set to xsync.
-        self.xsync_upper_limit = (register_time, 0)
-
     def crawl(self, no_stime_update=False):
         """
         event dispatcher thread
@@ -1299,18 +1299,15 @@
             self.Xcrawl()
         t = Thread(target=Xsyncer)
         t.start()
-        logging.info('starting hybrid crawl...')
+        logging.info('starting hybrid crawl..., stime: %s'
+                     % repr(self.get_purge_time()))
         self.update_worker_crawl_status("Hybrid Crawl")
         while True:
             try:
                 item = self.comlist.pop(0)
                 if item[0] == 'finale':
-                    if self.xsync_upper_limit is not None:
-                        logging.info('finished hybrid crawl syncing, endtime: '
-                                     '%s' % self.xsync_upper_limit[0])
-                    else:
-                        logging.info('finished hybrid crawl syncing')
-
+                    logging.info('finished hybrid crawl syncing, stime: %s'
+                                 % repr(self.get_purge_time()))
                     break
                 elif item[0] == 'xsync':
                     logging.info('processing xsync changelog %s' % (item[1]))
@@ -1333,8 +1330,6 @@
             except IndexError:
                 time.sleep(1)
 
-        self.xsync_upper_limit = None
-
     def write_entry_change(self, prefix, data=[]):
         self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
 
@@ -1460,7 +1455,7 @@
                                               str(st.st_gid), str(st.st_mode)])
                 self.Xcrawl(e, xtr_root)
                 stime_to_update = xte
-                if self.xsync_upper_limit is not None:
+                if self.xsync_upper_limit:
                     stime_to_update = min(self.xsync_upper_limit, xte)
                 self.stimes.append((e, stime_to_update))
             elif stat.S_ISLNK(mo):
@@ -1487,7 +1482,7 @@
                 self.write_entry_change("D", [gfid])
         if path == '.':
             stime_to_update = xtl
-            if self.xsync_upper_limit is not None:
+            if self.xsync_upper_limit:
                 stime_to_update = min(self.xsync_upper_limit, xtl)
             self.stimes.append((path, stime_to_update))
             self.sync_done(self.stimes, True)
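With the register-time bookkeeping moved out of GMasterXsyncMixin.register and into crawlwrap, the limit is recomputed on every crawl pass instead of being set once and manually reset by callers, which is what the deleted resource.py block below used to do. A trimmed, hypothetical rendering of that lifecycle (CrawlerSketch is illustrative, not a gsyncd class):

# Hypothetical trimmed-down crawlwrap showing the new lifecycle of
# xsync_upper_limit; the real method also handles keepalive, volinfo
# state and the crawl loop.

class CrawlerSketch(object):
    def crawlwrap(self, oneshot=False, no_stime_update=False,
                  register_time=None):
        # Reset unconditionally: a limit left over from a previous
        # partial-history pass must not leak into this crawl.
        self.xsync_upper_limit = None
        if register_time:
            # +1s padding, since register_time has no nsec component.
            self.xsync_upper_limit = (register_time + 1, 0)
        # ... crawl loop would follow here ...

Callers bridging a partial-history gap pass register_time (see the resource.py hunk below); plain xsync callers omit it and crawl uncapped.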
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 09310c1f1aa..2a887daab15 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1293,22 +1293,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
             try:
                 workdir = g2.setup_working_dir()
 
-                # register with the changelog library
-                # 9 == log level (DEBUG)
-                # 5 == connection retries
-                changelog_agent.register(gconf.local_path,
-                                         workdir, gconf.changelog_log_file,
-                                         g2.CHANGELOG_LOG_LEVEL,
-                                         g2.CHANGELOG_CONN_RETRIES)
+                # Register only when change_detector is not set to
+                # xsync, else the agent will generate changelog files
+                # in the .processing directory of the working dir
+                if gconf.change_detector != 'xsync':
+                    # register with the changelog library
+                    # 9 == log level (DEBUG)
+                    # 5 == connection retries
+                    changelog_agent.register(gconf.local_path,
+                                             workdir, gconf.changelog_log_file,
+                                             g2.CHANGELOG_LOG_LEVEL,
+                                             g2.CHANGELOG_CONN_RETRIES)
+
                 register_time = int(time.time())
                 g2.register(register_time, changelog_agent)
                 g3.register(register_time, changelog_agent)
             except ChangelogException:
                 changelog_register_failed = True
-                register_time = int(time.time())
+                register_time = None
                 logging.info("Changelog register failed, fallback to xsync")
-            g1.register(register_time)
+            g1.register()
             logging.info("Register time: %s" % register_time)
 
             # oneshot: Try to use changelog history api, if not
             # available switch to FS crawl
@@ -1328,17 +1333,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
                     logging.info('Partial history available, using xsync crawl'
                                  ' after consuming history '
                                  'till %s' % str(e))
-                g1.crawlwrap(oneshot=True, no_stime_update=True)
-
-            # Reset xsync upper limit. g2, g3 are changelog and history
-            # instances, but if change_detector is set to xsync then
-            # g1, g2, g3 will be xsync instances.
-            g1.xsync_upper_limit = None
-            if getattr(g2, "xsync_upper_limit", None) is not None:
-                g2.xsync_upper_limit = None
-
-            if getattr(g3, "xsync_upper_limit", None) is not None:
-                g3.xsync_upper_limit = None
+                g1.crawlwrap(oneshot=True, no_stime_update=True,
+                             register_time=register_time)
 
             # crawl loop: Try changelog crawl, if failed
             # switch to FS crawl
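Putting the resource.py side together: registration with the changelog agent is now conditional on the change detector, and a registration failure downgrades register_time to None so the xsync fallback runs uncapped. Below is a condensed, hypothetical sketch of that control flow, with gsyncd's actual arguments and logging stubbed out; register_crawlers and the ChangelogException stand-in are assumptions for illustration, while g1/g2/g3 are the xsync, changelog and history crawler instances, as in the real module.

import time

class ChangelogException(OSError):
    # Stand-in for gsyncd's ChangelogException.
    pass

def register_crawlers(g1, g2, g3, changelog_agent, change_detector):
    register_time = None
    try:
        # Skip agent registration under xsync, else the agent keeps
        # emitting changelog files into <workdir>/.processing with
        # nothing there to consume them.
        if change_detector != 'xsync':
            changelog_agent.register()  # real call passes paths/log opts
        register_time = int(time.time())
        g2.register(register_time, changelog_agent)  # changelog crawl
        g3.register(register_time, changelog_agent)  # history crawl
    except ChangelogException:
        # No changelog support: no meaningful register time, so a later
        # crawlwrap(register_time=None) leaves xsync uncapped.
        register_time = None
    g1.register()  # the xsync crawler now registers without a time
    return register_time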
