diff options
author | Aravinda VK <avishwan@redhat.com> | 2015-04-12 17:46:45 +0530 |
---|---|---|
committer | Vijay Bellur <vbellur@redhat.com> | 2015-05-05 06:45:01 -0700 |
commit | 08107796c89f5f201b24d689ab6757237c743c0d (patch) | |
tree | 4fe4af7979e6734e9b2246fd4b2582570e8592f0 /geo-replication | |
parent | cfa6c85334fd62175aa114d779873b6790d6db8a (diff) |
geo-rep: Minimize rm -rf race in Geo-rep
While doing RMDIR worker gets ENOTEMPTY because same directory will
have files from other bricks which are not deleted since that worker
is slow processing. So geo-rep does recursive_delete.
Recursive delete was done using shutil.rmtree. once started, it will
not check disk_gfid in between. So it ends up deleting the new files
created by other workers. Also if other worker creates files after one
worker gets list of files to be deleted, then first worker will again
get ENOTEMPTY again.
To fix these races, retry is added when it gets ENOTEMPTY/ESTALE/ENODATA.
And disk_gfid check added for original path for which recursive_delete is
called. This disk gfid check executed before every Unlink/Rmdir. If disk
gfid is not matching with GFID from Changelog, that means other worker
deleted the directory. Even if the subdir/file present, it belongs to
different parent. Exit without performing further deletes.
Retry on ENOENT during create is ignored, since if CREATE/MKNOD/MKDIR
failed with ENOENT will not succeed unless parent directory is created
again.
Rsync errors handling was handling unlinked_gfids_list only for one
Changelog, but when processed in batch it fails to detect unlinked_gfids
and retries again. Finally skips the entire Changelogs in that batch.
Fixed this issue by moving self.unlinked_gfids reset logic before batch
start and after batch end.
Most of the Geo-rep races with rm -rf is eliminated with this patch,
but in some cases stale directories left in some bricks and in mount
point we get ENOTEMPTY.(DHT issue, Error will be logged in Slave log)
BUG: 1211037
Change-Id: I8716b88e4c741545f526095bf789f7c1e28008cb
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/10204
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Tested-by: NetBSD Build System
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/master.py | 3 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 85 | ||||
-rw-r--r-- | geo-replication/syncdaemon/syncdutils.py | 6 |
3 files changed, 68 insertions, 26 deletions
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py index 8e4c43046b0..ef79f02a52c 100644 --- a/geo-replication/syncdaemon/master.py +++ b/geo-replication/syncdaemon/master.py @@ -244,7 +244,6 @@ class TarSSHEngine(object): """ def a_syncdata(self, files): - self.unlinked_gfids = [] logging.debug('files: %s' % (files)) self.current_files_skipped_count = 0 del self.skipped_gfid_list[:] @@ -283,7 +282,6 @@ class RsyncEngine(object): """Sync engine that uses rsync(1) for data transfers""" def a_syncdata(self, files): - self.unlinked_gfids = [] logging.debug('files: %s' % (files)) self.current_files_skipped_count = 0 del self.skipped_gfid_list[:] @@ -930,6 +928,7 @@ class GMasterChangelogMixin(GMasterCommon): def process(self, changes, done=1): tries = 0 retry = False + self.unlinked_gfids = [] while True: self.skipped_gfid_list = [] diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 6bf1ad03e70..2a04d632091 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -597,11 +597,9 @@ class Server(object): # to be purged is the GFID gotten from the changelog. # (a stat(changelog_gfid) would also be valid here) # The race here is between the GFID check and the purge. - disk_gfid = cls.gfid_mnt(entry) - if isinstance(disk_gfid, int): - return - if not gfid == disk_gfid: + if not matching_disk_gfid(gfid, entry): return + er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) if isinstance(er, int): if er == EISDIR: @@ -624,6 +622,48 @@ class Server(object): failures.append((e, cmd_ret)) failures = [] + + def matching_disk_gfid(gfid, entry): + disk_gfid = cls.gfid_mnt(entry) + if isinstance(disk_gfid, int): + return False + + if not gfid == disk_gfid: + return False + + return True + + def recursive_rmdir(gfid, entry, path): + """disk_gfid check added for original path for which + recursive_delete is called. This disk gfid check executed + before every Unlink/Rmdir. If disk gfid is not matching + with GFID from Changelog, that means other worker + deleted the directory. Even if the subdir/file present, + it belongs to different parent. Exit without performing + further deletes. + """ + if not matching_disk_gfid(gfid, entry): + return + + names = [] + names = errno_wrap(os.listdir, [path], [ENOENT]) + if isinstance(names, int): + return + + for name in names: + fullname = os.path.join(path, name) + if not matching_disk_gfid(gfid, entry): + return + er = errno_wrap(os.remove, [fullname], [ENOENT, EISDIR]) + + if er == EISDIR: + recursive_rmdir(gfid, entry, fullname) + + if not matching_disk_gfid(gfid, entry): + return + + errno_wrap(os.rmdir, [path], [ENOENT]) + for e in entries: blob = None op = e['op'] @@ -631,23 +671,26 @@ class Server(object): entry = e['entry'] (pg, bname) = entry2pb(entry) if op in ['RMDIR', 'UNLINK']: - while True: - er = entry_purge(entry, gfid) - if isinstance(er, int): - if er == ENOTEMPTY and op == 'RMDIR': - er1 = errno_wrap(shutil.rmtree, - [os.path.join(pg, bname)], - [ENOENT]) - if not isinstance(er1, int): - logging.info("Removed %s/%s recursively" % - (pg, bname)) - break - + # Try once, if rmdir failed with ENOTEMPTY + # then delete recursively. + er = entry_purge(entry, gfid) + if isinstance(er, int): + if er == ENOTEMPTY and op == 'RMDIR': + # Retry if ENOTEMPTY, ESTALE + er1 = errno_wrap(recursive_rmdir, + [gfid, entry, + os.path.join(pg, bname)], + [], [ENOTEMPTY, ESTALE, ENODATA]) + if not isinstance(er1, int): + logging.debug("Removed %s => %s/%s recursively" % + (gfid, pg, bname)) + else: + logging.warn("Recursive remove %s => %s/%s" + "failed: %s" % (gfid, pg, bname, + os.strerror(er1))) + else: logging.warn("Failed to remove %s => %s/%s. %s" % (gfid, pg, bname, os.strerror(er))) - time.sleep(1) - else: - break elif op in ['CREATE', 'MKNOD']: blob = entry_pack_reg( gfid, bname, e['mode'], e['uid'], e['gid']) @@ -682,8 +725,8 @@ class Server(object): if blob: cmd_ret = errno_wrap(Xattr.lsetxattr, [pg, 'glusterfs.gfid.newfile', blob], - [EEXIST], - [ENOENT, ESTALE, EINVAL]) + [EEXIST, ENOENT], + [ESTALE, EINVAL]) collect_failure(e, cmd_ret) return failures diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 2614c828104..b565ec66cb5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -466,9 +466,8 @@ def selfkill(sig=SIGTERM): os.kill(os.getpid(), sig) -def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): +def errno_wrap(call, arg=[], errnos=[], retry_errnos=[]): """ wrapper around calls resilient to errnos. - retry in case of ESTALE by default. """ nr_tries = 0 while True: @@ -483,7 +482,8 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]): nr_tries += 1 if nr_tries == GF_OP_RETRIES: # probably a screwed state, cannot do much... - logging.warn('reached maximum retries (%s)...' % repr(arg)) + logging.warn('reached maximum retries (%s)...%s' % + (repr(arg), ex)) return ex.errno time.sleep(0.250) # retry the call |