diff options
Diffstat (limited to 'geo-replication/syncdaemon/resource.py')
| -rw-r--r-- | geo-replication/syncdaemon/resource.py | 1389 | 
1 files changed, 605 insertions, 784 deletions
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index c617c7b312e..f5a19629e7a 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -13,25 +13,24 @@ import os  import sys  import stat  import time -import signal  import fcntl  import types  import struct -import socket  import logging  import tempfile  import subprocess -import errno  from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES  from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM -import shutil +import errno + +from rconf import rconf +import gsyncdconfig as gconf -from gconf import gconf  import repce  from repce import RepceServer, RepceClient  from master import gmaster_builder  import syncdutils -from syncdutils import GsyncdError, select, privileged, boolify, funcode +from syncdutils import GsyncdError, select, privileged, funcode  from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat  from syncdutils import NoStimeAvailable, PartialHistoryAvailable  from syncdutils import ChangelogException, ChangelogHistoryNotAvailable @@ -39,13 +38,9 @@ from syncdutils import get_changelog_log_level, get_rsync_version  from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION  from syncdutils import GX_GFID_CANONICAL_LEN  from gsyncdstatus import GeorepStatus -from syncdutils import get_master_and_slave_data_from_args  from syncdutils import mntpt_list, lf, Popen, sup, Volinfo  from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt -UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z') -HostRX = re.compile('[a-zA-Z\d](?:[a-zA-Z\d.-]*[a-zA-Z\d])?', re.I) -UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")  ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP') @@ -53,58 +48,6 @@ slv_volume = None  slv_host = None  slv_bricks = None -def desugar(ustr): -    """transform sugared url strings to standard <scheme>://<urlbody> form - -    parsing logic enforces the constraint that sugared forms should contatin -    a ':' or a '/', which ensures that sugared urls do not conflict with -    gluster volume names. -    """ -    m = re.match('([^:]*):(.*)', ustr) -    if m: -        if not m.groups()[0]: -            return "gluster://localhost" + ustr -        elif '@' in m.groups()[0] or re.search('[:/]', m.groups()[1]): -            return "ssh://" + ustr -        else: -            return "gluster://" + ustr -    else: -        if ustr[0] != '/': -            raise GsyncdError("cannot resolve sugared url '%s'" % ustr) -        ap = os.path.normpath(ustr) -        if ap.startswith('//'): -            ap = ap[1:] -        return "file://" + ap - - -def gethostbyname(hnam): -    """gethostbyname wrapper""" -    try: -        return socket.gethostbyname(hnam) -    except socket.gaierror: -        ex = sys.exc_info()[1] -        raise GsyncdError("failed to resolve %s: %s" % -                          (hnam, ex.strerror)) - - -def parse_url(ustr): -    """instantiate an url object by scheme-to-class dispatch - -    The url classes taken into consideration are the ones in -    this module whose names are full-caps. -    """ -    m = UrlRX.match(ustr) -    if not m: -        ustr = desugar(ustr) -    m = UrlRX.match(ustr) -    if not m: -        raise GsyncdError("malformed url") -    sch, path = m.groups() -    this = sys.modules[__name__] -    if not hasattr(this, sch.upper()): -        raise GsyncdError("unknown url scheme " + sch) -    return getattr(this, sch.upper())(path) -  class Server(object): @@ -149,14 +92,14 @@ class Server(object):          fc = funcode(f)          pi = list(fc.co_varnames).index('path') -        def ff(*a): -            path = a[pi] +        def ff(*args): +            path = args[pi]              ps = path.split('/')              if path[0] == '/' or '..' in ps:                  raise ValueError('unsafe path') -            a = list(a) -            a[pi] = os.path.join(a[0].local_path, path) -            return f(*a) +            args = list(args) +            args[pi] = os.path.join(args[0].local_path, path) +            return f(*args)          return ff      @classmethod @@ -493,7 +436,8 @@ class Server(object):                  else:                      en = e['entry']                  disk_gfid = get_gfid_from_mnt(en) -                if isinstance(disk_gfid, basestring) and e['gfid'] != disk_gfid: +                if isinstance(disk_gfid, basestring) and \ +                   e['gfid'] != disk_gfid:                      slv_entry_info['gfid_mismatch'] = True                      st = lstat(en)                      if not isinstance(st, int): @@ -560,7 +504,6 @@ class Server(object):                                   [ENOENT, EEXIST], [ESTALE, EBUSY])              collect_failure(e, cmd_ret) -          for e in entries:              blob = None              op = e['op'] @@ -636,19 +579,21 @@ class Server(object):                      global slv_volume                      global slv_host                      if not slv_bricks: -                        slv_info = Volinfo (slv_volume, slv_host) +                        slv_info = Volinfo(slv_volume, slv_host)                          slv_bricks = slv_info.bricks                      # Result of readlink would be of format as below.                      # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"                      realpath = os.readlink(os.path.join(slv_bricks[0]['dir'], -                                                        ".glusterfs", gfid[0:2], -                                                        gfid[2:4], gfid)) +                                                        ".glusterfs", +                                                        gfid[0:2], +                                                        gfid[2:4], +                                                        gfid))                      realpath_parts = realpath.split('/')                      src_pargfid = realpath_parts[-2]                      src_basename = realpath_parts[-1]                      src_entry = os.path.join(pfx, src_pargfid, src_basename)                      logging.info(lf("Special case: rename on mkdir", -                                   gfid=gfid, entry=repr(entry))) +                                    gfid=gfid, entry=repr(entry)))                      rename_with_disk_gfid_confirmation(gfid, src_entry, entry)              elif op == 'LINK':                  slink = os.path.join(pfx, gfid) @@ -735,7 +680,7 @@ class Server(object):                                       [pg, 'glusterfs.gfid.newfile', blob],                                       [EEXIST, ENOENT],                                       [ESTALE, EINVAL, EBUSY]) -                failed = collect_failure(e, cmd_ret) +                collect_failure(e, cmd_ret)                  # If UID/GID is different than zero that means we are trying                  # create Entry with different UID/GID. Create Entry with @@ -852,274 +797,239 @@ class Server(object):          return 1.0 -class SlaveLocal(object): +class Mounter(object): -    """mix-in class to implement some factes of a slave server - -    ("mix-in" is sort of like "abstract class", ie. it's not -    instantiated just included in the ancesty DAG. I use "mix-in" -    to indicate that it's not used as an abstract base class, -    rather just taken in to implement additional functionality -    on the basis of the assumed availability of certain interfaces.) -    """ +    """Abstract base class for mounter backends""" -    def can_connect_to(self, remote): -        """determine our position in the connectibility matrix""" -        return not remote - -    def service_loop(self): -        """start a RePCe server serving self's server - -        stop servicing if a timeout is configured and got no -        keep-alime in that inteval -        """ - -        if boolify(gconf.use_rsync_xattrs) and not privileged(): -            raise GsyncdError( -                "using rsync for extended attributes is not supported") - -        repce = RepceServer( -            self.server, sys.stdin, sys.stdout, int(gconf.sync_jobs)) -        t = syncdutils.Thread(target=lambda: (repce.service_loop(), -                                              syncdutils.finalize())) -        t.start() -        logging.info("slave listening") -        if gconf.timeout and int(gconf.timeout) > 0: -            while True: -                lp = self.server.last_keep_alive -                time.sleep(int(gconf.timeout)) -                if lp == self.server.last_keep_alive: -                    logging.info( -                        lf("connection inactive, stopping", -                           timeout=int(gconf.timeout))) -                    break -        else: -            select((), (), ()) +    def __init__(self, params): +        self.params = params +        self.mntpt = None +    @classmethod +    def get_glusterprog(cls): +        return os.path.join(gconf.get("gluster-command-dir"), +                            cls.glusterprog) + +    def umount_l(self, d): +        """perform lazy umount""" +        po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) +        po.wait() +        return po -class SlaveRemote(object): +    @classmethod +    def make_umount_argv(cls, d): +        raise NotImplementedError -    """mix-in class to implement an interface to a remote slave""" +    def make_mount_argv(self, label=None): +        raise NotImplementedError -    def connect_remote(self, rargs=[], **opts): -        """connects to a remote slave +    def cleanup_mntpt(self, *a): +        pass -        Invoke an auxiliary utility (slave gsyncd, possibly wrapped) -        which sets up the connection and set up a RePCe client to -        communicate throuh its stdio. -        """ -        slave = opts.get('slave', self.url) -        extra_opts = [] -        so = getattr(gconf, 'session_owner', None) -        if so: -            extra_opts += ['--session-owner', so] -        li = getattr(gconf, 'local_id', None) -        if li: -            extra_opts += ['--local-id', li] -        ln = getattr(gconf, 'local_node', None) -        if ln: -            extra_opts += ['--local-node', ln] -        if boolify(gconf.use_rsync_xattrs): -            extra_opts.append('--use-rsync-xattrs') -        if boolify(gconf.access_mount): -            extra_opts.append('--access-mount') -        po = Popen(rargs + gconf.remote_gsyncd.split() + extra_opts + -                   ['-N', '--listen', '--timeout', str(gconf.timeout), -                    slave], -                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, -                   stderr=subprocess.PIPE) -        gconf.transport = po -        return self.start_fd_client(po.stdout, po.stdin, **opts) +    def handle_mounter(self, po): +        po.wait() -    def start_fd_client(self, i, o, **opts): -        """set up RePCe client, handshake with server +    def inhibit(self, label): +        """inhibit a gluster filesystem -        It's cut out as a separate method to let -        subclasses hook into client startup +        Mount glusterfs over a temporary mountpoint, +        change into the mount, and lazy unmount the +        filesystem.          """ -        self.server = RepceClient(i, o) -        rv = self.server.__version__() -        exrv = {'proto': repce.repce_version, 'object': Server.version()} -        da0 = (rv, exrv) -        da1 = ({}, {}) -        for i in range(2): -            for k, v in da0[i].iteritems(): -                da1[i][k] = int(v) -        if da1[0] != da1[1]: -            raise GsyncdError( -                "RePCe major version mismatch: local %s, remote %s" % -                (exrv, rv)) - -    def rsync(self, files, *args, **kw): -        """invoke rsync""" -        if not files: -            raise GsyncdError("no files to sync") -        logging.debug("files: " + ", ".join(files)) - -        extra_rsync_flags = [] -        # Performance flag, --ignore-missing-args, if rsync version is -        # greater than 3.1.0 then include this flag. -        if boolify(gconf.rsync_opt_ignore_missing_args) and \ -           get_rsync_version(gconf.rsync_command) >= "3.1.0": -            extra_rsync_flags = ["--ignore-missing-args"] - -        argv = gconf.rsync_command.split() + \ -            ['-aR0', '--inplace', '--files-from=-', '--super', -             '--stats', '--numeric-ids', '--no-implied-dirs'] + \ -            (boolify(gconf.rsync_opt_existing) and ['--existing'] or []) + \ -            gconf.rsync_options.split() + \ -            (boolify(gconf.sync_xattrs) and ['--xattrs'] or []) + \ -            (boolify(gconf.sync_acls) and ['--acls'] or []) + \ -            extra_rsync_flags + \ -            ['.'] + list(args) - -        log_rsync_performance = boolify(gconf.configinterface.get_realtime( -            "log_rsync_performance", default_value=False)) +        access_mount = gconf.get("access-mount") +        if rconf.args.subcmd == "slave": +            access_mount = gconf.get("slave-access-mount") + +        mpi, mpo = os.pipe() +        mh = Popen.fork() +        if mh: +            # Parent +            os.close(mpi) +            fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) +            d = None +            margv = self.make_mount_argv(label) +            if self.mntpt: +                # mntpt is determined pre-mount +                d = self.mntpt +                os.write(mpo, d + '\0') +            po = Popen(margv, **self.mountkw) +            self.handle_mounter(po) +            po.terminate_geterr() +            logging.debug('auxiliary glusterfs mount in place') +            if not d: +                # mntpt is determined during mount +                d = self.mntpt +                os.write(mpo, d + '\0') +            os.write(mpo, 'M') +            t = syncdutils.Thread(target=lambda: os.chdir(d)) +            t.start() +            tlim = rconf.starttime + gconf.get("connection-timeout") +            while True: +                if not t.isAlive(): +                    break -        if log_rsync_performance: -            # use stdout=PIPE only when log_rsync_performance enabled -            # Else rsync will write to stdout and nobody is their -            # to consume. If PIPE is full rsync hangs. -            po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, -                       stderr=subprocess.PIPE) +                if time.time() >= tlim: +                    syncdutils.finalize(exval=1) +                time.sleep(1) +            os.close(mpo) +            _, rv = syncdutils.waitpid(mh, 0) +            if rv: +                rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ +                     (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) +                logging.warn(lf('stale mount possibly left behind', +                                path=d)) +                raise GsyncdError("cleaning up temp mountpoint %s " +                                  "failed with status %d" % +                                  (d, rv))          else: -            po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) +            rv = 0 +            try: +                os.setsid() +                os.close(mpo) +                mntdata = '' +                while True: +                    c = os.read(mpi, 1) +                    if not c: +                        break +                    mntdata += c +                if mntdata: +                    mounted = False +                    if mntdata[-1] == 'M': +                        mntdata = mntdata[:-1] +                        assert(mntdata) +                        mounted = True +                    assert(mntdata[-1] == '\0') +                    mntpt = mntdata[:-1] +                    assert(mntpt) +                    if mounted and not access_mount: +                        po = self.umount_l(mntpt) +                        po.terminate_geterr(fail_on_err=False) +                        if po.returncode != 0: +                            po.errlog() +                            rv = po.returncode +                    if not access_mount: +                        self.cleanup_mntpt(mntpt) +            except: +                logging.exception('mount cleanup failure:') +                rv = 200 +            os._exit(rv) +        logging.debug('auxiliary glusterfs mount prepared') + + +class DirectMounter(Mounter): + +    """mounter backend which calls mount(8), umount(8) directly""" + +    mountkw = {'stderr': subprocess.PIPE} +    glusterprog = 'glusterfs' -        for f in files: -            po.stdin.write(f) -            po.stdin.write('\0') +    @staticmethod +    def make_umount_argv(d): +        return ['umount', '-l', d] -        stdout, stderr = po.communicate() +    def make_mount_argv(self, label=None): +        self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') +        mntpt_list.append(self.mntpt) +        return [self.get_glusterprog()] + \ +            ['--' + p for p in self.params] + [self.mntpt] -        if kw.get("log_err", False): -            for errline in stderr.strip().split("\n")[:-1]: -                logging.error(lf("SYNC Error", -                                 sync_engine="Rsync", -                                 error=errline)) +    def cleanup_mntpt(self, mntpt=None): +        if not mntpt: +            mntpt = self.mntpt +        errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) -        if log_rsync_performance: -            rsync_msg = [] -            for line in stdout.split("\n"): -                if line.startswith("Number of files:") or \ -                   line.startswith("Number of regular files transferred:") or \ -                   line.startswith("Total file size:") or \ -                   line.startswith("Total transferred file size:") or \ -                   line.startswith("Literal data:") or \ -                   line.startswith("Matched data:") or \ -                   line.startswith("Total bytes sent:") or \ -                   line.startswith("Total bytes received:") or \ -                   line.startswith("sent "): -                    rsync_msg.append(line) -            logging.info(lf("rsync performance", -                            data=", ".join(rsync_msg))) -        return po +class MountbrokerMounter(Mounter): -    def tarssh(self, files, slaveurl, log_err=False): -        """invoke tar+ssh -        -z (compress) can be use if needed, but omitting it now -        as it results in weird error (tar+ssh errors out (errcode: 2) -        """ -        if not files: -            raise GsyncdError("no files to sync") -        logging.debug("files: " + ", ".join(files)) -        (host, rdir) = slaveurl.split(':') -        tar_cmd = ["tar"] + \ -            ["--sparse", "-cf", "-", "--files-from", "-"] -        ssh_cmd = gconf.ssh_command_tar.split() + \ -            ["-p", str(gconf.ssh_port)] + \ -            [host, "tar"] + \ -            ["--overwrite", "-xf", "-", "-C", rdir] -        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, -                   stdin=subprocess.PIPE, stderr=subprocess.PIPE) -        p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) -        for f in files: -            p0.stdin.write(f) -            p0.stdin.write('\n') +    """mounter backend using the mountbroker gluster service""" -        p0.stdin.close() -        p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits. -        # wait for tar to terminate, collecting any errors, further -        # waiting for transfer to complete -        _, stderr1 = p1.communicate() +    mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} +    glusterprog = 'gluster' -        # stdin and stdout of p0 is already closed, Reset to None and -        # wait for child process to complete -        p0.stdin = None -        p0.stdout = None -        p0.communicate() - -        if log_err: -            for errline in stderr1.strip().split("\n")[:-1]: -                logging.error(lf("SYNC Error", -                                 sync_engine="Tarssh", -                                 error=errline)) - -        return p1 +    @classmethod +    def make_cli_argv(cls): +        return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ +            gconf.get("gluster-cli-options").split() + ['system::'] +    @classmethod +    def make_umount_argv(cls, d): +        return cls.make_cli_argv() + ['umount', d, 'lazy'] -class AbstractUrl(object): +    def make_mount_argv(self, label): +        return self.make_cli_argv() + \ +            ['mount', label, 'user-map-root=' + +                syncdutils.getusername()] + self.params -    """abstract base class for url scheme classes""" +    def handle_mounter(self, po): +        self.mntpt = po.stdout.readline()[:-1] +        po.stdout.close() +        sup(self, po) +        if po.returncode != 0: +            # if cli terminated with error due to being +            # refused by glusterd, what it put +            # out on stdout is a diagnostic message +            logging.error(lf('glusterd answered', mnt=self.mntpt)) -    def __init__(self, path, pattern): -        m = re.search(pattern, path) -        if not m: -            raise GsyncdError("malformed path") -        self.path = path -        return m.groups() -    @property -    def scheme(self): -        return type(self).__name__.lower() +class GLUSTERServer(Server): -    def canonical_path(self): -        return self.path +    "server enhancements for a glusterfs backend""" -    def get_url(self, canonical=False, escaped=False): -        """format self's url in various styles""" -        if canonical: -            pa = self.canonical_path() +    @classmethod +    def _attr_unpack_dict(cls, xattr, extra_fields=''): +        """generic volume mark fetching/parsing backed""" +        fmt_string = cls.NTV_FMTSTR + extra_fields +        buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) +        vm = struct.unpack(fmt_string, buf) +        m = re.match( +            '(.{8})(.{4})(.{4})(.{4})(.{12})', +            "".join(['%02x' % x for x in vm[2:18]])) +        uuid = '-'.join(m.groups()) +        volinfo = {'version': vm[0:2], +                   'uuid': uuid, +                   'retval': vm[18], +                   'volume_mark': vm[19:21], +                   } +        if extra_fields: +            return volinfo, vm[-len(extra_fields):]          else: -            pa = self.path -        u = "://".join((self.scheme, pa)) -        if escaped: -            u = syncdutils.escape(u) -        return u - -    @property -    def url(self): -        return self.get_url() - - -class FILE(AbstractUrl, SlaveLocal, SlaveRemote): - -    """scheme class for file:// urls - -    can be used to represent a file slave server -    on slave side, or interface to a remote file -    file server on master side -    """ - -    class FILEServer(Server): - -        """included server flavor""" -        pass - -    server = FILEServer - -    def __init__(self, path): -        sup(self, path, '^/') +            return volinfo -    def connect(self): -        """inhibit the resource beyond""" -        os.chdir(self.path) +    @classmethod +    def foreign_volume_infos(cls): +        """return list of valid (not expired) foreign volume marks""" +        dict_list = [] +        xattr_list = Xattr.llistxattr_buf('.') +        for ele in xattr_list: +            if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: +                d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) +                now = int(time.time()) +                if x[0] > now: +                    logging.debug("volinfo[%s] expires: %d " +                                  "(%d sec later)" % +                                  (d['uuid'], x[0], x[0] - now)) +                    d['timeout'] = x[0] +                    dict_list.append(d) +                else: +                    try: +                        Xattr.lremovexattr('.', ele) +                    except OSError: +                        pass +        return dict_list -    def rsync(self, files, log_err=False): -        return sup(self, files, self.path, log_err=log_err) +    @classmethod +    def native_volume_info(cls): +        """get the native volume mark of the underlying gluster volume""" +        try: +            return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, +                                                   'volume-mark'])) +        except OSError: +            ex = sys.exc_info()[1] +            if ex.errno != ENODATA: +                raise -class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): +class GLUSTER(object):      """scheme class for gluster:// urls @@ -1129,247 +1039,18 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      (slave-ish features come from the mixins, master      functionality is outsourced to GMaster from master)      """ - -    class GLUSTERServer(Server): - -        "server enhancements for a glusterfs backend""" - -        @classmethod -        def _attr_unpack_dict(cls, xattr, extra_fields=''): -            """generic volume mark fetching/parsing backed""" -            fmt_string = cls.NTV_FMTSTR + extra_fields -            buf = Xattr.lgetxattr('.', xattr, struct.calcsize(fmt_string)) -            vm = struct.unpack(fmt_string, buf) -            m = re.match( -                '(.{8})(.{4})(.{4})(.{4})(.{12})', -                "".join(['%02x' % x for x in vm[2:18]])) -            uuid = '-'.join(m.groups()) -            volinfo = {'version': vm[0:2], -                       'uuid': uuid, -                       'retval': vm[18], -                       'volume_mark': vm[19:21], -                       } -            if extra_fields: -                return volinfo, vm[-len(extra_fields):] -            else: -                return volinfo - -        @classmethod -        def foreign_volume_infos(cls): -            """return list of valid (not expired) foreign volume marks""" -            dict_list = [] -            xattr_list = Xattr.llistxattr_buf('.') -            for ele in xattr_list: -                if ele.find('.'.join([cls.GX_NSPACE, 'volume-mark', ''])) == 0: -                    d, x = cls._attr_unpack_dict(ele, cls.FRGN_XTRA_FMT) -                    now = int(time.time()) -                    if x[0] > now: -                        logging.debug("volinfo[%s] expires: %d " -                                      "(%d sec later)" % -                                      (d['uuid'], x[0], x[0] - now)) -                        d['timeout'] = x[0] -                        dict_list.append(d) -                    else: -                        try: -                            Xattr.lremovexattr('.', ele) -                        except OSError: -                            pass -            return dict_list - -        @classmethod -        def native_volume_info(cls): -            """get the native volume mark of the underlying gluster volume""" -            try: -                return cls._attr_unpack_dict('.'.join([cls.GX_NSPACE, -                                                       'volume-mark'])) -            except OSError: -                ex = sys.exc_info()[1] -                if ex.errno != ENODATA: -                    raise -      server = GLUSTERServer -    def __init__(self, path): -        self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern) +    def __init__(self, host, volume): +        self.path = "%s:%s" % (host, volume) +        self.host = host +        self.volume = volume          global slv_volume          global slv_host          slv_volume = self.volume          slv_host = self.host -    def canonical_path(self): -        return ':'.join([gethostbyname(self.host), self.volume]) - -    def can_connect_to(self, remote): -        """determine our position in the connectibility matrix""" -        return not remote or \ -            (isinstance(remote, SSH) and isinstance(remote.inner_rsc, GLUSTER)) - -    class Mounter(object): - -        """Abstract base class for mounter backends""" - -        def __init__(self, params): -            self.params = params -            self.mntpt = None - -        @classmethod -        def get_glusterprog(cls): -            return os.path.join(gconf.gluster_command_dir, cls.glusterprog) - -        def umount_l(self, d): -            """perform lazy umount""" -            po = Popen(self.make_umount_argv(d), stderr=subprocess.PIPE) -            po.wait() -            return po - -        @classmethod -        def make_umount_argv(cls, d): -            raise NotImplementedError - -        def make_mount_argv(self, *a): -            raise NotImplementedError - -        def cleanup_mntpt(self, *a): -            pass - -        def handle_mounter(self, po): -            po.wait() - -        def inhibit(self, *a): -            """inhibit a gluster filesystem - -            Mount glusterfs over a temporary mountpoint, -            change into the mount, and lazy unmount the -            filesystem. -            """ - -            mpi, mpo = os.pipe() -            mh = Popen.fork() -            if mh: -                os.close(mpi) -                fcntl.fcntl(mpo, fcntl.F_SETFD, fcntl.FD_CLOEXEC) -                d = None -                margv = self.make_mount_argv(*a) -                if self.mntpt: -                    # mntpt is determined pre-mount -                    d = self.mntpt -                    os.write(mpo, d + '\0') -                po = Popen(margv, **self.mountkw) -                self.handle_mounter(po) -                po.terminate_geterr() -                logging.debug('auxiliary glusterfs mount in place') -                if not d: -                    # mntpt is determined during mount -                    d = self.mntpt -                    os.write(mpo, d + '\0') -                os.write(mpo, 'M') -                t = syncdutils.Thread(target=lambda: os.chdir(d)) -                t.start() -                tlim = gconf.starttime + int(gconf.connection_timeout) -                while True: -                    if not t.isAlive(): -                        break -                    if time.time() >= tlim: -                        syncdutils.finalize(exval=1) -                    time.sleep(1) -                os.close(mpo) -                _, rv = syncdutils.waitpid(mh, 0) -                if rv: -                    rv = (os.WIFEXITED(rv) and os.WEXITSTATUS(rv) or 0) - \ -                         (os.WIFSIGNALED(rv) and os.WTERMSIG(rv) or 0) -                    logging.warn(lf('stale mount possibly left behind', -                                    path=d)) -                    raise GsyncdError("cleaning up temp mountpoint %s " -                                      "failed with status %d" % -                                      (d, rv)) -            else: -                rv = 0 -                try: -                    os.setsid() -                    os.close(mpo) -                    mntdata = '' -                    while True: -                        c = os.read(mpi, 1) -                        if not c: -                            break -                        mntdata += c -                    if mntdata: -                        mounted = False -                        if mntdata[-1] == 'M': -                            mntdata = mntdata[:-1] -                            assert(mntdata) -                            mounted = True -                        assert(mntdata[-1] == '\0') -                        mntpt = mntdata[:-1] -                        assert(mntpt) -                        if mounted and not boolify(gconf.access_mount): -                            po = self.umount_l(mntpt) -                            po.terminate_geterr(fail_on_err=False) -                            if po.returncode != 0: -                                po.errlog() -                                rv = po.returncode -                        if not boolify(gconf.access_mount): -                            self.cleanup_mntpt(mntpt) -                except: -                    logging.exception('mount cleanup failure:') -                    rv = 200 -                os._exit(rv) -            logging.debug('auxiliary glusterfs mount prepared') - -    class DirectMounter(Mounter): - -        """mounter backend which calls mount(8), umount(8) directly""" - -        mountkw = {'stderr': subprocess.PIPE} -        glusterprog = 'glusterfs' - -        @staticmethod -        def make_umount_argv(d): -            return ['umount', '-l', d] - -        def make_mount_argv(self): -            self.mntpt = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') -            mntpt_list.append(self.mntpt) -            return [self.get_glusterprog()] + \ -                ['--' + p for p in self.params] + [self.mntpt] - -        def cleanup_mntpt(self, mntpt=None): -            if not mntpt: -                mntpt = self.mntpt -            errno_wrap(os.rmdir, [mntpt], [ENOENT, EBUSY]) - -    class MountbrokerMounter(Mounter): - -        """mounter backend using the mountbroker gluster service""" - -        mountkw = {'stderr': subprocess.PIPE, 'stdout': subprocess.PIPE} -        glusterprog = 'gluster' - -        @classmethod -        def make_cli_argv(cls): -            return [cls.get_glusterprog()] + ['--remote-host=localhost'] + \ -                gconf.gluster_cli_options.split() + ['system::'] - -        @classmethod -        def make_umount_argv(cls, d): -            return cls.make_cli_argv() + ['umount', d, 'lazy'] - -        def make_mount_argv(self, label): -            return self.make_cli_argv() + \ -                ['mount', label, 'user-map-root=' + -                    syncdutils.getusername()] + self.params - -        def handle_mounter(self, po): -            self.mntpt = po.stdout.readline()[:-1] -            po.stdout.close() -            sup(self, po) -            if po.returncode != 0: -                # if cli terminated with error due to being -                # refused by glusterd, what it put -                # out on stdout is a diagnostic message -                logging.error(lf('glusterd answered', mnt=self.mntpt)) -      def connect(self):          """inhibit the resource beyond @@ -1380,23 +1061,29 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):          logging.info("Mounting gluster volume locally...")          t0 = time.time() -        label = getattr(gconf, 'mountbroker', None) +        label = gconf.get('mountbroker', None)          if not label and not privileged():              label = syncdutils.getusername() -        mounter = label and self.MountbrokerMounter or self.DirectMounter -        params = gconf.gluster_params.split() + \ -            (gconf.gluster_log_level and ['log-level=' + -                                          gconf.gluster_log_level] or []) + \ -            ['log-file=' + gconf.gluster_log_file, 'volfile-server=' + -             self.host, 'volfile-id=' + self.volume, 'client-pid=-1'] -        mounter(params).inhibit(*[l for l in [label] if l]) +        mounter = label and MountbrokerMounter or DirectMounter + +        log_file = gconf.get("gluster-log-file") +        if rconf.args.subcmd == "slave": +            log_file = gconf.get("slave-gluster-log-file") + +        log_level = gconf.get("gluster-log-level") +        if rconf.args.subcmd == "slave": +            log_level = gconf.get("slave-gluster-log-level") + +        params = gconf.get("gluster-params").split() + \ +            ['log-level=' + log_level] + \ +            ['log-file=' + log_file, 'volfile-server=' + self.host] + \ +            ['volfile-id=' + self.volume, 'client-pid=-1'] + +        self.mounter = mounter(params) +        self.mounter.inhibit(label)          logging.info(lf("Mounted gluster volume",                          duration="%.4f" % (time.time() - t0))) -    def connect_remote(self, *a, **kw): -        sup(self, *a, **kw) -        self.slavedir = "/proc/%d/cwd" % self.server.pid() -      def gmaster_instantiate_tuple(self, slave):          """return a tuple of the 'one shot' and the 'main crawl'          class instance""" @@ -1404,7 +1091,7 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):                  gmaster_builder()(self, slave),                  gmaster_builder('changeloghistory')(self, slave)) -    def service_loop(self, *args): +    def service_loop(self, slave=None):          """enter service loop          - if slave given, instantiate GMaster and @@ -1412,171 +1099,183 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):            master behavior          - else do that's what's inherited          """ -        if args: -            slave = args[0] -            if gconf.local_path: -                class brickserver(FILE.FILEServer): -                    local_path = gconf.local_path -                    aggregated = self.server - -                    @classmethod -                    def entries(cls, path): -                        e = super(brickserver, cls).entries(path) -                        # on the brick don't mess with /.glusterfs -                        if path == '.': -                            try: -                                e.remove('.glusterfs') -                                e.remove('.trashcan') -                            except ValueError: -                                pass -                        return e - -                    @classmethod -                    def lstat(cls, e): -                        """ path based backend stat """ -                        return super(brickserver, cls).lstat(e) - -                    @classmethod -                    def gfid(cls, e): -                        """ path based backend gfid fetch """ -                        return super(brickserver, cls).gfid(e) - -                    @classmethod -                    def linkto_check(cls, e): -                        return super(brickserver, cls).linkto_check(e) -                if gconf.slave_id: -                    # define {,set_}xtime in slave, thus preempting -                    # the call to remote, so that it takes data from -                    # the local brick -                    slave.server.xtime = types.MethodType( -                        lambda _self, path, uuid: ( -                            brickserver.xtime(path, -                                              uuid + '.' + gconf.slave_id) -                        ), -                        slave.server) -                    slave.server.stime = types.MethodType( -                        lambda _self, path, uuid: ( -                            brickserver.stime(path, -                                              uuid + '.' + gconf.slave_id) -                        ), -                        slave.server) -                    slave.server.entry_stime = types.MethodType( -                        lambda _self, path, uuid: ( -                            brickserver.entry_stime( -                                path, -                                uuid + '.' + gconf.slave_id) -                        ), -                        slave.server) -                    slave.server.set_stime = types.MethodType( -                        lambda _self, path, uuid, mark: ( -                            brickserver.set_stime(path, -                                                  uuid + '.' + gconf.slave_id, -                                                  mark) -                        ), -                        slave.server) -                    slave.server.set_entry_stime = types.MethodType( -                        lambda _self, path, uuid, mark: ( -                            brickserver.set_entry_stime( -                                path, -                                uuid + '.' + gconf.slave_id, -                                mark) -                        ), -                        slave.server) -                (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) -                g1.master.server = brickserver -                g2.master.server = brickserver -                g3.master.server = brickserver -            else: -                (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) -                g1.master.server.aggregated = gmaster.master.server -                g2.master.server.aggregated = gmaster.master.server -                g3.master.server.aggregated = gmaster.master.server -            # bad bad bad: bad way to do things like this -            # need to make this elegant -            # register the crawlers and start crawling -            # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) -            # g3 ==> changelog History -            changelog_register_failed = False -            (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') -            changelog_agent = RepceClient(int(inf), int(ouf)) -            master_name, slave_data = get_master_and_slave_data_from_args( -                sys.argv) -            status = GeorepStatus(gconf.state_file, gconf.local_node, -                                  gconf.local_path, -                                  gconf.local_node_id, -                                  master_name, slave_data) -            status.reset_on_worker_start() -            rv = changelog_agent.version() -            if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: +        if rconf.args.subcmd == "slave": +            if gconf.get("use-rsync-xattrs") and not privileged():                  raise GsyncdError( -                    "RePCe major version mismatch(changelog agent): " -                    "local %s, remote %s" % -                    (CHANGELOG_AGENT_CLIENT_VERSION, rv)) +                    "using rsync for extended attributes is not supported") + +            repce = RepceServer( +                self.server, sys.stdin, sys.stdout, gconf.get("sync-jobs")) +            t = syncdutils.Thread(target=lambda: (repce.service_loop(), +                                                  syncdutils.finalize())) +            t.start() +            logging.info("slave listening") +            if gconf.get("slave-timeout") and gconf.get("slave-timeout") > 0: +                while True: +                    lp = self.server.last_keep_alive +                    time.sleep(gconf.get("slave-timeout")) +                    if lp == self.server.last_keep_alive: +                        logging.info( +                            lf("connection inactive, stopping", +                               timeout=gconf.get("slave-timeout"))) +                        break +            else: +                select((), (), ()) -            try: -                workdir = g2.setup_working_dir() -                # Register only when change_detector is not set to -                # xsync, else agent will generate changelog files -                # in .processing directory of working dir -                if gconf.change_detector != 'xsync': -                    # register with the changelog library -                    # 9 == log level (DEBUG) -                    # 5 == connection retries -                    changelog_agent.init() -                    changelog_agent.register(gconf.local_path, -                                             workdir, gconf.changelog_log_file, -                                             get_changelog_log_level( -                                                 gconf.changelog_log_level), -                                             g2.CHANGELOG_CONN_RETRIES) - -                register_time = int(time.time()) -                g2.register(register_time, changelog_agent, status) -                g3.register(register_time, changelog_agent, status) -            except ChangelogException as e: -                logging.error(lf("Changelog register failed", error=e)) -                sys.exit(1) - -            g1.register(status=status) -            logging.info(lf("Register time", -                            time=register_time)) -            # oneshot: Try to use changelog history api, if not -            # available switch to FS crawl -            # Note: if config.change_detector is xsync then -            # it will not use changelog history api -            try: -                g3.crawlwrap(oneshot=True) -            except PartialHistoryAvailable as e: -                logging.info(lf('Partial history available, using xsync crawl' -                                ' after consuming history', -                                till=e)) -                g1.crawlwrap(oneshot=True, register_time=register_time) -            except ChangelogHistoryNotAvailable: -                logging.info('Changelog history not available, using xsync') -                g1.crawlwrap(oneshot=True, register_time=register_time) -            except NoStimeAvailable: -                logging.info('No stime available, using xsync crawl') -                g1.crawlwrap(oneshot=True, register_time=register_time) -            except ChangelogException as e: -                logging.error(lf("Changelog History Crawl failed", -                                 error=e)) -                sys.exit(1) +            return -            try: -                g2.crawlwrap() -            except ChangelogException as e: -                logging.error(lf("Changelog crawl failed", error=e)) -                sys.exit(1) -        else: -            sup(self, *args) +        class brickserver(Server): +            local_path = rconf.args.local_path +            aggregated = self.server + +            @classmethod +            def entries(cls, path): +                e = super(brickserver, cls).entries(path) +                # on the brick don't mess with /.glusterfs +                if path == '.': +                    try: +                        e.remove('.glusterfs') +                        e.remove('.trashcan') +                    except ValueError: +                        pass +                return e + +            @classmethod +            def lstat(cls, e): +                """ path based backend stat """ +                return super(brickserver, cls).lstat(e) + +            @classmethod +            def gfid(cls, e): +                """ path based backend gfid fetch """ +                return super(brickserver, cls).gfid(e) + +            @classmethod +            def linkto_check(cls, e): +                return super(brickserver, cls).linkto_check(e) + +        # define {,set_}xtime in slave, thus preempting +        # the call to remote, so that it takes data from +        # the local brick +        slave.server.xtime = types.MethodType( +            lambda _self, path, uuid: ( +                brickserver.xtime(path, +                                  uuid + '.' + rconf.args.slave_id) +            ), +            slave.server) +        slave.server.stime = types.MethodType( +            lambda _self, path, uuid: ( +                brickserver.stime(path, +                                  uuid + '.' + rconf.args.slave_id) +            ), +            slave.server) +        slave.server.entry_stime = types.MethodType( +            lambda _self, path, uuid: ( +                brickserver.entry_stime( +                    path, +                    uuid + '.' + rconf.args.slave_id) +            ), +            slave.server) +        slave.server.set_stime = types.MethodType( +            lambda _self, path, uuid, mark: ( +                brickserver.set_stime(path, +                                      uuid + '.' + rconf.args.slave_id, +                                      mark) +            ), +            slave.server) +        slave.server.set_entry_stime = types.MethodType( +            lambda _self, path, uuid, mark: ( +                brickserver.set_entry_stime( +                    path, +                    uuid + '.' + rconf.args.slave_id, +                    mark) +            ), +            slave.server) + +        (g1, g2, g3) = self.gmaster_instantiate_tuple(slave) +        g1.master.server = brickserver +        g2.master.server = brickserver +        g3.master.server = brickserver + +        # bad bad bad: bad way to do things like this +        # need to make this elegant +        # register the crawlers and start crawling +        # g1 ==> Xsync, g2 ==> config.change_detector(changelog by default) +        # g3 ==> changelog History +        (inf, ouf, ra, wa) = rconf.args.rpc_fd.split(',') +        changelog_agent = RepceClient(int(inf), int(ouf)) + +        status = GeorepStatus(gconf.get("state-file"), +                              rconf.args.local_node, +                              rconf.args.local_path, +                              rconf.args.local_node_id, +                              rconf.args.master, +                              rconf.args.slave) +        status.reset_on_worker_start() +        rv = changelog_agent.version() +        if int(rv) != CHANGELOG_AGENT_CLIENT_VERSION: +            raise GsyncdError( +                "RePCe major version mismatch(changelog agent): " +                "local %s, remote %s" % +                (CHANGELOG_AGENT_CLIENT_VERSION, rv)) -    def rsync(self, files, log_err=False): -        return sup(self, files, self.slavedir, log_err=log_err) +        try: +            workdir = g2.setup_working_dir() +            # Register only when change_detector is not set to +            # xsync, else agent will generate changelog files +            # in .processing directory of working dir +            if gconf.get("change-detector") != 'xsync': +                # register with the changelog library +                # 9 == log level (DEBUG) +                # 5 == connection retries +                changelog_agent.init() +                changelog_agent.register(rconf.args.local_path, +                                         workdir, +                                         gconf.get("changelog-log-file"), +                                         get_changelog_log_level( +                                             gconf.get("changelog-log-level")), +                                         g2.CHANGELOG_CONN_RETRIES) + +            register_time = int(time.time()) +            g2.register(register_time, changelog_agent, status) +            g3.register(register_time, changelog_agent, status) +        except ChangelogException as e: +            logging.error(lf("Changelog register failed", error=e)) +            sys.exit(1) + +        g1.register(status=status) +        logging.info(lf("Register time", +                        time=register_time)) +        # oneshot: Try to use changelog history api, if not +        # available switch to FS crawl +        # Note: if config.change_detector is xsync then +        # it will not use changelog history api +        try: +            g3.crawlwrap(oneshot=True) +        except PartialHistoryAvailable as e: +            logging.info(lf('Partial history available, using xsync crawl' +                            ' after consuming history', +                            till=e)) +            g1.crawlwrap(oneshot=True, register_time=register_time) +        except ChangelogHistoryNotAvailable: +            logging.info('Changelog history not available, using xsync') +            g1.crawlwrap(oneshot=True, register_time=register_time) +        except NoStimeAvailable: +            logging.info('No stime available, using xsync crawl') +            g1.crawlwrap(oneshot=True, register_time=register_time) +        except ChangelogException as e: +            logging.error(lf("Changelog History Crawl failed", +                             error=e)) +            sys.exit(1) -    def tarssh(self, files, log_err=False): -        return sup(self, files, self.slavedir, log_err=log_err) +        try: +            g2.crawlwrap() +        except ChangelogException as e: +            logging.error(lf("Changelog crawl failed", error=e)) +            sys.exit(1) -class SSH(AbstractUrl, SlaveRemote): +class SSH(object):      """scheme class for ssh:// urls @@ -1584,13 +1283,9 @@ class SSH(AbstractUrl, SlaveRemote):      implementing an ssh based proxy      """ -    def __init__(self, path): -        self.remote_addr, inner_url = sup(self, path, -                                          '^((?:%s@)?%s):(.+)' % -                                          tuple([r.pattern -                                                 for r in (UserRX, HostRX)])) -        self.inner_rsc = parse_url(inner_url) -        self.volume = inner_url[1:] +    def __init__(self, host, volume): +        self.remote_addr = host +        self.volume = volume      @staticmethod      def parse_ssh_address(self): @@ -1602,35 +1297,28 @@ class SSH(AbstractUrl, SlaveRemote):          self.remotehost = h          return {'user': u, 'host': h} -    def canonical_path(self): -        rap = self.parse_ssh_address(self) -        remote_addr = '@'.join([rap['user'], gethostbyname(rap['host'])]) -        return ':'.join([remote_addr, self.inner_rsc.get_url(canonical=True)]) - -    def can_connect_to(self, remote): -        """determine our position in the connectibility matrix""" -        return False - -    def start_fd_client(self, *a, **opts): -        """customizations for client startup +    def start_fd_client(self, i, o): +        """set up RePCe client, handshake with server -        - be a no-op if we are to daemonize (client startup is deferred -          to post-daemon stage) -        - determine target url for rsync after consulting server +        It's cut out as a separate method to let +        subclasses hook into client startup          """ -        if opts.get('deferred'): -            return a -        sup(self, *a) -        ityp = type(self.inner_rsc) -        if ityp == FILE: -            slavepath = self.inner_rsc.path -        elif ityp == GLUSTER: -            slavepath = "/proc/%d/cwd" % self.server.pid() -        else: -            raise NotImplementedError +        self.server = RepceClient(i, o) +        rv = self.server.__version__() +        exrv = {'proto': repce.repce_version, 'object': Server.version()} +        da0 = (rv, exrv) +        da1 = ({}, {}) +        for i in range(2): +            for k, v in da0[i].iteritems(): +                da1[i][k] = int(v) +        if da1[0] != da1[1]: +            raise GsyncdError( +                "RePCe major version mismatch: local %s, remote %s" % +                (exrv, rv)) +        slavepath = "/proc/%d/cwd" % self.server.pid()          self.slaveurl = ':'.join([self.remote_addr, slavepath]) -    def connect_remote(self, go_daemon=None): +    def connect_remote(self):          """connect to inner slave url through outer ssh url          Wrap the connecting utility in ssh. @@ -1648,49 +1336,182 @@ class SSH(AbstractUrl, SlaveRemote):          [NB. ATM gluster product does not makes use of interactive          authentication.]          """ -        if go_daemon == 'done': -            return self.start_fd_client(*self.fd_pair) -          syncdutils.setup_ssh_ctl(tempfile.mkdtemp(prefix='gsyncd-aux-ssh-'),                                   self.remote_addr, -                                 self.inner_rsc.url) +                                 self.volume) -        deferred = go_daemon == 'postconn'          logging.info("Initializing SSH connection between master and slave...")          t0 = time.time() -        ret = sup(self, gconf.ssh_command.split() + -                  ["-p", str(gconf.ssh_port)] + -                  gconf.ssh_ctl_args + [self.remote_addr], -                  slave=self.inner_rsc.url, deferred=deferred) + +        extra_opts = [] +        remote_gsyncd = gconf.get("remote-gsyncd") +        if remote_gsyncd == "": +            remote_gsyncd = "/nonexistent/gsyncd" + +        if gconf.get("use-rsync-xattrs"): +            extra_opts.append('--use-rsync-xattrs') + +        args_to_slave = [gconf.get("ssh-command")] + \ +            gconf.get("ssh-options").split() + \ +            ["-p", str(gconf.get("ssh-port"))] + \ +            rconf.ssh_ctl_args + [self.remote_addr] + \ +            [remote_gsyncd, "slave"] + \ +            extra_opts + \ +            [rconf.args.master, rconf.args.slave] + \ +            [ +                '--master-node', rconf.args.local_node, +                '--master-node-id', rconf.args.local_node_id, +                '--master-brick', rconf.args.local_path, +                '--local-node', rconf.args.resource_remote, +                '--local-node-id', rconf.args.resource_remote_id] + \ +            [ +                # Add all config arguments here, slave gsyncd will not use +                # config file in slave side, so all overridding options should +                # be sent as arguments +                '--slave-timeout', str(gconf.get("slave-timeout")), +                '--slave-log-level', gconf.get("slave-log-level"), +                '--slave-gluster-log-level', +                gconf.get("slave-gluster-log-level")] + +        if gconf.get("slave-access-mount"): +            args_to_slave.append('--slave-access-mount') + +        if rconf.args.debug: +            args_to_slave.append('--debug') + +        po = Popen(args_to_slave, +                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, +                   stderr=subprocess.PIPE) +        rconf.transport = po +        self.start_fd_client(po.stdout, po.stdin)          logging.info(lf("SSH connection between master and slave established.",                          duration="%.4f" % (time.time() - t0))) -        if deferred: -            # send a message to peer so that we can wait for -            # the answer from which we know connection is -            # established and we can proceed with daemonization -            # (doing that too early robs the ssh passwd prompt...) -            # However, we'd better not start the RepceClient -            # before daemonization (that's not preserved properly -            # in daemon), we just do a an ad-hoc linear put/get. -            i, o = ret -            inf = os.fdopen(i) -            repce.send(o, None, '__repce_version__') -            select((inf,), (), ()) -            repce.recv(inf) -            # hack hack hack: store a global reference to the file -            # to save it from getting GC'd which implies closing it -            gconf.permanent_handles.append(inf) -            self.fd_pair = (i, o) -            return 'should' - -    def rsync(self, files, log_err=False): -        return sup(self, files, '-e', -                   " ".join(gconf.ssh_command.split() + -                            ["-p", str(gconf.ssh_port)] + -                            gconf.ssh_ctl_args), -                   *(gconf.rsync_ssh_options.split() + [self.slaveurl]), -                   log_err=log_err) - -    def tarssh(self, files, log_err=False): -        return sup(self, files, self.slaveurl, log_err=log_err) +    def rsync(self, files, *args, **kw): +        """invoke rsync""" +        if not files: +            raise GsyncdError("no files to sync") +        logging.debug("files: " + ", ".join(files)) + +        extra_rsync_flags = [] +        # Performance flag, --ignore-missing-args, if rsync version is +        # greater than 3.1.0 then include this flag. +        if gconf.get("rsync-opt-ignore-missing-args") and \ +           get_rsync_version(gconf.get("rsync-command")) >= "3.1.0": +            extra_rsync_flags = ["--ignore-missing-args"] + +        rsync_ssh_opts = [gconf.get("ssh-command")] + \ +            gconf.get("ssh-options").split() + \ +            ["-p", str(gconf.get("ssh-port"))] + \ +            rconf.ssh_ctl_args + \ +            gconf.get("rsync-ssh-options").split() + +        argv = [ +            gconf.get("rsync-command"), +            '-aR0', +            '--inplace', +            '--files-from=-', +            '--super', +            '--stats', +            '--numeric-ids', +            '--no-implied-dirs' +        ] + +        if gconf.get("rsync-opt-existing"): +            argv += ["--existing"] + +        if gconf.get("sync-xattrs"): +            argv += ['--xattrs'] + +        if gconf.get("sync-acls"): +            argv += ['--acls'] + +        argv = argv + \ +            gconf.get("rsync-options").split() + \ +            extra_rsync_flags + ['.'] + \ +            ["-e", " ".join(rsync_ssh_opts)] + \ +            [self.slaveurl] + +        log_rsync_performance = gconf.getr("log-rsync-performance", False) + +        if log_rsync_performance: +            # use stdout=PIPE only when log_rsync_performance enabled +            # Else rsync will write to stdout and nobody is their +            # to consume. If PIPE is full rsync hangs. +            po = Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, +                       stderr=subprocess.PIPE) +        else: +            po = Popen(argv, stdin=subprocess.PIPE, stderr=subprocess.PIPE) + +        for f in files: +            po.stdin.write(f) +            po.stdin.write('\0') + +        stdout, stderr = po.communicate() + +        if kw.get("log_err", False): +            for errline in stderr.strip().split("\n")[:-1]: +                logging.error(lf("SYNC Error", +                                 sync_engine="Rsync", +                                 error=errline)) + +        if log_rsync_performance: +            rsync_msg = [] +            for line in stdout.split("\n"): +                if line.startswith("Number of files:") or \ +                   line.startswith("Number of regular files transferred:") or \ +                   line.startswith("Total file size:") or \ +                   line.startswith("Total transferred file size:") or \ +                   line.startswith("Literal data:") or \ +                   line.startswith("Matched data:") or \ +                   line.startswith("Total bytes sent:") or \ +                   line.startswith("Total bytes received:") or \ +                   line.startswith("sent "): +                    rsync_msg.append(line) +            logging.info(lf("rsync performance", +                            data=", ".join(rsync_msg))) + +        return po + +    def tarssh(self, files, slaveurl, log_err=False): +        """invoke tar+ssh +        -z (compress) can be use if needed, but omitting it now +        as it results in weird error (tar+ssh errors out (errcode: 2) +        """ +        if not files: +            raise GsyncdError("no files to sync") +        logging.debug("files: " + ", ".join(files)) +        (host, rdir) = slaveurl.split(':') +        tar_cmd = ["tar"] + \ +            ["--sparse", "-cf", "-", "--files-from", "-"] +        ssh_cmd = gconf.get("ssh-command-tar").split() + \ +            gconf.get("ssh-options-tar").split() + \ +            ["-p", str(gconf.get("ssh-port"))] + \ +            [host, "tar"] + \ +            ["--overwrite", "-xf", "-", "-C", rdir] +        p0 = Popen(tar_cmd, stdout=subprocess.PIPE, +                   stdin=subprocess.PIPE, stderr=subprocess.PIPE) +        p1 = Popen(ssh_cmd, stdin=p0.stdout, stderr=subprocess.PIPE) +        for f in files: +            p0.stdin.write(f) +            p0.stdin.write('\n') + +        p0.stdin.close() +        p0.stdout.close()  # Allow p0 to receive a SIGPIPE if p1 exits. +        # wait for tar to terminate, collecting any errors, further +        # waiting for transfer to complete +        _, stderr1 = p1.communicate() + +        # stdin and stdout of p0 is already closed, Reset to None and +        # wait for child process to complete +        p0.stdin = None +        p0.stdout = None +        p0.communicate() + +        if log_err: +            for errline in stderr1.strip().split("\n")[:-1]: +                logging.error(lf("SYNC Error", +                                 sync_engine="Tarssh", +                                 error=errline)) + +        return p1  | 
