summaryrefslogtreecommitdiffstats
path: root/geo-replication
diff options
context:
space:
mode:
authorKotresh HR <khiremat@redhat.com>2017-09-21 18:11:15 -0400
committerAravinda VK <avishwan@redhat.com>2017-11-10 05:36:22 +0000
commit0f524f0710229a7f8de3a4e1e6a2790d40f67a8e (patch)
treed938aa2ba8c0b8dc7638c2740443d2d82557a099 /geo-replication
parent0fc1c562d8b8d09ec2b59bc525ec5635a21a4561 (diff)
geo-rep: Fix rename of directory in hybrid crawl
In hybrid crawl, renames and unlink can't be synced but directory renames can be detected. While syncing the directory on slave, if the gfid already exists, it should be rename. Hence if directory gfid already exists, rename it. Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6 BUG: 1499566 Signed-off-by: Kotresh HR <khiremat@redhat.com>
Diffstat (limited to 'geo-replication')
-rw-r--r--geo-replication/syncdaemon/gsyncd.py4
-rw-r--r--geo-replication/syncdaemon/monitor.py85
-rw-r--r--geo-replication/syncdaemon/resource.py189
-rw-r--r--geo-replication/syncdaemon/syncdutils.py238
4 files changed, 276 insertions, 240 deletions
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 932e37d1124..adca0374c6c 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
from libcxattr import Xattr
import struct
-from syncdutils import get_master_and_slave_data_from_args, lf
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen
ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError
@@ -777,7 +777,7 @@ def main_i():
else:
label = 'slave'
startup(go_daemon=go_daemon, log_file=log_file, label=label)
- resource.Popen.init_errhandler()
+ Popen.init_errhandler()
if be_agent:
os.setsid()
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 4da933047c8..c6fa1076a85 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -16,7 +16,7 @@ import logging
import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
+from resource import FILE, GLUSTER, SSH
from threading import Lock
from errno import ECHILD, ESRCH
import re
@@ -24,8 +24,9 @@ import random
from gconf import gconf
from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize
+from syncdutils import escape, Thread, finalize
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import Volinfo, Popen
from gsyncdstatus import GeorepStatus, set_monitor_status
@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
return list(up_hosts)
-class Volinfo(object):
-
- def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'info', vol],
- stdout=PIPE, stderr=PIPE)
- vix = po.stdout.read()
- po.wait()
- po.terminate_geterr()
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- if prelude:
- via = '(via %s) ' % prelude.join(' ')
- else:
- via = ' '
- raise GsyncdError('getting volume info of %s%s '
- 'failed with errorcode %s' %
- (vol, via, vi.find('opErrno').text))
- self.tree = vi
- self.volume = vol
- self.host = host
-
- def get(self, elem):
- return self.tree.findall('.//' + elem)
-
- def is_tier(self):
- return (self.get('typeStr')[0].text == 'Tier')
-
- def is_hot(self, brickpath):
- logging.debug('brickpath: ' + repr(brickpath))
- return brickpath in self.hot_bricks
-
- @property
- @memoize
- def bricks(self):
- def bparse(b):
- host, dirp = b.find("name").text.split(':', 2)
- return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
- return [bparse(b) for b in self.get('brick')]
-
- @property
- @memoize
- def uuid(self):
- ids = self.get('id')
- if len(ids) != 1:
- raise GsyncdError("volume info of %s obtained from %s: "
- "ambiguous uuid" % (self.volume, self.host))
- return ids[0].text
-
- def replica_count(self, tier, hot):
- if (tier and hot):
- return int(self.get('hotBricks/hotreplicaCount')[0].text)
- elif (tier and not hot):
- return int(self.get('coldBricks/coldreplicaCount')[0].text)
- else:
- return int(self.get('replicaCount')[0].text)
-
- def disperse_count(self, tier, hot):
- if (tier and hot):
- # Tiering doesn't support disperse volume as hot brick,
- # hence no xml output, so returning 0. In case, if it's
- # supported later, we should change here.
- return 0
- elif (tier and not hot):
- return int(self.get('coldBricks/colddisperseCount')[0].text)
- else:
- return int(self.get('disperseCount')[0].text)
-
- @property
- @memoize
- def hot_bricks(self):
- return [b.text for b in self.get('hotBricks/brick')]
-
- def get_hot_bricks_count(self, tier):
- if (tier):
- return int(self.get('hotBricks/hotbrickCount')[0].text)
- else:
- return 0
-
-
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 0ca023cd8c5..a9810ae325b 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -15,18 +15,15 @@ import stat
import time
import signal
import fcntl
-import errno
import types
import struct
import socket
import logging
import tempfile
-import threading
import subprocess
import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
import shutil
from gconf import gconf
@@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import mntpt_list, lf
+from syncdutils import mntpt_list, lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")
ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')
-def sup(x, *a, **kw):
- """a rubyesque "super" for python ;)
-
- invoke caller method in parent class with given args.
- """
- return getattr(super(type(x), x),
- sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None
def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +106,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)
-class Popen(subprocess.Popen):
-
- """customized subclass of subprocess.Popen with a ring
- buffer for children error output"""
-
- @classmethod
- def init_errhandler(cls):
- """start the thread which handles children's error output"""
- cls.errstore = {}
-
- def tailer():
- while True:
- errstore = cls.errstore.copy()
- try:
- poe, _, _ = select(
- [po.stderr for po in errstore], [], [], 1)
- except (ValueError, SelectError):
- # stderr is already closed wait for some time before
- # checking next error
- time.sleep(0.5)
- continue
- for po in errstore:
- if po.stderr not in poe:
- continue
- po.lock.acquire()
- try:
- if po.on_death_row:
- continue
- la = errstore[po]
- try:
- fd = po.stderr.fileno()
- except ValueError: # file is already closed
- time.sleep(0.5)
- continue
-
- try:
- l = os.read(fd, 1024)
- except OSError:
- time.sleep(0.5)
- continue
-
- if not l:
- continue
- tots = len(l)
- for lx in la:
- tots += len(lx)
- while tots > 1 << 20 and la:
- tots -= len(la.pop(0))
- la.append(l)
- finally:
- po.lock.release()
- t = syncdutils.Thread(target=tailer)
- t.start()
- cls.errhandler = t
-
- @classmethod
- def fork(cls):
- """fork wrapper that restarts errhandler thread in child"""
- pid = os.fork()
- if not pid:
- cls.init_errhandler()
- return pid
-
- def __init__(self, args, *a, **kw):
- """customizations for subprocess.Popen instantiation
-
- - 'close_fds' is taken to be the default
- - if child's stderr is chosen to be managed,
- register it with the error handler thread
- """
- self.args = args
- if 'close_fds' not in kw:
- kw['close_fds'] = True
- self.lock = threading.Lock()
- self.on_death_row = False
- self.elines = []
- try:
- sup(self, args, *a, **kw)
- except:
- ex = sys.exc_info()[1]
- if not isinstance(ex, OSError):
- raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
- (args[0], errno.errorcode[ex.errno],
- os.strerror(ex.errno)))
- if kw.get('stderr') == subprocess.PIPE:
- assert(getattr(self, 'errhandler', None))
- self.errstore[self] = []
-
- def errlog(self):
- """make a log about child's failure event"""
- logging.error(lf("command returned error",
- cmd=" ".join(self.args),
- error=self.returncode))
- lp = ''
-
- def logerr(l):
- logging.error(self.args[0] + "> " + l)
- for l in self.elines:
- ls = l.split('\n')
- ls[0] = lp + ls[0]
- lp = ls.pop()
- for ll in ls:
- logerr(ll)
- if lp:
- logerr(lp)
-
- def errfail(self):
- """fail nicely if child did not terminate with success"""
- self.errlog()
- syncdutils.finalize(exval=1)
-
- def terminate_geterr(self, fail_on_err=True):
- """kill child, finalize stderr harvesting (unregister
- from errhandler, set up .elines), fail on error if
- asked for
- """
- self.lock.acquire()
- try:
- self.on_death_row = True
- finally:
- self.lock.release()
- elines = self.errstore.pop(self)
- if self.poll() is None:
- self.terminate()
- if self.poll() is None:
- time.sleep(0.1)
- self.kill()
- self.wait()
- while True:
- if not select([self.stderr], [], [], 0.1)[0]:
- break
- b = os.read(self.stderr.fileno(), 1024)
- if b:
- elines.append(b)
- else:
- break
- self.stderr.close()
- self.elines = elines
- if fail_on_err and self.returncode != 0:
- self.errfail()
-
-
class Server(object):
"""singleton implemening those filesystem access primitives
@@ -776,6 +625,31 @@ class Server(object):
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
+ else:
+ # If gfid of a directory exists on slave but path based
+ # create is getting EEXIST. This means the directory is
+ # renamed in master but recorded as MKDIR during hybrid
+ # crawl. Get the directory path by reading the backend
+ # symlink and trying to rename to new name as said by
+ # master.
+ global slv_bricks
+ global slv_volume
+ global slv_host
+ if not slv_bricks:
+ slv_info = Volinfo (slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4], gfid))
+ realpath_parts = realpath.split('/')
+ src_pargfid = realpath_parts[-2]
+ src_basename = realpath_parts[-1]
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def __init__(self, path):
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)
+ global slv_volume
+ global slv_host
+ slv_volume = self.volume
+ slv_host = self.host
+
def canonical_path(self):
return ':'.join([gethostbyname(self.host), self.volume])
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2187ecd226b..e611b7b6ae5 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -16,14 +16,18 @@ import fcntl
import shutil
import logging
import socket
+import errno
+import threading
import subprocess
+from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
-import subprocess
+import xml.etree.ElementTree as XET
+from select import error as SelectError
from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@@ -77,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"
+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
+
+
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
@@ -648,3 +661,226 @@ def lf(event, **kwargs):
for k, v in kwargs.items():
msg += "\t{0}={1}".format(k, v)
return msg
+
+
+class Popen(subprocess.Popen):
+
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ # stderr is already closed wait for some time before
+ # checking next error
+ time.sleep(0.5)
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ time.sleep(0.5)
+ continue
+
+ try:
+ l = os.read(fd, 1024)
+ except OSError:
+ time.sleep(0.5)
+ continue
+
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1 << 20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = Thread(target=tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ self.elines = []
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
+ lp = ''
+
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ finalize(exval=1)
+
+ def terminate_geterr(self, fail_on_err=True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() is None:
+ self.terminate()
+ if self.poll() is None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr], [], [], 0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+class Volinfo(object):
+
+ def __init__(self, vol, host='localhost', prelude=[]):
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
+ stdout=PIPE, stderr=PIPE)
+ vix = po.stdout.read()
+ po.wait()
+ po.terminate_geterr()
+ vi = XET.fromstring(vix)
+ if vi.find('opRet').text != '0':
+ if prelude:
+ via = '(via %s) ' % prelude.join(' ')
+ else:
+ via = ' '
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s' %
+ (vol, via, vi.find('opErrno').text))
+ self.tree = vi
+ self.volume = vol
+ self.host = host
+
+ def get(self, elem):
+ return self.tree.findall('.//' + elem)
+
+ def is_tier(self):
+ return (self.get('typeStr')[0].text == 'Tier')
+
+ def is_hot(self, brickpath):
+ logging.debug('brickpath: ' + repr(brickpath))
+ return brickpath in self.hot_bricks
+
+ @property
+ @memoize
+ def bricks(self):
+ def bparse(b):
+ host, dirp = b.find("name").text.split(':', 2)
+ return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
+ return [bparse(b) for b in self.get('brick')]
+
+ @property
+ @memoize
+ def uuid(self):
+ ids = self.get('id')
+ if len(ids) != 1:
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid" % (self.volume, self.host))
+ return ids[0].text
+
+ def replica_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotreplicaCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/coldreplicaCount')[0].text)
+ else:
+ return int(self.get('replicaCount')[0].text)
+
+ def disperse_count(self, tier, hot):
+ if (tier and hot):
+ # Tiering doesn't support disperse volume as hot brick,
+ # hence no xml output, so returning 0. In case, if it's
+ # supported later, we should change here.
+ return 0
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddisperseCount')[0].text)
+ else:
+ return int(self.get('disperseCount')[0].text)
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return [b.text for b in self.get('hotBricks/brick')]
+
+ def get_hot_bricks_count(self, tier):
+ if (tier):
+ return int(self.get('hotBricks/hotbrickCount')[0].text)
+ else:
+ return 0