path: root/glusternagios
diff options
authorBala.FA <>2014-03-07 18:28:09 +0530
committerBala.FA <>2014-04-28 16:20:46 +0530
commit8f5f4537af7790be386974628c804a7bc719b738 (patch)
treeea134e1411640bdacf116332afc6a82c86ee28b0 /glusternagios
parent79b4c3e202e0ce07d13d28b88f04c4dc79edfb12 (diff)
Initial commitv0.1.0
Change-Id: Ie8fdd046d111a4a46abe0e162985e833323bfd7d Signed-off-by: Bala.FA <>
Diffstat (limited to 'glusternagios')
3 files changed, 461 insertions, 0 deletions
diff --git a/glusternagios/ b/glusternagios/
new file mode 100644
index 0000000..55f8642
--- /dev/null
+++ b/glusternagios/
@@ -0,0 +1,4 @@
+dist_glusternagioscommonpylib_PYTHON = \
+ \
+ \
+ $(NULL)
diff --git a/glusternagios/ b/glusternagios/
new file mode 100644
index 0000000..ff21495
--- /dev/null
+++ b/glusternagios/
@@ -0,0 +1,19 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
diff --git a/glusternagios/ b/glusternagios/
new file mode 100644
index 0000000..b4d77c6
--- /dev/null
+++ b/glusternagios/
@@ -0,0 +1,438 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+## most of the code is copied from vdsm project
+import logging
+import subprocess
+from cpopen import CPopen
+import io
+import select
+import threading
+from StringIO import StringIO
+from weakref import proxy
+import time
+import os
+import errno
+import signal
+class CommandPath(object):
+ def __init__(self, name, *args):
+ = name
+ self.paths = args
+ self._cmd = None
+ @property
+ def cmd(self):
+ if not self._cmd:
+ for path in self.paths:
+ if os.path.exists(path):
+ self._cmd = path
+ break
+ else:
+ raise OSError(os.errno.ENOENT,
+ os.strerror(os.errno.ENOENT) + ': ' +
+ return self._cmd
+ def __repr__(self):
+ return str(self.cmd)
+ def __str__(self):
+ return str(self.cmd)
+ def __unicode__(self):
+ return unicode(self.cmd)
+ioniceCmdPath = CommandPath("ionice",
+ "/bin/ionice",
+ )
+killCmdPath = CommandPath("kill",
+ "/bin/kill",
+ )
+niceCmdPath = CommandPath("nice",
+ "/bin/nice",
+ )
+setsidCmdPath = CommandPath("setsid",
+ "/bin/setsid",
+ )
+sudoCmdPath = CommandPath("sudo",
+ "/bin/sudo",
+ )
+# Buffsize is 1K because I tested it on some use cases and 1K was fastest. If
+# you find this number to be a bottleneck in any way you are welcome to change
+# it
+BUFFSIZE = 1024
+# NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll.
+# We could do so defining a new object that can be used as a placeholer
+# for the changing timeout value in the *args/**kwargs. This would
+# lead us to rebuilding the function arguments at each loop.
+def NoIntrPoll(pollfun, timeout=-1):
+ """
+ This wrapper is used to handle the interrupt exceptions that might
+ occur during a poll system call. The wrapped function must be defined
+ as poll([timeout]) where the special timeout value 0 is used to return
+ immediately and -1 is used to wait indefinitely.
+ """
+ # When the timeout < 0 we shouldn't compute a new timeout after an
+ # interruption.
+ endtime = None if timeout < 0 else time.time() + timeout
+ while True:
+ try:
+ return pollfun(timeout)
+ except (IOError, select.error) as e:
+ if e.args[0] != errno.EINTR:
+ raise
+ if endtime is not None:
+ timeout = max(0, endtime - time.time())
+class AsyncProc(object):
+ """
+ AsyncProc is a funky class. It wraps a standard subprocess.Popen
+ Object and gives it super powers. Like the power to read from a stream
+ without the fear of deadlock. It does this by always sampling all
+ stream while waiting for data. By doing this the other process can freely
+ write data to all stream without the fear of it getting stuck writing
+ to a full pipe.
+ """
+ class _streamWrapper(io.RawIOBase):
+ def __init__(self, parent, streamToWrap, fd):
+ io.IOBase.__init__(self)
+ self._stream = streamToWrap
+ self._parent = proxy(parent)
+ self._fd = fd
+ self._closed = False
+ def close(self):
+ if not self._closed:
+ self._closed = True
+ while not self._streamClosed:
+ self._parent._processStreams()
+ @property
+ def closed(self):
+ return self._closed
+ @property
+ def _streamClosed(self):
+ return (self.fileno() in self._parent._closedfds)
+ def fileno(self):
+ return self._fd
+ def seekable(self):
+ return False
+ def readable(self):
+ return True
+ def writable(self):
+ return True
+ def _readNonBlock(self, length):
+ hasNewData = (self._stream.len - self._stream.pos)
+ if hasNewData < length and not self._streamClosed:
+ self._parent._processStreams()
+ with self._parent._streamLock:
+ res =
+ if self._stream.pos == self._stream.len:
+ self._stream.truncate(0)
+ if res == "" and not self._streamClosed:
+ return None
+ else:
+ return res
+ def read(self, length):
+ if not self._parent.blocking:
+ return self._readNonBlock(length)
+ else:
+ res = None
+ while res is None:
+ res = self._readNonBlock(length)
+ return res
+ def readinto(self, b):
+ data =
+ if data is None:
+ return None
+ bytesRead = len(data)
+ b[:bytesRead] = data
+ return bytesRead
+ def write(self, data):
+ if hasattr(data, "tobytes"):
+ data = data.tobytes()
+ with self._parent._streamLock:
+ oldPos = self._stream.pos
+ self._stream.pos = self._stream.len
+ self._stream.write(data)
+ self._stream.pos = oldPos
+ while self._stream.len > 0 and not self._streamClosed:
+ self._parent._processStreams()
+ if self._streamClosed:
+ self._closed = True
+ if self._stream.len != 0:
+ raise IOError(errno.EPIPE,
+ "Could not write all data to stream")
+ return len(data)
+ def __init__(self, popenToWrap):
+ self._streamLock = threading.Lock()
+ self._proc = popenToWrap
+ self._stdout = StringIO()
+ self._stderr = StringIO()
+ self._stdin = StringIO()
+ fdout = self._proc.stdout.fileno()
+ fderr = self._proc.stderr.fileno()
+ self._fdin = self._proc.stdin.fileno()
+ self._closedfds = []
+ self._poller = select.epoll()
+ self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI)
+ self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI)
+ self._poller.register(self._fdin, 0)
+ self._fdMap = {fdout: self._stdout,
+ fderr: self._stderr,
+ self._fdin: self._stdin}
+ self.stdout = io.BufferedReader(self._streamWrapper(self,
+ self._stdout, fdout), BUFFSIZE)
+ self.stderr = io.BufferedReader(self._streamWrapper(self,
+ self._stderr, fderr), BUFFSIZE)
+ self.stdin = io.BufferedWriter(self._streamWrapper(self,
+ self._stdin, self._fdin), BUFFSIZE)
+ self._returncode = None
+ self.blocking = False
+ def _processStreams(self):
+ if len(self._closedfds) == 3:
+ return
+ if not self._streamLock.acquire(False):
+ self._streamLock.acquire()
+ self._streamLock.release()
+ return
+ try:
+ if self._stdin.len > 0 and self._stdin.pos == 0:
+ # Polling stdin is redundant if there is nothing to write
+ # turn on only if data is waiting to be pushed
+ self._poller.modify(self._fdin, select.EPOLLOUT)
+ pollres = NoIntrPoll(self._poller.poll, 1)
+ for fd, event in pollres:
+ stream = self._fdMap[fd]
+ if event & select.EPOLLOUT and self._stdin.len > 0:
+ buff =
+ written = os.write(fd, buff)
+ stream.pos -= len(buff) - written
+ if stream.pos == stream.len:
+ stream.truncate(0)
+ self._poller.modify(fd, 0)
+ elif event & (select.EPOLLIN | select.EPOLLPRI):
+ data =, BUFFSIZE)
+ oldpos = stream.pos
+ stream.pos = stream.len
+ stream.write(data)
+ stream.pos = oldpos
+ elif event & (select.EPOLLHUP | select.EPOLLERR):
+ self._poller.unregister(fd)
+ self._closedfds.append(fd)
+ # I don't close the fd because the original Popen
+ # will do it.
+ if self.stdin.closed and self._fdin not in self._closedfds:
+ self._poller.unregister(self._fdin)
+ self._closedfds.append(self._fdin)
+ self._proc.stdin.close()
+ finally:
+ self._streamLock.release()
+ @property
+ def pid(self):
+ return
+ @property
+ def returncode(self):
+ if self._returncode is None:
+ self._returncode = self._proc.poll()
+ return self._returncode
+ def kill(self):
+ try:
+ self._proc.kill()
+ except OSError as ex:
+ if ex.errno != errno.EPERM:
+ raise
+ execCmd([killCmdPath.cmd, "-%d" % (signal.SIGTERM,),
+ str(], sudo=True)
+ def wait(self, timeout=None, cond=None):
+ startTime = time.time()
+ while self.returncode is None:
+ if timeout is not None and (time.time() - startTime) > timeout:
+ return False
+ if cond is not None and cond():
+ return False
+ self._processStreams()
+ return True
+ def communicate(self, data=None):
+ if data is not None:
+ self.stdin.write(data)
+ self.stdin.flush()
+ self.stdin.close()
+ self.wait()
+ return "".join(self.stdout), "".join(self.stderr)
+ def __del__(self):
+ self._poller.close()
+def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True,
+ printable=None, env=None, sync=True, nice=None, ioclass=None,
+ ioclassdata=None, setsid=False, execCmdLogger=logging.root,
+ deathSignal=0, childUmask=None):
+ """
+ Executes an external command, optionally via sudo.
+ IMPORTANT NOTE: the new process would receive `deathSignal` when the
+ controlling thread dies, which may not be what you intended: if you create
+ a temporary thread, spawn a sync=False sub-process, and have the thread
+ finish, the new subprocess would die immediately.
+ """
+ if ioclass is not None:
+ cmd = command
+ command = [ioniceCmdPath.cmd, '-c', str(ioclass)]
+ if ioclassdata is not None:
+ command.extend(("-n", str(ioclassdata)))
+ command = command + cmd
+ if nice is not None:
+ command = [niceCmdPath.cmd, '-n', str(nice)] + command
+ if setsid:
+ command = [setsidCmdPath.cmd] + command
+ if sudo:
+ command = [sudoCmdPath.cmd, SUDO_NON_INTERACTIVE_FLAG] + command
+ if not printable:
+ printable = command
+ cmdline = repr(subprocess.list2cmdline(printable))
+ execCmdLogger.debug("%s (cwd %s)", cmdline, cwd)
+ p = CPopen(command, close_fds=True, cwd=cwd, env=env,
+ deathSignal=deathSignal, childUmask=childUmask)
+ p = AsyncProc(p)
+ if not sync:
+ if data is not None:
+ p.stdin.write(data)
+ p.stdin.flush()
+ return p
+ (out, err) = p.communicate(data)
+ if out is None:
+ # Prevent splitlines() from barfing later on
+ out = ""
+ execCmdLogger.debug("%s: <err> = %s; <rc> = %d",
+ {True: "SUCCESS", False: "FAILED"}[p.returncode == 0],
+ repr(err), p.returncode)
+ if not raw:
+ out = out.splitlines(False)
+ err = err.splitlines(False)
+ return (p.returncode, out, err)
+def retry(func, expectedException=Exception, tries=None,
+ timeout=None, sleep=1, stopCallback=None):
+ """
+ Retry a function. Wraps the retry logic so you don't have to
+ implement it each time you need it.
+ :param func: The callable to run.
+ :param expectedException: The exception you expect to receive when the
+ function fails.
+ :param tries: The number of times to try. None\0,-1 means infinite.
+ :param timeout: The time you want to spend waiting. This **WILL NOT** stop
+ the method. It will just not run it if it ended after the
+ timeout.
+ :param sleep: Time to sleep between calls in seconds.
+ :param stopCallback: A function that takes no parameters and causes the
+ method to stop retrying when it returns with a
+ positive value.
+ """
+ if tries in [0, None]:
+ tries = -1
+ if timeout in [0, None]:
+ timeout = -1
+ startTime = time.time()
+ while True:
+ tries -= 1
+ try:
+ return func()
+ except expectedException:
+ if tries == 0:
+ raise
+ if (timeout > 0) and ((time.time() - startTime) > timeout):
+ raise
+ if stopCallback is not None and stopCallback():
+ raise
+ time.sleep(sleep)