Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--  geo-replication/syncdaemon/master.py | 384
1 file changed, 234 insertions, 150 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 98a61bc1d75..4301396f9f4 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -1,25 +1,30 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
 import os
 import sys
 import time
 import stat
-import random
-import signal
 import json
 import logging
 import socket
 import string
 import errno
-from shutil import copyfileobj
-from errno import ENOENT, ENODATA, EPIPE, EEXIST, errorcode
-from threading import currentThread, Condition, Lock
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
+from threading import Condition, Lock
 from datetime import datetime
-from libcxattr import Xattr
-
 from gconf import gconf
-from tempfile import mkdtemp, NamedTemporaryFile
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
-                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb, \
-                       lstat, errno_wrap, update_file
+from tempfile import NamedTemporaryFile
+from syncdutils import Thread, GsyncdError, boolify, escape
+from syncdutils import unescape, select, gauxpfx, md5hex, selfkill
+from syncdutils import lstat, errno_wrap
 
 URXTIME = (-1, 0)
 
@@ -27,18 +32,20 @@ URXTIME = (-1, 0)
 # of the DRY principle (no, don't look for elevated or
 # perspectivistic things here)
 
+
 def _xtime_now():
     t = time.time()
     sec = int(t)
     nsec = int((t - sec) * 1000000)
     return (sec, nsec)
 
+
 def _volinfo_hook_relax_foreign(self):
     volinfo_sys = self.get_sys_volinfo()
     fgn_vi = volinfo_sys[self.KFGN]
     if fgn_vi:
         expiry = fgn_vi['timeout'] - int(time.time()) + 1
-        logging.info('foreign volume info found, waiting %d sec for expiry' % \
+        logging.info('foreign volume info found, waiting %d sec for expiry' %
                      expiry)
         time.sleep(expiry)
         volinfo_sys = self.get_sys_volinfo()
@@ -58,10 +65,14 @@ def gmaster_builder(excrawl=None):
     logging.info('setting up %s change detection mode' % changemixin)
     modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
     crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
-    sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
-    purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
+    sendmarkmixin = boolify(
+        gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
+    purgemixin = boolify(
+        gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
     syncengine = boolify(gconf.use_tarssh) and TarSSHEngine or RsyncEngine
-    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin, syncengine):
+
    class _GMaster(crawlmixin, modemixin, sendmarkmixin,
+                   purgemixin, syncengine):
         pass
     return _GMaster
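gmaster_builder() composes the concrete master class at run time from independently selected mixins, still using the pre-PEP-308 "cond and A or B" conditional idiom seen above. A minimal standalone sketch of the same pattern; the toy class names below are illustrative, not the module's own:

    class CrawlStub(object):
        def crawl(self):
            return 'crawled'

    class SendmarkNormal(object):
        def sendmark_regular(self):
            return 'normal sendmark'

    class SendmarkRsync(object):
        def sendmark_regular(self):
            return 'rsync-xattr sendmark'

    def build_master(use_rsync_xattrs):
        # "cond and A or B" picks A when cond is truthy, else B
        sendmarkmixin = use_rsync_xattrs and SendmarkRsync or SendmarkNormal

        # class bases can be plain variables, which is what lets the
        # builder assemble a different _GMaster per configuration
        class _GMaster(CrawlStub, sendmarkmixin):
            pass

        return _GMaster

    gm = build_master(use_rsync_xattrs=True)()
    print(gm.crawl(), gm.sendmark_regular())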
@@ -71,6 +82,7 @@ def gmaster_builder(excrawl=None):
 # sync modes
 
 class NormalMixin(object):
+
     """normal geo-rep behavior"""
 
     minus_infinity = URXTIME
@@ -152,14 +164,18 @@ class NormalMixin(object):
         self.slave.server.set_stime(path, self.uuid, mark)
         # self.slave.server.set_xtime_remote(path, self.uuid, mark)
 
+
 class PartialMixin(NormalMixin):
+
     """a variant tuned towards operation with a master
        that has partial info of the slave (brick typically)"""
 
     def xtime_reversion_hook(self, path, xtl, xtr):
         pass
 
+
 class RecoverMixin(NormalMixin):
+
     """a variant that differs from normal in terms
        of ignoring non-indexed files"""
@@ -178,11 +194,13 @@ class RecoverMixin(NormalMixin):
 # Further mixins for certain tunable behaviors
 
+
 class SendmarkNormalMixin(object):
 
     def sendmark_regular(self, *a, **kw):
         return self.sendmark(*a, **kw)
 
+
 class SendmarkRsyncMixin(object):
 
     def sendmark_regular(self, *a, **kw):
@@ -194,19 +212,24 @@ class PurgeNormalMixin(object):
     def purge_missing(self, path, names):
         self.slave.server.purge(path, names)
 
+
 class PurgeNoopMixin(object):
 
     def purge_missing(self, path, names):
         pass
 
+
 class TarSSHEngine(object):
+
     """Sync engine that uses tar(1) piped over ssh(1)
        for data transfers. Good for lots of small files.
     """
+
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
         for f in files:
             pb = self.syncer.add(f)
+
             def regjob(se, xte, pb):
                 rv = pb.wait()
                 if rv[0]:
@@ -228,13 +251,17 @@ class TarSSHEngine(object):
         self.a_syncdata(files)
         self.syncdata_wait()
 
+
 class RsyncEngine(object):
+
     """Sync engine that uses rsync(1) for data transfers"""
+
     def a_syncdata(self, files):
         logging.debug('files: %s' % (files))
         for f in files:
             logging.debug('candidate for syncing %s' % f)
             pb = self.syncer.add(f)
+
             def regjob(se, xte, pb):
                 rv = pb.wait()
                 if rv[0]:
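TarSSHEngine replaces per-file rsync invocations with a single tar stream piped over ssh, which is why it is "good for lots of small files". A rough subprocess sketch of such a pipeline, under the assumption of a plain tar-to-tar transfer; the host and destination path are invented placeholders:

    import subprocess

    def tar_over_ssh(files, host='slavehost', dest='/slave/brick'):
        """stream a tar of the given files to the remote side in one pipe"""
        tar = subprocess.Popen(['tar', '-cf', '-', '-T', '-'],
                               stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        ssh = subprocess.Popen(['ssh', host, 'tar', '-xf', '-', '-C', dest],
                               stdin=tar.stdout)
        tar.stdin.write('\n'.join(files).encode())  # tar -T - reads names on stdin
        tar.stdin.close()
        tar.stdout.close()        # leave ssh as the stream's only reader
        ok_local = tar.wait() == 0
        ok_remote = ssh.wait() == 0
        return ok_local and ok_remote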
@@ -258,7 +285,9 @@ class RsyncEngine(object):
         self.a_syncdata(files)
         self.syncdata_wait()
 
+
 class GMasterCommon(object):
+
     """abstract class impementling master role"""
 
     KFGN = 0
@@ -269,8 +298,9 @@ class GMasterCommon(object):
         err out on multiple foreign masters
         """
-        fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
-                          self.master.server.aggregated.native_volume_info()
+        fgn_vis, nat_vi = (
+            self.master.server.aggregated.foreign_volume_infos(),
+            self.master.server.aggregated.native_volume_info())
         fgn_vi = None
         if fgn_vis:
             if len(fgn_vis) > 1:
@@ -316,15 +346,14 @@ class GMasterCommon(object):
         if getattr(gconf, 'state_detail_file', None):
             try:
                 with open(gconf.state_detail_file, 'r+') as f:
-                    loaded_data= json.load(f)
-                    diff_data = set(default_data) - set (loaded_data)
+                    loaded_data = json.load(f)
+                    diff_data = set(default_data) - set(loaded_data)
                     if len(diff_data):
                         for i in diff_data:
                             loaded_data[i] = default_data[i]
                     return loaded_data
-            except (IOError):
-                ex = sys.exc_info()[1]
-                logging.warn ('Creating new gconf.state_detail_file.')
+            except IOError:
+                logging.warn('Creating new gconf.state_detail_file.')
                 # Create file with initial data
                 try:
                     with open(gconf.state_detail_file, 'wb') as f:
@@ -364,7 +393,8 @@ class GMasterCommon(object):
         # - self.turns is the number of turns since start
         # - self.total_turns is a limit so that if self.turns reaches it, then
         #   we exit (for diagnostic purposes)
-        # so, eg., if the master fs changes unceasingly, self.turns will remain 0.
+        # so, eg., if the master fs changes unceasingly, self.turns will remain
+        # 0.
         self.crawls = 0
         self.turns = 0
         self.total_turns = int(gconf.turns)
@@ -394,7 +424,7 @@ class GMasterCommon(object):
             t.start()
 
     def should_crawl(cls):
-        return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+        return gconf.glusterd_uuid in cls.master.server.node_uuid()
 
     def register(self):
         self.register()
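get_initial_crawl_data() above follows a read-merge-fallback pattern: load the JSON state file, backfill any keys that newer versions added, and recreate the file from defaults when it is missing or unreadable. A condensed, self-contained sketch of that pattern; the function name and file path are illustrative:

    import json

    def load_state(path, defaults):
        """load saved state, backfilling keys added since the file was written"""
        try:
            with open(path) as f:
                data = json.load(f)
            for key in set(defaults) - set(data):
                data[key] = defaults[key]    # merge in newly introduced fields
            return data
        except (IOError, ValueError):
            # missing or corrupt state file: recreate it from the defaults
            with open(path, 'w') as f:
                json.dump(defaults, f)
            return dict(defaults)

    print(load_state('/tmp/worker.status',
                     {'files_syncd': 0, 'crawl status': 'N/A'}))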
@@ -416,18 +446,18 @@ class GMasterCommon(object):
         volinfo_sys = self.volinfo_hook()
         self.volinfo = volinfo_sys[self.KNAT]
         inter_master = volinfo_sys[self.KFGN]
-        logging.info("%s master with volume id %s ..." % \
-                         (inter_master and "intermediate" or "primary",
-                          self.uuid))
+        logging.info("%s master with volume id %s ..." %
+                     (inter_master and "intermediate" or "primary",
+                      self.uuid))
         gconf.configinterface.set('volume_id', self.uuid)
         if self.volinfo:
             if self.volinfo['retval']:
-                logging.warn("master cluster's info may not be valid %d" % \
+                logging.warn("master cluster's info may not be valid %d" %
                              self.volinfo['retval'])
             self.start_checkpoint_thread()
         else:
             raise GsyncdError("master volinfo unavailable")
-	self.total_crawl_stats = self.get_initial_crawl_data()
+        self.total_crawl_stats = self.get_initial_crawl_data()
         self.lastreport['time'] = time.time()
         logging.info('crawl interval: %d seconds' % self.sleep_interval)
@@ -435,7 +465,7 @@ class GMasterCommon(object):
         crawl = self.should_crawl()
         while not self.terminate:
             if self.start:
-                logging.debug("... crawl #%d done, took %.6f seconds" % \
+                logging.debug("... crawl #%d done, took %.6f seconds" %
                               (self.crawls, time.time() - self.start))
             self.start = time.time()
             should_display_info = self.start - self.lastreport['time'] >= 60
@@ -443,11 +473,11 @@
                 logging.info("%d crawls, %d turns",
                              self.crawls - self.lastreport['crawls'],
                              self.turns - self.lastreport['turns'])
-                self.lastreport.update(crawls = self.crawls,
-                                       turns = self.turns,
-                                       time = self.start)
+                self.lastreport.update(crawls=self.crawls,
+                                       turns=self.turns,
+                                       time=self.start)
             t1 = time.time()
-            if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+            if int(t1 - t0) >= 60:  # lets hardcode this check to 60 seconds
                 crawl = self.should_crawl()
                 t0 = t1
             self.update_worker_remote_node()
@@ -456,11 +486,14 @@
                 # bring up _this_ brick to the cluster stime
                 # which is min of cluster (but max of the replicas)
                 brick_stime = self.xtime('.', self.slave)
-                cluster_stime = self.master.server.aggregated.stime_mnt('.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
-                logging.debug("Cluster stime: %s | Brick stime: %s" % (repr(cluster_stime), repr(brick_stime)))
+                cluster_stime = self.master.server.aggregated.stime_mnt(
+                    '.', '.'.join([str(self.uuid), str(gconf.slave_id)]))
+                logging.debug("Cluster stime: %s | Brick stime: %s" %
+                              (repr(cluster_stime), repr(brick_stime)))
                 if not isinstance(cluster_stime, int):
                     if brick_stime < cluster_stime:
-                        self.slave.server.set_stime(self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
+                        self.slave.server.set_stime(
+                            self.FLAT_DIR_HIERARCHY, self.uuid, cluster_stime)
                 time.sleep(5)
                 continue
             self.update_worker_health("Active")
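Both stime and xtime values are (seconds, nanoseconds) pairs (see _xtime_now() earlier), so the brick_stime < cluster_stime test above is ordinary lexicographic tuple comparison:

    # (sec, nsec) pairs compare lexicographically, which is exactly the
    # ordering the brick/cluster stime check relies on
    brick_stime = (1400000000, 500)
    cluster_stime = (1400000000, 900)
    assert brick_stime < cluster_stime          # same second, older nanoseconds
    assert (1399999999, 999999) < (1400000000, 0)

    URXTIME = (-1, 0)                           # the "unreached" time sorts first
    assert URXTIME < brick_stime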
@@ -489,13 +522,14 @@
            with checkpoint @chkpt"""
         if xtimish:
             val = cls.serialize_xtime(val)
-        gconf.configinterface.set('checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
+        gconf.configinterface.set(
+            'checkpoint_' + prm, "%s:%s" % (escape(chkpt), val))
 
     @staticmethod
     def humantime(*tpair):
         """format xtime-like (sec, nsec) pair to human readable format"""
         ts = datetime.fromtimestamp(float('.'.join(str(n) for n in tpair))).\
-               strftime("%Y-%m-%d %H:%M:%S")
+            strftime("%Y-%m-%d %H:%M:%S")
         if len(tpair) > 1:
             ts += '.' + str(tpair[1])
         return ts
@@ -506,7 +540,7 @@ class GMasterCommon(object):
         years = int(years)
         days = int(days)
 
-        date=""
+        date = ""
         m, s = divmod(crawl_time.seconds, 60)
         h, m = divmod(m, 60)
@@ -515,7 +549,8 @@ class GMasterCommon(object):
         if days != 0:
             date += "%s %s " % (days, "day" if days == 1 else "days")
 
-        date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+        date += "%s:%s:%s" % (string.zfill(h, 2),
+                              string.zfill(m, 2), string.zfill(s, 2))
         return date
 
     def checkpt_service(self, chan, chkpt):
@@ -540,16 +575,18 @@ class GMasterCommon(object):
             if not checkpt_tgt:
                 checkpt_tgt = self.xtime('.')
                 if isinstance(checkpt_tgt, int):
-                    raise GsyncdError("master root directory is unaccessible (%s)",
+                    raise GsyncdError("master root directory is "
+                                      "unaccessible (%s)",
                                       os.strerror(checkpt_tgt))
                 self._set_checkpt_param(chkpt, 'target', checkpt_tgt)
-            logging.debug("checkpoint target %s has been determined for checkpoint %s" % \
+            logging.debug("checkpoint target %s has been determined "
+                          "for checkpoint %s" %
                           (repr(checkpt_tgt), chkpt))
             # check if the label is 'now'
             chkpt_lbl = chkpt
             try:
-                x1,x2 = chkpt.split(':')
+                x1, x2 = chkpt.split(':')
                 if x1 == 'now':
                     chkpt_lbl = "as of " + self.humantime(x2)
             except:
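humantime() joins the pair into a float-looking string for datetime and then re-appends the sub-second part verbatim. The same conversion, lifted out as a runnable snippet:

    from datetime import datetime

    def humantime(*tpair):
        """format an xtime-like (sec, nsec) pair, as the method above does"""
        ts = datetime.fromtimestamp(
            float('.'.join(str(n) for n in tpair))).strftime("%Y-%m-%d %H:%M:%S")
        if len(tpair) > 1:
            ts += '.' + str(tpair[1])
        return ts

    print(humantime(1400000000, 123456))   # e.g. "2014-05-13 16:53:20.123456"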
@@ -557,41 +594,46 @@
             completed = self._checkpt_param(chkpt, 'completed', xtimish=False)
             if completed:
                 completed = tuple(int(x) for x in completed.split('.'))
-            s,_,_ = select([chan], [], [], (not completed) and 5 or None)
+            s, _, _ = select([chan], [], [], (not completed) and 5 or None)
             # either request made and we re-check to not
             # give back stale data, or we still hunting for completion
-            if self.native_xtime(checkpt_tgt) and self.native_xtime(checkpt_tgt) < self.volmark:
+            if (self.native_xtime(checkpt_tgt) and (
+                    self.native_xtime(checkpt_tgt) < self.volmark)):
                 # indexing has been reset since setting the checkpoint
                 status = "is invalid"
             else:
                 xtr = self.xtime('.', self.slave)
                 if isinstance(xtr, int):
-                    raise GsyncdError("slave root directory is unaccessible (%s)",
+                    raise GsyncdError("slave root directory is "
+                                      "unaccessible (%s)",
                                       os.strerror(xtr))
                 ncompleted = self.xtime_geq(xtr, checkpt_tgt)
-                if completed and not ncompleted: # stale data
-                    logging.warn("completion time %s for checkpoint %s became stale" % \
+                if completed and not ncompleted:  # stale data
+                    logging.warn("completion time %s for checkpoint %s "
+                                 "became stale" %
                                  (self.humantime(*completed), chkpt))
                     completed = None
                     gconf.configinterface.delete('checkpoint_completed')
-                if ncompleted and not completed: # just reaching completion
+                if ncompleted and not completed:  # just reaching completion
                     completed = "%.6f" % time.time()
-                    self._set_checkpt_param(chkpt, 'completed', completed, xtimish=False)
+                    self._set_checkpt_param(
+                        chkpt, 'completed', completed, xtimish=False)
                     completed = tuple(int(x) for x in completed.split('.'))
                     logging.info("checkpoint %s completed" % chkpt)
                 status = completed and \
-                  "completed at " + self.humantime(completed[0]) or \
-                  "not reached yet"
+                    "completed at " + self.humantime(completed[0]) or \
+                    "not reached yet"
             if s:
                 conn = None
                 try:
                     conn, _ = chan.accept()
                     try:
-                        conn.send("checkpoint %s is %s\0" % (chkpt_lbl, status))
+                        conn.send("checkpoint %s is %s\0" %
+                                  (chkpt_lbl, status))
                     except:
                         exc = sys.exc_info()[1]
-                        if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
-                           exc.errno == EPIPE:
+                        if ((isinstance(exc, OSError) or isinstance(
+                                exc, IOError)) and exc.errno == EPIPE):
                             logging.debug('checkpoint client disconnected')
                         else:
                             raise
@@ -602,11 +644,13 @@
     def start_checkpoint_thread(self):
         """prepare and start checkpoint service"""
         if self.checkpoint_thread or not (
-          getattr(gconf, 'state_socket_unencoded', None) and getattr(gconf, 'socketdir', None)
+            getattr(gconf, 'state_socket_unencoded', None) and getattr(
+                gconf, 'socketdir', None)
         ):
             return
         chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
+        state_socket = os.path.join(
+            gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
         try:
             os.unlink(state_socket)
         except:
@@ -621,9 +665,9 @@ class GMasterCommon(object):
 
     def add_job(self, path, label, job, *a, **kw):
         """insert @job function to job table at @path with @label"""
-        if self.jobtab.get(path) == None:
+        if self.jobtab.get(path) is None:
             self.jobtab[path] = []
-        self.jobtab[path].append((label, a, lambda : job(*a, **kw)))
+        self.jobtab[path].append((label, a, lambda: job(*a, **kw)))
 
     def add_failjob(self, path, label):
         """invoke .add_job with a job that does nothing just fails"""
@@ -644,7 +688,7 @@ class GMasterCommon(object):
             ret = j[-1]()
             if not ret:
                 succeed = False
-        if succeed and not args[0] == None:
+        if succeed and not args[0] is None:
             self.sendmark(path, *args)
         return succeed
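The checkpoint service above accepts connections on a UNIX domain socket (its path derived from md5hex(gconf.state_socket_unencoded)) and answers each one with a single NUL-terminated status string. A client along these lines could poll it; the socket path shown is a placeholder, not a path the module guarantees:

    import socket

    def read_checkpoint_status(state_socket):
        # the service sends "checkpoint <label> is <status>\0" per connection
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(state_socket)
        buf = b""
        while not buf.endswith(b"\0"):
            chunk = sock.recv(4096)
            if not chunk:
                break
            buf += chunk
        sock.close()
        return buf.rstrip(b"\0").decode()

    # print(read_checkpoint_status('/var/run/gluster/<md5>.socket'))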
@@ -657,19 +701,21 @@ class GMasterCommon(object):
             self.slave.server.setattr(path, adct)
         self.set_slave_xtime(path, mark)
 
+
 class GMasterChangelogMixin(GMasterCommon):
+
     """ changelog based change detection and syncing """
 
     # index for change type and entry
     IDX_START = 0
-    IDX_END   = 2
+    IDX_END = 2
 
-    POS_GFID   = 0
-    POS_TYPE   = 1
+    POS_GFID = 0
+    POS_TYPE = 1
     POS_ENTRY1 = -1
 
-    TYPE_META  = "M "
-    TYPE_GFID  = "D "
+    TYPE_META = "M "
+    TYPE_GFID = "D "
     TYPE_ENTRY = "E "
 
     # flat directory heirarchy for gfid based access
@@ -686,18 +732,19 @@ class GMasterChangelogMixin(GMasterCommon):
     def setup_working_dir(self):
         workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
         logfile = os.path.join(workdir, 'changes.log')
-        logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
+        logging.debug('changelog working dir %s (log: %s)' %
+                      (workdir, logfile))
         return (workdir, logfile)
 
     def process_change(self, change, done, retry):
         pfx = gauxpfx()
-        clist   = []
+        clist = []
         entries = []
         meta_gfid = set()
         datas = set()
 
         # basic crawl stats: files and bytes
-        files_pending  = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
+        files_pending = {'count': 0, 'purge': 0, 'bytes': 0, 'files': []}
         try:
             f = open(change, "r")
             clist = f.readlines()
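Each changelog record starts with a two-character type tag (the TYPE_* constants above) followed by space-separated fields, the first field being the GFID. A toy parser for the three record kinds; the sample GFID and the field interpretation beyond what the constants establish are illustrative:

    IDX_START, IDX_END = 0, 2     # slice holding the type tag
    POS_GFID = 0

    def parse_record(line):
        et = line[IDX_START:IDX_END]        # "E ", "D " or "M "
        fields = line[IDX_END:].split()
        if et == "E ":                      # entry op: gfid, op, op-specific args
            return ('entry', fields[POS_GFID], fields[1], fields[2:])
        elif et == "D ":                    # data: gfid whose contents changed
            return ('data', fields[POS_GFID])
        elif et == "M ":                    # metadata: gfid plus what changed
            return ('meta', fields[POS_GFID], fields[1])
        raise ValueError('invalid changelog type: %s' % et)

    print(parse_record("D d9d54a52-0aa4-4b37-95d4-e3a1e3f9b2a1"))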
@@ -750,17 +797,19 @@ class GMasterChangelogMixin(GMasterCommon):
                 elif ty in ['CREATE', 'MKDIR', 'MKNOD']:
                     entry_update()
                     # stat information present in the changelog itself
-                    entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]),\
+                    entries.append(edct(ty, gfid=gfid, entry=en,
+                                        mode=int(ec[2]),
                                         uid=int(ec[3]), gid=int(ec[4])))
                 else:
                     # stat() to get mode and other information
                     go = os.path.join(pfx, gfid)
                     st = lstat(go)
                     if isinstance(st, int):
-                        if ty == 'RENAME': # special hack for renames...
+                        if ty == 'RENAME':  # special hack for renames...
                             entries.append(edct('UNLINK', gfid=gfid, entry=en))
                         else:
-                            logging.debug('file %s got purged in the interim' % go)
+                            logging.debug(
+                                'file %s got purged in the interim' % go)
                         continue
 
                     if ty == 'LINK':
@@ -771,17 +820,20 @@ class GMasterChangelogMixin(GMasterCommon):
                         if isinstance(rl, int):
                             continue
                         entry_update()
-                        entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
+                        entries.append(
+                            edct(ty, stat=st, entry=en, gfid=gfid, link=rl))
                     elif ty == 'RENAME':
                         entry_update()
-                        e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
-                        entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
+                        e1 = unescape(
+                            os.path.join(pfx, ec[self.POS_ENTRY1 - 1]))
+                        entries.append(
+                            edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st))
                     else:
                         logging.warn('ignoring %s [op %s]' % (gfid, ty))
             elif et == self.TYPE_GFID:
                 datas.add(os.path.join(pfx, ec[0]))
             elif et == self.TYPE_META:
-                if ec[1] == 'SETATTR': # only setattr's for now...
+                if ec[1] == 'SETATTR':  # only setattr's for now...
                     meta_gfid.add(os.path.join(pfx, ec[0]))
             else:
                 logging.warn('got invalid changelog type: %s' % (et))
@@ -789,10 +841,10 @@ class GMasterChangelogMixin(GMasterCommon):
         if not retry:
             self.update_worker_cumilitive_status(files_pending)
         # sync namespace
-        if (entries):
+        if entries:
             self.slave.server.entry_ops(entries)
         # sync metadata
-        if (meta_gfid):
+        if meta_gfid:
             meta_entries = []
             for go in meta_gfid:
                 st = lstat(go)
@@ -814,22 +866,25 @@ class GMasterChangelogMixin(GMasterCommon):
             self.skipped_gfid_list = []
             self.current_files_skipped_count = 0
 
-            # first, fire all changelog transfers in parallel. entry and metadata
-            # are performed synchronously, therefore in serial. However at the end
-            # of each changelog, data is synchronized with syncdata_async() - which
-            # means it is serial w.r.t entries/metadata of that changelog but
-            # happens in parallel with data of other changelogs.
+            # first, fire all changelog transfers in parallel. entry and
+            # metadata are performed synchronously, therefore in serial.
+            # However at the end of each changelog, data is synchronized
+            # with syncdata_async() - which means it is serial w.r.t
+            # entries/metadata of that changelog but happens in parallel
+            # with data of other changelogs.
             for change in changes:
                 logging.debug('processing change %s' % change)
                 self.process_change(change, done, retry)
                 if not retry:
-                    self.turns += 1 # number of changelogs processed in the batch
+                    # number of changelogs processed in the batch
+                    self.turns += 1
 
-            # Now we wait for all the data transfers fired off in the above step
-            # to complete. Note that this is not ideal either. Ideally we want to
-            # trigger the entry/meta-data transfer of the next batch while waiting
-            # for the data transfer of the current batch to finish.
+            # Now we wait for all the data transfers fired off in the above
+            # step to complete. Note that this is not ideal either. Ideally
+            # we want to trigger the entry/meta-data transfer of the next
+            # batch while waiting for the data transfer of the current batch
+            # to finish.
             # Note that the reason to wait for the data transfer (vs doing it
             # completely in the background and call the changelog_done()
@@ -837,10 +892,11 @@ class GMasterChangelogMixin(GMasterCommon):
             # and prevents a spiraling increase of wait stubs from consuming
             # unbounded memory and resources.
 
-            # update the slave's time with the timestamp of the _last_ changelog
-            # file time suffix. Since, the changelog prefix time is the time when
-            # the changelog was rolled over, introduce a tolerence of 1 second to
-            # counter the small delta b/w the marker update and gettimeofday().
+            # update the slave's time with the timestamp of the _last_
+            # changelog file time suffix. Since, the changelog prefix time
+            # is the time when the changelog was rolled over, introduce a
+            # tolerence of 1 second to counter the small delta b/w the
+            # marker update and gettimeofday().
             # NOTE: this is only for changelog mode, not xsync.
 
             # @change is the last changelog (therefore max time for this batch)
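The comments above describe the batch discipline: entry and metadata operations run serially per changelog, while data transfers are fired asynchronously and awaited once per batch, which bounds the number of outstanding wait stubs. A schematic sketch of that shape, with threads standing in for the async data jobs and no-op stand-ins for the slave-side calls:

    from threading import Thread

    def sync_entries(change):
        pass    # stand-in for the slave-side entry_ops() call

    def sync_metadata(change):
        pass    # stand-in for the slave-side metadata sync

    def process_batch(changelogs):
        workers = []
        for change in changelogs:
            sync_entries(change)      # serial w.r.t. this changelog
            sync_metadata(change)
            t = Thread(target=lambda c=change: None)   # data-sync stand-in
            t.start()
            workers.append(t)         # data runs in parallel across changelogs
        for t in workers:             # one wait per batch, like syncdata_wait()
            t.join()

    process_batch(['CHANGELOG.1400000000', 'CHANGELOG.1400000015'])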
@@ -856,10 +912,13 @@ class GMasterChangelogMixin(GMasterCommon):
             retry = True
             tries += 1
             if tries == self.MAX_RETRIES:
-                logging.warn('changelogs %s could not be processed - moving on...' % \
+                logging.warn('changelogs %s could not be processed - '
+                             'moving on...' %
                              ' '.join(map(os.path.basename, changes)))
-                self.update_worker_total_files_skipped(self.current_files_skipped_count)
-                logging.warn('SKIPPED GFID = %s' % ','.join(self.skipped_gfid_list))
+                self.update_worker_total_files_skipped(
+                    self.current_files_skipped_count)
+                logging.warn('SKIPPED GFID = %s' %
+                             ','.join(self.skipped_gfid_list))
                 self.update_worker_files_syncd()
                 if done:
                     xtl = (int(change.split('.')[-1]) - 1, 0)
@@ -873,7 +932,7 @@ class GMasterChangelogMixin(GMasterCommon):
             # entry_ops() that failed... so we retry the _whole_ changelog
             # again.
             # TODO: remove entry retries when it's gets fixed.
-            logging.warn('incomplete sync, retrying changelogs: %s' % \
+            logging.warn('incomplete sync, retrying changelogs: %s' %
                          ' '.join(map(os.path.basename, changes)))
             time.sleep(0.5)
@@ -884,15 +943,15 @@ class GMasterChangelogMixin(GMasterCommon):
             self.sendmark(path, stime)
 
     def get_worker_status_file(self):
-        file_name = gconf.local_path+'.status'
+        file_name = gconf.local_path + '.status'
         file_name = file_name.replace("/", "_")
-        worker_status_file = gconf.georep_session_working_dir+file_name
+        worker_status_file = gconf.georep_session_working_dir + file_name
         return worker_status_file
 
     def update_worker_status(self, key, value):
-        default_data = {"remote_node":"N/A",
-                        "worker status":"Not Started",
-                        "crawl status":"N/A",
+        default_data = {"remote_node": "N/A",
+                        "worker status": "Not Started",
+                        "crawl status": "N/A",
                         "files_syncd": 0,
                         "files_remaining": 0,
                         "bytes_remaining": 0,
@@ -909,7 +968,7 @@ class GMasterChangelogMixin(GMasterCommon):
                 f.flush()
                 os.fsync(f.fileno())
         except (IOError, OSError, ValueError):
-            logging.info ('Creating new %s' % worker_status_file)
+            logging.info('Creating new %s' % worker_status_file)
             try:
                 with open(worker_status_file, 'wb') as f:
                     default_data[key] = value
@@ -920,9 +979,9 @@ class GMasterChangelogMixin(GMasterCommon):
                 raise
 
     def update_worker_cumilitive_status(self, files_pending):
-        default_data = {"remote_node":"N/A",
-                        "worker status":"Not Started",
-                        "crawl status":"N/A",
+        default_data = {"remote_node": "N/A",
+                        "worker status": "Not Started",
+                        "crawl status": "N/A",
                         "files_syncd": 0,
                         "files_remaining": 0,
                         "bytes_remaining": 0,
@@ -932,8 +991,8 @@ class GMasterChangelogMixin(GMasterCommon):
         try:
             with open(worker_status_file, 'r+') as f:
                 loaded_data = json.load(f)
-                loaded_data['files_remaining']  = files_pending['count']
-                loaded_data['bytes_remaining']  = files_pending['bytes']
+                loaded_data['files_remaining'] = files_pending['count']
+                loaded_data['bytes_remaining'] = files_pending['bytes']
                 loaded_data['purges_remaining'] = files_pending['purge']
                 os.ftruncate(f.fileno(), 0)
                 os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -941,11 +1000,11 @@ class GMasterChangelogMixin(GMasterCommon):
                 f.flush()
                 os.fsync(f.fileno())
         except (IOError, OSError, ValueError):
-            logging.info ('Creating new %s' % worker_status_file)
+            logging.info('Creating new %s' % worker_status_file)
             try:
                 with open(worker_status_file, 'wb') as f:
-                    default_data['files_remaining']  = files_pending['count']
-                    default_data['bytes_remaining']  = files_pending['bytes']
+                    default_data['files_remaining'] = files_pending['count']
+                    default_data['bytes_remaining'] = files_pending['bytes']
                     default_data['purges_remaining'] = files_pending['purge']
                     json.dump(default_data, f)
                     f.flush()
@@ -953,24 +1012,24 @@ class GMasterChangelogMixin(GMasterCommon):
             except:
                 raise
 
-    def update_worker_remote_node (self):
+    def update_worker_remote_node(self):
         node = sys.argv[-1]
         node = node.split("@")[-1]
         remote_node_ip = node.split(":")[0]
         remote_node_vol = node.split(":")[3]
         remote_node = remote_node_ip + '::' + remote_node_vol
-        self.update_worker_status ('remote_node', remote_node)
+        self.update_worker_status('remote_node', remote_node)
 
-    def update_worker_health (self, state):
-        self.update_worker_status ('worker status', state)
+    def update_worker_health(self, state):
+        self.update_worker_status('worker status', state)
 
-    def update_worker_crawl_status (self, state):
-        self.update_worker_status ('crawl status', state)
+    def update_worker_crawl_status(self, state):
+        self.update_worker_status('crawl status', state)
 
-    def update_worker_files_syncd (self):
-        default_data = {"remote_node":"N/A",
-                        "worker status":"Not Started",
-                        "crawl status":"N/A",
+    def update_worker_files_syncd(self):
+        default_data = {"remote_node": "N/A",
+                        "worker status": "Not Started",
+                        "crawl status": "N/A",
                         "files_syncd": 0,
                         "files_remaining": 0,
                         "bytes_remaining": 0,
@@ -981,8 +1040,8 @@ class GMasterChangelogMixin(GMasterCommon):
             with open(worker_status_file, 'r+') as f:
                 loaded_data = json.load(f)
                 loaded_data['files_syncd'] += loaded_data['files_remaining']
-                loaded_data['files_remaining']  = 0
-                loaded_data['bytes_remaining']  = 0
+                loaded_data['files_remaining'] = 0
+                loaded_data['bytes_remaining'] = 0
                 loaded_data['purges_remaining'] = 0
                 os.ftruncate(f.fileno(), 0)
                 os.lseek(f.fileno(), 0, os.SEEK_SET)
@@ -990,7 +1049,7 @@ class GMasterChangelogMixin(GMasterCommon):
                 f.flush()
                 os.fsync(f.fileno())
         except (IOError, OSError, ValueError):
-            logging.info ('Creating new %s' % worker_status_file)
+            logging.info('Creating new %s' % worker_status_file)
             try:
                 with open(worker_status_file, 'wb') as f:
                     json.dump(default_data, f)
@@ -999,19 +1058,19 @@ class GMasterChangelogMixin(GMasterCommon):
             except:
                 raise
 
-    def update_worker_files_remaining (self, state):
-        self.update_worker_status ('files_remaining', state)
+    def update_worker_files_remaining(self, state):
+        self.update_worker_status('files_remaining', state)
 
-    def update_worker_bytes_remaining (self, state):
-        self.update_worker_status ('bytes_remaining', state)
+    def update_worker_bytes_remaining(self, state):
+        self.update_worker_status('bytes_remaining', state)
 
-    def update_worker_purges_remaining (self, state):
-        self.update_worker_status ('purges_remaining', state)
+    def update_worker_purges_remaining(self, state):
+        self.update_worker_status('purges_remaining', state)
-    def update_worker_total_files_skipped (self, value):
-        default_data = {"remote_node":"N/A",
-                        "worker status":"Not Started",
-                        "crawl status":"N/A",
+    def update_worker_total_files_skipped(self, value):
+        default_data = {"remote_node": "N/A",
+                        "worker status": "Not Started",
+                        "crawl status": "N/A",
                         "files_syncd": 0,
                         "files_remaining": 0,
                         "bytes_remaining": 0,
@@ -1029,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):
                 f.flush()
                 os.fsync(f.fileno())
         except (IOError, OSError, ValueError):
-            logging.info ('Creating new %s' % worker_status_file)
+            logging.info('Creating new %s' % worker_status_file)
             try:
                 with open(worker_status_file, 'wb') as f:
                     default_data['total_files_skipped'] = value
@@ -1057,9 +1116,12 @@ class GMasterChangelogMixin(GMasterCommon):
         if changes:
             if purge_time:
                 logging.info("slave's time: %s" % repr(purge_time))
-                processed = [x for x in changes if int(x.split('.')[-1]) < purge_time[0]]
+                processed = [x for x in changes
+                             if int(x.split('.')[-1]) < purge_time[0]]
                 for pr in processed:
-                    logging.info('skipping already processed change: %s...' % os.path.basename(pr))
+                    logging.info(
+                        'skipping already processed change: %s...' %
+                        os.path.basename(pr))
                     self.master.server.changelog_done(pr)
                     changes.remove(pr)
             logging.debug('processing changes %s' % repr(changes))
@@ -1080,7 +1142,9 @@ class GMasterChangelogMixin(GMasterCommon):
             # control should not reach here
             raise
 
+
 class GMasterXsyncMixin(GMasterChangelogMixin):
+
     """
     This crawl needs to be xtime based (as of now
     it's not. this is beacuse we generate CHANGELOG
@@ -1091,7 +1155,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
     files, hardlinks and symlinks.
     """
 
-    XSYNC_MAX_ENTRIES = 1<<13
+    XSYNC_MAX_ENTRIES = 1 << 13
 
     def register(self):
         self.counter = 0
@@ -1145,7 +1209,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
     def open(self):
         try:
-            self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+            self.xsync_change = os.path.join(
+                self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
             self.fh = open(self.xsync_change, 'w')
         except IOError:
             raise
@@ -1165,7 +1230,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         self.put('xsync', self.fname())
         self.counter = 0
         if not last:
-            time.sleep(1) # make sure changelogs are 1 second apart
+            time.sleep(1)  # make sure changelogs are 1 second apart
             self.open()
 
     def sync_stime(self, stime=None, last=False):
@@ -1207,7 +1272,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
             xtr_root = self.xtime('.', self.slave)
             if isinstance(xtr_root, int):
                 if xtr_root != ENOENT:
-                    logging.warn("slave cluster not returning the " \
+                    logging.warn("slave cluster not returning the "
                                  "correct xtime for root (%d)" % xtr_root)
                 xtr_root = self.minus_infinity
         xtl = self.xtime(path)
@@ -1216,7 +1281,7 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
         xtr = self.xtime(path, self.slave)
         if isinstance(xtr, int):
             if xtr != ENOENT:
-                logging.warn("slave cluster not returning the " \
+                logging.warn("slave cluster not returning the "
                              "correct xtime for %s (%d)" % (path, xtr))
             xtr = self.minus_infinity
         xtr = max(xtr, xtr_root)
@@ -1235,7 +1300,8 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
             e = os.path.join(path, e)
             xte = self.xtime(e)
             if isinstance(xte, int):
-                logging.warn("irregular xtime for %s: %s" % (e, errno.errorcode[xte]))
+                logging.warn("irregular xtime for %s: %s" %
+                             (e, errno.errorcode[xte]))
                 continue
             if not self.need_sync(e, xte, xtr):
                 continue
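The xsync crawl writes synthetic changelogs and rolls them over at XSYNC_MAX_ENTRIES (1 << 13 = 8192) entries or at the end of the crawl; the one-second sleep keeps the time-suffixed filenames unique. A stripped-down version of that rotation; the writer class is a sketch, not the module's API:

    import os
    import time
    import tempfile

    XSYNC_MAX_ENTRIES = 1 << 13

    class XsyncWriter(object):
        def __init__(self, tempdir):
            self.tempdir = tempdir
            self.counter = 0
            self.open()

        def open(self):
            # filenames are time-suffixed, like XSYNC-CHANGELOG.<epoch>
            self.fh = open(os.path.join(
                self.tempdir, 'XSYNC-CHANGELOG.%d' % int(time.time())), 'w')

        def write_entry(self, line):
            self.fh.write(line + '\n')
            self.counter += 1
            if self.counter == XSYNC_MAX_ENTRIES:
                self.rollover()

        def rollover(self, last=False):
            self.fh.close()
            self.counter = 0
            if not last:
                time.sleep(1)   # keep changelog names 1 second apart
                self.open()

    w = XsyncWriter(tempfile.mkdtemp())
    w.write_entry('D d9d54a52-0aa4-4b37-95d4-e3a1e3f9b2a1')
    w.rollover(last=True)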
@@ -1256,35 +1322,51 @@ class GMasterXsyncMixin(GMasterChangelogMixin):
                 self.sync_done(self.stimes, False)
                 self.stimes = []
             if stat.S_ISDIR(mo):
-                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+                self.write_entry_change("E", [gfid, 'MKDIR', str(mo), str(
+                    st.st_uid), str(st.st_gid), escape(os.path.join(pargfid,
+                                                                    bname))])
                 self.Xcrawl(e, xtr_root)
                 self.stimes.append((e, xte))
             elif stat.S_ISLNK(mo):
-                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+                self.write_entry_change(
+                    "E", [gfid, 'SYMLINK', escape(os.path.join(pargfid,
+                                                               bname))])
             elif stat.S_ISREG(mo):
                 nlink = st.st_nlink
-                nlink -= 1 # fixup backend stat link count
-                # if a file has a hardlink, create a Changelog entry as 'LINK' so the slave
-                # side will decide if to create the new entry, or to create link.
+                nlink -= 1  # fixup backend stat link count
+                # if a file has a hardlink, create a Changelog entry as
+                # 'LINK' so the slave side will decide if to create the
+                # new entry, or to create link.
                 if nlink == 1:
-                    self.write_entry_change("E", [gfid, 'MKNOD', str(mo), str(st.st_uid), str(st.st_gid), escape(os.path.join(pargfid, bname))])
+                    self.write_entry_change("E",
+                                            [gfid, 'MKNOD', str(mo),
+                                             str(st.st_uid),
+                                             str(st.st_gid),
+                                             escape(os.path.join(
+                                                 pargfid, bname))])
                 else:
-                    self.write_entry_change("E", [gfid, 'LINK', escape(os.path.join(pargfid, bname))])
+                    self.write_entry_change(
+                        "E", [gfid, 'LINK', escape(os.path.join(pargfid,
+                                                                bname))])
                 self.write_entry_change("D", [gfid])
         if path == '.':
             self.stimes.append((path, xtl))
             self.sync_done(self.stimes, True)
 
+
 class BoxClosedErr(Exception):
     pass
 
+
 class PostBox(list):
+
     """synchronized collection for storing things thought of as "requests" """
 
     def __init__(self, *a):
         list.__init__(self, *a)
         # too bad Python stdlib does not have read/write locks...
-        # it would suffivce to grab the lock in .append as reader, in .close as writer
+        # it would suffivce to grab the lock in .append as reader, in .close as
+        # writer
         self.lever = Condition()
         self.open = True
         self.done = False
@@ -1319,7 +1401,9 @@ class PostBox(list):
         self.open = False
         self.lever.release()
 
+
 class Syncer(object):
+
     """a staged queue to relay rsync requests to rsync workers
 
     By "staged queue" its meant that when a consumer comes to the
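PostBox is a list guarded by a Condition: producers append while the box is open, and the consumer closes it to take the accumulated batch as a unit. A minimal workalike of that handshake (simplified; the real class does more, e.g. tracking batch completion via self.done so producers can wait on the result):

    from threading import Condition

    class BoxClosedErr(Exception):
        pass

    class MiniPostBox(list):
        """toy PostBox: append while open, close to take the whole batch"""
        def __init__(self):
            list.__init__(self)
            self.lever = Condition()
            self.open = True

        def append(self, item):
            with self.lever:
                if not self.open:
                    raise BoxClosedErr(item)
                list.append(self, item)
                self.lever.notify()

        def close(self):
            # consumer takes the batch; later appends must go to a new box
            with self.lever:
                self.open = False
                return list(self)

    box = MiniPostBox()
    box.append('request-1')
    box.append('request-2')
    print(box.close())       # ['request-1', 'request-2']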
