diff options
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 93 |
1 files changed, 75 insertions, 18 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 4301396f9f4..3047c99050e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, select, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable URXTIME = (-1, 0) @@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.master.server.changelog_done, changes) + map(self.changelog_done_func, changes) self.update_worker_files_syncd() break @@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.master.server.changelog_done, changes) + map(self.changelog_done_func, changes) break # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem @@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon): purge_time = self.xtime('.', self.slave) if isinstance(purge_time, int): purge_time = None - try: - self.master.server.changelog_scan() - self.crawls += 1 - except OSError: - self.fallback_xsync() - self.update_worker_crawl_status("Hybrid Crawl") + + self.master.server.changelog_scan() + self.crawls += 1 changes = self.master.server.changelog_getchanges() if changes: if purge_time: @@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon): os.path.basename(pr)) self.master.server.changelog_done(pr) changes.remove(pr) - logging.debug('processing changes %s' % repr(changes)) + if changes: + logging.debug('processing changes %s' % repr(changes)) self.process(changes) def register(self): (workdir, logfile) = self.setup_working_dir() self.sleep_interval = int(gconf.change_interval) + self.changelog_done_func = self.master.server.changelog_done # register with the changelog library - try: - # 9 == log level (DEBUG) - # 5 == connection retries - self.master.server.changelog_register(gconf.local_path, - workdir, logfile, 9, 5) - except OSError: - self.fallback_xsync() - # control should not reach here - raise + # 9 == log level (DEBUG) + # 5 == connection retries + self.master.server.changelog_register(gconf.local_path, + workdir, logfile, 9, 5) + + +class GMasterChangeloghistoryMixin(GMasterChangelogMixin): + def register(self): + super(GMasterChangeloghistoryMixin, self).register() + self.changelog_register_time = int(time.time()) + self.changelog_done_func = self.master.server.history_changelog_done + + def crawl(self): + self.update_worker_crawl_status("History Crawl") + + # get stime (from the brick) and purge changelogs + # that are _historical_ to that time. + purge_time = self.xtime('.', self.slave) + if isinstance(purge_time, int): + purge_time = None + + if not purge_time or purge_time == URXTIME: + raise NoPurgeTimeAvailable() + + logging.debug("Get changelog history between %s and %s" % + (purge_time[0], self.changelog_register_time)) + + # Changelogs backend path is hardcoded as + # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different + # location then consuming history will not work(Known issue as of now) + changelog_path = os.path.join(gconf.local_path, + ".glusterfs/changelogs") + ts = self.master.server.history_changelog(changelog_path, + purge_time[0], + self.changelog_register_time) + + # scan followed by getchanges till scan returns zero. + # history_changelog_scan() is blocking call, till it gets the number + # of changelogs to process. Returns zero when no changelogs + # to be processed. returns positive value as number of changelogs + # to be processed, which will be fetched using + # history_changelog_getchanges() + while self.master.server.history_changelog_scan() > 0: + self.crawls += 1 + + changes = self.master.server.history_changelog_getchanges() + if changes: + if purge_time: + logging.info("slave's time: %s" % repr(purge_time)) + processed = [x for x in changes + if int(x.split('.')[-1]) < purge_time[0]] + for pr in processed: + logging.info('skipping already processed change: ' + '%s...' % os.path.basename(pr)) + self.changelog_done_func(pr) + changes.remove(pr) + + if changes: + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + + # If TS returned from history_changelog is < register_time + # then FS crawl may be required, since history is only available + # till TS returned from history_changelog + if ts < self.changelog_register_time: + raise PartialHistoryAvailable(str(ts)) class GMasterXsyncMixin(GMasterChangelogMixin): |