diff options
-rw-r--r-- | geo-replication/syncdaemon/master.py | 101 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 55 |
2 files changed, 118 insertions, 38 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 2decc5de930..e3904736ba2 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -29,6 +29,14 @@ from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable URXTIME = (-1, 0) +# Default rollover time set in changelog translator +# changelog rollover time is hardcoded here to avoid the +# xsync usage when crawling switch happens from history +# to changelog. If rollover time increased in translator +# then geo-rep can enter into xsync crawl after history +# crawl before starting live changelog crawl. +CHANGELOG_ROLLOVER_TIME = 15 + # Utility functions to help us to get to closer proximity # of the DRY principle (no, don't look for elevated or # perspectivistic things here) @@ -160,7 +168,10 @@ class NormalMixin(object): raise GsyncdError("timestamp corruption for " + path) def need_sync(self, e, xte, xtrd): - return xte > xtrd + if self.xsync_upper_limit is None: + return xte > xtrd + else: + return xte > xtrd and xte < self.xsync_upper_limit def set_slave_xtime(self, path, mark): self.slave.server.set_stime(path, self.uuid, mark) @@ -431,7 +442,7 @@ class GMasterCommon(object): def register(self): self.register() - def crawlwrap(self, oneshot=False): + def crawlwrap(self, oneshot=False, no_stime_update=False): if oneshot: # it's important to do this during the oneshot crawl as # for a passive gsyncd (ie. in a replicate scenario) @@ -499,7 +510,7 @@ class GMasterCommon(object): time.sleep(5) continue self.update_worker_health("Active") - self.crawl() + self.crawl(no_stime_update=no_stime_update) if oneshot: return time.sleep(self.sleep_interval) @@ -1119,7 +1130,7 @@ class GMasterChangelogMixin(GMasterCommon): except: raise - def crawl(self): + def crawl(self, no_stime_update=False): self.update_worker_crawl_status("Changelog Crawl") changes = [] # get stime (from the brick) and purge changelogs @@ -1147,20 +1158,25 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('processing changes %s' % repr(changes)) self.process(changes) - def register(self, changelog_agent): + def register(self, register_time, changelog_agent): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done class GMasterChangeloghistoryMixin(GMasterChangelogMixin): - def register(self, changelog_agent): + def register(self, register_time, changelog_agent): self.changelog_agent = changelog_agent - self.changelog_register_time = int(time.time()) + self.changelog_register_time = register_time + self.history_crawl_start_time = register_time self.changelog_done_func = self.changelog_agent.history_done + self.history_turns = 0 - def crawl(self): + def crawl(self, no_stime_update=False): + self.history_turns += 1 self.update_worker_crawl_status("History Crawl") + logging.info('starting history crawl... turns: %s' % + self.history_turns) # get stime (from the brick) and purge changelogs # that are _historical_ to that time. @@ -1169,11 +1185,9 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): purge_time = None if not purge_time or purge_time == URXTIME: + logging.info("stime not available, abandoning history crawl") raise NoPurgeTimeAvailable() - logging.debug("Get changelog history between %s and %s" % - (purge_time[0], self.changelog_register_time)) - # Changelogs backend path is hardcoded as # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different # location then consuming history will not work(Known issue as of now) @@ -1210,11 +1224,26 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): logging.debug('processing changes %s' % repr(changes)) self.process(changes) + history_turn_time = int(time.time()) - self.history_crawl_start_time + + logging.info('finished history crawl syncing between %s - %s.' % + (purge_time[0], actual_end)) + # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available # till TS returned from history_changelog if actual_end < self.changelog_register_time: - raise PartialHistoryAvailable(str(actual_end)) + if self.history_turns < 2: + sleep_time = 1 + if history_turn_time < CHANGELOG_ROLLOVER_TIME: + sleep_time = CHANGELOG_ROLLOVER_TIME - history_turn_time + time.sleep(sleep_time) + self.history_crawl_start_time = int(time.time()) + self.crawl() + else: + # This exeption will be catched in resource.py and + # fallback to xsync for the small gap. + raise PartialHistoryAvailable(str(actual_end)) class GMasterXsyncMixin(GMasterChangelogMixin): @@ -1231,7 +1260,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, changelog_agent=None): + def register(self, register_time, changelog_agent=None): self.counter = 0 self.comlist = [] self.stimes = [] @@ -1248,7 +1277,18 @@ class GMasterXsyncMixin(GMasterChangelogMixin): else: raise - def crawl(self): + # After changelogs history processing completes, it switches + # to xsync/hibrid crawl if history actual end time is less than + # live changelog register time. Xsync should only run for that + # small gap, ie.. changelog_register_time - history_actual_end_time + # If we don't have upper limit to limit the XSync change detection + # It will keep on detecting the files even though changelogs are + # available for the same. Set upper limit during register + # and reset at the end of each crawl. Reseting at the end of + # crawl is required if change_detector is set to xsync. + self.xsync_upper_limit = (register_time, 0) + + def crawl(self, no_stime_update=False): """ event dispatcher thread @@ -1265,19 +1305,36 @@ class GMasterXsyncMixin(GMasterChangelogMixin): try: item = self.comlist.pop(0) if item[0] == 'finale': - logging.info('finished hybrid crawl syncing') + if self.xsync_upper_limit is not None: + logging.info('finished hybrid crawl syncing, endtime: ' + '%s' % self.xsync_upper_limit[0]) + else: + logging.info('finished hybrid crawl syncing') + break elif item[0] == 'xsync': logging.info('processing xsync changelog %s' % (item[1])) self.process([item[1]], 0) elif item[0] == 'stime': - logging.debug('setting slave time: %s' % repr(item[1])) - self.upd_stime(item[1][1], item[1][0]) + if not no_stime_update: + # xsync is started after running history but if + # history actual end time is less than register time + # then if we update stime, live changelog processing + # will skip the changelogs for which TS is less than + # stime. During this deletes and renames are not + # propogated. By not setting stime live changelog will + # start processing from the register time. Since we + # have xsync_upper_limit their will not be much + # overlap/redo of changelogs. + logging.debug('setting slave time: %s' % repr(item[1])) + self.upd_stime(item[1][1], item[1][0]) else: logging.warn('unknown tuple in comlist (%s)' % repr(item)) except IndexError: time.sleep(1) + self.xsync_upper_limit = None + def write_entry_change(self, prefix, data=[]): self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -1402,7 +1459,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.write_entry_change("M", [gfid, "SETATTR", str(st.st_uid), str(st.st_gid), str(st.st_mode)]) self.Xcrawl(e, xtr_root) - self.stimes.append((e, xte)) + stime_to_update = xte + if self.xsync_upper_limit is not None: + stime_to_update = min(self.xsync_upper_limit, xte) + self.stimes.append((e, stime_to_update)) elif stat.S_ISLNK(mo): self.write_entry_change( "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, @@ -1426,7 +1486,10 @@ class GMasterXsyncMixin(GMasterChangelogMixin): bname))]) self.write_entry_change("D", [gfid]) if path == '.': - self.stimes.append((path, xtl)) + stime_to_update = xtl + if self.xsync_upper_limit is not None: + stime_to_update = min(self.xsync_upper_limit, xtl) + self.stimes.append((path, stime_to_update)) self.sync_done(self.stimes, True) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index b537ff65003..c84265739c5 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1267,6 +1267,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # register the crawlers and start crawling # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) # g3 ==> changelog History + changelog_register_failed = False (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') os.close(int(ra)) os.close(int(wa)) @@ -1278,7 +1279,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): "local %s, remote %s" % (CHANGELOG_AGENT_CLIENT_VERSION, rv)) - g1.register() try: workdir = g2.setup_working_dir() # register with the changelog library @@ -1288,38 +1288,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): workdir, gconf.changelog_log_file, g2.CHANGELOG_LOG_LEVEL, g2.CHANGELOG_CONN_RETRIES) - g2.register(changelog_agent) - g3.register(changelog_agent) - except ChangelogException as e: - logging.debug("Changelog register failed: %s - %s" % - (e.errno, e.strerror)) - + register_time = int(time.time()) + g2.register(register_time, changelog_agent) + g3.register(register_time, changelog_agent) + except ChangelogException: + changelog_register_failed = True + register_time = int(time.time()) + logging.info("Changelog register failed, fallback to xsync") + + g1.register(register_time) + logging.info("Register time: %s" % register_time) # oneshot: Try to use changelog history api, if not # available switch to FS crawl # Note: if config.change_detector is xsync then # it will not use changelog history api try: - g3.crawlwrap(oneshot=True) + if not changelog_register_failed: + g3.crawlwrap(oneshot=True) + else: + g1.crawlwrap(oneshot=True) except (ChangelogException, NoPurgeTimeAvailable, PartialHistoryAvailable) as e: if isinstance(e, ChangelogException): - logging.debug('Changelog history crawl failed, failback ' - 'to xsync: %s - %s' % (e.errno, e.strerror)) - elif isinstance(e, NoPurgeTimeAvailable): - logging.debug('Using xsync crawl since no purge time ' - 'available') + logging.info('Changelog history crawl failed, fallback ' + 'to xsync: %s - %s' % (e.errno, e.strerror)) elif isinstance(e, PartialHistoryAvailable): - logging.debug('Using xsync crawl after consuming history ' - 'till %s' % str(e)) - g1.crawlwrap(oneshot=True) + logging.info('Partial history available, using xsync crawl' + ' after consuming history ' + 'till %s' % str(e)) + g1.crawlwrap(oneshot=True, no_stime_update=True) + + # Reset xsync upper limit. g2, g3 are changelog and history + # instances, but if change_detector is set to xsync then + # g1, g2, g3 will be xsync instances. + g1.xsync_upper_limit = None + if getattr(g2, "xsync_upper_limit", None) is not None: + g2.xsync_upper_limit = None + + if getattr(g3, "xsync_upper_limit", None) is not None: + g3.xsync_upper_limit = None # crawl loop: Try changelog crawl, if failed # switch to FS crawl try: - g2.crawlwrap() + if not changelog_register_failed: + g2.crawlwrap() + else: + g1.crawlwrap() except ChangelogException as e: - logging.debug('Changelog crawl failed, failback to xsync: ' - '%s - %s' % (e.errno, e.strerror)) + logging.info('Changelog crawl failed, fallback to xsync') g1.crawlwrap() else: sup(self, *args) |