diff options
| author | Csaba Henk <csaba@gluster.com> | 2011-07-15 02:45:18 +0200 | 
|---|---|---|
| committer | Vijay Bellur <vijay@gluster.com> | 2011-08-25 05:36:45 -0700 | 
| commit | b617b89372f9f6cb5031dfb8513029a7fa490f73 (patch) | |
| tree | 0517a825e979f0615ddea7e8507ea6261db5f1e2 | |
| parent | f9b09cd2be47c044c1396e70724a427ef46f8b81 (diff) | |
gsyncd: refine command invocation
Use subprocess module instead of os.spawn* / ad-hoc fork/exec.
With this, we do now:
- close unneeded files in children
- watch children's stderr:
  - have a thread which collects children's stderr into a ring buffer
    (so that stderr pipe doesn't get stuffed)
  - on command failure show stderr
- distinguish between rsync exit values, tolerate only partial errors
- if connection is broken to slave, show ssh/slave gsyncd's stderr
Change-Id: Ia92f57b5bdfa47f8c44375c50cf279006a0bf69b
BUG: 2946
Reviewed-on: http://review.gluster.com/85
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Tested-by: Kaushik BV <kaushikbv@gluster.com>
Reviewed-by: Kaushik BV <kaushikbv@gluster.com>
4 files changed, 124 insertions, 32 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index fb1dc1b9c..960b83c13 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -289,6 +289,7 @@ def main_i():          return monitor()      logging.info("syncing: %s" % " -> ".join(peers)) +    resource.Popen.init_errhandler()      if remote:          go_daemon = remote.connect_remote(go_daemon=go_daemon)          if go_daemon: diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 495634b06..e7cb977e8 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -381,7 +381,15 @@ class Syncer(object):                      break                  time.sleep(0.5)              pb.close() -            pb.wakeup(self.slave.rsync(pb)) +            po = self.slave.rsync(pb) +            if po.returncode == 0: +                ret = True +            elif po.returncode in (23, 24): +                # partial transfer (cf. 
rsync(1)), that's normal +                ret = False +            else: +                po.errfail() +            pb.wakeup(ret)      def add(self, e):          while True: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index 30011b3d3..66600fdad 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -10,6 +10,8 @@ import select  import socket  import logging  import tempfile +import threading +import subprocess  from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR  from gconf import gconf @@ -81,6 +83,89 @@ class _MetaXattr(object):  Xattr = _MetaXattr() +class Popen(subprocess.Popen): + +    @classmethod +    def init_errhandler(cls): +        cls.errstore = {} +        def tailer(): +            while True: +                for po in select.select([po.stderr for po in cls.errstore], [], []): +                    po.lock() +                    try: +                        la = cls.errstore.get(po) +                        if la == None: +                            continue +                        l = os.read(po.stderr.fileno(), 1024) +                        tots = len(l) +                        for lx in la: +                            tots += len(lx) +                        while tots > 1<<20 and la: +                            tots -= len(la.pop(0)) +                    finally: +                        po.unlock() +        t = syncdutils.Thread(target = tailer) +        t.start() +        cls.errhandler = t + +    def lock(self): +        self._lock.acquire() + +    def unlock(self): +        self._lock.release() + +    def __init__(self, args, *a, **kw): +        """subprocess.Popen wrapper with error-handling""" +        self.args = args +        if 'close_fds' not in kw: +            kw['close_fds'] = True +        self._lock = threading.Lock() +        try: +            sup(self, args, *a, 
**kw) +        except: +            ex = sys.exc_info()[1] +            if not isinstance(ex, OSError): +                raise +            raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ +                              (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) +        if kw['stderr'] == subprocess.PIPE: +            assert(getattr(self, 'errhandler', None)) +            self.errstore[self] = [] + +    def errfail(self): +        filling = None +        if self.elines: +            filling = ", saying:" +        logging.error("""command "%s" returned with %d%s""" % \ +                      (" ".join(self.args), self.returncode, filling)) +        for l in self.elines: +            for ll in l.rstrip().split("\n"): +                logging.error(self.args[0] + "> " + ll.rstrip()) +        syncdutils.finalize(exval = 1) + +    def terminate_geterr(self, fail_on_err = True): +        self.lock() +        try: +            elines = self.errstore.pop(self) +        finally: +            self.unlock() +        if self.poll() == None: +            self.terminate() +            if sp.poll() == None: +                time.sleep(0.1) +            sp.kill() +        while True: +            b = os.read(self.stderr.fileno(), 1024) +            if b: +                elines.append(b) +            else: +                break +        self.stderr.close() +        self.elines = elines +        if fail_on_err and self.returncode != 0: +            self.errfail() + +  class Server(object):      GX_NSPACE = "trusted.glusterfs" @@ -222,25 +307,16 @@ class SlaveRemote(object):      def connect_remote(self, rargs=[], **opts):          slave = opts.get('slave', self.url) -        ix, ox = os.pipe() -        iy, oy = os.pipe() -        pid = os.fork() -        if not pid: -            os.close(ox) -            os.dup2(ix, sys.stdin.fileno()) -            os.close(iy) -            os.dup2(oy, sys.stdout.fileno()) -            so = getattr(gconf, 
'session_owner', None) -            if so: -                so_args = ['--session-owner', so] -            else: -                so_args = [] -            argv = rargs + gconf.remote_gsyncd.split() + so_args + \ -                     ['-N', '--listen', '--timeout', str(gconf.timeout), slave] -            os.execvp(argv[0], argv) -        os.close(ix) -        os.close(oy) -        return self.start_fd_client(iy, ox, **opts) +        so = getattr(gconf, 'session_owner', None) +        if so: +            so_args = ['--session-owner', so] +        else: +            so_args = [] +        po = Popen(rargs + gconf.remote_gsyncd.split() + so_args + \ +                   ['-N', '--listen', '--timeout', str(gconf.timeout), slave], +                   stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) +        gconf.transport = po +        return self.start_fd_client(po.stdout, po.stdin, **opts)      def start_fd_client(self, i, o, **opts):          self.server = RepceClient(i, o) @@ -259,7 +335,10 @@ class SlaveRemote(object):              raise GsyncdError("no files to sync")          logging.debug("files: " + ", ".join(files))          argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args) -        return os.spawnvp(os.P_WAIT, argv[0], argv) == 0 +        po = Popen(argv, stderr=subprocess.PIPE) +        po.wait() +        po.terminate_geterr(fail_on_err = False) +        return po  class AbstractUrl(object): @@ -375,27 +454,28 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):      def connect(self):          def umount_l(d): -            argv = ['umount', '-l', d] -            return os.spawnvp(os.P_WAIT, argv[0], argv) +            po = Popen(['umount', '-l', d], stderr=subprocess.PIPE) +            po.wait() +            return po          d = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')          mounted = False          try: -            argv = gconf.gluster_command.split() + \ -                    
(gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \ -                    ['-l', gconf.gluster_log_file, '-s', self.host, -                     '--volfile-id', self.volume, '--client-pid=-1', d] -            if os.spawnvp(os.P_WAIT, argv[0], argv): -                raise GsyncdError("command failed: " + " ".join(argv)) +            po = Popen(gconf.gluster_command.split() + \ +                       (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \ +                       ['-l', gconf.gluster_log_file, '-s', self.host, +                        '--volfile-id', self.volume, '--client-pid=-1', d], +                       stderr=subprocess.PIPE) +            po.wait() +            po.terminate_geterr()              mounted = True              logging.debug('auxiliary glusterfs mount in place')              os.chdir(d) -            if umount_l(d) != 0: -                raise GsyncdError("umounting %s failed" % d) +            umount_l(d).terminate_geterr()              mounted = False          finally:              try:                  if mounted: -                    umount_l(d) +                    umount_l(d).terminate_geterr(fail_on_err = False)                  os.rmdir(d)              except:                  logging.warn('stale mount possibly left behind on ' + d) diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py index a905745f1..35afe64e9 100644 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py @@ -146,6 +146,9 @@ def log_raise_exception(excont):               ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \                exc.errno == EPIPE):              logging.error('connection to peer is broken') +            if hasattr(gconf, 'transport'): +                gconf.transport.wait() +                gconf.transport.terminate_geterr()          elif 
isinstance(exc, OSError) and exc.errno == ENOTCONN:              logging.error('glusterfs session went down')          else:  | 
