summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py1
-rw-r--r--xlators/features/marker/utils/syncdaemon/master.py10
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py142
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py3
4 files changed, 124 insertions, 32 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index fb1dc1b9c..960b83c13 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -289,6 +289,7 @@ def main_i():
return monitor()
logging.info("syncing: %s" % " -> ".join(peers))
+ resource.Popen.init_errhandler()
if remote:
go_daemon = remote.connect_remote(go_daemon=go_daemon)
if go_daemon:
diff --git a/xlators/features/marker/utils/syncdaemon/master.py b/xlators/features/marker/utils/syncdaemon/master.py
index 495634b06..e7cb977e8 100644
--- a/xlators/features/marker/utils/syncdaemon/master.py
+++ b/xlators/features/marker/utils/syncdaemon/master.py
@@ -381,7 +381,15 @@ class Syncer(object):
break
time.sleep(0.5)
pb.close()
- pb.wakeup(self.slave.rsync(pb))
+ po = self.slave.rsync(pb)
+ if po.returncode == 0:
+ ret = True
+ elif po.returncode in (23, 24):
+ # partial transfer (cf. rsync(1)), that's normal
+ ret = False
+ else:
+ po.errfail()
+ pb.wakeup(ret)
def add(self, e):
while True:
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index 30011b3d3..66600fdad 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -10,6 +10,8 @@ import select
import socket
import logging
import tempfile
+import threading
+import subprocess
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EISDIR
from gconf import gconf
@@ -81,6 +83,89 @@ class _MetaXattr(object):
Xattr = _MetaXattr()
+class Popen(subprocess.Popen):
+
+ @classmethod
+ def init_errhandler(cls):
+ cls.errstore = {}
+ def tailer():
+ while True:
+ for po in select.select([po.stderr for po in cls.errstore], [], []):
+ po.lock()
+ try:
+ la = cls.errstore.get(po)
+ if la == None:
+ continue
+ l = os.read(po.stderr.fileno(), 1024)
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1<<20 and la:
+ tots -= len(la.pop(0))
+ finally:
+ po.unlock()
+ t = syncdutils.Thread(target = tailer)
+ t.start()
+ cls.errhandler = t
+
+ def lock(self):
+ self._lock.acquire()
+
+ def unlock(self):
+ self._lock.release()
+
+ def __init__(self, args, *a, **kw):
+ """subprocess.Popen wrapper with error-handling"""
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self._lock = threading.Lock()
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" % \
+ (args[0], errno.errorcode[ex.errno], os.strerror(ex.errno)))
+ if kw['stderr'] == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errfail(self):
+ filling = None
+ if self.elines:
+ filling = ", saying:"
+ logging.error("""command "%s" returned with %d%s""" % \
+ (" ".join(self.args), self.returncode, filling))
+ for l in self.elines:
+ for ll in l.rstrip().split("\n"):
+ logging.error(self.args[0] + "> " + ll.rstrip())
+ syncdutils.finalize(exval = 1)
+
+ def terminate_geterr(self, fail_on_err = True):
+ self.lock()
+ try:
+ elines = self.errstore.pop(self)
+ finally:
+ self.unlock()
+ if self.poll() == None:
+ self.terminate()
+ if sp.poll() == None:
+ time.sleep(0.1)
+ sp.kill()
+ while True:
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
class Server(object):
GX_NSPACE = "trusted.glusterfs"
@@ -222,25 +307,16 @@ class SlaveRemote(object):
def connect_remote(self, rargs=[], **opts):
slave = opts.get('slave', self.url)
- ix, ox = os.pipe()
- iy, oy = os.pipe()
- pid = os.fork()
- if not pid:
- os.close(ox)
- os.dup2(ix, sys.stdin.fileno())
- os.close(iy)
- os.dup2(oy, sys.stdout.fileno())
- so = getattr(gconf, 'session_owner', None)
- if so:
- so_args = ['--session-owner', so]
- else:
- so_args = []
- argv = rargs + gconf.remote_gsyncd.split() + so_args + \
- ['-N', '--listen', '--timeout', str(gconf.timeout), slave]
- os.execvp(argv[0], argv)
- os.close(ix)
- os.close(oy)
- return self.start_fd_client(iy, ox, **opts)
+ so = getattr(gconf, 'session_owner', None)
+ if so:
+ so_args = ['--session-owner', so]
+ else:
+ so_args = []
+ po = Popen(rargs + gconf.remote_gsyncd.split() + so_args + \
+ ['-N', '--listen', '--timeout', str(gconf.timeout), slave],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gconf.transport = po
+ return self.start_fd_client(po.stdout, po.stdin, **opts)
def start_fd_client(self, i, o, **opts):
self.server = RepceClient(i, o)
@@ -259,7 +335,10 @@ class SlaveRemote(object):
raise GsyncdError("no files to sync")
logging.debug("files: " + ", ".join(files))
argv = gconf.rsync_command.split() + gconf.rsync_extra.split() + ['-aR'] + files + list(args)
- return os.spawnvp(os.P_WAIT, argv[0], argv) == 0
+ po = Popen(argv, stderr=subprocess.PIPE)
+ po.wait()
+ po.terminate_geterr(fail_on_err = False)
+ return po
class AbstractUrl(object):
@@ -375,27 +454,28 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def connect(self):
def umount_l(d):
- argv = ['umount', '-l', d]
- return os.spawnvp(os.P_WAIT, argv[0], argv)
+ po = Popen(['umount', '-l', d], stderr=subprocess.PIPE)
+ po.wait()
+ return po
d = tempfile.mkdtemp(prefix='gsyncd-aux-mount-')
mounted = False
try:
- argv = gconf.gluster_command.split() + \
- (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \
- ['-l', gconf.gluster_log_file, '-s', self.host,
- '--volfile-id', self.volume, '--client-pid=-1', d]
- if os.spawnvp(os.P_WAIT, argv[0], argv):
- raise GsyncdError("command failed: " + " ".join(argv))
+ po = Popen(gconf.gluster_command.split() + \
+ (gconf.gluster_log_level and ['-L', gconf.gluster_log_level] or []) + \
+ ['-l', gconf.gluster_log_file, '-s', self.host,
+ '--volfile-id', self.volume, '--client-pid=-1', d],
+ stderr=subprocess.PIPE)
+ po.wait()
+ po.terminate_geterr()
mounted = True
logging.debug('auxiliary glusterfs mount in place')
os.chdir(d)
- if umount_l(d) != 0:
- raise GsyncdError("umounting %s failed" % d)
+ umount_l(d).terminate_geterr()
mounted = False
finally:
try:
if mounted:
- umount_l(d)
+ umount_l(d).terminate_geterr(fail_on_err = False)
os.rmdir(d)
except:
logging.warn('stale mount possibly left behind on ' + d)
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index a905745f1..35afe64e9 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -146,6 +146,9 @@ def log_raise_exception(excont):
((isinstance(exc, OSError) or isinstance(exc, IOError)) and \
exc.errno == EPIPE):
logging.error('connection to peer is broken')
+ if hasattr(gconf, 'transport'):
+ gconf.transport.wait()
+ gconf.transport.terminate_geterr()
elif isinstance(exc, OSError) and exc.errno == ENOTCONN:
logging.error('glusterfs session went down')
else: