summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/monitor.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-04-29 12:14:24 +0530
committerVenky Shankar <vshankar@redhat.com>2014-05-09 00:27:40 -0700
commitc7b0396f680863528248e6f5a162de47184b6c88 (patch)
treeead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/monitor.py
parent65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff)
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process group than monitor. When monitor process group gets SIGSTOP all worker process, ssh, rsync will be paused except the changelog processing. When it gets SIGCONT it resumes its operation. Changelog agent runs as RepceServer, geo-rep worker communicates with changelog agent using RepceClient. Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04 BUG: 1093602 Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--geo-replication/syncdaemon/monitor.py34
1 files changed, 32 insertions, 2 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 8ed6f832618..e49a24ee5f5 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -108,7 +108,7 @@ class Monitor(object):
# give a chance to graceful exit
os.kill(-os.getpid(), signal.SIGTERM)
- def monitor(self, w, argv, cpids):
+ def monitor(self, w, argv, cpids, agents):
"""the monitor loop
Basic logic is a blantantly simple blunt heuristics:
@@ -149,6 +149,23 @@ class Monitor(object):
while ret in (0, 1):
logging.info('-' * conn_timeout)
logging.info('starting gsyncd worker')
+
+ # Couple of pipe pairs for RPC communication b/w
+ # worker and changelog agent.
+
+ # read/write end for agent
+ (ra, ww) = os.pipe()
+ # read/write end for worker
+ (rw, wa) = os.pipe()
+
+ # spawn the agent process
+ apid = os.fork()
+ if apid == 0:
+ os.execv(sys.executable, argv + ['--local-path', w[0],
+ '--agent',
+ '--rpc-fd',
+ ','.join([str(ra), str(wa),
+ str(rw), str(ww)])])
pr, pw = os.pipe()
cpid = os.fork()
if cpid == 0:
@@ -157,14 +174,26 @@ class Monitor(object):
'--local-path', w[0],
'--local-id',
'.' + escape(w[0]),
+ '--rpc-fd',
+ ','.join([str(rw), str(ww),
+ str(ra), str(wa)]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
+ agents.add(apid)
self.lock.release()
os.close(pw)
+
t0 = time.time()
so = select((pr,), (), (), conn_timeout)[0]
os.close(pr)
+
+ # close all RPC pipes in monitor
+ os.close(ra)
+ os.close(wa)
+ os.close(rw)
+ os.close(ww)
+
if so:
ret = nwait(cpid, os.WNOHANG)
if ret is not None:
@@ -206,10 +235,11 @@ class Monitor(object):
argv.insert(0, os.path.basename(sys.executable))
cpids = set()
+ agents = set()
ta = []
for wx in wspx:
def wmon(w):
- cpid, _ = self.monitor(w, argv, cpids)
+ cpid, _ = self.monitor(w, argv, cpids, agents)
time.sleep(1)
self.lock.acquire()
for cpid in cpids: