diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/libgfchangelog.py | 62 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 93 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 66 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 8 |
4 files changed, 198 insertions, 31 deletions
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py index ec563b36f29..0fa32a73499 100644 --- a/geo-replication/syncdaemon/libgfchangelog.py +++ b/geo-replication/syncdaemon/libgfchangelog.py @@ -13,6 +13,10 @@ from ctypes import CDLL, create_string_buffer, get_errno from ctypes.util import find_library +class ChangelogException(OSError): + pass + + class Changes(object): libgfc = CDLL(find_library("gfchangelog"), use_errno=True) @@ -21,9 +25,9 @@ class Changes(object): return get_errno() @classmethod - def raise_oserr(cls): + def raise_changelog_err(cls): errn = cls.geterrno() - raise OSError(errn, os.strerror(errn)) + raise ChangelogException(errn, os.strerror(errn)) @classmethod def _get_api(cls, call): @@ -35,19 +39,19 @@ class Changes(object): log_file, log_level, retries) if ret == -1: - cls.raise_oserr() + cls.raise_changelog_err() @classmethod def cl_scan(cls): ret = cls._get_api('gf_changelog_scan')() if ret == -1: - cls.raise_oserr() + cls.raise_changelog_err() @classmethod def cl_startfresh(cls): ret = cls._get_api('gf_changelog_start_fresh')() if ret == -1: - cls.raise_oserr() + cls.raise_changelog_err() @classmethod def cl_getchanges(cls): @@ -64,7 +68,7 @@ class Changes(object): break changes.append(buf.raw[:ret - 1]) if ret == -1: - cls.raise_oserr() + cls.raise_changelog_err() # cleanup tracker cls.cl_startfresh() return sorted(changes, key=clsort) @@ -73,4 +77,48 @@ class Changes(object): def cl_done(cls, clfile): ret = cls._get_api('gf_changelog_done')(clfile) if ret == -1: - cls.raise_oserr() + cls.raise_changelog_err() + + @classmethod + def cl_history_scan(cls): + ret = cls._get_api('gf_history_changelog_scan')() + if ret == -1: + cls.raise_changelog_err() + + return ret + + @classmethod + def cl_history_changelog(cls, changelog_path, start, end): + ret = cls._get_api('gf_history_changelog')(changelog_path, start, end) + if ret == -1: + cls.raise_changelog_err() + + return ret + + @classmethod + def cl_history_startfresh(cls): + ret = cls._get_api('gf_history_changelog_start_fresh')() + if ret == -1: + cls.raise_changelog_err() + + @classmethod + def cl_history_getchanges(cls): + changes = [] + buf = create_string_buffer('\0', 4096) + call = cls._get_api('gf_history_changelog_next_change') + + while True: + ret = call(buf, 4096) + if ret in (0, -1): + break + changes.append(buf.raw[:ret - 1]) + if ret == -1: + cls.raise_changelog_err() + + return changes + + @classmethod + def cl_history_done(cls, clfile): + ret = cls._get_api('gf_history_changelog_done')(clfile) + if ret == -1: + cls.raise_changelog_err() diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 4301396f9f4..3047c99050e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile from syncdutils import Thread, GsyncdError, boolify, escape from syncdutils import unescape, select, gauxpfx, md5hex, selfkill from syncdutils import lstat, errno_wrap +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable URXTIME = (-1, 0) @@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.master.server.changelog_done, changes) + map(self.changelog_done_func, changes) self.update_worker_files_syncd() break @@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon): if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) - map(self.master.server.changelog_done, changes) + map(self.changelog_done_func, changes) break # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem @@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon): purge_time = self.xtime('.', self.slave) if isinstance(purge_time, int): purge_time = None - try: - self.master.server.changelog_scan() - self.crawls += 1 - except OSError: - self.fallback_xsync() - self.update_worker_crawl_status("Hybrid Crawl") + + self.master.server.changelog_scan() + self.crawls += 1 changes = self.master.server.changelog_getchanges() if changes: if purge_time: @@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon): os.path.basename(pr)) self.master.server.changelog_done(pr) changes.remove(pr) - logging.debug('processing changes %s' % repr(changes)) + if changes: + logging.debug('processing changes %s' % repr(changes)) self.process(changes) def register(self): (workdir, logfile) = self.setup_working_dir() self.sleep_interval = int(gconf.change_interval) + self.changelog_done_func = self.master.server.changelog_done # register with the changelog library - try: - # 9 == log level (DEBUG) - # 5 == connection retries - self.master.server.changelog_register(gconf.local_path, - workdir, logfile, 9, 5) - except OSError: - self.fallback_xsync() - # control should not reach here - raise + # 9 == log level (DEBUG) + # 5 == connection retries + self.master.server.changelog_register(gconf.local_path, + workdir, logfile, 9, 5) + + +class GMasterChangeloghistoryMixin(GMasterChangelogMixin): + def register(self): + super(GMasterChangeloghistoryMixin, self).register() + self.changelog_register_time = int(time.time()) + self.changelog_done_func = self.master.server.history_changelog_done + + def crawl(self): + self.update_worker_crawl_status("History Crawl") + + # get stime (from the brick) and purge changelogs + # that are _historical_ to that time. + purge_time = self.xtime('.', self.slave) + if isinstance(purge_time, int): + purge_time = None + + if not purge_time or purge_time == URXTIME: + raise NoPurgeTimeAvailable() + + logging.debug("Get changelog history between %s and %s" % + (purge_time[0], self.changelog_register_time)) + + # Changelogs backend path is hardcoded as + # <BRICK_PATH>/.glusterfs/changelogs, if user configured to different + # location then consuming history will not work(Known issue as of now) + changelog_path = os.path.join(gconf.local_path, + ".glusterfs/changelogs") + ts = self.master.server.history_changelog(changelog_path, + purge_time[0], + self.changelog_register_time) + + # scan followed by getchanges till scan returns zero. + # history_changelog_scan() is blocking call, till it gets the number + # of changelogs to process. Returns zero when no changelogs + # to be processed. returns positive value as number of changelogs + # to be processed, which will be fetched using + # history_changelog_getchanges() + while self.master.server.history_changelog_scan() > 0: + self.crawls += 1 + + changes = self.master.server.history_changelog_getchanges() + if changes: + if purge_time: + logging.info("slave's time: %s" % repr(purge_time)) + processed = [x for x in changes + if int(x.split('.')[-1]) < purge_time[0]] + for pr in processed: + logging.info('skipping already processed change: ' + '%s...' % os.path.basename(pr)) + self.changelog_done_func(pr) + changes.remove(pr) + + if changes: + logging.debug('processing changes %s' % repr(changes)) + self.process(changes) + + # If TS returned from history_changelog is < register_time + # then FS crawl may be required, since history is only available + # till TS returned from history_changelog + if ts < self.changelog_register_time: + raise PartialHistoryAvailable(str(ts)) class GMasterXsyncMixin(GMasterChangelogMixin): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index e3cf33ffdc5..aaf257e9c71 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -33,6 +33,8 @@ from master import gmaster_builder import syncdutils from syncdutils import GsyncdError, select, privileged, boolify, funcode from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat +from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable +from libgfchangelog import ChangelogException UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -683,6 +685,22 @@ class Server(object): Changes.cl_done(clfile) @classmethod + def history_changelog(cls, changelog_path, start, end): + return Changes.cl_history_changelog(changelog_path, start, end) + + @classmethod + def history_changelog_scan(cls): + return Changes.cl_history_scan() + + @classmethod + def history_changelog_getchanges(cls): + return Changes.cl_history_getchanges() + + @classmethod + def history_changelog_done(cls, clfile): + Changes.cl_history_done(clfile) + + @classmethod @_pathguard def setattr(cls, path, adct): """set file attributes @@ -1213,7 +1231,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): """return a tuple of the 'one shot' and the 'main crawl' class instance""" return (gmaster_builder('xsync')(self, slave), - gmaster_builder()(self, slave)) + gmaster_builder()(self, slave), + gmaster_builder('changeloghistory')(self, slave)) def service_loop(self, *args): """enter service loop @@ -1277,20 +1296,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): mark) ), slave.server) - (g1, g2) = self.gmaster_instantiate_tuple(slave) + (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) g1.master.server = brickserver g2.master.server = brickserver + g3.master.server = brickserver else: - (g1, g2) = self.gmaster_instantiate_tuple(slave) + (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) g1.master.server.aggregated = gmaster.master.server g2.master.server.aggregated = gmaster.master.server + g3.master.server.aggregated = gmaster.master.server # bad bad bad: bad way to do things like this # need to make this elegant # register the crawlers and start crawling + # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) + # g3 ==> changelog History g1.register() - g2.register() - g1.crawlwrap(oneshot=True) - g2.crawlwrap() + try: + g2.register() + g3.register() + except ChangelogException as e: + logging.debug("Changelog register failed: %s - %s" % + (e.errno, e.strerror)) + + # oneshot: Try to use changelog history api, if not + # available switch to FS crawl + # Note: if config.change_detector is xsync then + # it will not use changelog history api + try: + g3.crawlwrap(oneshot=True) + except (ChangelogException, NoPurgeTimeAvailable, + PartialHistoryAvailable) as e: + if isinstance(e, ChangelogException): + logging.debug('Changelog history crawl failed, failback ' + 'to xsync: %s - %s' % (e.errno, e.strerror)) + elif isinstance(e, NoPurgeTimeAvailable): + logging.debug('Using xsync crawl since no purge time ' + 'available') + elif isinstance(e, PartialHistoryAvailable): + logging.debug('Using xsync crawl after consuming history ' + 'till %s' % str(e)) + g1.crawlwrap(oneshot=True) + + # crawl loop: Try changelog crawl, if failed + # switch to FS crawl + try: + g2.crawlwrap() + except ChangelogException as e: + logging.debug('Changelog crawl failed, failback to xsync: ' + '%s - %s' % (e.errno, e.strerror)) + g1.crawlwrap() else: sup(self, *args) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 822d919ecb1..d4ded39f562 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -488,3 +488,11 @@ def lstat(e): return ex.errno else: raise + + +class NoPurgeTimeAvailable(Exception): + pass + + +class PartialHistoryAvailable(Exception): + pass |