diff options
author | Kotresh HR <khiremat@redhat.com> | 2017-09-21 18:11:15 -0400 |
---|---|---|
committer | Aravinda VK <avishwan@redhat.com> | 2017-11-10 05:36:22 +0000 |
commit | 0f524f0710229a7f8de3a4e1e6a2790d40f67a8e (patch) | |
tree | d938aa2ba8c0b8dc7638c2740443d2d82557a099 /geo-replication/syncdaemon/resource.py | |
parent | 0fc1c562d8b8d09ec2b59bc525ec5635a21a4561 (diff) |
geo-rep: Fix rename of directory in hybrid crawl
In hybrid crawl, renames and unlink can't be
synced but directory renames can be detected.
While syncing the directory on slave, if the
gfid already exists, it should be rename.
Hence if directory gfid already exists, rename
it.
Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1499566
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 189 |
1 files changed, 34 insertions, 155 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 0ca023cd8c5..a9810ae325b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -15,18 +15,15 @@ import stat import time import signal import fcntl -import errno import types import struct import socket import logging import tempfile -import threading import subprocess import errno from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError import shutil from gconf import gconf @@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf +from syncdutils import mntpt_list, lf, Popen, sup, Volinfo from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): - """a rubyesque "super" for python ;) - - invoke caller method in parent class with given args. - """ - return getattr(super(type(x), x), - sys._getframe(1).f_code.co_name)(*a, **kw) - +slv_volume = None +slv_host = None +slv_bricks = None def desugar(ustr): """transform sugared url strings to standard <scheme>://<urlbody> form @@ -114,149 +106,6 @@ def parse_url(ustr): return getattr(this, sch.upper())(path) -class Popen(subprocess.Popen): - - """customized subclass of subprocess.Popen with a ring - buffer for children error output""" - - @classmethod - def init_errhandler(cls): - """start the thread which handles children's error output""" - cls.errstore = {} - - def tailer(): - while True: - errstore = cls.errstore.copy() - try: - poe, _, _ = select( - [po.stderr for po in errstore], [], [], 1) - except (ValueError, SelectError): - # stderr is already closed wait for some time before - # checking next error - time.sleep(0.5) - continue - for po in errstore: - if po.stderr not in poe: - continue - po.lock.acquire() - try: - if po.on_death_row: - continue - la = errstore[po] - try: - fd = po.stderr.fileno() - except ValueError: # file is already closed - time.sleep(0.5) - continue - - try: - l = os.read(fd, 1024) - except OSError: - time.sleep(0.5) - continue - - if not l: - continue - tots = len(l) - for lx in la: - tots += len(lx) - while tots > 1 << 20 and la: - tots -= len(la.pop(0)) - la.append(l) - finally: - po.lock.release() - t = syncdutils.Thread(target=tailer) - t.start() - cls.errhandler = t - - @classmethod - def fork(cls): - """fork wrapper that restarts errhandler thread in child""" - pid = os.fork() - if not pid: - cls.init_errhandler() - return pid - - def __init__(self, args, *a, **kw): - """customizations for subprocess.Popen instantiation - - - 'close_fds' is taken to be the default - - if child's stderr is chosen to be managed, - register it with the error handler thread - """ - self.args = args - if 'close_fds' not in kw: - kw['close_fds'] = True - self.lock = threading.Lock() - self.on_death_row = False - self.elines = [] - try: - sup(self, args, *a, **kw) - except: - ex = sys.exc_info()[1] - if not isinstance(ex, OSError): - raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % - (args[0], errno.errorcode[ex.errno], - os.strerror(ex.errno))) - if kw.get('stderr') == subprocess.PIPE: - assert(getattr(self, 'errhandler', None)) - self.errstore[self] = [] - - def errlog(self): - """make a log about child's failure event""" - logging.error(lf("command returned error", - cmd=" ".join(self.args), - error=self.returncode)) - lp = '' - - def logerr(l): - logging.error(self.args[0] + "> " + l) - for l in self.elines: - ls = l.split('\n') - ls[0] = lp + ls[0] - lp = ls.pop() - for ll in ls: - logerr(ll) - if lp: - logerr(lp) - - def errfail(self): - """fail nicely if child did not terminate with success""" - self.errlog() - syncdutils.finalize(exval=1) - - def terminate_geterr(self, fail_on_err=True): - """kill child, finalize stderr harvesting (unregister - from errhandler, set up .elines), fail on error if - asked for - """ - self.lock.acquire() - try: - self.on_death_row = True - finally: - self.lock.release() - elines = self.errstore.pop(self) - if self.poll() is None: - self.terminate() - if self.poll() is None: - time.sleep(0.1) - self.kill() - self.wait() - while True: - if not select([self.stderr], [], [], 0.1)[0]: - break - b = os.read(self.stderr.fileno(), 1024) - if b: - elines.append(b) - else: - break - self.stderr.close() - self.elines = elines - if fail_on_err and self.returncode != 0: - self.errfail() - - class Server(object): """singleton implemening those filesystem access primitives @@ -776,6 +625,31 @@ class Server(object): if isinstance(st, int): blob = entry_pack_mkdir( gfid, bname, e['mode'], e['uid'], e['gid']) + else: + # If gfid of a directory exists on slave but path based + # create is getting EEXIST. This means the directory is + # renamed in master but recorded as MKDIR during hybrid + # crawl. Get the directory path by reading the backend + # symlink and trying to rename to new name as said by + # master. + global slv_bricks + global slv_volume + global slv_host + if not slv_bricks: + slv_info = Volinfo (slv_volume, slv_host) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], gfid)) + realpath_parts = realpath.split('/') + src_pargfid = realpath_parts[-2] + src_basename = realpath_parts[-1] + src_entry = os.path.join(pfx, src_pargfid, src_basename) + logging.info(lf("Special case: rename on mkdir", + gfid=gfid, entry=repr(entry))) + rename_with_disk_gfid_confirmation(gfid, src_entry, entry) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def __init__(self, path): self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + global slv_volume + global slv_host + slv_volume = self.volume + slv_host = self.host + def canonical_path(self): return ':'.join([gethostbyname(self.host), self.volume]) |