diff options
Diffstat (limited to 'geo-replication/syncdaemon/master.py')
| -rw-r--r-- | geo-replication/syncdaemon/master.py | 82 | 
1 files changed, 77 insertions, 5 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 6f1b639e566..c092c526a0e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -17,6 +17,7 @@ import logging  import socket  import string  import errno +import tarfile  from errno import ENOENT, ENODATA, EPIPE, EEXIST  from threading import Condition, Lock  from datetime import datetime @@ -533,6 +534,19 @@ class GMasterCommon(object):                      if brick_stime < cluster_stime:                          self.slave.server.set_stime(                              self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime) +                        # Purge all changelogs available in processing dir +                        # less than cluster_stime +                        proc_dir = os.path.join(self.setup_working_dir(), +                                                ".processing") + +                        if os.path.exists(proc_dir): +                            to_purge = [f for f in os.listdir(proc_dir) +                                        if (f.startswith("CHANGELOG.") and +                                            int(f.split('.')[-1]) < +                                            cluster_stime[0])] +                            for f in to_purge: +                                os.remove(os.path.join(proc_dir, f)) +                  time.sleep(5)                  continue              self.update_worker_health("Active") @@ -775,6 +789,47 @@ class GMasterChangelogMixin(GMasterCommon):      CHANGELOG_LOG_LEVEL = 9      CHANGELOG_CONN_RETRIES = 5 +    def archive_and_purge_changelogs(self, changelogs): +        # Creates tar file instead of tar.gz, since changelogs will +        # be appended to existing tar. archive name is +        # archive_<YEAR><MONTH>.tar +        archive_name = "archive_%s.tar" % datetime.today().strftime( +            gconf.changelog_archive_format) + +        try: +            tar = tarfile.open(os.path.join(self.processed_changelogs_dir, +                                            archive_name), +                               "a") +        except tarfile.ReadError: +            tar = tarfile.open(os.path.join(self.processed_changelogs_dir, +                                            archive_name), +                               "w") + +        for f in changelogs: +            try: +                f = os.path.basename(f) +                tar.add(os.path.join(self.processed_changelogs_dir, f), +                        arcname=os.path.basename(f)) +            except: +                exc = sys.exc_info()[1] +                if ((isinstance(exc, OSError) or +                     isinstance(exc, IOError)) and exc.errno == ENOENT): +                    continue +                else: +                    tar.close() +                    raise +        tar.close() + +        for f in changelogs: +            try: +                f = os.path.basename(f) +                os.remove(os.path.join(self.processed_changelogs_dir, f)) +            except OSError as e: +                if e.errno == errno.ENOENT: +                    continue +                else: +                    raise +      def fallback_xsync(self):          logging.info('falling back to xsync mode')          gconf.configinterface.set('change-detector', 'xsync') @@ -990,6 +1045,7 @@ class GMasterChangelogMixin(GMasterCommon):                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl)                      map(self.changelog_done_func, changes) +                    self.archive_and_purge_changelogs(changes)                  self.update_worker_files_syncd()                  break @@ -1009,6 +1065,7 @@ class GMasterChangelogMixin(GMasterCommon):                      xtl = (int(change.split('.')[-1]) - 1, 0)                      self.upd_stime(xtl)                      map(self.changelog_done_func, changes) +                    self.archive_and_purge_changelogs(changes)                  break              # it's either entry_ops() or Rsync that failed to do it's              # job. Mostly it's entry_ops() [which currently has a problem @@ -1204,6 +1261,7 @@ class GMasterChangelogMixin(GMasterCommon):                          os.path.basename(pr))                      self.changelog_done_func(pr)                      changes.remove(pr) +                self.archive_and_purge_changelogs(processed)              if changes:                  logging.debug('processing changes %s' % repr(changes)) @@ -1213,6 +1271,8 @@ class GMasterChangelogMixin(GMasterCommon):          self.changelog_agent = changelog_agent          self.sleep_interval = int(gconf.change_interval)          self.changelog_done_func = self.changelog_agent.done +        self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), +                                                     ".processed")  class GMasterChangeloghistoryMixin(GMasterChangelogMixin): @@ -1222,6 +1282,8 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):          self.history_crawl_start_time = register_time          self.changelog_done_func = self.changelog_agent.history_done          self.history_turns = 0 +        self.processed_changelogs_dir = os.path.join(self.setup_working_dir(), +                                                     ".history/.processed")      def crawl(self, no_stime_update=False):          self.history_turns += 1 @@ -1314,6 +1376,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          self.sleep_interval = 60          self.tempdir = self.setup_working_dir()          self.tempdir = os.path.join(self.tempdir, 'xsync') +        self.processed_changelogs_dir = self.tempdir          logging.info('xsync temp directory: %s' % self.tempdir)          try:              os.makedirs(self.tempdir) @@ -1348,6 +1411,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):                  elif item[0] == 'xsync':                      logging.info('processing xsync changelog %s' % (item[1]))                      self.process([item[1]], 0) +                    self.archive_and_purge_changelogs([item[1]])                  elif item[0] == 'stime':                      if not no_stime_update:                          # xsync is started after running history but if @@ -1367,6 +1431,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):                  time.sleep(1)      def write_entry_change(self, prefix, data=[]): +        if not getattr(self, "fh", None): +            self.open() +          self.fh.write("%s %s\n" % (prefix, ' '.join(data)))      def open(self): @@ -1378,7 +1445,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):              raise      def close(self): -        self.fh.close() +        if getattr(self, "fh", None): +            self.fh.flush() +            os.fsync(self.fh.fileno()) +            self.fh.close() +            self.fh = None      def fname(self):          return self.xsync_change @@ -1389,11 +1460,11 @@ class GMasterXsyncMixin(GMasterChangelogMixin):      def sync_xsync(self, last):          """schedule a processing of changelog"""          self.close() -        self.put('xsync', self.fname()) +        if self.counter > 0: +            self.put('xsync', self.fname())          self.counter = 0          if not last:              time.sleep(1)  # make sure changelogs are 1 second apart -            self.open()      def sync_stime(self, stime=None, last=False):          """schedule a stime synchronization""" @@ -1427,7 +1498,6 @@ class GMasterXsyncMixin(GMasterChangelogMixin):          the filesystem tree, but set after directory synchronization.          """          if path == '.': -            self.open()              self.crawls += 1          if not xtr_root:              # get the root stime and use it for all comparisons @@ -1479,7 +1549,9 @@ class GMasterXsyncMixin(GMasterChangelogMixin):                  logging.warn('skipping entry %s..' % e)                  continue              mo = st.st_mode -            self.counter += 1 +            self.counter += 1 if ((stat.S_ISDIR(mo) or +                                   stat.S_ISLNK(mo) or +                                   stat.S_ISREG(mo))) else 0              if self.counter == self.XSYNC_MAX_ENTRIES:                  self.sync_done(self.stimes, False)                  self.stimes = []  | 
