diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-08-08 15:06:11 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2014-08-19 23:27:26 -0700 |
commit | d0a3b4307e40b70d73faf4baed9fa0c930682894 (patch) | |
tree | 2710140e3a1b38eb0fed69be7b1dc0cd69e0a012 /geo-replication | |
parent | 9b5231e5c98b8cfa116838287c7a14042702795f (diff) |
geo-rep: Fixing issue with xsync upper limit
While identifying the file/dir to sync, xtime of the file was compared
with xsync_upper_limit as `xtime < xsync_upper_limit` After the sync,
xtime of parent directory is updated as stime. With the upper limit
condition, stime is updated as MIN(xtime_parent, xsync_upper_limit)
With this files will get missed if `xtime_of_file == xsync_upper_limit`
With this patch xtime_of_file is compared as
xtime_of_file <= xsync_upper_limit
BUG: 1128093
Change-Id: I5022ce67ba503ed4621531a649a16fc06b2229d9
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/8439
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 73 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 36 |
2 files changed, 50 insertions, 59 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index e3904736ba2..79c90630dca 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -168,10 +168,10 @@ class NormalMixin(object): raise GsyncdError("timestamp corruption for " + path) def need_sync(self, e, xte, xtrd): - if self.xsync_upper_limit is None: - return xte > xtrd + if self.xsync_upper_limit: + return xte > xtrd and xte <= self.xsync_upper_limit else: - return xte > xtrd and xte < self.xsync_upper_limit + return xte > xtrd def set_slave_xtime(self, path, mark): self.slave.server.set_stime(path, self.uuid, mark) @@ -442,13 +442,24 @@ class GMasterCommon(object): def register(self): self.register() - def crawlwrap(self, oneshot=False, no_stime_update=False): + def crawlwrap(self, oneshot=False, no_stime_update=False, + register_time=None): if oneshot: # it's important to do this during the oneshot crawl as # for a passive gsyncd (ie. in a replicate scenario) # the keepalive thread would keep the connection alive. self.init_keep_alive() + # If crawlwrap is called when partial history available, + # then it sets register_time which is the time when geo-rep + # worker registerd to changelog consumption. Since nsec is + # not considered in register time, their are chances of skipping + # changes detection in xsync crawl. Add 1 sec to upper_limit. + # This limit will be reset when crawlwrap is called again. + self.xsync_upper_limit = None + if register_time: + self.xsync_upper_limit = (register_time + 1, 0) + # no need to maintain volinfo state machine. # in a cascading setup, each geo-replication session is # independent (ie. 'volume-mark' and 'xtime' are not @@ -757,6 +768,12 @@ class GMasterChangelogMixin(GMasterCommon): logging.debug('changelog working dir %s' % workdir) return workdir + def get_purge_time(self): + purge_time = self.xtime('.', self.slave) + if isinstance(purge_time, int): + purge_time = None + return purge_time + def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] @@ -1135,9 +1152,7 @@ class GMasterChangelogMixin(GMasterCommon): changes = [] # get stime (from the brick) and purge changelogs # that are _historical_ to that time. - purge_time = self.xtime('.', self.slave) - if isinstance(purge_time, int): - purge_time = None + purge_time = self.get_purge_time() self.changelog_agent.scan() self.crawls += 1 @@ -1175,14 +1190,10 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): def crawl(self, no_stime_update=False): self.history_turns += 1 self.update_worker_crawl_status("History Crawl") - logging.info('starting history crawl... turns: %s' % - self.history_turns) + purge_time = self.get_purge_time() - # get stime (from the brick) and purge changelogs - # that are _historical_ to that time. - purge_time = self.xtime('.', self.slave) - if isinstance(purge_time, int): - purge_time = None + logging.info('starting history crawl... turns: %s, stime: %s' + % (self.history_turns, repr(purge_time))) if not purge_time or purge_time == URXTIME: logging.info("stime not available, abandoning history crawl") @@ -1226,8 +1237,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): history_turn_time = int(time.time()) - self.history_crawl_start_time - logging.info('finished history crawl syncing between %s - %s.' % - (purge_time[0], actual_end)) + logging.info('finished history crawl syncing, endtime: %s, stime: %s' + % (actual_end, repr(self.get_purge_time()))) # If TS returned from history_changelog is < register_time # then FS crawl may be required, since history is only available @@ -1260,7 +1271,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): XSYNC_MAX_ENTRIES = 1 << 13 - def register(self, register_time, changelog_agent=None): + def register(self, register_time=None, changelog_agent=None): self.counter = 0 self.comlist = [] self.stimes = [] @@ -1277,17 +1288,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin): else: raise - # After changelogs history processing completes, it switches - # to xsync/hibrid crawl if history actual end time is less than - # live changelog register time. Xsync should only run for that - # small gap, ie.. changelog_register_time - history_actual_end_time - # If we don't have upper limit to limit the XSync change detection - # It will keep on detecting the files even though changelogs are - # available for the same. Set upper limit during register - # and reset at the end of each crawl. Reseting at the end of - # crawl is required if change_detector is set to xsync. - self.xsync_upper_limit = (register_time, 0) - def crawl(self, no_stime_update=False): """ event dispatcher thread @@ -1299,18 +1299,15 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.Xcrawl() t = Thread(target=Xsyncer) t.start() - logging.info('starting hybrid crawl...') + logging.info('starting hybrid crawl..., stime: %s' + % repr(self.get_purge_time())) self.update_worker_crawl_status("Hybrid Crawl") while True: try: item = self.comlist.pop(0) if item[0] == 'finale': - if self.xsync_upper_limit is not None: - logging.info('finished hybrid crawl syncing, endtime: ' - '%s' % self.xsync_upper_limit[0]) - else: - logging.info('finished hybrid crawl syncing') - + logging.info('finished hybrid crawl syncing, stime: %s' + % repr(self.get_purge_time())) break elif item[0] == 'xsync': logging.info('processing xsync changelog %s' % (item[1])) @@ -1333,8 +1330,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin): except IndexError: time.sleep(1) - self.xsync_upper_limit = None - def write_entry_change(self, prefix, data=[]): self.fh.write("%s %s\n" % (prefix, ' '.join(data))) @@ -1460,7 +1455,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): str(st.st_gid), str(st.st_mode)]) self.Xcrawl(e, xtr_root) stime_to_update = xte - if self.xsync_upper_limit is not None: + if self.xsync_upper_limit: stime_to_update = min(self.xsync_upper_limit, xte) self.stimes.append((e, stime_to_update)) elif stat.S_ISLNK(mo): @@ -1487,7 +1482,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.write_entry_change("D", [gfid]) if path == '.': stime_to_update = xtl - if self.xsync_upper_limit is not None: + if self.xsync_upper_limit: stime_to_update = min(self.xsync_upper_limit, xtl) self.stimes.append((path, stime_to_update)) self.sync_done(self.stimes, True) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 09310c1f1aa..2a887daab15 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1293,22 +1293,27 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): try: workdir = g2.setup_working_dir() - # register with the changelog library - # 9 == log level (DEBUG) - # 5 == connection retries - changelog_agent.register(gconf.local_path, - workdir, gconf.changelog_log_file, - g2.CHANGELOG_LOG_LEVEL, - g2.CHANGELOG_CONN_RETRIES) + # Register only when change_detector is not set to + # xsync, else agent will generate changelog files + # in .processing directory of working dir + if gconf.change_detector != 'xsync': + # register with the changelog library + # 9 == log level (DEBUG) + # 5 == connection retries + changelog_agent.register(gconf.local_path, + workdir, gconf.changelog_log_file, + g2.CHANGELOG_LOG_LEVEL, + g2.CHANGELOG_CONN_RETRIES) + register_time = int(time.time()) g2.register(register_time, changelog_agent) g3.register(register_time, changelog_agent) except ChangelogException: changelog_register_failed = True - register_time = int(time.time()) + register_time = None logging.info("Changelog register failed, fallback to xsync") - g1.register(register_time) + g1.register() logging.info("Register time: %s" % register_time) # oneshot: Try to use changelog history api, if not # available switch to FS crawl @@ -1328,17 +1333,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): logging.info('Partial history available, using xsync crawl' ' after consuming history ' 'till %s' % str(e)) - g1.crawlwrap(oneshot=True, no_stime_update=True) - - # Reset xsync upper limit. g2, g3 are changelog and history - # instances, but if change_detector is set to xsync then - # g1, g2, g3 will be xsync instances. - g1.xsync_upper_limit = None - if getattr(g2, "xsync_upper_limit", None) is not None: - g2.xsync_upper_limit = None - - if getattr(g3, "xsync_upper_limit", None) is not None: - g3.xsync_upper_limit = None + g1.crawlwrap(oneshot=True, no_stime_update=True, + register_time=register_time) # crawl loop: Try changelog crawl, if failed # switch to FS crawl |