summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--geo-replication/syncdaemon/changelogagent.py2
-rw-r--r--geo-replication/syncdaemon/monitor.py54
-rw-r--r--geo-replication/syncdaemon/resource.py2
3 files changed, 44 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
index ad5f69cfb23..731dbd06f57 100644
--- a/geo-replication/syncdaemon/changelogagent.py
+++ b/geo-replication/syncdaemon/changelogagent.py
@@ -66,8 +66,6 @@ class Changelog(object):
class ChangelogAgent(object):
def __init__(self, obj, fd_tup):
(inf, ouf, rw, ww) = fd_tup.split(',')
- os.close(int(rw))
- os.close(int(ww))
repce = RepceServer(obj, int(inf), int(ouf), 1)
t = syncdutils.Thread(target=lambda: (repce.service_loop(),
syncdutils.finalize()))
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index ba5c8e32514..ecf48c51f7b 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET
from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
-from errno import EEXIST
+from errno import ECHILD
import re
import random
from gconf import gconf
@@ -180,10 +180,18 @@ class Monitor(object):
ret = 0
def nwait(p, o=0):
- p2, r = waitpid(p, o)
- if not p2:
- return
- return r
+ try:
+ p2, r = waitpid(p, o)
+ if not p2:
+ return
+ return r
+ except OSError as e:
+ # no child process, this happens if the child process
+ # already died and has been cleaned up
+ if e.errno == ECHILD:
+ return -1
+ else:
+ raise
def exit_signalled(s):
""" child teminated due to receipt of SIGUSR1 """
@@ -232,6 +240,8 @@ class Monitor(object):
# spawn the agent process
apid = os.fork()
if apid == 0:
+ os.close(rw)
+ os.close(ww)
os.execv(sys.executable, argv + ['--local-path', w[0],
'--agent',
'--rpc-fd',
@@ -241,6 +251,8 @@ class Monitor(object):
cpid = os.fork()
if cpid == 0:
os.close(pr)
+ os.close(ra)
+ os.close(wa)
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
'--local-path', w[0],
'--local-id',
@@ -269,30 +281,52 @@ class Monitor(object):
if so:
ret = nwait(cpid, os.WNOHANG)
+ ret_agent = nwait(apid, os.WNOHANG)
+
+ if ret_agent is not None:
+ # Agent is died Kill Worker
+ logging.info("Changelog Agent died, "
+ "Aborting Worker(%s)" % w[0])
+ os.kill(cpid, signal.SIGKILL)
+ nwait(cpid)
+ nwait(apid)
+
if ret is not None:
logging.info("worker(%s) died before establishing "
"connection" % w[0])
- nwait(apid) #wait for agent
+ nwait(apid) # wait for agent
else:
logging.debug("worker(%s) connected" % w[0])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
+ ret_agent = nwait(apid, os.WNOHANG)
+
if ret is not None:
logging.info("worker(%s) died in startup "
"phase" % w[0])
- nwait(apid) #wait for agent
+ nwait(apid) # wait for agent
+ break
+
+ if ret_agent is not None:
+ # Agent is died Kill Worker
+ logging.info("Changelog Agent died, Aborting "
+ "Worker(%s)" % w[0])
+ os.kill(cpid, signal.SIGKILL)
+ nwait(cpid)
+ nwait(apid)
break
+
time.sleep(1)
else:
logging.info("worker(%s) not confirmed in %d sec, "
"aborting it" % (w[0], conn_timeout))
os.kill(cpid, signal.SIGKILL)
- nwait(apid) #wait for agent
+ nwait(apid) # wait for agent
ret = nwait(cpid)
if ret is None:
self.status[w[0]].set_worker_status(self.ST_STABLE)
- #If worker dies, agent terminates on EOF.
- #So lets wait for agent first.
+ # If worker dies, agent terminates on EOF.
+ # So lets wait for agent first.
nwait(apid)
ret = nwait(cpid)
if exit_signalled(ret):
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index c73347aaf17..8869a109cf9 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
# g3 ==> changelog History
changelog_register_failed = False
(inf, ouf, ra, wa) = gconf.rpc_fd.split(',')
- os.close(int(ra))
- os.close(int(wa))
changelog_agent = RepceClient(int(inf), int(ouf))
status = GeorepStatus(gconf.state_file, gconf.local_path)
status.reset_on_worker_start()