diff options
Diffstat (limited to 'test/unit/proxy/controllers')
-rw-r--r-- | test/unit/proxy/controllers/test_account.py | 16 | ||||
-rw-r--r-- | test/unit/proxy/controllers/test_base.py | 463 | ||||
-rw-r--r-- | test/unit/proxy/controllers/test_container.py | 93 | ||||
-rwxr-xr-x | test/unit/proxy/controllers/test_obj.py | 1412 |
4 files changed, 1747 insertions, 237 deletions
diff --git a/test/unit/proxy/controllers/test_account.py b/test/unit/proxy/controllers/test_account.py index 47f76dc..23ad0a1 100644 --- a/test/unit/proxy/controllers/test_account.py +++ b/test/unit/proxy/controllers/test_account.py @@ -20,18 +20,21 @@ from swift.common.swob import Request, Response from swift.common.middleware.acl import format_acl from swift.proxy import server as proxy_server from swift.proxy.controllers.base import headers_to_account_info -from swift.common.constraints import MAX_ACCOUNT_NAME_LENGTH as MAX_ANAME_LEN +from swift.common import constraints from test.unit import fake_http_connect, FakeRing, FakeMemcache +from swift.common.storage_policy import StoragePolicy from swift.common.request_helpers import get_sys_meta_prefix import swift.proxy.controllers.base +from test.unit import patch_policies + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestAccountController(unittest.TestCase): def setUp(self): - self.app = proxy_server.Application(None, FakeMemcache(), - account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + self.app = proxy_server.Application( + None, FakeMemcache(), + account_ring=FakeRing(), container_ring=FakeRing()) def test_account_info_in_response_env(self): controller = proxy_server.AccountController(self.app, 'AUTH_bob') @@ -79,7 +82,8 @@ class TestAccountController(unittest.TestCase): self.assertEquals(410, resp.status_int) def test_long_acct_names(self): - long_acct_name = '%sLongAccountName' % ('Very' * (MAX_ANAME_LEN // 4)) + long_acct_name = '%sLongAccountName' % ( + 'Very' * (constraints.MAX_ACCOUNT_NAME_LENGTH // 4)) controller = proxy_server.AccountController(self.app, long_acct_name) req = Request.blank('/v1/%s' % long_acct_name) diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 0c94f90..037e28b 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -13,87 +13,159 @@ # See the License for the specific language governing permissions and # limitations under the License. +import itertools +from collections import defaultdict import unittest from mock import patch from swift.proxy.controllers.base import headers_to_container_info, \ headers_to_account_info, headers_to_object_info, get_container_info, \ get_container_memcache_key, get_account_info, get_account_memcache_key, \ - get_object_env_key, _get_cache_key, get_info, get_object_info, \ - Controller, GetOrHeadHandler -from swift.common.swob import Request, HTTPException, HeaderKeyDict + get_object_env_key, get_info, get_object_info, \ + Controller, GetOrHeadHandler, _set_info_cache, _set_object_info_cache, \ + bytes_to_skip +from swift.common.swob import Request, HTTPException, HeaderKeyDict, \ + RESPONSE_REASONS +from swift.common import exceptions from swift.common.utils import split_path +from swift.common.http import is_success +from swift.common.storage_policy import StoragePolicy from test.unit import fake_http_connect, FakeRing, FakeMemcache from swift.proxy import server as proxy_server from swift.common.request_helpers import get_sys_meta_prefix - -FakeResponse_status_int = 201 +from test.unit import patch_policies class FakeResponse(object): - def __init__(self, headers, env, account, container, obj): - self.headers = headers - self.status_int = FakeResponse_status_int - self.environ = env - if obj: - env_key = get_object_env_key(account, container, obj) - else: - cache_key, env_key = _get_cache_key(account, container) - if account and container and obj: - info = headers_to_object_info(headers, FakeResponse_status_int) - elif account and container: - info = headers_to_container_info(headers, FakeResponse_status_int) - else: - info = headers_to_account_info(headers, FakeResponse_status_int) - env[env_key] = info + base_headers = {} + def __init__(self, status_int=200, headers=None, body=''): + self.status_int = status_int + self._headers = headers or {} + self.body = body + + @property + def headers(self): + if is_success(self.status_int): + self._headers.update(self.base_headers) + return self._headers -class FakeRequest(object): - def __init__(self, env, path, swift_source=None): - self.environ = env - (version, account, container, obj) = split_path(path, 2, 4, True) - self.account = account - self.container = container - self.obj = obj - if obj: - stype = 'object' - self.headers = {'content-length': 5555, - 'content-type': 'text/plain'} - else: - stype = container and 'container' or 'account' - self.headers = {'x-%s-object-count' % (stype): 1000, - 'x-%s-bytes-used' % (stype): 6666} - if swift_source: - meta = 'x-%s-meta-fakerequest-swift-source' % stype - self.headers[meta] = swift_source - def get_response(self, app): - return FakeResponse(self.headers, self.environ, self.account, - self.container, self.obj) +class AccountResponse(FakeResponse): + base_headers = { + 'x-account-container-count': 333, + 'x-account-object-count': 1000, + 'x-account-bytes-used': 6666, + } -class FakeCache(object): - def __init__(self, val): - self.val = val - def get(self, *args): - return self.val +class ContainerResponse(FakeResponse): + base_headers = { + 'x-container-object-count': 1000, + 'x-container-bytes-used': 6666, + } + +class ObjectResponse(FakeResponse): + + base_headers = { + 'content-length': 5555, + 'content-type': 'text/plain' + } + + +class DynamicResponseFactory(object): + + def __init__(self, *statuses): + if statuses: + self.statuses = iter(statuses) + else: + self.statuses = itertools.repeat(200) + self.stats = defaultdict(int) + + response_type = { + 'obj': ObjectResponse, + 'container': ContainerResponse, + 'account': AccountResponse, + } + + def _get_response(self, type_): + self.stats[type_] += 1 + class_ = self.response_type[type_] + return class_(self.statuses.next()) + + def get_response(self, environ): + (version, account, container, obj) = split_path( + environ['PATH_INFO'], 2, 4, True) + if obj: + resp = self._get_response('obj') + elif container: + resp = self._get_response('container') + else: + resp = self._get_response('account') + resp.account = account + resp.container = container + resp.obj = obj + return resp + + +class FakeApp(object): + + recheck_container_existence = 30 + recheck_account_existence = 30 + + def __init__(self, response_factory=None, statuses=None): + self.responses = response_factory or \ + DynamicResponseFactory(*statuses or []) + self.sources = [] + + def __call__(self, environ, start_response): + self.sources.append(environ.get('swift.source')) + response = self.responses.get_response(environ) + reason = RESPONSE_REASONS[response.status_int][0] + start_response('%d %s' % (response.status_int, reason), + [(k, v) for k, v in response.headers.items()]) + # It's a bit strnage, but the get_info cache stuff relies on the + # app setting some keys in the environment as it makes requests + # (in particular GETorHEAD_base) - so our fake does the same + _set_info_cache(self, environ, response.account, + response.container, response) + if response.obj: + _set_object_info_cache(self, environ, response.account, + response.container, response.obj, + response) + return iter(response.body) + + +class FakeCache(FakeMemcache): + def __init__(self, stub=None, **pre_cached): + super(FakeCache, self).__init__() + if pre_cached: + self.store.update(pre_cached) + self.stub = stub + + def get(self, key): + return self.stub or self.store.get(key) + + +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestFuncs(unittest.TestCase): def setUp(self): self.app = proxy_server.Application(None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing) + container_ring=FakeRing()) def test_GETorHEAD_base(self): base = Controller(self.app) req = Request.blank('/v1/a/c/o/with/slashes') + ring = FakeRing() + nodes = list(ring.get_part_nodes(0)) + list(ring.get_more_nodes(0)) with patch('swift.proxy.controllers.base.' 'http_connect', fake_http_connect(200)): - resp = base.GETorHEAD_base(req, 'object', FakeRing(), 'part', + resp = base.GETorHEAD_base(req, 'object', iter(nodes), 'part', '/a/c/o/with/slashes') self.assertTrue('swift.object/a/c/o/with/slashes' in resp.environ) self.assertEqual( @@ -101,14 +173,14 @@ class TestFuncs(unittest.TestCase): req = Request.blank('/v1/a/c/o') with patch('swift.proxy.controllers.base.' 'http_connect', fake_http_connect(200)): - resp = base.GETorHEAD_base(req, 'object', FakeRing(), 'part', + resp = base.GETorHEAD_base(req, 'object', iter(nodes), 'part', '/a/c/o') self.assertTrue('swift.object/a/c/o' in resp.environ) self.assertEqual(resp.environ['swift.object/a/c/o']['status'], 200) req = Request.blank('/v1/a/c') with patch('swift.proxy.controllers.base.' 'http_connect', fake_http_connect(200)): - resp = base.GETorHEAD_base(req, 'container', FakeRing(), 'part', + resp = base.GETorHEAD_base(req, 'container', iter(nodes), 'part', '/a/c') self.assertTrue('swift.container/a/c' in resp.environ) self.assertEqual(resp.environ['swift.container/a/c']['status'], 200) @@ -116,150 +188,166 @@ class TestFuncs(unittest.TestCase): req = Request.blank('/v1/a') with patch('swift.proxy.controllers.base.' 'http_connect', fake_http_connect(200)): - resp = base.GETorHEAD_base(req, 'account', FakeRing(), 'part', + resp = base.GETorHEAD_base(req, 'account', iter(nodes), 'part', '/a') self.assertTrue('swift.account/a' in resp.environ) self.assertEqual(resp.environ['swift.account/a']['status'], 200) def test_get_info(self): - global FakeResponse_status_int + app = FakeApp() # Do a non cached call to account env = {} - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - info_a = get_info(None, env, 'a') + info_a = get_info(app, env, 'a') # Check that you got proper info - self.assertEquals(info_a['status'], 201) + self.assertEquals(info_a['status'], 200) self.assertEquals(info_a['bytes'], 6666) self.assertEquals(info_a['total_object_count'], 1000) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) + # Make sure the app was called + self.assertEqual(app.responses.stats['account'], 1) # Do an env cached call to account - info_a = get_info(None, env, 'a') + info_a = get_info(app, env, 'a') # Check that you got proper info - self.assertEquals(info_a['status'], 201) + self.assertEquals(info_a['status'], 200) self.assertEquals(info_a['bytes'], 6666) self.assertEquals(info_a['total_object_count'], 1000) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) + # Make sure the app was NOT called AGAIN + self.assertEqual(app.responses.stats['account'], 1) # This time do env cached call to account and non cached to container - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - info_c = get_info(None, env, 'a', 'c') + info_c = get_info(app, env, 'a', 'c') # Check that you got proper info - self.assertEquals(info_a['status'], 201) + self.assertEquals(info_c['status'], 200) self.assertEquals(info_c['bytes'], 6666) self.assertEquals(info_c['object_count'], 1000) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) self.assertEquals(env.get('swift.container/a/c'), info_c) + # Make sure the app was called for container + self.assertEqual(app.responses.stats['container'], 1) # This time do a non cached call to account than non cached to # container + app = FakeApp() env = {} # abandon previous call to env - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - info_c = get_info(None, env, 'a', 'c') + info_c = get_info(app, env, 'a', 'c') # Check that you got proper info - self.assertEquals(info_a['status'], 201) + self.assertEquals(info_c['status'], 200) self.assertEquals(info_c['bytes'], 6666) self.assertEquals(info_c['object_count'], 1000) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) self.assertEquals(env.get('swift.container/a/c'), info_c) + # check app calls both account and container + self.assertEqual(app.responses.stats['account'], 1) + self.assertEqual(app.responses.stats['container'], 1) # This time do an env cached call to container while account is not # cached del(env['swift.account/a']) - info_c = get_info(None, env, 'a', 'c') + info_c = get_info(app, env, 'a', 'c') # Check that you got proper info - self.assertEquals(info_a['status'], 201) + self.assertEquals(info_a['status'], 200) self.assertEquals(info_c['bytes'], 6666) self.assertEquals(info_c['object_count'], 1000) # Make sure the env cache is set and account still not cached self.assertEquals(env.get('swift.container/a/c'), info_c) + # no additional calls were made + self.assertEqual(app.responses.stats['account'], 1) + self.assertEqual(app.responses.stats['container'], 1) # Do a non cached call to account not found with ret_not_found + app = FakeApp(statuses=(404,)) env = {} - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - try: - FakeResponse_status_int = 404 - info_a = get_info(None, env, 'a', ret_not_found=True) - finally: - FakeResponse_status_int = 201 + info_a = get_info(app, env, 'a', ret_not_found=True) # Check that you got proper info self.assertEquals(info_a['status'], 404) - self.assertEquals(info_a['bytes'], 6666) - self.assertEquals(info_a['total_object_count'], 1000) + self.assertEquals(info_a['bytes'], None) + self.assertEquals(info_a['total_object_count'], None) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) + # and account was called + self.assertEqual(app.responses.stats['account'], 1) # Do a cached call to account not found with ret_not_found - info_a = get_info(None, env, 'a', ret_not_found=True) + info_a = get_info(app, env, 'a', ret_not_found=True) # Check that you got proper info self.assertEquals(info_a['status'], 404) - self.assertEquals(info_a['bytes'], 6666) - self.assertEquals(info_a['total_object_count'], 1000) + self.assertEquals(info_a['bytes'], None) + self.assertEquals(info_a['total_object_count'], None) # Make sure the env cache is set self.assertEquals(env.get('swift.account/a'), info_a) + # add account was NOT called AGAIN + self.assertEqual(app.responses.stats['account'], 1) # Do a non cached call to account not found without ret_not_found + app = FakeApp(statuses=(404,)) env = {} - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - try: - FakeResponse_status_int = 404 - info_a = get_info(None, env, 'a') - finally: - FakeResponse_status_int = 201 + info_a = get_info(app, env, 'a') # Check that you got proper info self.assertEquals(info_a, None) self.assertEquals(env['swift.account/a']['status'], 404) + # and account was called + self.assertEqual(app.responses.stats['account'], 1) # Do a cached call to account not found without ret_not_found info_a = get_info(None, env, 'a') # Check that you got proper info self.assertEquals(info_a, None) self.assertEquals(env['swift.account/a']['status'], 404) + # add account was NOT called AGAIN + self.assertEqual(app.responses.stats['account'], 1) def test_get_container_info_swift_source(self): - req = Request.blank("/v1/a/c", environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_container_info(req.environ, 'app', swift_source='MC') - self.assertEquals(resp['meta']['fakerequest-swift-source'], 'MC') + app = FakeApp() + req = Request.blank("/v1/a/c", environ={'swift.cache': FakeCache()}) + get_container_info(req.environ, app, swift_source='MC') + self.assertEqual(app.sources, ['GET_INFO', 'MC']) def test_get_object_info_swift_source(self): + app = FakeApp() req = Request.blank("/v1/a/c/o", - environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_object_info(req.environ, 'app', swift_source='LU') - self.assertEquals(resp['meta']['fakerequest-swift-source'], 'LU') + environ={'swift.cache': FakeCache()}) + get_object_info(req.environ, app, swift_source='LU') + self.assertEqual(app.sources, ['LU']) def test_get_container_info_no_cache(self): req = Request.blank("/v1/AUTH_account/cont", environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_container_info(req.environ, 'xxx') + resp = get_container_info(req.environ, FakeApp()) + self.assertEquals(resp['storage_policy'], '0') self.assertEquals(resp['bytes'], 6666) self.assertEquals(resp['object_count'], 1000) + def test_get_container_info_no_account(self): + responses = DynamicResponseFactory(404, 200) + app = FakeApp(responses) + req = Request.blank("/v1/AUTH_does_not_exist/cont") + info = get_container_info(req.environ, app) + self.assertEqual(info['status'], 0) + + def test_get_container_info_no_auto_account(self): + responses = DynamicResponseFactory(404, 200) + app = FakeApp(responses) + req = Request.blank("/v1/.system_account/cont") + info = get_container_info(req.environ, app) + self.assertEqual(info['status'], 200) + self.assertEquals(info['bytes'], 6666) + self.assertEquals(info['object_count'], 1000) + def test_get_container_info_cache(self): - cached = {'status': 404, - 'bytes': 3333, - 'object_count': 10, - # simplejson sometimes hands back strings, sometimes unicodes - 'versions': u"\u1F4A9"} + cache_stub = { + 'status': 404, 'bytes': 3333, 'object_count': 10, + # simplejson sometimes hands back strings, sometimes unicodes + 'versions': u"\u1F4A9"} req = Request.blank("/v1/account/cont", - environ={'swift.cache': FakeCache(cached)}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_container_info(req.environ, 'xxx') + environ={'swift.cache': FakeCache(cache_stub)}) + resp = get_container_info(req.environ, FakeApp()) + self.assertEquals(resp['storage_policy'], '0') self.assertEquals(resp['bytes'], 3333) self.assertEquals(resp['object_count'], 10) self.assertEquals(resp['status'], 404) @@ -275,18 +363,16 @@ class TestFuncs(unittest.TestCase): self.assertEquals(resp['bytes'], 3867) def test_get_account_info_swift_source(self): - req = Request.blank("/v1/a", environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_account_info(req.environ, 'a', swift_source='MC') - self.assertEquals(resp['meta']['fakerequest-swift-source'], 'MC') + app = FakeApp() + req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()}) + get_account_info(req.environ, app, swift_source='MC') + self.assertEqual(app.sources, ['MC']) def test_get_account_info_no_cache(self): + app = FakeApp() req = Request.blank("/v1/AUTH_account", environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_account_info(req.environ, 'xxx') + resp = get_account_info(req.environ, app) self.assertEquals(resp['bytes'], 6666) self.assertEquals(resp['total_object_count'], 1000) @@ -297,9 +383,7 @@ class TestFuncs(unittest.TestCase): 'total_object_count': 10} req = Request.blank("/v1/account/cont", environ={'swift.cache': FakeCache(cached)}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_account_info(req.environ, 'xxx') + resp = get_account_info(req.environ, FakeApp()) self.assertEquals(resp['bytes'], 3333) self.assertEquals(resp['total_object_count'], 10) self.assertEquals(resp['status'], 404) @@ -312,9 +396,7 @@ class TestFuncs(unittest.TestCase): 'meta': {}} req = Request.blank("/v1/account/cont", environ={'swift.cache': FakeCache(cached)}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_account_info(req.environ, 'xxx') + resp = get_account_info(req.environ, FakeApp()) self.assertEquals(resp['status'], 404) self.assertEquals(resp['bytes'], '3333') self.assertEquals(resp['container_count'], 234) @@ -344,11 +426,13 @@ class TestFuncs(unittest.TestCase): self.assertEquals(resp['type'], 'application/json') def test_get_object_info_no_env(self): + app = FakeApp() req = Request.blank("/v1/account/cont/obj", environ={'swift.cache': FakeCache({})}) - with patch('swift.proxy.controllers.base.' - '_prepare_pre_auth_info_request', FakeRequest): - resp = get_object_info(req.environ, 'xxx') + resp = get_object_info(req.environ, app) + self.assertEqual(app.responses.stats['account'], 0) + self.assertEqual(app.responses.stats['container'], 0) + self.assertEqual(app.responses.stats['obj'], 1) self.assertEquals(resp['length'], 5555) self.assertEquals(resp['type'], 'text/plain') @@ -443,6 +527,15 @@ class TestFuncs(unittest.TestCase): self.assertEquals(resp['meta']['whatevs'], 14) self.assertEquals(resp['meta']['somethingelse'], 0) + def test_headers_to_object_info_sys_meta(self): + prefix = get_sys_meta_prefix('object') + headers = {'%sWhatevs' % prefix: 14, + '%ssomethingelse' % prefix: 0} + resp = headers_to_object_info(headers.items(), 200) + self.assertEquals(len(resp['sysmeta']), 2) + self.assertEquals(resp['sysmeta']['whatevs'], 14) + self.assertEquals(resp['sysmeta']['somethingelse'], 0) + def test_headers_to_object_info_values(self): headers = { 'content-length': '1024', @@ -457,7 +550,7 @@ class TestFuncs(unittest.TestCase): resp, headers_to_object_info(headers.items(), 200)) - def test_have_quorum(self): + def test_base_have_quorum(self): base = Controller(self.app) # just throw a bunch of test cases at it self.assertEqual(base.have_quorum([201, 404], 3), False) @@ -470,6 +563,32 @@ class TestFuncs(unittest.TestCase): self.assertEqual(base.have_quorum([404, 404], 2), True) self.assertEqual(base.have_quorum([201, 404, 201, 201], 4), True) + def test_best_response_overrides(self): + base = Controller(self.app) + responses = [ + (302, 'Found', '', 'The resource has moved temporarily.'), + (100, 'Continue', '', ''), + (404, 'Not Found', '', 'Custom body'), + ] + server_type = "Base DELETE" + req = Request.blank('/v1/a/c/o', method='DELETE') + statuses, reasons, headers, bodies = zip(*responses) + + # First test that you can't make a quorum with only overridden + # responses + overrides = {302: 204, 100: 204} + resp = base.best_response(req, statuses, reasons, bodies, server_type, + headers=headers, overrides=overrides) + self.assertEqual(resp.status, '503 Internal Server Error') + + # next make a 404 quorum and make sure the last delete (real) 404 + # status is the one returned. + overrides = {100: 404} + resp = base.best_response(req, statuses, reasons, bodies, server_type, + headers=headers, overrides=overrides) + self.assertEqual(resp.status, '404 Not Found') + self.assertEqual(resp.body, 'Custom body') + def test_range_fast_forward(self): req = Request.blank('/') handler = GetOrHeadHandler(None, req, None, None, None, None, {}) @@ -512,7 +631,8 @@ class TestFuncs(unittest.TestCase): req = Request.blank('/v1/a/c/o', headers=src_headers) dst_headers = base.generate_request_headers(req, transfer=True) expected_headers = {'x-base-meta-owner': '', - 'x-base-meta-size': '151M'} + 'x-base-meta-size': '151M', + 'connection': 'close'} for k, v in expected_headers.iteritems(): self.assertTrue(k in dst_headers) self.assertEqual(v, dst_headers[k]) @@ -532,3 +652,88 @@ class TestFuncs(unittest.TestCase): self.assertEqual(v, dst_headers[k.lower()]) for k, v in bad_hdrs.iteritems(): self.assertFalse(k.lower() in dst_headers) + + def test_client_chunk_size(self): + + class TestSource(object): + def __init__(self, chunks): + self.chunks = list(chunks) + + def read(self, _read_size): + if self.chunks: + return self.chunks.pop(0) + else: + return '' + + source = TestSource(( + 'abcd', '1234', 'abc', 'd1', '234abcd1234abcd1', '2')) + req = Request.blank('/v1/a/c/o') + node = {} + handler = GetOrHeadHandler(self.app, req, None, None, None, None, {}, + client_chunk_size=8) + + app_iter = handler._make_app_iter(req, node, source) + client_chunks = list(app_iter) + self.assertEqual(client_chunks, [ + 'abcd1234', 'abcd1234', 'abcd1234', 'abcd12']) + + def test_client_chunk_size_resuming(self): + + class TestSource(object): + def __init__(self, chunks): + self.chunks = list(chunks) + + def read(self, _read_size): + if self.chunks: + chunk = self.chunks.pop(0) + if chunk is None: + raise exceptions.ChunkReadTimeout() + else: + return chunk + else: + return '' + + node = {'ip': '1.2.3.4', 'port': 6000, 'device': 'sda'} + + source1 = TestSource(['abcd', '1234', 'abc', None]) + source2 = TestSource(['efgh5678']) + req = Request.blank('/v1/a/c/o') + handler = GetOrHeadHandler( + self.app, req, 'Object', None, None, None, {}, + client_chunk_size=8) + + app_iter = handler._make_app_iter(req, node, source1) + with patch.object(handler, '_get_source_and_node', + lambda: (source2, node)): + client_chunks = list(app_iter) + self.assertEqual(client_chunks, ['abcd1234', 'efgh5678']) + self.assertEqual(handler.backend_headers['Range'], 'bytes=8-') + + def test_bytes_to_skip(self): + # if you start at the beginning, skip nothing + self.assertEqual(bytes_to_skip(1024, 0), 0) + + # missed the first 10 bytes, so we've got 1014 bytes of partial + # record + self.assertEqual(bytes_to_skip(1024, 10), 1014) + + # skipped some whole records first + self.assertEqual(bytes_to_skip(1024, 4106), 1014) + + # landed on a record boundary + self.assertEqual(bytes_to_skip(1024, 1024), 0) + self.assertEqual(bytes_to_skip(1024, 2048), 0) + + # big numbers + self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32), 0) + self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32 + 1), 2 ** 20 - 1) + self.assertEqual(bytes_to_skip(2 ** 20, 2 ** 32 + 2 ** 19), 2 ** 19) + + # odd numbers + self.assertEqual(bytes_to_skip(123, 0), 0) + self.assertEqual(bytes_to_skip(123, 23), 100) + self.assertEqual(bytes_to_skip(123, 247), 122) + + # prime numbers + self.assertEqual(bytes_to_skip(11, 7), 4) + self.assertEqual(bytes_to_skip(97, 7873823), 55) diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index 7c8ecf7..715cd94 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -16,19 +16,61 @@ import mock import unittest +from eventlet import Timeout + from swift.common.swob import Request from swift.proxy import server as proxy_server from swift.proxy.controllers.base import headers_to_container_info from test.unit import fake_http_connect, FakeRing, FakeMemcache +from swift.common.storage_policy import StoragePolicy from swift.common.request_helpers import get_sys_meta_prefix +from swift.common import utils + +from test.unit import patch_policies, mocked_http_conn, debug_logger +from test.unit.proxy.test_server import node_error_count +@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestContainerController(unittest.TestCase): def setUp(self): + # SOF + self._orig_hash_suffix = utils.HASH_PATH_SUFFIX + self._orig_hash_prefix = utils.HASH_PATH_PREFIX + utils.HASH_PATH_SUFFIX = 'endcap' + utils.HASH_PATH_PREFIX = '' + self.logger = debug_logger() + self.container_ring = FakeRing(max_more_nodes=9) self.app = proxy_server.Application(None, FakeMemcache(), + logger=self.logger, account_ring=FakeRing(), - container_ring=FakeRing(), - object_ring=FakeRing()) + container_ring=self.container_ring) + + self.account_info = { + 'status': 200, + 'container_count': '10', + 'total_object_count': '100', + 'bytes': '1000', + 'meta': {}, + 'sysmeta': {}, + } + + class FakeAccountInfoContainerController( + proxy_server.ContainerController): + + def account_info(controller, *args, **kwargs): + patch_path = 'swift.proxy.controllers.base.get_info' + with mock.patch(patch_path) as mock_get_info: + mock_get_info.return_value = dict(self.account_info) + return super(FakeAccountInfoContainerController, + controller).account_info( + *args, **kwargs) + _orig_get_controller = self.app.get_controller + + def wrapped_get_controller(*args, **kwargs): + with mock.patch('swift.proxy.server.ContainerController', + new=FakeAccountInfoContainerController): + return _orig_get_controller(*args, **kwargs) + self.app.get_controller = wrapped_get_controller def test_container_info_in_response_env(self): controller = proxy_server.ContainerController(self.app, 'a', 'c') @@ -118,6 +160,53 @@ class TestContainerController(unittest.TestCase): self.assertEqual(context['headers'][user_meta_key], 'bar') self.assertNotEqual(context['headers']['x-timestamp'], '1.0') + def test_node_errors(self): + self.app.sort_nodes = lambda n: n + + for method in ('PUT', 'DELETE', 'POST'): + def test_status_map(statuses, expected): + self.app._error_limiting = {} + req = Request.blank('/v1/a/c', method=method) + with mocked_http_conn(*statuses) as fake_conn: + print 'a' * 50 + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, expected) + for req in fake_conn.requests: + self.assertEqual(req['method'], method) + self.assert_(req['path'].endswith('/a/c')) + + base_status = [201] * 3 + # test happy path + test_status_map(list(base_status), 201) + for i in range(3): + self.assertEqual(node_error_count( + self.app, self.container_ring.devs[i]), 0) + # single node errors and test isolation + for i in range(3): + status_list = list(base_status) + status_list[i] = 503 + status_list.append(201) + test_status_map(status_list, 201) + for j in range(3): + expected = 1 if j == i else 0 + self.assertEqual(node_error_count( + self.app, self.container_ring.devs[j]), expected) + # timeout + test_status_map((201, Timeout(), 201, 201), 201) + self.assertEqual(node_error_count( + self.app, self.container_ring.devs[1]), 1) + + # exception + test_status_map((Exception('kaboom!'), 201, 201, 201), 201) + self.assertEqual(node_error_count( + self.app, self.container_ring.devs[0]), 1) + + # insufficient storage + test_status_map((201, 201, 507, 201), 201) + self.assertEqual(node_error_count( + self.app, self.container_ring.devs[2]), + self.app.error_suppression_limit + 1) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 4942691..a38e753 100755 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -14,185 +14,1397 @@ # See the License for the specific language governing permissions and # limitations under the License. +import email.parser +import itertools +import random +import time import unittest +from collections import defaultdict from contextlib import contextmanager +import json +from hashlib import md5 import mock +from eventlet import Timeout import swift +from swift.common import utils, swob from swift.proxy import server as proxy_server -from swift.common.swob import HTTPException -from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger +from swift.proxy.controllers import obj +from swift.proxy.controllers.base import get_info as _real_get_info +from swift.common.storage_policy import POLICIES, ECDriverError + +from test.unit import FakeRing, FakeMemcache, fake_http_connect, \ + debug_logger, patch_policies, SlowBody +from test.unit.proxy.test_server import node_error_count + + +def unchunk_body(chunked_body): + body = '' + remaining = chunked_body + while remaining: + hex_length, remaining = remaining.split('\r\n', 1) + length = int(hex_length, 16) + body += remaining[:length] + remaining = remaining[length + 2:] + return body @contextmanager def set_http_connect(*args, **kwargs): old_connect = swift.proxy.controllers.base.http_connect new_connect = fake_http_connect(*args, **kwargs) - swift.proxy.controllers.base.http_connect = new_connect - swift.proxy.controllers.obj.http_connect = new_connect - swift.proxy.controllers.account.http_connect = new_connect - swift.proxy.controllers.container.http_connect = new_connect - yield new_connect - swift.proxy.controllers.base.http_connect = old_connect - swift.proxy.controllers.obj.http_connect = old_connect - swift.proxy.controllers.account.http_connect = old_connect - swift.proxy.controllers.container.http_connect = old_connect - - -class TestObjControllerWriteAffinity(unittest.TestCase): + try: + swift.proxy.controllers.base.http_connect = new_connect + swift.proxy.controllers.obj.http_connect = new_connect + swift.proxy.controllers.account.http_connect = new_connect + swift.proxy.controllers.container.http_connect = new_connect + yield new_connect + left_over_status = list(new_connect.code_iter) + if left_over_status: + raise AssertionError('left over status %r' % left_over_status) + finally: + swift.proxy.controllers.base.http_connect = old_connect + swift.proxy.controllers.obj.http_connect = old_connect + swift.proxy.controllers.account.http_connect = old_connect + swift.proxy.controllers.container.http_connect = old_connect + + +class PatchedObjControllerApp(proxy_server.Application): + """ + This patch is just a hook over the proxy server's __call__ to ensure + that calls to get_info will return the stubbed value for + container_info if it's a container info call. + """ + + container_info = {} + per_container_info = {} + + def __call__(self, *args, **kwargs): + + def _fake_get_info(app, env, account, container=None, **kwargs): + if container: + if container in self.per_container_info: + return self.per_container_info[container] + return self.container_info + else: + return _real_get_info(app, env, account, container, **kwargs) + + mock_path = 'swift.proxy.controllers.base.get_info' + with mock.patch(mock_path, new=_fake_get_info): + return super( + PatchedObjControllerApp, self).__call__(*args, **kwargs) + + +class BaseObjectControllerMixin(object): + container_info = { + 'write_acl': None, + 'read_acl': None, + 'storage_policy': None, + 'sync_key': None, + 'versions': None, + } + + # this needs to be set on the test case + controller_cls = None + def setUp(self): - self.app = proxy_server.Application( + # setup fake rings with handoffs + for policy in POLICIES: + policy.object_ring.max_more_nodes = policy.object_ring.replicas + + self.logger = debug_logger('proxy-server') + self.logger.thread_locals = ('txn1', '127.0.0.2') + self.app = PatchedObjControllerApp( None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing(max_more_nodes=9)) - self.app.request_node_count = lambda replicas: 10000000 - self.app.sort_nodes = lambda l: l # stop shuffling the primary nodes + container_ring=FakeRing(), logger=self.logger) + # you can over-ride the container_info just by setting it on the app + self.app.container_info = dict(self.container_info) + # default policy and ring references + self.policy = POLICIES.default + self.obj_ring = self.policy.object_ring + self._ts_iter = (utils.Timestamp(t) for t in + itertools.count(int(time.time()))) + + def ts(self): + return self._ts_iter.next() + + def replicas(self, policy=None): + policy = policy or POLICIES.default + return policy.object_ring.replicas + + def quorum(self, policy=None): + policy = policy or POLICIES.default + return policy.quorum def test_iter_nodes_local_first_noops_when_no_affinity(self): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + # this test needs a stable node order - most don't + self.app.sort_nodes = lambda l: l + controller = self.controller_cls( + self.app, 'a', 'c', 'o') self.app.write_affinity_is_local_fn = None - - all_nodes = self.app.object_ring.get_part_nodes(1) - all_nodes.extend(self.app.object_ring.get_more_nodes(1)) + object_ring = self.app.get_object_ring(None) + all_nodes = object_ring.get_part_nodes(1) + all_nodes.extend(object_ring.get_more_nodes(1)) local_first_nodes = list(controller.iter_nodes_local_first( - self.app.object_ring, 1)) + object_ring, 1)) self.maxDiff = None self.assertEqual(all_nodes, local_first_nodes) def test_iter_nodes_local_first_moves_locals_first(self): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') + controller = self.controller_cls( + self.app, 'a', 'c', 'o') self.app.write_affinity_is_local_fn = ( lambda node: node['region'] == 1) - self.app.write_affinity_node_count = lambda ring: 4 + # we'll write to one more than replica count local nodes + self.app.write_affinity_node_count = lambda r: r + 1 + + object_ring = self.app.get_object_ring(None) + # make our fake ring have plenty of nodes, and not get limited + # artificially by the proxy max request node count + object_ring.max_more_nodes = 100000 + self.app.request_node_count = lambda r: 100000 - all_nodes = self.app.object_ring.get_part_nodes(1) - all_nodes.extend(self.app.object_ring.get_more_nodes(1)) + all_nodes = object_ring.get_part_nodes(1) + all_nodes.extend(object_ring.get_more_nodes(1)) + # i guess fake_ring wants the get_more_nodes iter to more safely be + # converted to a list with a smallish sort of limit which *can* be + # lower than max_more_nodes + fake_rings_real_max_more_nodes_value = object_ring.replicas ** 2 + self.assertEqual(len(all_nodes), fake_rings_real_max_more_nodes_value) + + # make sure we have enough local nodes (sanity) + all_local_nodes = [n for n in all_nodes if + self.app.write_affinity_is_local_fn(n)] + self.assertTrue(len(all_local_nodes) >= self.replicas() + 1) + + # finally, create the local_first_nodes iter and flatten it out local_first_nodes = list(controller.iter_nodes_local_first( - self.app.object_ring, 1)) + object_ring, 1)) # the local nodes move up in the ordering - self.assertEqual([1, 1, 1, 1], - [node['region'] for node in local_first_nodes[:4]]) + self.assertEqual([1] * (self.replicas() + 1), [ + node['region'] for node in local_first_nodes[ + :self.replicas() + 1]]) # we don't skip any nodes + self.assertEqual(len(all_nodes), len(local_first_nodes)) + self.assertEqual(sorted(all_nodes), sorted(local_first_nodes)) + + def test_iter_nodes_local_first_best_effort(self): + controller = self.controller_cls( + self.app, 'a', 'c', 'o') + self.app.write_affinity_is_local_fn = ( + lambda node: node['region'] == 1) + + object_ring = self.app.get_object_ring(None) + all_nodes = object_ring.get_part_nodes(1) + all_nodes.extend(object_ring.get_more_nodes(1)) + + local_first_nodes = list(controller.iter_nodes_local_first( + object_ring, 1)) + + # we won't have quite enough local nodes... + self.assertEqual(len(all_nodes), self.replicas() + + POLICIES.default.object_ring.max_more_nodes) + all_local_nodes = [n for n in all_nodes if + self.app.write_affinity_is_local_fn(n)] + self.assertEqual(len(all_local_nodes), self.replicas()) + # but the local nodes we do have are at the front of the local iter + first_n_local_first_nodes = local_first_nodes[:len(all_local_nodes)] + self.assertEqual(sorted(all_local_nodes), + sorted(first_n_local_first_nodes)) + # but we *still* don't *skip* any nodes + self.assertEqual(len(all_nodes), len(local_first_nodes)) self.assertEqual(sorted(all_nodes), sorted(local_first_nodes)) def test_connect_put_node_timeout(self): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - self.app.conn_timeout = 0.1 - with set_http_connect(200, slow_connect=True): + controller = self.controller_cls( + self.app, 'a', 'c', 'o') + self.app.conn_timeout = 0.05 + with set_http_connect(slow_connect=True): nodes = [dict(ip='', port='', device='')] res = controller._connect_put_node(nodes, '', '', {}, ('', '')) self.assertTrue(res is None) + def test_DELETE_simple(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + codes = [204] * self.replicas() + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) + + def test_DELETE_missing_one(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + codes = [404] + [204] * (self.replicas() - 1) + random.shuffle(codes) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) -class TestObjController(unittest.TestCase): - def setUp(self): - logger = debug_logger('proxy-server') - logger.thread_locals = ('txn1', '127.0.0.2') - self.app = proxy_server.Application( - None, FakeMemcache(), account_ring=FakeRing(), - container_ring=FakeRing(), object_ring=FakeRing(), - logger=logger) - self.controller = proxy_server.ObjectController(self.app, - 'a', 'c', 'o') - self.controller.container_info = mock.MagicMock(return_value={ - 'partition': 1, - 'nodes': [ - {'ip': '127.0.0.1', 'port': '1', 'device': 'sda'}, - {'ip': '127.0.0.1', 'port': '2', 'device': 'sda'}, - {'ip': '127.0.0.1', 'port': '3', 'device': 'sda'}, - ], - 'write_acl': None, - 'read_acl': None, - 'sync_key': None, - 'versions': None}) + def test_DELETE_not_found(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + codes = [404] * (self.replicas() - 1) + [204] + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 404) + + def test_DELETE_mostly_found(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + mostly_204s = [204] * self.quorum() + codes = mostly_204s + [404] * (self.replicas() - len(mostly_204s)) + self.assertEqual(len(codes), self.replicas()) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) + + def test_DELETE_mostly_not_found(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + mostly_404s = [404] * self.quorum() + codes = mostly_404s + [204] * (self.replicas() - len(mostly_404s)) + self.assertEqual(len(codes), self.replicas()) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 404) + + def test_DELETE_half_not_found_statuses(self): + self.obj_ring.set_replicas(4) + + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + with set_http_connect(404, 204, 404, 204): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) + + def test_DELETE_half_not_found_headers_and_body(self): + # Transformed responses have bogus bodies and headers, so make sure we + # send the client headers and body from a real node's response. + self.obj_ring.set_replicas(4) + + status_codes = (404, 404, 204, 204) + bodies = ('not found', 'not found', '', '') + headers = [{}, {}, {'Pick-Me': 'yes'}, {'Pick-Me': 'yes'}] + + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + with set_http_connect(*status_codes, body_iter=bodies, + headers=headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) + self.assertEquals(resp.headers.get('Pick-Me'), 'yes') + self.assertEquals(resp.body, '') + + def test_DELETE_handoff(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE') + codes = [204] * self.replicas() + with set_http_connect(507, *codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 204) + + def test_POST_non_int_delete_after(self): + t = str(int(time.time() + 100)) + '.1' + req = swob.Request.blank('/v1/a/c/o', method='POST', + headers={'Content-Type': 'foo/bar', + 'X-Delete-After': t}) + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('Non-integer X-Delete-After', resp.body) + + def test_PUT_non_int_delete_after(self): + t = str(int(time.time() + 100)) + '.1' + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-After': t}) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('Non-integer X-Delete-After', resp.body) + + def test_POST_negative_delete_after(self): + req = swob.Request.blank('/v1/a/c/o', method='POST', + headers={'Content-Type': 'foo/bar', + 'X-Delete-After': '-60'}) + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('X-Delete-After in past', resp.body) + + def test_PUT_negative_delete_after(self): + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-After': '-60'}) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('X-Delete-After in past', resp.body) + + def test_POST_delete_at_non_integer(self): + t = str(int(time.time() + 100)) + '.1' + req = swob.Request.blank('/v1/a/c/o', method='POST', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('Non-integer X-Delete-At', resp.body) + + def test_PUT_delete_at_non_integer(self): + t = str(int(time.time() - 100)) + '.1' + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('Non-integer X-Delete-At', resp.body) + + def test_POST_delete_at_in_past(self): + t = str(int(time.time() - 100)) + req = swob.Request.blank('/v1/a/c/o', method='POST', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('X-Delete-At in past', resp.body) + + def test_PUT_delete_at_in_past(self): + t = str(int(time.time() - 100)) + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 400) + self.assertEqual('X-Delete-At in past', resp.body) + + def test_HEAD_simple(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD') + with set_http_connect(200): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + + def test_HEAD_x_newest(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD', + headers={'X-Newest': 'true'}) + with set_http_connect(200, 200, 200): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + + def test_HEAD_x_newest_different_timestamps(self): + req = swob.Request.blank('/v1/a/c/o', method='HEAD', + headers={'X-Newest': 'true'}) + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + timestamps = [next(ts) for i in range(3)] + newest_timestamp = timestamps[-1] + random.shuffle(timestamps) + backend_response_headers = [{ + 'X-Backend-Timestamp': t.internal, + 'X-Timestamp': t.normal + } for t in timestamps] + with set_http_connect(200, 200, 200, + headers=backend_response_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['x-timestamp'], newest_timestamp.normal) + + def test_HEAD_x_newest_with_two_vector_timestamps(self): + req = swob.Request.blank('/v1/a/c/o', method='HEAD', + headers={'X-Newest': 'true'}) + ts = (utils.Timestamp(time.time(), offset=offset) + for offset in itertools.count()) + timestamps = [next(ts) for i in range(3)] + newest_timestamp = timestamps[-1] + random.shuffle(timestamps) + backend_response_headers = [{ + 'X-Backend-Timestamp': t.internal, + 'X-Timestamp': t.normal + } for t in timestamps] + with set_http_connect(200, 200, 200, + headers=backend_response_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.headers['x-backend-timestamp'], + newest_timestamp.internal) + + def test_HEAD_x_newest_with_some_missing(self): + req = swob.Request.blank('/v1/a/c/o', method='HEAD', + headers={'X-Newest': 'true'}) + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + request_count = self.app.request_node_count(self.obj_ring.replicas) + backend_response_headers = [{ + 'x-timestamp': next(ts).normal, + } for i in range(request_count)] + responses = [404] * (request_count - 1) + responses.append(200) + request_log = [] + + def capture_requests(ip, port, device, part, method, path, + headers=None, **kwargs): + req = { + 'ip': ip, + 'port': port, + 'device': device, + 'part': part, + 'method': method, + 'path': path, + 'headers': headers, + } + request_log.append(req) + with set_http_connect(*responses, + headers=backend_response_headers, + give_connect=capture_requests): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + for req in request_log: + self.assertEqual(req['method'], 'HEAD') + self.assertEqual(req['path'], '/a/c/o') + + def test_container_sync_delete(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + req = swob.Request.blank( + '/v1/a/c/o', method='DELETE', headers={ + 'X-Timestamp': ts.next().internal}) + codes = [409] * self.obj_ring.replicas + ts_iter = itertools.repeat(ts.next().internal) + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 409) + + def test_PUT_requires_length(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 411) + +# end of BaseObjectControllerMixin + + +@patch_policies() +class TestReplicatedObjController(BaseObjectControllerMixin, + unittest.TestCase): + + controller_cls = obj.ReplicatedObjectController def test_PUT_simple(self): - req = swift.common.swob.Request.blank('/v1/a/c/o') + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') req.headers['content-length'] = '0' with set_http_connect(201, 201, 201): - resp = self.controller.PUT(req) + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 201) def test_PUT_if_none_match(self): - req = swift.common.swob.Request.blank('/v1/a/c/o') + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') req.headers['if-none-match'] = '*' req.headers['content-length'] = '0' with set_http_connect(201, 201, 201): - resp = self.controller.PUT(req) + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 201) def test_PUT_if_none_match_denied(self): - req = swift.common.swob.Request.blank('/v1/a/c/o') + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') req.headers['if-none-match'] = '*' req.headers['content-length'] = '0' - with set_http_connect(201, (412, 412), 201): - resp = self.controller.PUT(req) + with set_http_connect(201, 412, 201): + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 412) def test_PUT_if_none_match_not_star(self): - req = swift.common.swob.Request.blank('/v1/a/c/o') + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') req.headers['if-none-match'] = 'somethingelse' req.headers['content-length'] = '0' - with set_http_connect(201, 201, 201): - resp = self.controller.PUT(req) + with set_http_connect(): + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 400) + def test_PUT_connect_exceptions(self): + object_ring = self.app.get_object_ring(None) + self.app.sort_nodes = lambda n: n # disable shuffle + + def test_status_map(statuses, expected): + self.app._error_limiting = {} + req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', + body='test body') + with set_http_connect(*statuses): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, expected) + + base_status = [201] * 3 + # test happy path + test_status_map(list(base_status), 201) + for i in range(3): + self.assertEqual(node_error_count( + self.app, object_ring.devs[i]), 0) + # single node errors and test isolation + for i in range(3): + status_list = list(base_status) + status_list[i] = 503 + test_status_map(status_list, 201) + for j in range(3): + self.assertEqual(node_error_count( + self.app, object_ring.devs[j]), 1 if j == i else 0) + # connect errors + test_status_map((201, Timeout(), 201, 201), 201) + self.assertEqual(node_error_count( + self.app, object_ring.devs[1]), 1) + test_status_map((Exception('kaboom!'), 201, 201, 201), 201) + self.assertEqual(node_error_count( + self.app, object_ring.devs[0]), 1) + # expect errors + test_status_map((201, 201, (503, None), 201), 201) + self.assertEqual(node_error_count( + self.app, object_ring.devs[2]), 1) + test_status_map(((507, None), 201, 201, 201), 201) + self.assertEqual( + node_error_count(self.app, object_ring.devs[0]), + self.app.error_suppression_limit + 1) + # response errors + test_status_map(((100, Timeout()), 201, 201), 201) + self.assertEqual( + node_error_count(self.app, object_ring.devs[0]), 1) + test_status_map((201, 201, (100, Exception())), 201) + self.assertEqual( + node_error_count(self.app, object_ring.devs[2]), 1) + test_status_map((201, (100, 507), 201), 201) + self.assertEqual( + node_error_count(self.app, object_ring.devs[1]), + self.app.error_suppression_limit + 1) + def test_GET_simple(self): req = swift.common.swob.Request.blank('/v1/a/c/o') with set_http_connect(200): - resp = self.controller.GET(req) + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 200) - def test_DELETE_simple(self): + def test_GET_error(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(204, 204, 204): - resp = self.controller.DELETE(req) - self.assertEquals(resp.status_int, 204) + with set_http_connect(503, 200): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + + def test_GET_handoff(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + codes = [503] * self.obj_ring.replicas + [200] + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) - def test_POST_simple(self): + def test_GET_not_found(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): - resp = self.controller.POST(req) + codes = [404] * (self.obj_ring.replicas + + self.obj_ring.max_more_nodes) + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 404) + + def test_POST_as_COPY_simple(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='POST') + head_resp = [200] * self.obj_ring.replicas + \ + [404] * self.obj_ring.max_more_nodes + put_resp = [201] * self.obj_ring.replicas + codes = head_resp + put_resp + with set_http_connect(*codes): + resp = req.get_response(self.app) self.assertEquals(resp.status_int, 202) + def test_POST_delete_at(self): + t = str(int(time.time() + 100)) + req = swob.Request.blank('/v1/a/c/o', method='POST', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + post_headers = [] + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + if method == 'POST': + post_headers.append(headers) + x_newest_responses = [200] * self.obj_ring.replicas + \ + [404] * self.obj_ring.max_more_nodes + post_resp = [200] * self.obj_ring.replicas + codes = x_newest_responses + post_resp + with set_http_connect(*codes, give_connect=capture_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + for given_headers in post_headers: + self.assertEquals(given_headers.get('X-Delete-At'), t) + self.assertTrue('X-Delete-At-Host' in given_headers) + self.assertTrue('X-Delete-At-Device' in given_headers) + self.assertTrue('X-Delete-At-Partition' in given_headers) + self.assertTrue('X-Delete-At-Container' in given_headers) + + def test_PUT_delete_at(self): + t = str(int(time.time() + 100)) + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-At': t}) + put_headers = [] + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + if method == 'PUT': + put_headers.append(headers) + codes = [201] * self.obj_ring.replicas + with set_http_connect(*codes, give_connect=capture_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + for given_headers in put_headers: + self.assertEquals(given_headers.get('X-Delete-At'), t) + self.assertTrue('X-Delete-At-Host' in given_headers) + self.assertTrue('X-Delete-At-Device' in given_headers) + self.assertTrue('X-Delete-At-Partition' in given_headers) + self.assertTrue('X-Delete-At-Container' in given_headers) + + def test_PUT_converts_delete_after_to_delete_at(self): + req = swob.Request.blank('/v1/a/c/o', method='PUT', body='', + headers={'Content-Type': 'foo/bar', + 'X-Delete-After': '60'}) + put_headers = [] + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + if method == 'PUT': + put_headers.append(headers) + codes = [201] * self.obj_ring.replicas + t = time.time() + with set_http_connect(*codes, give_connect=capture_headers): + with mock.patch('time.time', lambda: t): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + expected_delete_at = str(int(t) + 60) + for given_headers in put_headers: + self.assertEquals(given_headers.get('X-Delete-At'), + expected_delete_at) + self.assertTrue('X-Delete-At-Host' in given_headers) + self.assertTrue('X-Delete-At-Device' in given_headers) + self.assertTrue('X-Delete-At-Partition' in given_headers) + self.assertTrue('X-Delete-At-Container' in given_headers) + + def test_container_sync_put_x_timestamp_not_found(self): + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + self.app.container_info['storage_policy'] = policy_index + put_timestamp = utils.Timestamp(time.time()).normal + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': put_timestamp}) + codes = [201] * self.obj_ring.replicas + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_container_sync_put_x_timestamp_match(self): + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + self.app.container_info['storage_policy'] = policy_index + put_timestamp = utils.Timestamp(time.time()).normal + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': put_timestamp}) + ts_iter = itertools.repeat(put_timestamp) + codes = [409] * self.obj_ring.replicas + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + + def test_container_sync_put_x_timestamp_older(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + self.app.container_info['storage_policy'] = policy_index + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': ts.next().internal}) + ts_iter = itertools.repeat(ts.next().internal) + codes = [409] * self.obj_ring.replicas + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + + def test_container_sync_put_x_timestamp_newer(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + orig_timestamp = ts.next().internal + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': ts.next().internal}) + ts_iter = itertools.repeat(orig_timestamp) + codes = [201] * self.obj_ring.replicas + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_put_x_timestamp_conflict(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': ts.next().internal}) + ts_iter = iter([ts.next().internal, None, None]) + codes = [409] + [201] * (self.obj_ring.replicas - 1) + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + + def test_container_sync_put_x_timestamp_race(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + put_timestamp = ts.next().internal + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': put_timestamp}) + + # object nodes they respond 409 because another in-flight request + # finished and now the on disk timestamp is equal to the request. + put_ts = [put_timestamp] * self.obj_ring.replicas + codes = [409] * self.obj_ring.replicas + + ts_iter = iter(put_ts) + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + + def test_container_sync_put_x_timestamp_unsynced_race(self): + ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) + test_indexes = [None] + [int(p) for p in POLICIES] + for policy_index in test_indexes: + put_timestamp = ts.next().internal + req = swob.Request.blank( + '/v1/a/c/o', method='PUT', headers={ + 'Content-Length': 0, + 'X-Timestamp': put_timestamp}) + + # only one in-flight request finished + put_ts = [None] * (self.obj_ring.replicas - 1) + put_resp = [201] * (self.obj_ring.replicas - 1) + put_ts += [put_timestamp] + put_resp += [409] + + ts_iter = iter(put_ts) + codes = put_resp + with set_http_connect(*codes, timestamps=ts_iter): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + def test_COPY_simple(self): + req = swift.common.swob.Request.blank( + '/v1/a/c/o', method='COPY', + headers={'Content-Length': 0, + 'Destination': 'c/o-copy'}) + head_resp = [200] * self.obj_ring.replicas + \ + [404] * self.obj_ring.max_more_nodes + put_resp = [201] * self.obj_ring.replicas + codes = head_resp + put_resp + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_log_info(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') + req.headers['x-copy-from'] = 'some/where' + req.headers['Content-Length'] = 0 + # override FakeConn default resp headers to keep log_info clean + resp_headers = {'x-delete-at': None} + head_resp = [200] * self.obj_ring.replicas + \ + [404] * self.obj_ring.max_more_nodes + put_resp = [201] * self.obj_ring.replicas + codes = head_resp + put_resp + with set_http_connect(*codes, headers=resp_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + self.assertEquals( + req.environ.get('swift.log_info'), ['x-copy-from:some/where']) + # and then check that we don't do that for originating POSTs req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): - resp = self.controller.POST(req) - self.assertEquals(resp.status_int, 202) + req.method = 'POST' + req.headers['x-copy-from'] = 'else/where' + with set_http_connect(*codes, headers=resp_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 202) + self.assertEquals(req.environ.get('swift.log_info'), None) - def test_HEAD_simple(self): + +@patch_policies(legacy_only=True) +class TestObjControllerLegacyCache(TestReplicatedObjController): + """ + This test pretends like memcache returned a stored value that should + resemble whatever "old" format. It catches KeyErrors you'd get if your + code was expecting some new format during a rolling upgrade. + """ + + # in this case policy_index is missing + container_info = { + 'read_acl': None, + 'write_acl': None, + 'sync_key': None, + 'versions': None, + } + + def test_invalid_storage_policy_cache(self): + self.app.container_info['storage_policy'] = 1 + for method in ('GET', 'HEAD', 'POST', 'PUT', 'COPY'): + req = swob.Request.blank('/v1/a/c/o', method=method) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + +@patch_policies(with_ec_default=True) +class TestECObjController(BaseObjectControllerMixin, unittest.TestCase): + container_info = { + 'read_acl': None, + 'write_acl': None, + 'sync_key': None, + 'versions': None, + 'storage_policy': '0', + } + + controller_cls = obj.ECObjectController + + def test_determine_chunk_destinations(self): + class FakePutter(object): + def __init__(self, index): + self.node_index = index + + controller = self.controller_cls( + self.app, 'a', 'c', 'o') + + # create a dummy list of putters, check no handoffs + putters = [] + for index in range(0, 4): + putters.append(FakePutter(index)) + got = controller._determine_chunk_destinations(putters) + expected = {} + for i, p in enumerate(putters): + expected[p] = i + self.assertEquals(got, expected) + + # now lets make a handoff at the end + putters[3].node_index = None + got = controller._determine_chunk_destinations(putters) + self.assertEquals(got, expected) + putters[3].node_index = 3 + + # now lets make a handoff at the start + putters[0].node_index = None + got = controller._determine_chunk_destinations(putters) + self.assertEquals(got, expected) + putters[0].node_index = 0 + + # now lets make a handoff in the middle + putters[2].node_index = None + got = controller._determine_chunk_destinations(putters) + self.assertEquals(got, expected) + putters[2].node_index = 0 + + # now lets make all of them handoffs + for index in range(0, 4): + putters[index].node_index = None + got = controller._determine_chunk_destinations(putters) + self.assertEquals(got, expected) + + def test_GET_simple(self): req = swift.common.swob.Request.blank('/v1/a/c/o') - with set_http_connect(200, 200, 200, 201, 201, 201): - resp = self.controller.POST(req) - self.assertEquals(resp.status_int, 202) + get_resp = [200] * self.policy.ec_ndata + with set_http_connect(*get_resp): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) - def test_PUT_log_info(self): - # mock out enough to get to the area of the code we want to test - with mock.patch('swift.proxy.controllers.obj.check_object_creation', - mock.MagicMock(return_value=None)): - req = swift.common.swob.Request.blank('/v1/a/c/o') - req.headers['x-copy-from'] = 'somewhere' - try: - self.controller.PUT(req) - except HTTPException: - pass - self.assertEquals( - req.environ.get('swift.log_info'), ['x-copy-from:somewhere']) - # and then check that we don't do that for originating POSTs - req = swift.common.swob.Request.blank('/v1/a/c/o') - req.method = 'POST' - req.headers['x-copy-from'] = 'elsewhere' - try: - self.controller.PUT(req) - except HTTPException: - pass - self.assertEquals(req.environ.get('swift.log_info'), None) + def test_GET_simple_x_newest(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', + headers={'X-Newest': 'true'}) + codes = [200] * self.replicas() + codes += [404] * self.obj_ring.max_more_nodes + with set_http_connect(*codes): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + + def test_GET_error(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + get_resp = [503] + [200] * self.policy.ec_ndata + with set_http_connect(*get_resp): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + + def test_GET_with_body(self): + req = swift.common.swob.Request.blank('/v1/a/c/o') + # turn a real body into fragments + segment_size = self.policy.ec_segment_size + real_body = ('asdf' * segment_size)[:-10] + # split it up into chunks + chunks = [real_body[x:x + segment_size] + for x in range(0, len(real_body), segment_size)] + fragment_payloads = [] + for chunk in chunks: + fragments = self.policy.pyeclib_driver.encode(chunk) + if not fragments: + break + fragment_payloads.append(fragments) + # sanity + sanity_body = '' + for fragment_payload in fragment_payloads: + sanity_body += self.policy.pyeclib_driver.decode( + fragment_payload) + self.assertEqual(len(real_body), len(sanity_body)) + self.assertEqual(real_body, sanity_body) + + node_fragments = zip(*fragment_payloads) + self.assertEqual(len(node_fragments), self.replicas()) # sanity + responses = [(200, ''.join(node_fragments[i]), {}) + for i in range(POLICIES.default.ec_ndata)] + status_codes, body_iter, headers = zip(*responses) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 200) + self.assertEqual(len(real_body), len(resp.body)) + self.assertEqual(real_body, resp.body) + + def test_PUT_simple(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_with_explicit_commit_status(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 100, 201)] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_error(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [503] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 503) + + def test_PUT_mostly_success(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.quorum() + codes += [503] * (self.replicas() - len(codes)) + random.shuffle(codes) + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_error_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 503, Exception('not used'))] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 503) + + def test_PUT_mostly_success_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * self.quorum() + codes += [(100, 503, Exception('not used'))] * ( + self.replicas() - len(codes)) + random.shuffle(codes) + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_mostly_error_commit(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [(100, 503, Exception('not used'))] * self.quorum() + codes += [201] * (self.replicas() - len(codes)) + random.shuffle(codes) + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 503) + + def test_PUT_commit_timeout(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * (self.replicas() - 1) + codes.append((100, Timeout(), Exception('not used'))) + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_commit_exception(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + codes = [201] * (self.replicas() - 1) + codes.append((100, Exception('kaboom!'), Exception('not used'))) + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_PUT_with_body(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT') + segment_size = self.policy.ec_segment_size + test_body = ('asdf' * segment_size)[:-10] + etag = md5(test_body).hexdigest() + size = len(test_body) + req.body = test_body + codes = [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + + put_requests = defaultdict(lambda: {'boundary': None, 'chunks': []}) + + def capture_body(conn_id, chunk): + put_requests[conn_id]['chunks'].append(chunk) + + def capture_headers(ip, port, device, part, method, path, headers, + **kwargs): + conn_id = kwargs['connection_id'] + put_requests[conn_id]['boundary'] = headers[ + 'X-Backend-Obj-Multipart-Mime-Boundary'] + + with set_http_connect(*codes, expect_headers=expect_headers, + give_send=capture_body, + give_connect=capture_headers): + resp = req.get_response(self.app) + + self.assertEquals(resp.status_int, 201) + frag_archives = [] + for connection_id, info in put_requests.items(): + body = unchunk_body(''.join(info['chunks'])) + self.assertTrue(info['boundary'] is not None, + "didn't get boundary for conn %r" % ( + connection_id,)) + + # email.parser.FeedParser doesn't know how to take a multipart + # message and boundary together and parse it; it only knows how + # to take a string, parse the headers, and figure out the + # boundary on its own. + parser = email.parser.FeedParser() + parser.feed( + "Content-Type: multipart/nobodycares; boundary=%s\r\n\r\n" % + info['boundary']) + parser.feed(body) + message = parser.close() + + self.assertTrue(message.is_multipart()) # sanity check + mime_parts = message.get_payload() + self.assertEqual(len(mime_parts), 3) + obj_part, footer_part, commit_part = mime_parts + + # attach the body to frag_archives list + self.assertEqual(obj_part['X-Document'], 'object body') + frag_archives.append(obj_part.get_payload()) + + # validate some footer metadata + self.assertEqual(footer_part['X-Document'], 'object metadata') + footer_metadata = json.loads(footer_part.get_payload()) + self.assertTrue(footer_metadata) + expected = { + 'X-Object-Sysmeta-EC-Content-Length': str(size), + 'X-Backend-Container-Update-Override-Size': str(size), + 'X-Object-Sysmeta-EC-Etag': etag, + 'X-Backend-Container-Update-Override-Etag': etag, + 'X-Object-Sysmeta-EC-Segment-Size': str(segment_size), + } + for header, value in expected.items(): + self.assertEqual(footer_metadata[header], value) + + # sanity on commit message + self.assertEqual(commit_part['X-Document'], 'put commit') + + self.assertEqual(len(frag_archives), self.replicas()) + fragment_size = self.policy.fragment_size + node_payloads = [] + for fa in frag_archives: + payload = [fa[x:x + fragment_size] + for x in range(0, len(fa), fragment_size)] + node_payloads.append(payload) + fragment_payloads = zip(*node_payloads) + + expected_body = '' + for fragment_payload in fragment_payloads: + self.assertEqual(len(fragment_payload), self.replicas()) + if True: + fragment_payload = list(fragment_payload) + expected_body += self.policy.pyeclib_driver.decode( + fragment_payload) + + self.assertEqual(len(test_body), len(expected_body)) + self.assertEqual(test_body, expected_body) + + def test_PUT_old_obj_server(self): + req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT', + body='') + responses = [ + # one server will response 100-continue but not include the + # needful expect headers and the connection will be dropped + ((100, Exception('not used')), {}), + ] + [ + # and pleanty of successful responses too + (201, { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes', + }), + ] * self.replicas() + random.shuffle(responses) + if responses[-1][0] != 201: + # whoops, stupid random + responses = responses[1:] + [responses[0]] + codes, expect_headers = zip(*responses) + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEquals(resp.status_int, 201) + + def test_COPY_cross_policy_type_from_replicated(self): + self.app.per_container_info = { + 'c1': self.app.container_info.copy(), + 'c2': self.app.container_info.copy(), + } + # make c2 use replicated storage policy 1 + self.app.per_container_info['c2']['storage_policy'] = '1' + + # a put request with copy from source c2 + req = swift.common.swob.Request.blank('/v1/a/c1/o', method='PUT', + body='', headers={ + 'X-Copy-From': 'c2/o'}) + + # c2 get + codes = [200] * self.replicas(POLICIES[1]) + codes += [404] * POLICIES[1].object_ring.max_more_nodes + # c1 put + codes += [201] * self.replicas() + expect_headers = { + 'X-Obj-Metadata-Footer': 'yes', + 'X-Obj-Multiphase-Commit': 'yes' + } + with set_http_connect(*codes, expect_headers=expect_headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_COPY_cross_policy_type_to_replicated(self): + self.app.per_container_info = { + 'c1': self.app.container_info.copy(), + 'c2': self.app.container_info.copy(), + } + # make c1 use replicated storage policy 1 + self.app.per_container_info['c1']['storage_policy'] = '1' + + # a put request with copy from source c2 + req = swift.common.swob.Request.blank('/v1/a/c1/o', method='PUT', + body='', headers={ + 'X-Copy-From': 'c2/o'}) + + # c2 get + codes = [200] * self.replicas() + codes += [404] * self.obj_ring.max_more_nodes + headers = { + 'X-Object-Sysmeta-Ec-Content-Length': 0, + } + # c1 put + codes += [201] * self.replicas(POLICIES[1]) + with set_http_connect(*codes, headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 201) + + def test_COPY_cross_policy_type_unknown(self): + self.app.per_container_info = { + 'c1': self.app.container_info.copy(), + 'c2': self.app.container_info.copy(), + } + # make c1 use some made up storage policy index + self.app.per_container_info['c1']['storage_policy'] = '13' + + # a COPY request of c2 with destination in c1 + req = swift.common.swob.Request.blank('/v1/a/c2/o', method='COPY', + body='', headers={ + 'Destination': 'c1/o'}) + with set_http_connect(): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 503) + + def _make_ec_archive_bodies(self, test_body, policy=None): + policy = policy or self.policy + segment_size = policy.ec_segment_size + # split up the body into buffers + chunks = [test_body[x:x + segment_size] + for x in range(0, len(test_body), segment_size)] + # encode the buffers into fragment payloads + fragment_payloads = [] + for chunk in chunks: + fragments = self.policy.pyeclib_driver.encode(chunk) + if not fragments: + break + fragment_payloads.append(fragments) + + # join up the fragment payloads per node + ec_archive_bodies = [''.join(fragments) + for fragments in zip(*fragment_payloads)] + return ec_archive_bodies + + def test_GET_mismatched_fragment_archives(self): + segment_size = self.policy.ec_segment_size + test_data1 = ('test' * segment_size)[:-333] + # N.B. the object data *length* here is different + test_data2 = ('blah1' * segment_size)[:-333] + + etag1 = md5(test_data1).hexdigest() + etag2 = md5(test_data2).hexdigest() + + ec_archive_bodies1 = self._make_ec_archive_bodies(test_data1) + ec_archive_bodies2 = self._make_ec_archive_bodies(test_data2) + + headers1 = {'X-Object-Sysmeta-Ec-Etag': etag1} + # here we're going to *lie* and say the etag here matches + headers2 = {'X-Object-Sysmeta-Ec-Etag': etag1} + + responses1 = [(200, body, headers1) + for body in ec_archive_bodies1] + responses2 = [(200, body, headers2) + for body in ec_archive_bodies2] + + req = swob.Request.blank('/v1/a/c/o') + + # sanity check responses1 + responses = responses1[:self.policy.ec_ndata] + status_codes, body_iter, headers = zip(*responses) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(md5(resp.body).hexdigest(), etag1) + + # sanity check responses2 + responses = responses2[:self.policy.ec_ndata] + status_codes, body_iter, headers = zip(*responses) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(md5(resp.body).hexdigest(), etag2) + + # now mix the responses a bit + mix_index = random.randint(0, self.policy.ec_ndata - 1) + mixed_responses = responses1[:self.policy.ec_ndata] + mixed_responses[mix_index] = responses2[mix_index] + + status_codes, body_iter, headers = zip(*mixed_responses) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + try: + resp.body + except ECDriverError: + pass + else: + self.fail('invalid ec fragment response body did not blow up!') + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + msg = error_lines[0] + self.assertTrue('Error decoding fragments' in msg) + self.assertTrue('/a/c/o' in msg) + log_msg_args, log_msg_kwargs = self.logger.log_dict['error'][0] + self.assertEqual(log_msg_kwargs['exc_info'][0], ECDriverError) + + def test_GET_read_timeout(self): + segment_size = self.policy.ec_segment_size + test_data = ('test' * segment_size)[:-333] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = {'X-Object-Sysmeta-Ec-Etag': etag} + self.app.recoverable_node_timeout = 0.01 + responses = [(200, SlowBody(body, 0.1), headers) + for body in ec_archive_bodies] + + req = swob.Request.blank('/v1/a/c/o') + + status_codes, body_iter, headers = zip(*responses + [ + (404, '', {}) for i in range( + self.policy.object_ring.max_more_nodes)]) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + # do this inside the fake http context manager, it'll try to + # resume but won't be able to give us all the right bytes + self.assertNotEqual(md5(resp.body).hexdigest(), etag) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(self.replicas(), len(error_lines)) + nparity = self.policy.ec_nparity + for line in error_lines[:nparity]: + self.assertTrue('retrying' in line) + for line in error_lines[nparity:]: + self.assertTrue('ChunkReadTimeout (0.01s)' in line) + + def test_GET_read_timeout_resume(self): + segment_size = self.policy.ec_segment_size + test_data = ('test' * segment_size)[:-333] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = {'X-Object-Sysmeta-Ec-Etag': etag} + self.app.recoverable_node_timeout = 0.05 + # first one is slow + responses = [(200, SlowBody(ec_archive_bodies[0], 0.1), headers)] + # ... the rest are fine + responses += [(200, body, headers) + for body in ec_archive_bodies[1:]] + + req = swob.Request.blank('/v1/a/c/o') + + status_codes, body_iter, headers = zip( + *responses[:self.policy.ec_ndata + 1]) + with set_http_connect(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertTrue(md5(resp.body).hexdigest(), etag) + error_lines = self.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertTrue('retrying' in error_lines[0]) if __name__ == '__main__': |