summaryrefslogtreecommitdiffstats
path: root/xlators/features
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features')
-rw-r--r--xlators/features/marker/utils/syncdaemon/gconf.py1
-rw-r--r--xlators/features/marker/utils/syncdaemon/gsyncd.py10
-rw-r--r--xlators/features/marker/utils/syncdaemon/monitor.py41
-rw-r--r--xlators/features/marker/utils/syncdaemon/repce.py5
-rw-r--r--xlators/features/marker/utils/syncdaemon/resource.py11
-rw-r--r--xlators/features/marker/utils/syncdaemon/syncdutils.py22
6 files changed, 66 insertions, 24 deletions
diff --git a/xlators/features/marker/utils/syncdaemon/gconf.py b/xlators/features/marker/utils/syncdaemon/gconf.py
index 4e3b959fe37..146c72a1825 100644
--- a/xlators/features/marker/utils/syncdaemon/gconf.py
+++ b/xlators/features/marker/utils/syncdaemon/gconf.py
@@ -10,6 +10,7 @@ class GConf(object):
pid_file_owned = False
log_exit = False
permanent_handles = []
+ log_metadata = {}
@classmethod
def setup_ssh_ctl(cls, ctld):
diff --git a/xlators/features/marker/utils/syncdaemon/gsyncd.py b/xlators/features/marker/utils/syncdaemon/gsyncd.py
index 67a873cb57c..2b8356b1620 100644
--- a/xlators/features/marker/utils/syncdaemon/gsyncd.py
+++ b/xlators/features/marker/utils/syncdaemon/gsyncd.py
@@ -6,7 +6,6 @@ import sys
import time
import logging
import signal
-import select
import optparse
import fcntl
import fnmatch
@@ -18,7 +17,7 @@ from ipaddr import IPAddress, IPNetwork
from gconf import gconf
from syncdutils import FreeObject, norm, grabpidfile, finalize, log_raise_exception
-from syncdutils import GsyncdError
+from syncdutils import GsyncdError, select
from configinterface import GConffile
import resource
from monitor import monitor
@@ -59,7 +58,6 @@ class GLogger(Logger):
logging.getLogger().handlers = []
logging.basicConfig(**lprm)
-
def startup(**kw):
"""set up logging, pidfile grabbing, daemonization"""
if getattr(gconf, 'pid_file', None) and kw.get('go_daemon') != 'postconn':
@@ -87,7 +85,7 @@ def startup(**kw):
# so we can start up with
# no messing from the dirty
# ol' bustard
- select.select((x,), (), ())
+ select((x,), (), ())
os.close(x)
lkw = {}
@@ -100,7 +98,11 @@ def startup(**kw):
lkw['stream'] = sys.stdout
else:
lkw['filename'] = kw['log_file']
+
GLogger.setup(label=kw.get('label'), **lkw)
+
+ lkw.update({'saved_label': kw.get('label')})
+ gconf.log_metadata = lkw
gconf.log_exit = True
def main():
diff --git a/xlators/features/marker/utils/syncdaemon/monitor.py b/xlators/features/marker/utils/syncdaemon/monitor.py
index b8e9219dc47..9536f3e2683 100644
--- a/xlators/features/marker/utils/syncdaemon/monitor.py
+++ b/xlators/features/marker/utils/syncdaemon/monitor.py
@@ -1,11 +1,10 @@
import os
import sys
import time
+import signal
import logging
-import select
-from signal import SIGKILL
from gconf import gconf
-from syncdutils import update_file
+from syncdutils import update_file, select, waitpid
class Monitor(object):
"""class which spawns and manages gsyncd workers"""
@@ -42,6 +41,19 @@ class Monitor(object):
blown worker blows up on EPIPE if the net goes down,
due to the keep-alive thread)
"""
+ def sigcont_handler(*a):
+ """
+ Re-init logging and send group kill signal
+ """
+ md = gconf.log_metadata
+ logging.shutdown()
+ lcls = logging.getLoggerClass()
+ lcls.setup(label=md.get('saved_label'), **md)
+ pid = os.getpid()
+ os.kill(-pid, signal.SIGUSR1)
+ signal.signal(signal.SIGUSR1, lambda *a: ())
+ signal.signal(signal.SIGCONT, sigcont_handler)
+
argv = sys.argv[:]
for o in ('-N', '--no-daemon', '--monitor'):
while o in argv:
@@ -52,11 +64,16 @@ class Monitor(object):
self.set_state('starting...')
ret = 0
def nwait(p, o=0):
- p2, r = os.waitpid(p, o)
+ p2, r = waitpid(p, o)
if not p2:
return
- if os.WIFEXITED(r):
- return os.WEXITSTATUS(r)
+ return r
+ def exit_signalled(s):
+ """ child teminated due to receipt of SIGUSR1 """
+ return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+ def exit_status(s):
+ if os.WIFEXITED(s):
+ return os.WEXITSTATUS(s)
return 1
conn_timeout = 60
while ret in (0, 1):
@@ -69,7 +86,7 @@ class Monitor(object):
os.execv(sys.executable, argv + ['--feedback-fd', str(pw)])
os.close(pw)
t0 = time.time()
- so = select.select((pr,), (), (), conn_timeout)[0]
+ so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
if so:
ret = nwait(cpid, os.WNOHANG)
@@ -86,13 +103,17 @@ class Monitor(object):
else:
logging.debug("worker not confirmed in %d sec, aborting it" % \
conn_timeout)
- os.kill(cpid, SIGKILL)
+ os.kill(cpid, signal.SIGKILL)
ret = nwait(cpid)
if ret == None:
self.set_state('OK')
ret = nwait(cpid)
- elif ret in (0, 1):
- self.set_state('faulty')
+ if exit_signalled(ret):
+ ret = 0
+ else:
+ ret = exit_status(ret)
+ if ret in (0,1):
+ self.set_state('faulty')
time.sleep(10)
self.set_state('inconsistent')
return ret
diff --git a/xlators/features/marker/utils/syncdaemon/repce.py b/xlators/features/marker/utils/syncdaemon/repce.py
index 9473524909a..755fb61df48 100644
--- a/xlators/features/marker/utils/syncdaemon/repce.py
+++ b/xlators/features/marker/utils/syncdaemon/repce.py
@@ -1,6 +1,5 @@
import os
import sys
-import select
import time
import logging
from threading import Condition
@@ -20,7 +19,7 @@ except ImportError:
# py 3
import pickle
-from syncdutils import Thread
+from syncdutils import Thread, select
pickle_proto = -1
repce_version = 1.0
@@ -154,7 +153,7 @@ class RepceClient(object):
def listen(self):
while True:
- select.select((self.inf,), (), ())
+ select((self.inf,), (), ())
rid, exc, res = recv(self.inf)
rjob = self.jtab.pop(rid)
if rjob.cbk:
diff --git a/xlators/features/marker/utils/syncdaemon/resource.py b/xlators/features/marker/utils/syncdaemon/resource.py
index 3595f428fdd..3454c38234a 100644
--- a/xlators/features/marker/utils/syncdaemon/resource.py
+++ b/xlators/features/marker/utils/syncdaemon/resource.py
@@ -5,7 +5,6 @@ import stat
import time
import errno
import struct
-import select
import socket
import logging
import tempfile
@@ -18,7 +17,7 @@ import repce
from repce import RepceServer, RepceClient
from master import GMaster
import syncdutils
-from syncdutils import GsyncdError
+from syncdutils import GsyncdError, select
UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
HostRX = re.compile('[a-z\d](?:[a-z\d.-]*[a-z\d])?', re.I)
@@ -113,11 +112,11 @@ class Popen(subprocess.Popen):
@classmethod
def init_errhandler(cls):
- """start the thread which hanldes children's error output"""
+ """start the thread which handles children's error output"""
cls.errstore = {}
def tailer():
while True:
- for po in select.select([po.stderr for po in cls.errstore], [], []):
+ for po in select([po.stderr for po in cls.errstore], [], []):
po.lock.acquire()
try:
la = cls.errstore.get(po)
@@ -419,7 +418,7 @@ class SlaveLocal(object):
logging.info("connection inactive for %d seconds, stopping" % int(gconf.timeout))
break
else:
- select.select((), (), ())
+ select((), (), ())
class SlaveRemote(object):
"""mix-in class to implement an interface to a remote slave"""
@@ -826,7 +825,7 @@ class SSH(AbstractUrl, SlaveRemote):
i, o = ret
inf = os.fdopen(i)
repce.send(o, None, '__repce_version__')
- select.select((inf,), (), ())
+ select((inf,), (), ())
repce.recv(inf)
# hack hack hack: store a global reference to the file
# to save it from getting GC'd which implies closing it
diff --git a/xlators/features/marker/utils/syncdaemon/syncdutils.py b/xlators/features/marker/utils/syncdaemon/syncdutils.py
index e3098d5f4ea..59defa711ed 100644
--- a/xlators/features/marker/utils/syncdaemon/syncdutils.py
+++ b/xlators/features/marker/utils/syncdaemon/syncdutils.py
@@ -6,9 +6,11 @@ import fcntl
import shutil
import logging
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, EINTR
from signal import SIGTERM, SIGKILL
from time import sleep
+import select as oselect
+from os import waitpid as owaitpid
try:
from cPickle import PickleError
except ImportError:
@@ -247,3 +249,21 @@ def boolify(s):
logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s))
return rv
+
+def eintr_wrap(func, exc, *a):
+ """
+ wrapper around syscalls resilient to interrupt caused
+ by signals
+ """
+ while True:
+ try:
+ return func(*a)
+ except exc, ex:
+ if not ex[0] == EINTR:
+ raise GsyncdError(ex[1])
+
+def select(*a):
+ return eintr_wrap(oselect.select, oselect.error, *a)
+
+def waitpid (*a):
+ return eintr_wrap(owaitpid, OSError, *a)