author     venkata edara <redara@redhat.com>        2017-05-10 13:27:38 +0530
committer  Prashanth Pai <ppai@redhat.com>          2017-05-11 05:48:27 +0000
commit     513988915aa1af13a989d062b021fe1562cbf18d (patch)
tree       1c281911e3a9bfa97f8a7285f20691cb77c45c1d /gluster
parent     e9c2c5eb55e1012ccce0ce51ac48bed0c0f1d4b7 (diff)
Rebase to Swift 2.10.1 (newton)
Change-Id: I53a962c9a301089c8aed0b43c50f944c30225944
Signed-off-by: venkata edara <redara@redhat.com>
Reviewed-on: https://review.gluster.org/16653
Reviewed-by: Prashanth Pai <ppai@redhat.com>
Tested-by: Prashanth Pai <ppai@redhat.com>
Diffstat (limited to 'gluster')
-rw-r--r--  gluster/swift/__init__.py                                     |   2
-rw-r--r--  gluster/swift/account/utils.py                                |   4
-rw-r--r--  gluster/swift/common/DiskDir.py                               |  23
-rw-r--r--  gluster/swift/common/constraints.py                           |  11
-rw-r--r--  gluster/swift/common/exceptions.py                            |   6
-rw-r--r--  gluster/swift/common/middleware/gswauth/swauth/middleware.py  |   6
-rw-r--r--  gluster/swift/common/utils.py                                 | 208
-rw-r--r--  gluster/swift/container/server.py                             |   6
-rw-r--r--  gluster/swift/obj/diskfile.py                                 |  65
-rw-r--r--  gluster/swift/obj/expirer.py                                  |   2
10 files changed, 314 insertions, 19 deletions
diff --git a/gluster/swift/__init__.py b/gluster/swift/__init__.py
index c0b415a..ac0c566 100644
--- a/gluster/swift/__init__.py
+++ b/gluster/swift/__init__.py
@@ -45,6 +45,6 @@ class PkgInfo(object):
#
# Change the Package version here
#
-_pkginfo = PkgInfo('2.3.0', '0', 'gluster_swift', False)
+_pkginfo = PkgInfo('2.10.1', '0', 'gluster_swift', False)
__version__ = _pkginfo.pretty_version
__canonical_version__ = _pkginfo.canonical_version
diff --git a/gluster/swift/account/utils.py b/gluster/swift/account/utils.py
index 99fe5ea..4424835 100644
--- a/gluster/swift/account/utils.py
+++ b/gluster/swift/account/utils.py
@@ -21,7 +21,7 @@ from xml.sax import saxutils
def account_listing_response(account, req, response_content_type, broker=None,
limit='', marker='', end_marker='', prefix='',
- delimiter=''):
+ delimiter='', reverse=False):
"""
This is an exact copy of swift.account.utils.account_listing_response()
except for one difference i.e this method passes response_content_type
@@ -34,7 +34,7 @@ def account_listing_response(account, req, response_content_type, broker=None,
account_list = broker.list_containers_iter(limit, marker, end_marker,
prefix, delimiter,
- response_content_type)
+ response_content_type, reverse)
if response_content_type == 'application/json':
data = []
for (name, object_count, bytes_used, is_subdir) in account_list:
diff --git a/gluster/swift/common/DiskDir.py b/gluster/swift/common/DiskDir.py
index 4f4a2ef..0bc95df 100644
--- a/gluster/swift/common/DiskDir.py
+++ b/gluster/swift/common/DiskDir.py
@@ -33,7 +33,7 @@ from gluster.swift.common.exceptions import FileOrDirNotFoundError, \
from gluster.swift.obj.expirer import delete_tracker_object
from swift.common.constraints import MAX_META_COUNT, MAX_META_OVERALL_SIZE
from swift.common.swob import HTTPBadRequest
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
DATADIR = 'containers'
@@ -399,7 +399,7 @@ class DiskDir(DiskCommon):
def list_objects_iter(self, limit, marker, end_marker,
prefix, delimiter, path=None,
storage_policy_index=0,
- out_content_type=None):
+ out_content_type=None, reverse=False):
"""
Returns tuple of name, created_at, size, content_type, etag.
"""
@@ -427,6 +427,9 @@ class DiskDir(DiskCommon):
# No objects in container, return empty list
return container_list
+ if marker and end_marker and reverse:
+ marker, end_marker = end_marker, marker
+
if end_marker:
objects = filter_end_marker(objects, end_marker)
@@ -471,6 +474,8 @@ class DiskDir(DiskCommon):
container_list.append((obj, '0', 0, 'text/plain', ''))
if len(container_list) >= limit:
break
+ if reverse:
+ container_list.reverse()
return container_list
count = 0
@@ -512,7 +517,8 @@ class DiskDir(DiskCommon):
count += 1
if count >= limit:
break
-
+ if reverse:
+ container_list.reverse()
return container_list
def _update_object_count(self):
@@ -778,7 +784,8 @@ class DiskAccount(DiskCommon):
return containers
def list_containers_iter(self, limit, marker, end_marker,
- prefix, delimiter, response_content_type=None):
+ prefix, delimiter, response_content_type=None,
+ reverse=False):
"""
Return tuple of name, object_count, bytes_used, 0(is_subdir).
Used by account server.
@@ -794,6 +801,9 @@ class DiskAccount(DiskCommon):
# No containers in account, return empty list
return account_list
+ if marker and end_marker and reverse:
+ marker, end_marker = end_marker, marker
+
if containers and end_marker:
containers = filter_end_marker(containers, end_marker)
@@ -841,6 +851,8 @@ class DiskAccount(DiskCommon):
account_list.append((container, 0, 0, 0))
if len(account_list) >= limit:
break
+ if reverse:
+ account_list.reverse()
return account_list
count = 0
@@ -866,7 +878,8 @@ class DiskAccount(DiskCommon):
count += 1
if count >= limit:
break
-
+ if reverse:
+ account_list.reverse()
return account_list
def get_info(self):
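
Both listing paths above apply the new reverse flag the same way: swap marker and end_marker before filtering, then reverse the assembled list just before returning it. A standalone sketch of that pattern (illustrative names only, not the actual DiskDir internals):

    def list_range(names, marker='', end_marker='', reverse=False):
        # Same pattern as the hunks above: swap the bounds, filter, then flip.
        if marker and end_marker and reverse:
            marker, end_marker = end_marker, marker
        selected = [n for n in sorted(names)
                    if (not marker or n > marker)
                    and (not end_marker or n < end_marker)]
        if reverse:
            selected.reverse()
        return selected

    print(list_range(['a', 'b', 'c', 'd'], marker='d', end_marker='a', reverse=True))
    # -> ['c', 'b']
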
diff --git a/gluster/swift/common/constraints.py b/gluster/swift/common/constraints.py
index 98e2a27..2007b71 100644
--- a/gluster/swift/common/constraints.py
+++ b/gluster/swift/common/constraints.py
@@ -102,3 +102,14 @@ _ring.Ring = ring.Ring
import swift.account.utils
from gluster.swift.account.utils import account_listing_response as gf_als
swift.account.utils.account_listing_response = gf_als
+
+# Monkey patch StoragePolicy.load_ring as POLICIES are initialized already
+from swift.common.storage_policy import StoragePolicy
+
+
+def load_ring(self, swift_dir):
+ if self.object_ring:
+ return
+ self.object_ring = ring.Ring(swift_dir, ring_name='object')
+
+StoragePolicy.load_ring = load_ring
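
Because POLICIES is already initialized by the time gluster-swift's constraints module is imported, the patched load_ring() above skips per-policy ring files and always builds gluster-swift's object ring. A minimal sketch of the effect, assuming the standard POLICIES singleton and a conventional /etc/swift directory (both deployment-specific):

    from swift.common.storage_policy import POLICIES
    import gluster.swift.common.constraints  # noqa -- importing applies the monkey patches

    policy = POLICIES.default
    policy.load_ring('/etc/swift')    # builds gluster's ring.Ring(swift_dir, ring_name='object')
    object_ring = policy.object_ring  # subsequent load_ring() calls return immediately
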
diff --git a/gluster/swift/common/exceptions.py b/gluster/swift/common/exceptions.py
index 8260dd9..4dc2878 100644
--- a/gluster/swift/common/exceptions.py
+++ b/gluster/swift/common/exceptions.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from swift.common.exceptions import SwiftException
+
class GlusterFileSystemOSError(OSError):
pass
@@ -48,3 +50,7 @@ class AlreadyExistsAsFile(GlusterfsException):
class DiskFileContainerDoesNotExist(GlusterfsException):
pass
+
+
+class ThreadPoolDead(SwiftException):
+ pass
diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
index 7a6d713..a266d74 100644
--- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py
+++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
@@ -379,7 +379,7 @@ class Swauth(object):
if memcache_client:
memcache_client.set(
memcache_key, (time() + expires_from_now, groups),
- timeout=expires_from_now)
+ time=expires_from_now)
else:
path = quote('/v1/%s/.token_%s/%s' %
(self.auth_account, token[-1], token))
@@ -401,7 +401,7 @@ class Swauth(object):
memcache_client.set(
memcache_key,
(detail['expires'], groups),
- timeout=float(detail['expires'] - time()))
+ time=float(detail['expires'] - time()))
return groups
def authorize(self, req):
@@ -1448,7 +1448,7 @@ class Swauth(object):
(self.itoken_expires,
'%s,.reseller_admin,%s' % (self.metadata_volume,
self.auth_account)),
- timeout=self.token_life)
+ time=self.token_life)
return self.itoken
def get_admin_detail(self, req):
diff --git a/gluster/swift/common/utils.py b/gluster/swift/common/utils.py
index 8f68319..ac41698 100644
--- a/gluster/swift/common/utils.py
+++ b/gluster/swift/common/utils.py
@@ -14,17 +14,23 @@
# limitations under the License.
import os
+import sys
import stat
import json
import errno
import random
import logging
from hashlib import md5
-from eventlet import sleep
+from eventlet import sleep, Timeout, tpool, greenthread, \
+ greenio, event
+from Queue import Queue, Empty
+import threading as stdlib_threading
+
import cPickle as pickle
from cStringIO import StringIO
import pickletools
-from gluster.swift.common.exceptions import GlusterFileSystemIOError
+from gluster.swift.common.exceptions import GlusterFileSystemIOError, \
+ ThreadPoolDead
from swift.common.exceptions import DiskFileNoSpace
from swift.common.db import utf8encodekeys
from gluster.swift.common.fs_utils import do_getctime, do_getmtime, do_stat, \
@@ -69,6 +75,204 @@ PICKLE_PROTOCOL = 2
CHUNK_SIZE = 65536
+class ThreadPool(object):
+ """
+ Perform blocking operations in background threads.
+
+ Call its methods from within greenlets to green-wait for results without
+ blocking the eventlet reactor (hopefully).
+ """
+
+ BYTE = 'a'.encode('utf-8')
+
+ def __init__(self, nthreads=2):
+ self.nthreads = nthreads
+ self._run_queue = Queue()
+ self._result_queue = Queue()
+ self._threads = []
+ self._alive = True
+
+ if nthreads <= 0:
+ return
+
+ # We spawn a greenthread whose job it is to pull results from the
+ # worker threads via a real Queue and send them to eventlet Events so
+ # that the calling greenthreads can be awoken.
+ #
+ # Since each OS thread has its own collection of greenthreads, it
+ # doesn't work to have the worker thread send stuff to the event, as
+ # it then notifies its own thread-local eventlet hub to wake up, which
+ # doesn't do anything to help out the actual calling greenthread over
+ # in the main thread.
+ #
+ # Thus, each worker sticks its results into a result queue and then
+ # writes a byte to a pipe, signaling the result-consuming greenlet (in
+ # the main thread) to wake up and consume results.
+ #
+ # This is all stuff that eventlet.tpool does, but that code can't have
+ # multiple instances instantiated. Since the object server uses one
+ # pool per disk, we have to reimplement this stuff.
+ _raw_rpipe, self.wpipe = os.pipe()
+ self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
+
+ for _junk in xrange(nthreads):
+ thr = stdlib_threading.Thread(
+ target=self._worker,
+ args=(self._run_queue, self._result_queue))
+ thr.daemon = True
+ thr.start()
+ self._threads.append(thr)
+ # This is the result-consuming greenthread that runs in the main OS
+ # thread, as described above.
+ self._consumer_coro = greenthread.spawn_n(self._consume_results,
+ self._result_queue)
+
+ def _worker(self, work_queue, result_queue):
+ """
+ Pulls an item from the queue and runs it, then puts the result into
+ the result queue. Repeats forever.
+
+ :param work_queue: queue from which to pull work
+ :param result_queue: queue into which to place results
+ """
+ while True:
+ item = work_queue.get()
+ if item is None:
+ break
+ ev, func, args, kwargs = item
+ try:
+ result = func(*args, **kwargs)
+ result_queue.put((ev, True, result))
+ except BaseException:
+ result_queue.put((ev, False, sys.exc_info()))
+ finally:
+ work_queue.task_done()
+ os.write(self.wpipe, self.BYTE)
+
+ def _consume_results(self, queue):
+ """
+ Runs as a greenthread in the same OS thread as callers of
+ run_in_thread().
+
+ Takes results from the worker OS threads and sends them to the waiting
+ greenthreads.
+ """
+ while True:
+ try:
+ self.rpipe.read(1)
+ except ValueError:
+ # can happen at process shutdown when pipe is closed
+ break
+
+ while True:
+ try:
+ ev, success, result = queue.get(block=False)
+ except Empty:
+ break
+
+ try:
+ if success:
+ ev.send(result)
+ else:
+ ev.send_exception(*result)
+ finally:
+ queue.task_done()
+
+ def run_in_thread(self, func, *args, **kwargs):
+ """
+ Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+ until results are available.
+
+ Exceptions thrown will be reraised in the calling thread.
+
+ If the threadpool was initialized with nthreads=0, it invokes
+ func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
+ the eventlet hub has a chance to execute. It is more likely the hub
+ will be invoked when queuing operations to an external thread.
+
+ :returns: result of calling func
+ :raises: whatever func raises
+ """
+ if not self._alive:
+ raise ThreadPoolDead()
+
+ if self.nthreads <= 0:
+ result = func(*args, **kwargs)
+ sleep()
+ return result
+
+ ev = event.Event()
+ self._run_queue.put((ev, func, args, kwargs), block=False)
+
+ # blocks this greenlet (and only *this* greenlet) until the real
+ # thread calls ev.send().
+ result = ev.wait()
+ return result
+
+ def _run_in_eventlet_tpool(self, func, *args, **kwargs):
+ """
+ Really run something in an external thread, even if we haven't got any
+ threads of our own.
+ """
+ def inner():
+ try:
+ return (True, func(*args, **kwargs))
+ except (Timeout, BaseException) as err:
+ return (False, err)
+
+ success, result = tpool.execute(inner)
+ if success:
+ return result
+ else:
+ raise result
+
+ def force_run_in_thread(self, func, *args, **kwargs):
+ """
+ Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+ until results are available.
+
+ Exceptions thrown will be reraised in the calling thread.
+
+ If the threadpool was initialized with nthreads=0, uses eventlet.tpool
+ to run the function. This is in contrast to run_in_thread(), which
+ will (in that case) simply execute func in the calling thread.
+
+ :returns: result of calling func
+ :raises: whatever func raises
+ """
+ if not self._alive:
+ raise ThreadPoolDead()
+
+ if self.nthreads <= 0:
+ return self._run_in_eventlet_tpool(func, *args, **kwargs)
+ else:
+ return self.run_in_thread(func, *args, **kwargs)
+
+ def terminate(self):
+ """
+ Releases the threadpool's resources (OS threads, greenthreads, pipes,
+ etc.) and renders it unusable.
+
+ Don't call run_in_thread() or force_run_in_thread() after calling
+ terminate().
+ """
+ self._alive = False
+ if self.nthreads <= 0:
+ return
+
+ for _junk in range(self.nthreads):
+ self._run_queue.put(None)
+ for thr in self._threads:
+ thr.join()
+ self._threads = []
+ self.nthreads = 0
+
+ greenthread.kill(self._consumer_coro)
+
+ self.rpipe.close()
+ os.close(self.wpipe)
+
+
class SafeUnpickler(object):
"""
Loading a pickled stream is potentially unsafe and exploitable because
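
The ThreadPool added above appears to be a copy of the one older Swift releases provided in swift.common.utils; gluster-swift now carries its own so existing callers keep working after the rebase. A minimal usage sketch (the thread count and path are made-up examples; in practice the argument would be a Gluster mount path):

    import os
    from gluster.swift.common.utils import ThreadPool

    pool = ThreadPool(nthreads=2)    # nthreads=0 runs calls inline, followed by eventlet.sleep()
    try:
        st = pool.run_in_thread(os.stat, '/tmp')   # any blocking filesystem call
        print(st.st_mode)
    finally:
        pool.terminate()             # later run_in_thread() calls raise ThreadPoolDead
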
diff --git a/gluster/swift/container/server.py b/gluster/swift/container/server.py
index e62076a..82e682a 100644
--- a/gluster/swift/container/server.py
+++ b/gluster/swift/container/server.py
@@ -21,7 +21,7 @@ import gluster.swift.common.constraints # noqa
from swift.container import server
from gluster.swift.common.DiskDir import DiskDir
-from swift.common.utils import public, timing_stats
+from swift.common.utils import public, timing_stats, config_true_value
from swift.common.exceptions import DiskFileNoSpace
from swift.common.swob import HTTPInsufficientStorage, HTTPNotFound, \
HTTPPreconditionFailed
@@ -105,6 +105,8 @@ class ContainerController(server.ContainerController):
end_marker = get_param(req, 'end_marker')
limit = constraints.CONTAINER_LISTING_LIMIT
given_limit = get_param(req, 'limit')
+ reverse = config_true_value(get_param(req, 'reverse'))
+
if given_limit and given_limit.isdigit():
limit = int(given_limit)
if limit > constraints.CONTAINER_LISTING_LIMIT:
@@ -125,7 +127,7 @@ class ContainerController(server.ContainerController):
container_list = broker.list_objects_iter(
limit, marker, end_marker, prefix, delimiter, path,
storage_policy_index=info['storage_policy_index'],
- out_content_type=out_content_type)
+ out_content_type=out_content_type, reverse=reverse)
return self.create_listing(req, out_content_type, info, resp_headers,
broker.metadata, container_list, container)
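
The reverse flag reaches the container server as the standard reverse query parameter and is interpreted with Swift's stock config_true_value() helper, so the usual truthy spellings all work. A small illustration (the literal values are just examples):

    from swift.common.utils import config_true_value

    for value in ('true', '1', 'yes', 'no', None):   # None == parameter not supplied
        print('%r -> %r' % (value, config_true_value(value)))
    # 'true', '1', 'yes' -> True; everything else, including a missing parameter, -> False
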
diff --git a/gluster/swift/obj/diskfile.py b/gluster/swift/obj/diskfile.py
index b94cf3d..be0669f 100644
--- a/gluster/swift/obj/diskfile.py
+++ b/gluster/swift/obj/diskfile.py
@@ -16,6 +16,7 @@
import os
import stat
import errno
+from collections import defaultdict
try:
from random import SystemRandom
random = SystemRandom()
@@ -25,10 +26,11 @@ import logging
import time
from uuid import uuid4
from eventlet import sleep
+from swift.common.utils import Timestamp
from contextlib import contextmanager
from gluster.swift.common.exceptions import AlreadyExistsAsFile, \
AlreadyExistsAsDir, DiskFileContainerDoesNotExist
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileNoSpace, DiskFileDeviceUnavailable, DiskFileNotOpen, \
DiskFileExpired
@@ -212,9 +214,16 @@ class DiskFileManager(SwiftDiskFileManager):
:param conf: caller provided configuration object
:param logger: caller provided logger
"""
+ def __init__(self, conf, logger):
+ super(DiskFileManager, self).__init__(conf, logger)
+ threads_per_disk = int(conf.get('threads_per_disk', '0'))
+ self.threadpools = defaultdict(
+ lambda: ThreadPool(nthreads=threads_per_disk))
+
def get_diskfile(self, device, partition, account, container, obj,
policy=None, **kwargs):
dev_path = self.get_dev_path(device, self.mount_check)
+
if not dev_path:
raise DiskFileDeviceUnavailable()
return DiskFile(self, dev_path, self.threadpools[device],
@@ -553,7 +562,7 @@ class DiskFile(object):
"""
def __init__(self, mgr, dev_path, threadpool, partition,
account=None, container=None, obj=None,
- policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID):
+ policy=None, uid=DEFAULT_UID, gid=DEFAULT_GID, **kwargs):
# Variables partition and policy are currently unused.
self._mgr = mgr
self._device_path = dev_path
@@ -588,6 +597,48 @@ class DiskFile(object):
self._data_file = os.path.join(self._put_datadir, self._obj)
self._disk_file_open = False
+ @property
+ def timestamp(self):
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ return Timestamp(self._metadata.get(X_TIMESTAMP))
+
+ @property
+ def data_timestamp(self):
+ return self.timestamp
+
+ @property
+ def durable_timestamp(self):
+ """
+ Provides the timestamp of the newest data file found in the object
+ directory.
+
+ :return: A Timestamp instance, or None if no data file was found.
+ :raises DiskFileNotOpen: if the open() method has not been previously
+ called on this instance.
+ """
+ if self._metadata:
+ return Timestamp(self._metadata.get(X_TIMESTAMP))
+ return None
+
+ @property
+ def fragments(self):
+ return None
+
+ @property
+ def content_type(self):
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ return self._metadata.get(X_CONTENT_TYPE)
+
+ @property
+ def content_type_timestamp(self):
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ t = self._metadata.get('Content-Type-Timestamp') or \
+ self._metadata.get(X_TIMESTAMP)
+ return Timestamp(t)
+
def open(self):
"""
Open the object.
@@ -710,6 +761,15 @@ class DiskFile(object):
self._disk_file_open = False
self._close_fd()
+ def get_datafile_metadata(self):
+ '''Gluster-Swift does not have separate data and metadata files.'''
+ if self._metadata is None:
+ raise DiskFileNotOpen()
+ return self._metadata
+
+ def get_metafile_metadata(self):
+ return None
+
def get_metadata(self):
"""
Provide the metadata for a previously opened object as a dictionary.
@@ -879,7 +939,6 @@ class DiskFile(object):
:raises AlreadyExistsAsFile: if path or part of a path is not a \
directory
"""
-
data_file = os.path.join(self._put_datadir, self._obj)
# Assume the full directory path exists to the file already, and
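
The new properties and accessors above give DiskFile the richer timestamp/content-type interface expected after the rebase, all backed by the single metadata dictionary (there are no separate .data/.meta files on a Gluster volume). A hedged sketch of how a caller might read them, assuming a DiskFileManager mgr built from the object-server conf, a device named 'myvolume', and an existing object (all placeholder names):

    df = mgr.get_diskfile('myvolume', '0', 'AUTH_test', 'cont', 'obj', policy=None)
    with df.open():
        meta = df.get_datafile_metadata()   # same dict as get_metadata(); get_metafile_metadata() is None
        print(df.timestamp.internal)        # X-Timestamp as a swift.common.utils.Timestamp
        print(df.content_type_timestamp)    # falls back to X-Timestamp when no Content-Type-Timestamp
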
diff --git a/gluster/swift/obj/expirer.py b/gluster/swift/obj/expirer.py
index 38f870e..97e08e9 100644
--- a/gluster/swift/obj/expirer.py
+++ b/gluster/swift/obj/expirer.py
@@ -25,7 +25,7 @@ from gluster.swift.common.utils import delete_tracker_object
from swift.obj.expirer import ObjectExpirer as SwiftObjectExpirer
from swift.common.http import HTTP_NOT_FOUND
from swift.common.internal_client import InternalClient, UnexpectedResponse
-from swift.common.utils import ThreadPool
+from gluster.swift.common.utils import ThreadPool
EXCLUDE_DIRS = ('.trashcan', '.glusterfs')