diff options
4 files changed, 124 insertions, 32 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py index fb1dc1b9c..960b83c13 100644 --- a/xlators/features/marker/utils/syncdaemon/gsyncd.py +++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py @@ -289,6 +289,7 @@ def main_i(): return monitor() logging.info("syncing: %s" % " -> ".join(peers)) + resource.Popen.init_errhandler() if remote: go_daemon = remote.connect_remote(go_daemon=go_daemon) if go_daemon: diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py index 495634b06..e7cb977e8 100644 --- a/xlators/features/marker/utils/syncdaemon/master.py +++ b/xlators/features/marker/utils/syncdaemon/master.py @@ -381,7 +381,15 @@ class Syncer(object): break time.sleep(0.5) pb.close() - pb.wakeup(self.slave.rsync(pb)) + po = self.slave.rsync(pb) + if po.returncode == 0: + ret = True + elif po.returncode in (23, 24): + # partial transfer (cf. rsync(1)), that's normal + ret = False + else: + po.errfail() + pb.wakeup(ret) def add(self, e): while True: diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py index 30011b3d3..66600fdad 100644 --- a/xlators/features/marker/utils/syncdaemon/resource.py +++ b/xlators/features/marker/utils/syncdaemon/resource.py @@ -10,6 +10,8 @@ import select import socket import logging import tempfile +import threading +import subprocess from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR from gconf import gconf @@ -81,6 +83,89 @@ class _MetaXattr(object): Xattr = _MetaXattr() +class Popen(subprocess.Popen): + + @classmethod + def init_errhandler(cls): + cls.errstore = {} + def tailer(): + while True: + for po in select.select([po.stderr for po in cls.errstore], [], []): + po.lock() + try: + la = cls.errstore.get(po) + if la == None: + continue + l = os.read(po.stderr.fileno(), 1024) + tots = len(l) + for lx in la: + tots += len(lx) + while tots > 1<<20 and la: + tots -= len(la.pop(0)) + finally: + po.unlock() + t = syncdutils.Thread(target = tailer) + t.start() + cls.errhandler = t + + def lock(self): + self._lock.acquire() + + def unlock(self): + self._lock.release() + + def __init__(self, args, *a, **kw): + """subprocess.Popen wrapper with error-handling""" + self.args = args + if 'close_fds' not in kw: + kw['close_fds'] = True + self._lock = threading.Lock() + try: + sup(self, args, *a, **kw) + except: + ex = sys.exc_info()[1] + if not isinstance(ex, OSError): + raise + raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \ + (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno))) + if kw['stderr'] == subprocess.PIPE: + assert(getattr(self, 'errhandler', None)) + self.errstore[self] = [] + + def errfail(self): + filling = None + if self.elines: + filling = ", saying:" + logging.error("""command "%s" returned with %d%s""" % \ + (" ".join(self.args), self.returncode, filling)) + for l in self.elines: + for ll in l.rstrip().split("\n"): + logging.error(self.args[0] + "> " + ll.rstrip()) + syncdutils.finalize(exval = 1) + + def terminate_geterr(self, fail_on_err = True): + self.lock() + try: + elines = self.errstore.pop(self) + finally: + self.unlock() + if self.poll() == None: + self.terminate() + if sp.poll() == None: + time.sleep(0.1) + sp.kill() + while True: + b = os.read(self.stderr.fileno(), 1024) + if b: + elines.append(b) + else: + break + self.stderr.close() + self.elines = elines + if fail_on_err and self.returncode != 0: + self.errfail() + + class Server(object): GX_NSPACE = "trusted.glusterfs" @@ -222,25 +307,16 @@ class SlaveRemote(object): def connect_remote(self, rargs=[], **opts): slave = opts.get('slave', self.url) - ix, ox = os.pipe() - iy, oy = os.pipe() - pid = os.fork() - if not pid: - os.close(ox) - os.dup2(ix, sys.stdin.fileno()) - os.close(iy) - os.dup2(oy, sys.stdout.fileno()) - so = getattr(gconf, 'session_owner', None) - if so: - so_args = ['--session-owner', so] - else: - so_args = [] - argv = rargs + gconf.remote_gsyncd.split() + so_args + \ - ['-N', '--listen', '--timeout', str(gconf.timeout), slave] - os.execvp(argv[0], argv) - os.close(ix) - os.close(oy) - return self.start_fd_client(iy, ox, **opts) + so = getattr(gconf, 'session_owner', None) + if so: + so_args = ['--session-owner', so] + else: + so_args = [] + po = Popen(rargs + gconf.remote_gsyncd.split() + so_args + \ + ['-N', '--listen', '--timeout', str(gconf.timeout), slave], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gconf.transport = po + return self.start_fd_client(po.stdout, po.stdin, **opts) def start_fd_client(self, i, o, **opts): self.server = RepceClient(i, o) @@ -259,7 +335,10 @@ class SlaveRemote(object): raise GsyncdError("no files to sync") logging.debug("files: " + ", ".join(files)) argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args) - return os.spawnvp(os.P_WAIT, argv[0], argv) == 0 + po = Popen(argv, stderr=subprocess.PIPE) + po.wait() + po.terminate_geterr(fail_on_err = False) + return po class AbstractUrl(object): @@ -375,27 +454,28 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): def connect(self): def umount_l(d): - argv = ['umount', '-l', d] - return os.spawnvp(os.P_WAIT, argv[0], argv) + po = Popen(['umount', '-l', d], stderr=subprocess.PIPE) + po.wait() + return po d = tempfile.mkdtemp(prefix='gsyncd-aux-mount-') mounted = False try: - argv = gconf.gluster_command.split() + \ - (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \ - ['-l', gconf.gluster_log_file, '-s', self.host, - '--volfile-id', self.volume, '--client-pid=-1', d] - if os.spawnvp(os.P_WAIT, argv[0], argv): - raise GsyncdError("command failed: " + " ".join(argv)) + po = Popen(gconf.gluster_command.split() + \ + (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \ + ['-l', gconf.gluster_log_file, '-s', self.host, + '--volfile-id', self.volume, '--client-pid=-1', d], + stderr=subprocess.PIPE) + po.wait() + po.terminate_geterr() mounted = True logging.debug('auxiliary glusterfs mount in place') os.chdir(d) - if umount_l(d) != 0: - raise GsyncdError("umounting %s failed" % d) + umount_l(d).terminate_geterr() mounted = False finally: try: if mounted: - umount_l(d) + umount_l(d).terminate_geterr(fail_on_err = False) os.rmdir(d) except: logging.warn('stale mount possibly left behind on ' + d) diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py index a905745f1..35afe64e9 100644 --- a/xlators/features/marker/utils/syncdaemon/syncdutils.py +++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py @@ -146,6 +146,9 @@ def log_raise_exception(excont): ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \ exc.errno == EPIPE): logging.error('connection to peer is broken') + if hasattr(gconf, 'transport'): + gconf.transport.wait() + gconf.transport.terminate_geterr() elif isinstance(exc, OSError) and exc.errno == ENOTCONN: logging.error('glusterfs session went down') else: |