summaryrefslogtreecommitdiffstats
path: root/test/functional/test_object.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_object.py')
-rwxr-xr-xtest/functional/test_object.py579
1 files changed, 490 insertions, 89 deletions
diff --git a/test/functional/test_object.py b/test/functional/test_object.py
index e74a7f6..f23ccbc 100755
--- a/test/functional/test_object.py
+++ b/test/functional/test_object.py
@@ -15,18 +15,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
-from nose import SkipTest
+import datetime
+import json
+import unittest2
+from unittest2 import SkipTest
from uuid import uuid4
+import time
-from swift.common.utils import json
+from six.moves import range
from test.functional import check_response, retry, requires_acls, \
requires_policies
import test.functional as tf
-class TestObject(unittest.TestCase):
+def setUpModule():
+ tf.setup_package()
+
+
+def tearDownModule():
+ tf.teardown_package()
+
+
+class TestObject(unittest2.TestCase):
def setUp(self):
if tf.skip:
@@ -62,6 +73,20 @@ class TestObject(unittest.TestCase):
resp = retry(put, name, use_account=use_account)
resp.read()
self.assertEqual(resp.status, 201)
+
+ # With keystoneauth we need the accounts to have had the project
+ # domain id persisted as sysmeta prior to testing ACLs. This may
+ # not be the case if, for example, the account was created using
+ # a request with reseller_admin role, when project domain id may
+ # not have been known. So we ensure that the project domain id is
+ # in sysmeta by making a POST to the accounts using an admin role.
+ def post(url, token, parsed, conn):
+ conn.request('POST', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(post, use_account=use_account)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
return name
def tearDown(self):
@@ -88,14 +113,14 @@ class TestObject(unittest.TestCase):
body = resp.read()
if resp.status == 404:
break
- self.assert_(resp.status // 100 == 2, resp.status)
+ self.assertEqual(resp.status // 100, 2, resp.status)
objs = json.loads(body)
if not objs:
break
for obj in objs:
resp = retry(delete, container, obj)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# delete the container
def delete(url, token, parsed, conn, name):
@@ -106,7 +131,146 @@ class TestObject(unittest.TestCase):
for container in self.containers:
resp = retry(delete, container)
resp.read()
- self.assert_(resp.status in (204, 404))
+ self.assertIn(resp.status, (204, 404))
+
+ def test_metadata(self):
+ obj = 'test_metadata'
+ req_metadata = {}
+
+ def put(url, token, parsed, conn):
+ headers = {'X-Auth-Token': token}
+ headers.update(req_metadata)
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, self.container, obj
+ ), '', headers)
+ return check_response(conn)
+
+ def get(url, token, parsed, conn):
+ conn.request(
+ 'GET',
+ '%s/%s/%s' % (parsed.path, self.container, obj),
+ '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn):
+ headers = {'X-Auth-Token': token}
+ headers.update(req_metadata)
+ conn.request('POST', '%s/%s/%s' % (
+ parsed.path, self.container, obj
+ ), '', headers)
+ return check_response(conn)
+
+ def metadata(resp):
+ metadata = {}
+ for k, v in resp.headers.items():
+ if 'meta' in k.lower():
+ metadata[k] = v
+ return metadata
+
+ # empty put
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {})
+ # empty post
+ resp = retry(post)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {})
+
+ # metadata put
+ req_metadata = {
+ 'x-object-meta-Color': 'blUe',
+ 'X-Object-Meta-food': 'PizZa',
+ }
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Color': 'blUe',
+ 'X-Object-Meta-Food': 'PizZa',
+ })
+ # metadata post
+ req_metadata = {'X-Object-Meta-color': 'oraNge'}
+ resp = retry(post)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Color': 'oraNge'
+ })
+
+ # sysmeta put
+ req_metadata = {
+ 'X-Object-Meta-Color': 'Red',
+ 'X-Object-Sysmeta-Color': 'Green',
+ 'X-Object-Transient-Sysmeta-Color': 'Blue',
+ }
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Color': 'Red',
+ })
+ # sysmeta post
+ req_metadata = {
+ 'X-Object-Meta-Food': 'Burger',
+ 'X-Object-Meta-Animal': 'Cat',
+ 'X-Object-Sysmeta-Animal': 'Cow',
+ 'X-Object-Transient-Sysmeta-Food': 'Burger',
+ }
+ resp = retry(post)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Food': 'Burger',
+ 'X-Object-Meta-Animal': 'Cat',
+ })
+
+ # non-ascii put
+ req_metadata = {
+ 'X-Object-Meta-Foo': u'B\u00e2r',
+ }
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Foo': 'B\xc3\xa2r',
+ })
+ # non-ascii post
+ req_metadata = {
+ 'X-Object-Meta-Foo': u'B\u00e5z',
+ }
+ resp = retry(post)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+ resp = retry(get)
+ self.assertEqual('', resp.read())
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(metadata(resp), {
+ 'X-Object-Meta-Foo': 'B\xc3\xa5z',
+ })
def test_if_none_match(self):
def put(url, token, parsed, conn):
@@ -118,10 +282,10 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 412)
+ self.assertEqual(resp.status, 412)
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
@@ -132,7 +296,148 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_too_small_x_timestamp(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'too_small_x_timestamp'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Timestamp': '-1'})
+ return check_response(conn)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container,
+ 'too_small_x_timestamp'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0'})
+ return check_response(conn)
+ ts_before = time.time()
+ resp = retry(put)
+ body = resp.read()
+ ts_after = time.time()
+ if resp.status == 400:
+ # shunt_inbound_x_timestamp must be false
+ self.assertIn(
+ 'X-Timestamp should be a UNIX timestamp float value', body)
+ else:
+ self.assertEqual(resp.status, 201)
+ self.assertEqual(body, '')
+ resp = retry(head)
+ resp.read()
+ self.assertGreater(float(resp.headers['x-timestamp']), ts_before)
+ self.assertLess(float(resp.headers['x-timestamp']), ts_after)
+
+ def test_too_big_x_timestamp(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'too_big_x_timestamp'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Timestamp': '99999999999.9999999999'})
+ return check_response(conn)
+
+ def head(url, token, parsed, conn):
+ conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container,
+ 'too_big_x_timestamp'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0'})
+ return check_response(conn)
+ ts_before = time.time()
+ resp = retry(put)
+ body = resp.read()
+ ts_after = time.time()
+ if resp.status == 400:
+ # shunt_inbound_x_timestamp must be false
+ self.assertIn(
+ 'X-Timestamp should be a UNIX timestamp float value', body)
+ else:
+ self.assertEqual(resp.status, 201)
+ self.assertEqual(body, '')
+ resp = retry(head)
+ resp.read()
+ self.assertGreater(float(resp.headers['x-timestamp']), ts_before)
+ self.assertLess(float(resp.headers['x-timestamp']), ts_after)
+
+ def test_x_delete_after(self):
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
+ 'x_delete_after'),
+ '', {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Delete-After': '1'})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def get(url, token, parsed, conn):
+ conn.request(
+ 'GET',
+ '%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'),
+ '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(get)
+ resp.read()
+ count = 0
+ while resp.status == 200 and count < 10:
+ resp = retry(get)
+ resp.read()
+ count += 1
+ time.sleep(1)
+
+ self.assertEqual(resp.status, 404)
+
+ # To avoid an error when the object deletion in tearDown(),
+ # the object is added again.
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def test_x_delete_at(self):
+ def put(url, token, parsed, conn):
+ dt = datetime.datetime.now()
+ epoch = time.mktime(dt.timetuple())
+ delete_time = str(int(epoch) + 3)
+ conn.request(
+ 'PUT',
+ '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
+ '',
+ {'X-Auth-Token': token,
+ 'Content-Length': '0',
+ 'X-Delete-At': delete_time})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ def get(url, token, parsed, conn):
+ conn.request(
+ 'GET',
+ '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
+ '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ resp = retry(get)
+ resp.read()
+ count = 0
+ while resp.status == 200 and count < 10:
+ resp = retry(get)
+ resp.read()
+ count += 1
+ time.sleep(1)
+
+ self.assertEqual(resp.status, 404)
+
+ # To avoid an error when the object deletion in tearDown(),
+ # the object is added again.
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
def test_non_integer_x_delete_after(self):
def put(url, token, parsed, conn):
@@ -144,7 +449,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
body = resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
self.assertEqual(body, 'Non-integer X-Delete-After')
def test_non_integer_x_delete_at(self):
@@ -157,7 +462,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
body = resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
self.assertEqual(body, 'Non-integer X-Delete-At')
def test_x_delete_at_in_the_past(self):
@@ -170,7 +475,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(put)
body = resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
self.assertEqual(body, 'X-Delete-At in past')
def test_copy_object(self):
@@ -220,7 +525,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# verify dest does not exist
resp = retry(get_dest)
resp.read()
@@ -242,10 +547,27 @@ class TestObject(unittest.TestCase):
self.assertEqual(resp.status, 200)
self.assertEqual(dest_contents, source_contents)
+ # copy source to dest with COPY and range
+ def copy(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s' % (parsed.path, source), '',
+ {'X-Auth-Token': token,
+ 'Destination': dest,
+ 'Range': 'bytes=1-2'})
+ return check_response(conn)
+ resp = retry(copy)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ # contents of dest should be the same as source
+ resp = retry(get_dest)
+ dest_contents = resp.read()
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(dest_contents, source_contents[1:3])
+
# delete the copy
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
def test_copy_between_accounts(self):
if tf.skip:
@@ -311,7 +633,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete, use_account=2)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# verify dest does not exist
resp = retry(get_dest, use_account=2)
resp.read()
@@ -355,7 +677,7 @@ class TestObject(unittest.TestCase):
# delete the copy
resp = retry(delete, use_account=2)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
def test_public_object(self):
if tf.skip:
@@ -369,7 +691,7 @@ class TestObject(unittest.TestCase):
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
- self.assert_(str(err).startswith('No result after '))
+ self.assertTrue(str(err).startswith('No result after '))
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
@@ -394,7 +716,7 @@ class TestObject(unittest.TestCase):
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
- self.assert_(str(err).startswith('No result after '))
+ self.assertTrue(str(err).startswith('No result after '))
def test_private_object(self):
if tf.skip or tf.skip3:
@@ -477,7 +799,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# clean up shared_container
def delete(url, token, parsed, conn):
@@ -487,7 +809,86 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
+
+ def test_container_write_only(self):
+ if tf.skip or tf.skip3:
+ raise SkipTest
+
+ # Ensure we can't access the object with the third account
+ def get(url, token, parsed, conn):
+ conn.request('GET', '%s/%s/%s' % (
+ parsed.path, self.container, self.obj), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # create a shared container writable (but not readable) by account3
+ shared_container = uuid4().hex
+
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s' % (
+ parsed.path, shared_container), '',
+ {'X-Auth-Token': token,
+ 'X-Container-Write': tf.swift_test_perm[2]})
+ return check_response(conn)
+ resp = retry(put)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ # verify third account can write "obj1" to shared container
+ def put(url, token, parsed, conn):
+ conn.request('PUT', '%s/%s/%s' % (
+ parsed.path, shared_container, 'obj1'), 'test',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(put, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 201)
+
+ # verify third account cannot copy "obj1" to shared container
+ def copy(url, token, parsed, conn):
+ conn.request('COPY', '%s/%s/%s' % (
+ parsed.path, shared_container, 'obj1'), '',
+ {'X-Auth-Token': token,
+ 'Destination': '%s/%s' % (shared_container, 'obj2')})
+ return check_response(conn)
+ resp = retry(copy, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # verify third account can POST to "obj1" in shared container
+ def post(url, token, parsed, conn):
+ conn.request('POST', '%s/%s/%s' % (
+ parsed.path, shared_container, 'obj1'), '',
+ {'X-Auth-Token': token,
+ 'X-Object-Meta-Color': 'blue'})
+ return check_response(conn)
+ resp = retry(post, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 202)
+
+ # verify third account can DELETE from shared container
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE', '%s/%s/%s' % (
+ parsed.path, shared_container, 'obj1'), '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete, use_account=3)
+ resp.read()
+ self.assertIn(resp.status, (204, 404))
+
+ # clean up shared_container
+ def delete(url, token, parsed, conn):
+ conn.request('DELETE',
+ parsed.path + '/' + shared_container, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+ resp = retry(delete)
+ resp.read()
+ self.assertIn(resp.status, (204, 404))
@requires_acls
def test_read_only(self):
@@ -525,12 +926,12 @@ class TestObject(unittest.TestCase):
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# grant read-only access
acl_user = tf.swift_test_user[2]
@@ -543,32 +944,32 @@ class TestObject(unittest.TestCase):
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(self.obj in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertIn(self.obj, listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(body, 'test')
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(body, 'test')
# can not put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# can not delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(obj_name not in listing)
- self.assert_(self.obj in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertNotIn(obj_name, listing)
+ self.assertIn(self.obj, listing)
@requires_acls
def test_read_write(self):
@@ -606,12 +1007,12 @@ class TestObject(unittest.TestCase):
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# grant read-write access
acl_user = tf.swift_test_user[2]
@@ -624,32 +1025,32 @@ class TestObject(unittest.TestCase):
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(self.obj in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertIn(self.obj, listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(body, 'test')
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(body, 'test')
# can put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# can delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(obj_name in listing)
- self.assert_(self.obj not in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertIn(obj_name, listing)
+ self.assertNotIn(self.obj, listing)
@requires_acls
def test_admin(self):
@@ -687,12 +1088,12 @@ class TestObject(unittest.TestCase):
# cannot list objects
resp = retry(get_listing, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# cannot get object
resp = retry(get, self.obj, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# grant admin access
acl_user = tf.swift_test_user[2]
@@ -705,32 +1106,32 @@ class TestObject(unittest.TestCase):
# can list objects
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(self.obj in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertIn(self.obj, listing)
# can get object
resp = retry(get, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(body, 'test')
+ self.assertEqual(resp.status, 200)
+ self.assertEqual(body, 'test')
# can put an object
obj_name = str(uuid4())
resp = retry(put, obj_name, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
# can delete an object
resp = retry(delete, self.obj, use_account=3)
body = resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# sanity with account1
resp = retry(get_listing, use_account=3)
listing = resp.read()
- self.assertEquals(resp.status, 200)
- self.assert_(obj_name in listing)
- self.assert_(self.obj not in listing)
+ self.assertEqual(resp.status, 200)
+ self.assertIn(obj_name, listing)
+ self.assertNotIn(self.obj, listing)
def test_manifest(self):
if tf.skip:
@@ -746,7 +1147,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), segments1[objnum],
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments1)):
+ for objnum in range(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@@ -809,7 +1210,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), segments2[objnum],
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments2)):
+ for objnum in range(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@@ -891,7 +1292,7 @@ class TestObject(unittest.TestCase):
parsed.path, acontainer, str(objnum)), segments3[objnum],
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments3)):
+ for objnum in range(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@@ -958,7 +1359,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
@@ -966,10 +1367,10 @@ class TestObject(unittest.TestCase):
parsed.path, acontainer, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments3)):
+ for objnum in range(len(segments3)):
resp = retry(delete, objnum)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
@@ -977,10 +1378,10 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments2)):
+ for objnum in range(len(segments2)):
resp = retry(delete, objnum)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
@@ -988,10 +1389,10 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
- for objnum in xrange(len(segments1)):
+ for objnum in range(len(segments1)):
resp = retry(delete, objnum)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
# Delete the extra container
def delete(url, token, parsed, conn):
@@ -1000,7 +1401,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
def test_delete_content_type(self):
if tf.skip:
@@ -1020,7 +1421,7 @@ class TestObject(unittest.TestCase):
return check_response(conn)
resp = retry(delete)
resp.read()
- self.assertEqual(resp.status, 204)
+ self.assertIn(resp.status, (204, 404))
self.assertEqual(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
@@ -1095,78 +1496,78 @@ class TestObject(unittest.TestCase):
resp = retry(put_cors_cont, '*')
resp.read()
- self.assertEquals(resp.status // 100, 2)
+ self.assertEqual(resp.status // 100, 2)
resp = retry(put_obj, 'cat')
resp.read()
- self.assertEquals(resp.status // 100, 2)
+ self.assertEqual(resp.status // 100, 2)
resp = retry(check_cors,
'OPTIONS', 'cat', {'Origin': 'http://m.com'})
- self.assertEquals(resp.status, 401)
+ self.assertEqual(resp.status, 401)
resp = retry(check_cors,
'OPTIONS', 'cat',
{'Origin': 'http://m.com',
'Access-Control-Request-Method': 'GET'})
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
resp.read()
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertEquals(headers.get('access-control-allow-origin'),
- '*')
+ self.assertEqual(headers.get('access-control-allow-origin'),
+ '*')
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertEquals(headers.get('access-control-allow-origin'),
- '*')
+ self.assertEqual(headers.get('access-control-allow-origin'),
+ '*')
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com',
'X-Web-Mode': 'True'})
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertEquals(headers.get('access-control-allow-origin'),
- '*')
+ self.assertEqual(headers.get('access-control-allow-origin'),
+ '*')
####################
resp = retry(put_cors_cont, 'http://secret.com')
resp.read()
- self.assertEquals(resp.status // 100, 2)
+ self.assertEqual(resp.status // 100, 2)
resp = retry(check_cors,
'OPTIONS', 'cat',
{'Origin': 'http://m.com',
'Access-Control-Request-Method': 'GET'})
resp.read()
- self.assertEquals(resp.status, 401)
+ self.assertEqual(resp.status, 401)
if strict_cors:
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertTrue('access-control-allow-origin' not in headers)
+ self.assertNotIn('access-control-allow-origin', headers)
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://secret.com'})
resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertEquals(headers.get('access-control-allow-origin'),
- 'http://secret.com')
+ self.assertEqual(headers.get('access-control-allow-origin'),
+ 'http://secret.com')
else:
resp = retry(check_cors,
'GET', 'cat', {'Origin': 'http://m.com'})
resp.read()
- self.assertEquals(resp.status, 200)
+ self.assertEqual(resp.status, 200)
headers = dict((k.lower(), v) for k, v in resp.getheaders())
- self.assertEquals(headers.get('access-control-allow-origin'),
- 'http://m.com')
+ self.assertEqual(headers.get('access-control-allow-origin'),
+ 'http://m.com')
@requires_policies
def test_cross_policy_copy(self):
@@ -1228,4 +1629,4 @@ class TestObject(unittest.TestCase):
if __name__ == '__main__':
- unittest.main()
+ unittest2.main()