diff options
author | Prashanth Pai <ppai@redhat.com> | 2013-10-10 15:47:31 +0530 |
---|---|---|
committer | Luis Pabon <lpabon@redhat.com> | 2013-10-14 20:20:24 -0700 |
commit | 9812a4a9e4a30a208d77d3b10828a1c174dccd77 (patch) | |
tree | 458a0bb606320394d7ad331c71b9b2425b18a4dd /test/unit/test_kerbauth.py | |
parent | be359eabbbbf8269a5fa3aebaef17cec48c44176 (diff) |
Add unit tests
Change-Id: I7bbf74b66c26d0a964fa769bf9c46dd73bd03d73
Signed-off-by: Prashanth Pai <ppai@redhat.com>
Reviewed-on: http://review.gluster.org/6067
Reviewed-by: Luis Pabon <lpabon@redhat.com>
Tested-by: Luis Pabon <lpabon@redhat.com>
Diffstat (limited to 'test/unit/test_kerbauth.py')
-rw-r--r-- | test/unit/test_kerbauth.py | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/test/unit/test_kerbauth.py b/test/unit/test_kerbauth.py new file mode 100644 index 0000000..95697a4 --- /dev/null +++ b/test/unit/test_kerbauth.py @@ -0,0 +1,368 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from contextlib import contextmanager +from time import time + +from swiftkerbauth import kerbauth as auth +from swift.common.swob import Request, Response + +EXT_AUTHENTICATION_URL = "127.0.0.1" +REDIRECT_STATUS = 302 + + +def my_filter_factory(global_conf, **local_conf): + if 'ext_authentication_url' not in global_conf: + global_conf['ext_authentication_url'] = EXT_AUTHENTICATION_URL + conf = global_conf.copy() + conf.update(local_conf) + + def auth_filter(app): + return auth.KerbAuth(app, conf) + return auth_filter + +# Monkey patching filter_factory to always pass ext_authentication_url +# as a parameter. Absence of ext_authentication_url raises a RuntimeError + + +def patch_filter_factory(): + auth.filter_factory = my_filter_factory + + +def unpatch_filter_factory(): + reload(auth) + + +class FakeMemcache(object): + + def __init__(self): + self.store = {} + + def get(self, key): + return self.store.get(key) + + def set(self, key, value, time=0): + self.store[key] = value + return True + + def incr(self, key, time=0): + self.store[key] = self.store.setdefault(key, 0) + 1 + return self.store[key] + + @contextmanager + def soft_lock(self, key, timeout=0, retries=5): + yield True + + def delete(self, key): + try: + del self.store[key] + except Exception: + pass + return True + + +class FakeApp(object): + + def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None): + self.calls = 0 + self.status_headers_body_iter = status_headers_body_iter + if not self.status_headers_body_iter: + self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) + self.acl = acl + self.sync_key = sync_key + + def __call__(self, env, start_response): + self.calls += 1 + self.request = Request.blank('', environ=env) + if self.acl: + self.request.acl = self.acl + if self.sync_key: + self.request.environ['swift_sync_key'] = self.sync_key + if 'swift.authorize' in env: + resp = env['swift.authorize'](self.request) + if resp: + return resp(env, start_response) + status, headers, body = self.status_headers_body_iter.next() + return Response(status=status, headers=headers, + body=body)(env, start_response) + + +class TestAuth(unittest.TestCase): + + # Patch auth.filter_factory() + patch_filter_factory() + + def setUp(self): + self.test_auth = auth.filter_factory({})(FakeApp()) + + def _make_request(self, path, **kwargs): + req = Request.blank(path, **kwargs) + req.environ['swift.cache'] = FakeMemcache() + return req + + def test_no_ext_authentication_url(self): + app = FakeApp() + try: + # Use original auth.filter_factory and NOT monkey patched version + unpatch_filter_factory() + auth.filter_factory({})(app) + except RuntimeError as e: + # Restore monkey patched version + patch_filter_factory() + self.assertTrue(e.args[0].startswith("Missing filter parameter " + "ext_authentication_url")) + + def test_reseller_prefix_init(self): + app = FakeApp() + ath = auth.filter_factory({})(app) + self.assertEquals(ath.reseller_prefix, 'AUTH_') + ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app) + self.assertEquals(ath.reseller_prefix, 'TEST_') + ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app) + self.assertEquals(ath.reseller_prefix, 'TEST_') + + def test_auth_prefix_init(self): + app = FakeApp() + ath = auth.filter_factory({})(app) + self.assertEquals(ath.auth_prefix, '/auth/') + ath = auth.filter_factory({'auth_prefix': ''})(app) + self.assertEquals(ath.auth_prefix, '/auth/') + ath = auth.filter_factory({'auth_prefix': '/'})(app) + self.assertEquals(ath.auth_prefix, '/auth/') + ath = auth.filter_factory({'auth_prefix': '/test/'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory({'auth_prefix': '/test'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory({'auth_prefix': 'test/'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + ath = auth.filter_factory({'auth_prefix': 'test'})(app) + self.assertEquals(ath.auth_prefix, '/test/') + + def test_top_level_redirect(self): + req = self._make_request('/') + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + self.assertEquals(req.environ['swift.authorize'], + self.test_auth.denied_response) + + def test_override_asked_for_and_allowed(self): + self.test_auth = \ + auth.filter_factory({'allow_overrides': 'true'})(FakeApp()) + req = self._make_request('/v1/AUTH_account', + environ={'swift.authorize_override': True}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertTrue('swift.authorize' not in req.environ) + + def test_override_default_allowed(self): + req = self._make_request('/v1/AUTH_account', + environ={'swift.authorize_override': True}) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 404) + self.assertTrue('swift.authorize' not in req.environ) + + def test_options_call(self): + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'OPTIONS'}) + resp = self.test_auth.authorize(req) + self.assertEquals(resp, None) + + def test_auth_deny_non_reseller_prefix_no_override(self): + fake_authorize = lambda x: Response(status='500 Fake') + req = self._make_request('/v1/BLAH_account', + headers={'X-Auth-Token': 'BLAH_t'}, + environ={'swift.authorize': fake_authorize} + ) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 500) + self.assertEquals(req.environ['swift.authorize'], fake_authorize) + + def test_authorize_acl_group_access(self): + req = self._make_request('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = self._make_request('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act' + self.assertEquals(self.test_auth.authorize(req), None) + req = self._make_request('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + req.acl = 'act:usr' + self.assertEquals(self.test_auth.authorize(req), None) + req = self._make_request('/v1/AUTH_cfa') + req.remote_user = 'act:usr,act' + + def test_deny_cross_reseller(self): + # Tests that cross-reseller is denied, even if ACLs/group names match + req = self._make_request('/v1/OTHER_cfa') + req.remote_user = 'act:usr,act,AUTH_cfa' + req.acl = 'act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + + def test_authorize_acl_referer_after_user_groups(self): + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr' + req.acl = '.r:*,act:usr' + self.assertEquals(self.test_auth.authorize(req), None) + + def test_detect_reseller_request(self): + req = self._make_request('/v1/AUTH_admin', + headers={'X-Auth-Token': 'AUTH_t'}) + cache_key = 'AUTH_/token/AUTH_t' + cache_entry = (time() + 3600, '.reseller_admin') + req.environ['swift.cache'].set(cache_key, cache_entry) + req.get_response(self.test_auth) + self.assertTrue(req.environ.get('reseller_request', False)) + + def test_regular_is_not_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append(req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + req = self._make_request( + '/v1/AUTH_cfa/c', + headers={'X-Auth-Token': 'AUTH_t'}) + req.remote_user = 'act:usr' + self.test_auth.authorize(req) + self.assertEquals(owner_values, [False]) + + def test_no_memcache(self): + env = {'swift.cache': None} + try: + self.test_auth.get_groups(env, None) + except Exception as e: + self.assertTrue(e.args[0].startswith("Memcache required")) + + def test_handle_request(self): + req = self._make_request('/auth/v1.0') + resp = self.test_auth.handle_request(req) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + + def test_handle_request_bad_request(self): + req = self._make_request('////') + resp = self.test_auth.handle_request(req) + self.assertEquals(resp.status_int, 404) + + def test_handle_request_no_handler(self): + req = self._make_request('/blah/blah/blah/blah') + resp = self.test_auth.handle_request(req) + self.assertEquals(resp.status_int, 400) + + def test_handle_get_token_bad_request(self): + req = self._make_request('/blah/blah') + resp = self.test_auth.handle_get_token(req) + self.assertEquals(resp.status_int, 400) + req = self._make_request('/////') + resp = self.test_auth.handle_get_token(req) + self.assertEquals(resp.status_int, 404) + + def test_handle(self): + req = self._make_request('/auth/v1.0') + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + + def test_authorize_invalid_req(self): + req = self._make_request('/') + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 404) + + def test_authorize_set_swift_owner(self): + req = self._make_request('/v1/AUTH_test/c1/o1') + req.remote_user = 'test,auth_reseller_admin' + resp = self.test_auth.authorize(req) + self.assertEquals(req.environ['swift_owner'], True) + self.assertTrue(resp is None) + req = self._make_request('/v1/AUTH_test/c1/o1') + req.remote_user = 'test,auth_test' + resp = self.test_auth.authorize(req) + self.assertEquals(req.environ['swift_owner'], True) + self.assertTrue(resp is None) + + def test_authorize_swift_sync_key(self): + req = self._make_request( + '/v1/AUTH_cfa/c/o', + environ={'swift_sync_key': 'secret'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456'}) + resp = self.test_auth.authorize(req) + self.assertTrue(resp is None) + + def test_authorize_acl_referrer_access(self): + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:*,.rlistings' + self.assertEquals(self.test_auth.authorize(req), None) + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:*' # No listings allowed + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.acl = '.r:.example.com,.rlistings' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, 403) + req = self._make_request('/v1/AUTH_cfa/c') + req.remote_user = 'act:usr,act' + req.referer = 'http://www.example.com/index.html' + req.acl = '.r:.example.com,.rlistings' + self.assertEquals(self.test_auth.authorize(req), None) + req = self._make_request('/v1/AUTH_cfa/c') + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + req = self._make_request('/v1/AUTH_cfa/c') + req.acl = '.r:*,.rlistings' + self.assertEquals(self.test_auth.authorize(req), None) + req = self._make_request('/v1/AUTH_cfa/c') + req.acl = '.r:*' # No listings allowed + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + req = self._make_request('/v1/AUTH_cfa/c') + req.acl = '.r:.example.com,.rlistings' + resp = self.test_auth.authorize(req) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + req = self._make_request('/v1/AUTH_cfa/c') + req.referer = 'http://www.example.com/index.html' + req.acl = '.r:.example.com,.rlistings' + self.assertEquals(self.test_auth.authorize(req), None) + + def test_handle_x_storage_token(self): + req = self._make_request( + '/auth/v1.0', + headers={'x-storage-token': 'blahblah', }) + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + + def test_invalid_token(self): + req = self._make_request('/k1/test') + req.environ['HTTP_X_AUTH_TOKEN'] = 'AUTH_blahblahblah' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, REDIRECT_STATUS) + +if __name__ == '__main__': + unittest.main() |