diff options
-rw-r--r-- | geo-replication/syncdaemon/gsyncd.py | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/master.py | 82 |
2 files changed, 79 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index cb29903bf4f..84ad13487ce 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -252,6 +252,8 @@ def main_i(): op.add_option('--sync-jobs', metavar='N', type=int, default=3) op.add_option('--replica-failover-interval', metavar='N', type=int, default=1) + op.add_option('--changelog-archive-format', metavar='N', + type=str, default="%Y%m") op.add_option( '--turns', metavar='N', type=int, default=0, help=SUPPRESS_HELP) op.add_option('--allow-network', metavar='IPS', default='') diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 6f1b639e566..c092c526a0e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -17,6 +17,7 @@ import logging import socket import string import errno +import tarfile from errno import ENOENT, ENODATA, EPIPE, EEXIST from threading import Condition, Lock from datetime import datetime @@ -533,6 +534,19 @@ class GMasterCommon(object): if brick_stime < cluster_stime: self.slave.server.set_stime( self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) + # Purge all changelogs available in processing dir + # less than cluster_stime + proc_dir = os.path.join(self.setup_working_dir(), + ".processing") + + if os.path.exists(proc_dir): + to_purge = [f for f in os.listdir(proc_dir) + if (f.startswith("CHANGELOG.") and + int(f.split('.')[-1]) < + cluster_stime[0])] + for f in to_purge: + os.remove(os.path.join(proc_dir, f)) + time.sleep(5) continue self.update_worker_health("Active") @@ -775,6 +789,47 @@ class GMasterChangelogMixin(GMasterCommon): CHANGELOG_LOG_LEVEL = 9 CHANGELOG_CONN_RETRIES = 5 + def archive_and_purge_changelogs(self, changelogs): + # Creates tar file instead of tar.gz, since changelogs will + # be appended to existing tar. archive name is + # archive_<YEAR><MONTH>.tar + archive_name = "archive_%s.tar" % datetime.today().strftime( + gconf.changelog_archive_format) + + try: + tar = tarfile.open(os.path.join(self.processed_changelogs_dir, + archive_name), + "a") + except tarfile.ReadError: + tar = tarfile.open(os.path.join(self.processed_changelogs_dir, + archive_name), + "w") + + for f in changelogs: + try: + f = os.path.basename(f) + tar.add(os.path.join(self.processed_changelogs_dir, f), + arcname=os.path.basename(f)) + except: + exc = sys.exc_info()[1] + if ((isinstance(exc, OSError) or + isinstance(exc, IOError)) and exc.errno == ENOENT): + continue + else: + tar.close() + raise + tar.close() + + for f in changelogs: + try: + f = os.path.basename(f) + os.remove(os.path.join(self.processed_changelogs_dir, f)) + except OSError as e: + if e.errno == errno.ENOENT: + continue + else: + raise + def fallback_xsync(self): logging.info('falling back to xsync mode') gconf.configinterface.set('change-detector', 'xsync') @@ -990,6 +1045,7 @@ class GMasterChangelogMixin(GMasterCommon): xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) + self.archive_and_purge_changelogs(changes) self.update_worker_files_syncd() break @@ -1009,6 +1065,7 @@ class GMasterChangelogMixin(GMasterCommon): xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) map(self.changelog_done_func, changes) + self.archive_and_purge_changelogs(changes) break # it's either entry_ops() or Rsync that failed to do it's # job. Mostly it's entry_ops() [which currently has a problem @@ -1204,6 +1261,7 @@ class GMasterChangelogMixin(GMasterCommon): os.path.basename(pr)) self.changelog_done_func(pr) changes.remove(pr) + self.archive_and_purge_changelogs(processed) if changes: logging.debug('processing changes %s' % repr(changes)) @@ -1213,6 +1271,8 @@ class GMasterChangelogMixin(GMasterCommon): self.changelog_agent = changelog_agent self.sleep_interval = int(gconf.change_interval) self.changelog_done_func = self.changelog_agent.done + self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + ".processed") class GMasterChangeloghistoryMixin(GMasterChangelogMixin): @@ -1222,6 +1282,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin): self.history_crawl_start_time = register_time self.changelog_done_func = self.changelog_agent.history_done self.history_turns = 0 + self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), + ".history/.processed") def crawl(self, no_stime_update=False): self.history_turns += 1 @@ -1314,6 +1376,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): self.sleep_interval = 60 self.tempdir = self.setup_working_dir() self.tempdir = os.path.join(self.tempdir, 'xsync') + self.processed_changelogs_dir = self.tempdir logging.info('xsync temp directory: %s' % self.tempdir) try: os.makedirs(self.tempdir) @@ -1348,6 +1411,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin): elif item[0] == 'xsync': logging.info('processing xsync changelog %s' % (item[1])) self.process([item[1]], 0) + self.archive_and_purge_changelogs([item[1]]) elif item[0] == 'stime': if not no_stime_update: # xsync is started after running history but if @@ -1367,6 +1431,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin): time.sleep(1) def write_entry_change(self, prefix, data=[]): + if not getattr(self, "fh", None): + self.open() + self.fh.write("%s %s\n" % (prefix, ' '.join(data))) def open(self): @@ -1378,7 +1445,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin): raise def close(self): - self.fh.close() + if getattr(self, "fh", None): + self.fh.flush() + os.fsync(self.fh.fileno()) + self.fh.close() + self.fh = None def fname(self): return self.xsync_change @@ -1389,11 +1460,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin): def sync_xsync(self, last): """schedule a processing of changelog""" self.close() - self.put('xsync', self.fname()) + if self.counter > 0: + self.put('xsync', self.fname()) self.counter = 0 if not last: time.sleep(1) # make sure changelogs are 1 second apart - self.open() def sync_stime(self, stime=None, last=False): """schedule a stime synchronization""" @@ -1427,7 +1498,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin): the filesystem tree, but set after directory synchronization. """ if path == '.': - self.open() self.crawls += 1 if not xtr_root: # get the root stime and use it for all comparisons @@ -1479,7 +1549,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin): logging.warn('skipping entry %s..' % e) continue mo = st.st_mode - self.counter += 1 + self.counter += 1 if ((stat.S_ISDIR(mo) or + stat.S_ISLNK(mo) or + stat.S_ISREG(mo))) else 0 if self.counter == self.XSYNC_MAX_ENTRIES: self.sync_done(self.stimes, False) self.stimes = [] |