Diffstat (limited to 'geo-replication/syncdaemon/master.py')
-rw-r--r--   geo-replication/syncdaemon/master.py | 632
1 file changed, 504 insertions(+), 128 deletions(-)
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index f903f3059..58df14954 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -4,22 +4,20 @@ import time
 import stat
 import random
 import signal
+import json
 import logging
 import socket
+import string
 import errno
-import re
-from errno import ENOENT, ENODATA, EPIPE
+from shutil import copyfileobj
+from errno import ENOENT, ENODATA, EPIPE, EEXIST
 from threading import currentThread, Condition, Lock
 from datetime import datetime
-try:
-    from hashlib import md5 as md5
-except ImportError:
-    # py 2.4
-    from md5 import new as md5
 from gconf import gconf
-from syncdutils import FreeObject, Thread, GsyncdError, boolify, \
-                       escape, unescape, select
+from tempfile import mkdtemp, NamedTemporaryFile
+from syncdutils import FreeObject, Thread, GsyncdError, boolify, escape, \
+                       unescape, select, gauxpfx, md5hex, selfkill, entry2pb
 
 URXTIME = (-1, 0)
 
@@ -51,18 +49,20 @@ def _volinfo_hook_relax_foreign(self):
 
 # The API!
 
-def gmaster_builder():
+def gmaster_builder(excrawl=None):
     """produce the GMaster class variant corresponding
        to sync mode"""
     this = sys.modules[__name__]
     modemixin = gconf.special_sync_mode
     if not modemixin:
         modemixin = 'normal'
-    logging.info('setting up master for %s sync mode' % modemixin)
+    changemixin = isinstance(excrawl, str) and excrawl or gconf.change_detector
+    logging.info('setting up %s change detection mode' % changemixin)
     modemixin = getattr(this, modemixin.capitalize() + 'Mixin')
+    crawlmixin = getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
     sendmarkmixin = boolify(gconf.use_rsync_xattrs) and SendmarkRsyncMixin or SendmarkNormalMixin
     purgemixin = boolify(gconf.ignore_deletes) and PurgeNoopMixin or PurgeNormalMixin
-    class _GMaster(GMasterBase, modemixin, sendmarkmixin, purgemixin):
+    class _GMaster(crawlmixin, modemixin, sendmarkmixin, purgemixin):
         pass
     return _GMaster
 
@@ -100,12 +100,9 @@ class NormalMixin(object):
 
     def make_xtime_opts(self, is_master, opts):
         if not 'create' in opts:
-            opts['create'] = is_master and not self.inter_master
+            opts['create'] = is_master
         if not 'default_xtime' in opts:
-            if is_master and self.inter_master:
-                opts['default_xtime'] = ENODATA
-            else:
-                opts['default_xtime'] = URXTIME
+            opts['default_xtime'] = URXTIME
 
     def xtime_low(self, server, path, **opts):
         xt = server.xtime(path, self.uuid)
@@ -114,7 +111,7 @@ class NormalMixin(object):
         if xt == ENODATA or xt < self.volmark:
             if opts['create']:
                 xt = _xtime_now()
-                server.set_xtime(path, self.uuid, xt)
+                server.aggregated.set_xtime(path, self.uuid, xt)
             else:
                 xt = opts['default_xtime']
         return xt
@@ -151,6 +148,13 @@ class NormalMixin(object):
     def set_slave_xtime(self, path, mark):
         self.slave.server.set_xtime(path, self.uuid, mark)
 
+class PartialMixin(NormalMixin):
+    """a variant tuned towards operation with a master
+       that has partial info of the slave (brick typically)"""
+
+    def xtime_reversion_hook(self, path, xtl, xtr):
+        pass
+
 class WrapupMixin(NormalMixin):
     """a variant that differs from normal in terms
        of ignoring non-indexed files"""
@@ -163,7 +167,7 @@ class WrapupMixin(NormalMixin):
             opts['default_xtime'] = URXTIME
 
     @staticmethod
-    def keepalive_payload_hook(timo, gap):
+    def keepalive_payload_hook(self, timo, gap):
         return (None, gap)
 
     def volinfo_hook(self):
@@ -236,19 +240,19 @@ class BlindMixin(object):
                 # from interrupted gsyncd transfer
                 logging.warn('have to fix up missing xtime on ' + path)
                 xt0 = _xtime_now()
-                server.set_xtime(path, self.uuid, xt0)
+                server.aggregated.set_xtime(path, self.uuid, xt0)
             else:
                 xt0 = opts['default_xtime']
             xt = (xt0, xt[1])
         return xt
 
     @staticmethod
-    def keepalive_payload_hook(timo, gap):
+    def keepalive_payload_hook(self, timo, gap):
         return (None, gap)
 
     def volinfo_hook(self):
         res = _volinfo_hook_relax_foreign(self)
-        volinfo_r_new = self.slave.server.native_volume_info()
+        volinfo_r_new = self.slave.server.aggregated.native_volume_info()
         if volinfo_r_new['retval']:
             raise GsyncdError("slave is corrupt")
         if getattr(self, 'volinfo_r', None):
@@ -321,9 +325,7 @@ class PurgeNoopMixin(object):
 
     def purge_missing(self, path, names):
         pass
-
-
-class GMasterBase(object):
+class GMasterCommon(object):
     """abstract class impementling master role"""
 
     KFGN = 0
@@ -334,8 +336,8 @@ class GMasterCommon(object):
 
         err out on multiple foreign masters
         """
-        fgn_vis, nat_vi = self.master.server.foreign_volume_infos(), \
-                          self.master.server.native_volume_info()
+        fgn_vis, nat_vi = self.master.server.aggregated.foreign_volume_infos(), \
+                          self.master.server.aggregated.native_volume_info()
         fgn_vi = None
         if fgn_vis:
             if len(fgn_vis) > 1:
@@ -376,6 +378,33 @@ class GMasterCommon(object):
         self.make_xtime_opts(rsc == self.master, opts)
         return self.xtime_low(rsc.server, path, **opts)
 
+    def get_initial_crawl_data(self):
+        default_data = {'sync_time': 0, 'files_synced': 0, 'bytes_synced': 0}
+        if getattr(gconf, 'state_detail_file', None):
+            try:
+                return json.load(open(gconf.state_detail_file))
+            except (IOError, OSError):
+                ex = sys.exc_info()[1]
+                if ex.errno == ENOENT:
+                    # Create file with initial data
+                    with open(gconf.state_detail_file, 'wb') as f:
+                        json.dump(default_data, f)
+                    return default_data
+                else:
+                    raise
+
+        return default_data
+
+    def update_crawl_data(self):
+        if getattr(gconf, 'state_detail_file', None):
+            try:
+                same_dir = os.path.dirname(gconf.state_detail_file)
+                with NamedTemporaryFile(dir=same_dir, delete=False) as tmp:
+                    json.dump(self.total_crawl_stats, tmp)
+                    os.rename(tmp.name, gconf.state_detail_file)
+            except (IOError, OSError):
+                raise
+
     def __init__(self, master, slave):
         self.master = master
         self.slave = slave
@@ -392,15 +421,12 @@ class GMasterCommon(object):
         self.crawls = 0
         self.turns = 0
         self.total_turns = int(gconf.turns)
-        self.lastreport = {'crawls': 0, 'turns': 0}
+        self.lastreport = {'crawls': 0, 'turns': 0, 'time': 0}
+        self.crawl_stats = {'sync_time': 0, 'last_synctime': 0, 'crawl_starttime': 0,
+                            'crawl_time': 0, 'files_synced': 0, 'bytes_synced' :0}
+        self.total_crawl_stats = self.get_initial_crawl_data()
         self.start = None
         self.change_seen = None
-        self.syncTime=0
-        self.lastSyncTime=0
-        self.crawlStartTime=0
-        self.crawlTime=0
-        self.filesSynced=0
-        self.bytesSynced=0
         # the authoritative (foreign, native) volinfo pair
         # which lets us deduce what to do when we refetch
         # the volinfos from system
@@ -409,8 +435,94 @@ class GMasterCommon(object):
         # the actual volinfo we make use of
         self.volinfo = None
         self.terminate = False
+        self.sleep_interval = 1
         self.checkpoint_thread = None
 
+    def init_keep_alive(cls):
+        """start the keep-alive thread """
+        timo = int(gconf.timeout or 0)
+        if timo > 0:
+            def keep_alive():
+                while True:
+                    vi, gap = cls.keepalive_payload_hook(timo, timo * 0.5)
+                    cls.slave.server.keep_alive(vi)
+                    time.sleep(gap)
+            t = Thread(target=keep_alive)
+            t.start()
+
+    def volinfo_query(self):
+        """volume info state machine"""
+        volinfo_sys, state_change = self.volinfo_hook()
+        if self.inter_master:
+            self.volinfo = volinfo_sys[self.KFGN]
+        else:
+            self.volinfo = volinfo_sys[self.KNAT]
+        if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
+            logging.info('new master is %s', self.uuid)
+            if self.volinfo:
+                logging.info("%s master with volume id %s ..." % \
+                                 (self.inter_master and "intermediate" or "primary",
+                                  self.uuid))
+        if state_change == self.KFGN:
+            gconf.configinterface.set('volume_id', self.uuid)
+        if self.volinfo:
+            if self.volinfo['retval']:
+                raise GsyncdError ("master is corrupt")
+            self.start_checkpoint_thread()
+        else:
+            if should_display_info or self.crawls == 0:
+                if self.inter_master:
+                    logging.info("waiting for being synced from %s ..." % \
+                                     self.volinfo_state[self.KFGN]['uuid'])
+                else:
+                    logging.info("waiting for volume info ...")
+            return True
+
+    def should_crawl(cls):
+        return (gconf.glusterd_uuid in cls.master.server.node_uuid())
+
+    def register(self):
+        self.register()
+
+    def crawlwrap(self, oneshot=False):
+        if oneshot:
+            # it's important to do this during the oneshot crawl as
+            # for a passive gsyncd (ie. in a replicate scenario)
+            # the keepalive thread would keep the connection alive.
+            self.init_keep_alive()
+        self.lastreport['time'] = time.time()
+        self.crawl_stats['crawl_starttime'] = datetime.now()
+
+        logging.info('crawl interval: %d seconds' % self.sleep_interval)
+        t0 = time.time()
+        crawl = self.should_crawl()
+        while not self.terminate:
+            if self.volinfo_query():
+                continue
+            t1 = time.time()
+            if int(t1 - t0) >= 60: #lets hardcode this check to 60 seconds
+                crawl = self.should_crawl()
+                t0 = t1
+            if not crawl:
+                time.sleep(5)
+                continue
+            if self.start:
+                logging.debug("... crawl #%d done, took %.6f seconds" % \
+                                  (self.crawls, time.time() - self.start))
+            self.start = t1
+            should_display_info = self.start - self.lastreport['time'] >= 60
+            if should_display_info:
+                logging.info("%d crawls, %d turns",
                             self.crawls - self.lastreport['crawls'],
                             self.turns - self.lastreport['turns'])
+                self.lastreport.update(crawls = self.crawls,
+                                       turns = self.turns,
+                                       time = self.start)
+            self.crawl()
+            if oneshot:
+                return
+            time.sleep(self.sleep_interval)
+
     @classmethod
     def _checkpt_param(cls, chkpt, prm, xtimish=True):
         """use config backend to lookup a parameter belonging to
@@ -443,32 +555,37 @@ class GMasterCommon(object):
         return ts
 
     def get_extra_info(self):
-        str_info="\nFile synced : %d" %(self.filesSynced)
-        str_info+="\nBytes Synced : %d KB" %(self.syncer.bytesSynced)
-        str_info+="\nSync Time : %f seconds" %(self.syncTime)
-        self.crawlTime=datetime.now()-self.crawlStartTime
-        years , days =divmod(self.crawlTime.days,365.25)
-        years=int(years)
-        days=int(days)
+        str_info = "\nFilesSynced=%d;" % (self.crawl_stats['files_synced'])
+        str_info += "BytesSynced=%s;" % (self.crawl_stats['bytes_synced'])
+
+        self.crawl_stats['crawl_time'] = datetime.now() - self.crawl_stats['crawl_starttime']
+
+        str_info += "Uptime=%s;" % (self._crawl_time_format(self.crawl_stats['crawl_time']))
+        str_info += "SyncTime=%s;" % (self.crawl_stats['sync_time'])
+        str_info += "TotalSyncTime=%s;" % (self.total_crawl_stats['sync_time'])
+        str_info += "TotalFilesSynced=%d;" % (self.total_crawl_stats['files_synced'])
+        str_info += "TotalBytesSynced=%s;" % (self.total_crawl_stats['bytes_synced'])
+        str_info += "\0"
+        logging.debug(str_info)
+        return str_info
+
+    def _crawl_time_format(self, crawl_time):
+        # Ex: 5 years, 4 days, 20:23:10
+        years, days = divmod(crawl_time.days, 365.25)
+        years = int(years)
+        days = int(days)
 
         date=""
-        m, s = divmod(self.crawlTime.seconds, 60)
+        m, s = divmod(crawl_time.seconds, 60)
         h, m = divmod(m, 60)
-        if years!=0 :
-                date+=str(years)+" year "
-        if days!=0 :
-                date+=str(days)+" day "
-        if h!=0 :
-                date+=str(h)+" H : "
-        if m!=0 or h!=0 :
-                date+=str(m)+" M : "
-
-        date+=str(s)+" S"
-        self.crawlTime=date
-        str_info+="\nCrawl Time : %s" %(str(self.crawlTime))
-        str_info+="\n\0"
-        return str_info
+        if years != 0:
+            date += "%s %s " % (years, "year" if years == 1 else "years")
+        if days != 0:
+            date += "%s %s " % (days, "day" if days == 1 else "days")
+
+        date += "%s:%s:%s" % (string.zfill(h, 2), string.zfill(m, 2), string.zfill(s, 2))
+        return date
 
     def checkpt_service(self, chan, chkpt, tgt):
         """checkpoint service loop
@@ -517,7 +634,7 @@ class GMasterCommon(object):
                 try:
                     conn, _ = chan.accept()
                     try:
-                        conn.send("  | checkpoint %s %s %s" % (chkpt, status,self.get_extra_info()))
+                        conn.send("  | checkpoint %s %s %s" % (chkpt, status, self.get_extra_info()))
                     except:
                         exc = sys.exc_info()[1]
                         if (isinstance(exc, OSError) or isinstance(exc, IOError)) and \
@@ -536,7 +653,7 @@ class GMasterCommon(object):
         ):
             return
         chan = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        state_socket = os.path.join(gconf.socketdir, md5(gconf.state_socket_unencoded).hexdigest() + ".socket")
+        state_socket = os.path.join(gconf.socketdir, md5hex(gconf.state_socket_unencoded) + ".socket")
         try:
             os.unlink(state_socket)
         except:
@@ -559,22 +676,6 @@ class GMasterCommon(object):
         t.start()
         self.checkpoint_thread = t
 
-    def crawl_loop(self):
-        """start the keep-alive thread and iterate .crawl"""
-        timo = int(gconf.timeout or 0)
-        if timo > 0:
-            def keep_alive():
-                while True:
-                    vi, gap = self.keepalive_payload_hook(timo, timo * 0.5)
-                    self.slave.server.keep_alive(vi)
-                    time.sleep(gap)
-            t = Thread(target=keep_alive)
-            t.start()
-        self.lastreport['time'] = time.time()
-        self.crawlStartTime=datetime.now()
-        while not self.terminate:
-            self.crawl()
-
     def add_job(self, path, label, job, *a, **kw):
         """insert @job function to job table at @path with @label"""
         if self.jobtab.get(path) == None:
@@ -600,7 +701,7 @@ class GMasterCommon(object):
             ret = j[-1]()
             if not ret:
                 succeed = False
-        if succeed:
+        if succeed and not args[0] == None:
             self.sendmark(path, *args)
         return succeed
 
@@ -653,6 +754,319 @@ class GMasterCommon(object):
                       tuple(srep(vi) for vi in volinfo_state + volinfo_sys + newstate))
         return newstate, param.state_change
 
+class GMasterChangelogMixin(GMasterCommon):
+    """ changelog based change detection and syncing """
+
+    # index for change type and entry
+    IDX_START = 0
+    IDX_END   = 2
+
+    POS_GFID   = 0
+    POS_TYPE   = 1
+    POS_ENTRY1 = 2
+    POS_ENTRY2 = 3  # renames
+
+    _CL_TYPE_DATA_PFX     = "D "
+    _CL_TYPE_METADATA_PFX = "M "
+    _CL_TYPE_ENTRY_PFX    = "E "
+
+    TYPE_GFID  = [_CL_TYPE_DATA_PFX] # ignoring metadata ops
+    TYPE_ENTRY = [_CL_TYPE_ENTRY_PFX]
+
+    # flat directory heirarchy for gfid based access
+    FLAT_DIR_HIERARCHY = '.'
+
+    def fallback_xsync(self):
+        logging.info('falling back to xsync mode')
+        gconf.configinterface.set('change-detector', 'xsync')
+        selfkill()
+
+    def setup_working_dir(self):
+        workdir = os.path.join(gconf.working_dir, md5hex(gconf.local_path))
+        logfile = os.path.join(workdir, 'changes.log')
+        logging.debug('changelog working dir %s (log: %s)' % (workdir, logfile))
+        return (workdir, logfile)
+
+    def lstat(self, e):
+        try:
+            return os.lstat(e)
+        except (IOError, OSError):
+            ex = sys.exc_info()[1]
+            if ex.errno == ENOENT:
+                return ex.errno
+            else:
+                raise
+
+    # sync data
+    def syncdata(self, datas):
+        logging.debug('datas: %s' % (datas))
+        for data in datas:
+            logging.debug('candidate for syncing %s' % data)
+            pb = self.syncer.add(data)
+            timeA = datetime.now()
+            def regjob(se, xte, pb):
+                rv = pb.wait()
+                if rv[0]:
+                    logging.debug('synced ' + se)
+                    # update stats
+                    timeB = datetime.now()
+                    self.crawl_stats['last_synctime'] = timeB - timeA
+                    self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
+                    self.crawl_stats['files_synced'] += 1
+                    self.crawl_stats['bytes_synced'] += self.syncer.bytes_synced
+
+                    # cumulative statistics
+                    self.total_crawl_stats['bytes_synced'] += self.syncer.bytes_synced
+                    self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
+                    self.total_crawl_stats['files_synced'] += 1
+                    return True
+                else:
+                    if rv[1] in [23, 24]:
+                        # stat to check if the file exist
+                        st = self.lstat(se)
+                        if isinstance(st, int):
+                            # file got unlinked in the interim
+                            return True
+                    logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1]))
+            self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, data, None, pb)
+        if self.wait(self.FLAT_DIR_HIERARCHY, None):
+            self.update_crawl_data()
+            return True
+
+    def process_change(self, change, done):
+        clist   = []
+        entries = []
+        purges = set()
+        links = set()
+        datas = set()
+        pfx = gauxpfx()
+        try:
+            f = open(change, "r")
+            clist = f.readlines()
+            f.close()
+        except IOError:
+            raise
+
+        def edct(op, **ed):
+            dct = {}
+            dct['op'] = op
+            for k in ed:
+                if k == 'stat':
+                    st = ed[k]
+                    dst = dct['stat'] = {}
+                    dst['uid'] = st.st_uid
+                    dst['gid'] = st.st_gid
+                    dst['mode'] = st.st_mode
+                else:
+                    dct[k] = ed[k]
+            return dct
+        for e in clist:
+            e = e.strip()
+            et = e[self.IDX_START:self.IDX_END]
+            ec = e[self.IDX_END:].split(' ')
+            if et in self.TYPE_ENTRY:
+                ty = ec[self.POS_TYPE]
+                en = unescape(os.path.join(pfx, ec[self.POS_ENTRY1]))
+                gfid = ec[self.POS_GFID]
+                # definitely need a better way bucketize entry ops
+                if ty in ['UNLINK', 'RMDIR']:
+                  entries.append(edct(ty, gfid=gfid, entry=en))
+                  purges.update([os.path.join(pfx, gfid)])
+                  continue
+                if not ty == 'RENAME':
+                    go = os.path.join(pfx, gfid)
+                    st = self.lstat(go)
+                    if isinstance(st, int):
+                        logging.debug('file %s got purged in the interim' % go)
+                        continue
+                if ty in ['CREATE', 'MKDIR', 'MKNOD']:
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+                elif ty == 'LINK':
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid))
+                    links.update([os.path.join(pfx, gfid)])
+                elif ty == 'SYMLINK':
+                    entries.append(edct(ty, stat=st, entry=en, gfid=gfid, link=os.readlink(en)))
+                elif ty == 'RENAME':
+                    e2 = unescape(os.path.join(pfx, ec[self.POS_ENTRY2]))
+                    entries.append(edct(ty, gfid=gfid, entry=en, entry1=e2))
+                else:
+                    pass
+            elif et in self.TYPE_GFID:
+                da = os.path.join(pfx, ec[0])
+                st = self.lstat(da)
+                if isinstance(st, int):
+                    logging.debug('file %s got purged in the interim' % da)
+                    continue
+                datas.update([da])
+        logging.debug('entries: %s' % repr(entries))
+        # sync namespace
+        if (entries):
+            self.slave.server.entry_ops(entries)
+        # sync data
+        if self.syncdata(datas - (purges - links)):
+            if done:
+                self.master.server.changelog_done(change)
+            return True
+
+    def process(self, changes, done=1):
+        for change in changes:
+            times = 0
+            while True:
+                times += 1
+                logging.debug('processing change %s [%d time(s)]' % (change, times))
+                if self.process_change(change, done):
+                    break
+                # it's either entry_ops() or Rsync that failed to do it's
+                # job. Mostly it's entry_ops() [which currently has a problem
+                # of failing to create an entry but failing to return an errno]
+                # Therefore we do not know if it's either Rsync or the freaking
+                # entry_ops() that failed... so we retry the _whole_ changelog
+                # again.
+                # TODO: remove entry retries when it's gets fixed.
+                logging.warn('incomplete sync, retrying changelog: %s' % change)
+                time.sleep(0.5)
+            self.turns += 1
+
+    def upd_stime(self, stime):
+        if stime:
+            self.sendmark(self.FLAT_DIR_HIERARCHY, stime)
+
+    def crawl(self):
+        changes = []
+        try:
+            self.master.server.changelog_scan()
+            self.crawls += 1
+        except OSError:
+            self.fallback_xsync()
+        changes = self.master.server.changelog_getchanges()
+        if changes:
+            xtl = self.xtime(self.FLAT_DIR_HIERARCHY)
+            if isinstance(xtl, int):
+                raise GsyncdError('master is corrupt')
+            logging.debug('processing changes %s' % repr(changes))
+            self.process(changes)
+            self.upd_stime(xtl)
+
+    def register(self):
+        (workdir, logfile) = self.setup_working_dir()
+        self.sleep_interval = int(gconf.change_interval)
+        # register with the changelog library
+        try:
+            # 9 == log level (DEBUG)
+            # 5 == connection retries
+            self.master.server.changelog_register(gconf.local_path,
+                                                  workdir, logfile, 9, 5)
+        except OSError:
+            self.fallback_xsync()
+            # control should not reach here
+            raise
+
+class GMasterXsyncMixin(GMasterChangelogMixin):
+    """
+
+    This crawl needs to be xtime based (as of now
+    it's not. this is beacuse we generate CHANGELOG
+    file during each crawl which is then processed
+    by process_change()).
+    For now it's used as a one-shot initial sync
+    mechanism and only syncs directories, regular
+    files and symlinks.
+    """
+
+    def register(self):
+        self.sleep_interval = 60
+        self.tempdir = self.setup_working_dir()[0]
+        self.tempdir = os.path.join(self.tempdir, 'xsync')
+        logging.info('xsync temp directory: %s' % self.tempdir)
+        try:
+            os.makedirs(self.tempdir)
+        except OSError:
+            ex = sys.exc_info()[1]
+            if ex.errno == EEXIST and os.path.isdir(self.tempdir):
+                pass
+            else:
+                raise
+
+    def write_entry_change(self, prefix, data=[]):
+        self.fh.write("%s %s\n" % (prefix, ' '.join(data)))
+
+    def open(self):
+        try:
+            self.xsync_change = os.path.join(self.tempdir, 'XSYNC-CHANGELOG.' + str(int(time.time())))
+            self.fh = open(self.xsync_change, 'w')
+        except IOError:
+            raise
+
+    def close(self):
+        self.fh.close()
+
+    def fname(self):
+        return self.xsync_change
+
+    def crawl(self, path='.', xtr=None, done=0):
+        """ generate a CHANGELOG file consumable by process_change """
+        if path == '.':
+            self.open()
+            self.crawls += 1
+        if not xtr:
+            # get the root stime and use it for all comparisons
+            xtr = self.xtime('.', self.slave)
+            if isinstance(xtr, int):
+                if xtr != ENOENT:
+                    raise GsyncdError('slave is corrupt')
+                xtr = self.minus_infinity
+        xtl = self.xtime(path)
+        if isinstance(xtl, int):
+            raise GsyncdError('master is corrupt')
+        if xtr == xtl:
+            if path == '.':
+                self.close()
+            return
+        self.xtime_reversion_hook(path, xtl, xtr)
+        logging.debug("entering " + path)
+        dem = self.master.server.entries(path)
+        pargfid = self.master.server.gfid(path)
+        if isinstance(pargfid, int):
+            logging.warn('skipping directory %s' % (path))
+        for e in dem:
+            bname = e
+            e = os.path.join(path, e)
+            st = self.lstat(e)
+            if isinstance(st, int):
+                logging.warn('%s got purged in the interim..' % e)
+                continue
+            gfid = self.master.server.gfid(e)
+            if isinstance(gfid, int):
+                logging.warn('skipping entry %s..' % (e))
+                continue
+            xte = self.xtime(e)
+            if isinstance(xte, int):
+                raise GsyncdError('master is corrupt')
+            if not self.need_sync(e, xte, xtr):
+                continue
+            mo = st.st_mode
+            if stat.S_ISDIR(mo):
+                self.write_entry_change("E", [gfid, 'MKDIR', escape(os.path.join(pargfid, bname))])
+                self.crawl(e, xtr)
+            elif stat.S_ISREG(mo):
+                self.write_entry_change("E", [gfid, 'CREATE', escape(os.path.join(pargfid, bname))])
+                self.write_entry_change("D", [gfid])
+            elif stat.S_ISLNK(mo):
+                self.write_entry_change("E", [gfid, 'SYMLINK', escape(os.path.join(pargfid, bname))])
+            else:
+                logging.info('ignoring %s' % e)
+        if path == '.':
+            logging.info('processing xsync changelog %s' % self.fname())
+            self.close()
+            self.process([self.fname()], done)
+            self.upd_stime(xtl)
+
+class GMasterXtimeMixin(GMasterCommon):
+    """ xtime based change detection and syncing """
+
+    def register(self):
+        pass
+
     def crawl(self, path='.', xtl=None):
         """crawling...
@@ -691,46 +1105,6 @@
         assert that the file systems (master / slave) underneath do not change and actions
         taken upon some condition will not lose their context by the time they are performed.
         """
-        if path == '.':
-            if self.start:
-                self.crawls += 1
-                logging.debug("... crawl #%d done, took %.6f seconds" % \
-                              (self.crawls, time.time() - self.start))
-            time.sleep(1)
-            self.start = time.time()
-            should_display_info = self.start - self.lastreport['time'] >= 60
-            if should_display_info:
-                logging.info("completed %d crawls, %d turns",
-                             self.crawls - self.lastreport['crawls'],
-                             self.turns - self.lastreport['turns'])
-                self.lastreport.update(crawls = self.crawls,
-                                       turns = self.turns,
-                                       time = self.start)
-            volinfo_sys, state_change = self.volinfo_hook()
-            if self.inter_master:
-                self.volinfo = volinfo_sys[self.KFGN]
-            else:
-                self.volinfo = volinfo_sys[self.KNAT]
-            if state_change == self.KFGN or (state_change == self.KNAT and not self.inter_master):
-                logging.info('new master is %s', self.uuid)
-                if self.volinfo:
-                    logging.info("%s master with volume id %s ..." % \
-                                 (self.inter_master and "intermediate" or "primary",
-                                  self.uuid))
-            if state_change == self.KFGN:
-               gconf.configinterface.set('volume_id', self.uuid)
-            if self.volinfo:
-                if self.volinfo['retval']:
-                    raise GsyncdError ("master is corrupt")
-                self.start_checkpoint_thread()
-            else:
-                if should_display_info or self.crawls == 0:
-                    if self.inter_master:
-                        logging.info("waiting for being synced from %s ..." % \
-                                     self.volinfo_state[self.KFGN]['uuid'])
-                    else:
-                        logging.info("waiting for volume info ...")
-                return
         logging.debug("entering " + path)
         if not xtl:
             xtl = self.xtime(path)
@@ -806,6 +1180,7 @@
             st = indulgently(e, lambda e: os.lstat(e))
             if st == False:
                 continue
+
             mo = st.st_mode
             adct = {'own': (st.st_uid, st.st_gid)}
             if stat.S_ISLNK(mo):
@@ -815,16 +1190,19 @@
             elif stat.S_ISREG(mo):
                 logging.debug("syncing %s ..." % e)
                 pb = self.syncer.add(e)
-                timeA=datetime.now()
+                timeA = datetime.now()
                 def regjob(e, xte, pb):
-                    if pb.wait():
+                    if pb.wait()[0]:
                         logging.debug("synced " + e)
                         self.sendmark_regular(e, xte)
-
-                        timeB=datetime.now()
-                        self.lastSyncTime=timeB-timeA
-                        self.syncTime=(self.syncTime+self.lastSyncTime.microseconds)/(10.0**6)
-                        self.filesSynced=self.filesSynced+1
+                        # update stats
+                        timeB = datetime.now()
+                        self.crawl_stats['last_synctime'] = timeB - timeA
+                        self.crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
+                        self.crawl_stats['files_synced'] += 1
+                        self.total_crawl_stats['sync_time'] += ((self.crawl_stats['last_synctime'].microseconds) / (10.0 ** 6))
+                        self.total_crawl_stats['files_synced'] += 1
+                        self.update_crawl_data()
                         return True
                     else:
                         logging.warn("failed to sync " + e)
@@ -841,6 +1219,7 @@
         if path == '.':
             self.wait(path, xtl)
 
+
 class BoxClosedErr(Exception):
     pass
@@ -920,7 +1299,7 @@
         self.slave = slave
         self.lock = Lock()
         self.pb = PostBox()
-        self.bytesSynced=0
+        self.bytes_synced = 0
         for i in range(int(gconf.sync_jobs)):
             t = Thread(target=self.syncjob)
             t.start()
@@ -940,13 +1319,10 @@
             pb.close()
             po = self.slave.rsync(pb)
             if po.returncode == 0:
-                regEx=re.search('\ *total\ *transferred\ *file\ *size:\ *(\d+)\ *bytes\ *',po.stdout.read(),re.IGNORECASE)
-                if regEx:
-                        self.bytesSynced+=(int(regEx.group(1)))/1024
-                ret = True
+                ret = (True, 0)
             elif po.returncode in (23, 24):
                 # partial transfer (cf. rsync(1)), that's normal
-                ret = False
+                ret = (False, po.returncode)
             else:
                 po.errfail()
             pb.wakeup(ret)
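For readers unfamiliar with the mixin dance in gmaster_builder() above: the master class is assembled at runtime from a change-detection mixin (changelog, xsync or xtime) plus the mode/sendmark/purge mixins, looked up by capitalized name. A minimal standalone sketch of that composition pattern follows; the class names are illustrative stand-ins, not gsyncd's:

# Illustrative sketch only -- CrawlChangelog/CrawlXsync/NormalMode are
# hypothetical stand-ins for the GMaster*Mixin / *Mixin classes.
class CrawlChangelog(object):
    def crawl(self):
        return "changelog crawl"

class CrawlXsync(object):
    def crawl(self):
        return "xsync crawl"

class NormalMode(object):
    def mode(self):
        return "normal"

def build_master(change_detector):
    # pick the crawl mixin by capitalized name, much as gmaster_builder()
    # does with getattr(this, 'GMaster' + changemixin.capitalize() + 'Mixin')
    crawlmixin = globals()['Crawl' + change_detector.capitalize()]
    # MRO puts the crawl mixin first, so its crawl() wins
    class _Master(crawlmixin, NormalMode):
        pass
    return _Master

m = build_master('changelog')()
print("%s / %s" % (m.crawl(), m.mode()))  # -> changelog crawl / normal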

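The constants at the top of GMasterChangelogMixin encode the changelog record layout that process_change() parses: a two-character type prefix ("E ", "D " or "M "), then space-separated fields, where an entry op carries gfid, op, entry and (for RENAME) a second entry. A rough standalone sketch of that parse, with made-up sample records rather than real changelog output:

# Sketch of the record layout assumed by process_change(); sample GFIDs
# and paths below are fabricated.
IDX_START, IDX_END = 0, 2                  # two-char type prefix
POS_GFID, POS_TYPE, POS_ENTRY1, POS_ENTRY2 = 0, 1, 2, 3

def parse_record(line):
    line = line.strip()
    et = line[IDX_START:IDX_END]           # change class: "E ", "D ", "M "
    ec = line[IDX_END:].split(' ')         # remaining space-separated fields
    if et == "E ":                         # entry op
        rec = {'gfid': ec[POS_GFID], 'op': ec[POS_TYPE], 'entry': ec[POS_ENTRY1]}
        if rec['op'] == 'RENAME':          # renames carry a second entry
            rec['entry1'] = ec[POS_ENTRY2]
        return rec
    if et == "D ":                         # data op: gfid only
        return {'op': 'DATA', 'gfid': ec[0]}
    return None                            # metadata ("M ") records are ignored

print(parse_record("E 766318e9aaaa MKDIR parentgfid/mydir"))
print(parse_record("D 766318e9aaaa"))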