# Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest from time import time from swiftkerbauth import kerbauth as auth from test.unit import FakeMemcache from swift.common.swob import Request, Response EXT_AUTHENTICATION_URL = "127.0.0.1" REDIRECT_STATUS = 303 # HTTPSeeOther def my_filter_factory(global_conf, **local_conf): if 'ext_authentication_url' not in global_conf: global_conf['ext_authentication_url'] = EXT_AUTHENTICATION_URL conf = global_conf.copy() conf.update(local_conf) def auth_filter(app): return auth.KerbAuth(app, conf) return auth_filter # Monkey patching filter_factory to always pass ext_authentication_url # as a parameter. Absence of ext_authentication_url raises a RuntimeError def patch_filter_factory(): auth.filter_factory = my_filter_factory def unpatch_filter_factory(): reload(auth) class FakeApp(object): def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) self.acl = acl self.sync_key = sync_key def __call__(self, env, start_response): self.calls += 1 self.request = Request.blank('', environ=env) if self.acl: self.request.acl = self.acl if self.sync_key: self.request.environ['swift_sync_key'] = self.sync_key if 'swift.authorize' in env: resp = env['swift.authorize'](self.request) if resp: return resp(env, start_response) status, headers, body = self.status_headers_body_iter.next() return Response(status=status, headers=headers, body=body)(env, start_response) class TestAuth(unittest.TestCase): # Patch auth.filter_factory() patch_filter_factory() def setUp(self): self.test_auth = auth.filter_factory({})(FakeApp()) def _make_request(self, path, **kwargs): req = Request.blank(path, **kwargs) req.environ['swift.cache'] = FakeMemcache() return req def test_no_ext_authentication_url(self): app = FakeApp() try: # Use original auth.filter_factory and NOT monkey patched version unpatch_filter_factory() auth.filter_factory({})(app) except RuntimeError as e: # Restore monkey patched version patch_filter_factory() self.assertTrue(e.args[0].startswith("Missing filter parameter " "ext_authentication_url")) def test_reseller_prefix_init(self): app = FakeApp() ath = auth.filter_factory({})(app) self.assertEquals(ath.reseller_prefix, 'AUTH_') ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app) self.assertEquals(ath.reseller_prefix, 'TEST_') def test_auth_prefix_init(self): app = FakeApp() ath = auth.filter_factory({})(app) self.assertEquals(ath.auth_prefix, '/auth/') ath = auth.filter_factory({'auth_prefix': ''})(app) self.assertEquals(ath.auth_prefix, '/auth/') ath = auth.filter_factory({'auth_prefix': '/'})(app) self.assertEquals(ath.auth_prefix, '/auth/') ath = auth.filter_factory({'auth_prefix': '/test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory({'auth_prefix': '/test'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory({'auth_prefix': 'test/'})(app) self.assertEquals(ath.auth_prefix, '/test/') ath = auth.filter_factory({'auth_prefix': 'test'})(app) self.assertEquals(ath.auth_prefix, '/test/') def test_top_level_redirect(self): req = self._make_request('/') resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, REDIRECT_STATUS) self.assertEquals(req.environ['swift.authorize'], self.test_auth.denied_response) def test_override_asked_for_and_allowed(self): self.test_auth = \ auth.filter_factory({'allow_overrides': 'true'})(FakeApp()) req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertTrue('swift.authorize' not in req.environ) def test_override_default_allowed(self): req = self._make_request('/v1/AUTH_account', environ={'swift.authorize_override': True}) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 404) self.assertTrue('swift.authorize' not in req.environ) def test_options_call(self): req = self._make_request('/v1/AUTH_cfa/c/o', environ={'REQUEST_METHOD': 'OPTIONS'}) resp = self.test_auth.authorize(req) self.assertEquals(resp, None) def test_auth_deny_non_reseller_prefix_no_override(self): fake_authorize = lambda x: Response(status='500 Fake') req = self._make_request('/v1/BLAH_account', headers={'X-Auth-Token': 'BLAH_t'}, environ={'swift.authorize': fake_authorize} ) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, 500) self.assertEquals(req.environ['swift.authorize'], fake_authorize) def test_authorize_acl_group_access(self): req = self._make_request('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = self._make_request('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act' self.assertEquals(self.test_auth.authorize(req), None) req = self._make_request('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' req.acl = 'act:usr' self.assertEquals(self.test_auth.authorize(req), None) req = self._make_request('/v1/AUTH_cfa') req.remote_user = 'act:usr,act' def test_deny_cross_reseller(self): # Tests that cross-reseller is denied, even if ACLs/group names match req = self._make_request('/v1/OTHER_cfa') req.remote_user = 'act:usr,act,AUTH_cfa' req.acl = 'act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) def test_authorize_acl_referer_after_user_groups(self): req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr' req.acl = '.r:*,act:usr' self.assertEquals(self.test_auth.authorize(req), None) def test_detect_reseller_request(self): req = self._make_request('/v1/AUTH_admin', headers={'X-Auth-Token': 'AUTH_t'}) cache_key = 'AUTH_/token/AUTH_t' cache_entry = (time() + 3600, '.reseller_admin') req.environ['swift.cache'].set(cache_key, cache_entry) req.get_response(self.test_auth) self.assertTrue(req.environ.get('reseller_request', False)) def test_regular_is_not_owner(self): orig_authorize = self.test_auth.authorize owner_values = [] def mitm_authorize(req): rv = orig_authorize(req) owner_values.append(req.environ.get('swift_owner', False)) return rv self.test_auth.authorize = mitm_authorize req = self._make_request( '/v1/AUTH_cfa/c', headers={'X-Auth-Token': 'AUTH_t'}) req.remote_user = 'act:usr' self.test_auth.authorize(req) self.assertEquals(owner_values, [False]) def test_no_memcache(self): env = {'swift.cache': None} try: self.test_auth.get_groups(env, None) except Exception as e: self.assertTrue(e.args[0].startswith("Memcache required")) def test_handle_request(self): req = self._make_request('/auth/v1.0') resp = self.test_auth.handle_request(req) self.assertEquals(resp.status_int, REDIRECT_STATUS) def test_handle_request_bad_request(self): req = self._make_request('////') resp = self.test_auth.handle_request(req) self.assertEquals(resp.status_int, 404) def test_handle_request_no_handler(self): req = self._make_request('/blah/blah/blah/blah') resp = self.test_auth.handle_request(req) self.assertEquals(resp.status_int, 400) def test_handle_get_token_bad_request(self): req = self._make_request('/blah/blah') resp = self.test_auth.handle_get_token(req) self.assertEquals(resp.status_int, 400) req = self._make_request('/////') resp = self.test_auth.handle_get_token(req) self.assertEquals(resp.status_int, 404) def test_handle(self): req = self._make_request('/auth/v1.0') resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, REDIRECT_STATUS) def test_authorize_invalid_req(self): req = self._make_request('/') resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 404) def test_authorize_set_swift_owner(self): req = self._make_request('/v1/AUTH_test/c1/o1') req.remote_user = 'test,auth_reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(req.environ['swift_owner'], True) self.assertTrue(resp is None) req = self._make_request('/v1/AUTH_test/c1/o1') req.remote_user = 'test,auth_test' resp = self.test_auth.authorize(req) self.assertEquals(req.environ['swift_owner'], True) self.assertTrue(resp is None) def test_authorize_swift_sync_key(self): req = self._make_request( '/v1/AUTH_cfa/c/o', environ={'swift_sync_key': 'secret'}, headers={'x-container-sync-key': 'secret', 'x-timestamp': '123.456'}) resp = self.test_auth.authorize(req) self.assertTrue(resp is None) def test_authorize_acl_referrer_access(self): req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*,.rlistings' self.assertEquals(self.test_auth.authorize(req), None) req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:*' # No listings allowed resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.acl = '.r:.example.com,.rlistings' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) req = self._make_request('/v1/AUTH_cfa/c') req.remote_user = 'act:usr,act' req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' self.assertEquals(self.test_auth.authorize(req), None) req = self._make_request('/v1/AUTH_cfa/c') resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, REDIRECT_STATUS) req = self._make_request('/v1/AUTH_cfa/c') req.acl = '.r:*,.rlistings' self.assertEquals(self.test_auth.authorize(req), None) req = self._make_request('/v1/AUTH_cfa/c') req.acl = '.r:*' # No listings allowed resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, REDIRECT_STATUS) req = self._make_request('/v1/AUTH_cfa/c') req.acl = '.r:.example.com,.rlistings' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, REDIRECT_STATUS) req = self._make_request('/v1/AUTH_cfa/c') req.referer = 'http://www.example.com/index.html' req.acl = '.r:.example.com,.rlistings' self.assertEquals(self.test_auth.authorize(req), None) def test_handle_x_storage_token(self): req = self._make_request( '/auth/v1.0', headers={'x-storage-token': 'blahblah', }) resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, REDIRECT_STATUS) def test_invalid_token(self): req = self._make_request('/k1/test') req.environ['HTTP_X_AUTH_TOKEN'] = 'AUTH_blahblahblah' resp = req.get_response(self.test_auth) self.assertEquals(resp.status_int, REDIRECT_STATUS) if __name__ == '__main__': unittest.main()