summaryrefslogtreecommitdiffstats
path: root/gluster/swift/common/middleware/gswauth
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2014-04-22 14:15:02 -0400
committerPrashanth Pai <ppai@redhat.com>2016-01-06 07:53:12 -0800
commit2a8f9f0f530327039c32e444b6a27130b12666bd (patch)
treee24e38b5b3c0245a0acafc63fc50bacbf7de718a /gluster/swift/common/middleware/gswauth
parent4c6ca1db931377b75583f61a7bca262cfc27b0fa (diff)
Update repo
This is a squashed commit imported from this repo: https://github.com/openstack/swiftonfile/tree/icehouse Contains the follwing commits from above mentioned repo: eb50236 Merge "Backport: Fix metadata overall limits bug" into icehouse 79ea52a Backport: Fix metadata overall limits bug bc43f0b Fix inconsistent data being returned on GET ad0bb79 Import HTTPBadRequest from swift's module 74d02e6 Exclude .trashcan dir from container listing b2dbc15 Catch ESTALE in addition to ENOENT 8d60b48 Properly handle read_metadata() exceptions 6762fc6 Fix object server leaking file descriptors 2842e82 Fix API incompatibility in update_metadata() 2beeef6 Merge "Remove swiftkerbauth code" into icehouse 93dbcb5 Update object-expirer.conf with explanations c9d2f09 Merge "Check if /etc/swift exists in ring builder" into icehouse d66c14c Remove swiftkerbauth code 3142ed2 Add object expiration functests 97153d1 Merge "Cleanup functest and undo old patch" into icehouse bc234d0 Remove old travis config file and fix typo 260c8ef Check if /etc/swift exists in ring builder 637dac9 Cleanup functest and undo old patch 051e068 Merge pull request #35 from prashanthpai/backport-1 be104a3 Merge pull request #36 from prashanthpai/backport-2 ff76f42 fix issue with GET on large object (icehouse-backport) 04d0a99 Fix unlink call after successful rename 4c6ca1d updating README file with project name change 10b2680 Merge pull request #18 from thiagol11/icehouse 5bcab8f Updating version on __init__ file 5c2cba2 Merge pull request #15 from thiagol11/update_spec 52b00a8 updating spec file to add dependency on swift icehouse ae7c93b Merge pull request #6 from prashanthpai/rebase 191e55b Revert: allow non-root user to run functests cb7e968 Modify unit tests and func tests d23fd1b Sync with OpenStack Swift v1.13.1 b6d1671 Merge pull request #12 from pushpesh/functionalnosetestremove 962622b Merge pull request #8 from thiagol11/update_readme 4560857 Merge pull request #9 from prashanthpai/spec-expirer be0ae7e Minor update 65000f1 Removing functionalnosetests 8ab1069 Fix object-expirer.conf-gluster RPM build error afee30f added new support filesystem section 527b01f updated README.md to Swift-On-File 9a240c7 Merge pull request #3 from thiagol11/add_jenkins_to_travis 34b5a8b removing blank lines 3568b64 fixing missing fi d8f5b0f adding support to run jenkins triggered by travis 6f4a88c Removing functionalnosetests 8041944 Update README.md c015148 Merge pull request #2 from thiagol11/master 3ddd952 fixing travis file to run correct unit test c582669 adding travis status badge to README 8093096 adding py26 unit testing to travis 37835fd trigger travis build cb6332a adding travis ci testing All tests have been run sucessfully against this. tox -e p2p8,py27,functest Change-Id: I096b611da852d3eb3913844034b443b8272c2ac4 Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/13188
Diffstat (limited to 'gluster/swift/common/middleware/gswauth')
-rw-r--r--gluster/swift/common/middleware/gswauth/swauth/middleware.py33
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/__init__.py6
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py0
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py63
-rw-r--r--gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py4519
5 files changed, 17 insertions, 4604 deletions
diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
index 314eedb..cdcc638 100644
--- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py
+++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py
@@ -241,7 +241,7 @@ class Swauth(object):
version, rest = split_path(env.get('PATH_INFO', ''),
1, 2, True)
except ValueError:
- version, rest = None, None
+ rest = None
if rest and rest.startswith(self.reseller_prefix):
# Handle anonymous access to accounts I'm the definitive
# auth for.
@@ -693,7 +693,7 @@ class Swauth(object):
return HTTPBadRequest(request=req)
try:
new_services = json.loads(req.body)
- except ValueError, err:
+ except ValueError as err:
return HTTPBadRequest(body=str(err))
# Get the current services information
path = quote('/v1/%s/%s/.services' % (self.auth_account, account))
@@ -1405,7 +1405,7 @@ class Swauth(object):
memcache_key,
(self.itoken_expires,
'%s,.reseller_admin,%s' % (self.metadata_volume,
- self.auth_account)),
+ self.auth_account)),
timeout=self.token_life)
return self.itoken
@@ -1589,19 +1589,20 @@ class Swauth(object):
if getattr(req, 'client_disconnect', False) or \
getattr(response, 'client_disconnect', False):
status_int = 499
- self.logger.info(
- ' '.join(quote(str(x)) for x in (client or '-',
- req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S', gmtime()),
- req.method, the_request, req.environ['SERVER_PROTOCOL'],
- status_int, req.referer or '-', req.user_agent or '-',
- req.headers.get(
- 'x-auth-token',
- req.headers.get('x-auth-admin-user', '-')),
- getattr(req, 'bytes_transferred', 0) or '-',
- getattr(response, 'bytes_transferred', 0) or '-',
- req.headers.get('etag', '-'),
- req.headers.get('x-trans-id', '-'), logged_headers or '-',
- trans_time)))
+ self.logger.info(' '.join(quote(str(x)) for x in
+ (client or '-',
+ req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S',
+ gmtime()),
+ req.method, the_request,
+ req.environ['SERVER_PROTOCOL'], status_int,
+ req.referer or '-', req.user_agent or '-',
+ req.headers.get('x-auth-token', req.headers.get(
+ 'x-auth-admin-user', '-')),
+ getattr(req, 'bytes_transferred', 0) or '-',
+ getattr(response, 'bytes_transferred', 0) or '-',
+ req.headers.get('etag', '-'),
+ req.headers.get('x-trans-id', '-'), logged_headers
+ or '-', trans_time)))
def filter_factory(global_conf, **local_conf):
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py
deleted file mode 100644
index f53bc5a..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# See http://code.google.com/p/python-nose/issues/detail?id=373
-# The code below enables nosetests to work with i18n _() blocks
-
-import __builtin__
-
-setattr(__builtin__, '_', lambda x: x)
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py
+++ /dev/null
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py
deleted file mode 100644
index d9b7b55..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Pablo Llopis 2011
-
-import unittest
-from swauth import authtypes
-
-
-class TestPlaintext(unittest.TestCase):
-
- def setUp(self):
- self.auth_encoder = authtypes.Plaintext()
-
- def test_plaintext_encode(self):
- enc_key = self.auth_encoder.encode('keystring')
- self.assertEquals('plaintext:keystring', enc_key)
-
- def test_plaintext_valid_match(self):
- creds = 'plaintext:keystring'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, True)
-
- def test_plaintext_invalid_match(self):
- creds = 'plaintext:other-keystring'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, False)
-
-
-class TestSha1(unittest.TestCase):
-
- def setUp(self):
- self.auth_encoder = authtypes.Sha1()
- self.auth_encoder.salt = 'salt'
-
- def test_sha1_encode(self):
- enc_key = self.auth_encoder.encode('keystring')
- self.assertEquals('sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06',
- enc_key)
-
- def test_sha1_valid_match(self):
- creds = 'sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, True)
-
- def test_sha1_invalid_match(self):
- creds = 'sha1:salt$deadbabedeadbabedeadbabec0ffeebadc0ffeee'
- match = self.auth_encoder.match('keystring', creds)
- self.assertEquals(match, False)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py
deleted file mode 100644
index 62259ff..0000000
--- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py
+++ /dev/null
@@ -1,4519 +0,0 @@
-# Copyright (c) 2010-2011 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-try:
- import simplejson as json
-except ImportError:
- import json
-import unittest
-from contextlib import contextmanager
-from time import time
-
-from swift.common.swob import Request, Response
-
-from swauth import middleware as auth
-from swauth.authtypes import MAX_TOKEN_LENGTH
-
-
-DEFAULT_TOKEN_LIFE = 86400
-MAX_TOKEN_LIFE = 100000
-
-
-class FakeMemcache(object):
-
- def __init__(self):
- self.store = {}
-
- def get(self, key):
- return self.store.get(key)
-
- def set(self, key, value, timeout=0, time=0):
- self.store[key] = value
- return True
-
- def incr(self, key, timeout=0, time=0):
- self.store[key] = self.store.setdefault(key, 0) + 1
- return self.store[key]
-
- @contextmanager
- def soft_lock(self, key, timeout=0, retries=5, time=0):
- yield True
-
- def delete(self, key):
- try:
- del self.store[key]
- except Exception:
- pass
- return True
-
-
-class FakeApp(object):
-
- def __init__(
- self, status_headers_body_iter=None, acl=None, sync_key=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter(
- [('404 Not Found', {}, '')])
- self.acl = acl
- self.sync_key = sync_key
-
- def __call__(self, env, start_response):
- self.calls += 1
- self.request = Request.blank('', environ=env)
- if self.acl:
- self.request.acl = self.acl
- if self.sync_key:
- self.request.environ[
- 'swift_sync_key'] = self.sync_key
- if 'swift.authorize' in env:
- resp = env['swift.authorize'](self.request)
- if resp:
- return resp(env, start_response)
- status, headers, body = self.status_headers_body_iter.next(
- )
- return Response(status=status, headers=headers,
- body=body)(env, start_response)
-
-
-class FakeConn(object):
-
- def __init__(self, status_headers_body_iter=None):
- self.calls = 0
- self.status_headers_body_iter = status_headers_body_iter
- if not self.status_headers_body_iter:
- self.status_headers_body_iter = iter(
- [('404 Not Found', {}, '')])
-
- def request(self, method, path, headers):
- self.calls += 1
- self.request_path = path
- self.status, self.headers, self.body = \
- self.status_headers_body_iter.next()
- self.status, self.reason = self.status.split(' ', 1)
- self.status = int(self.status)
-
- def getresponse(self):
- return self
-
- def read(self):
- body = self.body
- self.body = ''
- return body
-
-
-class TestAuth(unittest.TestCase):
-
- def setUp(self):
- self.test_auth = \
- auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'token_life': str(DEFAULT_TOKEN_LIFE),
- 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp())
-
- def test_super_admin_key_not_required(self):
- auth.filter_factory({})(FakeApp())
-
- def test_reseller_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.reseller_prefix, 'AUTH_')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': 'TEST'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': 'TEST_'})(app)
- self.assertEquals(ath.reseller_prefix, 'TEST_')
-
- def test_auth_prefix_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': ''})(app)
- self.assertEquals(ath.auth_prefix, '/auth/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': '/test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': '/test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': 'test/'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'auth_prefix': 'test'})(app)
- self.assertEquals(ath.auth_prefix, '/test/')
-
- def test_no_auth_type_init(self):
- app = FakeApp()
- ath = auth.filter_factory({})(app)
- self.assertEquals(ath.auth_type, 'Plaintext')
-
- def test_valid_auth_type_init(self):
- app = FakeApp()
- ath = auth.filter_factory(
- {'auth_type': 'sha1'})(app)
- self.assertEquals(ath.auth_type, 'Sha1')
- ath = auth.filter_factory(
- {'auth_type': 'plaintext'})(app)
- self.assertEquals(ath.auth_type, 'Plaintext')
-
- def test_invalid_auth_type_init(self):
- app = FakeApp()
- exc = None
- try:
- auth.filter_factory(
- {'auth_type': 'NONEXISTANT'})(app)
- except Exception as err:
- exc = err
- self.assertEquals(str(exc),
- 'Invalid auth_type in config file: %s' %
- 'Nonexistant')
-
- def test_default_swift_cluster_init(self):
- app = FakeApp()
- self.assertRaises(Exception, auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#badscheme://host/path'}), app)
- ath = auth.filter_factory(
- {'super_admin_key': 'supertest'})(app)
- self.assertEquals(ath.default_swift_cluster,
- 'local#http://127.0.0.1:8080/v1')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#http://host/path'})(app)
- self.assertEquals(ath.default_swift_cluster,
- 'local#http://host/path')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://host/path/'})(app)
- self.assertEquals(ath.dsc_url, 'https://host/path')
- self.assertEquals(ath.dsc_url2, 'https://host/path')
- ath = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'default_swift_cluster':
- 'local#https://host/path/#http://host2/path2/'})(app)
- self.assertEquals(ath.dsc_url, 'https://host/path')
- self.assertEquals(
- ath.dsc_url2,
- 'http://host2/path2')
-
- def test_top_level_denied(self):
- resp = Request.blank(
- '/').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_anon(self):
- resp = Request.blank(
- '/v1/AUTH_account').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.authorize)
-
- def test_auth_deny_non_reseller_prefix(self):
- resp = Request.blank(
- '/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.denied_response)
-
- def test_auth_deny_non_reseller_prefix_no_override(
- self):
- fake_authorize = lambda x: Response(
- status='500 Fake')
- resp = Request.blank(
- '/v1/BLAH_account',
- headers={'X-Auth-Token': 'BLAH_t'},
- environ={'swift.authorize': fake_authorize}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(resp.environ['swift.authorize'], fake_authorize)
-
- def test_auth_no_reseller_prefix_deny(self):
- # Ensures that when we have no reseller prefix, we don't deny a request
- # outright but set up a denial swift.authorize and pass the request on
- # down the chain.
- local_app = FakeApp()
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(local_app)
- resp = Request.blank(
- '/v1/account',
- headers={'X-Auth-Token': 't'}).get_response(local_auth)
- self.assertEquals(resp.status_int, 401)
- # one for checking auth, two for request passed
- # along
- self.assertEquals(local_app.calls, 2)
- self.assertEquals(resp.environ['swift.authorize'],
- local_auth.denied_response)
-
- def test_auth_no_reseller_prefix_allow(self):
- # Ensures that when we have no reseller prefix, we can still allow
- # access if our auth server accepts requests
- local_app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(local_app)
- resp = Request.blank(
- '/v1/act',
- headers={'X-Auth-Token': 't'}).get_response(local_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(local_app.calls, 2)
- self.assertEquals(resp.environ['swift.authorize'],
- local_auth.authorize)
-
- def test_auth_no_reseller_prefix_no_token(self):
- # Check that normally we set up a call back to our
- # authorize.
- local_auth = \
- auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(FakeApp(iter([])))
- resp = Request.blank(
- '/v1/account').get_response(
- local_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(
- resp.environ['swift.authorize'], local_auth.authorize)
- # Now make sure we don't override an existing swift.authorize when we
- # have no reseller prefix.
- local_auth = \
- auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'reseller_prefix': ''})(FakeApp())
- local_authorize = lambda req: Response('test')
- resp = Request.blank(
- '/v1/account', environ={'swift.authorize':
- local_authorize}).get_response(local_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- resp.environ['swift.authorize'],
- local_authorize)
-
- def test_auth_fail(self):
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_auth_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_auth_memcache(self):
- # First run our test without memcache, showing we need to return the
- # token contents twice.
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, ''),
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 4)
- # Now run our test with memcache, showing we no longer need to return
- # the token contents twice.
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, ''),
- # Don't need a second token object returned if memcache is
- # used
- ('204 No Content', {}, '')]))
- fake_memcache = FakeMemcache()
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'},
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'},
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_auth_just_expired(self):
- self.test_auth.app = FakeApp(iter([
- # Request for token (which will have expired)
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() - 1})),
- # Request to delete token
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_middleware_storage_token(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_authorize_bad_path(self):
- req = Request.blank('/badpath')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/badpath')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_account_access(self):
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_group_access(self):
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act2'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa')
- req.remote_user = 'act:usr,act'
- req.acl = 'act:usr2'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_deny_cross_reseller(self):
- # Tests that cross-reseller is denied, even if ACLs/group
- # names match
- req = Request.blank('/v1/OTHER_cfa')
- req.remote_user = 'act:usr,act,AUTH_cfa'
- req.acl = 'act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_authorize_acl_referrer_access(self):
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.remote_user = 'act:usr,act'
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:*,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:*' # No listings allowed
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.acl = '.r:.example.com,.rlistings'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 401)
- req = Request.blank('/v1/AUTH_cfa/c')
- req.referer = 'http://www.example.com/index.html'
- req.acl = '.r:.example.com,.rlistings'
- self.assertEquals(
- self.test_auth.authorize(req),
- None)
-
- def test_detect_reseller_request(self):
- req = self._make_request('/v1/AUTH_admin',
- headers={'X-Auth-Token': 'AUTH_t'})
- cache_key = 'AUTH_/auth/AUTH_t'
- cache_entry = (time() + 3600, '.reseller_admin')
- req.environ['swift.cache'].set(
- cache_key, cache_entry)
- self.assertTrue(req.environ.get('reseller_request'))
-
- def test_account_put_permissions(self):
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,AUTH_other'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- # Even PUTs to your own account as account admin
- # should fail
- req = Request.blank(
- '/v1/AUTH_old',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,AUTH_old'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- # .super_admin is not something the middleware should ever see or care
- # about
- req = Request.blank(
- '/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'PUT'})
- req.remote_user = 'act:usr,act,.super_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_account_delete_permissions(self):
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,AUTH_other'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- # Even DELETEs to your own account as account admin should
- # fail
- req = Request.blank('/v1/AUTH_old',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,AUTH_old'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp, None)
-
- # .super_admin is not something the middleware should ever see or care
- # about
- req = Request.blank('/v1/AUTH_new',
- environ={'REQUEST_METHOD': 'DELETE'})
- req.remote_user = 'act:usr,act,.super_admin'
- resp = self.test_auth.authorize(req)
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_get_token_fail(self):
- resp = Request.blank(
- '/auth/v1.0').get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 401)
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_invalid_key(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'invalid'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_token_fail_invalid_x_auth_user_format(
- self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_non_matching_account_in_request(
- self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act2:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_bad_path(self):
- resp = Request.blank(
- '/auth/v1/act/auth/invalid',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_token_fail_missing_key(self):
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_get_token_fail_get_user_details(self):
- self.test_auth.app = FakeApp(iter([
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_token_fail_get_account(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_token_fail_put_new_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_token_fail_post_to_user(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_token_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_fail_get_existing_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_token_success_v1_0(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_with_user_token_life(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- left = int(resp.headers['x-auth-token-expires'])
- self.assertTrue(left > 0, '%d > 0' % left)
- self.assertTrue(left <= 10, '%d <= 10' % left)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_with_user_token_life_past_max(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- req = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- left = int(resp.headers['x-auth-token-expires'])
- self.assertTrue(left > DEFAULT_TOKEN_LIFE,
- '%d > %d' % (left, DEFAULT_TOKEN_LIFE))
- self.assertTrue(left <= MAX_TOKEN_LIFE,
- '%d <= %d' % (left, MAX_TOKEN_LIFE))
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_act_auth(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Storage-User': 'usr',
- 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_storage_instead_of_auth(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Storage-User': 'act:usr',
- 'X-Storage-Pass': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(
- resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_act_auth_auth_instead_of_storage(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1/act/auth',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(resp.headers.get(
- 'x-auth-token',
- '').startswith('AUTH_tk'), resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_existing_token(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps(
- {"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 9999999999.9999999})),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_token_success_existing_token_but_request_new_one(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # DELETE of expired token
- ('204 No Content', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key',
- 'X-Auth-New-Token': 'true'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'), 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 6)
-
- def test_get_token_success_existing_token_expired(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps(
- {"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 0.0})),
- # DELETE of expired token
- ('204 No Content', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 7)
-
- def test_get_token_success_existing_token_expired_fail_deleting_old(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of token
- ('200 Ok', {}, json.dumps({"account": "act", "user": "usr",
- "account_id": "AUTH_cfa",
- "groups": [{'name': "act:usr"},
- {'name': "key"}, {'name': ".admin"}],
- "expires": 0.0})),
- # DELETE of expired token
- ('503 Service Unavailable', {}, ''),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': 'act:usr',
- 'X-Auth-Key': 'key'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertNotEquals(
- resp.headers.get('x-auth-token'),
- 'AUTH_tktest')
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 7)
-
- def test_prep_success(self):
- list_to_iter = [
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .account_id container
- ('201 Created', {}, '')]
- # PUT of .token* containers
- for x in xrange(16):
- list_to_iter.append(('201 Created', {}, ''))
- self.test_auth.app = FakeApp(iter(list_to_iter))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 18)
-
- def test_prep_bad_method(self):
- resp = Request.blank('/auth/v2/.prep',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'HEAD'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_prep_bad_creds(self):
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'upertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User': '.super_admin'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- resp = Request.blank(
- '/auth/v2/.prep',
- environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
-
- def test_prep_fail_account_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_prep_fail_token_container_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .token container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_prep_fail_account_id_container_create(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of .auth account
- ('201 Created', {}, ''),
- # PUT of .token container
- ('201 Created', {}, ''),
- # PUT of .account_id container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/.prep',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_reseller_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('200 Ok', {}, '[]')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {"accounts": [{"name": "act"}]})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"},
- {"name": ".reseller_admin"}], "auth": "plaintext:key"})),
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('200 Ok', {}, '[]')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {"accounts": [{"name": "act"}]})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_reseller_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_reseller_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .auth account (list containers)
- ('200 Ok', {}, json.dumps([
- {"name": ".token", "count": 0, "bytes": 0},
- {"name": ".account_id",
- "count": 0, "bytes": 0},
- {"name": "act", "count": 0, "bytes": 0}])),
- # GET of .auth account (list containers
- # continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_account_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {},
- json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'account_id': 'AUTH_cfa',
- 'services': {'storage':
- {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
- 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
- self.assertEquals(self.test_auth.app.calls, 3)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of .services object
- ('200 Ok', {},
- json.dumps({"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'account_id': 'AUTH_cfa',
- 'services': {'storage':
- {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}},
- 'users': [{'name': 'tester'}, {'name': 'tester3'}]})
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_account_fail_bad_account_name(self):
- resp = Request.blank('/auth/v2/.token',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/.anything',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_account_fail_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_account_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_account_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of account container (list objects
- # continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_set_services_new_service(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {},
- json.dumps({"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'new_service':
- {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'},
- 'new_service': {'new_endpoint': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_new_endpoint(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage":
- {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(
- json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local':
- 'http://127.0.0.1:8080/v1/AUTH_cfa',
- 'new_endpoint': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_update_endpoint(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(json.loads(resp.body),
- {'storage': {'default': 'local',
- 'local': 'new_value'}})
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_set_services_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_set_services_fail_bad_account_name(self):
- resp = Request.blank('/auth/v2/.act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'storage': {'local': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_set_services_fail_bad_json(self):
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body='garbage'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=''
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_set_services_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('503 Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps({
- 'new_service': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps({
- 'new_service': {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_set_services_fail_put_services(self):
- self.test_auth.app = FakeApp(iter([
- # GET of .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # PUT of new .services object
- ('503 Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.services',
- environ={
- 'REQUEST_METHOD': 'POST'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},
- body=json.dumps(
- {'new_service':
- {'new_endpoint': 'new_value'}})
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_success(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_success_preexist_but_not_completed(
- self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for pre-existence
- # We're going to show it as existing this time, but with no
- # X-Container-Meta-Account-Id, indicating a failed
- # previous attempt
- ('200 Ok', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 4)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_success_preexist_and_completed(
- self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for pre-existence
- # We're going to show it as existing this time, and with an
- # X-Container-Meta-Account-Id, indicating it already
- # exists
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 202)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_success_with_given_suffix(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest',
- 'X-Account-Suffix': 'test-suffix'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(
- conn.request_path,
- '/v1/AUTH_test-suffix')
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_put_account_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'act:adm',
- 'X-Auth-Admin-Key': 'key'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': 'act:usr',
- 'X-Auth-Admin-Key': 'key'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_fail_invalid_account_name(self):
- resp = Request.blank(
- '/auth/v2/.act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'},).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_account_fail_on_initial_account_head(self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_account_fail_on_account_marker_put(self):
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_fail_on_storage_account_put(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_account_fail_on_account_id_mapping(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_put_account_fail_on_services_object(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT',
- 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_put_account_fail_on_post_mapping(self):
- conn = FakeConn(iter([
- # PUT of storage account itself
- ('201 Created', {}, '')]))
- self.test_auth.get_conn = lambda: conn
- self.test_auth.app = FakeApp(iter([
- # Initial HEAD of account container to check for
- # pre-existence
- ('404 Not Found', {}, ''),
- # PUT of account container
- ('204 No Content', {}, ''),
- # PUT of .account_id mapping object
- ('204 No Content', {}, ''),
- # PUT of .services object
- ('204 No Content', {}, ''),
- # POST to account container updating
- # X-Container-Meta-Account-Id
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank(
- '/auth/v2/act',
- environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()},
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(conn.calls, 1)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_delete_account_success(self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_services(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('404 Not Found', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_delete_account_success_missing_storage_account(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('404 Not Found', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_account_id_mapping(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('404 Not Found', {}, ''),
- # DELETE the account container
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_success_missing_account_container_at_end(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_invalid_account_name(self):
- resp = Request.blank('/auth/v2/.act',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_account_fail_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_not_found_concurrency(
- self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_list_account(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_list_account_concurrency(
- self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_has_users(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}]))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_account_fail_has_users2(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}]))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_account_fail_get_services(self):
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_account_fail_delete_storage_account(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('409 Conflict', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 409)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_storage_account2(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, ''),
- # DELETE of storage account itself
- ('409 Conflict', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
- "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 2)
-
- def test_delete_account_fail_delete_storage_account3(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_storage_account4(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, ''),
- # DELETE of storage account itself
- ('503 Service Unavailable', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa",
- "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
- self.assertEquals(conn.calls, 2)
-
- def test_delete_account_fail_delete_services(self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 4)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_account_id_mapping(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 5)
- self.assertEquals(conn.calls, 1)
-
- def test_delete_account_fail_delete_account_container(
- self):
- conn = FakeConn(iter([
- # DELETE of storage account itself
- ('204 No Content', {}, '')]))
- self.test_auth.get_conn = lambda x: conn
- self.test_auth.app = FakeApp(iter([
- # Account's container listing, checking for
- # users
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"}])),
- # Account's container listing, checking for users
- # (continuation)
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'),
- # GET the .services object
- ('200 Ok', {}, json.dumps(
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})),
- # DELETE the .services object
- ('204 No Content', {}, ''),
- # DELETE the .account_id mapping object
- ('204 No Content', {}, ''),
- # DELETE the account container
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act',
- environ={
- 'REQUEST_METHOD': 'DELETE',
- 'swift.cache': FakeMemcache()},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 6)
- self.assertEquals(conn.calls, 1)
-
- def test_get_user_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_fail_no_super_admin_key(self):
- local_auth = auth.filter_factory({})(FakeApp(iter([
- # GET of user object (but we should never get
- # here)
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"}))])))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(local_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(local_auth.app.calls, 0)
-
- def test_get_user_groups_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester3"}, {"name": "act"}],
- "auth": "plaintext:key3"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": ".admin"}, {"name": "act"},
- {"name": "act:tester"}, {"name": "act:tester3"}]}))
- self.assertEquals(self.test_auth.app.calls, 4)
-
- def test_get_user_groups_success2(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:tester3"}, {"name": "act"}],
- "auth": "plaintext:key3"})),
- # GET of account container (list objects
- # continuation)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": ".admin"}, {"name": "act"},
- {"name": "act:tester"}, {"name": "act:tester3"}]}))
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_user_fail_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_user_fail_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.invalid',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_get_user_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'super:admin',
- 'X-Auth-Admin-Key': 'supertest'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'},
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller
- # admin)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_account_admin_fail_getting_account_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who is an .admin
- # as well]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of user object (reseller admin check [and fail
- # here])
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_get_user_account_admin_fail_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who is a
- # .reseller_admin]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_reseller_admin_fail_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin check)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".reseller_admin"}],
- "auth": "plaintext:key"})),
- # GET of requested user object [who also is a
- # .reseller_admin]
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_super_admin_succeed_getting_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- # GET of requested user object
- ('200 Ok', {}, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assertEquals(resp.body, json.dumps(
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".reseller_admin"}],
- "auth": "plaintext:key"}))
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_fail_listing(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_groups_fail_get_user(self):
- self.test_auth.app = FakeApp(iter([
- # GET of account container (list objects)
- ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'},
- json.dumps([
- {"name": ".services", "hash": "etag", "bytes": 112,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.618110"},
- {"name": "tester", "hash": "etag", "bytes": 104,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:27.736680"},
- {"name": "tester3", "hash": "etag", "bytes": 86,
- "content_type":
- "application/octet-stream",
- "last_modified": "2010-12-03T17:16:28.135530"}])),
- # GET of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/.groups',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_get_user_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_user_fail(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_fail_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_fail_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_fail_no_user_key(self):
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_put_user_reseller_admin_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (reseller admin)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"},
- {"name": "test"}, {"name": ".admin"},
- {"name": ".reseller_admin"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but not reseller admin)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- # This shouldn't actually get called, checked
- # below
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 0)
-
- def test_put_user_account_admin_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key':
- 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_regular_fail_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key':
- 'key',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_put_user_regular_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"}],
- "auth": "plaintext:key"})
-
- def test_put_user_special_chars_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/u_s-r',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:u_s-r"}, {"name": "act"}],
- "auth": "plaintext:key"})
-
- def test_put_user_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key',
- 'X-Auth-User-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}],
- "auth": "plaintext:key"})
-
- def test_put_user_reseller_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('201 Created', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key',
- 'X-Auth-User-Reseller-Admin': 'true'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 201)
- self.assertEquals(self.test_auth.app.calls, 2)
- self.assertEquals(
- json.loads(self.test_auth.app.request.body),
- {"groups": [{"name": "act:usr"}, {"name": "act"},
- {"name": ".admin"}, {"name": ".reseller_admin"}],
- "auth": "plaintext:key"})
-
- def test_put_user_fail_not_found(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_put_user_fail(self):
- self.test_auth.app = FakeApp(iter([
- # PUT of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'PUT'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key':
- 'supertest',
- 'X-Auth-User-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_bad_creds(self):
- self.test_auth.app = FakeApp(iter([
- # GET of user object (account admin, but wrong
- # account)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"},
- {"name": "test"}, {"name": ".admin"}],
- "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- self.test_auth.app = FakeApp(iter([
- # GET of user object (regular user)
- ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"},
- {"name": "test"}], "auth": "plaintext:key"}))]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 403)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_invalid_account(self):
- resp = Request.blank('/auth/v2/.invalid/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_user_invalid_user(self):
- resp = Request.blank('/auth/v2/act/.invalid',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_delete_user_not_found(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_fail_head_user(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_delete_user_fail_delete_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_delete_user_fail_delete_user(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('503 Service Unavailable', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 500)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_missing_user_at_end(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('204 No Content', {}, ''),
- # DELETE of user object
- ('404 Not Found', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_missing_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok',
- {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''),
- # DELETE of token
- ('404 Not Found', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 3)
-
- def test_delete_user_success_no_token(self):
- self.test_auth.app = FakeApp(iter([
- # HEAD of user object
- ('200 Ok', {}, ''),
- # DELETE of user object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/act/usr',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_validate_token_bad_prefix(self):
- resp = Request.blank('/auth/v2/.token/BAD_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_validate_token_tmi(self):
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
-
- def test_validate_token_bad_memcache(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus')
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 500)
-
- def test_validate_token_from_memcache(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set(
- 'AUTH_/auth/AUTH_token',
- (time() + 1,
- 'act:usr,act'))
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(
- resp.headers.get('x-auth-groups'),
- 'act:usr,act')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_validate_token_from_memcache_expired(self):
- fake_memcache = FakeMemcache()
- fake_memcache.set(
- 'AUTH_/auth/AUTH_token',
- (time() - 1,
- 'act:usr,act'))
- resp = Request.blank(
- '/auth/v2/.token/AUTH_token',
- environ={'swift.cache': fake_memcache}).get_response(
- self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assert_('x-auth-groups' not in resp.headers)
- self.assert_('x-auth-ttl' not in resp.headers)
-
- def test_validate_token_from_object(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'},
- {'name': 'act'}], 'expires': time() + 1}))]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(
- resp.headers.get('x-auth-groups'),
- 'act:usr,act')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_validate_token_from_object_expired(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'groups': 'act:usr,act',
- 'expires': time() - 1})),
- # DELETE of expired token object
- ('204 No Content', {}, '')]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertEquals(self.test_auth.app.calls, 2)
-
- def test_validate_token_from_object_with_admin(self):
- self.test_auth.app = FakeApp(iter([
- # GET of token object
- ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups':
- [{'name': 'act:usr'},
- {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 1}))]))
- resp = Request.blank('/auth/v2/.token/AUTH_token'
- ).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(resp.headers.get('x-auth-groups'),
- 'act:usr,act,AUTH_cfa')
- self.assert_(float(resp.headers['x-auth-ttl']) < 1,
- resp.headers['x-auth-ttl'])
-
- def test_get_conn_default(self):
- conn = self.test_auth.get_conn()
- self.assertEquals(
- conn.__class__,
- auth.HTTPConnection)
- self.assertEquals(conn.host, '127.0.0.1')
- self.assertEquals(conn.port, 8080)
-
- def test_get_conn_default_https(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
- conn = local_auth.get_conn()
- self.assertEquals(
- conn.__class__,
- auth.HTTPSConnection)
- self.assertEquals(conn.host, '1.2.3.4')
- self.assertEquals(conn.port, 443)
-
- def test_get_conn_overridden(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp())
- conn = \
- local_auth.get_conn(
- urlparsed=auth.urlparse('http://5.6.7.8/v1'))
- self.assertEquals(
- conn.__class__,
- auth.HTTPConnection)
- self.assertEquals(conn.host, '5.6.7.8')
- self.assertEquals(conn.port, 80)
-
- def test_get_conn_overridden_https(self):
- local_auth = auth.filter_factory(
- {'super_admin_key': 'supertest',
- 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp())
- conn = \
- local_auth.get_conn(
- urlparsed=auth.urlparse(
- 'https://5.6.7.8/v1'))
- self.assertEquals(
- conn.__class__,
- auth.HTTPSConnection)
- self.assertEquals(conn.host, '5.6.7.8')
- self.assertEquals(conn.port, 443)
-
- def test_get_itoken_fail_no_memcache(self):
- exc = None
- try:
- self.test_auth.get_itoken({})
- except Exception as err:
- exc = err
- self.assertEquals(str(exc),
- 'No memcache set up; required for Swauth middleware')
-
- def test_get_itoken_success(self):
- fmc = FakeMemcache()
- itk = self.test_auth.get_itoken(
- {'swift.cache': fmc})
- self.assert_(itk.startswith('AUTH_itk'), itk)
- expires, groups = fmc.get('AUTH_/auth/%s' % itk)
- self.assert_(expires > time(), expires)
- self.assertEquals(
- groups,
- '.auth,.reseller_admin,AUTH_.auth')
-
- def test_get_admin_detail_fail_no_colon(self):
- self.test_auth.app = FakeApp(iter([]))
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/')),
- None)
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'usr'})), None)
- self.assertRaises(
- StopIteration, self.test_auth.get_admin_detail,
- Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'}))
-
- def test_get_admin_detail_fail_user_not_found(self):
- self.test_auth.app = FakeApp(
- iter([('404 Not Found', {}, '')]))
- self.assertEquals(
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'})), None)
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_admin_detail_fail_get_user_error(self):
- self.test_auth.app = FakeApp(iter([
- ('503 Service Unavailable', {}, '')]))
- exc = None
- try:
- self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'}))
- except Exception as err:
- exc = err
- self.assertEquals(str(exc), 'Could not get admin user object: '
- '/v1/AUTH_.auth/act/usr 503 Service Unavailable')
- self.assertEquals(self.test_auth.app.calls, 1)
-
- def test_get_admin_detail_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:key",
- "groups": [{'name': "act:usr"}, {'name': "act"},
- {'name': ".admin"}]}))]))
- detail = self.test_auth.get_admin_detail(
- Request.blank('/',
- headers={'X-Auth-Admin-User': 'act:usr'}))
- self.assertEquals(self.test_auth.app.calls, 1)
- self.assertEquals(
- detail, {'account': 'act',
- 'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}]})
-
- def test_credentials_match_success(self):
- self.assert_(self.test_auth.credentials_match(
- {'auth': 'plaintext:key'}, 'key'))
-
- def test_credentials_match_fail_no_details(self):
- self.assert_(
- not self.test_auth.credentials_match(None, 'notkey'))
-
- def test_credentials_match_fail_plaintext(self):
- self.assert_(not self.test_auth.credentials_match(
- {'auth': 'plaintext:key'}, 'notkey'))
-
- def test_is_super_admin_success(self):
- self.assert_(
- self.test_auth.is_super_admin(
- Request.blank(
- '/',
- headers={'X-Auth-Admin-User': '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'})))
-
- def test_is_super_admin_fail_bad_key(self):
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'bad'})))
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={'X-Auth-Admin-User': '.super_admin'})))
- self.assert_(
- not self.test_auth.is_super_admin(Request.blank('/')))
-
- def test_is_super_admin_fail_bad_user(self):
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'bad',
- 'X-Auth-Admin-Key': 'supertest'})))
- self.assert_(
- not self.test_auth.is_super_admin(
- Request.blank('/',
- headers={'X-Auth-Admin-Key': 'supertest'})))
- self.assert_(
- not self.test_auth.is_super_admin(Request.blank('/')))
-
- def test_is_reseller_admin_success_is_super_admin(self):
- self.assert_(
- self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'})))
-
- def test_is_reseller_admin_success_called_get_admin_detail(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_only_account_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:adm'}, {'name': 'act'},
- {'name': '.admin'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_regular_user(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'})))
-
- def test_is_reseller_admin_fail_bad_key(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- not self.test_auth.is_reseller_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'bad'})))
-
- def test_is_account_admin_success_is_super_admin(self):
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- '.super_admin',
- 'X-Auth-Admin-Key': 'supertest'}), 'act'))
-
- def test_is_account_admin_success_is_reseller_admin(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_success(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:adm'}, {'name': 'act'},
- {'name': '.admin'}]}))]))
- self.assert_(
- self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:adm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_account_admin_different_account(
- self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act2:adm'}, {'name': 'act2'},
- {'name': '.admin'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act2:adm',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_regular_user(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:usr',
- 'X-Auth-Admin-Key': 'key'}), 'act'))
-
- def test_is_account_admin_fail_bad_key(self):
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps({'auth': 'plaintext:key',
- 'groups': [{'name': 'act:rdm'}, {'name': 'act'},
- {'name': '.admin'},
- {'name': '.reseller_admin'}]}))]))
- self.assert_(
- not self.test_auth.is_account_admin(
- Request.blank('/',
- headers={
- 'X-Auth-Admin-User':
- 'act:rdm',
- 'X-Auth-Admin-Key': 'bad'}), 'act'))
-
- def test_reseller_admin_but_account_is_internal_use_only(
- self):
- req = Request.blank('/v1/AUTH_.auth',
- environ={'REQUEST_METHOD': 'GET'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def test_reseller_admin_but_account_is_exactly_reseller_prefix(
- self):
- req = Request.blank(
- '/v1/AUTH_',
- environ={'REQUEST_METHOD': 'GET'})
- req.remote_user = 'act:usr,act,.reseller_admin'
- resp = self.test_auth.authorize(req)
- self.assertEquals(resp.status_int, 403)
-
- def _get_token_success_v1_0_encoded(
- self, saved_user, saved_key, sent_user,
- sent_key):
- self.test_auth.app = FakeApp(iter([
- # GET of user object
- ('200 Ok', {},
- json.dumps({"auth": "plaintext:%s" % saved_key,
- "groups": [{'name': saved_user}, {'name': "act"},
- {'name': ".admin"}]})),
- # GET of account
- ('204 Ok',
- {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
- # PUT of new token
- ('201 Created', {}, ''),
- # POST of token to user object
- ('204 No Content', {}, ''),
- # GET of services object
- ('200 Ok', {}, json.dumps({"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))]))
- resp = Request.blank(
- '/auth/v1.0',
- headers={'X-Auth-User': sent_user,
- 'X-Auth-Key': sent_key}).get_response(self.test_auth)
- self.assertEquals(resp.status_int, 200)
- self.assert_(
- resp.headers.get('x-auth-token',
- '').startswith('AUTH_tk'),
- resp.headers.get('x-auth-token'))
- self.assertEquals(resp.headers.get('x-auth-token'),
- resp.headers.get('x-storage-token'))
- self.assertEquals(resp.headers.get('x-storage-url'),
- 'http://127.0.0.1:8080/v1/AUTH_cfa')
- self.assertEquals(
- json.loads(resp.body),
- {"storage": {"default": "local",
- "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})
- self.assertEquals(self.test_auth.app.calls, 5)
-
- def test_get_token_success_v1_0_encoded1(self):
- self._get_token_success_v1_0_encoded(
- 'act:usr', 'key', 'act%3ausr', 'key')
-
- def test_get_token_success_v1_0_encoded2(self):
- self._get_token_success_v1_0_encoded(
- 'act:u s r', 'key', 'act%3au%20s%20r', 'key')
-
- def test_get_token_success_v1_0_encoded3(self):
- self._get_token_success_v1_0_encoded(
- 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay')
-
- def test_allowed_sync_hosts(self):
- a = auth.filter_factory(
- {'super_admin_key': 'supertest'})(FakeApp())
- self.assertEquals(
- a.allowed_sync_hosts,
- ['127.0.0.1'])
- a = auth.filter_factory({
- 'super_admin_key': 'supertest',
- 'allowed_sync_hosts':
- '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp())
- self.assertEquals(
- a.allowed_sync_hosts,
- ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1'])
-
- def test_reseller_admin_is_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'other', 'user': 'other:usr',
- 'account_id': 'AUTH_other',
- 'groups': [{'name': 'other:usr'}, {'name': 'other'},
- {'name': '.reseller_admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- req = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [True])
-
- def test_admin_is_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups': [{'name': 'act:usr'}, {'name': 'act'},
- {'name': '.admin'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]))
- req = Request.blank(
- '/v1/AUTH_cfa',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [True])
-
- def test_regular_is_not_owner(self):
- orig_authorize = self.test_auth.authorize
- owner_values = []
-
- def mitm_authorize(req):
- rv = orig_authorize(req)
- owner_values.append(
- req.environ.get('swift_owner', False))
- return rv
-
- self.test_auth.authorize = mitm_authorize
-
- self.test_auth.app = FakeApp(iter([
- ('200 Ok', {},
- json.dumps(
- {'account': 'act', 'user': 'act:usr',
- 'account_id': 'AUTH_cfa',
- 'groups':
- [{'name': 'act:usr'}, {
- 'name': 'act'}],
- 'expires': time() + 60})),
- ('204 No Content', {}, '')]), acl='act:usr')
- req = Request.blank('/v1/AUTH_cfa/c',
- headers={'X-Auth-Token': 'AUTH_t'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
- self.assertEquals(owner_values, [False])
-
- def test_sync_request_success(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- def test_sync_request_fail_key(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'wrongsecret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='othersecret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key=None)
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_fail_no_timestamp(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={'x-container-sync-key': 'secret'})
- req.remote_addr = '127.0.0.1'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_fail_sync_host(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp': '123.456'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
-
- def test_sync_request_success_lb_sync_host(self):
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp':
- '123.456',
- 'x-forwarded-for': '127.0.0.1'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- self.test_auth.app = FakeApp(
- iter([('204 No Content', {}, '')]),
- sync_key='secret')
- req = Request.blank('/v1/AUTH_cfa/c/o',
- environ={
- 'REQUEST_METHOD': 'DELETE'},
- headers={
- 'x-container-sync-key':
- 'secret',
- 'x-timestamp':
- '123.456',
- 'x-cluster-client-ip': '127.0.0.1'})
- req.remote_addr = '127.0.0.2'
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 204)
-
- def _make_request(self, path, **kwargs):
- req = Request.blank(path, **kwargs)
- req.environ['swift.cache'] = FakeMemcache()
- return req
-
- def test_override_asked_for_but_not_allowed(self):
- self.test_auth = \
- auth.filter_factory(
- {'allow_overrides': 'false'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.authorize)
-
- def test_override_asked_for_and_allowed(self):
- self.test_auth = \
- auth.filter_factory(
- {'allow_overrides': 'true'})(FakeApp())
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue(
- 'swift.authorize' not in resp.environ)
-
- def test_override_default_allowed(self):
- req = self._make_request('/v1/AUTH_account',
- environ={'swift.authorize_override': True})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 404)
- self.assertTrue(
- 'swift.authorize' not in resp.environ)
-
- def test_token_too_long(self):
- req = self._make_request('/v1/AUTH_account', headers={
- 'x-auth-token': 'a' * MAX_TOKEN_LENGTH})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertNotEquals(
- resp.body,
- 'Token exceeds maximum length.')
- req = self._make_request('/v1/AUTH_account', headers={
- 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 400)
- self.assertEquals(
- resp.body,
- 'Token exceeds maximum length.')
-
- def test_crazy_authorization(self):
- req = self._make_request('/v1/AUTH_account', headers={
- 'authorization': 'somebody elses header value'})
- resp = req.get_response(self.test_auth)
- self.assertEquals(resp.status_int, 401)
- self.assertEquals(resp.environ['swift.authorize'],
- self.test_auth.denied_response)
-
-
-if __name__ == '__main__':
- unittest.main()