diff options
Diffstat (limited to 'geo-replication')
-rw-r--r-- | geo-replication/syncdaemon/changelogagent.py | 2 | ||||
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 54 | ||||
-rw-r--r-- | geo-replication/syncdaemon/resource.py | 2 |
3 files changed, 44 insertions, 14 deletions
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py index ad5f69cfb23..731dbd06f57 100644 --- a/geo-replication/syncdaemon/changelogagent.py +++ b/geo-replication/syncdaemon/changelogagent.py @@ -66,8 +66,6 @@ class Changelog(object): class ChangelogAgent(object): def __init__(self, obj, fd_tup): (inf, ouf, rw, ww) = fd_tup.split(',') - os.close(int(rw)) - os.close(int(ww)) repce = RepceServer(obj, int(inf), int(ouf), 1) t = syncdutils.Thread(target=lambda: (repce.service_loop(), syncdutils.finalize())) diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index c41eb969143..5a6bf5033a4 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -18,7 +18,7 @@ import xml.etree.ElementTree as XET from subprocess import PIPE from resource import Popen, FILE, GLUSTER, SSH from threading import Lock -from errno import EEXIST +from errno import ECHILD import re import random from gconf import gconf @@ -188,10 +188,18 @@ class Monitor(object): ret = 0 def nwait(p, o=0): - p2, r = waitpid(p, o) - if not p2: - return - return r + try: + p2, r = waitpid(p, o) + if not p2: + return + return r + except OSError as e: + # no child process, this happens if the child process + # already died and has been cleaned up + if e.errno == ECHILD: + return -1 + else: + raise def exit_signalled(s): """ child teminated due to receipt of SIGUSR1 """ @@ -240,6 +248,8 @@ class Monitor(object): # spawn the agent process apid = os.fork() if apid == 0: + os.close(rw) + os.close(ww) os.execv(sys.executable, argv + ['--local-path', w[0], '--agent', '--rpc-fd', @@ -249,6 +259,8 @@ class Monitor(object): cpid = os.fork() if cpid == 0: os.close(pr) + os.close(ra) + os.close(wa) os.execv(sys.executable, argv + ['--feedback-fd', str(pw), '--local-path', w[0], '--local-id', @@ -277,30 +289,52 @@ class Monitor(object): if so: ret = nwait(cpid, os.WNOHANG) + ret_agent = nwait(apid, os.WNOHANG) + + if ret_agent is not None: + # Agent is died Kill Worker + logging.info("Changelog Agent died, " + "Aborting Worker(%s)" % w[0]) + os.kill(cpid, signal.SIGKILL) + nwait(cpid) + nwait(apid) + if ret is not None: logging.info("worker(%s) died before establishing " "connection" % w[0]) - nwait(apid) #wait for agent + nwait(apid) # wait for agent else: logging.debug("worker(%s) connected" % w[0]) while time.time() < t0 + conn_timeout: ret = nwait(cpid, os.WNOHANG) + ret_agent = nwait(apid, os.WNOHANG) + if ret is not None: logging.info("worker(%s) died in startup " "phase" % w[0]) - nwait(apid) #wait for agent + nwait(apid) # wait for agent + break + + if ret_agent is not None: + # Agent is died Kill Worker + logging.info("Changelog Agent died, Aborting " + "Worker(%s)" % w[0]) + os.kill(cpid, signal.SIGKILL) + nwait(cpid) + nwait(apid) break + time.sleep(1) else: logging.info("worker(%s) not confirmed in %d sec, " "aborting it" % (w[0], conn_timeout)) os.kill(cpid, signal.SIGKILL) - nwait(apid) #wait for agent + nwait(apid) # wait for agent ret = nwait(cpid) if ret is None: self.status[w[0]].set_worker_status(self.ST_STABLE) - #If worker dies, agent terminates on EOF. - #So lets wait for agent first. + # If worker dies, agent terminates on EOF. + # So lets wait for agent first. nwait(apid) ret = nwait(cpid) if exit_signalled(ret): diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py index 51f88627a96..a44ca914222 100644 --- a/geo-replication/syncdaemon/resource.py +++ b/geo-replication/syncdaemon/resource.py @@ -1394,8 +1394,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote): # g3 ==> changelog History changelog_register_failed = False (inf, ouf, ra, wa) = gconf.rpc_fd.split(',') - os.close(int(ra)) - os.close(int(wa)) changelog_agent = RepceClient(int(inf), int(ouf)) status = GeorepStatus(gconf.state_file, gconf.local_path) status.reset_on_worker_start() |