Diffstat (limited to 'geo-replication')
-rw-r--r--  geo-replication/syncdaemon/libgfchangelog.py |  62
-rw-r--r--  geo-replication/syncdaemon/master.py         |  93
-rw-r--r--  geo-replication/syncdaemon/resource.py       |  66
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py     |   8
4 files changed, 198 insertions, 31 deletions
diff --git a/geo-replication/syncdaemon/libgfchangelog.py b/geo-replication/syncdaemon/libgfchangelog.py
index ec563b36f29..0fa32a73499 100644
--- a/geo-replication/syncdaemon/libgfchangelog.py
+++ b/geo-replication/syncdaemon/libgfchangelog.py
@@ -13,6 +13,10 @@ from ctypes import CDLL, create_string_buffer, get_errno
from ctypes.util import find_library
+class ChangelogException(OSError):
+ pass
+
+
class Changes(object):
libgfc = CDLL(find_library("gfchangelog"), use_errno=True)
@@ -21,9 +25,9 @@ class Changes(object):
return get_errno()
@classmethod
- def raise_oserr(cls):
+ def raise_changelog_err(cls):
errn = cls.geterrno()
- raise OSError(errn, os.strerror(errn))
+ raise ChangelogException(errn, os.strerror(errn))
@classmethod
def _get_api(cls, call):
@@ -35,19 +39,19 @@ class Changes(object):
log_file,
log_level, retries)
if ret == -1:
- cls.raise_oserr()
+ cls.raise_changelog_err()
@classmethod
def cl_scan(cls):
ret = cls._get_api('gf_changelog_scan')()
if ret == -1:
- cls.raise_oserr()
+ cls.raise_changelog_err()
@classmethod
def cl_startfresh(cls):
ret = cls._get_api('gf_changelog_start_fresh')()
if ret == -1:
- cls.raise_oserr()
+ cls.raise_changelog_err()
@classmethod
def cl_getchanges(cls):
@@ -64,7 +68,7 @@ class Changes(object):
break
changes.append(buf.raw[:ret - 1])
if ret == -1:
- cls.raise_oserr()
+ cls.raise_changelog_err()
# cleanup tracker
cls.cl_startfresh()
return sorted(changes, key=clsort)
@@ -73,4 +77,48 @@ class Changes(object):
def cl_done(cls, clfile):
ret = cls._get_api('gf_changelog_done')(clfile)
if ret == -1:
- cls.raise_oserr()
+ cls.raise_changelog_err()
+
+ @classmethod
+ def cl_history_scan(cls):
+ ret = cls._get_api('gf_history_changelog_scan')()
+ if ret == -1:
+ cls.raise_changelog_err()
+
+ return ret
+
+ @classmethod
+ def cl_history_changelog(cls, changelog_path, start, end):
+ ret = cls._get_api('gf_history_changelog')(changelog_path, start, end)
+ if ret == -1:
+ cls.raise_changelog_err()
+
+ return ret
+
+ @classmethod
+ def cl_history_startfresh(cls):
+ ret = cls._get_api('gf_history_changelog_start_fresh')()
+ if ret == -1:
+ cls.raise_changelog_err()
+
+ @classmethod
+ def cl_history_getchanges(cls):
+ changes = []
+ buf = create_string_buffer('\0', 4096)
+ call = cls._get_api('gf_history_changelog_next_change')
+
+ while True:
+ ret = call(buf, 4096)
+ if ret in (0, -1):
+ break
+ changes.append(buf.raw[:ret - 1])
+ if ret == -1:
+ cls.raise_changelog_err()
+
+ return changes
+
+ @classmethod
+ def cl_history_done(cls, clfile):
+ ret = cls._get_api('gf_history_changelog_done')(clfile)
+ if ret == -1:
+ cls.raise_changelog_err()
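For orientation, the sketch below shows one way the new history classmethods could be consumed end to end. It is an illustration only, not part of the change: the brick changelog path, the timestamps and the process() helper are hypothetical, and cl_register() is assumed to have been called beforehand.

from libgfchangelog import Changes, ChangelogException

def process(clfile):
    # hypothetical per-changelog handler; the real daemon syncs entries/data
    print('would process %s' % clfile)

changelog_dir = '/bricks/b1/.glusterfs/changelogs'  # assumed brick changelog path
start_ts, end_ts = 1390000000, 1390003600           # example epoch range

try:
    # prepare history between the two timestamps; the return value is the
    # timestamp up to which history is actually available
    available_till = Changes.cl_history_changelog(changelog_dir, start_ts, end_ts)

    # cl_history_scan() blocks until it knows how many changelogs are ready;
    # zero means nothing is left to process
    while Changes.cl_history_scan() > 0:
        for clfile in Changes.cl_history_getchanges():
            process(clfile)
            Changes.cl_history_done(clfile)  # acknowledge so it is not replayed
except ChangelogException as e:
    print('history consumption failed: %s - %s' % (e.errno, e.strerror))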
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 4301396f9f4..3047c99050e 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -25,6 +25,7 @@ from tempfile import NamedTemporaryFile
from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap
+from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
URXTIME = (-1, 0)
@@ -904,7 +905,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.master.server.changelog_done, changes)
+ map(self.changelog_done_func, changes)
self.update_worker_files_syncd()
break
@@ -923,7 +924,7 @@ class GMasterChangelogMixin(GMasterCommon):
if done:
xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
- map(self.master.server.changelog_done, changes)
+ map(self.changelog_done_func, changes)
break
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
@@ -1106,12 +1107,9 @@ class GMasterChangelogMixin(GMasterCommon):
purge_time = self.xtime('.', self.slave)
if isinstance(purge_time, int):
purge_time = None
- try:
- self.master.server.changelog_scan()
- self.crawls += 1
- except OSError:
- self.fallback_xsync()
- self.update_worker_crawl_status("Hybrid Crawl")
+
+ self.master.server.changelog_scan()
+ self.crawls += 1
changes = self.master.server.changelog_getchanges()
if changes:
if purge_time:
@@ -1124,23 +1122,82 @@ class GMasterChangelogMixin(GMasterCommon):
os.path.basename(pr))
self.master.server.changelog_done(pr)
changes.remove(pr)
- logging.debug('processing changes %s' % repr(changes))
+
if changes:
+ logging.debug('processing changes %s' % repr(changes))
self.process(changes)
def register(self):
(workdir, logfile) = self.setup_working_dir()
self.sleep_interval = int(gconf.change_interval)
+ self.changelog_done_func = self.master.server.changelog_done
# register with the changelog library
- try:
- # 9 == log level (DEBUG)
- # 5 == connection retries
- self.master.server.changelog_register(gconf.local_path,
- workdir, logfile, 9, 5)
- except OSError:
- self.fallback_xsync()
- # control should not reach here
- raise
+ # 9 == log level (DEBUG)
+ # 5 == connection retries
+ self.master.server.changelog_register(gconf.local_path,
+ workdir, logfile, 9, 5)
+
+
+class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
+ def register(self):
+ super(GMasterChangeloghistoryMixin, self).register()
+ self.changelog_register_time = int(time.time())
+ self.changelog_done_func = self.master.server.history_changelog_done
+
+ def crawl(self):
+ self.update_worker_crawl_status("History Crawl")
+
+ # get stime (from the brick) and purge changelogs
+ # that are _historical_ to that time.
+ purge_time = self.xtime('.', self.slave)
+ if isinstance(purge_time, int):
+ purge_time = None
+
+ if not purge_time or purge_time == URXTIME:
+ raise NoPurgeTimeAvailable()
+
+ logging.debug("Get changelog history between %s and %s" %
+ (purge_time[0], self.changelog_register_time))
+
+ # The changelog backend path is hardcoded as
+ # <BRICK_PATH>/.glusterfs/changelogs; if the user configured a different
+ # location, consuming history will not work (known issue as of now).
+ changelog_path = os.path.join(gconf.local_path,
+ ".glusterfs/changelogs")
+ ts = self.master.server.history_changelog(changelog_path,
+ purge_time[0],
+ self.changelog_register_time)
+
+ # Loop: scan followed by getchanges until scan returns zero.
+ # history_changelog_scan() is a blocking call; it waits until it knows
+ # the number of changelogs to process. It returns zero when there are
+ # no changelogs left to process, and a positive value for the number of
+ # changelogs to be processed, which are then fetched using
+ # history_changelog_getchanges().
+ while self.master.server.history_changelog_scan() > 0:
+ self.crawls += 1
+
+ changes = self.master.server.history_changelog_getchanges()
+ if changes:
+ if purge_time:
+ logging.info("slave's time: %s" % repr(purge_time))
+ processed = [x for x in changes
+ if int(x.split('.')[-1]) < purge_time[0]]
+ for pr in processed:
+ logging.info('skipping already processed change: '
+ '%s...' % os.path.basename(pr))
+ self.changelog_done_func(pr)
+ changes.remove(pr)
+
+ if changes:
+ logging.debug('processing changes %s' % repr(changes))
+ self.process(changes)
+
+ # If the TS returned by history_changelog is < register_time,
+ # an FS crawl may be required, since history is only available
+ # up to the TS returned by history_changelog.
+ if ts < self.changelog_register_time:
+ raise PartialHistoryAvailable(str(ts))
class GMasterXsyncMixin(GMasterChangelogMixin):
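As a side note, the filtering of already-synced changelogs above relies on the ".TIMESTAMP" suffix of each changelog file name being compared against the slave's stime. A minimal standalone illustration (the file names and stime below are made up):

purge_time = (1390000300, 0)           # hypothetical stime fetched from the slave
changes = ['CHANGELOG.1390000100',
           'CHANGELOG.1390000200',
           'CHANGELOG.1390000400']

# anything whose timestamp suffix is older than the stime was already synced
processed = [c for c in changes if int(c.split('.')[-1]) < purge_time[0]]
for pr in processed:
    # the daemon additionally calls self.changelog_done_func(pr) here
    changes.remove(pr)

print(changes)                          # -> ['CHANGELOG.1390000400']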
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index e3cf33ffdc5..aaf257e9c71 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -33,6 +33,8 @@ from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
+from syncdutils import NoPurgeTimeAvailable, PartialHistoryAvailable
+from libgfchangelog import ChangelogException
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -683,6 +685,22 @@ class Server(object):
Changes.cl_done(clfile)
@classmethod
+ def history_changelog(cls, changelog_path, start, end):
+ return Changes.cl_history_changelog(changelog_path, start, end)
+
+ @classmethod
+ def history_changelog_scan(cls):
+ return Changes.cl_history_scan()
+
+ @classmethod
+ def history_changelog_getchanges(cls):
+ return Changes.cl_history_getchanges()
+
+ @classmethod
+ def history_changelog_done(cls, clfile):
+ Changes.cl_history_done(clfile)
+
+ @classmethod
@_pathguard
def setattr(cls, path, adct):
"""set file attributes
@@ -1213,7 +1231,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
return (gmaster_builder('xsync')(self, slave),
- gmaster_builder()(self, slave))
+ gmaster_builder()(self, slave),
+ gmaster_builder('changeloghistory')(self, slave))
def service_loop(self, *args):
"""enter service loop
@@ -1277,20 +1296,55 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
mark)
),
slave.server)
- (g1, g2) = self.gmaster_instantiate_tuple(slave)
+ (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server = brickserver
g2.master.server = brickserver
+ g3.master.server = brickserver
else:
- (g1, g2) = self.gmaster_instantiate_tuple(slave)
+ (g1, g2, g3) = self.gmaster_instantiate_tuple(slave)
g1.master.server.aggregated = gmaster.master.server
g2.master.server.aggregated = gmaster.master.server
+ g3.master.server.aggregated = gmaster.master.server
# bad bad bad: bad way to do things like this
# need to make this elegant
# register the crawlers and start crawling
+ # g1 ==> Xsync, g2 ==> config.change_detector (changelog by default)
+ # g3 ==> changelog History
g1.register()
- g2.register()
- g1.crawlwrap(oneshot=True)
- g2.crawlwrap()
+ try:
+ g2.register()
+ g3.register()
+ except ChangelogException as e:
+ logging.debug("Changelog register failed: %s - %s" %
+ (e.errno, e.strerror))
+
+ # oneshot: try to use the changelog history API; if it is not
+ # available, switch to an FS crawl.
+ # Note: if config.change_detector is xsync, the changelog
+ # history API is not used.
+ try:
+ g3.crawlwrap(oneshot=True)
+ except (ChangelogException, NoPurgeTimeAvailable,
+ PartialHistoryAvailable) as e:
+ if isinstance(e, ChangelogException):
+ logging.debug('Changelog history crawl failed, falling back '
+ 'to xsync: %s - %s' % (e.errno, e.strerror))
+ elif isinstance(e, NoPurgeTimeAvailable):
+ logging.debug('Using xsync crawl since no purge time '
+ 'available')
+ elif isinstance(e, PartialHistoryAvailable):
+ logging.debug('Using xsync crawl after consuming history '
+ 'till %s' % str(e))
+ g1.crawlwrap(oneshot=True)
+
+ # crawl loop: try the changelog crawl; if it fails,
+ # switch to an FS crawl.
+ try:
+ g2.crawlwrap()
+ except ChangelogException as e:
+ logging.debug('Changelog crawl failed, falling back to xsync: '
+ '%s - %s' % (e.errno, e.strerror))
+ g1.crawlwrap()
else:
sup(self, *args)
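Condensed, the crawl orchestration wired up above boils down to the following simplified sketch (g1/g2/g3 stand for the xsync, changelog and changelog-history crawlers as in the diff; registration, logging and per-exception messages trimmed):

try:
    g3.crawlwrap(oneshot=True)   # replay changelog history since the last stime
except (ChangelogException, NoPurgeTimeAvailable, PartialHistoryAvailable):
    g1.crawlwrap(oneshot=True)   # fill the gap with a one-shot filesystem crawl

try:
    g2.crawlwrap()               # steady-state changelog crawl loop
except ChangelogException:
    g1.crawlwrap()               # last resort: keep syncing via xsync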
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 822d919ecb1..d4ded39f562 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -488,3 +488,11 @@ def lstat(e):
return ex.errno
else:
raise
+
+
+class NoPurgeTimeAvailable(Exception):
+ pass
+
+
+class PartialHistoryAvailable(Exception):
+ pass