From 488744a005fb399af8d094ad7a62c1917410398c Mon Sep 17 00:00:00 2001
From: venkata edara
Date: Wed, 22 Nov 2017 13:35:46 +0530
Subject: Rebase to Swift 2.15.1 (pike)

Change-Id: I84ebb44c5c3cf2f80c50f2d4ae4bd92b619a4297
Signed-off-by: venkata edara
Reviewed-on: https://review.gluster.org/18412
Reviewed-by: Prashanth Pai
Tested-by: Prashanth Pai
---
 test/unit/__init__.py | 244 ++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 218 insertions(+), 26 deletions(-)

(limited to 'test/unit/__init__.py')

diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index ee2a262..d9750b7 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -21,6 +21,7 @@ import copy
 import logging
 import errno
 from six.moves import range
+from six import BytesIO
 import sys
 from contextlib import contextmanager, closing
 from collections import defaultdict, Iterable
@@ -29,18 +30,20 @@ from numbers import Number
 from tempfile import NamedTemporaryFile
 import time
 import eventlet
+from eventlet import greenpool, debug as eventlet_debug
 from eventlet.green import socket
 from tempfile import mkdtemp
 from shutil import rmtree
 import signal
 import json
-
+import random
 from swift.common.utils import Timestamp, NOTICE
 from test import get_config
 from swift.common import utils
 from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.ring import Ring, RingData
+from swift.obj import server
 from hashlib import md5
 import logging.handlers
 
@@ -48,6 +51,7 @@ from six.moves.http_client import HTTPException
 from swift.common import storage_policy
 from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
                                          VALID_EC_TYPES)
+from swift.common import swob
 import functools
 import six.moves.cPickle as pickle
 from gzip import GzipFile
@@ -118,7 +122,7 @@ def patch_policies(thing_or_policies=None, legacy_only=False,
 class PatchPolicies(object):
     """
     Why not mock.patch? In my case, when used as a decorator on the class it
-    seemed to patch setUp at the wrong time (i.e. in setup the global wasn't
+    seemed to patch setUp at the wrong time (i.e. in setUp the global wasn't
     patched yet)
     """
 
@@ -165,41 +169,38 @@ class PatchPolicies(object):
         """
 
         orig_setUp = cls.setUp
-        orig_tearDown = cls.tearDown
+
+        def unpatch_cleanup(cls_self):
+            if cls_self._policies_patched:
+                self.__exit__()
+                cls_self._policies_patched = False
 
         def setUp(cls_self):
-            self._orig_POLICIES = storage_policy._POLICIES
             if not getattr(cls_self, '_policies_patched', False):
-                storage_policy._POLICIES = self.policies
-                self._setup_rings()
+                self.__enter__()
                 cls_self._policies_patched = True
-
+                cls_self.addCleanup(unpatch_cleanup, cls_self)
             orig_setUp(cls_self)
 
-        def tearDown(cls_self):
-            orig_tearDown(cls_self)
-            storage_policy._POLICIES = self._orig_POLICIES
-
         cls.setUp = setUp
-        cls.tearDown = tearDown
 
         return cls
 
     def _patch_method(self, f):
         @functools.wraps(f)
         def mywrapper(*args, **kwargs):
-            self._orig_POLICIES = storage_policy._POLICIES
-            try:
-                storage_policy._POLICIES = self.policies
-                self._setup_rings()
+            with self:
                 return f(*args, **kwargs)
-            finally:
-                storage_policy._POLICIES = self._orig_POLICIES
         return mywrapper
 
     def __enter__(self):
         self._orig_POLICIES = storage_policy._POLICIES
         storage_policy._POLICIES = self.policies
+        try:
+            self._setup_rings()
+        except:  # noqa
+            self.__exit__()
+            raise
 
     def __exit__(self, *args):
         storage_policy._POLICIES = self._orig_POLICIES
@@ -212,17 +213,35 @@ class FakeRing(Ring):
         self._base_port = base_port
         self.max_more_nodes = max_more_nodes
         self._part_shift = 32 - part_power
+        self._init_device_char()
         # 9 total nodes (6 more past the initial 3) is the cap, no matter if
         # this is set higher, or R^2 for R replicas
         self.set_replicas(replicas)
         self._reload()
 
+    def has_changed(self):
+        """
+        The real implementation uses getmtime on the serialized_path attribute,
+        which doesn't exist on our fake and relies on the implementation of
+        _reload which we override. So ... just NOOPE.
+        """
+        return False
+
     def _reload(self):
         self._rtime = time.time()
 
+    @property
+    def device_char(self):
+        return next(self._device_char_iter)
+
+    def _init_device_char(self):
+        self._device_char_iter = itertools.cycle(
+            ['sd%s' % chr(ord('a') + x) for x in range(26)])
+
     def set_replicas(self, replicas):
         self.replicas = replicas
         self._devs = []
+        self._init_device_char()
         for x in range(self.replicas):
             ip = '10.0.0.%s' % x
             port = self._base_port + x
@@ -232,7 +251,7 @@ class FakeRing(Ring):
                 'replication_ip': ip,
                 'port': port,
                 'replication_port': port,
-                'device': 'sd' + (chr(ord('a') + x)),
+                'device': self.device_char,
                 'zone': x % 3,
                 'region': x % 2,
                 'id': x,
@@ -289,9 +308,8 @@ class FabricatedRing(Ring):
         self.devices = devices
         self.nodes = nodes
         self.port = port
-        self.replicas = 6
-        self.part_power = part_power
-        self._part_shift = 32 - self.part_power
+        self.replicas = replicas
+        self._part_shift = 32 - part_power
         self._reload()
 
     def _reload(self, *args, **kwargs):
@@ -681,6 +699,16 @@ if utils.config_true_value(
     fake_syslog_handler()
 
 
+@contextmanager
+def quiet_eventlet_exceptions():
+    orig_state = greenpool.DEBUG
+    eventlet_debug.hub_exceptions(False)
+    try:
+        yield
+    finally:
+        eventlet_debug.hub_exceptions(orig_state)
+
+
 class MockTrue(object):
     """
     Instances of MockTrue evaluate like True
@@ -998,6 +1026,7 @@ def fake_http_connect(*code_iter, **kwargs):
     body_iter = kwargs.get('body_iter', None)
     if body_iter:
         body_iter = iter(body_iter)
+    unexpected_requests = []
 
     def connect(*args, **ckwargs):
         if kwargs.get('slow_connect', False):
@@ -1007,7 +1036,15 @@ def fake_http_connect(*code_iter, **kwargs):
             kwargs['give_content_type'](args[6]['Content-Type'])
         else:
             kwargs['give_content_type']('')
-        i, status = next(conn_id_and_code_iter)
+        try:
+            i, status = next(conn_id_and_code_iter)
+        except StopIteration:
+            # the code under test may swallow the StopIteration, so by logging
+            # unexpected requests here we allow the test framework to check for
+            # them after the connect function has been used.
+            unexpected_requests.append((args, kwargs))
+            raise
+
         if 'give_connect' in kwargs:
             give_conn_fn = kwargs['give_connect']
             argspec = inspect.getargspec(give_conn_fn)
@@ -1030,6 +1067,7 @@ def fake_http_connect(*code_iter, **kwargs):
                         connection_id=i, give_send=kwargs.get('give_send'),
                         give_expect=kwargs.get('give_expect'))
 
+    connect.unexpected_requests = unexpected_requests
     connect.code_iter = code_iter
 
     return connect
@@ -1059,10 +1097,14 @@ def mocked_http_conn(*args, **kwargs):
         left_over_status = list(fake_conn.code_iter)
         if left_over_status:
             raise AssertionError('left over status %r' % left_over_status)
+        if fake_conn.unexpected_requests:
+            raise AssertionError('unexpected requests %r' %
+                                 fake_conn.unexpected_requests)
 
 
-def make_timestamp_iter():
-    return iter(Timestamp(t) for t in itertools.count(int(time.time())))
+def make_timestamp_iter(offset=0):
+    return iter(Timestamp(t)
+                for t in itertools.count(int(time.time()) + offset))
 
 
 class Timeout(object):
@@ -1091,6 +1133,30 @@ def requires_o_tmpfile_support(func):
     return wrapper
 
 
+class StubResponse(object):
+
+    def __init__(self, status, body='', headers=None, frag_index=None):
+        self.status = status
+        self.body = body
+        self.readable = BytesIO(body)
+        self.headers = HeaderKeyDict(headers)
+        if frag_index is not None:
+            self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
+        fake_reason = ('Fake', 'This response is a lie.')
+        self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
+
+    def getheader(self, header_name, default=None):
+        return self.headers.get(header_name, default)
+
+    def getheaders(self):
+        if 'Content-Length' not in self.headers:
+            self.headers['Content-Length'] = len(self.body)
+        return self.headers.items()
+
+    def read(self, amt=0):
+        return self.readable.read(amt)
+
+
 def encode_frag_archive_bodies(policy, body):
     """
     Given a stub body produce a list of complete frag_archive bodies as
@@ -1109,7 +1175,8 @@ def encode_frag_archive_bodies(policy, body):
     # encode the buffers into fragment payloads
     fragment_payloads = []
     for chunk in chunks:
-        fragments = policy.pyeclib_driver.encode(chunk)
+        fragments = policy.pyeclib_driver.encode(chunk) \
+            * policy.ec_duplication_factor
         if not fragments:
             break
         fragment_payloads.append(fragments)
@@ -1118,3 +1185,128 @@ def encode_frag_archive_bodies(policy, body):
     ec_archive_bodies = [''.join(frags)
                          for frags in zip(*fragment_payloads)]
     return ec_archive_bodies
+
+
+def make_ec_object_stub(test_body, policy, timestamp):
+    segment_size = policy.ec_segment_size
+    test_body = test_body or (
+        'test' * segment_size)[:-random.randint(1, 1000)]
+    timestamp = timestamp or utils.Timestamp.now()
+    etag = md5(test_body).hexdigest()
+    ec_archive_bodies = encode_frag_archive_bodies(policy, test_body)
+
+    return {
+        'body': test_body,
+        'etag': etag,
+        'frags': ec_archive_bodies,
+        'timestamp': timestamp
+    }
+
+
+def fake_ec_node_response(node_frags, policy):
+    """
+    Given a list of entries for each node in ring order, where the entries
+    are a dict (or list of dicts) which describes the fragment (or
+    fragments) that are on the node; create a function suitable for use
+    with capture_http_requests that will accept a req object and return a
+    response that will suitably fake the behavior of an object server who
+    had the given fragments on disk at the time.
+
+    :param node_frags: a list.
+        Each item in the list describes the
+        fragments that are on a node; each item is a dict or list of dicts,
+        each dict describing a single fragment; where the item is a list,
+        repeated calls to get_response will return fragments in the order
+        of the list; each dict has keys:
+            - obj: an object stub, as generated by _make_ec_object_stub,
+              that defines all of the fragments that compose an object
+              at a specific timestamp.
+            - frag: the index of a fragment to be selected from the object
+              stub
+            - durable (optional): True if the selected fragment is durable
+    :param policy: storage policy to return
+    """
+    node_map = {}  # maps node ip and port to node index
+    all_nodes = []
+    call_count = {}  # maps node index to get_response call count for node
+
+    def _build_node_map(req, policy):
+        node_key = lambda n: (n['ip'], n['port'])
+        part = utils.split_path(req['path'], 5, 5, True)[1]
+        all_nodes.extend(policy.object_ring.get_part_nodes(part))
+        all_nodes.extend(policy.object_ring.get_more_nodes(part))
+        for i, node in enumerate(all_nodes):
+            node_map[node_key(node)] = i
+            call_count[i] = 0
+
+    # normalize node_frags to a list of fragments for each node even
+    # if there's only one fragment in the dataset provided.
+    for i, frags in enumerate(node_frags):
+        if isinstance(frags, dict):
+            node_frags[i] = [frags]
+
+    def get_response(req):
+        requested_policy = int(
+            req['headers']['X-Backend-Storage-Policy-Index'])
+        if int(policy) != requested_policy:
+            raise AssertionError(
+                "Requested policy doesn't fit the fake response policy")
+        if not node_map:
+            _build_node_map(req, policy)
+
+        try:
+            node_index = node_map[(req['ip'], req['port'])]
+        except KeyError:
+            raise Exception("Couldn't find node %s:%s in %r" % (
+                req['ip'], req['port'], all_nodes))
+        try:
+            frags = node_frags[node_index]
+        except IndexError:
+            raise Exception('Found node %r:%r at index %s - '
+                            'but only got %s stub response nodes' % (
+                                req['ip'], req['port'], node_index,
+                                len(node_frags)))
+
+        if not frags:
+            return StubResponse(404)
+
+        # determine response fragment (if any) for this call
+        resp_frag = frags[call_count[node_index]]
+        call_count[node_index] += 1
+        frag_prefs = req['headers'].get('X-Backend-Fragment-Preferences')
+        if not (frag_prefs or resp_frag.get('durable', True)):
+            return StubResponse(404)
+
+        # prepare durable timestamp and backend frags header for this node
+        obj_stub = resp_frag['obj']
+        ts2frags = defaultdict(list)
+        durable_timestamp = None
+        for frag in frags:
+            ts_frag = frag['obj']['timestamp']
+            if frag.get('durable', True):
+                durable_timestamp = ts_frag.internal
+            ts2frags[ts_frag].append(frag['frag'])
+
+        try:
+            body = obj_stub['frags'][resp_frag['frag']]
+        except IndexError as err:
+            raise Exception(
+                'Frag index %s not defined: node index %s, frags %r\n%s' %
+                (resp_frag['frag'], node_index, [f['frag'] for f in frags],
+                 err))
+
+        headers = {
+            'X-Object-Sysmeta-Ec-Content-Length': len(obj_stub['body']),
+            'X-Object-Sysmeta-Ec-Etag': obj_stub['etag'],
+            'X-Object-Sysmeta-Ec-Frag-Index':
+                policy.get_backend_index(resp_frag['frag']),
+            'X-Backend-Timestamp': obj_stub['timestamp'].internal,
+            'X-Timestamp': obj_stub['timestamp'].normal,
+            'X-Backend-Data-Timestamp': obj_stub['timestamp'].internal,
+            'X-Backend-Fragments':
+                server._make_backend_fragments_header(ts2frags)
+        }
+        if durable_timestamp:
+            headers['X-Backend-Durable-Timestamp'] = durable_timestamp
+
+        return StubResponse(200, body, headers)
+
+    return get_response
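
Editor's illustration (not part of the commit above): a minimal sketch of how a unit
test might exercise the helpers this rebase touches -- FakeRing's cycling device
names, StubResponse, and make_timestamp_iter()'s new offset argument. The test-class
name and the specific assertions are assumptions made for illustration only; they are
not taken from Swift or gluster-swift.

import unittest

from test.unit import FakeRing, StubResponse, make_timestamp_iter


class HelperUsageSketch(unittest.TestCase):

    def test_fake_ring_cycles_device_names(self):
        # set_replicas() now draws device names from an itertools.cycle,
        # so the fake devices come out as sda, sdb, sdc, ...
        ring = FakeRing(replicas=4)
        self.assertEqual(['sda', 'sdb', 'sdc', 'sdd'],
                         [dev['device'] for dev in ring.devs])

    def test_stub_response_reports_content_length(self):
        # getheaders() fills in Content-Length from the body it was given
        resp = StubResponse(200, body=b'hello')
        self.assertEqual(b'hello', resp.read(1024))
        self.assertEqual(5, int(dict(resp.getheaders())['Content-Length']))

    def test_make_timestamp_iter_accepts_offset(self):
        # timestamps from an offset iterator sort after "now" timestamps
        now_iter = make_timestamp_iter()
        later_iter = make_timestamp_iter(offset=3600)
        self.assertLess(float(next(now_iter)), float(next(later_iter)))


if __name__ == '__main__':
    unittest.main()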