diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 189 | 
1 files changed, 34 insertions, 155 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 0ca023cd8c5..a9810ae325b 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -15,18 +15,15 @@ import stat  import time  import signal  import fcntl -import errno  import types  import struct  import socket  import logging  import tempfile -import threading  import subprocess  import errno  from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES  from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -from select import error as SelectError  import shutil  from gconf import gconf @@ -43,7 +40,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from syncdutils import GX_GFID_CANONICAL_LEN  from gsyncdstatus import GeorepStatus  from syncdutils import get_master_and_slave_data_from_args -from syncdutils import mntpt_list, lf +from syncdutils import mntpt_list, lf, Popen, sup, Volinfo  from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt  UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') @@ -52,14 +49,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")  ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') -def sup(x, *a, **kw): -    """a rubyesque "super" for python ;) - -    invoke caller method in parent class with given args. -    """ -    return getattr(super(type(x), x), -                   sys._getframe(1).f_code.co_name)(*a, **kw) - +slv_volume = None +slv_host = None +slv_bricks = None  def desugar(ustr):      """transform sugared url strings to standard <scheme>://<urlbody> form @@ -114,149 +106,6 @@ def parse_url(ustr):      return getattr(this, sch.upper())(path) -class Popen(subprocess.Popen): - -    """customized subclass of subprocess.Popen with a ring -    buffer for children error output""" - -    @classmethod -    def init_errhandler(cls): -        """start the thread which handles children's error output""" -        cls.errstore = {} - -        def tailer(): -            while True: -                errstore = cls.errstore.copy() -                try: -                    poe, _, _ = select( -                        [po.stderr for po in errstore], [], [], 1) -                except (ValueError, SelectError): -                    # stderr is already closed wait for some time before -                    # checking next error -                    time.sleep(0.5) -                    continue -                for po in errstore: -                    if po.stderr not in poe: -                        continue -                    po.lock.acquire() -                    try: -                        if po.on_death_row: -                            continue -                        la = errstore[po] -                        try: -                            fd = po.stderr.fileno() -                        except ValueError:  # file is already closed -                            time.sleep(0.5) -                            continue - -                        try: -                            l = os.read(fd, 1024) -                        except OSError: -                            time.sleep(0.5) -                            continue - -                        if not l: -                            continue -                        tots = len(l) -                        for lx in la: -                            tots += len(lx) -                        while tots > 1 << 20 and la: -                            tots -= len(la.pop(0)) -                        la.append(l) -                    finally: -                        po.lock.release() -        t = syncdutils.Thread(target=tailer) -        t.start() -        cls.errhandler = t - -    @classmethod -    def fork(cls): -        """fork wrapper that restarts errhandler thread in child""" -        pid = os.fork() -        if not pid: -            cls.init_errhandler() -        return pid - -    def __init__(self, args, *a, **kw): -        """customizations for subprocess.Popen instantiation - -        - 'close_fds' is taken to be the default -        - if child's stderr is chosen to be managed, -          register it with the error handler thread -        """ -        self.args = args -        if 'close_fds' not in kw: -            kw['close_fds'] = True -        self.lock = threading.Lock() -        self.on_death_row = False -        self.elines = [] -        try: -            sup(self, args, *a, **kw) -        except: -            ex = sys.exc_info()[1] -            if not isinstance(ex, OSError): -                raise -            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % -                              (args[0], errno.errorcode[ex.errno], -                               os.strerror(ex.errno))) -        if kw.get('stderr') == subprocess.PIPE: -            assert(getattr(self, 'errhandler', None)) -            self.errstore[self] = [] - -    def errlog(self): -        """make a log about child's failure event""" -        logging.error(lf("command returned error", -                         cmd=" ".join(self.args), -                         error=self.returncode)) -        lp = '' - -        def logerr(l): -            logging.error(self.args[0] + "> " + l) -        for l in self.elines: -            ls = l.split('\n') -            ls[0] = lp + ls[0] -            lp = ls.pop() -            for ll in ls: -                logerr(ll) -        if lp: -            logerr(lp) - -    def errfail(self): -        """fail nicely if child did not terminate with success""" -        self.errlog() -        syncdutils.finalize(exval=1) - -    def terminate_geterr(self, fail_on_err=True): -        """kill child, finalize stderr harvesting (unregister -        from errhandler, set up .elines), fail on error if -        asked for -        """ -        self.lock.acquire() -        try: -            self.on_death_row = True -        finally: -            self.lock.release() -        elines = self.errstore.pop(self) -        if self.poll() is None: -            self.terminate() -            if self.poll() is None: -                time.sleep(0.1) -                self.kill() -                self.wait() -        while True: -            if not select([self.stderr], [], [], 0.1)[0]: -                break -            b = os.read(self.stderr.fileno(), 1024) -            if b: -                elines.append(b) -            else: -                break -        self.stderr.close() -        self.elines = elines -        if fail_on_err and self.returncode != 0: -            self.errfail() - -  class Server(object):      """singleton implemening those filesystem access primitives @@ -776,6 +625,31 @@ class Server(object):                  if isinstance(st, int):                      blob = entry_pack_mkdir(                          gfid, bname, e['mode'], e['uid'], e['gid']) +                else: +                    # If gfid of a directory exists on slave but path based +                    # create is getting EEXIST. This means the directory is +                    # renamed in master but recorded as MKDIR during hybrid +                    # crawl. Get the directory path by reading the backend +                    # symlink and trying to rename to new name as said by +                    # master. +                    global slv_bricks +                    global slv_volume +                    global slv_host +                    if not slv_bricks: +                        slv_info = Volinfo (slv_volume, slv_host) +                        slv_bricks = slv_info.bricks +                    # Result of readlink would be of format as below. +                    # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename" +                    realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], +                                                        ".glusterfs", gfid[0:2], +                                                        gfid[2:4], gfid)) +                    realpath_parts = realpath.split('/') +                    src_pargfid = realpath_parts[-2] +                    src_basename = realpath_parts[-1] +                    src_entry = os.path.join(pfx, src_pargfid, src_basename) +                    logging.info(lf("Special case: rename on mkdir", +                                   gfid=gfid, entry=repr(entry))) +                    rename_with_disk_gfid_confirmation(gfid, src_entry, entry)              elif op == 'LINK':                  slink = os.path.join(pfx, gfid)                  st = lstat(slink) @@ -1309,6 +1183,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def __init__(self, path):          self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) +        global slv_volume +        global slv_host +        slv_volume = self.volume +        slv_host = self.host +      def canonical_path(self):          return ':'.join([gethostbyname(self.host), self.volume])  | 
