diff options
Diffstat (limited to 'test/functional/test_object.py')
-rwxr-xr-x | test/functional/test_object.py | 579 |
1 files changed, 490 insertions, 89 deletions
diff --git a/test/functional/test_object.py b/test/functional/test_object.py index e74a7f6..f23ccbc 100755 --- a/test/functional/test_object.py +++ b/test/functional/test_object.py @@ -15,18 +15,29 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest -from nose import SkipTest +import datetime +import json +import unittest2 +from unittest2 import SkipTest from uuid import uuid4 +import time -from swift.common.utils import json +from six.moves import range from test.functional import check_response, retry, requires_acls, \ requires_policies import test.functional as tf -class TestObject(unittest.TestCase): +def setUpModule(): + tf.setup_package() + + +def tearDownModule(): + tf.teardown_package() + + +class TestObject(unittest2.TestCase): def setUp(self): if tf.skip: @@ -62,6 +73,20 @@ class TestObject(unittest.TestCase): resp = retry(put, name, use_account=use_account) resp.read() self.assertEqual(resp.status, 201) + + # With keystoneauth we need the accounts to have had the project + # domain id persisted as sysmeta prior to testing ACLs. This may + # not be the case if, for example, the account was created using + # a request with reseller_admin role, when project domain id may + # not have been known. So we ensure that the project domain id is + # in sysmeta by making a POST to the accounts using an admin role. + def post(url, token, parsed, conn): + conn.request('POST', parsed.path, '', {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(post, use_account=use_account) + resp.read() + self.assertEqual(resp.status, 204) + return name def tearDown(self): @@ -88,14 +113,14 @@ class TestObject(unittest.TestCase): body = resp.read() if resp.status == 404: break - self.assert_(resp.status // 100 == 2, resp.status) + self.assertEqual(resp.status // 100, 2, resp.status) objs = json.loads(body) if not objs: break for obj in objs: resp = retry(delete, container, obj) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # delete the container def delete(url, token, parsed, conn, name): @@ -106,7 +131,146 @@ class TestObject(unittest.TestCase): for container in self.containers: resp = retry(delete, container) resp.read() - self.assert_(resp.status in (204, 404)) + self.assertIn(resp.status, (204, 404)) + + def test_metadata(self): + obj = 'test_metadata' + req_metadata = {} + + def put(url, token, parsed, conn): + headers = {'X-Auth-Token': token} + headers.update(req_metadata) + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, self.container, obj + ), '', headers) + return check_response(conn) + + def get(url, token, parsed, conn): + conn.request( + 'GET', + '%s/%s/%s' % (parsed.path, self.container, obj), + '', + {'X-Auth-Token': token}) + return check_response(conn) + + def post(url, token, parsed, conn): + headers = {'X-Auth-Token': token} + headers.update(req_metadata) + conn.request('POST', '%s/%s/%s' % ( + parsed.path, self.container, obj + ), '', headers) + return check_response(conn) + + def metadata(resp): + metadata = {} + for k, v in resp.headers.items(): + if 'meta' in k.lower(): + metadata[k] = v + return metadata + + # empty put + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), {}) + # empty post + resp = retry(post) + resp.read() + self.assertEqual(resp.status, 202) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), {}) + + # metadata put + req_metadata = { + 'x-object-meta-Color': 'blUe', + 'X-Object-Meta-food': 'PizZa', + } + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Color': 'blUe', + 'X-Object-Meta-Food': 'PizZa', + }) + # metadata post + req_metadata = {'X-Object-Meta-color': 'oraNge'} + resp = retry(post) + resp.read() + self.assertEqual(resp.status, 202) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Color': 'oraNge' + }) + + # sysmeta put + req_metadata = { + 'X-Object-Meta-Color': 'Red', + 'X-Object-Sysmeta-Color': 'Green', + 'X-Object-Transient-Sysmeta-Color': 'Blue', + } + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Color': 'Red', + }) + # sysmeta post + req_metadata = { + 'X-Object-Meta-Food': 'Burger', + 'X-Object-Meta-Animal': 'Cat', + 'X-Object-Sysmeta-Animal': 'Cow', + 'X-Object-Transient-Sysmeta-Food': 'Burger', + } + resp = retry(post) + resp.read() + self.assertEqual(resp.status, 202) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Food': 'Burger', + 'X-Object-Meta-Animal': 'Cat', + }) + + # non-ascii put + req_metadata = { + 'X-Object-Meta-Foo': u'B\u00e2r', + } + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Foo': 'B\xc3\xa2r', + }) + # non-ascii post + req_metadata = { + 'X-Object-Meta-Foo': u'B\u00e5z', + } + resp = retry(post) + resp.read() + self.assertEqual(resp.status, 202) + resp = retry(get) + self.assertEqual('', resp.read()) + self.assertEqual(resp.status, 200) + self.assertEqual(metadata(resp), { + 'X-Object-Meta-Foo': 'B\xc3\xa5z', + }) def test_if_none_match(self): def put(url, token, parsed, conn): @@ -118,10 +282,10 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(put) resp.read() - self.assertEquals(resp.status, 201) + self.assertEqual(resp.status, 201) resp = retry(put) resp.read() - self.assertEquals(resp.status, 412) + self.assertEqual(resp.status, 412) def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( @@ -132,7 +296,148 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(put) resp.read() - self.assertEquals(resp.status, 400) + self.assertEqual(resp.status, 400) + + def test_too_small_x_timestamp(self): + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, + 'too_small_x_timestamp'), + '', {'X-Auth-Token': token, + 'Content-Length': '0', + 'X-Timestamp': '-1'}) + return check_response(conn) + + def head(url, token, parsed, conn): + conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container, + 'too_small_x_timestamp'), + '', {'X-Auth-Token': token, + 'Content-Length': '0'}) + return check_response(conn) + ts_before = time.time() + resp = retry(put) + body = resp.read() + ts_after = time.time() + if resp.status == 400: + # shunt_inbound_x_timestamp must be false + self.assertIn( + 'X-Timestamp should be a UNIX timestamp float value', body) + else: + self.assertEqual(resp.status, 201) + self.assertEqual(body, '') + resp = retry(head) + resp.read() + self.assertGreater(float(resp.headers['x-timestamp']), ts_before) + self.assertLess(float(resp.headers['x-timestamp']), ts_after) + + def test_too_big_x_timestamp(self): + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, + 'too_big_x_timestamp'), + '', {'X-Auth-Token': token, + 'Content-Length': '0', + 'X-Timestamp': '99999999999.9999999999'}) + return check_response(conn) + + def head(url, token, parsed, conn): + conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container, + 'too_big_x_timestamp'), + '', {'X-Auth-Token': token, + 'Content-Length': '0'}) + return check_response(conn) + ts_before = time.time() + resp = retry(put) + body = resp.read() + ts_after = time.time() + if resp.status == 400: + # shunt_inbound_x_timestamp must be false + self.assertIn( + 'X-Timestamp should be a UNIX timestamp float value', body) + else: + self.assertEqual(resp.status, 201) + self.assertEqual(body, '') + resp = retry(head) + resp.read() + self.assertGreater(float(resp.headers['x-timestamp']), ts_before) + self.assertLess(float(resp.headers['x-timestamp']), ts_after) + + def test_x_delete_after(self): + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, + 'x_delete_after'), + '', {'X-Auth-Token': token, + 'Content-Length': '0', + 'X-Delete-After': '1'}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + + def get(url, token, parsed, conn): + conn.request( + 'GET', + '%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'), + '', + {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(get) + resp.read() + count = 0 + while resp.status == 200 and count < 10: + resp = retry(get) + resp.read() + count += 1 + time.sleep(1) + + self.assertEqual(resp.status, 404) + + # To avoid an error when the object deletion in tearDown(), + # the object is added again. + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + + def test_x_delete_at(self): + def put(url, token, parsed, conn): + dt = datetime.datetime.now() + epoch = time.mktime(dt.timetuple()) + delete_time = str(int(epoch) + 3) + conn.request( + 'PUT', + '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'), + '', + {'X-Auth-Token': token, + 'Content-Length': '0', + 'X-Delete-At': delete_time}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + + def get(url, token, parsed, conn): + conn.request( + 'GET', + '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'), + '', + {'X-Auth-Token': token}) + return check_response(conn) + + resp = retry(get) + resp.read() + count = 0 + while resp.status == 200 and count < 10: + resp = retry(get) + resp.read() + count += 1 + time.sleep(1) + + self.assertEqual(resp.status, 404) + + # To avoid an error when the object deletion in tearDown(), + # the object is added again. + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) def test_non_integer_x_delete_after(self): def put(url, token, parsed, conn): @@ -144,7 +449,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(put) body = resp.read() - self.assertEquals(resp.status, 400) + self.assertEqual(resp.status, 400) self.assertEqual(body, 'Non-integer X-Delete-After') def test_non_integer_x_delete_at(self): @@ -157,7 +462,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(put) body = resp.read() - self.assertEquals(resp.status, 400) + self.assertEqual(resp.status, 400) self.assertEqual(body, 'Non-integer X-Delete-At') def test_x_delete_at_in_the_past(self): @@ -170,7 +475,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(put) body = resp.read() - self.assertEquals(resp.status, 400) + self.assertEqual(resp.status, 400) self.assertEqual(body, 'X-Delete-At in past') def test_copy_object(self): @@ -220,7 +525,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # verify dest does not exist resp = retry(get_dest) resp.read() @@ -242,10 +547,27 @@ class TestObject(unittest.TestCase): self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents) + # copy source to dest with COPY and range + def copy(url, token, parsed, conn): + conn.request('COPY', '%s/%s' % (parsed.path, source), '', + {'X-Auth-Token': token, + 'Destination': dest, + 'Range': 'bytes=1-2'}) + return check_response(conn) + resp = retry(copy) + resp.read() + self.assertEqual(resp.status, 201) + + # contents of dest should be the same as source + resp = retry(get_dest) + dest_contents = resp.read() + self.assertEqual(resp.status, 200) + self.assertEqual(dest_contents, source_contents[1:3]) + # delete the copy resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) def test_copy_between_accounts(self): if tf.skip: @@ -311,7 +633,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete, use_account=2) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # verify dest does not exist resp = retry(get_dest, use_account=2) resp.read() @@ -355,7 +677,7 @@ class TestObject(unittest.TestCase): # delete the copy resp = retry(delete, use_account=2) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) def test_public_object(self): if tf.skip: @@ -369,7 +691,7 @@ class TestObject(unittest.TestCase): resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: - self.assert_(str(err).startswith('No result after ')) + self.assertTrue(str(err).startswith('No result after ')) def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', @@ -394,7 +716,7 @@ class TestObject(unittest.TestCase): resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: - self.assert_(str(err).startswith('No result after ')) + self.assertTrue(str(err).startswith('No result after ')) def test_private_object(self): if tf.skip or tf.skip3: @@ -477,7 +799,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # clean up shared_container def delete(url, token, parsed, conn): @@ -487,7 +809,86 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) + + def test_container_write_only(self): + if tf.skip or tf.skip3: + raise SkipTest + + # Ensure we can't access the object with the third account + def get(url, token, parsed, conn): + conn.request('GET', '%s/%s/%s' % ( + parsed.path, self.container, self.obj), '', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(get, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # create a shared container writable (but not readable) by account3 + shared_container = uuid4().hex + + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s' % ( + parsed.path, shared_container), '', + {'X-Auth-Token': token, + 'X-Container-Write': tf.swift_test_perm[2]}) + return check_response(conn) + resp = retry(put) + resp.read() + self.assertEqual(resp.status, 201) + + # verify third account can write "obj1" to shared container + def put(url, token, parsed, conn): + conn.request('PUT', '%s/%s/%s' % ( + parsed.path, shared_container, 'obj1'), 'test', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(put, use_account=3) + resp.read() + self.assertEqual(resp.status, 201) + + # verify third account cannot copy "obj1" to shared container + def copy(url, token, parsed, conn): + conn.request('COPY', '%s/%s/%s' % ( + parsed.path, shared_container, 'obj1'), '', + {'X-Auth-Token': token, + 'Destination': '%s/%s' % (shared_container, 'obj2')}) + return check_response(conn) + resp = retry(copy, use_account=3) + resp.read() + self.assertEqual(resp.status, 403) + + # verify third account can POST to "obj1" in shared container + def post(url, token, parsed, conn): + conn.request('POST', '%s/%s/%s' % ( + parsed.path, shared_container, 'obj1'), '', + {'X-Auth-Token': token, + 'X-Object-Meta-Color': 'blue'}) + return check_response(conn) + resp = retry(post, use_account=3) + resp.read() + self.assertEqual(resp.status, 202) + + # verify third account can DELETE from shared container + def delete(url, token, parsed, conn): + conn.request('DELETE', '%s/%s/%s' % ( + parsed.path, shared_container, 'obj1'), '', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(delete, use_account=3) + resp.read() + self.assertIn(resp.status, (204, 404)) + + # clean up shared_container + def delete(url, token, parsed, conn): + conn.request('DELETE', + parsed.path + '/' + shared_container, '', + {'X-Auth-Token': token}) + return check_response(conn) + resp = retry(delete) + resp.read() + self.assertIn(resp.status, (204, 404)) @requires_acls def test_read_only(self): @@ -525,12 +926,12 @@ class TestObject(unittest.TestCase): # cannot list objects resp = retry(get_listing, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # grant read-only access acl_user = tf.swift_test_user[2] @@ -543,32 +944,32 @@ class TestObject(unittest.TestCase): # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(self.obj in listing) + self.assertEqual(resp.status, 200) + self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 200) - self.assertEquals(body, 'test') + self.assertEqual(resp.status, 200) + self.assertEqual(body, 'test') # can not put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # can not delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(obj_name not in listing) - self.assert_(self.obj in listing) + self.assertEqual(resp.status, 200) + self.assertNotIn(obj_name, listing) + self.assertIn(self.obj, listing) @requires_acls def test_read_write(self): @@ -606,12 +1007,12 @@ class TestObject(unittest.TestCase): # cannot list objects resp = retry(get_listing, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] @@ -624,32 +1025,32 @@ class TestObject(unittest.TestCase): # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(self.obj in listing) + self.assertEqual(resp.status, 200) + self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 200) - self.assertEquals(body, 'test') + self.assertEqual(resp.status, 200) + self.assertEqual(body, 'test') # can put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() - self.assertEquals(resp.status, 201) + self.assertEqual(resp.status, 201) # can delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(obj_name in listing) - self.assert_(self.obj not in listing) + self.assertEqual(resp.status, 200) + self.assertIn(obj_name, listing) + self.assertNotIn(self.obj, listing) @requires_acls def test_admin(self): @@ -687,12 +1088,12 @@ class TestObject(unittest.TestCase): # cannot list objects resp = retry(get_listing, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() - self.assertEquals(resp.status, 403) + self.assertEqual(resp.status, 403) # grant admin access acl_user = tf.swift_test_user[2] @@ -705,32 +1106,32 @@ class TestObject(unittest.TestCase): # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(self.obj in listing) + self.assertEqual(resp.status, 200) + self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 200) - self.assertEquals(body, 'test') + self.assertEqual(resp.status, 200) + self.assertEqual(body, 'test') # can put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() - self.assertEquals(resp.status, 201) + self.assertEqual(resp.status, 201) # can delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() - self.assertEquals(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() - self.assertEquals(resp.status, 200) - self.assert_(obj_name in listing) - self.assert_(self.obj not in listing) + self.assertEqual(resp.status, 200) + self.assertIn(obj_name, listing) + self.assertNotIn(self.obj, listing) def test_manifest(self): if tf.skip: @@ -746,7 +1147,7 @@ class TestObject(unittest.TestCase): parsed.path, self.container, str(objnum)), segments1[objnum], {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments1)): + for objnum in range(len(segments1)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) @@ -809,7 +1210,7 @@ class TestObject(unittest.TestCase): parsed.path, self.container, str(objnum)), segments2[objnum], {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments2)): + for objnum in range(len(segments2)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) @@ -891,7 +1292,7 @@ class TestObject(unittest.TestCase): parsed.path, acontainer, str(objnum)), segments3[objnum], {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments3)): + for objnum in range(len(segments3)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) @@ -958,7 +1359,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete, objnum) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # Delete the third set of segments def delete(url, token, parsed, conn, objnum): @@ -966,10 +1367,10 @@ class TestObject(unittest.TestCase): parsed.path, acontainer, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments3)): + for objnum in range(len(segments3)): resp = retry(delete, objnum) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # Delete the second set of segments def delete(url, token, parsed, conn, objnum): @@ -977,10 +1378,10 @@ class TestObject(unittest.TestCase): parsed.path, self.container, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments2)): + for objnum in range(len(segments2)): resp = retry(delete, objnum) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # Delete the first set of segments def delete(url, token, parsed, conn, objnum): @@ -988,10 +1389,10 @@ class TestObject(unittest.TestCase): parsed.path, self.container, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) - for objnum in xrange(len(segments1)): + for objnum in range(len(segments1)): resp = retry(delete, objnum) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) # Delete the extra container def delete(url, token, parsed, conn): @@ -1000,7 +1401,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) def test_delete_content_type(self): if tf.skip: @@ -1020,7 +1421,7 @@ class TestObject(unittest.TestCase): return check_response(conn) resp = retry(delete) resp.read() - self.assertEqual(resp.status, 204) + self.assertIn(resp.status, (204, 404)) self.assertEqual(resp.getheader('Content-Type'), 'text/html; charset=UTF-8') @@ -1095,78 +1496,78 @@ class TestObject(unittest.TestCase): resp = retry(put_cors_cont, '*') resp.read() - self.assertEquals(resp.status // 100, 2) + self.assertEqual(resp.status // 100, 2) resp = retry(put_obj, 'cat') resp.read() - self.assertEquals(resp.status // 100, 2) + self.assertEqual(resp.status // 100, 2) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com'}) - self.assertEquals(resp.status, 401) + self.assertEqual(resp.status, 401) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com', 'Access-Control-Request-Method': 'GET'}) - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertEquals(headers.get('access-control-allow-origin'), - '*') + self.assertEqual(headers.get('access-control-allow-origin'), + '*') resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertEquals(headers.get('access-control-allow-origin'), - '*') + self.assertEqual(headers.get('access-control-allow-origin'), + '*') resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com', 'X-Web-Mode': 'True'}) - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertEquals(headers.get('access-control-allow-origin'), - '*') + self.assertEqual(headers.get('access-control-allow-origin'), + '*') #################### resp = retry(put_cors_cont, 'http://secret.com') resp.read() - self.assertEquals(resp.status // 100, 2) + self.assertEqual(resp.status // 100, 2) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com', 'Access-Control-Request-Method': 'GET'}) resp.read() - self.assertEquals(resp.status, 401) + self.assertEqual(resp.status, 401) if strict_cors: resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) resp.read() - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertTrue('access-control-allow-origin' not in headers) + self.assertNotIn('access-control-allow-origin', headers) resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://secret.com'}) resp.read() - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertEquals(headers.get('access-control-allow-origin'), - 'http://secret.com') + self.assertEqual(headers.get('access-control-allow-origin'), + 'http://secret.com') else: resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) resp.read() - self.assertEquals(resp.status, 200) + self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) - self.assertEquals(headers.get('access-control-allow-origin'), - 'http://m.com') + self.assertEqual(headers.get('access-control-allow-origin'), + 'http://m.com') @requires_policies def test_cross_policy_copy(self): @@ -1228,4 +1629,4 @@ class TestObject(unittest.TestCase): if __name__ == '__main__': - unittest.main() + unittest2.main() |