diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 189 |
1 files changed, 34 insertions, 155 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 0ca023cd8c5..a9810ae325b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -15,18 +15,15 @@ import stat import time import signal import fcntl -import errno import types import struct import socket import logging import tempfile -import threading import subprocess import errno from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError import shutil from gconf import gconf @@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION from syncdutils import GX_GFID_CANONICAL_LEN from gsyncdstatus import GeorepStatus from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf +from syncdutils import mntpt_list, lf, Popen, sup, Volinfo from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+") ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): - """a rubyesque "super" for python ;) - - invoke caller method in parent class with given args. - """ - return getattr(super(type(x), x), - sys._getframe(1).f_code.co_name)(*a, **kw) - +slv_volume = None +slv_host = None +slv_bricks = None def desugar(ustr): """transform sugared url strings to standard <scheme>://<urlbody> form @@ -114,149 +106,6 @@ def parse_url(ustr): return getattr(this, sch.upper())(path) -class Popen(subprocess.Popen): - - """customized subclass of subprocess.Popen with a ring - buffer for children error output""" - - @classmethod - def init_errhandler(cls): - """start the thread which handles children's error output""" - cls.errstore = {} - - def tailer(): - while True: - errstore = cls.errstore.copy() - try: - poe, _, _ = select( - [po.stderr for po in errstore], [], [], 1) - except (ValueError, SelectError): - # stderr is already closed wait for some time before - # checking next error - time.sleep(0.5) - continue - for po in errstore: - if po.stderr not in poe: - continue - po.lock.acquire() - try: - if po.on_death_row: - continue - la = errstore[po] - try: - fd = po.stderr.fileno() - except ValueError: # file is already closed - time.sleep(0.5) - continue - - try: - l = os.read(fd, 1024) - except OSError: - time.sleep(0.5) - continue - - if not l: - continue - tots = len(l) - for lx in la: - tots += len(lx) - while tots > 1 << 20 and la: - tots -= len(la.pop(0)) - la.append(l) - finally: - po.lock.release() - t = syncdutils.Thread(target=tailer) - t.start() - cls.errhandler = t - - @classmethod - def fork(cls): - """fork wrapper that restarts errhandler thread in child""" - pid = os.fork() - if not pid: - cls.init_errhandler() - return pid - - def __init__(self, args, *a, **kw): - """customizations for subprocess.Popen instantiation - - - 'close_fds' is taken to be the default - - if child's stderr is chosen to be managed, - register it with the error handler thread - """ - self.args = args - if 'close_fds' not in kw: - kw['close_fds'] = True - self.lock = threading.Lock() - self.on_death_row = False - self.elines = [] - try: - sup(self, args, *a, **kw) - except: - ex = sys.exc_info()[1] - if not isinstance(ex, OSError): - raise - raise GsyncdError("""execution of "%s" failed with %s (%s)""" % - (args[0], errno.errorcode[ex.errno], - os.strerror(ex.errno))) - if kw.get('stderr') == subprocess.PIPE: - assert(getattr(self, 'errhandler', None)) - self.errstore[self] = [] - - def errlog(self): - """make a log about child's failure event""" - logging.error(lf("command returned error", - cmd=" ".join(self.args), - error=self.returncode)) - lp = '' - - def logerr(l): - logging.error(self.args[0] + "> " + l) - for l in self.elines: - ls = l.split('\n') - ls[0] = lp + ls[0] - lp = ls.pop() - for ll in ls: - logerr(ll) - if lp: - logerr(lp) - - def errfail(self): - """fail nicely if child did not terminate with success""" - self.errlog() - syncdutils.finalize(exval=1) - - def terminate_geterr(self, fail_on_err=True): - """kill child, finalize stderr harvesting (unregister - from errhandler, set up .elines), fail on error if - asked for - """ - self.lock.acquire() - try: - self.on_death_row = True - finally: - self.lock.release() - elines = self.errstore.pop(self) - if self.poll() is None: - self.terminate() - if self.poll() is None: - time.sleep(0.1) - self.kill() - self.wait() - while True: - if not select([self.stderr], [], [], 0.1)[0]: - break - b = os.read(self.stderr.fileno(), 1024) - if b: - elines.append(b) - else: - break - self.stderr.close() - self.elines = elines - if fail_on_err and self.returncode != 0: - self.errfail() - - class Server(object): """singleton implemening those filesystem access primitives @@ -776,6 +625,31 @@ class Server(object): if isinstance(st, int): blob = entry_pack_mkdir( gfid, bname, e['mode'], e['uid'], e['gid']) + else: + # If gfid of a directory exists on slave but path based + # create is getting EEXIST. This means the directory is + # renamed in master but recorded as MKDIR during hybrid + # crawl. Get the directory path by reading the backend + # symlink and trying to rename to new name as said by + # master. + global slv_bricks + global slv_volume + global slv_host + if not slv_bricks: + slv_info = Volinfo (slv_volume, slv_host) + slv_bricks = slv_info.bricks + # Result of readlink would be of format as below. + # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" + realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], + ".glusterfs", gfid[0:2], + gfid[2:4], gfid)) + realpath_parts = realpath.split('/') + src_pargfid = realpath_parts[-2] + src_basename = realpath_parts[-1] + src_entry = os.path.join(pfx, src_pargfid, src_basename) + logging.info(lf("Special case: rename on mkdir", + gfid=gfid, entry=repr(entry))) + rename_with_disk_gfid_confirmation(gfid, src_entry, entry) elif op == 'LINK': slink = os.path.join(pfx, gfid) st = lstat(slink) @@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def __init__(self, path): self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) + global slv_volume + global slv_host + slv_volume = self.volume + slv_host = self.host + def canonical_path(self): return ':'.join([gethostbyname(self.host), self.volume]) |