summaryrefslogtreecommitdiffstats
path: root/gluster/swift/common/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'gluster/swift/common/utils.py')
-rw-r--r--gluster/swift/common/utils.py208
1 files changed, 206 insertions, 2 deletions
diff --git a/gluster/swift/common/utils.py b/gluster/swift/common/utils.py
index 8f68319..ac41698 100644
--- a/gluster/swift/common/utils.py
+++ b/gluster/swift/common/utils.py
@@ -14,17 +14,23 @@
# limitations under the License.
import os
+import sys
import stat
import json
import errno
import random
import logging
from hashlib import md5
-from eventlet import sleep
+from eventlet import sleep, Timeout, tpool, greenthread, \
+ greenio, event
+from Queue import Queue, Empty
+import threading as stdlib_threading
+
import cPickle as pickle
from cStringIO import StringIO
import pickletools
-from gluster.swift.common.exceptions import GlusterFileSystemIOError
+from gluster.swift.common.exceptions import GlusterFileSystemIOError, \
+ ThreadPoolDead
from swift.common.exceptions import DiskFileNoSpace
from swift.common.db import utf8encodekeys
from gluster.swift.common.fs_utils import do_getctime, do_getmtime, do_stat, \
@@ -69,6 +75,204 @@ PICKLE_PROTOCOL = 2
CHUNK_SIZE = 65536
class ThreadPool(object):
    """
    Perform blocking operations in background threads.

    Call its methods from within greenlets to green-wait for results without
    blocking the eventlet reactor (hopefully).

    :param nthreads: number of worker OS threads to start. When it is zero
                     or negative no threads, pipe or consumer greenthread
                     are created, and run_in_thread() executes functions
                     inline in the calling greenthread.
    """

    # The single byte a worker writes to the wake-up pipe after queuing a
    # result; only its arrival matters, not its value.
    BYTE = 'a'.encode('utf-8')

    def __init__(self, nthreads=2):
        self.nthreads = nthreads
        self._run_queue = Queue()
        self._result_queue = Queue()
        self._threads = []
        self._alive = True

        if nthreads <= 0:
            return

        # We spawn a greenthread whose job it is to pull results from the
        # worker threads via a real Queue and send them to eventlet Events so
        # that the calling greenthreads can be awoken.
        #
        # Since each OS thread has its own collection of greenthreads, it
        # doesn't work to have the worker thread send stuff to the event, as
        # it then notifies its own thread-local eventlet hub to wake up, which
        # doesn't do anything to help out the actual calling greenthread over
        # in the main thread.
        #
        # Thus, each worker sticks its results into a result queue and then
        # writes a byte to a pipe, signaling the result-consuming greenlet (in
        # the main thread) to wake up and consume results.
        #
        # This is all stuff that eventlet.tpool does, but that code can't have
        # multiple instances instantiated. Since the object server uses one
        # pool per disk, we have to reimplement this stuff.
        _raw_rpipe, self.wpipe = os.pipe()
        self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)

        # range() (not xrange()) to match terminate() and to keep this
        # constructor usable on either major Python version.
        for _junk in range(nthreads):
            thr = stdlib_threading.Thread(
                target=self._worker,
                args=(self._run_queue, self._result_queue))
            thr.daemon = True
            thr.start()
            self._threads.append(thr)

        # This is the result-consuming greenthread that runs in the main OS
        # thread, as described above.
        self._consumer_coro = greenthread.spawn_n(self._consume_results,
                                                  self._result_queue)

    def _worker(self, work_queue, result_queue):
        """
        Pulls an item from the queue and runs it, then puts the result into
        the result queue. Repeats until it pulls a None item, which is the
        shutdown sentinel queued by terminate().

        :param work_queue: queue from which to pull work; each item is an
                           (event, func, args, kwargs) tuple or None
        :param result_queue: queue into which to place results, as
                             (event, success, result-or-exc_info) tuples
        """
        while True:
            item = work_queue.get()
            if item is None:
                break
            ev, func, args, kwargs = item
            try:
                result = func(*args, **kwargs)
                result_queue.put((ev, True, result))
            except BaseException:
                # Hand the full exc_info triple over so that the consumer
                # can re-raise it in the waiting greenthread.
                result_queue.put((ev, False, sys.exc_info()))
            finally:
                work_queue.task_done()
                # Wake the consumer greenthread up.
                os.write(self.wpipe, self.BYTE)

    def _consume_results(self, queue):
        """
        Runs as a greenthread in the same OS thread as callers of
        run_in_thread().

        Takes results from the worker OS threads and sends them to the waiting
        greenthreads.

        :param queue: the result queue filled by the worker threads
        """
        while True:
            try:
                signal = self.rpipe.read(1)
            except ValueError:
                # can happen at process shutdown when pipe is closed
                break

            # One wake-up byte may stand for several queued results, so
            # drain everything that is currently available.
            while True:
                try:
                    ev, success, result = queue.get(block=False)
                except Empty:
                    break

                try:
                    if success:
                        ev.send(result)
                    else:
                        ev.send_exception(*result)
                finally:
                    queue.task_done()

            if not signal:
                # An empty read means end-of-file: the write end of the pipe
                # has been closed, so no further wake-ups can arrive. Stop
                # here instead of spinning on reads that return immediately.
                break

    def run_in_thread(self, func, *args, **kwargs):
        """
        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
        until results are available.

        Exceptions thrown will be reraised in the calling thread.

        If the threadpool was initialized with nthreads=0, it invokes
        func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
        the eventlet hub has a chance to execute. It is more likely the hub
        will be invoked when queuing operations to an external thread.

        :returns: result of calling func
        :raises: whatever func raises
        :raises ThreadPoolDead: if terminate() has already been called
        """
        if not self._alive:
            raise ThreadPoolDead()

        if self.nthreads <= 0:
            result = func(*args, **kwargs)
            sleep()
            return result

        ev = event.Event()
        self._run_queue.put((ev, func, args, kwargs), block=False)

        # blocks this greenlet (and only *this* greenlet) until the real
        # thread calls ev.send().
        result = ev.wait()
        return result

    def _run_in_eventlet_tpool(self, func, *args, **kwargs):
        """
        Really run something in an external thread, even if we haven't got any
        threads of our own.

        :returns: result of calling func
        :raises: the exception instance func raised, if any
        """
        def inner():
            # Carry the outcome back as a (success, value) pair so that the
            # exception is raised here, by this method, rather than inside
            # tpool.execute().
            try:
                return (True, func(*args, **kwargs))
            except (Timeout, BaseException) as err:
                return (False, err)

        success, result = tpool.execute(inner)
        if success:
            return result
        else:
            raise result

    def force_run_in_thread(self, func, *args, **kwargs):
        """
        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
        until results are available.

        Exceptions thrown will be reraised in the calling thread.

        If the threadpool was initialized with nthreads=0, uses eventlet.tpool
        to run the function. This is in contrast to run_in_thread(), which
        will (in that case) simply execute func in the calling thread.

        :returns: result of calling func
        :raises: whatever func raises
        :raises ThreadPoolDead: if terminate() has already been called
        """
        if not self._alive:
            raise ThreadPoolDead()

        if self.nthreads <= 0:
            return self._run_in_eventlet_tpool(func, *args, **kwargs)
        else:
            return self.run_in_thread(func, *args, **kwargs)

    def terminate(self):
        """
        Releases the threadpool's resources (OS threads, greenthreads, pipes,
        etc.) and renders it unusable.

        Don't call run_in_thread() or force_run_in_thread() after calling
        terminate().
        """
        self._alive = False
        if self.nthreads <= 0:
            return

        # One None sentinel per worker makes every _worker loop exit.
        for _junk in range(self.nthreads):
            self._run_queue.put(None)
        for thr in self._threads:
            thr.join()
        self._threads = []
        # Zeroing nthreads makes any later terminate() call return early.
        self.nthreads = 0

        # NOTE(review): results still sitting in the result queue at this
        # point are not delivered -- confirm that no greenthread can still
        # be waiting in run_in_thread() when terminate() is called.
        greenthread.kill(self._consumer_coro)

        self.rpipe.close()
        os.close(self.wpipe)
+
+
class SafeUnpickler(object):
"""
Loading a pickled stream is potentially unsafe and exploitable because