diff options
Diffstat (limited to 'gluster/swift/common/middleware')
10 files changed, 17 insertions, 5324 deletions
diff --git a/gluster/swift/common/middleware/gswauth/swauth/middleware.py b/gluster/swift/common/middleware/gswauth/swauth/middleware.py index 314eedb..cdcc638 100644 --- a/gluster/swift/common/middleware/gswauth/swauth/middleware.py +++ b/gluster/swift/common/middleware/gswauth/swauth/middleware.py @@ -241,7 +241,7 @@ class Swauth(object): version, rest = split_path(env.get('PATH_INFO', ''), 1, 2, True) except ValueError: - version, rest = None, None + rest = None if rest and rest.startswith(self.reseller_prefix): # Handle anonymous access to accounts I'm the definitive # auth for. @@ -693,7 +693,7 @@ class Swauth(object): return HTTPBadRequest(request=req) try: new_services = json.loads(req.body) - except ValueError, err: + except ValueError as err: return HTTPBadRequest(body=str(err)) # Get the current services information path = quote('/v1/%s/%s/.services' % (self.auth_account, account)) @@ -1405,7 +1405,7 @@ class Swauth(object): memcache_key, (self.itoken_expires, '%s,.reseller_admin,%s' % (self.metadata_volume, - self.auth_account)), + self.auth_account)), timeout=self.token_life) return self.itoken @@ -1589,19 +1589,20 @@ class Swauth(object): if getattr(req, 'client_disconnect', False) or \ getattr(response, 'client_disconnect', False): status_int = 499 - self.logger.info( - ' '.join(quote(str(x)) for x in (client or '-', - req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S', gmtime()), - req.method, the_request, req.environ['SERVER_PROTOCOL'], - status_int, req.referer or '-', req.user_agent or '-', - req.headers.get( - 'x-auth-token', - req.headers.get('x-auth-admin-user', '-')), - getattr(req, 'bytes_transferred', 0) or '-', - getattr(response, 'bytes_transferred', 0) or '-', - req.headers.get('etag', '-'), - req.headers.get('x-trans-id', '-'), logged_headers or '-', - trans_time))) + self.logger.info(' '.join(quote(str(x)) for x in + (client or '-', + req.remote_addr or '-', strftime('%d/%b/%Y/%H/%M/%S', + gmtime()), + req.method, the_request, + req.environ['SERVER_PROTOCOL'], status_int, + req.referer or '-', req.user_agent or '-', + req.headers.get('x-auth-token', req.headers.get( + 'x-auth-admin-user', '-')), + getattr(req, 'bytes_transferred', 0) or '-', + getattr(response, 'bytes_transferred', 0) or '-', + req.headers.get('etag', '-'), + req.headers.get('x-trans-id', '-'), logged_headers + or '-', trans_time))) def filter_factory(global_conf, **local_conf): diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py deleted file mode 100644 index f53bc5a..0000000 --- a/gluster/swift/common/middleware/gswauth/test_swauth/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# See http://code.google.com/p/python-nose/issues/detail?id=373 -# The code below enables nosetests to work with i18n _() blocks - -import __builtin__ - -setattr(__builtin__, '_', lambda x: x) diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/__init__.py +++ /dev/null diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py deleted file mode 100644 index d9b7b55..0000000 --- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_authtypes.py +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Pablo Llopis 2011 - -import unittest -from swauth import authtypes - - -class TestPlaintext(unittest.TestCase): - - def setUp(self): - self.auth_encoder = authtypes.Plaintext() - - def test_plaintext_encode(self): - enc_key = self.auth_encoder.encode('keystring') - self.assertEquals('plaintext:keystring', enc_key) - - def test_plaintext_valid_match(self): - creds = 'plaintext:keystring' - match = self.auth_encoder.match('keystring', creds) - self.assertEquals(match, True) - - def test_plaintext_invalid_match(self): - creds = 'plaintext:other-keystring' - match = self.auth_encoder.match('keystring', creds) - self.assertEquals(match, False) - - -class TestSha1(unittest.TestCase): - - def setUp(self): - self.auth_encoder = authtypes.Sha1() - self.auth_encoder.salt = 'salt' - - def test_sha1_encode(self): - enc_key = self.auth_encoder.encode('keystring') - self.assertEquals('sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06', - enc_key) - - def test_sha1_valid_match(self): - creds = 'sha1:salt$d50dc700c296e23ce5b41f7431a0e01f69010f06' - match = self.auth_encoder.match('keystring', creds) - self.assertEquals(match, True) - - def test_sha1_invalid_match(self): - creds = 'sha1:salt$deadbabedeadbabedeadbabec0ffeebadc0ffeee' - match = self.auth_encoder.match('keystring', creds) - self.assertEquals(match, False) - - -if __name__ == '__main__': - unittest.main() diff --git a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py b/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py deleted file mode 100644 index 62259ff..0000000 --- a/gluster/swift/common/middleware/gswauth/test_swauth/unit/test_middleware.py +++ /dev/null @@ -1,4519 +0,0 @@ -# Copyright (c) 2010-2011 OpenStack, LLC. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -try: - import simplejson as json -except ImportError: - import json -import unittest -from contextlib import contextmanager -from time import time - -from swift.common.swob import Request, Response - -from swauth import middleware as auth -from swauth.authtypes import MAX_TOKEN_LENGTH - - -DEFAULT_TOKEN_LIFE = 86400 -MAX_TOKEN_LIFE = 100000 - - -class FakeMemcache(object): - - def __init__(self): - self.store = {} - - def get(self, key): - return self.store.get(key) - - def set(self, key, value, timeout=0, time=0): - self.store[key] = value - return True - - def incr(self, key, timeout=0, time=0): - self.store[key] = self.store.setdefault(key, 0) + 1 - return self.store[key] - - @contextmanager - def soft_lock(self, key, timeout=0, retries=5, time=0): - yield True - - def delete(self, key): - try: - del self.store[key] - except Exception: - pass - return True - - -class FakeApp(object): - - def __init__( - self, status_headers_body_iter=None, acl=None, sync_key=None): - self.calls = 0 - self.status_headers_body_iter = status_headers_body_iter - if not self.status_headers_body_iter: - self.status_headers_body_iter = iter( - [('404 Not Found', {}, '')]) - self.acl = acl - self.sync_key = sync_key - - def __call__(self, env, start_response): - self.calls += 1 - self.request = Request.blank('', environ=env) - if self.acl: - self.request.acl = self.acl - if self.sync_key: - self.request.environ[ - 'swift_sync_key'] = self.sync_key - if 'swift.authorize' in env: - resp = env['swift.authorize'](self.request) - if resp: - return resp(env, start_response) - status, headers, body = self.status_headers_body_iter.next( - ) - return Response(status=status, headers=headers, - body=body)(env, start_response) - - -class FakeConn(object): - - def __init__(self, status_headers_body_iter=None): - self.calls = 0 - self.status_headers_body_iter = status_headers_body_iter - if not self.status_headers_body_iter: - self.status_headers_body_iter = iter( - [('404 Not Found', {}, '')]) - - def request(self, method, path, headers): - self.calls += 1 - self.request_path = path - self.status, self.headers, self.body = \ - self.status_headers_body_iter.next() - self.status, self.reason = self.status.split(' ', 1) - self.status = int(self.status) - - def getresponse(self): - return self - - def read(self): - body = self.body - self.body = '' - return body - - -class TestAuth(unittest.TestCase): - - def setUp(self): - self.test_auth = \ - auth.filter_factory({ - 'super_admin_key': 'supertest', - 'token_life': str(DEFAULT_TOKEN_LIFE), - 'max_token_life': str(MAX_TOKEN_LIFE)})(FakeApp()) - - def test_super_admin_key_not_required(self): - auth.filter_factory({})(FakeApp()) - - def test_reseller_prefix_init(self): - app = FakeApp() - ath = auth.filter_factory( - {'super_admin_key': 'supertest'})(app) - self.assertEquals(ath.reseller_prefix, 'AUTH_') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': 'TEST'})(app) - self.assertEquals(ath.reseller_prefix, 'TEST_') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': 'TEST_'})(app) - self.assertEquals(ath.reseller_prefix, 'TEST_') - - def test_auth_prefix_init(self): - app = FakeApp() - ath = auth.filter_factory( - {'super_admin_key': 'supertest'})(app) - self.assertEquals(ath.auth_prefix, '/auth/') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'auth_prefix': ''})(app) - self.assertEquals(ath.auth_prefix, '/auth/') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'auth_prefix': '/test/'})(app) - self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'auth_prefix': '/test'})(app) - self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'auth_prefix': 'test/'})(app) - self.assertEquals(ath.auth_prefix, '/test/') - ath = auth.filter_factory( - {'super_admin_key': 'supertest', - 'auth_prefix': 'test'})(app) - self.assertEquals(ath.auth_prefix, '/test/') - - def test_no_auth_type_init(self): - app = FakeApp() - ath = auth.filter_factory({})(app) - self.assertEquals(ath.auth_type, 'Plaintext') - - def test_valid_auth_type_init(self): - app = FakeApp() - ath = auth.filter_factory( - {'auth_type': 'sha1'})(app) - self.assertEquals(ath.auth_type, 'Sha1') - ath = auth.filter_factory( - {'auth_type': 'plaintext'})(app) - self.assertEquals(ath.auth_type, 'Plaintext') - - def test_invalid_auth_type_init(self): - app = FakeApp() - exc = None - try: - auth.filter_factory( - {'auth_type': 'NONEXISTANT'})(app) - except Exception as err: - exc = err - self.assertEquals(str(exc), - 'Invalid auth_type in config file: %s' % - 'Nonexistant') - - def test_default_swift_cluster_init(self): - app = FakeApp() - self.assertRaises(Exception, auth.filter_factory({ - 'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#badscheme://host/path'}), app) - ath = auth.filter_factory( - {'super_admin_key': 'supertest'})(app) - self.assertEquals(ath.default_swift_cluster, - 'local#http://127.0.0.1:8080/v1') - ath = auth.filter_factory({ - 'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#http://host/path'})(app) - self.assertEquals(ath.default_swift_cluster, - 'local#http://host/path') - ath = auth.filter_factory({ - 'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#https://host/path/'})(app) - self.assertEquals(ath.dsc_url, 'https://host/path') - self.assertEquals(ath.dsc_url2, 'https://host/path') - ath = auth.filter_factory({ - 'super_admin_key': 'supertest', - 'default_swift_cluster': - 'local#https://host/path/#http://host2/path2/'})(app) - self.assertEquals(ath.dsc_url, 'https://host/path') - self.assertEquals( - ath.dsc_url2, - 'http://host2/path2') - - def test_top_level_denied(self): - resp = Request.blank( - '/').get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_anon(self): - resp = Request.blank( - '/v1/AUTH_account').get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(resp.environ['swift.authorize'], - self.test_auth.authorize) - - def test_auth_deny_non_reseller_prefix(self): - resp = Request.blank( - '/v1/BLAH_account', - headers={'X-Auth-Token': 'BLAH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(resp.environ['swift.authorize'], - self.test_auth.denied_response) - - def test_auth_deny_non_reseller_prefix_no_override( - self): - fake_authorize = lambda x: Response( - status='500 Fake') - resp = Request.blank( - '/v1/BLAH_account', - headers={'X-Auth-Token': 'BLAH_t'}, - environ={'swift.authorize': fake_authorize}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(resp.environ['swift.authorize'], fake_authorize) - - def test_auth_no_reseller_prefix_deny(self): - # Ensures that when we have no reseller prefix, we don't deny a request - # outright but set up a denial swift.authorize and pass the request on - # down the chain. - local_app = FakeApp() - local_auth = auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': ''})(local_app) - resp = Request.blank( - '/v1/account', - headers={'X-Auth-Token': 't'}).get_response(local_auth) - self.assertEquals(resp.status_int, 401) - # one for checking auth, two for request passed - # along - self.assertEquals(local_app.calls, 2) - self.assertEquals(resp.environ['swift.authorize'], - local_auth.denied_response) - - def test_auth_no_reseller_prefix_allow(self): - # Ensures that when we have no reseller prefix, we can still allow - # access if our auth server accepts requests - local_app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - local_auth = auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': ''})(local_app) - resp = Request.blank( - '/v1/act', - headers={'X-Auth-Token': 't'}).get_response(local_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(local_app.calls, 2) - self.assertEquals(resp.environ['swift.authorize'], - local_auth.authorize) - - def test_auth_no_reseller_prefix_no_token(self): - # Check that normally we set up a call back to our - # authorize. - local_auth = \ - auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': ''})(FakeApp(iter([]))) - resp = Request.blank( - '/v1/account').get_response( - local_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals( - resp.environ['swift.authorize'], local_auth.authorize) - # Now make sure we don't override an existing swift.authorize when we - # have no reseller prefix. - local_auth = \ - auth.filter_factory( - {'super_admin_key': 'supertest', - 'reseller_prefix': ''})(FakeApp()) - local_authorize = lambda req: Response('test') - resp = Request.blank( - '/v1/account', environ={'swift.authorize': - local_authorize}).get_response(local_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - resp.environ['swift.authorize'], - local_authorize) - - def test_auth_fail(self): - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_auth_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_auth_memcache(self): - # First run our test without memcache, showing we need to return the - # token contents twice. - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, ''), - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 4) - # Now run our test with memcache, showing we no longer need to return - # the token contents twice. - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, ''), - # Don't need a second token object returned if memcache is - # used - ('204 No Content', {}, '')])) - fake_memcache = FakeMemcache() - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}, - environ={'swift.cache': fake_memcache}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 204) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}, - environ={'swift.cache': fake_memcache}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_auth_just_expired(self): - self.test_auth.app = FakeApp(iter([ - # Request for token (which will have expired) - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() - 1})), - # Request to delete token - ('204 No Content', {}, '')])) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_middleware_storage_token(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - resp = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Storage-Token': 'AUTH_t'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_authorize_bad_path(self): - req = Request.blank('/badpath') - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 401) - req = Request.blank('/badpath') - req.remote_user = 'act:usr,act,AUTH_cfa' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_authorize_account_access(self): - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act,AUTH_cfa' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_authorize_acl_group_access(self): - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - req.acl = 'act' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - req.acl = 'act:usr' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - req.acl = 'act2' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_cfa') - req.remote_user = 'act:usr,act' - req.acl = 'act:usr2' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_deny_cross_reseller(self): - # Tests that cross-reseller is denied, even if ACLs/group - # names match - req = Request.blank('/v1/OTHER_cfa') - req.remote_user = 'act:usr,act,AUTH_cfa' - req.acl = 'act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_authorize_acl_referrer_access(self): - req = Request.blank('/v1/AUTH_cfa/c') - req.remote_user = 'act:usr,act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_cfa/c') - req.remote_user = 'act:usr,act' - req.acl = '.r:*,.rlistings' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa/c') - req.remote_user = 'act:usr,act' - req.acl = '.r:*' # No listings allowed - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_cfa/c') - req.remote_user = 'act:usr,act' - req.acl = '.r:.example.com,.rlistings' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - req = Request.blank('/v1/AUTH_cfa/c') - req.remote_user = 'act:usr,act' - req.referer = 'http://www.example.com/index.html' - req.acl = '.r:.example.com,.rlistings' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa/c') - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 401) - req = Request.blank('/v1/AUTH_cfa/c') - req.acl = '.r:*,.rlistings' - self.assertEquals( - self.test_auth.authorize(req), - None) - req = Request.blank('/v1/AUTH_cfa/c') - req.acl = '.r:*' # No listings allowed - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 401) - req = Request.blank('/v1/AUTH_cfa/c') - req.acl = '.r:.example.com,.rlistings' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 401) - req = Request.blank('/v1/AUTH_cfa/c') - req.referer = 'http://www.example.com/index.html' - req.acl = '.r:.example.com,.rlistings' - self.assertEquals( - self.test_auth.authorize(req), - None) - - def test_detect_reseller_request(self): - req = self._make_request('/v1/AUTH_admin', - headers={'X-Auth-Token': 'AUTH_t'}) - cache_key = 'AUTH_/auth/AUTH_t' - cache_entry = (time() + 3600, '.reseller_admin') - req.environ['swift.cache'].set( - cache_key, cache_entry) - self.assertTrue(req.environ.get('reseller_request')) - - def test_account_put_permissions(self): - req = Request.blank( - '/v1/AUTH_new', - environ={'REQUEST_METHOD': 'PUT'}) - req.remote_user = 'act:usr,act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - req = Request.blank( - '/v1/AUTH_new', - environ={'REQUEST_METHOD': 'PUT'}) - req.remote_user = 'act:usr,act,AUTH_other' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - # Even PUTs to your own account as account admin - # should fail - req = Request.blank( - '/v1/AUTH_old', - environ={'REQUEST_METHOD': 'PUT'}) - req.remote_user = 'act:usr,act,AUTH_old' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - req = Request.blank( - '/v1/AUTH_new', - environ={'REQUEST_METHOD': 'PUT'}) - req.remote_user = 'act:usr,act,.reseller_admin' - resp = self.test_auth.authorize(req) - self.assertEquals(resp, None) - - # .super_admin is not something the middleware should ever see or care - # about - req = Request.blank( - '/v1/AUTH_new', - environ={'REQUEST_METHOD': 'PUT'}) - req.remote_user = 'act:usr,act,.super_admin' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_account_delete_permissions(self): - req = Request.blank('/v1/AUTH_new', - environ={'REQUEST_METHOD': 'DELETE'}) - req.remote_user = 'act:usr,act' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - req = Request.blank('/v1/AUTH_new', - environ={'REQUEST_METHOD': 'DELETE'}) - req.remote_user = 'act:usr,act,AUTH_other' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - # Even DELETEs to your own account as account admin should - # fail - req = Request.blank('/v1/AUTH_old', - environ={'REQUEST_METHOD': 'DELETE'}) - req.remote_user = 'act:usr,act,AUTH_old' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - req = Request.blank('/v1/AUTH_new', - environ={'REQUEST_METHOD': 'DELETE'}) - req.remote_user = 'act:usr,act,.reseller_admin' - resp = self.test_auth.authorize(req) - self.assertEquals(resp, None) - - # .super_admin is not something the middleware should ever see or care - # about - req = Request.blank('/v1/AUTH_new', - environ={'REQUEST_METHOD': 'DELETE'}) - req.remote_user = 'act:usr,act,.super_admin' - resp = self.test_auth.authorize(req) - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_get_token_fail(self): - resp = Request.blank( - '/auth/v1.0').get_response( - self.test_auth) - self.assertEquals(resp.status_int, 401) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_get_token_fail_invalid_key(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'invalid'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_token_fail_invalid_x_auth_user_format( - self): - resp = Request.blank( - '/auth/v1/act/auth', - headers={'X-Auth-User': 'usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_get_token_fail_non_matching_account_in_request( - self): - resp = Request.blank( - '/auth/v1/act/auth', - headers={'X-Auth-User': 'act2:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_get_token_fail_bad_path(self): - resp = Request.blank( - '/auth/v1/act/auth/invalid', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_get_token_fail_missing_key(self): - resp = Request.blank( - '/auth/v1/act/auth', - headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_get_token_fail_get_user_details(self): - self.test_auth.app = FakeApp(iter([ - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_token_fail_get_account(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_token_fail_put_new_token(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_get_token_fail_post_to_user(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 4) - - def test_get_token_fail_get_services(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_fail_get_existing_token(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of token - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_token_success_v1_0(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_v1_0_with_user_token_life( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key', - 'X-Auth-Token-Lifetime': 10}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - left = int(resp.headers['x-auth-token-expires']) - self.assertTrue(left > 0, '%d > 0' % left) - self.assertTrue(left <= 10, '%d <= 10' % left) - self.assert_(resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_v1_0_with_user_token_life_past_max( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - req = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key', - 'X-Auth-Token-Lifetime': MAX_TOKEN_LIFE * 10}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - left = int(resp.headers['x-auth-token-expires']) - self.assertTrue(left > DEFAULT_TOKEN_LIFE, - '%d > %d' % (left, DEFAULT_TOKEN_LIFE)) - self.assertTrue(left <= MAX_TOKEN_LIFE, - '%d <= %d' % (left, MAX_TOKEN_LIFE)) - self.assert_(resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_v1_act_auth(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1/act/auth', - headers={'X-Storage-User': 'usr', - 'X-Storage-Pass': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_storage_instead_of_auth( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Storage-User': 'act:usr', - 'X-Storage-Pass': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assert_( - resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_v1_act_auth_auth_instead_of_storage( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1/act/auth', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assert_(resp.headers.get( - 'x-auth-token', - '').startswith('AUTH_tk'), resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_existing_token(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of token - ('200 Ok', {}, json.dumps( - {"account": "act", "user": "usr", - "account_id": "AUTH_cfa", - "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], - "expires": 9999999999.9999999})), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - resp.headers.get('x-auth-token'), - 'AUTH_tktest') - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_get_token_success_existing_token_but_request_new_one( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # DELETE of expired token - ('204 No Content', {}, ''), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key', - 'X-Auth-New-Token': 'true'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertNotEquals( - resp.headers.get('x-auth-token'), 'AUTH_tktest') - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 6) - - def test_get_token_success_existing_token_expired(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of token - ('200 Ok', {}, json.dumps( - {"account": "act", "user": "usr", - "account_id": "AUTH_cfa", - "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], - "expires": 0.0})), - # DELETE of expired token - ('204 No Content', {}, ''), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertNotEquals( - resp.headers.get('x-auth-token'), - 'AUTH_tktest') - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 7) - - def test_get_token_success_existing_token_expired_fail_deleting_old( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {'X-Object-Meta-Auth-Token': 'AUTH_tktest'}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of token - ('200 Ok', {}, json.dumps({"account": "act", "user": "usr", - "account_id": "AUTH_cfa", - "groups": [{'name': "act:usr"}, - {'name': "key"}, {'name': ".admin"}], - "expires": 0.0})), - # DELETE of expired token - ('503 Service Unavailable', {}, ''), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': 'act:usr', - 'X-Auth-Key': 'key'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertNotEquals( - resp.headers.get('x-auth-token'), - 'AUTH_tktest') - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 7) - - def test_prep_success(self): - list_to_iter = [ - # PUT of .auth account - ('201 Created', {}, ''), - # PUT of .account_id container - ('201 Created', {}, '')] - # PUT of .token* containers - for x in xrange(16): - list_to_iter.append(('201 Created', {}, '')) - self.test_auth.app = FakeApp(iter(list_to_iter)) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 18) - - def test_prep_bad_method(self): - resp = Request.blank('/auth/v2/.prep', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'HEAD'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_prep_bad_creds(self): - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - 'super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'upertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': '.super_admin'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - resp = Request.blank( - '/auth/v2/.prep', - environ={'REQUEST_METHOD': 'POST'}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - - def test_prep_fail_account_create(self): - self.test_auth.app = FakeApp(iter([ - # PUT of .auth account - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_prep_fail_token_container_create(self): - self.test_auth.app = FakeApp(iter([ - # PUT of .auth account - ('201 Created', {}, ''), - # PUT of .token container - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_prep_fail_account_id_container_create(self): - self.test_auth.app = FakeApp(iter([ - # PUT of .auth account - ('201 Created', {}, ''), - # PUT of .token container - ('201 Created', {}, ''), - # PUT of .account_id container - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/.prep', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_get_reseller_success(self): - self.test_auth.app = FakeApp(iter([ - # GET of .auth account (list containers) - ('200 Ok', {}, json.dumps([ - {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", - "count": 0, "bytes": 0}, - {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers - # continuation) - ('200 Ok', {}, '[]')])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), - {"accounts": [{"name": "act"}]}) - self.assertEquals(self.test_auth.app.calls, 2) - - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}, - {"name": ".reseller_admin"}], "auth": "plaintext:key"})), - # GET of .auth account (list containers) - ('200 Ok', {}, json.dumps([ - {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", - "count": 0, "bytes": 0}, - {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers - # continuation) - ('200 Ok', {}, '[]')])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), - {"accounts": [{"name": "act"}]}) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_get_reseller_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - 'super:admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller - # admin) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_reseller_fail_listing(self): - self.test_auth.app = FakeApp(iter([ - # GET of .auth account (list containers) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of .auth account (list containers) - ('200 Ok', {}, json.dumps([ - {"name": ".token", "count": 0, "bytes": 0}, - {"name": ".account_id", - "count": 0, "bytes": 0}, - {"name": "act", "count": 0, "bytes": 0}])), - # GET of .auth account (list containers - # continuation) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_account_success(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, - json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects - # continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - json.loads(resp.body), - {'account_id': 'AUTH_cfa', - 'services': {'storage': - {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, - 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) - self.assertEquals(self.test_auth.app.calls, 3) - - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of .services object - ('200 Ok', {}, - json.dumps({"storage": - {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects - # continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - json.loads(resp.body), - {'account_id': 'AUTH_cfa', - 'services': {'storage': - {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}}, - 'users': [{'name': 'tester'}, {'name': 'tester3'}]}) - self.assertEquals(self.test_auth.app.calls, 4) - - def test_get_account_fail_bad_account_name(self): - resp = Request.blank('/auth/v2/.token', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - resp = Request.blank('/auth/v2/.anything', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_get_account_fail_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - 'super:admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong - # account) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - 'act2:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_account_fail_get_services(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_account_fail_listing(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # GET of account container (list objects) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # GET of account container (list objects) - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 2) - - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of account container (list objects - # continuation) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_set_services_new_service(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, - json.dumps({"storage": - {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # PUT of new .services object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'new_service': - {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - json.loads(resp.body), - {'storage': {'default': 'local', - 'local': 'http://127.0.0.1:8080/v1/AUTH_cfa'}, - 'new_service': {'new_endpoint': 'new_value'}}) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_set_services_new_endpoint(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": - {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # PUT of new .services object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'storage': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals( - json.loads(resp.body), - {'storage': {'default': 'local', - 'local': - 'http://127.0.0.1:8080/v1/AUTH_cfa', - 'new_endpoint': 'new_value'}}) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_set_services_update_endpoint(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # PUT of new .services object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(json.loads(resp.body), - {'storage': {'default': 'local', - 'local': 'new_value'}}) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_set_services_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller - # admin) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'}, - body=json.dumps( - {'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - body=json.dumps( - {'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_set_services_fail_bad_account_name(self): - resp = Request.blank('/auth/v2/.act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'storage': {'local': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_set_services_fail_bad_json(self): - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body='garbage' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body='' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_set_services_fail_get_services(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('503 Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({ - 'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps({ - 'new_service': {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_set_services_fail_put_services(self): - self.test_auth.app = FakeApp(iter([ - # GET of .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # PUT of new .services object - ('503 Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/.services', - environ={ - 'REQUEST_METHOD': 'POST'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}, - body=json.dumps( - {'new_service': - {'new_endpoint': 'new_value'}}) - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_put_account_success(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, ''), - # PUT of .account_id mapping object - ('204 No Content', {}, ''), - # PUT of .services object - ('204 No Content', {}, ''), - # POST to account container updating - # X-Container-Meta-Account-Id - ('204 No Content', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 5) - self.assertEquals(conn.calls, 1) - - def test_put_account_success_preexist_but_not_completed( - self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence - # We're going to show it as existing this time, but with no - # X-Container-Meta-Account-Id, indicating a failed - # previous attempt - ('200 Ok', {}, ''), - # PUT of .account_id mapping object - ('204 No Content', {}, ''), - # PUT of .services object - ('204 No Content', {}, ''), - # POST to account container updating - # X-Container-Meta-Account-Id - ('204 No Content', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 4) - self.assertEquals(conn.calls, 1) - - def test_put_account_success_preexist_and_completed( - self): - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for pre-existence - # We're going to show it as existing this time, and with an - # X-Container-Meta-Account-Id, indicating it already - # exists - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 202) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_account_success_with_given_suffix(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, ''), - # PUT of .account_id mapping object - ('204 No Content', {}, ''), - # PUT of .services object - ('204 No Content', {}, ''), - # POST to account container updating - # X-Container-Meta-Account-Id - ('204 No Content', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest', - 'X-Account-Suffix': 'test-suffix'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals( - conn.request_path, - '/v1/AUTH_test-suffix') - self.assertEquals(self.test_auth.app.calls, 5) - self.assertEquals(conn.calls, 1) - - def test_put_account_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'super:admin', - 'X-Auth-Admin-Key': 'supertest'},).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller - # admin) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'act:adm', - 'X-Auth-Admin-Key': 'key'},).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': 'act:usr', - 'X-Auth-Admin-Key': 'key'},).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_account_fail_invalid_account_name(self): - resp = Request.blank( - '/auth/v2/.act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'},).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_put_account_fail_on_initial_account_head(self): - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_account_fail_on_account_marker_put(self): - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_put_account_fail_on_storage_account_put(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('503 Service Unavailable', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(conn.calls, 1) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_put_account_fail_on_account_id_mapping(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, ''), - # PUT of .account_id mapping object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(conn.calls, 1) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_put_account_fail_on_services_object(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, ''), - # PUT of .account_id mapping object - ('204 No Content', {}, ''), - # PUT of .services object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', - 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(conn.calls, 1) - self.assertEquals(self.test_auth.app.calls, 4) - - def test_put_account_fail_on_post_mapping(self): - conn = FakeConn(iter([ - # PUT of storage account itself - ('201 Created', {}, '')])) - self.test_auth.get_conn = lambda: conn - self.test_auth.app = FakeApp(iter([ - # Initial HEAD of account container to check for - # pre-existence - ('404 Not Found', {}, ''), - # PUT of account container - ('204 No Content', {}, ''), - # PUT of .account_id mapping object - ('204 No Content', {}, ''), - # PUT of .services object - ('204 No Content', {}, ''), - # POST to account container updating - # X-Container-Meta-Account-Id - ('503 Service Unavailable', {}, '')])) - resp = Request.blank( - '/auth/v2/act', - environ={'REQUEST_METHOD': 'PUT', 'swift.cache': FakeMemcache()}, - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(conn.calls, 1) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_delete_account_success(self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('204 No Content', {}, ''), - # DELETE the account container - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 6) - self.assertEquals(conn.calls, 1) - - def test_delete_account_success_missing_services(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('404 Not Found', {}, ''), - # DELETE the .account_id mapping object - ('204 No Content', {}, ''), - # DELETE the account container - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_delete_account_success_missing_storage_account( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('404 Not Found', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('204 No Content', {}, ''), - # DELETE the account container - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 6) - self.assertEquals(conn.calls, 1) - - def test_delete_account_success_missing_account_id_mapping( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('404 Not Found', {}, ''), - # DELETE the account container - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 6) - self.assertEquals(conn.calls, 1) - - def test_delete_account_success_missing_account_container_at_end( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('204 No Content', {}, ''), - # DELETE the account container - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 6) - self.assertEquals(conn.calls, 1) - - def test_delete_account_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller - # admin) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_account_fail_invalid_account_name(self): - resp = Request.blank('/auth/v2/.act', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_delete_account_fail_not_found(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_account_fail_not_found_concurrency( - self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_delete_account_fail_list_account(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_account_fail_list_account_concurrency( - self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_delete_account_fail_has_users(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}]))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 409) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_account_fail_has_users2(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}]))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 409) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_delete_account_fail_get_services(self): - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_delete_account_fail_delete_storage_account( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('409 Conflict', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 409) - self.assertEquals(self.test_auth.app.calls, 3) - self.assertEquals(conn.calls, 1) - - def test_delete_account_fail_delete_storage_account2( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, ''), - # DELETE of storage account itself - ('409 Conflict', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa", - "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - self.assertEquals(conn.calls, 2) - - def test_delete_account_fail_delete_storage_account3( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('503 Service Unavailable', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - self.assertEquals(conn.calls, 1) - - def test_delete_account_fail_delete_storage_account4( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, ''), - # DELETE of storage account itself - ('503 Service Unavailable', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa", - "other": "http://127.0.0.1:8080/v1/AUTH_cfa2"}}))])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - self.assertEquals(conn.calls, 2) - - def test_delete_account_fail_delete_services(self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 4) - self.assertEquals(conn.calls, 1) - - def test_delete_account_fail_delete_account_id_mapping( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 5) - self.assertEquals(conn.calls, 1) - - def test_delete_account_fail_delete_account_container( - self): - conn = FakeConn(iter([ - # DELETE of storage account itself - ('204 No Content', {}, '')])) - self.test_auth.get_conn = lambda x: conn - self.test_auth.app = FakeApp(iter([ - # Account's container listing, checking for - # users - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}])), - # Account's container listing, checking for users - # (continuation) - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]'), - # GET the .services object - ('200 Ok', {}, json.dumps( - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}})), - # DELETE the .services object - ('204 No Content', {}, ''), - # DELETE the .account_id mapping object - ('204 No Content', {}, ''), - # DELETE the account container - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act', - environ={ - 'REQUEST_METHOD': 'DELETE', - 'swift.cache': FakeMemcache()}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 6) - self.assertEquals(conn.calls, 1) - - def test_get_user_success(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"})) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_fail_no_super_admin_key(self): - local_auth = auth.filter_factory({})(FakeApp(iter([ - # GET of user object (but we should never get - # here) - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"}))]))) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(local_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(local_auth.app.calls, 0) - - def test_get_user_groups_success(self): - self.test_auth.app = FakeApp(iter([ - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:tester"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:tester3"}, {"name": "act"}], - "auth": "plaintext:key3"})), - # GET of account container (list objects - # continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) - resp = Request.blank('/auth/v2/act/.groups', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": ".admin"}, {"name": "act"}, - {"name": "act:tester"}, {"name": "act:tester3"}]})) - self.assertEquals(self.test_auth.app.calls, 4) - - def test_get_user_groups_success2(self): - self.test_auth.app = FakeApp(iter([ - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}])), - # GET of user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:tester"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of account container (list objects - # continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:tester3"}, {"name": "act"}], - "auth": "plaintext:key3"})), - # GET of account container (list objects - # continuation) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, '[]')])) - resp = Request.blank('/auth/v2/act/.groups', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": ".admin"}, {"name": "act"}, - {"name": "act:tester"}, {"name": "act:tester3"}]})) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_user_fail_invalid_account(self): - resp = Request.blank('/auth/v2/.invalid/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_get_user_fail_invalid_user(self): - resp = Request.blank('/auth/v2/act/.invalid', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_get_user_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'super:admin', - 'X-Auth-Admin-Key': 'supertest'}, - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'}, - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_account_admin_success(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller - # admin) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of requested user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}], - "auth": "plaintext:key"})) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_user_account_admin_fail_getting_account_admin( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin check) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who is an .admin - # as well] - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of user object (reseller admin check [and fail - # here]) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_get_user_account_admin_fail_getting_reseller_admin( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin check) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who is a - # .reseller_admin] - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".reseller_admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_user_reseller_admin_fail_getting_reseller_admin( - self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin check) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".reseller_admin"}], - "auth": "plaintext:key"})), - # GET of requested user object [who also is a - # .reseller_admin] - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".reseller_admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_user_super_admin_succeed_getting_reseller_admin( - self): - self.test_auth.app = FakeApp(iter([ - # GET of requested user object - ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".reseller_admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".reseller_admin"}], - "auth": "plaintext:key"})) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_groups_not_found(self): - self.test_auth.app = FakeApp(iter([ - # GET of account container (list objects) - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/.groups', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_groups_fail_listing(self): - self.test_auth.app = FakeApp(iter([ - # GET of account container (list objects) - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/.groups', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_groups_fail_get_user(self): - self.test_auth.app = FakeApp(iter([ - # GET of account container (list objects) - ('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, - json.dumps([ - {"name": ".services", "hash": "etag", "bytes": 112, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.618110"}, - {"name": "tester", "hash": "etag", "bytes": 104, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:27.736680"}, - {"name": "tester3", "hash": "etag", "bytes": 86, - "content_type": - "application/octet-stream", - "last_modified": "2010-12-03T17:16:28.135530"}])), - # GET of user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/.groups', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_get_user_not_found(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_user_fail(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_user_fail_invalid_account(self): - resp = Request.blank('/auth/v2/.invalid/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_put_user_fail_invalid_user(self): - resp = Request.blank('/auth/v2/act/.usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_put_user_fail_no_user_key(self): - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_put_user_reseller_admin_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (reseller admin) - # This shouldn't actually get called, checked - # below - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:rdm"}, - {"name": "test"}, {"name": ".admin"}, - {"name": ".reseller_admin"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act:rdm', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': - 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 0) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but not reseller admin) - # This shouldn't actually get called, checked - # below - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': - 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 0) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - # This shouldn't actually get called, checked - # below - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': - 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 0) - - def test_put_user_account_admin_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong - # account) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act2:adm', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': - 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': - 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_user_regular_fail_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong - # account) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act2:adm', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': - 'key', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_put_user_regular_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of user object - ('201 Created', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals( - json.loads(self.test_auth.app.request.body), - {"groups": [{"name": "act:usr"}, {"name": "act"}], - "auth": "plaintext:key"}) - - def test_put_user_special_chars_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of user object - ('201 Created', {}, '')])) - resp = Request.blank('/auth/v2/act/u_s-r', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals( - json.loads(self.test_auth.app.request.body), - {"groups": [{"name": "act:u_s-r"}, {"name": "act"}], - "auth": "plaintext:key"}) - - def test_put_user_account_admin_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of user object - ('201 Created', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals( - json.loads(self.test_auth.app.request.body), - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], - "auth": "plaintext:key"}) - - def test_put_user_reseller_admin_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of user object - ('201 Created', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key', - 'X-Auth-User-Reseller-Admin': 'true'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 201) - self.assertEquals(self.test_auth.app.calls, 2) - self.assertEquals( - json.loads(self.test_auth.app.request.body), - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}, {"name": ".reseller_admin"}], - "auth": "plaintext:key"}) - - def test_put_user_fail_not_found(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_put_user_fail(self): - self.test_auth.app = FakeApp(iter([ - # PUT of user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'PUT'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': - 'supertest', - 'X-Auth-User-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_user_bad_creds(self): - self.test_auth.app = FakeApp(iter([ - # GET of user object (account admin, but wrong - # account) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act2:adm"}, - {"name": "test"}, {"name": ".admin"}], - "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - 'act2:adm', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - self.test_auth.app = FakeApp(iter([ - # GET of user object (regular user) - ('200 Ok', {}, json.dumps({"groups": [{"name": "act:usr"}, - {"name": "test"}], "auth": "plaintext:key"}))])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 403) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_user_invalid_account(self): - resp = Request.blank('/auth/v2/.invalid/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_delete_user_invalid_user(self): - resp = Request.blank('/auth/v2/act/.invalid', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_delete_user_not_found(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_user_fail_head_user(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_delete_user_fail_delete_token(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', - {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), - # DELETE of token - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_delete_user_fail_delete_user(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', - {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), - # DELETE of token - ('204 No Content', {}, ''), - # DELETE of user object - ('503 Service Unavailable', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 500) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_delete_user_success(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', - {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), - # DELETE of token - ('204 No Content', {}, ''), - # DELETE of user object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_delete_user_success_missing_user_at_end(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', - {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), - # DELETE of token - ('204 No Content', {}, ''), - # DELETE of user object - ('404 Not Found', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_delete_user_success_missing_token(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', - {'X-Object-Meta-Auth-Token': 'AUTH_tk'}, ''), - # DELETE of token - ('404 Not Found', {}, ''), - # DELETE of user object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 3) - - def test_delete_user_success_no_token(self): - self.test_auth.app = FakeApp(iter([ - # HEAD of user object - ('200 Ok', {}, ''), - # DELETE of user object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/act/usr', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'} - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_validate_token_bad_prefix(self): - resp = Request.blank('/auth/v2/.token/BAD_token' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_validate_token_tmi(self): - resp = Request.blank( - '/auth/v2/.token/AUTH_token/tmi').get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - - def test_validate_token_bad_memcache(self): - fake_memcache = FakeMemcache() - fake_memcache.set('AUTH_/auth/AUTH_token', 'bogus') - resp = Request.blank( - '/auth/v2/.token/AUTH_token', - environ={'swift.cache': fake_memcache}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 500) - - def test_validate_token_from_memcache(self): - fake_memcache = FakeMemcache() - fake_memcache.set( - 'AUTH_/auth/AUTH_token', - (time() + 1, - 'act:usr,act')) - resp = Request.blank( - '/auth/v2/.token/AUTH_token', - environ={'swift.cache': fake_memcache}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals( - resp.headers.get('x-auth-groups'), - 'act:usr,act') - self.assert_(float(resp.headers['x-auth-ttl']) < 1, - resp.headers['x-auth-ttl']) - - def test_validate_token_from_memcache_expired(self): - fake_memcache = FakeMemcache() - fake_memcache.set( - 'AUTH_/auth/AUTH_token', - (time() - 1, - 'act:usr,act')) - resp = Request.blank( - '/auth/v2/.token/AUTH_token', - environ={'swift.cache': fake_memcache}).get_response( - self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assert_('x-auth-groups' not in resp.headers) - self.assert_('x-auth-ttl' not in resp.headers) - - def test_validate_token_from_object(self): - self.test_auth.app = FakeApp(iter([ - # GET of token object - ('200 Ok', {}, json.dumps({'groups': [{'name': 'act:usr'}, - {'name': 'act'}], 'expires': time() + 1}))])) - resp = Request.blank('/auth/v2/.token/AUTH_token' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 1) - self.assertEquals( - resp.headers.get('x-auth-groups'), - 'act:usr,act') - self.assert_(float(resp.headers['x-auth-ttl']) < 1, - resp.headers['x-auth-ttl']) - - def test_validate_token_from_object_expired(self): - self.test_auth.app = FakeApp(iter([ - # GET of token object - ('200 Ok', {}, json.dumps({'groups': 'act:usr,act', - 'expires': time() - 1})), - # DELETE of expired token object - ('204 No Content', {}, '')])) - resp = Request.blank('/auth/v2/.token/AUTH_token' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertEquals(self.test_auth.app.calls, 2) - - def test_validate_token_from_object_with_admin(self): - self.test_auth.app = FakeApp(iter([ - # GET of token object - ('200 Ok', {}, json.dumps({'account_id': 'AUTH_cfa', 'groups': - [{'name': 'act:usr'}, - {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 1}))])) - resp = Request.blank('/auth/v2/.token/AUTH_token' - ).get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(self.test_auth.app.calls, 1) - self.assertEquals(resp.headers.get('x-auth-groups'), - 'act:usr,act,AUTH_cfa') - self.assert_(float(resp.headers['x-auth-ttl']) < 1, - resp.headers['x-auth-ttl']) - - def test_get_conn_default(self): - conn = self.test_auth.get_conn() - self.assertEquals( - conn.__class__, - auth.HTTPConnection) - self.assertEquals(conn.host, '127.0.0.1') - self.assertEquals(conn.port, 8080) - - def test_get_conn_default_https(self): - local_auth = auth.filter_factory( - {'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) - conn = local_auth.get_conn() - self.assertEquals( - conn.__class__, - auth.HTTPSConnection) - self.assertEquals(conn.host, '1.2.3.4') - self.assertEquals(conn.port, 443) - - def test_get_conn_overridden(self): - local_auth = auth.filter_factory( - {'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#https://1.2.3.4/v1'})(FakeApp()) - conn = \ - local_auth.get_conn( - urlparsed=auth.urlparse('http://5.6.7.8/v1')) - self.assertEquals( - conn.__class__, - auth.HTTPConnection) - self.assertEquals(conn.host, '5.6.7.8') - self.assertEquals(conn.port, 80) - - def test_get_conn_overridden_https(self): - local_auth = auth.filter_factory( - {'super_admin_key': 'supertest', - 'default_swift_cluster': 'local#http://1.2.3.4/v1'})(FakeApp()) - conn = \ - local_auth.get_conn( - urlparsed=auth.urlparse( - 'https://5.6.7.8/v1')) - self.assertEquals( - conn.__class__, - auth.HTTPSConnection) - self.assertEquals(conn.host, '5.6.7.8') - self.assertEquals(conn.port, 443) - - def test_get_itoken_fail_no_memcache(self): - exc = None - try: - self.test_auth.get_itoken({}) - except Exception as err: - exc = err - self.assertEquals(str(exc), - 'No memcache set up; required for Swauth middleware') - - def test_get_itoken_success(self): - fmc = FakeMemcache() - itk = self.test_auth.get_itoken( - {'swift.cache': fmc}) - self.assert_(itk.startswith('AUTH_itk'), itk) - expires, groups = fmc.get('AUTH_/auth/%s' % itk) - self.assert_(expires > time(), expires) - self.assertEquals( - groups, - '.auth,.reseller_admin,AUTH_.auth') - - def test_get_admin_detail_fail_no_colon(self): - self.test_auth.app = FakeApp(iter([])) - self.assertEquals( - self.test_auth.get_admin_detail( - Request.blank('/')), - None) - self.assertEquals( - self.test_auth.get_admin_detail( - Request.blank('/', - headers={'X-Auth-Admin-User': 'usr'})), None) - self.assertRaises( - StopIteration, self.test_auth.get_admin_detail, - Request.blank('/', headers={'X-Auth-Admin-User': 'act:usr'})) - - def test_get_admin_detail_fail_user_not_found(self): - self.test_auth.app = FakeApp( - iter([('404 Not Found', {}, '')])) - self.assertEquals( - self.test_auth.get_admin_detail( - Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})), None) - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_admin_detail_fail_get_user_error(self): - self.test_auth.app = FakeApp(iter([ - ('503 Service Unavailable', {}, '')])) - exc = None - try: - self.test_auth.get_admin_detail( - Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})) - except Exception as err: - exc = err - self.assertEquals(str(exc), 'Could not get admin user object: ' - '/v1/AUTH_.auth/act/usr 503 Service Unavailable') - self.assertEquals(self.test_auth.app.calls, 1) - - def test_get_admin_detail_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({"auth": "plaintext:key", - "groups": [{'name': "act:usr"}, {'name': "act"}, - {'name': ".admin"}]}))])) - detail = self.test_auth.get_admin_detail( - Request.blank('/', - headers={'X-Auth-Admin-User': 'act:usr'})) - self.assertEquals(self.test_auth.app.calls, 1) - self.assertEquals( - detail, {'account': 'act', - 'auth': 'plaintext:key', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}]}) - - def test_credentials_match_success(self): - self.assert_(self.test_auth.credentials_match( - {'auth': 'plaintext:key'}, 'key')) - - def test_credentials_match_fail_no_details(self): - self.assert_( - not self.test_auth.credentials_match(None, 'notkey')) - - def test_credentials_match_fail_plaintext(self): - self.assert_(not self.test_auth.credentials_match( - {'auth': 'plaintext:key'}, 'notkey')) - - def test_is_super_admin_success(self): - self.assert_( - self.test_auth.is_super_admin( - Request.blank( - '/', - headers={'X-Auth-Admin-User': '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}))) - - def test_is_super_admin_fail_bad_key(self): - self.assert_( - not self.test_auth.is_super_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'bad'}))) - self.assert_( - not self.test_auth.is_super_admin( - Request.blank('/', - headers={'X-Auth-Admin-User': '.super_admin'}))) - self.assert_( - not self.test_auth.is_super_admin(Request.blank('/'))) - - def test_is_super_admin_fail_bad_user(self): - self.assert_( - not self.test_auth.is_super_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'bad', - 'X-Auth-Admin-Key': 'supertest'}))) - self.assert_( - not self.test_auth.is_super_admin( - Request.blank('/', - headers={'X-Auth-Admin-Key': 'supertest'}))) - self.assert_( - not self.test_auth.is_super_admin(Request.blank('/'))) - - def test_is_reseller_admin_success_is_super_admin(self): - self.assert_( - self.test_auth.is_reseller_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}))) - - def test_is_reseller_admin_success_called_get_admin_detail( - self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, - {'name': '.admin'}, - {'name': '.reseller_admin'}]}))])) - self.assert_( - self.test_auth.is_reseller_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:rdm', - 'X-Auth-Admin-Key': 'key'}))) - - def test_is_reseller_admin_fail_only_account_admin( - self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:adm'}, {'name': 'act'}, - {'name': '.admin'}]}))])) - self.assert_( - not self.test_auth.is_reseller_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'}))) - - def test_is_reseller_admin_fail_regular_user(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) - self.assert_( - not self.test_auth.is_reseller_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'}))) - - def test_is_reseller_admin_fail_bad_key(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, - {'name': '.admin'}, - {'name': '.reseller_admin'}]}))])) - self.assert_( - not self.test_auth.is_reseller_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:rdm', - 'X-Auth-Admin-Key': 'bad'}))) - - def test_is_account_admin_success_is_super_admin(self): - self.assert_( - self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - '.super_admin', - 'X-Auth-Admin-Key': 'supertest'}), 'act')) - - def test_is_account_admin_success_is_reseller_admin( - self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, - {'name': '.admin'}, - {'name': '.reseller_admin'}]}))])) - self.assert_( - self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:rdm', - 'X-Auth-Admin-Key': 'key'}), 'act')) - - def test_is_account_admin_success(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:adm'}, {'name': 'act'}, - {'name': '.admin'}]}))])) - self.assert_( - self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:adm', - 'X-Auth-Admin-Key': 'key'}), 'act')) - - def test_is_account_admin_fail_account_admin_different_account( - self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act2:adm'}, {'name': 'act2'}, - {'name': '.admin'}]}))])) - self.assert_( - not self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act2:adm', - 'X-Auth-Admin-Key': 'key'}), 'act')) - - def test_is_account_admin_fail_regular_user(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}]}))])) - self.assert_( - not self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:usr', - 'X-Auth-Admin-Key': 'key'}), 'act')) - - def test_is_account_admin_fail_bad_key(self): - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps({'auth': 'plaintext:key', - 'groups': [{'name': 'act:rdm'}, {'name': 'act'}, - {'name': '.admin'}, - {'name': '.reseller_admin'}]}))])) - self.assert_( - not self.test_auth.is_account_admin( - Request.blank('/', - headers={ - 'X-Auth-Admin-User': - 'act:rdm', - 'X-Auth-Admin-Key': 'bad'}), 'act')) - - def test_reseller_admin_but_account_is_internal_use_only( - self): - req = Request.blank('/v1/AUTH_.auth', - environ={'REQUEST_METHOD': 'GET'}) - req.remote_user = 'act:usr,act,.reseller_admin' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def test_reseller_admin_but_account_is_exactly_reseller_prefix( - self): - req = Request.blank( - '/v1/AUTH_', - environ={'REQUEST_METHOD': 'GET'}) - req.remote_user = 'act:usr,act,.reseller_admin' - resp = self.test_auth.authorize(req) - self.assertEquals(resp.status_int, 403) - - def _get_token_success_v1_0_encoded( - self, saved_user, saved_key, sent_user, - sent_key): - self.test_auth.app = FakeApp(iter([ - # GET of user object - ('200 Ok', {}, - json.dumps({"auth": "plaintext:%s" % saved_key, - "groups": [{'name': saved_user}, {'name': "act"}, - {'name': ".admin"}]})), - # GET of account - ('204 Ok', - {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''), - # PUT of new token - ('201 Created', {}, ''), - # POST of token to user object - ('204 No Content', {}, ''), - # GET of services object - ('200 Ok', {}, json.dumps({"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}))])) - resp = Request.blank( - '/auth/v1.0', - headers={'X-Auth-User': sent_user, - 'X-Auth-Key': sent_key}).get_response(self.test_auth) - self.assertEquals(resp.status_int, 200) - self.assert_( - resp.headers.get('x-auth-token', - '').startswith('AUTH_tk'), - resp.headers.get('x-auth-token')) - self.assertEquals(resp.headers.get('x-auth-token'), - resp.headers.get('x-storage-token')) - self.assertEquals(resp.headers.get('x-storage-url'), - 'http://127.0.0.1:8080/v1/AUTH_cfa') - self.assertEquals( - json.loads(resp.body), - {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_cfa"}}) - self.assertEquals(self.test_auth.app.calls, 5) - - def test_get_token_success_v1_0_encoded1(self): - self._get_token_success_v1_0_encoded( - 'act:usr', 'key', 'act%3ausr', 'key') - - def test_get_token_success_v1_0_encoded2(self): - self._get_token_success_v1_0_encoded( - 'act:u s r', 'key', 'act%3au%20s%20r', 'key') - - def test_get_token_success_v1_0_encoded3(self): - self._get_token_success_v1_0_encoded( - 'act:u s r', 'k:e:y', 'act%3au%20s%20r', 'k%3Ae%3ay') - - def test_allowed_sync_hosts(self): - a = auth.filter_factory( - {'super_admin_key': 'supertest'})(FakeApp()) - self.assertEquals( - a.allowed_sync_hosts, - ['127.0.0.1']) - a = auth.filter_factory({ - 'super_admin_key': 'supertest', - 'allowed_sync_hosts': - '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) - self.assertEquals( - a.allowed_sync_hosts, - ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1']) - - def test_reseller_admin_is_owner(self): - orig_authorize = self.test_auth.authorize - owner_values = [] - - def mitm_authorize(req): - rv = orig_authorize(req) - owner_values.append( - req.environ.get('swift_owner', False)) - return rv - - self.test_auth.authorize = mitm_authorize - - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'other', 'user': 'other:usr', - 'account_id': 'AUTH_other', - 'groups': [{'name': 'other:usr'}, {'name': 'other'}, - {'name': '.reseller_admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - req = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(owner_values, [True]) - - def test_admin_is_owner(self): - orig_authorize = self.test_auth.authorize - owner_values = [] - - def mitm_authorize(req): - rv = orig_authorize(req) - owner_values.append( - req.environ.get('swift_owner', False)) - return rv - - self.test_auth.authorize = mitm_authorize - - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': [{'name': 'act:usr'}, {'name': 'act'}, - {'name': '.admin'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')])) - req = Request.blank( - '/v1/AUTH_cfa', - headers={'X-Auth-Token': 'AUTH_t'}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(owner_values, [True]) - - def test_regular_is_not_owner(self): - orig_authorize = self.test_auth.authorize - owner_values = [] - - def mitm_authorize(req): - rv = orig_authorize(req) - owner_values.append( - req.environ.get('swift_owner', False)) - return rv - - self.test_auth.authorize = mitm_authorize - - self.test_auth.app = FakeApp(iter([ - ('200 Ok', {}, - json.dumps( - {'account': 'act', 'user': 'act:usr', - 'account_id': 'AUTH_cfa', - 'groups': - [{'name': 'act:usr'}, { - 'name': 'act'}], - 'expires': time() + 60})), - ('204 No Content', {}, '')]), acl='act:usr') - req = Request.blank('/v1/AUTH_cfa/c', - headers={'X-Auth-Token': 'AUTH_t'}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - self.assertEquals(owner_values, [False]) - - def test_sync_request_success(self): - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': '123.456'}) - req.remote_addr = '127.0.0.1' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - - def test_sync_request_fail_key(self): - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'wrongsecret', - 'x-timestamp': '123.456'}) - req.remote_addr = '127.0.0.1' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='othersecret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': '123.456'}) - req.remote_addr = '127.0.0.1' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key=None) - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': '123.456'}) - req.remote_addr = '127.0.0.1' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_sync_request_fail_no_timestamp(self): - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={'x-container-sync-key': 'secret'}) - req.remote_addr = '127.0.0.1' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_sync_request_fail_sync_host(self): - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': '123.456'}) - req.remote_addr = '127.0.0.2' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - - def test_sync_request_success_lb_sync_host(self): - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': - '123.456', - 'x-forwarded-for': '127.0.0.1'}) - req.remote_addr = '127.0.0.2' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - - self.test_auth.app = FakeApp( - iter([('204 No Content', {}, '')]), - sync_key='secret') - req = Request.blank('/v1/AUTH_cfa/c/o', - environ={ - 'REQUEST_METHOD': 'DELETE'}, - headers={ - 'x-container-sync-key': - 'secret', - 'x-timestamp': - '123.456', - 'x-cluster-client-ip': '127.0.0.1'}) - req.remote_addr = '127.0.0.2' - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 204) - - def _make_request(self, path, **kwargs): - req = Request.blank(path, **kwargs) - req.environ['swift.cache'] = FakeMemcache() - return req - - def test_override_asked_for_but_not_allowed(self): - self.test_auth = \ - auth.filter_factory( - {'allow_overrides': 'false'})(FakeApp()) - req = self._make_request('/v1/AUTH_account', - environ={'swift.authorize_override': True}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(resp.environ['swift.authorize'], - self.test_auth.authorize) - - def test_override_asked_for_and_allowed(self): - self.test_auth = \ - auth.filter_factory( - {'allow_overrides': 'true'})(FakeApp()) - req = self._make_request('/v1/AUTH_account', - environ={'swift.authorize_override': True}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertTrue( - 'swift.authorize' not in resp.environ) - - def test_override_default_allowed(self): - req = self._make_request('/v1/AUTH_account', - environ={'swift.authorize_override': True}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 404) - self.assertTrue( - 'swift.authorize' not in resp.environ) - - def test_token_too_long(self): - req = self._make_request('/v1/AUTH_account', headers={ - 'x-auth-token': 'a' * MAX_TOKEN_LENGTH}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertNotEquals( - resp.body, - 'Token exceeds maximum length.') - req = self._make_request('/v1/AUTH_account', headers={ - 'x-auth-token': 'a' * (MAX_TOKEN_LENGTH + 1)}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 400) - self.assertEquals( - resp.body, - 'Token exceeds maximum length.') - - def test_crazy_authorization(self): - req = self._make_request('/v1/AUTH_account', headers={ - 'authorization': 'somebody elses header value'}) - resp = req.get_response(self.test_auth) - self.assertEquals(resp.status_int, 401) - self.assertEquals(resp.environ['swift.authorize'], - self.test_auth.denied_response) - - -if __name__ == '__main__': - unittest.main() diff --git a/gluster/swift/common/middleware/swiftkerbauth/__init__.py b/gluster/swift/common/middleware/swiftkerbauth/__init__.py deleted file mode 100644 index c752df7..0000000 --- a/gluster/swift/common/middleware/swiftkerbauth/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from swift.common.utils import readconf, config_true_value - -config_file = {} -try: - config_file = readconf("/etc/swift/proxy-server.conf", - section_name="filter:cache") -except SystemExit: - pass - -MEMCACHE_SERVERS = config_file.get('memcache_servers', None) - -config_file = {} - -try: - config_file = readconf("/etc/swift/proxy-server.conf", - section_name="filter:kerbauth") -except SystemExit: - pass - -TOKEN_LIFE = int(config_file.get('token_life', 86400)) -RESELLER_PREFIX = config_file.get('reseller_prefix', "AUTH_") -DEBUG_HEADERS = config_true_value(config_file.get('debug_headers', 'yes')) diff --git a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf b/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf deleted file mode 100644 index 68472d8..0000000 --- a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/etc/httpd/conf.d/swift-auth.conf +++ /dev/null @@ -1,12 +0,0 @@ -<Location /cgi-bin/swift-auth> - AuthType Kerberos - AuthName "Swift Authentication" - KrbMethodNegotiate On - KrbMethodK5Passwd On - KrbSaveCredentials On - KrbServiceName HTTP/client.example.com - KrbAuthRealms EXAMPLE.COM - Krb5KeyTab /etc/httpd/conf/http.keytab - KrbVerifyKDC Off - Require valid-user -</Location> diff --git a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth b/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth deleted file mode 100755 index 11fe0e2..0000000 --- a/gluster/swift/common/middleware/swiftkerbauth/apachekerbauth/var/www/cgi-bin/swift-auth +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/python - -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Requires the following command to be run: -# setsebool -P httpd_can_network_connect 1 -# setsebool -P httpd_can_network_memcache 1 - -import os -import cgi -from swift.common.memcached import MemcacheRing -from time import time, ctime -from swiftkerbauth import MEMCACHE_SERVERS, TOKEN_LIFE, DEBUG_HEADERS -from swiftkerbauth.kerbauth_utils import get_remote_user, get_auth_data, \ - generate_token, set_auth_data, get_groups_from_username - - -def main(): - try: - username = get_remote_user(os.environ) - except RuntimeError: - print "Status: 401 Unauthorized\n" - print "Malformed REMOTE_USER" - return - - if not MEMCACHE_SERVERS: - print "Status: 500 Internal Server Error\n" - print "Memcache not configured in /etc/swift/proxy-server.conf" - return - - mc_servers = [s.strip() for s in MEMCACHE_SERVERS.split(',') if s.strip()] - mc = MemcacheRing(mc_servers) - - token, expires, groups = get_auth_data(mc, username) - - if not token: - token = generate_token() - expires = time() + TOKEN_LIFE - groups = get_groups_from_username(username) - set_auth_data(mc, username, token, expires, groups) - - print "X-Auth-Token: %s" % token - print "X-Storage-Token: %s" % token - - # For debugging. - if DEBUG_HEADERS: - print "X-Debug-Remote-User: %s" % username - print "X-Debug-Groups: %s" % groups - print "X-Debug-Token-Life: %ss" % TOKEN_LIFE - print "X-Debug-Token-Expires: %s" % ctime(expires) - - print "" - -try: - print("Content-Type: text/html") - main() -except: - cgi.print_exception() diff --git a/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py b/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py deleted file mode 100644 index 1a63a40..0000000 --- a/gluster/swift/common/middleware/swiftkerbauth/kerbauth.py +++ /dev/null @@ -1,463 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import errno -from time import time, ctime -from traceback import format_exc -from eventlet import Timeout -from urllib import unquote - -from swift.common.swob import Request, Response -from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \ - HTTPSeeOther, HTTPUnauthorized, HTTPServerError - -from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed -from swift.common.utils import cache_from_env, get_logger, \ - split_path, config_true_value -from gluster.swift.common.middleware.swiftkerbauth.kerbauth_utils import \ - get_auth_data, generate_token, \ - set_auth_data, run_kinit, get_groups_from_username - - -class KerbAuth(object): - """ - Test authentication and authorization system. - - Add to your pipeline in proxy-server.conf, such as:: - - [pipeline:main] - pipeline = catch_errors cache kerbauth proxy-server - - Set account auto creation to true in proxy-server.conf:: - - [app:proxy-server] - account_autocreate = true - - And add a kerbauth filter section, such as:: - - [filter:kerbauth] - use = egg:swiftkerbauth#kerbauth - - See the proxy-server.conf-sample for more information. - - :param app: The next WSGI app in the pipeline - :param conf: The dict of configuration values - """ - - def __init__(self, app, conf): - self.app = app - self.conf = conf - self.logger = get_logger(conf, log_route='kerbauth') - self.log_headers = config_true_value(conf.get('log_headers', 'f')) - self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip() - if self.reseller_prefix and self.reseller_prefix[-1] != '_': - self.reseller_prefix += '_' - self.logger.set_statsd_prefix('kerbauth.%s' % ( - self.reseller_prefix if self.reseller_prefix else 'NONE',)) - self.auth_prefix = conf.get('auth_prefix', '/auth/') - if not self.auth_prefix or not self.auth_prefix.strip('/'): - self.logger.warning('Rewriting invalid auth prefix "%s" to ' - '"/auth/" (Non-empty auth prefix path ' - 'is required)' % self.auth_prefix) - self.auth_prefix = '/auth/' - if self.auth_prefix[0] != '/': - self.auth_prefix = '/' + self.auth_prefix - if self.auth_prefix[-1] != '/': - self.auth_prefix += '/' - self.token_life = int(conf.get('token_life', 86400)) - self.auth_method = conf.get('auth_method', 'passive') - self.debug_headers = config_true_value( - conf.get('debug_headers', 'yes')) - self.realm_name = conf.get('realm_name', None) - self.allow_overrides = config_true_value( - conf.get('allow_overrides', 't')) - self.storage_url_scheme = conf.get('storage_url_scheme', 'default') - self.ext_authentication_url = conf.get('ext_authentication_url') - if not self.ext_authentication_url: - raise RuntimeError("Missing filter parameter ext_authentication_" - "url in /etc/swift/proxy-server.conf") - - def __call__(self, env, start_response): - """ - Accepts a standard WSGI application call, authenticating the request - and installing callback hooks for authorization and ACL header - validation. For an authenticated request, REMOTE_USER will be set to a - comma separated list of the user's groups. - - If the request matches the self.auth_prefix, the request will be - routed through the internal auth request handler (self.handle). - This is to handle granting tokens, etc. - """ - if self.allow_overrides and env.get('swift.authorize_override', False): - return self.app(env, start_response) - if env.get('PATH_INFO', '').startswith(self.auth_prefix): - return self.handle(env, start_response) - token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN')) - if token and token.startswith(self.reseller_prefix): - groups = self.get_groups(env, token) - if groups: - user = groups and groups.split(',', 1)[0] or '' - trans_id = env.get('swift.trans_id') - self.logger.debug('User: %s uses token %s (trans_id %s)' % - (user, token, trans_id)) - env['REMOTE_USER'] = groups - env['swift.authorize'] = self.authorize - env['swift.clean_acl'] = clean_acl - if '.reseller_admin' in groups: - env['reseller_request'] = True - else: - # Invalid token (may be expired) - if self.auth_method == "active": - return HTTPSeeOther( - location=self.ext_authentication_url)(env, - start_response) - elif self.auth_method == "passive": - self.logger.increment('unauthorized') - return HTTPUnauthorized()(env, start_response) - else: - # With a non-empty reseller_prefix, I would like to be called - # back for anonymous access to accounts I know I'm the - # definitive auth for. - try: - version, rest = split_path(env.get('PATH_INFO', ''), - 1, 2, True) - except ValueError: - version, rest = None, None - self.logger.increment('errors') - # Not my token, not my account, I can't authorize this request, - # deny all is a good idea if not already set... - if 'swift.authorize' not in env: - env['swift.authorize'] = self.denied_response - - return self.app(env, start_response) - - def get_groups(self, env, token): - """ - Get groups for the given token. - - :param env: The current WSGI environment dictionary. - :param token: Token to validate and return a group string for. - - :returns: None if the token is invalid or a string containing a comma - separated list of groups the authenticated user is a member - of. The first group in the list is also considered a unique - identifier for that user. - """ - groups = None - memcache_client = cache_from_env(env) - if not memcache_client: - raise Exception('Memcache required') - memcache_token_key = '%s/token/%s' % (self.reseller_prefix, token) - cached_auth_data = memcache_client.get(memcache_token_key) - if cached_auth_data: - expires, groups = cached_auth_data - if expires < time(): - groups = None - - return groups - - def authorize(self, req): - """ - Returns None if the request is authorized to continue or a standard - WSGI response callable if not. - - Assumes that user groups are all lower case, which is true when Red Hat - Enterprise Linux Identity Management is used. - """ - try: - version, account, container, obj = req.split_path(1, 4, True) - except ValueError: - self.logger.increment('errors') - return HTTPNotFound(request=req) - - if not account or not account.startswith(self.reseller_prefix): - self.logger.debug("Account name: %s doesn't start with " - "reseller_prefix: %s." - % (account, self.reseller_prefix)) - return self.denied_response(req) - - user_groups = (req.remote_user or '').split(',') - account_user = user_groups[1] if len(user_groups) > 1 else None - # If the user is in the reseller_admin group for our prefix, he gets - # full access to all accounts we manage. For the default reseller - # prefix, the group name is auth_reseller_admin. - admin_group = ("%sreseller_admin" % self.reseller_prefix).lower() - if admin_group in user_groups and \ - account != self.reseller_prefix and \ - account[len(self.reseller_prefix)] != '.': - req.environ['swift_owner'] = True - return None - - # The "account" is part of the request URL, and already contains the - # reseller prefix, like in "/v1/AUTH_vol1/pictures/pic1.png". - if account.lower() in user_groups and \ - (req.method not in ('DELETE', 'PUT') or container): - # If the user is admin for the account and is not trying to do an - # account DELETE or PUT... - req.environ['swift_owner'] = True - self.logger.debug("User %s has admin authorizing." - % account_user) - return None - - if (req.environ.get('swift_sync_key') - and (req.environ['swift_sync_key'] == - req.headers.get('x-container-sync-key', None)) - and 'x-timestamp' in req.headers): - self.logger.debug("Allow request with container sync-key: %s." - % req.environ['swift_sync_key']) - return None - - if req.method == 'OPTIONS': - #allow OPTIONS requests to proceed as normal - self.logger.debug("Allow OPTIONS request.") - return None - - referrers, groups = parse_acl(getattr(req, 'acl', None)) - - if referrer_allowed(req.referer, referrers): - if obj or '.rlistings' in groups: - self.logger.debug("Allow authorizing %s via referer ACL." - % req.referer) - return None - - for user_group in user_groups: - if user_group in groups: - self.logger.debug("User %s allowed in ACL: %s authorizing." - % (account_user, user_group)) - return None - - return self.denied_response(req) - - def denied_response(self, req): - """ - Returns a standard WSGI response callable with the status of 403 or 401 - depending on whether the REMOTE_USER is set or not. - """ - if req.remote_user: - self.logger.increment('forbidden') - return HTTPForbidden(request=req) - else: - if self.auth_method == "active": - return HTTPSeeOther(location=self.ext_authentication_url) - elif self.auth_method == "passive": - self.logger.increment('unauthorized') - return HTTPUnauthorized(request=req) - - def handle(self, env, start_response): - """ - WSGI entry point for auth requests (ones that match the - self.auth_prefix). - Wraps env in swob.Request object and passes it down. - - :param env: WSGI environment dictionary - :param start_response: WSGI callable - """ - try: - req = Request(env) - if self.auth_prefix: - req.path_info_pop() - req.bytes_transferred = '-' - req.client_disconnect = False - if 'x-storage-token' in req.headers and \ - 'x-auth-token' not in req.headers: - req.headers['x-auth-token'] = req.headers['x-storage-token'] - return self.handle_request(req)(env, start_response) - except (Exception, Timeout): - print "EXCEPTION IN handle: %s: %s" % (format_exc(), env) - self.logger.increment('errors') - start_response('500 Server Error', - [('Content-Type', 'text/plain')]) - return ['Internal server error.\n'] - - def handle_request(self, req): - """ - Entry point for auth requests (ones that match the self.auth_prefix). - Should return a WSGI-style callable (such as webob.Response). - - :param req: swob.Request object - """ - req.start_time = time() - handler = None - try: - version, account, user, _junk = req.split_path(1, 4, True) - except ValueError: - self.logger.increment('errors') - return HTTPNotFound(request=req) - if version in ('v1', 'v1.0', 'auth'): - if req.method == 'GET': - handler = self.handle_get_token - if not handler: - self.logger.increment('errors') - req.response = HTTPBadRequest(request=req) - else: - req.response = handler(req) - return req.response - - def handle_get_token(self, req): - """ - Handles the various `request for token and service end point(s)` calls. - There are various formats to support the various auth servers in the - past. - - "Active Mode" usage: - All formats require GSS (Kerberos) authentication. - - GET <auth-prefix>/v1/<act>/auth - GET <auth-prefix>/auth - GET <auth-prefix>/v1.0 - - On successful authentication, the response will have X-Auth-Token - and X-Storage-Token set to the token to use with Swift. - - "Passive Mode" usage:: - - GET <auth-prefix>/v1/<act>/auth - X-Auth-User: <act>:<usr> or X-Storage-User: <usr> - X-Auth-Key: <key> or X-Storage-Pass: <key> - GET <auth-prefix>/auth - X-Auth-User: <act>:<usr> or X-Storage-User: <act>:<usr> - X-Auth-Key: <key> or X-Storage-Pass: <key> - GET <auth-prefix>/v1.0 - X-Auth-User: <act>:<usr> or X-Storage-User: <act>:<usr> - X-Auth-Key: <key> or X-Storage-Pass: <key> - - Values should be url encoded, "act%3Ausr" instead of "act:usr" for - example; however, for backwards compatibility the colon may be - included unencoded. - - On successful authentication, the response will have X-Auth-Token - and X-Storage-Token set to the token to use with Swift and - X-Storage-URL set to the URL to the default Swift cluster to use. - - :param req: The swob.Request to process. - :returns: swob.Response, 2xx on success with data set as explained - above. - """ - # Validate the request info - try: - pathsegs = split_path(req.path_info, 1, 3, True) - except ValueError: - self.logger.increment('errors') - return HTTPNotFound(request=req) - if not ((pathsegs[0] == 'v1' and pathsegs[2] == 'auth') - or pathsegs[0] in ('auth', 'v1.0')): - return HTTPBadRequest(request=req) - - # Client is inside the domain - if self.auth_method == "active": - return HTTPSeeOther(location=self.ext_authentication_url) - - # Client is outside the domain - elif self.auth_method == "passive": - account, user, key = None, None, None - # Extract user, account and key from request - if pathsegs[0] == 'v1' and pathsegs[2] == 'auth': - account = pathsegs[1] - user = req.headers.get('x-storage-user') - if not user: - user = unquote(req.headers.get('x-auth-user', '')) - if user: - if ':' not in user: - return HTTPUnauthorized(request=req) - else: - account2, user = user.split(':', 1) - if account != account2: - return HTTPUnauthorized(request=req) - key = req.headers.get('x-storage-pass') - if not key: - key = unquote(req.headers.get('x-auth-key', '')) - elif pathsegs[0] in ('auth', 'v1.0'): - user = unquote(req.headers.get('x-auth-user', '')) - if not user: - user = req.headers.get('x-storage-user') - if user: - if ':' not in user: - return HTTPUnauthorized(request=req) - else: - account, user = user.split(':', 1) - key = unquote(req.headers.get('x-auth-key', '')) - if not key: - key = req.headers.get('x-storage-pass') - - if not (account or user or key): - # If all are not given, client may be part of the domain - return HTTPSeeOther(location=self.ext_authentication_url) - elif None in (key, user, account): - # If only one or two of them is given, but not all - return HTTPUnauthorized(request=req) - - # Run kinit on the user - if self.realm_name and "@" not in user: - user = user + "@" + self.realm_name - try: - ret = run_kinit(user, key) - except OSError as e: - if e.errno == errno.ENOENT: - return HTTPServerError("kinit command not found\n") - if ret != 0: - self.logger.warning("Failed: kinit %s", user) - if ret == -1: - self.logger.warning("Failed: kinit: Password has probably " - "expired.") - return HTTPServerError("Kinit is taking too long.\n") - return HTTPUnauthorized(request=req) - self.logger.debug("kinit succeeded") - - if "@" in user: - user = user.split("@")[0] - - # Check if user really belongs to the account - groups_list = get_groups_from_username(user).strip().split(",") - user_group = ("%s%s" % (self.reseller_prefix, account)).lower() - reseller_admin_group = \ - ("%sreseller_admin" % self.reseller_prefix).lower() - if user_group not in groups_list: - # Check if user is reseller_admin. If not, return Unauthorized. - # On AD/IdM server, auth_reseller_admin is a separate group - if reseller_admin_group not in groups_list: - return HTTPUnauthorized(request=req) - - mc = cache_from_env(req.environ) - if not mc: - raise Exception('Memcache required') - token, expires, groups = get_auth_data(mc, user) - if not token: - token = generate_token() - expires = time() + self.token_life - groups = get_groups_from_username(user) - set_auth_data(mc, user, token, expires, groups) - - headers = {'X-Auth-Token': token, - 'X-Storage-Token': token} - - if self.debug_headers: - headers.update({'X-Debug-Remote-User': user, - 'X-Debug-Groups:': groups, - 'X-Debug-Token-Life': self.token_life, - 'X-Debug-Token-Expires': ctime(expires)}) - - resp = Response(request=req, headers=headers) - resp.headers['X-Storage-Url'] = \ - '%s/v1/%s%s' % (resp.host_url, self.reseller_prefix, account) - return resp - - -def filter_factory(global_conf, **local_conf): - """Returns a WSGI filter app for use with paste.deploy.""" - conf = global_conf.copy() - conf.update(local_conf) - - def auth_filter(app): - return KerbAuth(app, conf) - return auth_filter diff --git a/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py b/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py deleted file mode 100644 index 599ef99..0000000 --- a/gluster/swift/common/middleware/swiftkerbauth/kerbauth_utils.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import re -import random -import grp -import signal -from subprocess import Popen, PIPE -from time import time -from gluster.swift.common.middleware.swiftkerbauth \ - import TOKEN_LIFE, RESELLER_PREFIX - - -def get_remote_user(env): - """Retrieve REMOTE_USER set by Apache from environment.""" - remote_user = env.get('REMOTE_USER', "") - matches = re.match('([^@]+)@.*', remote_user) - if not matches: - raise RuntimeError("Malformed REMOTE_USER \"%s\"" % remote_user) - return matches.group(1) - - -def get_auth_data(mc, username): - """ - Returns the token, expiry time and groups for the user if it already exists - on memcache. Returns None otherwise. - - :param mc: MemcacheRing object - :param username: swift user - """ - token, expires, groups = None, None, None - memcache_user_key = '%s/user/%s' % (RESELLER_PREFIX, username) - candidate_token = mc.get(memcache_user_key) - if candidate_token: - memcache_token_key = '%s/token/%s' % (RESELLER_PREFIX, candidate_token) - cached_auth_data = mc.get(memcache_token_key) - if cached_auth_data: - expires, groups = cached_auth_data - if expires > time(): - token = candidate_token - else: - expires, groups = None, None - return (token, expires, groups) - - -def set_auth_data(mc, username, token, expires, groups): - """ - Stores the following key value pairs on Memcache: - (token, expires+groups) - (user, token) - """ - auth_data = (expires, groups) - memcache_token_key = "%s/token/%s" % (RESELLER_PREFIX, token) - mc.set(memcache_token_key, auth_data, time=TOKEN_LIFE) - - # Record the token with the user info for future use. - memcache_user_key = '%s/user/%s' % (RESELLER_PREFIX, username) - mc.set(memcache_user_key, token, time=TOKEN_LIFE) - - -def generate_token(): - """Generates a random token.""" - # We don't use uuid.uuid4() here because importing the uuid module - # causes (harmless) SELinux denials in the audit log on RHEL 6. If this - # is a security concern, a custom SELinux policy module could be - # written to not log those denials. - r = random.SystemRandom() - token = '%stk%s' % \ - (RESELLER_PREFIX, - ''.join(r.choice('abcdef0123456789') for x in range(32))) - return token - - -def get_groups_from_username(username): - """Return a set of groups to which the user belongs to.""" - # Retrieve the numerical group IDs. We cannot list the group names - # because group names from Active Directory may contain spaces, and - # we wouldn't be able to split the list of group names into its - # elements. - p = Popen(['id', '-G', username], stdout=PIPE) - if p.wait() != 0: - raise RuntimeError("Failure running id -G for %s" % username) - (p_stdout, p_stderr) = p.communicate() - - # Convert the group numbers into group names. - groups = [] - for gid in p_stdout.strip().split(" "): - groups.append(grp.getgrgid(int(gid))[0]) - - # The first element of the list is considered a unique identifier - # for the user. We add the username to accomplish this. - if username in groups: - groups.remove(username) - groups = [username] + groups - groups = ','.join(groups) - return groups - - -def run_kinit(username, password): - """Runs kinit command as a child process and returns the status code.""" - kinit = Popen(['kinit', username], - stdin=PIPE, stdout=PIPE, stderr=PIPE) - kinit.stdin.write('%s\n' % password) - - # The following code handles a corner case where the Kerberos password - # has expired and a prompt is displayed to enter new password. Ideally, - # we would want to read from stdout but these are blocked reads. This is - # a hack to kill the process if it's taking too long! - - class Alarm(Exception): - pass - - def signal_handler(signum, frame): - raise Alarm - # Set the signal handler and a 1-second alarm - signal.signal(signal.SIGALRM, signal_handler) - signal.alarm(1) - try: - kinit.wait() # Wait for the child to exit - signal.alarm(0) # Reset the alarm - return kinit.returncode # Exit status of child on graceful exit - except Alarm: - # Taking too long, kill and return error - kinit.kill() - return -1 |