summaryrefslogtreecommitdiffstats
path: root/test/functional/test_container.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/functional/test_container.py')
-rwxr-xr-xtest/functional/test_container.py833
1 files changed, 752 insertions, 81 deletions
diff --git a/test/functional/test_container.py b/test/functional/test_container.py
index 15f7fc1..91702e9 100755
--- a/test/functional/test_container.py
+++ b/test/functional/test_container.py
@@ -24,7 +24,7 @@ from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift_testing import check_response, retry, skip, skip2, skip3, \
- swift_test_perm, web_front_end
+ swift_test_perm, web_front_end, requires_acls, swift_test_user
class TestContainer(unittest.TestCase):
@@ -41,7 +41,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def tearDown(self):
if skip:
@@ -68,7 +68,7 @@ class TestContainer(unittest.TestCase):
for obj in objs:
resp = retry(delete, obj)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.name, '',
@@ -77,7 +77,7 @@ class TestContainer(unittest.TestCase):
resp = retry(delete)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_multi_metadata(self):
if skip:
@@ -95,19 +95,19 @@ class TestContainer(unittest.TestCase):
resp = retry(post, 'X-Container-Meta-One', '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-container-meta-one'), '1')
resp = retry(post, 'X-Container-Meta-Two', '2')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
- self.assertEquals(resp.getheader('x-container-meta-two'), '2')
+ self.assertEqual(resp.getheader('x-container-meta-one'), '1')
+ self.assertEqual(resp.getheader('x-container-meta-two'), '2')
def test_unicode_metadata(self):
if skip:
@@ -128,28 +128,28 @@ class TestContainer(unittest.TestCase):
if (web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Container-Meta-uni', uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Container-Meta-uni'),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader('X-Container-Meta-uni'),
+ uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
+ self.assertEqual(resp.getheader(uni_key.encode('utf-8')),
+ uni_value.encode('utf-8'))
def test_PUT_metadata(self):
if skip:
@@ -179,34 +179,34 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
resp = retry(put, name, 'Value')
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(put, name, '')
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def test_POST_metadata(self):
if skip:
@@ -231,22 +231,22 @@ class TestContainer(unittest.TestCase):
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
+ self.assertEqual(resp.getheader('x-container-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
+ self.assertEqual(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
if skip:
@@ -268,38 +268,38 @@ class TestContainer(unittest.TestCase):
put, name,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
headers = {}
@@ -307,20 +307,20 @@ class TestContainer(unittest.TestCase):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
name = uuid4().hex
headers = {}
@@ -336,19 +336,19 @@ class TestContainer(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
name = uuid4().hex
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(put, name, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(delete, name)
resp.read()
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
def test_POST_bad_metadata(self):
if skip:
@@ -364,36 +364,56 @@ class TestContainer(unittest.TestCase):
post,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_POST_bad_metadata2(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path + '/' + self.name, '', headers)
+ return check_response(conn)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
+
+ def test_POST_bad_metadata3(self):
+ if skip:
+ raise SkipTest
+
+ def post(url, token, parsed, conn, extra_headers):
+ headers = {'X-Auth-Token': token}
+ headers.update(extra_headers)
+ conn.request('POST', parsed.path + '/' + self.name, '', headers)
+ return check_response(conn)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
@@ -408,12 +428,12 @@ class TestContainer(unittest.TestCase):
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
- self.assertEquals(resp.status, 400)
+ self.assertEqual(resp.status, 400)
def test_public_container(self):
if skip:
@@ -437,10 +457,10 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
resp = retry(get)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
@@ -449,7 +469,7 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
@@ -479,7 +499,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container accessible by the second account
def post(url, token, parsed, conn):
@@ -491,11 +511,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now use the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Make the container private again
def post(url, token, parsed, conn):
@@ -506,11 +526,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can't access the container with the second account again
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
def test_cross_account_public_container(self):
if skip or skip2:
@@ -535,7 +555,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container completely public
def post(url, token, parsed, conn):
@@ -546,11 +566,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# But we shouldn't be able to write with the second account
def put2(url, token, parsed, conn):
@@ -560,7 +580,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Now make the container also writeable by the second account
def post(url, token, parsed, conn):
@@ -571,15 +591,15 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can still read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# And that we can now write with the second account
resp = retry(put2, use_account=2)
resp.read()
- self.assertEquals(resp.status, 201)
+ self.assertEqual(resp.status, 201)
def test_nonadmin_user(self):
if skip or skip3:
@@ -604,7 +624,7 @@ class TestContainer(unittest.TestCase):
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Make the container accessible by the third account
def post(url, token, parsed, conn):
@@ -615,11 +635,11 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can now read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# But we shouldn't be able to write with the third account
def put3(url, token, parsed, conn):
@@ -629,7 +649,7 @@ class TestContainer(unittest.TestCase):
resp = retry(put3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 403)
+ self.assertEqual(resp.status, 403)
# Now make the container also writeable by the third account
def post(url, token, parsed, conn):
@@ -640,15 +660,666 @@ class TestContainer(unittest.TestCase):
resp = retry(post)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# Ensure we can still read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
- self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.status, 204)
# And that we can now write with the third account
resp = retry(put3, use_account=3)
resp.read()
+ self.assertEqual(resp.status, 201)
+
+ @requires_acls
+ def test_read_only_acl_listings(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # read-only can not create containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # but it can see newly created ones
+ resp = retry(put, new_container_name, use_account=1)
+ resp.read()
self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ @requires_acls
+ def test_read_only_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-only can NOT write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # read-only can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ @requires_acls
+ def test_read_write_acl_listings(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # can create new containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ # can also delete them
+ resp = retry(delete, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name not in listing)
+
+ # even if they didn't create them
+ empty_container_name = str(uuid4())
+ resp = retry(put, empty_container_name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, empty_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ @requires_acls
+ def test_read_write_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # read-write can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # read-write can also write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+
+ # and remove it
+ headers = {'x-remove-container-meta-test': 'true'}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
+
+ @requires_acls
+ def test_admin_acl_listing(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn):
+ conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def put(url, token, parsed, conn, name):
+ conn.request('PUT', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def delete(url, token, parsed, conn, name):
+ conn.request('DELETE', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ # cannot list containers
+ resp = retry(get, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can list containers
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(self.name in listing)
+
+ # can create new containers
+ new_container_name = str(uuid4())
+ resp = retry(put, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name in listing)
+
+ # can also delete them
+ resp = retry(delete, new_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, use_account=3)
+ listing = resp.read()
+ self.assertEquals(resp.status, 200)
+ self.assert_(new_container_name not in listing)
+
+ # even if they didn't create them
+ empty_container_name = str(uuid4())
+ resp = retry(put, empty_container_name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 201)
+ resp = retry(delete, empty_container_name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+
+ @requires_acls
+ def test_admin_acl_metadata(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {'x-container-meta-test': value}
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # cannot see metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 403)
+
+ # grant access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # can also write container metadata
+ new_value = str(uuid4())
+ headers = {'x-container-meta-test': new_value}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+
+ # and remove it
+ headers = {'x-remove-container-meta-test': 'true'}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), None)
+
+ @requires_acls
+ def test_protected_container_sync(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some metadata
+ value = str(uuid4())
+ headers = {
+ 'x-container-sync-key': 'secret',
+ 'x-container-meta-test': value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
+
+ # and can not write
+ headers = {'x-container-sync-key': str(uuid4())}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), None)
+
+ # sanity check sync-key w/ account1
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # and can write
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-sync-key': str(uuid4()),
+ 'x-container-meta-test': new_value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1) # validate w/ account1
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # but can not write sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # and ALSO sync-key
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), 'secret')
+
+ # admin tester3 can even change sync-key
+ new_secret = str(uuid4())
+ headers = {'x-container-sync-key': new_secret}
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Sync-Key'), new_secret)
+
+ @requires_acls
+ def test_protected_container_acl(self):
+ if skip3:
+ raise SkipTest
+
+ def get(url, token, parsed, conn, name):
+ conn.request('GET', parsed.path + '/%s' % name, '',
+ {'X-Auth-Token': token})
+ return check_response(conn)
+
+ def post_account(url, token, parsed, conn, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path, '', new_headers)
+ return check_response(conn)
+
+ def post(url, token, parsed, conn, name, headers):
+ new_headers = dict({'X-Auth-Token': token}, **headers)
+ conn.request('POST', parsed.path + '/%s' % name, '', new_headers)
+ return check_response(conn)
+
+ # add some container acls
+ value = str(uuid4())
+ headers = {
+ 'x-container-read': 'jdoe',
+ 'x-container-write': 'jdoe',
+ 'x-container-meta-test': value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+
+ # grant read-only access
+ acl_user = swift_test_user[2]
+ acl = {'read-only': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not container acl
+ self.assertEqual(resp.getheader('X-Container-Read'), None)
+ self.assertEqual(resp.getheader('X-Container-Write'), None)
+
+ # and can not write
+ headers = {
+ 'x-container-read': 'frank',
+ 'x-container-write': 'frank',
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 403)
+
+ # grant read-write access
+ acl_user = swift_test_user[2]
+ acl = {'read-write': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), value)
+ # but not container acl
+ self.assertEqual(resp.getheader('X-Container-Read'), None)
+ self.assertEqual(resp.getheader('X-Container-Write'), None)
+
+ # sanity check container acls with account1
+ resp = retry(get, self.name, use_account=1)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # and can write
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-read': 'frank',
+ 'x-container-write': 'frank',
+ 'x-container-meta-test': new_value,
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=1) # validate w/ account1
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # but can not write container acls
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # grant admin access
+ acl_user = swift_test_user[2]
+ acl = {'admin': [acl_user]}
+ headers = {'x-account-access-control': json.dumps(acl)}
+ resp = retry(post_account, headers=headers, use_account=1)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+
+ # admin can read container metadata
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Meta-Test'), new_value)
+ # and ALSO container acls
+ self.assertEqual(resp.getheader('X-Container-Read'), 'jdoe')
+ self.assertEqual(resp.getheader('X-Container-Write'), 'jdoe')
+
+ # admin tester3 can even change container acls
+ new_value = str(uuid4())
+ headers = {
+ 'x-container-read': '.r:*',
+ }
+ resp = retry(post, self.name, headers=headers, use_account=3)
+ resp.read()
+ self.assertEqual(resp.status, 204)
+ resp = retry(get, self.name, use_account=3)
+ resp.read()
+ self.assertEquals(resp.status, 204)
+ self.assertEqual(resp.getheader('X-Container-Read'), '.r:*')
def test_long_name_content_type(self):
if skip:
@@ -662,9 +1333,9 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
resp.read()
- self.assertEquals(resp.status, 400)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
+ self.assertEqual(resp.status, 400)
+ self.assertEqual(resp.getheader('Content-Type'),
+ 'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
@@ -677,10 +1348,10 @@ class TestContainer(unittest.TestCase):
resp = retry(put)
if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
+ self.assertEqual(resp.status, 404)
else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
+ self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
+ self.assertEqual(resp.status, 412)
if __name__ == '__main__':