diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 207 | 
1 files changed, 188 insertions, 19 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 73102fbcb..52989fe28 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -5,13 +5,14 @@ import stat  import time  import fcntl  import errno +import types  import struct  import socket  import logging  import tempfile  import threading  import subprocess -from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR +from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR, ENOTEMPTY  from select import error as SelectError  from gconf import gconf @@ -19,7 +20,8 @@ import repce  from repce import RepceServer, RepceClient  from  master import gmaster_builder  import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify +from syncdutils import GsyncdError, select, privileged, boolify, funcode +from syncdutils import umask, entry2pb, gauxpfx, errno_wrap  UrlRX  = re.compile('\A(\w+)://([^ *?[]*)\Z')  HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I) @@ -105,7 +107,18 @@ class _MetaXattr(object):              setattr(self, m, getattr(LXattr, m))          return getattr(self, meth) +class _MetaChangelog(object): +    def __getattr__(self, meth): +        from libgfchangelog import Changes as LChanges +        xmeth = [ m for m in dir(LChanges) if m[0] != '_' ] +        if not meth in xmeth: +            return +        for m in xmeth: +            setattr(self, m, getattr(LChanges, m)) +        return getattr(self, meth) +  Xattr = _MetaXattr() +Changes = _MetaChangelog()  class Popen(subprocess.Popen): @@ -245,10 +258,24 @@ class Server(object):      and classmethods and is used directly, without instantiation.)      """ -    GX_NSPACE = (privileged() and "trusted" or "system") + ".glusterfs" +    GX_NSPACE_PFX = (privileged() and "trusted" or "system") +    GX_NSPACE = GX_NSPACE_PFX + ".glusterfs"      NTV_FMTSTR = "!" + "B"*19 + "II"      FRGN_XTRA_FMT = "I"      FRGN_FMTSTR = NTV_FMTSTR + FRGN_XTRA_FMT +    GX_GFID_CANONICAL_LEN = 37 # canonical gfid len + '\0' + +    local_path = '' + +    @classmethod +    def _fmt_mknod(cls, l): +        return "!II%dsI%dsIII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +    @classmethod +    def _fmt_mkdir(cls, l): +        return "!II%dsI%dsII" % (cls.GX_GFID_CANONICAL_LEN, l+1) +    @classmethod +    def _fmt_symlink(cls, l1, l2): +        return "!II%dsI%ds%ds" % (cls.GX_GFID_CANONICAL_LEN, l1+1, l2+1)      def _pathguard(f):          """decorator method that checks @@ -257,22 +284,21 @@ class Server(object):          point out of the managed tree          """ -        fc = getattr(f, 'func_code', None) -        if not fc: -            # python 3 -            fc = f.__code__ +        fc = funcode(f)          pi = list(fc.co_varnames).index('path')          def ff(*a):              path = a[pi]              ps = path.split('/')              if path[0] == '/' or '..' in ps:                  raise ValueError('unsafe path') +            a = list(a) +            a[pi] = os.path.join(a[0].local_path, path)              return f(*a)          return ff -    @staticmethod +    @classmethod      @_pathguard -    def entries(path): +    def entries(cls, path):          """directory entries in an array"""          # prevent symlinks being followed          if not stat.S_ISDIR(os.lstat(path).st_mode): @@ -371,6 +397,18 @@ class Server(object):                  raise      @classmethod +    def gfid(cls, gfidpath): +        return errno_wrap(Xattr.lgetxattr, [gfidpath, 'glusterfs.gfid', cls.GX_GFID_CANONICAL_LEN], [ENOENT]) + +    @classmethod +    def node_uuid(cls, path='.'): +        try: +            uuid_l = Xattr.lgetxattr_buf(path, '.'.join([cls.GX_NSPACE, 'node-uuid'])) +            return uuid_l[:-1].split(' ') +        except OSError: +            raise + +    @classmethod      def xtime_vec(cls, path, *uuids):          """vectored version of @xtime @@ -402,9 +440,96 @@ class Server(object):          for u,t in mark_dct.items():              cls.set_xtime(path, u, t) -    @staticmethod +    @classmethod +    def entry_ops(cls, entries): +        pfx = gauxpfx() +        logging.debug('entries: %s' % repr(entries)) +        # regular file +        def entry_pack_reg(gf, bn, st): +            blen = len(bn) +            mo = st['mode'] +            return struct.pack(cls._fmt_mknod(blen), +                               st['uid'], st['gid'], +                               gf, mo, bn, +                               stat.S_IMODE(mo), 0, umask()) +        # mkdir +        def entry_pack_mkdir(gf, bn, st): +            blen = len(bn) +            mo = st['mode'] +            return struct.pack(cls._fmt_mkdir(blen), +                               st['uid'], st['gid'], +                               gf, mo, bn, +                               stat.S_IMODE(mo), umask()) +        #symlink +        def entry_pack_symlink(gf, bn, lnk, st): +            blen = len(bn) +            llen = len(lnk) +            return struct.pack(cls._fmt_symlink(blen, llen), +                               st['uid'], st['gid'], +                               gf, st['mode'], bn, lnk) +        def entry_purge(entry, gfid): +            # This is an extremely racy code and needs to be fixed ASAP. +            # The GFID check here is to be sure that the pargfid/bname +            # to be purged is the GFID gotten from the changelog. +            # (a stat(changelog_gfid) would also be valid here) +            # The race here is between the GFID check and the purge. +            disk_gfid = cls.gfid(entry) +            if isinstance(disk_gfid, int): +                return +            if not gfid == disk_gfid: +                return +            er = errno_wrap(os.unlink, [entry], [ENOENT, EISDIR]) +            if isinstance(er, int): +                if er == EISDIR: +                    er = errno_wrap(os.rmdir, [entry], [ENOENT, ENOTEMPTY]) +                    if er == ENOTEMPTY: +                        return er +        for e in entries: +            blob = None +            op = e['op'] +            gfid = e['gfid'] +            entry = e['entry'] +            (pg, bname) = entry2pb(entry) +            if op in ['RMDIR', 'UNLINK']: +                while True: +                    er = entry_purge(entry, gfid) +                    if isinstance(er, int): +                        time.sleep(1) +                    else: +                        break +            elif op == 'CREATE': +                blob = entry_pack_reg(gfid, bname, e['stat']) +            elif op == 'MKDIR': +                blob = entry_pack_mkdir(gfid, bname, e['stat']) +            elif op == 'LINK': +                errno_wrap(os.link, [os.path.join(pfx, gfid), entry], [ENOENT, EEXIST]) +            elif op == 'SYMLINK': +                blob = entry_pack_symlink(gfid, bname, e['link'], e['stat']) +            elif op == 'RENAME': +                en = e['entry1'] +                errno_wrap(os.rename, [entry, en], [ENOENT, EEXIST]) +            if blob: +                errno_wrap(Xattr.lsetxattr_l, [pg, 'glusterfs.gfid.newfile', blob], [ENOENT, EEXIST]) + +    @classmethod +    def changelog_register(cls, cl_brick, cl_dir, cl_log, cl_level, retries = 0): +        Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries) + +    @classmethod +    def changelog_scan(cls): +        Changes.cl_scan() + +    @classmethod +    def changelog_getchanges(cls): +        return Changes.cl_getchanges() + +    @classmethod +    def changelog_done(cls, clfile): +        Changes.cl_done(clfile) + +    @classmethod      @_pathguard -    def setattr(path, adct): +    def setattr(cls, path, adct):          """set file attributes          @adct is a dict, where 'own', 'mode' and 'times' @@ -537,10 +662,10 @@ class SlaveRemote(object):              raise GsyncdError("no files to sync")          logging.debug("files: " + ", ".join(files))          argv = gconf.rsync_command.split() + \ -               ['-aR0', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \ +               ['-avR0', '--inplace', '--files-from=-', '--super','--stats', '--numeric-ids', '--no-implied-dirs'] + \                 gconf.rsync_options.split() + (boolify(gconf.use_rsync_xattrs) and ['--xattrs'] or []) + \                 ['.'] + list(args) -        po = Popen(argv, stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) +        po = Popen(argv, stdin=subprocess.PIPE,stderr=subprocess.PIPE)          for f in files:              po.stdin.write(f)              po.stdin.write('\0') @@ -685,7 +810,8 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def can_connect_to(self, remote):          """determine our position in the connectibility matrix""" -        return True +        return not remote or \ +               (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER))      class Mounter(object):          """Abstract base class for mounter backends""" @@ -864,6 +990,10 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          sup(self, *a, **kw)          self.slavedir = "/proc/%d/cwd" % self.server.pid() +    def gmaster_instantiate_tuple(self, slave): +        """return a tuple of the 'one shot' and the 'main crawl' class instance""" +        return (gmaster_builder('xsync')(self, slave), gmaster_builder()(self, slave)) +      def service_loop(self, *args):          """enter service loop @@ -873,7 +1003,41 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          - else do that's what's inherited          """          if args: -            gmaster_builder()(self, args[0]).crawl_loop() +            slave = args[0] +            if gconf.local_path: +                class brickserver(FILE.FILEServer): +                    local_path = gconf.local_path +                    aggregated = self.server +                    @classmethod +                    def entries(cls, path): +                        e = super(brickserver, cls).entries(path) +                        # on the brick don't mess with /.glusterfs +                        if path == '.': +                            try: +                                e.remove('.glusterfs') +                            except ValueError: +                                pass +                        return e +                if gconf.slave_id: +                    # define {,set_}xtime in slave, thus preempting +                    # the call to remote, so that it takes data from +                    # the local brick +                    slave.server.xtime = types.MethodType(lambda _self, path, uuid: brickserver.xtime(path, uuid + '.' + gconf.slave_id), slave.server) +                    slave.server.set_xtime = types.MethodType(lambda _self, path, uuid, mark: brickserver.set_xtime(path, uuid + '.' + gconf.slave_id, mark), slave.server) +                (g1, g2) = self.gmaster_instantiate_tuple(slave) +                g1.master.server = brickserver +                g2.master.server = brickserver +            else: +                (g1, g2) = self.gmaster_instantiate_tuple(slave) +                g1.master.server.aggregated = gmaster.master.server +                g2.master.server.aggregated = gmaster.master.server +            # bad bad bad: bad way to do things like this +            # need to make this elegant +            # register the crawlers and start crawling +            g1.register() +            g2.register() +            g1.crawlwrap(oneshot=True) +            g2.crawlwrap()          else:              sup(self, *args) @@ -893,13 +1057,18 @@ class SSH(AbstractUrl, SlaveRemote):                                            '^((?:%s@)?%s):(.+)' % tuple([ r.pattern for r in (UserRX, HostRX) ]))          self.inner_rsc = parse_url(inner_url) -    def canonical_path(self): -        m = re.match('([^@]+)@(.+)', self.remote_addr) +    @staticmethod +    def parse_ssh_address(addr): +        m = re.match('([^@]+)@(.+)', addr)          if m:              u, h = m.groups()          else: -            u, h = syncdutils.getusername(), self.remote_addr -        remote_addr = '@'.join([u, gethostbyname(h)]) +            u, h = syncdutils.getusername(), addr +        return {'user': u, 'host': h} + +    def canonical_path(self): +        rap = self.parse_ssh_address(self.remote_addr) +        remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])])          return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)])      def can_connect_to(self, remote):  | 
