summaryrefslogtreecommitdiffstats
path: root/glusternagios/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'glusternagios/utils.py')
-rw-r--r--glusternagios/utils.py438
1 files changed, 438 insertions, 0 deletions
diff --git a/glusternagios/utils.py b/glusternagios/utils.py
new file mode 100644
index 0000000..b4d77c6
--- /dev/null
+++ b/glusternagios/utils.py
@@ -0,0 +1,438 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+## most of the code is copied from vdsm project
+
+import logging
+import subprocess
+from cpopen import CPopen
+import io
+import select
+import threading
+from StringIO import StringIO
+from weakref import proxy
+import time
+import os
+import errno
+import signal
+
+
+class CommandPath(object):
+ def __init__(self, name, *args):
+ self.name = name
+ self.paths = args
+ self._cmd = None
+
+ @property
+ def cmd(self):
+ if not self._cmd:
+ for path in self.paths:
+ if os.path.exists(path):
+ self._cmd = path
+ break
+ else:
+ raise OSError(os.errno.ENOENT,
+ os.strerror(os.errno.ENOENT) + ': ' + self.name)
+ return self._cmd
+
+ def __repr__(self):
+ return str(self.cmd)
+
+ def __str__(self):
+ return str(self.cmd)
+
+ def __unicode__(self):
+ return unicode(self.cmd)
+
+
+ioniceCmdPath = CommandPath("ionice",
+ "/bin/ionice",
+ )
+killCmdPath = CommandPath("kill",
+ "/bin/kill",
+ )
+niceCmdPath = CommandPath("nice",
+ "/bin/nice",
+ )
+setsidCmdPath = CommandPath("setsid",
+ "/bin/setsid",
+ )
+sudoCmdPath = CommandPath("sudo",
+ "/bin/sudo",
+ )
+# Buffsize is 1K because I tested it on some use cases and 1K was fastest. If
+# you find this number to be a bottleneck in any way you are welcome to change
+# it
+BUFFSIZE = 1024
+SUDO_NON_INTERACTIVE_FLAG = "-n"
+
+
+# NOTE: it would be best to try and unify NoIntrCall and NoIntrPoll.
+# We could do so defining a new object that can be used as a placeholer
+# for the changing timeout value in the *args/**kwargs. This would
+# lead us to rebuilding the function arguments at each loop.
+def NoIntrPoll(pollfun, timeout=-1):
+ """
+ This wrapper is used to handle the interrupt exceptions that might
+ occur during a poll system call. The wrapped function must be defined
+ as poll([timeout]) where the special timeout value 0 is used to return
+ immediately and -1 is used to wait indefinitely.
+ """
+ # When the timeout < 0 we shouldn't compute a new timeout after an
+ # interruption.
+ endtime = None if timeout < 0 else time.time() + timeout
+
+ while True:
+ try:
+ return pollfun(timeout)
+ except (IOError, select.error) as e:
+ if e.args[0] != errno.EINTR:
+ raise
+
+ if endtime is not None:
+ timeout = max(0, endtime - time.time())
+
+
+class AsyncProc(object):
+ """
+ AsyncProc is a funky class. It wraps a standard subprocess.Popen
+ Object and gives it super powers. Like the power to read from a stream
+ without the fear of deadlock. It does this by always sampling all
+ stream while waiting for data. By doing this the other process can freely
+ write data to all stream without the fear of it getting stuck writing
+ to a full pipe.
+ """
+ class _streamWrapper(io.RawIOBase):
+ def __init__(self, parent, streamToWrap, fd):
+ io.IOBase.__init__(self)
+ self._stream = streamToWrap
+ self._parent = proxy(parent)
+ self._fd = fd
+ self._closed = False
+
+ def close(self):
+ if not self._closed:
+ self._closed = True
+ while not self._streamClosed:
+ self._parent._processStreams()
+
+ @property
+ def closed(self):
+ return self._closed
+
+ @property
+ def _streamClosed(self):
+ return (self.fileno() in self._parent._closedfds)
+
+ def fileno(self):
+ return self._fd
+
+ def seekable(self):
+ return False
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return True
+
+ def _readNonBlock(self, length):
+ hasNewData = (self._stream.len - self._stream.pos)
+ if hasNewData < length and not self._streamClosed:
+ self._parent._processStreams()
+
+ with self._parent._streamLock:
+ res = self._stream.read(length)
+ if self._stream.pos == self._stream.len:
+ self._stream.truncate(0)
+
+ if res == "" and not self._streamClosed:
+ return None
+ else:
+ return res
+
+ def read(self, length):
+ if not self._parent.blocking:
+ return self._readNonBlock(length)
+ else:
+ res = None
+ while res is None:
+ res = self._readNonBlock(length)
+
+ return res
+
+ def readinto(self, b):
+ data = self.read(len(b))
+ if data is None:
+ return None
+
+ bytesRead = len(data)
+ b[:bytesRead] = data
+
+ return bytesRead
+
+ def write(self, data):
+ if hasattr(data, "tobytes"):
+ data = data.tobytes()
+ with self._parent._streamLock:
+ oldPos = self._stream.pos
+ self._stream.pos = self._stream.len
+ self._stream.write(data)
+ self._stream.pos = oldPos
+
+ while self._stream.len > 0 and not self._streamClosed:
+ self._parent._processStreams()
+
+ if self._streamClosed:
+ self._closed = True
+
+ if self._stream.len != 0:
+ raise IOError(errno.EPIPE,
+ "Could not write all data to stream")
+
+ return len(data)
+
+ def __init__(self, popenToWrap):
+ self._streamLock = threading.Lock()
+ self._proc = popenToWrap
+
+ self._stdout = StringIO()
+ self._stderr = StringIO()
+ self._stdin = StringIO()
+
+ fdout = self._proc.stdout.fileno()
+ fderr = self._proc.stderr.fileno()
+ self._fdin = self._proc.stdin.fileno()
+
+ self._closedfds = []
+
+ self._poller = select.epoll()
+ self._poller.register(fdout, select.EPOLLIN | select.EPOLLPRI)
+ self._poller.register(fderr, select.EPOLLIN | select.EPOLLPRI)
+ self._poller.register(self._fdin, 0)
+ self._fdMap = {fdout: self._stdout,
+ fderr: self._stderr,
+ self._fdin: self._stdin}
+
+ self.stdout = io.BufferedReader(self._streamWrapper(self,
+ self._stdout, fdout), BUFFSIZE)
+
+ self.stderr = io.BufferedReader(self._streamWrapper(self,
+ self._stderr, fderr), BUFFSIZE)
+
+ self.stdin = io.BufferedWriter(self._streamWrapper(self,
+ self._stdin, self._fdin), BUFFSIZE)
+
+ self._returncode = None
+
+ self.blocking = False
+
+ def _processStreams(self):
+ if len(self._closedfds) == 3:
+ return
+
+ if not self._streamLock.acquire(False):
+ self._streamLock.acquire()
+ self._streamLock.release()
+ return
+ try:
+ if self._stdin.len > 0 and self._stdin.pos == 0:
+ # Polling stdin is redundant if there is nothing to write
+ # turn on only if data is waiting to be pushed
+ self._poller.modify(self._fdin, select.EPOLLOUT)
+
+ pollres = NoIntrPoll(self._poller.poll, 1)
+
+ for fd, event in pollres:
+ stream = self._fdMap[fd]
+ if event & select.EPOLLOUT and self._stdin.len > 0:
+ buff = self._stdin.read(BUFFSIZE)
+ written = os.write(fd, buff)
+ stream.pos -= len(buff) - written
+ if stream.pos == stream.len:
+ stream.truncate(0)
+ self._poller.modify(fd, 0)
+
+ elif event & (select.EPOLLIN | select.EPOLLPRI):
+ data = os.read(fd, BUFFSIZE)
+ oldpos = stream.pos
+ stream.pos = stream.len
+ stream.write(data)
+ stream.pos = oldpos
+
+ elif event & (select.EPOLLHUP | select.EPOLLERR):
+ self._poller.unregister(fd)
+ self._closedfds.append(fd)
+ # I don't close the fd because the original Popen
+ # will do it.
+
+ if self.stdin.closed and self._fdin not in self._closedfds:
+ self._poller.unregister(self._fdin)
+ self._closedfds.append(self._fdin)
+ self._proc.stdin.close()
+
+ finally:
+ self._streamLock.release()
+
+ @property
+ def pid(self):
+ return self._proc.pid
+
+ @property
+ def returncode(self):
+ if self._returncode is None:
+ self._returncode = self._proc.poll()
+ return self._returncode
+
+ def kill(self):
+ try:
+ self._proc.kill()
+ except OSError as ex:
+ if ex.errno != errno.EPERM:
+ raise
+ execCmd([killCmdPath.cmd, "-%d" % (signal.SIGTERM,),
+ str(self.pid)], sudo=True)
+
+ def wait(self, timeout=None, cond=None):
+ startTime = time.time()
+ while self.returncode is None:
+ if timeout is not None and (time.time() - startTime) > timeout:
+ return False
+ if cond is not None and cond():
+ return False
+ self._processStreams()
+ return True
+
+ def communicate(self, data=None):
+ if data is not None:
+ self.stdin.write(data)
+ self.stdin.flush()
+ self.stdin.close()
+
+ self.wait()
+ return "".join(self.stdout), "".join(self.stderr)
+
+ def __del__(self):
+ self._poller.close()
+
+
+def execCmd(command, sudo=False, cwd=None, data=None, raw=False, logErr=True,
+ printable=None, env=None, sync=True, nice=None, ioclass=None,
+ ioclassdata=None, setsid=False, execCmdLogger=logging.root,
+ deathSignal=0, childUmask=None):
+ """
+ Executes an external command, optionally via sudo.
+
+ IMPORTANT NOTE: the new process would receive `deathSignal` when the
+ controlling thread dies, which may not be what you intended: if you create
+ a temporary thread, spawn a sync=False sub-process, and have the thread
+ finish, the new subprocess would die immediately.
+ """
+ if ioclass is not None:
+ cmd = command
+ command = [ioniceCmdPath.cmd, '-c', str(ioclass)]
+ if ioclassdata is not None:
+ command.extend(("-n", str(ioclassdata)))
+
+ command = command + cmd
+
+ if nice is not None:
+ command = [niceCmdPath.cmd, '-n', str(nice)] + command
+
+ if setsid:
+ command = [setsidCmdPath.cmd] + command
+
+ if sudo:
+ command = [sudoCmdPath.cmd, SUDO_NON_INTERACTIVE_FLAG] + command
+
+ if not printable:
+ printable = command
+
+ cmdline = repr(subprocess.list2cmdline(printable))
+ execCmdLogger.debug("%s (cwd %s)", cmdline, cwd)
+
+ p = CPopen(command, close_fds=True, cwd=cwd, env=env,
+ deathSignal=deathSignal, childUmask=childUmask)
+ p = AsyncProc(p)
+ if not sync:
+ if data is not None:
+ p.stdin.write(data)
+ p.stdin.flush()
+
+ return p
+
+ (out, err) = p.communicate(data)
+
+ if out is None:
+ # Prevent splitlines() from barfing later on
+ out = ""
+
+ execCmdLogger.debug("%s: <err> = %s; <rc> = %d",
+ {True: "SUCCESS", False: "FAILED"}[p.returncode == 0],
+ repr(err), p.returncode)
+
+ if not raw:
+ out = out.splitlines(False)
+ err = err.splitlines(False)
+
+ return (p.returncode, out, err)
+
+
+def retry(func, expectedException=Exception, tries=None,
+ timeout=None, sleep=1, stopCallback=None):
+ """
+ Retry a function. Wraps the retry logic so you don't have to
+ implement it each time you need it.
+
+ :param func: The callable to run.
+ :param expectedException: The exception you expect to receive when the
+ function fails.
+ :param tries: The number of times to try. None\0,-1 means infinite.
+ :param timeout: The time you want to spend waiting. This **WILL NOT** stop
+ the method. It will just not run it if it ended after the
+ timeout.
+ :param sleep: Time to sleep between calls in seconds.
+ :param stopCallback: A function that takes no parameters and causes the
+ method to stop retrying when it returns with a
+ positive value.
+ """
+ if tries in [0, None]:
+ tries = -1
+
+ if timeout in [0, None]:
+ timeout = -1
+
+ startTime = time.time()
+
+ while True:
+ tries -= 1
+ try:
+ return func()
+ except expectedException:
+ if tries == 0:
+ raise
+
+ if (timeout > 0) and ((time.time() - startTime) > timeout):
+ raise
+
+ if stopCallback is not None and stopCallback():
+ raise
+
+ time.sleep(sleep)