diff options
-rw-r--r-- | geo-replication/syncdaemon/master.py | 141 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 57 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 35 |
3 files changed, 169 insertions, 64 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index d9f63a440fb..665a51f64dd 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -711,7 +711,8 @@ class GMasterChangelogMixin(GMasterCommon): TYPE_GFID = "D " TYPE_ENTRY = "E " - MAX_EF_RETRIES = 15 + MAX_EF_RETRIES = 10 + MAX_OE_RETRIES = 5 # flat directory hierarchy for gfid based access FLAT_DIR_HIERARCHY = '.' @@ -803,21 +804,28 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("failures", num_failures) - def fix_possible_entry_failures(self, failures, retry_count): + def fix_possible_entry_failures(self, failures, retry_count, entries): pfx = gauxpfx() fix_entry_ops = [] failures1 = [] for failure in failures: - if failure[2]['dst']: + if failure[2]['name_mismatch']: + pbname = failure[2]['slave_entry'] + elif failure[2]['dst']: pbname = failure[0]['entry1'] else: pbname = failure[0]['entry'] - if failure[2]['gfid_mismatch']: + + op = failure[0]['op'] + # name exists but gfid is different + if failure[2]['gfid_mismatch'] or failure[2]['name_mismatch']: slave_gfid = failure[2]['slave_gfid'] st = lstat(os.path.join(pfx, slave_gfid)) + # Takes care of scenarios with no hardlinks if isinstance(st, int) and st == ENOENT: - logging.info(lf('Fixing gfid mismatch in slave. Deleting' - ' the entry', retry_count=retry_count, + logging.info(lf('Entry not present on master. Fixing gfid ' + 'mismatch in slave. Deleting the entry', + retry_count=retry_count, entry=repr(failure))) # Add deletion to fix_entry_ops list if failure[2]['slave_isdir']: @@ -830,79 +838,119 @@ class GMasterChangelogMixin(GMasterCommon): edct('UNLINK', gfid=failure[2]['slave_gfid'], entry=pbname)) + # Takes care of scenarios of hardlinks/renames on master elif not isinstance(st, int): - # The file exists on master but with different name. - # Probabaly renamed and got missed during xsync crawl. - if failure[2]['slave_isdir']: - logging.info(lf('Fixing gfid mismatch in slave', + if matching_disk_gfid(slave_gfid, pbname): + # Safe to ignore the failure as master contains same + # file with same gfid. Remove entry from entries list + logging.info(lf('Fixing gfid mismatch in slave. ' + ' Safe to ignore, take out entry', retry_count=retry_count, entry=repr(failure))) - realpath = os.readlink(os.path.join( - rconf.args.local_path, - ".glusterfs", - slave_gfid[0:2], - slave_gfid[2:4], - slave_gfid)) + entries.remove(failure[0]) + # The file exists on master but with different name. + # Probably renamed and got missed during xsync crawl. + elif failure[2]['slave_isdir']: + realpath = os.readlink(os.path.join(gconf.local_path, + ".glusterfs", + slave_gfid[0:2], + slave_gfid[2:4], + slave_gfid)) dst_entry = os.path.join(pfx, realpath.split('/')[-2], realpath.split('/')[-1]) - rename_dict = edct('RENAME', gfid=slave_gfid, - entry=failure[0]['entry'], - entry1=dst_entry, stat=st, - link=None) - logging.info(lf('Fixing gfid mismatch in slave. ' - 'Renaming', retry_count=retry_count, - entry=repr(rename_dict))) - fix_entry_ops.append(rename_dict) + src_entry = pbname + logging.info(lf('Fixing dir name/gfid mismatch in ' + 'slave', retry_count=retry_count, + entry=repr(failure))) + if src_entry == dst_entry: + # Safe to ignore the failure as master contains + # same directory as in slave with same gfid. + # Remove the failure entry from entries list + logging.info(lf('Fixing dir name/gfid mismatch' + ' in slave. Safe to ignore, ' + 'take out entry', + retry_count=retry_count, + entry=repr(failure))) + entries.remove(failure[0]) + else: + rename_dict = edct('RENAME', gfid=slave_gfid, + entry=src_entry, + entry1=dst_entry, stat=st, + link=None) + logging.info(lf('Fixing dir name/gfid mismatch' + ' in slave. Renaming', + retry_count=retry_count, + entry=repr(rename_dict))) + fix_entry_ops.append(rename_dict) else: - logging.info(lf('Fixing gfid mismatch in slave. ' - ' Deleting the entry', + # A hardlink file exists with different name or + # renamed file exists and we are sure from + # matching_disk_gfid check that the entry doesn't + # exist with same gfid so we can safely delete on slave + logging.info(lf('Fixing file gfid mismatch in slave. ' + 'Hardlink/Rename Case. Deleting entry', retry_count=retry_count, entry=repr(failure))) fix_entry_ops.append( edct('UNLINK', gfid=failure[2]['slave_gfid'], entry=pbname)) - logging.error(lf('Entry cannot be fixed in slave due ' - 'to GFID mismatch, find respective ' - 'path for the GFID and trigger sync', - gfid=slave_gfid)) + elif failure[1] == ENOENT: + # Ignore ENOENT error for fix_entry_ops aka retry_count > 1 + if retry_count > 1: + logging.info(lf('ENOENT error while fixing entry ops. ' + 'Safe to ignore, take out entry', + retry_count=retry_count, + entry=repr(failure))) + entries.remove(failure[0]) + elif op in ('MKNOD', 'CREATE', 'MKDIR'): + pargfid = pbname.split('/')[1] + st = lstat(os.path.join(pfx, pargfid)) + # Safe to ignore the failure as master doesn't contain + # parent directory. + if isinstance(st, int): + logging.info(lf('Fixing ENOENT error in slave. Parent ' + 'does not exist on master. Safe to ' + 'ignore, take out entry', + retry_count=retry_count, + entry=repr(failure))) + entries.remove(failure[0]) if fix_entry_ops: # Process deletions of entries whose gfids are mismatched failures1 = self.slave.server.entry_ops(fix_entry_ops) - if not failures1: - logging.info("Sucessfully fixed entry ops with gfid mismatch") - return failures1 + return (failures1, fix_entry_ops) def handle_entry_failures(self, failures, entries): retries = 0 pending_failures = False failures1 = [] failures2 = [] + entry_ops1 = [] + entry_ops2 = [] if failures: pending_failures = True failures1 = failures + entry_ops1 = entries while pending_failures and retries < self.MAX_EF_RETRIES: retries += 1 - failures2 = self.fix_possible_entry_failures(failures1, - retries) + (failures2, entry_ops2) = self.fix_possible_entry_failures( + failures1, retries, entry_ops1) if not failures2: pending_failures = False + logging.info(lf('Sucessfully fixed entry ops with gfid ' + 'mismatch', retry_count=retries)) else: pending_failures = True failures1 = failures2 + entry_ops1 = entry_ops2 if pending_failures: for failure in failures1: logging.error("Failed to fix entry ops %s", repr(failure)) - else: - # Retry original entry list 5 times - failures = self.slave.server.entry_ops(entries) - - self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') def process_change(self, change, done, retry): pfx = gauxpfx() @@ -1129,7 +1177,18 @@ class GMasterChangelogMixin(GMasterCommon): self.status.inc_value("entry", len(entries)) failures = self.slave.server.entry_ops(entries) - self.handle_entry_failures(failures, entries) + count = 0 + while failures and count < self.MAX_OE_RETRIES: + count += 1 + self.handle_entry_failures(failures, entries) + logging.info("Retry original entries. count = %s" % count) + failures = self.slave.server.entry_ops(entries) + if not failures: + logging.info("Sucessfully fixed all entry ops with gfid " + "mismatch") + break + + self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY') self.status.dec_value("entry", len(entries)) # Update Entry stime in Brick Root only in case of Changelog mode diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 7eeced96caf..43e82f30d28 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -38,16 +38,15 @@ from syncdutils import get_changelog_log_level, get_rsync_version from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus -from syncdutils import lf, Popen, sup, Volinfo +from syncdutils import lf, Popen, sup from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -from syncdutils import unshare_propagation_supported +from syncdutils import unshare_propagation_supported, get_slv_dir_path ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') slv_volume = None slv_host = None -slv_bricks = None class Server(object): @@ -408,13 +407,23 @@ class Server(object): # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. + + # If the entry or the gfid of the file to be deleted is not present + # on slave, we can ignore the unlink/rmdir + if isinstance(lstat(entry), int) or \ + isinstance(lstat(os.path.join(pfx, gfid)), int): + return + if not matching_disk_gfid(gfid, entry): collect_failure(e, EEXIST) return if op == 'UNLINK': er = errno_wrap(os.unlink, [entry], [ENOENT, ESTALE], [EBUSY]) - return er + # EISDIR is safe error, ignore. This can only happen when + # unlink is sent from master while fixing gfid conflicts. + if er != EISDIR: + return er elif op == 'RMDIR': er = errno_wrap(os.rmdir, [entry], [ENOENT, ESTALE, @@ -425,7 +434,11 @@ class Server(object): def collect_failure(e, cmd_ret, dst=False): slv_entry_info = {} slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = False slv_entry_info['dst'] = dst + slv_entry_info['slave_isdir'] = False + slv_entry_info['slave_name'] = None + slv_entry_info['slave_gfid'] = None # We do this for failing fops on Slave # Master should be logging this if cmd_ret is None: @@ -444,6 +457,9 @@ class Server(object): if not isinstance(st, int): if st and stat.S_ISDIR(st.st_mode): slv_entry_info['slave_isdir'] = True + dir_name = get_slv_dir_path(slv_host, slv_volume, + disk_gfid) + slv_entry_info['slave_name'] = dir_name else: slv_entry_info['slave_isdir'] = False slv_entry_info['slave_gfid'] = disk_gfid @@ -563,39 +579,34 @@ class Server(object): [ENOENT, EEXIST], [ESTALE]) collect_failure(e, cmd_ret) elif op == 'MKDIR': + en = e['entry'] slink = os.path.join(pfx, gfid) st = lstat(slink) # don't create multiple entries with same gfid if isinstance(st, int): blob = entry_pack_mkdir( gfid, bname, e['mode'], e['uid'], e['gid']) - else: + elif (isinstance(lstat(en), int) or + not matching_disk_gfid(gfid, en)): # If gfid of a directory exists on slave but path based # create is getting EEXIST. This means the directory is # renamed in master but recorded as MKDIR during hybrid # crawl. Get the directory path by reading the backend # symlink and trying to rename to new name as said by # master. - global slv_bricks - global slv_volume - global slv_host - if not slv_bricks: - slv_info = Volinfo(slv_volume, slv_host) - slv_bricks = slv_info.bricks - # Result of readlink would be of format as below. - # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" - realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], - ".glusterfs", - gfid[0:2], - gfid[2:4], - gfid)) - realpath_parts = realpath.split('/') - src_pargfid = realpath_parts[-2] - src_basename = realpath_parts[-1] - src_entry = os.path.join(pfx, src_pargfid, src_basename) logging.info(lf("Special case: rename on mkdir", gfid=gfid, entry=repr(entry))) - rename_with_disk_gfid_confirmation(gfid, src_entry, entry) + src_entry = get_slv_dir_path(slv_host, slv_volume, gfid) + if src_entry is not None and src_entry != entry: + slv_entry_info = {} + slv_entry_info['gfid_mismatch'] = False + slv_entry_info['name_mismatch'] = True + slv_entry_info['dst'] = False + slv_entry_info['slave_isdir'] = True + slv_entry_info['slave_gfid'] = gfid + slv_entry_info['slave_entry'] = src_entry + + failures.append((e, EEXIST, slv_entry_info)) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 6acc9f17ad7..f7173017e5d 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -66,6 +66,7 @@ CHANGELOG_AGENT_CLIENT_VERSION = 1.0 NodeID = None rsync_version = None unshare_mnt_propagation = None +slv_bricks = None SPACE_ESCAPE_CHAR = "%20" NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" @@ -660,6 +661,40 @@ def get_rsync_version(rsync_cmd): return rsync_version +def get_slv_dir_path(slv_host, slv_volume, gfid): + global slv_bricks + + dir_path = ENOENT + + if not slv_bricks: + slv_info = Volinfo(slv_volume, slv_host) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + for brick in slv_bricks: + dir_path = errno_wrap(os.path.join, + [brick['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], + gfid], [ENOENT], [ESTALE]) + if dir_path != ENOENT: + break + + if not isinstance(dir_path, int): + realpath = errno_wrap(os.readlink, [dir_path], + [ENOENT], [ESTALE]) + + if not isinstance(realpath, int): + realpath_parts = realpath.split('/') + pargfid = realpath_parts[-2] + basename = realpath_parts[-1] + pfx = gauxpfx() + dir_entry = os.path.join(pfx, pargfid, basename) + return dir_entry + + return None + + def lf(event, **kwargs): """ Log Format helper function, log messages can be |