From 0f524f0710229a7f8de3a4e1e6a2790d40f67a8e Mon Sep 17 00:00:00 2001 From: Kotresh HR Date: Thu, 21 Sep 2017 18:11:15 -0400 Subject: geo-rep: Fix rename of directory in hybrid crawl In hybrid crawl, renames and unlink can't be synced but directory renames can be detected. While syncing the directory on slave, if the gfid already exists, it should be rename. Hence if directory gfid already exists, rename it. Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6 BUG: 1499566 Signed-off-by: Kotresh HR --- geo-replication/syncdaemon/gsyncd.py | 4 +- geo-replication/syncdaemon/monitor.py | 85 +---------- geo-replication/syncdaemon/resource.py | 189 +++++------------------- geo-replication/syncdaemon/syncdutils.py | 238 ++++++++++++++++++++++++++++++- 4 files changed, 276 insertions(+), 240 deletions(-) (limited to 'geo-replication') diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py index 932e37d1124..adca0374c6c 100644 --- a/geo-replication/syncdaemon/gsyncd.py +++ b/geo-replication/syncdaemon/gsyncd.py @@ -39,7 +39,7 @@ from changelogagent import agent, Changelog from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc from libcxattr import Xattr import struct -from syncdutils import get_master_and_slave_data_from_args, lf +from syncdutils import get_master_and_slave_data_from_args, lf, Popen ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError @@ -777,7 +777,7 @@ def main_i(): else: label = 'slave' startup(go_daemon=go_daemon, log_file=log_file, label=label) - resource.Popen.init_errhandler() + Popen.init_errhandler() if be_agent: os.setsid() diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 4da933047c8..c6fa1076a85 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -16,7 +16,7 @@ import logging import uuid import xml.etree.ElementTree as XET from subprocess import PIPE -from resource import Popen, FILE, GLUSTER, SSH +from resource import FILE, GLUSTER, SSH from threading import Lock from errno import ECHILD, ESRCH import re @@ -24,8 +24,9 @@ import random from gconf import gconf from syncdutils import select, waitpid, errno_wrap, lf from syncdutils import set_term_handler, is_host_local, GsyncdError -from syncdutils import escape, Thread, finalize, memoize +from syncdutils import escape, Thread, finalize from syncdutils import gf_event, EVENT_GEOREP_FAULTY +from syncdutils import Volinfo, Popen from gsyncdstatus import GeorepStatus, set_monitor_status @@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol): return list(up_hosts) -class Volinfo(object): - - def __init__(self, vol, host='localhost', prelude=[]): - po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, - 'volume', 'info', vol], - stdout=PIPE, stderr=PIPE) - vix = po.stdout.read() - po.wait() - po.terminate_geterr() - vi = XET.fromstring(vix) - if vi.find('opRet').text != '0': - if prelude: - via = '(via %s) ' % prelude.join(' ') - else: - via = ' ' - raise GsyncdError('getting volume info of %s%s ' - 'failed with errorcode %s' % - (vol, via, vi.find('opErrno').text)) - self.tree = vi - self.volume = vol - self.host = host - - def get(self, elem): - return self.tree.findall('.//' + elem) - - def is_tier(self): - return (self.get('typeStr')[0].text == 'Tier') - - def is_hot(self, brickpath): - logging.debug('brickpath: ' + repr(brickpath)) - return brickpath in self.hot_bricks - - @property - @memoize - def bricks(self): - def bparse(b): - host, dirp = b.find("name").text.split(':', 2) - return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} - return [bparse(b) for b in self.get('brick')] - - @property - @memoize - def uuid(self): - ids = self.get('id') - if len(ids) != 1: - raise GsyncdError("volume info of %s obtained from %s: " - "ambiguous uuid" % (self.volume, self.host)) - return ids[0].text - - def replica_count(self, tier, hot): - if (tier and hot): - return int(self.get('hotBricks/hotreplicaCount')[0].text) - elif (tier and not hot): - return int(self.get('coldBricks/coldreplicaCount')[0].text) - else: - return int(self.get('replicaCount')[0].text) - - def disperse_count(self, tier, hot): - if (tier and hot): - # Tiering doesn't support disperse volume as hot brick, - # hence no xml output, so returning 0. In case, if it's - # supported later, we should change here. - return 0 - elif (tier and not hot): - return int(self.get('coldBricks/colddisperseCount')[0].text) - else: - return int(self.get('disperseCount')[0].text) - - @property - @memoize - def hot_bricks(self): - return [b.text for b in self.get('hotBricks/brick')] - - def get_hot_bricks_count(self, tier): - if (tier): - return int(self.get('hotBricks/hotbrickCount')[0].text) - else: - return 0 - - class Monitor(object): """class which spawns and manages gsyncd workers""" diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 0ca023cd8c5..a9810ae325b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -15,18 +15,15 @@ import stat import time import signal import fcntl -import errno import types import struct import socket import logging import tempfile -import threading import subprocess import errno from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError import shutil from gconf import gconf @@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf +from syncdutils import mntpt_list, lf, Popen, sup, Volinfo from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): - """a rubyesque "super" for python ;) - - invoke caller method in parent class with given args. - """ - return getattr(super(type(x), x), - sys._getframe(1).f_code.co_name)(*a, **kw) - +slv_volume = None +slv_host = None +slv_bricks = None def desugar(ustr): """transform sugared url strings to standard :// form @@ -114,149 +106,6 @@ def parse_url(ustr): return getattr(this, sch.upper())(path) -class Popen(subprocess.Popen): - - """customized subclass of subprocess.Popen with a ring - buffer for children error output""" - - @classmethod - def init_errhandler(cls): - """start the thread which handles children's error output""" - cls.errstore = {} - - def tailer(): - while True: - errstore = cls.errstore.copy() - try: - poe, _, _ = select( - [po.stderr for po in errstore], [], [], 1) - except (ValueError, SelectError): - # stderr is already closed wait for some time before - # checking next error - time.sleep(0.5) - continue - for po in errstore: - if po.stderr not in poe: - continue - po.lock.acquire() - try: - if po.on_death_row: - continue - la = errstore[po] - try: - fd = po.stderr.fileno() - except ValueError: # file is already closed - time.sleep(0.5) - continue - - try: - l = os.read(fd, 1024) - except OSError: - time.sleep(0.5) - continue - - if not l: - continue - tots = len(l) - for lx in la: - tots += len(lx) - while tots > 1 << 20 and la: - tots -= len(la.pop(0)) - la.append(l) - finally: - po.lock.release() - t = syncdutils.Thread(target=tailer) - t.start() - cls.errhandler = t - - @classmethod - def fork(cls): - """fork wrapper that restarts errhandler thread in child""" - pid = os.fork() - if not pid: - cls.init_errhandler() - return pid - - def __init__(self, args, *a, **kw): - """customizations for subprocess.Popen instantiation - - - 'close_fds' is taken to be the default - - if child's stderr is chosen to be managed, - register it with the error handler thread - """ - self.args = args - if 'close_fds' not in kw: - kw['close_fds'] = True - self.lock = threading.Lock() - self.on_death_row = False - self.elines = [] - try: - sup(self, args, *a, **kw) - except: - ex = sys.exc_info()[1] - if not isinstance(ex, OSError): - raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % - (args[0], errno.errorcode[ex.errno], - os.strerror(ex.errno))) - if kw.get('stderr') == subprocess.PIPE: - assert(getattr(self, 'errhandler', None)) - self.errstore[self] = [] - - def errlog(self): - """make a log about child's failure event""" - logging.error(lf("command returned error", - cmd=" ".join(self.args), - error=self.returncode)) - lp = '' - - def logerr(l): - logging.error(self.args[0] + "> " + l) - for l in self.elines: - ls = l.split('\n') - ls[0] = lp + ls[0] - lp = ls.pop() - for ll in ls: - logerr(ll) - if lp: - logerr(lp) - - def errfail(self): - """fail nicely if child did not terminate with success""" - self.errlog() - syncdutils.finalize(exval=1) - - def terminate_geterr(self, fail_on_err=True): - """kill child, finalize stderr harvesting (unregister - from errhandler, set up .elines), fail on error if - asked for - """ - self.lock.acquire() - try: - self.on_death_row = True - finally: - self.lock.release() - elines = self.errstore.pop(self) - if self.poll() is None: - self.terminate() - if self.poll() is None: - time.sleep(0.1) - self.kill() - self.wait() - while True: - if not select([self.stderr], [], [], 0.1)[0]: - break - b = os.read(self.stderr.fileno(), 1024) - if b: - elines.append(b) - else: - break - self.stderr.close() - self.elines = elines - if fail_on_err and self.returncode != 0: - self.errfail() - - class Server(object): """singleton implemening those filesystem access primitives @@ -776,6 +625,31 @@ class Server(object): if isinstance(st, int): blob = entry_pack_mkdir( gfid, bname, e['mode'], e['uid'], e['gid']) + else: + # If gfid of a directory exists on slave but path based + # create is getting EEXIST. This means the directory is + # renamed in master but recorded as MKDIR during hybrid + # crawl. Get the directory path by reading the backend + # symlink and trying to rename to new name as said by + # master. + global slv_bricks + global slv_volume + global slv_host + if not slv_bricks: + slv_info = Volinfo (slv_volume, slv_host) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], gfid)) + realpath_parts = realpath.split('/') + src_pargfid = realpath_parts[-2] + src_basename = realpath_parts[-1] + src_entry = os.path.join(pfx, src_pargfid, src_basename) + logging.info(lf("Special case: rename on mkdir", + gfid=gfid, entry=repr(entry))) + rename_with_disk_gfid_confirmation(gfid, src_entry, entry) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def __init__(self, path): self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + global slv_volume + global slv_host + slv_volume = self.volume + slv_host = self.host + def canonical_path(self): return ':'.join([gethostbyname(self.host), self.volume]) diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py index 2187ecd226b..e611b7b6ae5 100644 --- a/geo-replication/syncdaemon/syncdutils.py +++ b/geo-replication/syncdaemon/syncdutils.py @@ -16,14 +16,18 @@ import fcntl import shutil import logging import socket +import errno +import threading import subprocess +from subprocess import PIPE from threading import Lock, Thread as baseThread from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode from signal import signal, SIGTERM import select as oselect from os import waitpid as owaitpid -import subprocess +import xml.etree.ElementTree as XET +from select import error as SelectError from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE sys.path.insert(1, GLUSTERFS_LIBEXECDIR) @@ -77,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A" PERCENTAGE_ESCAPE_CHAR = "%25" +def sup(x, *a, **kw): + """a rubyesque "super" for python ;) + + invoke caller method in parent class with given args. + """ + return getattr(super(type(x), x), + sys._getframe(1).f_code.co_name)(*a, **kw) + + def escape(s): """the chosen flavor of string escaping, used all over to turn whatever data to creatable representation""" @@ -648,3 +661,226 @@ def lf(event, **kwargs): for k, v in kwargs.items(): msg += "\t{0}={1}".format(k, v) return msg + + +class Popen(subprocess.Popen): + + """customized subclass of subprocess.Popen with a ring + buffer for children error output""" + + @classmethod + def init_errhandler(cls): + """start the thread which handles children's error output""" + cls.errstore = {} + + def tailer(): + while True: + errstore = cls.errstore.copy() + try: + poe, _, _ = select( + [po.stderr for po in errstore], [], [], 1) + except (ValueError, SelectError): + # stderr is already closed wait for some time before + # checking next error + time.sleep(0.5) + continue + for po in errstore: + if po.stderr not in poe: + continue + po.lock.acquire() + try: + if po.on_death_row: + continue + la = errstore[po] + try: + fd = po.stderr.fileno() + except ValueError: # file is already closed + time.sleep(0.5) + continue + + try: + l = os.read(fd, 1024) + except OSError: + time.sleep(0.5) + continue + + if not l: + continue + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1 << 20 and la: + tots -= len(la.pop(0)) + la.append(l) + finally: + po.lock.release() + t = Thread(target=tailer) + t.start() + cls.errhandler = t + + @classmethod + def fork(cls): + """fork wrapper that restarts errhandler thread in child""" + pid = os.fork() + if not pid: + cls.init_errhandler() + return pid + + def __init__(self, args, *a, **kw): + """customizations for subprocess.Popen instantiation + + - 'close_fds' is taken to be the default + - if child's stderr is chosen to be managed, + register it with the error handler thread + """ + self.args = args + if 'close_fds' not in kw: + kw['close_fds'] = True + self.lock = threading.Lock() + self.on_death_row = False + self.elines = [] + try: + sup(self, args, *a, **kw) + except: + ex = sys.exc_info()[1] + if not isinstance(ex, OSError): + raise + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % + (args[0], errno.errorcode[ex.errno], + os.strerror(ex.errno))) + if kw.get('stderr') == subprocess.PIPE: + assert(getattr(self, 'errhandler', None)) + self.errstore[self] = [] + + def errlog(self): + """make a log about child's failure event""" + logging.error(lf("command returned error", + cmd=" ".join(self.args), + error=self.returncode)) + lp = '' + + def logerr(l): + logging.error(self.args[0] + "> " + l) + for l in self.elines: + ls = l.split('\n') + ls[0] = lp + ls[0] + lp = ls.pop() + for ll in ls: + logerr(ll) + if lp: + logerr(lp) + + def errfail(self): + """fail nicely if child did not terminate with success""" + self.errlog() + finalize(exval=1) + + def terminate_geterr(self, fail_on_err=True): + """kill child, finalize stderr harvesting (unregister + from errhandler, set up .elines), fail on error if + asked for + """ + self.lock.acquire() + try: + self.on_death_row = True + finally: + self.lock.release() + elines = self.errstore.pop(self) + if self.poll() is None: + self.terminate() + if self.poll() is None: + time.sleep(0.1) + self.kill() + self.wait() + while True: + if not select([self.stderr], [], [], 0.1)[0]: + break + b = os.read(self.stderr.fileno(), 1024) + if b: + elines.append(b) + else: + break + self.stderr.close() + self.elines = elines + if fail_on_err and self.returncode != 0: + self.errfail() + + +class Volinfo(object): + + def __init__(self, vol, host='localhost', prelude=[]): + po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, + 'volume', 'info', vol], + stdout=PIPE, stderr=PIPE) + vix = po.stdout.read() + po.wait() + po.terminate_geterr() + vi = XET.fromstring(vix) + if vi.find('opRet').text != '0': + if prelude: + via = '(via %s) ' % prelude.join(' ') + else: + via = ' ' + raise GsyncdError('getting volume info of %s%s ' + 'failed with errorcode %s' % + (vol, via, vi.find('opErrno').text)) + self.tree = vi + self.volume = vol + self.host = host + + def get(self, elem): + return self.tree.findall('.//' + elem) + + def is_tier(self): + return (self.get('typeStr')[0].text == 'Tier') + + def is_hot(self, brickpath): + logging.debug('brickpath: ' + repr(brickpath)) + return brickpath in self.hot_bricks + + @property + @memoize + def bricks(self): + def bparse(b): + host, dirp = b.find("name").text.split(':', 2) + return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text} + return [bparse(b) for b in self.get('brick')] + + @property + @memoize + def uuid(self): + ids = self.get('id') + if len(ids) != 1: + raise GsyncdError("volume info of %s obtained from %s: " + "ambiguous uuid" % (self.volume, self.host)) + return ids[0].text + + def replica_count(self, tier, hot): + if (tier and hot): + return int(self.get('hotBricks/hotreplicaCount')[0].text) + elif (tier and not hot): + return int(self.get('coldBricks/coldreplicaCount')[0].text) + else: + return int(self.get('replicaCount')[0].text) + + def disperse_count(self, tier, hot): + if (tier and hot): + # Tiering doesn't support disperse volume as hot brick, + # hence no xml output, so returning 0. In case, if it's + # supported later, we should change here. + return 0 + elif (tier and not hot): + return int(self.get('coldBricks/colddisperseCount')[0].text) + else: + return int(self.get('disperseCount')[0].text) + + @property + @memoize + def hot_bricks(self): + return [b.text for b in self.get('hotBricks/brick')] + + def get_hot_bricks_count(self, tier): + if (tier): + return int(self.get('hotBricks/hotbrickCount')[0].text) + else: + return 0 -- cgit