From 513988915aa1af13a989d062b021fe1562cbf18d Mon Sep 17 00:00:00 2001
From: venkata edara
Date: Wed, 10 May 2017 13:27:38 +0530
Subject: Rebase to Swift 2.10.1 (newton)

Change-Id: I53a962c9a301089c8aed0b43c50f944c30225944
Signed-off-by: venkata edara
Reviewed-on: https://review.gluster.org/16653
Reviewed-by: Prashanth Pai
Tested-by: Prashanth Pai
---
 gluster/swift/__init__.py                          |   2 +-
 gluster/swift/account/utils.py                     |   4 +-
 gluster/swift/common/DiskDir.py                    |  23 ++-
 gluster/swift/common/constraints.py                |  11 ++
 gluster/swift/common/exceptions.py                 |   6 +
 .../common/middleware/gswauth/swauth/middleware.py |   6 +-
 gluster/swift/common/utils.py                      | 208 ++++++++++++++++++++-
 gluster/swift/container/server.py                  |   6 +-
 gluster/swift/obj/diskfile.py                      |  65 ++++++-
 gluster/swift/obj/expirer.py                       |   2 +-
 10 files changed, 314 insertions(+), 19 deletions(-)

diff --git a/gluster/swift/__init__.py b/gluster/swift/__init__.py
index c0b415a..ac0c566 100644
--- a/gluster/swift/__init__.py
+++ b/gluster/swift/__init__.py
@@ -45,6 +45,6 @@ class PkgInfo(object):
 #
 # Change the Package version here
 #
-_pkginfo = PkgInfo('2.3.0', '0', 'gluster_swift', False)
+_pkginfo = PkgInfo('2.10.1', '0', 'gluster_swift', False)
 __version__ = _pkginfo.pretty_version
 __canonical_version__ = _pkginfo.canonical_version

diff --git a/gluster/swift/account/utils.py b/gluster/swift/account/utils.py
index 99fe5ea..4424835 100644
--- a/gluster/swift/account/utils.py
+++ b/gluster/swift/account/utils.py
@@ -21,7 +21,7 @@ from xml.sax import saxutils
 
 def account_listing_response(account, req, response_content_type, broker=None,
                              limit='', marker='', end_marker='', prefix='',
-                             delimiter=''):
+                             delimiter='', reverse=False):
     """
     This is an exact copy of swift.account.utis.account_listing_response()
     except for one difference i.e this method passes response_content_type
@@ -34,7 +34,7 @@ def account_listing_response(account, req, response_content_type, broker=None,
     account_list = broker.list_containers_iter(limit, marker, end_marker,
                                                prefix, delimiter,
-                                               response_content_type)
+                                               response_content_type, reverse)
     if response_content_type == 'application/json':
         data = []
         for (name, object_count, bytes_used, is_subdir) in account_list:

diff --git a/gluster/swift/common/DiskDir.py b/gluster/swift/common/DiskDir.py
index 4f4a2ef..0bc95df 100644
--- a/gluster/swift/common/DiskDir.py
+++ b/gluster/swift/common/DiskDir.py
@@ -33,7 +33,7 @@ from gluster.swift.common.exceptions import FileOrDirNotFoundError, \
 from gluster.swift.obj.expirer import delete_tracker_object
 from swift.common.constraints import MAX_META_COUNT, MAX_META_OVERALL_SIZE
 from swift.common.swob import HTTPBadRequest
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
 
 
 DATADIR = 'containers'
@@ -399,7 +399,7 @@ class DiskDir(DiskCommon):
 
     def list_objects_iter(self, limit, marker, end_marker,
                           prefix, delimiter, path=None,
                           storage_policy_index=0,
-                          out_content_type=None):
+                          out_content_type=None, reverse=False):
         """
         Returns tuple of name, created_at, size, content_type, etag.
""" @@ -427,6 +427,9 @@ class DiskDir(DiskCommon): # No objects in container , return empty list return container_list + if marker and end_marker and reverse: + marker, end_marker = end_marker, marker + if end_marker: objects = filter_end_marker(objects, end_marker) @@ -471,6 +474,8 @@ class DiskDir(DiskCommon): container_list.append((obj, '0', 0, 'text/plain', '')) if len(container_list) >= limit: break + if reverse: + container_list.reverse() return container_list count = 0 @@ -512,7 +517,8 @@ class DiskDir(DiskCommon): count += 1 if count >= limit: break - + if reverse: + container_list.reverse() return container_list def _update_object_count(self): @@ -778,7 +784,8 @@ class DiskAccount(DiskCommon): return containers def list_containers_iter(self, limit, marker, end_marker, - prefix, delimiter, response_content_type=None): + prefix, delimiter, response_content_type=None, + reverse=False): """ Return tuple of name, object_count, bytes_used, 0(is_subdir). Used by account server. @@ -794,6 +801,9 @@ class DiskAccount(DiskCommon): # No containers in account, return empty list return account_list + if marker and end_marker and reverse: + marker, end_marker = end_marker, marker + if containers and end_marker: containers = filter_end_marker(containers, end_marker) @@ -841,6 +851,8 @@ class DiskAccount(DiskCommon): account_list.append((container, 0, 0, 0)) if len(account_list) >= limit: break + if reverse: + account_list.reverse() return account_list count = 0 @@ -866,7 +878,8 @@ class DiskAccount(DiskCommon): count += 1 if count >= limit: break - + if reverse: + account_list.reverse() return account_list def get_info(self): diff --git a/gluster/swift/common/constraints.py b/gluster/swift/common/constraints.py index 98e2a27..2007b71 100644 --- a/gluster/swift/common/constraints.py +++ b/gluster/swift/common/constraints.py @@ -102,3 +102,14 @@ _ring.Ring = ring.Ring import swift.account.utils from gluster.swift.account.utils import account_listing_response as gf_als swift.account.utils.account_listing_response = gf_als + +# Monkey patch StoragePolicy.load_ring as POLICIES are initialized already +from swift.common.storage_policy import StoragePolicy + + +def load_ring(self, swift_dir): + if self.object_ring: + return + self.object_ring = ring.Ring(swift_dir, ring_name='object') + +StoragePolicy.load_ring = load_ring diff --git a/gluster/swift/common/exceptions.py b/gluster/swift/common/exceptions.py index 8260dd9..4dc2878 100644 --- a/gluster/swift/common/exceptions.py +++ b/gluster/swift/common/exceptions.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from swift.common.exceptions import SwiftException
+
 
 class GlusterFileSystemOSError(OSError):
     pass
@@ -48,3 +50,7 @@ class AlreadyExistsAsFile(GlusterfsException):
 
 class DiskFileContainerDoesNotExist(GlusterfsException):
     pass
+
+
+class ThreadPoolDead(SwiftException):
+    pass

diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
index 7a6d713..a266d74 100644
--- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py
+++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
@@ -379,7 +379,7 @@ class Swauth(object):
             if memcache_client:
                 memcache_client.set(
                     memcache_key, (time() + expires_from_now, groups),
-                    timeout=expires_from_now)
+                    time=expires_from_now)
         else:
             path = quote('/v1/%s/.token_%s/%s' %
                          (self.auth_account, token[-1], token))
@@ -401,7 +401,7 @@ class Swauth(object):
                 memcache_client.set(
                     memcache_key,
                     (detail['expires'], groups),
-                    timeout=float(detail['expires'] - time()))
+                    time=float(detail['expires'] - time()))
         return groups
 
     def authorize(self, req):
@@ -1448,7 +1448,7 @@ class Swauth(object):
                     (self.itoken_expires,
                      '%s,.reseller_admin,%s' % (self.metadata_volume,
                                                 self.auth_account)),
-                    timeout=self.token_life)
+                    time=self.token_life)
         return self.itoken
 
     def get_admin_detail(self, req):

diff --git a/gluster/swift/common/utils.py b/gluster/swift/common/utils.py
index 8f68319..ac41698 100644
--- a/gluster/swift/common/utils.py
+++ b/gluster/swift/common/utils.py
@@ -14,17 +14,23 @@
 # limitations under the License.
 
 import os
+import sys
 import stat
 import json
 import errno
 import random
 import logging
 from hashlib import md5
-from eventlet import sleep
+from eventlet import sleep, Timeout, tpool, greenthread, \
+    greenio, event
+from Queue import Queue, Empty
+import threading as stdlib_threading
+
 import cPickle as pickle
 from cStringIO import StringIO
 import pickletools
-from gluster.swift.common.exceptions import GlusterFileSystemIOError
+from gluster.swift.common.exceptions import GlusterFileSystemIOError, \
+    ThreadPoolDead
 from swift.common.exceptions import DiskFileNoSpace
 from swift.common.db import utf8encodekeys
 from gluster.swift.common.fs_utils import do_getctime, do_getmtime, do_stat, \
@@ -69,6 +75,204 @@ PICKLE_PROTOCOL = 2
 CHUNK_SIZE = 65536
 
 
+class ThreadPool(object):
+    """
+    Perform blocking operations in background threads.
+
+    Call its methods from within greenlets to green-wait for results without
+    blocking the eventlet reactor (hopefully).
+    """
+
+    BYTE = 'a'.encode('utf-8')
+
+    def __init__(self, nthreads=2):
+        self.nthreads = nthreads
+        self._run_queue = Queue()
+        self._result_queue = Queue()
+        self._threads = []
+        self._alive = True
+
+        if nthreads <= 0:
+            return
+
+        # We spawn a greenthread whose job it is to pull results from the
+        # worker threads via a real Queue and send them to eventlet Events so
+        # that the calling greenthreads can be awoken.
+        #
+        # Since each OS thread has its own collection of greenthreads, it
+        # doesn't work to have the worker thread send stuff to the event, as
+        # it then notifies its own thread-local eventlet hub to wake up, which
+        # doesn't do anything to help out the actual calling greenthread over
+        # in the main thread.
+        #
+        # Thus, each worker sticks its results into a result queue and then
+        # writes a byte to a pipe, signaling the result-consuming greenlet (in
+        # the main thread) to wake up and consume results.
+        #
+        # This is all stuff that eventlet.tpool does, but that code can't have
+        # multiple instances instantiated. Since the object server uses one
+        # pool per disk, we have to reimplement this stuff.
+        _raw_rpipe, self.wpipe = os.pipe()
+        self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
+
+        for _junk in xrange(nthreads):
+            thr = stdlib_threading.Thread(
+                target=self._worker,
+                args=(self._run_queue, self._result_queue))
+            thr.daemon = True
+            thr.start()
+            self._threads.append(thr)
+        # This is the result-consuming greenthread that runs in the main OS
+        # thread, as described above.
+        self._consumer_coro = greenthread.spawn_n(self._consume_results,
+                                                  self._result_queue)
+
+    def _worker(self, work_queue, result_queue):
+        """
+        Pulls an item from the queue and runs it, then puts the result into
+        the result queue. Repeats forever.
+
+        :param work_queue: queue from which to pull work
+        :param result_queue: queue into which to place results
+        """
+        while True:
+            item = work_queue.get()
+            if item is None:
+                break
+            ev, func, args, kwargs = item
+            try:
+                result = func(*args, **kwargs)
+                result_queue.put((ev, True, result))
+            except BaseException:
+                result_queue.put((ev, False, sys.exc_info()))
+            finally:
+                work_queue.task_done()
+                os.write(self.wpipe, self.BYTE)
+
+    def _consume_results(self, queue):
+        """
+        Runs as a greenthread in the same OS thread as callers of
+        run_in_thread().
+
+        Takes results from the worker OS threads and sends them to the waiting
+        greenthreads.
+        """
+        while True:
+            try:
+                self.rpipe.read(1)
+            except ValueError:
+                # can happen at process shutdown when pipe is closed
+                break
+
+            while True:
+                try:
+                    ev, success, result = queue.get(block=False)
+                except Empty:
+                    break
+
+                try:
+                    if success:
+                        ev.send(result)
+                    else:
+                        ev.send_exception(*result)
+                finally:
+                    queue.task_done()
+
+    def run_in_thread(self, func, *args, **kwargs):
+        """
+        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+        until results are available.
+
+        Exceptions thrown will be reraised in the calling thread.
+
+        If the threadpool was initialized with nthreads=0, it invokes
+        func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
+        the eventlet hub has a chance to execute. It is more likely the hub
+        will be invoked when queuing operations to an external thread.
+
+        :returns: result of calling func
+        :raises: whatever func raises
+        """
+        if not self._alive:
+            raise ThreadPoolDead()
+
+        if self.nthreads <= 0:
+            result = func(*args, **kwargs)
+            sleep()
+            return result
+
+        ev = event.Event()
+        self._run_queue.put((ev, func, args, kwargs), block=False)
+
+        # blocks this greenlet (and only *this* greenlet) until the real
+        # thread calls ev.send().
+        result = ev.wait()
+        return result
+
+    def _run_in_eventlet_tpool(self, func, *args, **kwargs):
+        """
+        Really run something in an external thread, even if we haven't got any
+        threads of our own.
+        """
+        def inner():
+            try:
+                return (True, func(*args, **kwargs))
+            except (Timeout, BaseException) as err:
+                return (False, err)
+
+        success, result = tpool.execute(inner)
+        if success:
+            return result
+        else:
+            raise result
+
+    def force_run_in_thread(self, func, *args, **kwargs):
+        """
+        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+        until results are available.
+
+        Exceptions thrown will be reraised in the calling thread.
+
+        If the threadpool was initialized with nthreads=0, uses eventlet.tpool
+        to run the function. This is in contrast to run_in_thread(), which
+        will (in that case) simply execute func in the calling thread.
+
+        :returns: result of calling func
+        :raises: whatever func raises
+        """
+        if not self._alive:
+            raise ThreadPoolDead()
+
+        if self.nthreads <= 0:
+            return self._run_in_eventlet_tpool(func, *args, **kwargs)
+        else:
+            return self.run_in_thread(func, *args, **kwargs)
+
+    def terminate(self):
+        """
+        Releases the threadpool's resources (OS threads, greenthreads, pipes,
+        etc.) and renders it unusable.
+
+        Don't call run_in_thread() or force_run_in_thread() after calling
+        terminate().
+        """
+        self._alive = False
+        if self.nthreads <= 0:
+            return
+
+        for _junk in range(self.nthreads):
+            self._run_queue.put(None)
+        for thr in self._threads:
+            thr.join()
+        self._threads = []
+        self.nthreads = 0
+
+        greenthread.kill(self._consumer_coro)
+
+        self.rpipe.close()
+        os.close(self.wpipe)
+
+
 class SafeUnpickler(object):
     """
     Loading a pickled stream is potentially unsafe and exploitable because

diff --git a/gluster/swift/container/server.py b/gluster/swift/container/server.py
index e62076a..82e682a 100644
--- a/gluster/swift/container/server.py
+++ b/gluster/swift/container/server.py
@@ -21,7 +21,7 @@ import gluster.swift.common.constraints  # noqa
 
 from swift.container import server
 from gluster.swift.common.DiskDir import DiskDir
-from swift.common.utils import public, timing_stats
+from swift.common.utils import public, timing_stats, config_true_value
 from swift.common.exceptions import DiskFileNoSpace
 from swift.common.swob import HTTPInsufficientStorage, HTTPNotFound, \
     HTTPPreconditionFailed
@@ -105,6 +105,8 @@ class ContainerController(server.ContainerController):
         end_marker = get_param(req, 'end_marker')
         limit = constraints.CONTAINER_LISTING_LIMIT
         given_limit = get_param(req, 'limit')
+        reverse = config_true_value(get_param(req, 'reverse'))
+
         if given_limit and given_limit.isdigit():
             limit = int(given_limit)
             if limit > constraints.CONTAINER_LISTING_LIMIT:
@@ -125,7 +127,7 @@ class ContainerController(server.ContainerController):
         container_list = broker.list_objects_iter(
             limit, marker, end_marker, prefix, delimiter, path,
             storage_policy_index=info['storage_policy_index'],
-            out_content_type=out_content_type)
+            out_content_type=out_content_type, reverse=reverse)
         return self.create_listing(req, out_content_type, info, resp_headers,
                                    broker.metadata, container_list, container)

diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py
index b94cf3d..be0669f 100644
--- a/gluster/swift/obj/diskfile.py
+++ b/gluster/swift/obj/diskfile.py
@@ -16,6 +16,7 @@
 import os
 import stat
 import errno
+from collections import defaultdict
 try:
     from random import SystemRandom
     random = SystemRandom()
@@ -25,10 +26,11 @@ import logging
 import time
 from uuid import uuid4
 from eventlet import sleep
+from swift.common.utils import Timestamp
 from contextlib import contextmanager
 from gluster.swift.common.exceptions import AlreadyExistsAsFile, \
     AlreadyExistsAsDir, DiskFileContainerDoesNotExist
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
 from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
     DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \
     DiskFileExpired
@@ -212,9 +214,16 @@ class DiskFileManager(SwiftDiskFileManager):
     :param conf: caller provided configuration object
     :param logger: caller provided logger
     """
+    def __init__(self, conf, logger):
+        super(DiskFileManager, self).__init__(conf, logger)
+        threads_per_disk = int(conf.get('threads_per_disk', '0'))
+        self.threadpools = defaultdict(
+            lambda: ThreadPool(nthreads=threads_per_disk))
+
     def get_diskfile(self, device, partition, account, container, obj,
                      policy=None, **kwargs):
         dev_path = self.get_dev_path(device, self.mount_check)
+
         if not dev_path:
             raise DiskFileDeviceUnavailable()
         return DiskFile(self, dev_path, self.threadpools[device],
@@ -553,7 +562,7 @@ class DiskFile(object):
     """
     def __init__(self, mgr, dev_path, threadpool, partition,
                  account=None, container=None, obj=None,
-                 policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID):
+                 policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID, **kwargs):
         # Variables partition and policy is currently unused.
         self._mgr = mgr
         self._device_path = dev_path
@@ -588,6 +597,48 @@ class DiskFile(object):
         self._data_file = os.path.join(self._put_datadir, self._obj)
         self._disk_file_open = False
 
+    @property
+    def timestamp(self):
+        if self._metadata is None:
+            raise DiskFileNotOpen()
+        return Timestamp(self._metadata.get(X_TIMESTAMP))
+
+    @property
+    def data_timestamp(self):
+        return self.timestamp
+
+    @property
+    def durable_timestamp(self):
+        """
+        Provides the timestamp of the newest data file found in the object
+        directory.
+
+        :return: A Timestamp instance, or None if no data file was found.
+        :raises DiskFileNotOpen: if the open() method has not been previously
+            called on this instance.
+        """
+        if self._metadata:
+            return Timestamp(self._metadata.get(X_TIMESTAMP))
+        return None
+
+    @property
+    def fragments(self):
+        return None
+
+    @property
+    def content_type(self):
+        if self._metadata is None:
+            raise DiskFileNotOpen()
+        return self._metadata.get(X_CONTENT_TYPE)
+
+    @property
+    def content_type_timestamp(self):
+        if self._metadata is None:
+            raise DiskFileNotOpen()
+        t = self._metadata.get('Content-Type-Timestamp') or \
+            self._metadata.get(X_TIMESTAMP)
+        return Timestamp(t)
+
     def open(self):
         """
         Open the object.
@@ -710,6 +761,15 @@ class DiskFile(object):
         self._disk_file_open = False
         self._close_fd()
 
+    def get_datafile_metadata(self):
+        '''gluster-swift does not have separate data and metadata files'''
+        if self._metadata is None:
+            raise DiskFileNotOpen()
+        return self._metadata
+
+    def get_metafile_metadata(self):
+        return None
+
     def get_metadata(self):
         """
         Provide the metadata for a previously opened object as a dictionary.
@@ -879,7 +939,6 @@ class DiskFile(object):
         :raises AlreadyExistsAsFile: if path or part of a path is not a \
             directory
         """
-
         data_file = os.path.join(self._put_datadir, self._obj)
 
         # Assume the full directory path exists to the file already, and

diff --git a/gluster/swift/obj/expirer.py b/gluster/swift/obj/expirer.py
index 38f870e..97e08e9 100644
--- a/gluster/swift/obj/expirer.py
+++ b/gluster/swift/obj/expirer.py
@@ -25,7 +25,7 @@ from gluster.swift.common.utils import delete_tracker_object
 from swift.obj.expirer import ObjectExpirer as SwiftObjectExpirer
 from swift.common.http import HTTP_NOT_FOUND
 from swift.common.internal_client import InternalClient, UnexpectedResponse
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
 
 
 EXCLUDE_DIRS = ('.trashcan', '.glusterfs')
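
A note on the reverse-listing hunks in DiskDir.py and account/utils.py: they follow the Swift 2.10 semantics, where a reversed listing receives marker and end_marker in traversal order, so the two bounds are swapped before the usual exclusive filtering, and the assembled page is reversed just before being returned. On the wire this is driven by the new reverse query parameter, which container/server.py parses with config_true_value(). A minimal standalone sketch of that logic follows; the function name and the pre-sorted input are illustrative, not part of the patch:

    def list_range(entries, marker='', end_marker='', reverse=False):
        # 'entries' is assumed to be sorted ascending, like the flat
        # listings DiskDir.py builds from the filesystem.
        if marker and end_marker and reverse:
            # A reversed listing supplies its bounds in reverse order
            # relative to the stored ascending order, so swap them.
            marker, end_marker = end_marker, marker
        if end_marker:
            entries = [e for e in entries if e < end_marker]  # exclusive
        if marker:
            entries = [e for e in entries if e > marker]      # exclusive
        if reverse:
            entries.reverse()
        return entries

    # list_range(['a', 'b', 'c', 'd'], marker='d', end_marker='a',
    #            reverse=True) returns ['c', 'b']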
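
The StoragePolicy.load_ring() override in constraints.py is needed because POLICIES is built when swift.common.storage_policy is first imported, before gluster-swift can substitute its ring implementation, so patching ring.Ring alone is not enough. The replacement method makes every policy load the single flat 'object' ring rather than a per-policy object-N.ring.gz. A sketch of the effect, assuming a stock /etc/swift layout:

    import gluster.swift.common.constraints  # noqa: applies the patches
    from swift.common.storage_policy import POLICIES

    policy = POLICIES.default
    policy.load_ring('/etc/swift')
    # Whatever the policy's index, object_ring is now gluster-swift's
    # ring.Ring built from the plain object.ring.gz.
    print(policy.object_ring)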
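
The gswauth hunks are a pure keyword rename: MemcacheRing.set() in current Swift spells its expiry argument time, and the long-deprecated timeout spelling was dropped upstream. A hedged sketch of the updated call shape, with an illustrative server, key, value and TTL:

    from swift.common.memcached import MemcacheRing

    memcache_client = MemcacheRing(['127.0.0.1:11211'])
    memcache_client.set('AUTH_/auth/testuser',          # illustrative key
                        (1494488258.9, 'act:usr,act'),  # (expires, groups)
                        time=86400.0)                   # was timeout=86400.0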
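
The large utils.py hunk vendors the ThreadPool class that older Swift releases shipped in swift.common.utils; the rebase keeps a local copy because newer Swift no longer provides it, and DiskDir.py, diskfile.py and expirer.py are repointed at the vendored import. Its calling convention is unchanged. A usage sketch, with an illustrative pool size and path:

    import os
    from gluster.swift.common.utils import ThreadPool

    pool = ThreadPool(nthreads=2)   # nthreads=0 would run calls inline
    try:
        # Blocks only the calling greenlet; a worker OS thread performs
        # the blocking stat() while the eventlet hub keeps running.
        st = pool.run_in_thread(os.stat, '/mnt/gluster-object/vol0')
        print(st.st_size)
    finally:
        pool.terminate()            # the pool is unusable afterwards

DiskFileManager builds one such pool per device (sized by the threads_per_disk option, default 0, i.e. inline) and hands it to each DiskFile, mirroring what the upstream object server used to do with this class.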
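
The new DiskFile properties satisfy the richer timestamp interface the Newton object server expects from a diskfile (data, durable and content-type timestamps, plus datafile/metafile metadata accessors). Because a GlusterFS object is a single file carrying one xattr-backed metadata dict, all of them reduce to the object's X-Timestamp. A sketch of that equivalence, with an illustrative metadata dict:

    from swift.common.utils import Timestamp

    meta = {'X-Timestamp': '1494401858.98920'}  # illustrative value
    ts = Timestamp(meta['X-Timestamp'])
    # timestamp, data_timestamp, durable_timestamp and, absent a
    # Content-Type-Timestamp key, content_type_timestamp all yield ts;
    # get_datafile_metadata() returns the whole dict unchanged and
    # get_metafile_metadata() is always None.
    print(ts.internal)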