-rw-r--r--  geo-replication/syncdaemon/master.py     |  3
-rw-r--r--  geo-replication/syncdaemon/resource.py   | 85
-rw-r--r--  geo-replication/syncdaemon/syncdutils.py |  6
3 files changed, 68 insertions, 26 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 8e4c43046b0..ef79f02a52c 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -244,7 +244,6 @@ class TarSSHEngine(object):
     """
 
     def a_syncdata(self, files):
-        self.unlinked_gfids = []
         logging.debug('files: %s' % (files))
         self.current_files_skipped_count = 0
         del self.skipped_gfid_list[:]
@@ -283,7 +282,6 @@ class RsyncEngine(object):
     """Sync engine that uses rsync(1) for data transfers"""
 
     def a_syncdata(self, files):
-        self.unlinked_gfids = []
         logging.debug('files: %s' % (files))
         self.current_files_skipped_count = 0
         del self.skipped_gfid_list[:]
@@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon):
     def process(self, changes, done=1):
         tries = 0
         retry = False
+        self.unlinked_gfids = []
 
         while True:
             self.skipped_gfid_list = []
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 6bf1ad03e70..2a04d632091 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -597,11 +597,9 @@ class Server(object):
             # to be purged is the GFID gotten from the changelog.
             # (a stat(changelog_gfid) would also be valid here)
             # The race here is between the GFID check and the purge.
-            disk_gfid = cls.gfid_mnt(entry)
-            if isinstance(disk_gfid, int):
-                return
-            if not gfid == disk_gfid:
+            if not matching_disk_gfid(gfid, entry):
                 return
+
             er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR])
             if isinstance(er, int):
                 if er == EISDIR:
@@ -624,6 +622,48 @@ class Server(object):
                 failures.append((e, cmd_ret))
 
         failures = []
+
+        def matching_disk_gfid(gfid, entry):
+            disk_gfid = cls.gfid_mnt(entry)
+            if isinstance(disk_gfid, int):
+                return False
+
+            if not gfid == disk_gfid:
+                return False
+
+            return True
+
+        def recursive_rmdir(gfid, entry, path):
+            """disk_gfid check added for original path for which
+            recursive_delete is called. This disk gfid check executed
+            before every Unlink/Rmdir. If disk gfid is not matching
+            with GFID from Changelog, that means other worker
+            deleted the directory. Even if the subdir/file present,
+            it belongs to different parent. Exit without performing
+            further deletes.
+            """
+            if not matching_disk_gfid(gfid, entry):
+                return
+
+            names = []
+            names = errno_wrap(os.listdir, [path], [ENOENT])
+            if isinstance(names, int):
+                return
+
+            for name in names:
+                fullname = os.path.join(path, name)
+                if not matching_disk_gfid(gfid, entry):
+                    return
+                er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR])
+
+                if er == EISDIR:
+                    recursive_rmdir(gfid, entry, fullname)
+
+            if not matching_disk_gfid(gfid, entry):
+                return
+
+            errno_wrap(os.rmdir, [path], [ENOENT])
+
         for e in entries:
             blob = None
             op = e['op']
@@ -631,23 +671,26 @@ class Server(object):
             entry = e['entry']
             (pg, bname) = entry2pb(entry)
             if op in ['RMDIR', 'UNLINK']:
-                while True:
-                    er = entry_purge(entry, gfid)
-                    if isinstance(er, int):
-                        if er == ENOTEMPTY and op == 'RMDIR':
-                            er1 = errno_wrap(shutil.rmtree,
-                                             [os.path.join(pg, bname)],
-                                             [ENOENT])
-                            if not isinstance(er1, int):
-                                logging.info("Removed %s/%s recursively" %
-                                             (pg, bname))
-                                break
-
+                # Try once, if rmdir failed with ENOTEMPTY
+                # then delete recursively.
+                er = entry_purge(entry, gfid)
+                if isinstance(er, int):
+                    if er == ENOTEMPTY and op == 'RMDIR':
+                        # Retry if ENOTEMPTY, ESTALE
+                        er1 = errno_wrap(recursive_rmdir,
+                                         [gfid, entry,
+                                          os.path.join(pg, bname)],
+                                         [], [ENOTEMPTY, ESTALE, ENODATA])
+                        if not isinstance(er1, int):
+                            logging.debug("Removed %s => %s/%s recursively" %
+                                          (gfid, pg, bname))
+                        else:
+                            logging.warn("Recursive remove %s => %s/%s"
+                                         "failed: %s" % (gfid, pg, bname,
+                                                         os.strerror(er1)))
+                    else:
                         logging.warn("Failed to remove %s => %s/%s. %s" %
                                      (gfid, pg, bname, os.strerror(er)))
-                    time.sleep(1)
-                else:
-                    break
             elif op in ['CREATE', 'MKNOD']:
                 blob = entry_pack_reg(
                     gfid, bname, e['mode'], e['uid'], e['gid'])
@@ -682,8 +725,8 @@ class Server(object):
         if blob:
             cmd_ret = errno_wrap(Xattr.lsetxattr,
                                  [pg, 'glusterfs.gfid.newfile', blob],
-                                 [EEXIST],
-                                 [ENOENT, ESTALE, EINVAL])
+                                 [EEXIST, ENOENT],
+                                 [ESTALE, EINVAL])
             collect_failure(e, cmd_ret)
 
         return failures
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2614c828104..b565ec66cb5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM):
     os.kill(os.getpid(), sig)
 
 
-def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
+def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]):
     """ wrapper around calls resilient to errnos.
-    retry in case of ESTALE by default.
     """
     nr_tries = 0
     while True:
@@ -483,7 +482,8 @@ def selfkill(sig=SIGTERM):
             nr_tries += 1
             if nr_tries == GF_OP_RETRIES:
                 # probably a screwed state, cannot do much...
-                logging.warn('reached maximum retries (%s)...' % repr(arg))
+                logging.warn('reached maximum retries (%s)...%s' %
+                             (repr(arg), ex))
                 return ex.errno
             time.sleep(0.250)  # retry the call
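
The recursive_rmdir added here re-validates the on-disk GFID of the directory named in the changelog before every single unlink/rmdir, so a concurrent delete (and possible re-creation) by another worker stops the walk instead of removing files that now belong to a different parent. A standalone sketch of the same pattern, with a (st_dev, st_ino) pair standing in for the GFID and a hypothetical dir_id() in place of cls.gfid_mnt():

    import errno
    import os


    def dir_id(path):
        # Hypothetical stand-in for cls.gfid_mnt(): identity of `path`
        # as a (device, inode) pair, or None if the path is gone.
        try:
            st = os.stat(path)
        except OSError:
            return None
        return (st.st_dev, st.st_ino)


    def safe_recursive_rmdir(expected_id, top, path):
        # Abort as soon as `top` is no longer the directory we resolved
        # from the changelog: another worker already deleted it.
        if dir_id(top) != expected_id:
            return
        try:
            names = os.listdir(path)
        except OSError as ex:
            if ex.errno == errno.ENOENT:
                return
            raise
        for name in names:
            fullname = os.path.join(path, name)
            if dir_id(top) != expected_id:
                return
            try:
                os.remove(fullname)
            except OSError as ex:
                if ex.errno == errno.EISDIR:   # a subdirectory (Linux): recurse
                    safe_recursive_rmdir(expected_id, top, fullname)
                elif ex.errno != errno.ENOENT:
                    raise
        if dir_id(top) != expected_id:
            return
        try:
            os.rmdir(path)
        except OSError as ex:
            if ex.errno != errno.ENOENT:
                raise

As in the patch, the identity check is always made against the top-level directory (gfid and entry are passed unchanged through the recursion); only the path being walked varies.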
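
errno_wrap keeps the convention this patch relies on throughout: on success it returns whatever the wrapped call returns, and when the call fails with one of the tolerated errnos it returns that errno as an int, which is why callers test isinstance(ret, int). A minimal usage sketch of that contract (the import path and target directory are illustrative; errno constants come from the standard errno module):

    import errno
    import os

    from syncdutils import errno_wrap  # wrapper patched above

    # Swallow ENOENT outright; retry ESTALE a bounded number of times.
    # Retries are now opt-in, since the default retry_errnos is [].
    ret = errno_wrap(os.listdir, ['/some/dir'], [errno.ENOENT], [errno.ESTALE])
    if isinstance(ret, int):
        # Failed with a tolerated errno (or exhausted the retries).
        print('listdir failed: %s' % os.strerror(ret))
    else:
        # Success: ret is the real os.listdir() result.
        print('%d entries' % len(ret))

The contract assumes the wrapped call never itself returns an int; any errno outside both lists is re-raised to the caller.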