Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--    geo-replication/syncdaemon/master.py    156
1 file changed, 81 insertions, 75 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 12eadb107..cf2f7db07 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -17,7 +17,8 @@ from datetime import datetime
 from gconf import gconf
 from tempfile import mkdtemp, NamedTemporaryFile
 from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
-                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb
+                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
+                       lstat
 
 URXTIME = (-1, 0)
 
@@ -380,7 +381,13 @@ class GMasterCommon(object):
         return self.xtime_low(rsc.server, path, **opts)
 
     def get_initial_crawl_data(self):
-        default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+        # while persisting only 'files_syncd' is non-zero, rest of
+        # the stats are nulls. lets keep it that way in case they
+        # are needed to be used some day...
+        default_data = {'files_syncd': 0,
+                        'files_remaining': 0,
+                        'bytes_remaining': 0,
+                        'purges_remaining': 0}
         if getattr(gconf, 'state_detail_file', None):
             try:
                 return json.load(open(gconf.state_detail_file))
@@ -393,7 +400,6 @@ class GMasterCommon(object):
                     return default_data
                 else:
                     raise
-
         return default_data
 
     def update_crawl_data(self):
@@ -422,10 +428,9 @@ class GMasterCommon(object):
         self.crawls = 0
         self.turns = 0
         self.total_turns = int(gconf.turns)
+        self.crawl_start = datetime.now()
         self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
-        self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
-                            'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
-        self.total_crawl_stats = self.get_initial_crawl_data()
+        self.total_crawl_stats = None
         self.start = None
         self.change_seen = None
         # the authoritative (foreign, native) volinfo pair
@@ -491,9 +496,8 @@ class GMasterCommon(object):
             # for a passive gsyncd (ie. in a replicate scenario)
             # the keepalive thread would keep the connection alive.
             self.init_keep_alive()
+        self.total_crawl_stats = self.get_initial_crawl_data()
         self.lastreport['time'] = time.time()
-        self.crawl_stats['crawl_starttime'] = datetime.now()
-
         logging.info('crawl interval: %d seconds' % self.sleep_interval)
         t0 = time.time()
         crawl = self.should_crawl()
@@ -556,17 +560,13 @@ class GMasterCommon(object):
         return ts
 
     def get_extra_info(self):
-        str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
-        str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
-
-        self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
-
-        str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
-        str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
-        str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
-        str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
-        str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
-        str_info += "\0"
+        str_info = '\nUptime=%s;FilesSyncd=%d;FilesPending=%d;BytesPending=%d;DeletesPending=%d;' % \
+            (self._crawl_time_format(datetime.now() - self.crawl_start), \
+                 self.total_crawl_stats['files_syncd'], \
+                 self.total_crawl_stats['files_remaining'], \
+                 self.total_crawl_stats['bytes_remaining'], \
+                 self.total_crawl_stats['purges_remaining'])
+        str_info += '\0'
         logging.debug(str_info)
         return str_info
 
@@ -788,15 +788,11 @@ class GMasterChangelogMixin(GMasterCommon):
         logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
         return (workdir, logfile)
 
-    def lstat(self, e):
-        try:
-            return os.lstat(e)
-        except (IOError, OSError):
-            ex = sys.exc_info()[1]
-            if ex.errno == ENOENT:
-                return ex.errno
-            else:
-                raise
+    # update stats from *this* crawl
+    def update_cumulative_stats(self, files_pending):
+        self.total_crawl_stats['files_remaining']  = files_pending['count']
+        self.total_crawl_stats['bytes_remaining']  = files_pending['bytes']
+        self.total_crawl_stats['purges_remaining'] = files_pending['purge']
 
     # sync data
     def syncdata(self, datas):
@@ -804,43 +800,31 @@ class GMasterChangelogMixin(GMasterCommon):
         for data in datas:
             logging.debug('candidate for syncing %s' % data)
             pb = self.syncer.add(data)
-            timeA = datetime.now()
             def regjob(se, xte, pb):
                 rv = pb.wait()
                 if rv[0]:
                     logging.debug('synced ' + se)
-                    # update stats
-                    timeB = datetime.now()
-                    self.crawl_stats['last_synctime'] = timeB - timeA
-                    self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
-                    self.crawl_stats['files_synced'] += 1
-                    self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
-
-                    # cumulative statistics
-                    self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
-                    self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
-                    self.total_crawl_stats['files_synced'] += 1
                     return True
                 else:
                     if rv[1] in [23, 24]:
                         # stat to check if the file exist
-                        st = self.lstat(se)
+                        st = lstat(se)
                         if isinstance(st, int):
                             # file got unlinked in the interim
                             return True
                     logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
             self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
         if self.wait(self.FLAT_DIR_HIERARCHY, None):
-            self.update_crawl_data()
             return True
 
-    def process_change(self, change, done):
+    def process_change(self, change, done, retry):
+        pfx = gauxpfx()
         clist   = []
         entries = []
-        purges = set()
-        links = set()
         datas = set()
-        pfx = gauxpfx()
+
+        # basic crawl stats: files and bytes
+        files_pending  = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
         try:
             f = open(change, "r")
             clist = f.readlines()
@@ -861,6 +845,27 @@
                 else:
                     dct[k] = ed[k]
             return dct
+
+        # regular file update: bytes & count
+        def _update_reg(entry, size):
+            if not entry in files_pending['files']:
+                files_pending['count'] += 1
+                files_pending['bytes'] += size
+                files_pending['files'].append(entry)
+        # updates for directories, symlinks etc..
+        def _update_rest():
+            files_pending['count'] += 1
+
+        # entry count
+        def entry_update(entry, size, mode):
+            if stat.S_ISREG(mode):
+                _update_reg(entry, size)
+            else:
+                _update_rest()
+        # purge count
+        def purge_update():
+            files_pending['purge'] += 1
+
         for e in clist:
             e = e.strip()
             et = e[self.IDX_START:self.IDX_END]
@@ -871,20 +876,19 @@
                 gfid = ec[self.POS_GFID]
                 # definitely need a better way bucketize entry ops
                 if ty in ['UNLINK', 'RMDIR']:
-                  entries.append(edct(ty, gfid=gfid, entry=en))
-                  purges.update([os.path.join(pfx, gfid)])
-                  continue
-                if not ty == 'RENAME':
-                    go = os.path.join(pfx, gfid)
-                    st = self.lstat(go)
-                    if isinstance(st, int):
-                        logging.debug('file %s got purged in the interim' % go)
-                        continue
+                    purge_update()
+                    entries.append(edct(ty, gfid=gfid, entry=en))
+                    continue
+                go = os.path.join(pfx, gfid)
+                st = lstat(go)
+                if isinstance(st, int):
+                    logging.debug('file %s got purged in the interim' % go)
+                    continue
+                entry_update(go, st.st_size, st.st_mode)
                 if ty in ['CREATE', 'MKDIR', 'MKNOD']:
                     entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
                 elif ty == 'LINK':
                     entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
-                    links.update([os.path.join(pfx, gfid)])
                 elif ty == 'SYMLINK':
                     entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
                 elif ty == 'RENAME':
@@ -893,30 +897,41 @@
                 else:
                     pass
             elif et in self.TYPE_GFID:
-                da = os.path.join(pfx, ec[0])
-                st = self.lstat(da)
+                go = os.path.join(pfx, ec[0])
+                st = lstat(go)
                 if isinstance(st, int):
-                    logging.debug('file %s got purged in the interim' % da)
+                    logging.debug('file %s got purged in the interim' % go)
                     continue
-                datas.update([da])
+                entry_update(go, st.st_size, st.st_mode)
+                datas.update([go])
         logging.debug('entries: %s' % repr(entries))
+        if not retry:
+            self.update_cumulative_stats(files_pending)
         # sync namespace
         if (entries):
            self.slave.server.entry_ops(entries)
         # sync data
-        if self.syncdata(datas - (purges - links)):
+        if self.syncdata(datas):
            if done:
                 self.master.server.changelog_done(change)
            return True
 
+    def sync_done(self):
+        self.total_crawl_stats['files_syncd'] += self.total_crawl_stats['files_remaining']
+        self.total_crawl_stats['files_remaining']  = 0
+        self.total_crawl_stats['bytes_remaining']  = 0
+        self.total_crawl_stats['purges_remaining'] = 0
+        self.update_crawl_data()
+
     def process(self, changes, done=1):
         for change in changes:
-            times = 0
+            retry = False
             while True:
-                times += 1
-                logging.debug('processing change %s [%d time(s)]' % (change, times))
-                if self.process_change(change, done):
+                logging.debug('processing change %s' % change)
+                if self.process_change(change, done, retry):
+                    self.sync_done()
                     break
+                retry = True
                 # it's either entry_ops() or Rsync that failed to do it's
                 # job. Mostly it's entry_ops() [which currently has a problem
                 # of failing to create an entry but failing to return an errno]
@@ -1032,7 +1047,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         for e in dem:
             bname = e
             e = os.path.join(path, e)
-            st = self.lstat(e)
+            st = lstat(e)
             if isinstance(st, int):
                 logging.warn('%s got purged in the interim..' % e)
                 continue
@@ -1191,19 +1206,10 @@ class GMasterXtimeMixin(GMasterCommon):
             elif stat.S_ISREG(mo):
                 logging.debug("syncing %s ..." % e)
                 pb = self.syncer.add(e)
-                timeA = datetime.now()
                 def regjob(e, xte, pb):
                     if pb.wait()[0]:
                         logging.debug("synced " + e)
                         self.sendmark_regular(e, xte)
-                        # update stats
-                        timeB = datetime.now()
-                        self.crawl_stats['last_synctime'] = timeB - timeA
-                        self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
-                        self.crawl_stats['files_synced'] += 1
-                        self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
-                        self.total_crawl_stats['files_synced'] += 1
-                        self.update_crawl_data()
                         return True
                     else:
                         logging.warn("failed to sync " + e)
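
For readers who want the new bookkeeping in isolation, here is a minimal sketch of the accounting scheme this patch switches to: each changelog's pending work is bucketed by entry type, folded into the cumulative files_syncd counter once the changelog is fully synced, and persisted as JSON in the spirit of get_initial_crawl_data()/update_crawl_data(). The CrawlStats class, the sample records and the state-file path below are illustrative assumptions, not gsyncd code.

# Illustrative sketch only -- CrawlStats and the sample data are hypothetical;
# they mirror the stats flow introduced by this patch, not gsyncd's API.
import json
import os
import stat
import tempfile


class CrawlStats(object):
    def __init__(self, state_file):
        self.state_file = state_file
        # like get_initial_crawl_data(): only 'files_syncd' is expected to be
        # non-zero in the persisted state; the rest are reset before saving
        self.total = {'files_syncd': 0, 'files_remaining': 0,
                      'bytes_remaining': 0, 'purges_remaining': 0}
        if os.path.exists(state_file):
            self.total = json.load(open(state_file))

    def account_changelog(self, records):
        # like process_change(): bucket pending work by entry type
        pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
        for path, mode, size in records:
            if mode is None:                    # UNLINK/RMDIR record
                pending['purge'] += 1
            elif stat.S_ISREG(mode):            # regular file: count bytes once
                if path not in pending['files']:
                    pending['count'] += 1
                    pending['bytes'] += size
                    pending['files'].append(path)
            else:                               # directories, symlinks, ...
                pending['count'] += 1
        # like update_cumulative_stats(): remember what is still pending
        self.total['files_remaining'] = pending['count']
        self.total['bytes_remaining'] = pending['bytes']
        self.total['purges_remaining'] = pending['purge']

    def sync_done(self):
        # like sync_done(): fold the pending work into the cumulative counter
        # and persist; purges are cleared but not counted as synced files
        self.total['files_syncd'] += self.total['files_remaining']
        self.total['files_remaining'] = 0
        self.total['bytes_remaining'] = 0
        self.total['purges_remaining'] = 0
        with open(self.state_file, 'w') as f:
            json.dump(self.total, f)


if __name__ == '__main__':
    cs = CrawlStats(os.path.join(tempfile.gettempdir(), 'state-detail.json'))
    cs.account_changelog([('a.txt', stat.S_IFREG, 4096),   # regular file
                          ('adir', stat.S_IFDIR, 0),       # directory
                          ('gone', None, 0)])              # purge record
    cs.sync_done()
    print(cs.total)  # e.g. {'files_syncd': 2, 'files_remaining': 0, ...}

As in the patch, purge records only feed purges_remaining (reported as DeletesPending by get_extra_info()) and are simply cleared again by sync_done(); a retried changelog skips update_cumulative_stats(), so the pending figures are not double-counted.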
