summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/monitor.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-03-21 12:33:10 +0530
committerVijay Bellur <vbellur@redhat.com>2014-04-07 21:56:55 -0700
commit238d101e55e067e5afcd43c728884e9ab8d36549 (patch)
tree60498b107335c0ae526bfa034bd56303406710ab /geo-replication/syncdaemon/monitor.py
parent0c20b17c09b2eca82f3c79013fd3fe1c72a957fd (diff)
geo-rep: code pep8/flake8 fixes
pep8 is a style guide for python. http://legacy.python.org/dev/peps/pep-0008/ pep8 can be installed using, `pip install pep8` Usage: `pep8 <python file>`, For example, `pep8 master.py` will display all the coding standard errors. flake8 is used to identify unused imports and other issues in code. pip install flake8 cd $GLUSTER_REPO/geo-replication/ flake8 syncdaemon Updated license headers to each source file. Change-Id: I01c7d0a6091d21bfa48720e9fb5624b77fa3db4a Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7311 Reviewed-by: Kotresh HR <khiremat@redhat.com> Reviewed-by: Prashanth Pai <ppai@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r--geo-replication/syncdaemon/monitor.py78
1 files changed, 53 insertions, 25 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index b0262ee30a8..8ed6f832618 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import time
@@ -9,12 +19,16 @@ from subprocess import PIPE
from resource import Popen, FILE, GLUSTER, SSH
from threading import Lock
from gconf import gconf
-from syncdutils import update_file, select, waitpid, set_term_handler, is_host_local, GsyncdError
+from syncdutils import update_file, select, waitpid
+from syncdutils import set_term_handler, is_host_local, GsyncdError
from syncdutils import escape, Thread, finalize, memoize
+
class Volinfo(object):
+
def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host, 'volume', 'info', vol],
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
stdout=PIPE, stderr=PIPE)
vix = po.stdout.read()
po.wait()
@@ -25,7 +39,8 @@ class Volinfo(object):
via = '(via %s) ' % prelude.join(' ')
else:
via = ' '
- raise GsyncdError('getting volume info of %s%s failed with errorcode %s',
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s',
(vol, via, vi.find('opErrno').text))
self.tree = vi
self.volume = vol
@@ -40,25 +55,27 @@ class Volinfo(object):
def bparse(b):
host, dirp = b.text.split(':', 2)
return {'host': host, 'dir': dirp}
- return [ bparse(b) for b in self.get('brick') ]
+ return [bparse(b) for b in self.get('brick')]
@property
@memoize
def uuid(self):
ids = self.get('id')
if len(ids) != 1:
- raise GsyncdError("volume info of %s obtained from %s: ambiguous uuid",
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid",
self.volume, self.host)
return ids[0].text
class Monitor(object):
+
"""class which spawns and manages gsyncd workers"""
- ST_INIT = 'Initializing...'
- ST_STABLE = 'Stable'
- ST_FAULTY = 'faulty'
- ST_INCON = 'inconsistent'
+ ST_INIT = 'Initializing...'
+ ST_STABLE = 'Stable'
+ ST_FAULTY = 'faulty'
+ ST_INCON = 'inconsistent'
_ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]
def __init__(self):
@@ -68,7 +85,8 @@ class Monitor(object):
def set_state(self, state, w=None):
"""set the state that can be used by external agents
like glusterd for status reporting"""
- computestate = lambda: self.state and self._ST_ORD[max(self._ST_ORD.index(s) for s in self.state.values())]
+ computestate = lambda: self.state and self._ST_ORD[
+ max(self._ST_ORD.index(s) for s in self.state.values())]
if w:
self.lock.acquire()
old_state = computestate()
@@ -112,14 +130,17 @@ class Monitor(object):
self.set_state(self.ST_INIT, w)
ret = 0
+
def nwait(p, o=0):
p2, r = waitpid(p, o)
if not p2:
return
return r
+
def exit_signalled(s):
""" child teminated due to receipt of SIGUSR1 """
return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))
+
def exit_status(s):
if os.WIFEXITED(s):
return os.WEXITSTATUS(s)
@@ -134,7 +155,8 @@ class Monitor(object):
os.close(pr)
os.execv(sys.executable, argv + ['--feedback-fd', str(pw),
'--local-path', w[0],
- '--local-id', '.' + escape(w[0]),
+ '--local-id',
+ '.' + escape(w[0]),
'--resource-remote', w[1]])
self.lock.acquire()
cpids.add(cpid)
@@ -145,31 +167,31 @@ class Monitor(object):
os.close(pr)
if so:
ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.info("worker(%s) died before establishing " \
+ if ret is not None:
+ logging.info("worker(%s) died before establishing "
"connection" % w[0])
else:
logging.debug("worker(%s) connected" % w[0])
while time.time() < t0 + conn_timeout:
ret = nwait(cpid, os.WNOHANG)
- if ret != None:
- logging.info("worker(%s) died in startup " \
+ if ret is not None:
+ logging.info("worker(%s) died in startup "
"phase" % w[0])
break
time.sleep(1)
else:
- logging.info("worker(%s) not confirmed in %d sec, " \
+ logging.info("worker(%s) not confirmed in %d sec, "
"aborting it" % (w[0], conn_timeout))
os.kill(cpid, signal.SIGKILL)
ret = nwait(cpid)
- if ret == None:
+ if ret is None:
self.set_state(self.ST_STABLE, w)
ret = nwait(cpid)
if exit_signalled(ret):
ret = 0
else:
ret = exit_status(ret)
- if ret in (0,1):
+ if ret in (0, 1):
self.set_state(self.ST_FAULTY, w)
time.sleep(10)
self.set_state(self.ST_INCON, w)
@@ -194,17 +216,18 @@ class Monitor(object):
os.kill(cpid, signal.SIGKILL)
self.lock.release()
finalize(exval=1)
- t = Thread(target = wmon, args=[wx])
+ t = Thread(target=wmon, args=[wx])
t.start()
ta.append(t)
for t in ta:
t.join()
+
def distribute(*resources):
master, slave = resources
mvol = Volinfo(master.volume, master.host)
logging.debug('master bricks: ' + repr(mvol.bricks))
- prelude = []
+ prelude = []
si = slave
if isinstance(slave, SSH):
prelude = gconf.ssh_command.split() + [slave.remote_addr]
@@ -221,23 +244,28 @@ def distribute(*resources):
raise GsyncdError("unkown slave type " + slave.url)
logging.info('slave bricks: ' + repr(sbricks))
if isinstance(si, FILE):
- slaves = [ slave.url ]
+ slaves = [slave.url]
else:
slavenodes = set(b['host'] for b in sbricks)
if isinstance(slave, SSH) and not gconf.isolated_slave:
rap = SSH.parse_ssh_address(slave)
- slaves = [ 'ssh://' + rap['user'] + '@' + h + ':' + si.url for h in slavenodes ]
+ slaves = ['ssh://' + rap['user'] + '@' + h + ':' + si.url
+ for h in slavenodes]
else:
- slavevols = [ h + ':' + si.volume for h in slavenodes ]
+ slavevols = [h + ':' + si.volume for h in slavenodes]
if isinstance(slave, SSH):
- slaves = [ 'ssh://' + rap.remote_addr + ':' + v for v in slavevols ]
+ slaves = ['ssh://' + rap.remote_addr + ':' + v
+ for v in slavevols]
else:
slaves = slavevols
- workerspex = [ (brick['dir'], slaves[idx % len(slaves)]) for idx, brick in enumerate(mvol.bricks) if is_host_local(brick['host']) ]
+ workerspex = [(brick['dir'], slaves[idx % len(slaves)])
+ for idx, brick in enumerate(mvol.bricks)
+ if is_host_local(brick['host'])]
logging.info('worker specs: ' + repr(workerspex))
return workerspex, suuid
+
def monitor(*resources):
"""oh yeah, actually Monitor is used as singleton, too"""
return Monitor().multiplex(*distribute(*resources))