diff options
author | Aravinda VK <avishwan@redhat.com> | 2014-04-29 12:14:24 +0530 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2014-05-09 00:27:40 -0700 |
commit | c7b0396f680863528248e6f5a162de47184b6c88 (patch) | |
tree | ead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/monitor.py | |
parent | 65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff) |
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in a separate process
group from the monitor. When the monitor process group gets SIGSTOP, all
worker processes, ssh and rsync are paused, but changelog
processing continues. When it gets SIGCONT, it resumes its operation.
The changelog agent runs as a RepceServer; the geo-rep worker communicates
with the changelog agent using RepceClient.
Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04
BUG: 1093602
Signed-off-by: Venky Shankar <vshankar@redhat.com>
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index 8ed6f832618..e49a24ee5f5 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -108,7 +108,7 @@ class Monitor(object): # give a chance to graceful exit os.kill(-os.getpid(), signal.SIGTERM) - def monitor(self, w, argv, cpids): + def monitor(self, w, argv, cpids, agents): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -149,6 +149,23 @@ class Monitor(object): while ret in (0, 1): logging.info('-' * conn_timeout) logging.info('starting gsyncd worker') + + # Couple of pipe pairs for RPC communication b/w + # worker and changelog agent. + + # read/write end for agent + (ra, ww) = os.pipe() + # read/write end for worker + (rw, wa) = os.pipe() + + # spawn the agent process + apid = os.fork() + if apid == 0: + os.execv(sys.executable, argv + ['--local-path', w[0], + '--agent', + '--rpc-fd', + ','.join([str(ra), str(wa), + str(rw), str(ww)])]) pr, pw = os.pipe() cpid = os.fork() if cpid == 0: @@ -157,14 +174,26 @@ class Monitor(object): '--local-path', w[0], '--local-id', '.' + escape(w[0]), + '--rpc-fd', + ','.join([str(rw), str(ww), + str(ra), str(wa)]), '--resource-remote', w[1]]) self.lock.acquire() cpids.add(cpid) + agents.add(apid) self.lock.release() os.close(pw) + t0 = time.time() so = select((pr,), (), (), conn_timeout)[0] os.close(pr) + + # close all RPC pipes in monitor + os.close(ra) + os.close(wa) + os.close(rw) + os.close(ww) + if so: ret = nwait(cpid, os.WNOHANG) if ret is not None: @@ -206,10 +235,11 @@ class Monitor(object): argv.insert(0, os.path.basename(sys.executable)) cpids = set() + agents = set() ta = [] for wx in wspx: def wmon(w): - cpid, _ = self.monitor(w, argv, cpids) + cpid, _ = self.monitor(w, argv, cpids, agents) time.sleep(1) self.lock.acquire() for cpid in cpids: |