diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-09-17 12:59:52 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2014-09-26 00:18:48 -0700 |
commit | 3c29c50cf60135245290133fbfed11aa3cf01e39 (patch) | |
tree | 94a988edf292f0441fc21ee27520a048b2502132 | |
parent | 60a75cdca76b0a4b83eb6f5bc70a320d586d79aa (diff) |
geo-rep: fix same file different gfid in master and slave
While processing RENAME in changelog, if the file is unlinked
in master, then geo-rep was sending UNLINK to slave instead of
RENAME.
If rsync job fails if one of the file failed to sync in the job.
This patch adds logic to remove GFID from data list if the same
changelog has UNLINK entry for it after the DATA. Or it removes
those GFIDs during retry of changelogs processing.
BUG: 1143853
Change-Id: I982dc976397cd0ab676bb912583f66a28f821926
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/8761
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
Tested-by: Venky Shankar <vshankar@redhat.com>
-rw-r--r-- | geo-replication/syncdaemon/master.py | 50 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 5 |
2 files changed, 38 insertions, 17 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 79c90630dca..d8d26baafe1 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -239,6 +239,7 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): + self.unlinked_gfids = [] logging.debug('files: %s' % (files)) for f in files: pb = self.syncer.add(f) @@ -252,6 +253,7 @@ class TarSSHEngine(object): # stat check for file presence st = lstat(se) if isinstance(st, int): + self.unlinked_gfids.append(se) return True logging.warn('tar+ssh: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -270,6 +272,7 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): + self.unlinked_gfids = [] logging.debug('files: %s' % (files)) for f in files: logging.debug('candidate for syncing %s' % f) @@ -286,6 +289,7 @@ class RsyncEngine(object): st = lstat(se) if isinstance(st, int): # file got unlinked in the interim + self.unlinked_gfids.append(se) return True logging.warn('Rsync: %s [errcode: %d]' % (se, rv[1])) self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb) @@ -423,6 +427,7 @@ class GMasterCommon(object): self.checkpoint_thread = None self.current_files_skipped_count = 0 self.skipped_gfid_list = [] + self.unlinked_gfids = [] def init_keep_alive(cls): """start the keep-alive thread """ @@ -797,9 +802,10 @@ class GMasterChangelogMixin(GMasterCommon): if k == 'stat': st = ed[k] dst = dct['stat'] = {} - dst['uid'] = st.st_uid - dst['gid'] = st.st_gid - dst['mode'] = st.st_mode + if st: + dst['uid'] = st.st_uid + dst['gid'] = st.st_gid + dst['mode'] = st.st_mode else: dct[k] = ed[k] return dct @@ -830,6 +836,12 @@ class GMasterChangelogMixin(GMasterCommon): gfid = ec[self.POS_GFID] if ty in ['UNLINK', 'RMDIR']: + # Remove from DATA list, so that rsync will + # not fail + pt = os.path.join(pfx, ec[0]) + if pt in datas: + datas.remove(pt) + purge_update() entries.append(edct(ty, gfid=gfid, entry=en)) elif ty in ['CREATE', 'MKDIR', 'MKNOD']: @@ -838,16 +850,22 @@ class GMasterChangelogMixin(GMasterCommon): entries.append(edct(ty, gfid=gfid, entry=en, mode=int(ec[2]), uid=int(ec[3]), gid=int(ec[4]))) + elif ty == "RENAME": + go = os.path.join(pfx, gfid) + st = lstat(go) + if isinstance(st, int): + st = {} + + entry_update() + e1 = unescape(os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) + entries.append(edct(ty, gfid=gfid, entry=e1, entry1=en, + stat=st)) else: # stat() to get mode and other information go = os.path.join(pfx, gfid) st = lstat(go) if isinstance(st, int): - if ty == 'RENAME': # special hack for renames... - entries.append(edct('UNLINK', gfid=gfid, entry=en)) - else: - logging.debug( - 'file %s got purged in the interim' % go) + logging.debug('file %s got purged in the interim' % go) continue if ty == 'LINK': @@ -860,16 +878,17 @@ class GMasterChangelogMixin(GMasterCommon): entry_update() entries.append( edct(ty, stat=st, entry=en, gfid=gfid, link=rl)) - elif ty == 'RENAME': - entry_update() - e1 = unescape( - os.path.join(pfx, ec[self.POS_ENTRY1 - 1])) - entries.append( - edct(ty, gfid=gfid, entry=e1, entry1=en, stat=st)) else: logging.warn('ignoring %s [op %s]' % (gfid, ty)) elif et == self.TYPE_GFID: - datas.add(os.path.join(pfx, ec[0])) + # If self.unlinked_gfids is available, then that means it is + # retrying the changelog second time. Do not add the GFID's + # to rsync job if failed previously but unlinked in master + if self.unlinked_gfids and \ + os.path.join(pfx, ec[0]) in self.unlinked_gfids: + logging.debug("ignoring data, since file purged interim") + else: + datas.add(os.path.join(pfx, ec[0])) elif et == self.TYPE_META: if ec[1] == 'SETATTR': # only setattr's for now... if len(ec) == 5: @@ -950,6 +969,7 @@ class GMasterChangelogMixin(GMasterCommon): # @change is the last changelog (therefore max time for this batch) if self.syncdata_wait(): + self.unlinked_gfids = [] if done: xtl = (int(change.split('.')[-1]) - 1, 0) self.upd_stime(xtl) diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 2a887daab15..1bee0a3338f 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -651,8 +651,9 @@ class Server(object): en = e['entry1'] st = lstat(entry) if isinstance(st, int): - (pg, bname) = entry2pb(en) - blob = entry_pack_reg_stat(gfid, bname, e['stat']) + if e['stat']: + (pg, bname) = entry2pb(en) + blob = entry_pack_reg_stat(gfid, bname, e['stat']) else: errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) if blob: |