diff options
-rw-r--r-- | geo-replication/syncdaemon/master.py | 50 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 5 |
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 79c90630dca..d8d26baafe1 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -239,6 +239,7 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): + self.unlinked_gfids = [] logging.debug('files: %s' % (files)) for f in files: pb = self.syncer.add(f) @@ -252,6 +253,7 @@ class TarSSHEngine(object): # stat check for file presence st = lstat(se) if isinstance(st, int): + self.unlinked_gfids.append(se) return True logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -270,6 +272,7 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): + self.unlinked_gfids = [] logging.debug('files: %s' % (files)) for f in files: logging.debug('candidate for syncing %s' % f) @@ -286,6 +289,7 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim + self.unlinked_gfids.append(se) return True logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -423,6 +427,7 @@ class GMasterCommon(object): self.checkpoint_thread = None self.current_files_skipped_count = 0 self.skipped_gfid_list = [] + self.unlinked_gfids = [] def init_keep_alive(cls): """start the keep-alive thread """ @@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon): if k == 'stat': st = ed[k] dst = dct['stat'] = {} - dst['uid'] = st.st_uid - dst['gid'] = st.st_gid - dst['mode'] = st.st_mode + if st: + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode else: dct[k] = ed[k] return dct @@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon): gfid = ec[self.POS_GFID] if ty in ['UNLINK', 'RMDIR']: + # Remove from DATA list, so that rsync will + # not fail + pt = os.path.join(pfx, ec[0]) + if pt in datas: + datas.remove(pt) + purge_update() entries.append(edct(ty, gfid=gfid, entry=en)) elif ty in ['CREATE', 'MKDIR', 'MKNOD']: @@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon): entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]), uid=int(ec[3]), gid=int(ec[4]))) + elif ty == "RENAME": + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + st = {} + + entry_update() + e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, + stat=st)) else: # stat() to get mode and other information go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - if ty == 'RENAME': # special hack for renames... - entries.append(edct('UNLINK', gfid=gfid, entry=en)) - else: - logging.debug( - 'file %s got purged in the interim' % go) + logging.debug('file %s got purged in the interim' % go) continue if ty == 'LINK': @@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon): entry_update() entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) - elif ty == 'RENAME': - entry_update() - e1 = unescape( - os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) - entries.append( - edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: - datas.add(os.path.join(pfx, ec[0])) + # If self.unlinked_gfids is available, then that means it is + # retrying the changelog second time. Do not add the GFID's + # to rsync job if failed previously but unlinked in master + if self.unlinked_gfids and \ + os.path.join(pfx, ec[0]) in self.unlinked_gfids: + logging.debug("ignoring data, since file purged interim") + else: + datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: if ec[1] == 'SETATTR': # only setattr's for now... if len(ec) == 5: @@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): + self.unlinked_gfids = [] if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 2a887daab15..1bee0a3338f 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -651,8 +651,9 @@ class Server(object): en = e['entry1'] st = lstat(entry) if isinstance(st, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + if e['stat']: + (pg, bname) = entry2pb(en) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: |