summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--geo-replication/syncdaemon/master.py93
-rw-r--r--geo-replication/syncdaemon/resource.py27
2 files changed, 107 insertions, 13 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 372717e36f2..9a53189348e 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -691,6 +691,8 @@ class GMasterChangelogMixin(GMasterCommon):
TYPE_GFID = "D "
TYPE_ENTRY = "E "
+ MAX_EF_RETRIES = 15
+
# flat directory hierarchy for gfid based access
FLAT_DIR_HIERARCHY = '.'
@@ -785,6 +787,95 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("failures", num_failures)
+ def fix_possible_entry_failures(self, failures, retry_count):
+ pfx = gauxpfx()
+ fix_entry_ops = []
+ failures1 = []
+ for failure in failures:
+ if failure[2]['gfid_mismatch']:
+ slave_gfid = failure[2]['slave_gfid']
+ st = lstat(os.path.join(pfx, slave_gfid))
+ if isinstance(st, int) and st == ENOENT:
+ logging.info ("Fixing gfid mismatch [%s]: Deleting %s"
+ % (retry_count, repr(failure)))
+ #Add deletion to fix_entry_ops list
+ pbname = failure[0]['entry']
+ if failure[2]['slave_isdir']:
+ fix_entry_ops.append(edct('RMDIR',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ else:
+ fix_entry_ops.append(edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ elif not isinstance(st, int):
+ #The file exists on master but with different name.
+ #Probabaly renamed and got missed during xsync crawl.
+ if failure[2]['slave_isdir']:
+ logging.info ("Fixing gfid mismatch [%s]: %s"
+ % (retry_count, repr(failure)))
+ realpath = os.readlink(os.path.join(gconf.local_path,
+ ".glusterfs",
+ slave_gfid[0:2],
+ slave_gfid[2:4],
+ slave_gfid))
+ dst_entry = os.path.join(pfx, realpath.split('/')[-2],
+ realpath.split('/')[-1])
+ rename_dict = edct('RENAME', gfid=slave_gfid,
+ entry=failure[0]['entry'],
+ entry1=dst_entry, stat=st,
+ link=None)
+ logging.info ("Fixing gfid mismatch [%s]: Renaming %s"
+ % (retry_count, repr(rename_dict)))
+ fix_entry_ops.append(rename_dict)
+ else:
+ logging.info ("Fixing gfid mismatch [%s]: Deleting %s"
+ % (retry_count, repr(failure)))
+ pbname = failure[0]['entry']
+ fix_entry_ops.append(edct('UNLINK',
+ gfid=failure[2]['slave_gfid'],
+ entry=pbname))
+ logging.error ("GFID MISMATCH: ENTRY CANNOT BE FIXED: "
+ "gfid: %s" % slave_gfid)
+
+ if fix_entry_ops:
+ #Process deletions of entries whose gfids are mismatched
+ failures1 = self.slave.server.entry_ops(fix_entry_ops)
+ if not failures1:
+ logging.info ("Sucessfully fixed entry ops with gfid mismatch")
+
+ return failures1
+
+ def handle_entry_failures(self, failures, entries):
+ retries = 0
+ pending_failures = False
+ failures1 = []
+ failures2 = []
+
+ if failures:
+ pending_failures = True
+ failures1 = failures
+
+ while pending_failures and retries < self.MAX_EF_RETRIES:
+ retries += 1
+ failures2 = self.fix_possible_entry_failures(failures1,
+ retries)
+ if not failures2:
+ pending_failures = False
+ else:
+ pending_failures = True
+ failures1 = failures2
+
+ if pending_failures:
+ for failure in failures1:
+ logging.error("Failed to fix entry ops %s", repr(failure))
+ else:
+ #Retry original entry list 5 times
+ failures = self.slave.server.entry_ops(entries)
+
+ self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+
+
def process_change(self, change, done, retry):
pfx = gauxpfx()
clist = []
@@ -997,7 +1088,7 @@ class GMasterChangelogMixin(GMasterCommon):
self.status.inc_value("entry", len(entries))
failures = self.slave.server.entry_ops(entries)
- self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
+ self.handle_entry_failures(failures, entries)
self.status.dec_value("entry", len(entries))
# Update Entry stime in Brick Root only in case of Changelog mode
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 37f6e1cabc1..275e9fd29ab 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -642,13 +642,14 @@ class Server(object):
st['uid'], st['gid'],
gf, st['mode'], bn, lnk)
- def entry_purge(op, entry, gfid):
+ def entry_purge(op, entry, gfid, e):
# This is an extremely racy code and needs to be fixed ASAP.
# The GFID check here is to be sure that the pargfid/bname
# to be purged is the GFID gotten from the changelog.
# (a stat(changelog_gfid) would also be valid here)
# The race here is between the GFID check and the purge.
if not matching_disk_gfid(gfid, entry):
+ collect_failure(e, EEXIST)
return
if op == 'UNLINK':
@@ -661,6 +662,8 @@ class Server(object):
return er
def collect_failure(e, cmd_ret):
+ slv_entry_info = {}
+ slv_entry_info['gfid_mismatch'] = False
# We do this for failing fops on Slave
# Master should be logging this
if cmd_ret is None:
@@ -669,11 +672,19 @@ class Server(object):
if cmd_ret == EEXIST:
disk_gfid = cls.gfid_mnt(e['entry'])
if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid:
- failures.append((e, cmd_ret, disk_gfid))
+ slv_entry_info['gfid_mismatch'] = True
+ st = lstat(e['entry'])
+ if not isinstance(st, int):
+ if st and stat.S_ISDIR(st.st_mode):
+ slv_entry_info['slave_isdir'] = True
+ else:
+ slv_entry_info['slave_isdir'] = False
+ slv_entry_info['slave_gfid'] = disk_gfid
+ failures.append((e, cmd_ret, slv_entry_info))
else:
return False
else:
- failures.append((e, cmd_ret))
+ failures.append((e, cmd_ret, slv_entry_info))
return True
@@ -756,7 +767,7 @@ class Server(object):
if op in ['RMDIR', 'UNLINK']:
# Try once, if rmdir failed with ENOTEMPTY
# then delete recursively.
- er = entry_purge(op, entry, gfid)
+ er = entry_purge(op, entry, gfid, e)
if isinstance(er, int):
if er == ENOTEMPTY and op == 'RMDIR':
# Retry if ENOTEMPTY, ESTALE
@@ -855,14 +866,6 @@ class Server(object):
[ESTALE, EINVAL, EBUSY])
failed = collect_failure(e, cmd_ret)
- # If directory creation is failed, return immediately before
- # further processing. Allowing it to further process will
- # cause the entire directory tree to fail syncing to slave.
- # Hence master will log and raise exception if it's
- # directory failure.
- if failed and op == 'MKDIR':
- return failures
-
# If UID/GID is different than zero that means we are trying
# create Entry with different UID/GID. Create Entry with
# UID:0 and GID:0, and then call chown to set UID/GID