diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 93 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 27 |
2 files changed, 107 insertions, 13 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 372717e36f2..9a53189348e 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -691,6 +691,8 @@ class GMasterChangelogMixin(GMasterCommon): TYPE_GFID = "D " TYPE_ENTRY = "E " + MAX_EF_RETRIES = 15 + # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' @@ -785,6 +787,95 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("failures", num_failures) + def fix_possible_entry_failures(self, failures, retry_count): + pfx = gauxpfx() + fix_entry_ops = [] + failures1 = [] + for failure in failures: + if failure[2]['gfid_mismatch']: + slave_gfid = failure[2]['slave_gfid'] + st = lstat(os.path.join(pfx, slave_gfid)) + if isinstance(st, int) and st == ENOENT: + logging.info ("Fixing gfid mismatch [%s]: Deleting %s" + % (retry_count, repr(failure))) + #Add deletion to fix_entry_ops list + pbname = failure[0]['entry'] + if failure[2]['slave_isdir']: + fix_entry_ops.append(edct('RMDIR', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + else: + fix_entry_ops.append(edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + elif not isinstance(st, int): + #The file exists on master but with different name. + #Probabaly renamed and got missed during xsync crawl. + if failure[2]['slave_isdir']: + logging.info ("Fixing gfid mismatch [%s]: %s" + % (retry_count, repr(failure))) + realpath = os.readlink(os.path.join(gconf.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) + dst_entry = os.path.join(pfx, realpath.split('/')[-2], + realpath.split('/')[-1]) + rename_dict = edct('RENAME', gfid=slave_gfid, + entry=failure[0]['entry'], + entry1=dst_entry, stat=st, + link=None) + logging.info ("Fixing gfid mismatch [%s]: Renaming %s" + % (retry_count, repr(rename_dict))) + fix_entry_ops.append(rename_dict) + else: + logging.info ("Fixing gfid mismatch [%s]: Deleting %s" + % (retry_count, repr(failure))) + pbname = failure[0]['entry'] + fix_entry_ops.append(edct('UNLINK', + gfid=failure[2]['slave_gfid'], + entry=pbname)) + logging.error ("GFID MISMATCH: ENTRY CANNOT BE FIXED: " + "gfid: %s" % slave_gfid) + + if fix_entry_ops: + #Process deletions of entries whose gfids are mismatched + failures1 = self.slave.server.entry_ops(fix_entry_ops) + if not failures1: + logging.info ("Sucessfully fixed entry ops with gfid mismatch") + + return failures1 + + def handle_entry_failures(self, failures, entries): + retries = 0 + pending_failures = False + failures1 = [] + failures2 = [] + + if failures: + pending_failures = True + failures1 = failures + + while pending_failures and retries < self.MAX_EF_RETRIES: + retries += 1 + failures2 = self.fix_possible_entry_failures(failures1, + retries) + if not failures2: + pending_failures = False + else: + pending_failures = True + failures1 = failures2 + + if pending_failures: + for failure in failures1: + logging.error("Failed to fix entry ops %s", repr(failure)) + else: + #Retry original entry list 5 times + failures = self.slave.server.entry_ops(entries) + + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + + def process_change(self, change, done, retry): pfx = gauxpfx() clist = [] @@ -997,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("entry", len(entries)) failures = self.slave.server.entry_ops(entries) - self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') + self.handle_entry_failures(failures, entries) self.status.dec_value("entry", len(entries)) # Update Entry stime in Brick Root only in case of Changelog mode diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 37f6e1cabc1..275e9fd29ab 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -642,13 +642,14 @@ class Server(object): st['uid'], st['gid'], gf, st['mode'], bn, lnk) - def entry_purge(op, entry, gfid): + def entry_purge(op, entry, gfid, e): # This is an extremely racy code and needs to be fixed ASAP. # The GFID check here is to be sure that the pargfid/bname # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. if not matching_disk_gfid(gfid, entry): + collect_failure(e, EEXIST) return if op == 'UNLINK': @@ -661,6 +662,8 @@ class Server(object): return er def collect_failure(e, cmd_ret): + slv_entry_info = {} + slv_entry_info['gfid_mismatch'] = False # We do this for failing fops on Slave # Master should be logging this if cmd_ret is None: @@ -669,11 +672,19 @@ class Server(object): if cmd_ret == EEXIST: disk_gfid = cls.gfid_mnt(e['entry']) if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: - failures.append((e, cmd_ret, disk_gfid)) + slv_entry_info['gfid_mismatch'] = True + st = lstat(e['entry']) + if not isinstance(st, int): + if st and stat.S_ISDIR(st.st_mode): + slv_entry_info['slave_isdir'] = True + else: + slv_entry_info['slave_isdir'] = False + slv_entry_info['slave_gfid'] = disk_gfid + failures.append((e, cmd_ret, slv_entry_info)) else: return False else: - failures.append((e, cmd_ret)) + failures.append((e, cmd_ret, slv_entry_info)) return True @@ -756,7 +767,7 @@ class Server(object): if op in ['RMDIR', 'UNLINK']: # Try once, if rmdir failed with ENOTEMPTY # then delete recursively. - er = entry_purge(op, entry, gfid) + er = entry_purge(op, entry, gfid, e) if isinstance(er, int): if er == ENOTEMPTY and op == 'RMDIR': # Retry if ENOTEMPTY, ESTALE @@ -855,14 +866,6 @@ class Server(object): [ESTALE, EINVAL, EBUSY]) failed = collect_failure(e, cmd_ret) - # If directory creation is failed, return immediately before - # further processing. Allowing it to further process will - # cause the entire directory tree to fail syncing to slave. - # Hence master will log and raise exception if it's - # directory failure. - if failed and op == 'MKDIR': - return failures - # If UID/GID is different than zero that means we are trying # create Entry with different UID/GID. Create Entry with # UID:0 and GID:0, and then call chown to set UID/GID |