summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPushpesh Sharma <psharma@redhat.com>2014-04-21 18:04:39 +0530
committerChetan Risbud <crisbud@redhat.com>2014-04-27 22:09:18 -0700
commit6f4a88cbcd6389af4baebe7759b773de2cd856c7 (patch)
tree26abbb86e9a0962e6de167020570aaf66340c60f
parent404fcd815c4abe00742531bf19a5224c0fe45b36 (diff)
Removing functionalnosetests
With the Icehouse release of openstack-swift functional case classified as fucntionalnose tests have been moved to functional cases.Although we copied test suites from test/fucntionalnose/ to test/functional/ but we still have same cases under test/functionalnose.This might cause duplicate tests run. Change-Id: I025206467aad364debd9050b3186e1379d89ffaf Signed-off-by: Pushpesh Sharma <psharma@redhat.com> Reviewed-on: http://review.gluster.org/7516 Reviewed-by: Prashanth Pai <ppai@redhat.com> Tested-by: Prashanth Pai <ppai@redhat.com> Reviewed-by: Chetan Risbud <crisbud@redhat.com> Reviewed-by: Thiago da Silva <thiago@redhat.com>
-rw-r--r--test/functionalnosetests/__init__.py0
-rw-r--r--test/functionalnosetests/swift_testing.py175
-rwxr-xr-xtest/functionalnosetests/test_account.py203
-rwxr-xr-xtest/functionalnosetests/test_container.py687
-rwxr-xr-xtest/functionalnosetests/test_object.py602
-rwxr-xr-xtools/functional_tests.sh6
-rwxr-xr-xtools/gswauth_functional_tests.sh6
-rwxr-xr-xtools/keystone_functional_tests.sh6
-rwxr-xr-xtools/swkrbath_functional_tests.sh6
9 files changed, 0 insertions, 1691 deletions
diff --git a/test/functionalnosetests/__init__.py b/test/functionalnosetests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/test/functionalnosetests/__init__.py
+++ /dev/null
diff --git a/test/functionalnosetests/swift_testing.py b/test/functionalnosetests/swift_testing.py
deleted file mode 100644
index 50abc8e..0000000
--- a/test/functionalnosetests/swift_testing.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from httplib import HTTPException
-import os
-import socket
-import sys
-from time import sleep
-
-from test import get_config
-
-from swiftclient import get_auth, http_connection
-
-conf = get_config('func_test')
-web_front_end = conf.get('web_front_end', 'integral')
-normalized_urls = conf.get('normalized_urls', False)
-
-# If no conf was read, we will fall back to old school env vars
-swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
-swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
-swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
-swift_test_tenant = ['', '', '']
-swift_test_perm = ['', '', '']
-
-if conf:
- swift_test_auth_version = str(conf.get('auth_version', '1'))
-
- swift_test_auth = 'http'
- if conf.get('auth_ssl', 'no').lower() in ('yes', 'true', 'on', '1'):
- swift_test_auth = 'https'
- if 'auth_prefix' not in conf:
- conf['auth_prefix'] = '/'
- try:
- suffix = '://%(auth_host)s:%(auth_port)s%(auth_prefix)s' % conf
- swift_test_auth += suffix
- except KeyError:
- pass # skip
-
- if swift_test_auth_version == "1":
- swift_test_auth += 'v1.0'
-
- if 'account' in conf:
- swift_test_user[0] = '%(account)s:%(username)s' % conf
- else:
- swift_test_user[0] = '%(username)s' % conf
- swift_test_key[0] = conf['password']
- try:
- swift_test_user[1] = '%s%s' % (
- '%s:' % conf['account2'] if 'account2' in conf else '',
- conf['username2'])
- swift_test_key[1] = conf['password2']
- except KeyError as err:
- pass # old conf, no second account tests can be run
- try:
- swift_test_user[2] = '%s%s' % ('%s:' % conf['account'] if 'account'
- in conf else '', conf['username3'])
- swift_test_key[2] = conf['password3']
- except KeyError as err:
- pass # old conf, no third account tests can be run
-
- for _ in range(3):
- swift_test_perm[_] = swift_test_user[_]
-
- else:
- swift_test_user[0] = conf['username']
- swift_test_tenant[0] = conf['account']
- swift_test_key[0] = conf['password']
- swift_test_user[1] = conf['username2']
- swift_test_tenant[1] = conf['account2']
- swift_test_key[1] = conf['password2']
- swift_test_user[2] = conf['username3']
- swift_test_tenant[2] = conf['account']
- swift_test_key[2] = conf['password3']
-
- for _ in range(3):
- swift_test_perm[_] = swift_test_tenant[_] + ':' \
- + swift_test_user[_]
-
-skip = not all([swift_test_auth, swift_test_user[0], swift_test_key[0]])
-if skip:
- print >>sys.stderr, 'SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG'
-
-skip2 = not all([not skip, swift_test_user[1], swift_test_key[1]])
-if not skip and skip2:
- print >>sys.stderr, \
- 'SKIPPING SECOND ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
-
-skip3 = not all([not skip, swift_test_user[2], swift_test_key[2]])
-if not skip and skip3:
- print >>sys.stderr, \
- 'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
-
-
-class AuthError(Exception):
- pass
-
-
-class InternalServerError(Exception):
- pass
-
-
-url = [None, None, None]
-token = [None, None, None]
-parsed = [None, None, None]
-conn = [None, None, None]
-
-
-def retry(func, *args, **kwargs):
- """
- You can use the kwargs to override the 'retries' (default: 5) and
- 'use_account' (default: 1).
- """
- global url, token, parsed, conn
- retries = kwargs.get('retries', 5)
- use_account = 1
- if 'use_account' in kwargs:
- use_account = kwargs['use_account']
- del kwargs['use_account']
- use_account -= 1
- attempts = 0
- backoff = 1
- while attempts <= retries:
- attempts += 1
- try:
- if not url[use_account] or not token[use_account]:
- url[use_account], token[use_account] = \
- get_auth(swift_test_auth, swift_test_user[use_account],
- swift_test_key[use_account],
- snet=False,
- tenant_name=swift_test_tenant[use_account],
- auth_version=swift_test_auth_version,
- os_options={})
- parsed[use_account] = conn[use_account] = None
- if not parsed[use_account] or not conn[use_account]:
- parsed[use_account], conn[use_account] = \
- http_connection(url[use_account])
- return func(url[use_account], token[use_account],
- parsed[use_account], conn[use_account],
- *args, **kwargs)
- except (socket.error, HTTPException):
- if attempts > retries:
- raise
- parsed[use_account] = conn[use_account] = None
- except AuthError:
- url[use_account] = token[use_account] = None
- continue
- except InternalServerError:
- pass
- if attempts <= retries:
- sleep(backoff)
- backoff *= 2
- raise Exception('No result after %s retries.' % retries)
-
-
-def check_response(conn):
- resp = conn.getresponse()
- if resp.status == 401:
- resp.read()
- raise AuthError()
- elif resp.status // 100 == 5:
- resp.read()
- raise InternalServerError()
- return resp
diff --git a/test/functionalnosetests/test_account.py b/test/functionalnosetests/test_account.py
deleted file mode 100755
index b2f743f..0000000
--- a/test/functionalnosetests/test_account.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from nose import SkipTest
-
-from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
- MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
-
-from swift_testing import check_response, retry, skip, web_front_end
-
-
-class TestAccount(unittest.TestCase):
-
- def test_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, value):
- conn.request('POST', parsed.path, '',
- {'X-Auth-Token': token, 'X-Account-Meta-Test': value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
- return check_response(conn)
-
- def get(url, token, parsed, conn):
- conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(post, '')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
- resp = retry(get)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), None)
- resp = retry(post, 'Value')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
- resp = retry(get)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
-
- def test_unicode_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, name, value):
- conn.request('POST', parsed.path, '',
- {'X-Auth-Token': token, name: value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
- return check_response(conn)
- uni_key = u'X-Account-Meta-uni\u0E12'
- uni_value = u'uni\u0E12'
- if (web_front_end == 'integral'):
- resp = retry(post, uni_key, '1')
- resp.read()
- self.assertTrue(resp.status in (201, 204))
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
- resp = retry(post, 'X-Account-Meta-uni', uni_value)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Account-Meta-uni'),
- uni_value.encode('utf-8'))
- if (web_front_end == 'integral'):
- resp = retry(post, uni_key, uni_value)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
-
- def test_multi_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, name, value):
- conn.request('POST', parsed.path, '',
- {'X-Auth-Token': token, name: value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(post, 'X-Account-Meta-One', '1')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
- resp = retry(post, 'X-Account-Meta-Two', '2')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-account-meta-one'), '1')
- self.assertEquals(resp.getheader('x-account-meta-two'), '2')
-
- def test_bad_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, extra_headers):
- headers = {'X-Auth-Token': token}
- headers.update(extra_headers)
- conn.request('POST', parsed.path, '', headers)
- return check_response(conn)
-
- resp = retry(post,
- {'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(
- post,
- {'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
- resp.read()
- self.assertEquals(resp.status, 400)
-
- resp = retry(post,
- {'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(
- post,
- {'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
- resp.read()
- self.assertEquals(resp.status, 400)
-
- headers = {}
- for x in xrange(MAX_META_COUNT):
- headers['X-Account-Meta-%d' % x] = 'v'
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 204)
- headers = {}
- for x in xrange(MAX_META_COUNT + 1):
- headers['X-Account-Meta-%d' % x] = 'v'
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
-
- headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
- size = 0
- x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
- headers['X-Account-Meta-%04d' % x] = header_value
- x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
- headers['X-Account-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 204)
- headers['X-Account-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/functionalnosetests/test_container.py b/test/functionalnosetests/test_container.py
deleted file mode 100755
index 15f7fc1..0000000
--- a/test/functionalnosetests/test_container.py
+++ /dev/null
@@ -1,687 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import unittest
-from nose import SkipTest
-from uuid import uuid4
-
-from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
- MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
-
-from swift_testing import check_response, retry, skip, skip2, skip3, \
- swift_test_perm, web_front_end
-
-
-class TestContainer(unittest.TestCase):
-
- def setUp(self):
- if skip:
- raise SkipTest
- self.name = uuid4().hex
-
- def put(url, token, parsed, conn):
- conn.request('PUT', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- def tearDown(self):
- if skip:
- raise SkipTest
-
- def get(url, token, parsed, conn):
- conn.request('GET', parsed.path + '/' + self.name + '?format=json',
- '', {'X-Auth-Token': token})
- return check_response(conn)
-
- def delete(url, token, parsed, conn, obj):
- conn.request('DELETE',
- '/'.join([parsed.path, self.name, obj['name']]), '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- while True:
- resp = retry(get)
- body = resp.read()
- self.assert_(resp.status // 100 == 2, resp.status)
- objs = json.loads(body)
- if not objs:
- break
- for obj in objs:
- resp = retry(delete, obj)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def delete(url, token, parsed, conn):
- conn.request('DELETE', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_multi_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, name, value):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token, name: value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(post, 'X-Container-Meta-One', '1')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
- resp = retry(post, 'X-Container-Meta-Two', '2')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-one'), '1')
- self.assertEquals(resp.getheader('x-container-meta-two'), '2')
-
- def test_unicode_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, name, value):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token, name: value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- uni_key = u'X-Container-Meta-uni\u0E12'
- uni_value = u'uni\u0E12'
- if (web_front_end == 'integral'):
- resp = retry(post, uni_key, '1')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
- resp = retry(post, 'X-Container-Meta-uni', uni_value)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('X-Container-Meta-uni'),
- uni_value.encode('utf-8'))
- if (web_front_end == 'integral'):
- resp = retry(post, uni_key, uni_value)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
- uni_value.encode('utf-8'))
-
- def test_PUT_metadata(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn, name, value):
- conn.request('PUT', parsed.path + '/' + name, '',
- {'X-Auth-Token': token,
- 'X-Container-Meta-Test': value})
- return check_response(conn)
-
- def head(url, token, parsed, conn, name):
- conn.request('HEAD', parsed.path + '/' + name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- def get(url, token, parsed, conn, name):
- conn.request('GET', parsed.path + '/' + name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- def delete(url, token, parsed, conn, name):
- conn.request('DELETE', parsed.path + '/' + name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- name = uuid4().hex
- resp = retry(put, name, 'Value')
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(head, name)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
- resp = retry(get, name)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- name = uuid4().hex
- resp = retry(put, name, '')
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(head, name)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
- resp = retry(get, name)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_POST_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, value):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Meta-Test': value})
- return check_response(conn)
-
- def head(url, token, parsed, conn):
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- def get(url, token, parsed, conn):
- conn.request('GET', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
- resp = retry(get)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), None)
- resp = retry(post, 'Value')
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(head)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
- resp = retry(get)
- resp.read()
- self.assert_(resp.status in (200, 204), resp.status)
- self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
-
- def test_PUT_bad_metadata(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn, name, extra_headers):
- headers = {'X-Auth-Token': token}
- headers.update(extra_headers)
- conn.request('PUT', parsed.path + '/' + name, '', headers)
- return check_response(conn)
-
- def delete(url, token, parsed, conn, name):
- conn.request('DELETE', parsed.path + '/' + name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- name = uuid4().hex
- resp = retry(
- put, name,
- {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
- name = uuid4().hex
- resp = retry(
- put, name,
- {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
- resp.read()
- self.assertEquals(resp.status, 400)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 404)
-
- name = uuid4().hex
- resp = retry(
- put, name,
- {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
- name = uuid4().hex
- resp = retry(
- put, name,
- {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
- resp.read()
- self.assertEquals(resp.status, 400)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 404)
-
- name = uuid4().hex
- headers = {}
- for x in xrange(MAX_META_COUNT):
- headers['X-Container-Meta-%d' % x] = 'v'
- resp = retry(put, name, headers)
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
- name = uuid4().hex
- headers = {}
- for x in xrange(MAX_META_COUNT + 1):
- headers['X-Container-Meta-%d' % x] = 'v'
- resp = retry(put, name, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 404)
-
- name = uuid4().hex
- headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
- size = 0
- x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
- headers['X-Container-Meta-%04d' % x] = header_value
- x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
- headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
- resp = retry(put, name, headers)
- resp.read()
- self.assertEquals(resp.status, 201)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 204)
- name = uuid4().hex
- headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
- resp = retry(put, name, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
- resp = retry(delete, name)
- resp.read()
- self.assertEquals(resp.status, 404)
-
- def test_POST_bad_metadata(self):
- if skip:
- raise SkipTest
-
- def post(url, token, parsed, conn, extra_headers):
- headers = {'X-Auth-Token': token}
- headers.update(extra_headers)
- conn.request('POST', parsed.path + '/' + self.name, '', headers)
- return check_response(conn)
-
- resp = retry(
- post,
- {'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(
- post,
- {'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
- resp.read()
- self.assertEquals(resp.status, 400)
-
- resp = retry(
- post,
- {'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(
- post,
- {'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
- resp.read()
- self.assertEquals(resp.status, 400)
-
- headers = {}
- for x in xrange(MAX_META_COUNT):
- headers['X-Container-Meta-%d' % x] = 'v'
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 204)
- headers = {}
- for x in xrange(MAX_META_COUNT + 1):
- headers['X-Container-Meta-%d' % x] = 'v'
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
-
- headers = {}
- header_value = 'k' * MAX_META_VALUE_LENGTH
- size = 0
- x = 0
- while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
- size += 4 + MAX_META_VALUE_LENGTH
- headers['X-Container-Meta-%04d' % x] = header_value
- x += 1
- if MAX_META_OVERALL_SIZE - size > 1:
- headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size - 1)
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 204)
- headers['X-Container-Meta-k'] = \
- 'v' * (MAX_META_OVERALL_SIZE - size)
- resp = retry(post, headers)
- resp.read()
- self.assertEquals(resp.status, 400)
-
- def test_public_container(self):
- if skip:
- raise SkipTest
-
- def get(url, token, parsed, conn):
- conn.request('GET', parsed.path + '/' + self.name)
- return check_response(conn)
-
- try:
- resp = retry(get)
- raise Exception('Should not have been able to GET')
- except Exception as err:
- self.assert_(str(err).startswith('No result after '), err)
-
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Read': '.r:*,.rlistings'})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(get)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token, 'X-Container-Read': ''})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- try:
- resp = retry(get)
- raise Exception('Should not have been able to GET')
- except Exception as err:
- self.assert_(str(err).startswith('No result after '), err)
-
- def test_cross_account_container(self):
- if skip or skip2:
- raise SkipTest
- # Obtain the first account's string
- first_account = ['unknown']
-
- def get1(url, token, parsed, conn):
- first_account[0] = parsed.path
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get1)
- resp.read()
-
- # Ensure we can't access the container with the second account
- def get2(url, token, parsed, conn):
- conn.request('GET', first_account[0] + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Make the container accessible by the second account
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[1],
- 'X-Container-Write': swift_test_perm[1]})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can now use the container with the second account
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # Make the container private again
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token, 'X-Container-Read': '',
- 'X-Container-Write': ''})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can't access the container with the second account again
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- def test_cross_account_public_container(self):
- if skip or skip2:
- raise SkipTest
- # Obtain the first account's string
- first_account = ['unknown']
-
- def get1(url, token, parsed, conn):
- first_account[0] = parsed.path
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get1)
- resp.read()
-
- # Ensure we can't access the container with the second account
- def get2(url, token, parsed, conn):
- conn.request('GET', first_account[0] + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Make the container completely public
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Read': '.r:*,.rlistings'})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can now read the container with the second account
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # But we shouldn't be able to write with the second account
- def put2(url, token, parsed, conn):
- conn.request('PUT', first_account[0] + '/' + self.name + '/object',
- 'test object', {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(put2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Now make the container also writeable by the second account
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Write': swift_test_perm[1]})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can still read the container with the second account
- resp = retry(get2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 204)
- # And that we can now write with the second account
- resp = retry(put2, use_account=2)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- def test_nonadmin_user(self):
- if skip or skip3:
- raise SkipTest
- # Obtain the first account's string
- first_account = ['unknown']
-
- def get1(url, token, parsed, conn):
- first_account[0] = parsed.path
- conn.request('HEAD', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get1)
- resp.read()
-
- # Ensure we can't access the container with the third account
- def get3(url, token, parsed, conn):
- conn.request('GET', first_account[0] + '/' + self.name, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(get3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Make the container accessible by the third account
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can now read the container with the third account
- resp = retry(get3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # But we shouldn't be able to write with the third account
- def put3(url, token, parsed, conn):
- conn.request('PUT', first_account[0] + '/' + self.name + '/object',
- 'test object', {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(put3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Now make the container also writeable by the third account
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.name, '',
- {'X-Auth-Token': token,
- 'X-Container-Write': swift_test_perm[2]})
- return check_response(conn)
-
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- # Ensure we can still read the container with the third account
- resp = retry(get3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 204)
- # And that we can now write with the third account
- resp = retry(put3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- def test_long_name_content_type(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn):
- container_name = 'X' * 2048
- conn.request('PUT', '%s/%s' % (parsed.path, container_name),
- 'there', {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 400)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
-
- def test_null_name(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/abc%%00def' % parsed.path, '',
- {'X-Auth-Token': token})
- return check_response(conn)
-
- resp = retry(put)
- if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
- else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/functionalnosetests/test_object.py b/test/functionalnosetests/test_object.py
deleted file mode 100755
index 97cd8d0..0000000
--- a/test/functionalnosetests/test_object.py
+++ /dev/null
@@ -1,602 +0,0 @@
-#!/usr/bin/python
-
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from nose import SkipTest
-from uuid import uuid4
-
-from swift_testing import check_response, retry, skip, skip3, \
- swift_test_perm, web_front_end
-
-
-class TestObject(unittest.TestCase):
-
- def setUp(self):
- if skip:
- raise SkipTest
- self.container = uuid4().hex
-
- def put(url, token, parsed, conn):
- conn.request('PUT', parsed.path + '/' + self.container, '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
- self.obj = uuid4().hex
-
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/%s' % (
- parsed.path, self.container, self.obj), 'test',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- def tearDown(self):
- if skip:
- raise SkipTest
-
- def delete(url, token, parsed, conn, obj):
- conn.request('DELETE',
- '%s/%s/%s' % (parsed.path, self.container, obj),
- '', {'X-Auth-Token': token})
- return check_response(conn)
-
- # get list of objects in container
- def list(url, token, parsed, conn):
- conn.request('GET',
- '%s/%s' % (parsed.path, self.container),
- '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(list)
- object_listing = resp.read()
- self.assertEquals(resp.status, 200)
-
- # iterate over object listing and delete all objects
- for obj in object_listing.splitlines():
- resp = retry(delete, obj)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # delete the container
- def delete(url, token, parsed, conn):
- conn.request('DELETE', parsed.path + '/' + self.container, '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_copy_object(self):
- if skip:
- raise SkipTest
-
- source = '%s/%s' % (self.container, self.obj)
- dest = '%s/%s' % (self.container, 'test_copy')
-
- # get contents of source
- def get_source(url, token, parsed, conn):
- conn.request('GET',
- '%s/%s' % (parsed.path, source),
- '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get_source)
- source_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(source_contents, 'test')
-
- # copy source to dest with X-Copy-From
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
- {'X-Auth-Token': token,
- 'Content-Length': '0',
- 'X-Copy-From': source})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # contents of dest should be the same as source
- def get_dest(url, token, parsed, conn):
- conn.request('GET',
- '%s/%s' % (parsed.path, dest),
- '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get_dest)
- dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
-
- # delete the copy
- def delete(url, token, parsed, conn):
- conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
- # verify dest does not exist
- resp = retry(get_dest)
- resp.read()
- self.assertEquals(resp.status, 404)
-
- # copy source to dest with COPY
- def copy(url, token, parsed, conn):
- conn.request('COPY', '%s/%s' % (parsed.path, source), '',
- {'X-Auth-Token': token,
- 'Destination': dest})
- return check_response(conn)
- resp = retry(copy)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # contents of dest should be the same as source
- resp = retry(get_dest)
- dest_contents = resp.read()
- self.assertEquals(resp.status, 200)
- self.assertEquals(dest_contents, source_contents)
-
- # delete the copy
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_public_object(self):
- if skip:
- raise SkipTest
-
- def get(url, token, parsed, conn):
- conn.request('GET',
- '%s/%s/%s' % (parsed.path, self.container, self.obj))
- return check_response(conn)
- try:
- resp = retry(get)
- raise Exception('Should not have been able to GET')
- except Exception as err:
- self.assert_(str(err).startswith('No result after '))
-
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.container, '',
- {'X-Auth-Token': token,
- 'X-Container-Read': '.r:*'})
- return check_response(conn)
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- resp = retry(get)
- resp.read()
- self.assertEquals(resp.status, 200)
-
- def post(url, token, parsed, conn):
- conn.request('POST', parsed.path + '/' + self.container, '',
- {'X-Auth-Token': token, 'X-Container-Read': ''})
- return check_response(conn)
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
- try:
- resp = retry(get)
- raise Exception('Should not have been able to GET')
- except Exception as err:
- self.assert_(str(err).startswith('No result after '))
-
- def test_private_object(self):
- if skip or skip3:
- raise SkipTest
-
- # Ensure we can't access the object with the third account
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/%s' % (
- parsed.path, self.container, self.obj), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # create a shared container writable by account3
- shared_container = uuid4().hex
-
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s' % (
- parsed.path, shared_container), '',
- {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2],
- 'X-Container-Write': swift_test_perm[2]})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # verify third account can not copy from private container
- def copy(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/%s' % (
- parsed.path, shared_container, 'private_object'), '',
- {'X-Auth-Token': token,
- 'Content-Length': '0',
- 'X-Copy-From': '%s/%s' % (self.container, self.obj)})
- return check_response(conn)
- resp = retry(copy, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # verify third account can write "obj1" to shared container
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/%s' % (
- parsed.path, shared_container, 'obj1'), 'test',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # verify third account can copy "obj1" to shared container
- def copy2(url, token, parsed, conn):
- conn.request('COPY', '%s/%s/%s' % (
- parsed.path, shared_container, 'obj1'), '',
- {'X-Auth-Token': token,
- 'Destination': '%s/%s' % (shared_container, 'obj1')})
- return check_response(conn)
- resp = retry(copy2, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # verify third account STILL can not copy from private container
- def copy3(url, token, parsed, conn):
- conn.request('COPY', '%s/%s/%s' % (
- parsed.path, self.container, self.obj), '',
- {'X-Auth-Token': token,
- 'Destination': '%s/%s' % (shared_container,
- 'private_object')})
- return check_response(conn)
- resp = retry(copy3, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # clean up "obj1"
- def delete(url, token, parsed, conn):
- conn.request('DELETE', '%s/%s/%s' % (
- parsed.path, shared_container, 'obj1'), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # clean up shared_container
- def delete(url, token, parsed, conn):
- conn.request('DELETE',
- parsed.path + '/' + shared_container, '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_manifest(self):
- if skip:
- raise SkipTest
- # Data for the object segments
- segments1 = ['one', 'two', 'three', 'four', 'five']
- segments2 = ['six', 'seven', 'eight']
- segments3 = ['nine', 'ten', 'eleven']
-
- # Upload the first set of segments
- def put(url, token, parsed, conn, objnum):
- conn.request('PUT', '%s/%s/segments1/%s' % (
- parsed.path, self.container, str(objnum)), segments1[objnum],
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments1)):
- resp = retry(put, objnum)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Upload the manifest
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/manifest' % (
- parsed.path, self.container), '', {
- 'X-Auth-Token': token,
- 'X-Object-Manifest': '%s/segments1/' % self.container,
- 'Content-Type': 'text/jibberish', 'Content-Length': '0'})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Get the manifest (should get all the segments as the body)
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
- self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
-
- # Get with a range at the start of the second segment
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {
- 'X-Auth-Token': token, 'Range': 'bytes=3-'})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1[1:]))
- self.assertEquals(resp.status, 206)
-
- # Get with a range in the middle of the second segment
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {
- 'X-Auth-Token': token, 'Range': 'bytes=5-'})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:])
- self.assertEquals(resp.status, 206)
-
- # Get with a full start and stop range
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {
- 'X-Auth-Token': token, 'Range': 'bytes=5-10'})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1)[5:11])
- self.assertEquals(resp.status, 206)
-
- # Upload the second set of segments
- def put(url, token, parsed, conn, objnum):
- conn.request('PUT', '%s/%s/segments2/%s' % (
- parsed.path, self.container, str(objnum)), segments2[objnum],
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments2)):
- resp = retry(put, objnum)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Get the manifest (should still be the first segments of course)
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments1))
- self.assertEquals(resp.status, 200)
-
- # Update the manifest
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/manifest' % (
- parsed.path, self.container), '', {
- 'X-Auth-Token': token,
- 'X-Object-Manifest': '%s/segments2/' % self.container,
- 'Content-Length': '0'})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Get the manifest (should be the second set of segments now)
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
-
- if not skip3:
-
- # Ensure we can't access the manifest with the third account
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Grant access to the third account
- def post(url, token, parsed, conn):
- conn.request('POST', '%s/%s' % (parsed.path, self.container),
- '', {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
- return check_response(conn)
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # The third account should be able to get the manifest now
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments2))
- self.assertEquals(resp.status, 200)
-
- # Create another container for the third set of segments
- acontainer = uuid4().hex
-
- def put(url, token, parsed, conn):
- conn.request('PUT', parsed.path + '/' + acontainer, '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Upload the third set of segments in the other container
- def put(url, token, parsed, conn, objnum):
- conn.request('PUT', '%s/%s/segments3/%s' % (
- parsed.path, acontainer, str(objnum)), segments3[objnum],
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments3)):
- resp = retry(put, objnum)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Update the manifest
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/manifest' % (
- parsed.path, self.container), '',
- {'X-Auth-Token': token,
- 'X-Object-Manifest': '%s/segments3/' % acontainer,
- 'Content-Length': '0'})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- # Get the manifest to ensure it's the third set of segments
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
-
- if not skip3:
-
- # Ensure we can't access the manifest with the third account
- # (because the segments are in a protected container even if the
- # manifest itself is not).
-
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get, use_account=3)
- resp.read()
- self.assertEquals(resp.status, 403)
-
- # Grant access to the third account
- def post(url, token, parsed, conn):
- conn.request('POST', '%s/%s' % (parsed.path, acontainer),
- '', {'X-Auth-Token': token,
- 'X-Container-Read': swift_test_perm[2]})
- return check_response(conn)
- resp = retry(post)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # The third account should be able to get the manifest now
- def get(url, token, parsed, conn):
- conn.request('GET', '%s/%s/manifest' % (
- parsed.path, self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(get, use_account=3)
- self.assertEquals(resp.read(), ''.join(segments3))
- self.assertEquals(resp.status, 200)
-
- # Delete the manifest
- def delete(url, token, parsed, conn, objnum):
- conn.request('DELETE', '%s/%s/manifest' % (
- parsed.path,
- self.container), '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete, objnum)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # Delete the third set of segments
- def delete(url, token, parsed, conn, objnum):
- conn.request('DELETE', '%s/%s/segments3/%s' % (
- parsed.path, acontainer, str(objnum)), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments3)):
- resp = retry(delete, objnum)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # Delete the second set of segments
- def delete(url, token, parsed, conn, objnum):
- conn.request('DELETE', '%s/%s/segments2/%s' % (
- parsed.path, self.container, str(objnum)), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments2)):
- resp = retry(delete, objnum)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # Delete the first set of segments
- def delete(url, token, parsed, conn, objnum):
- conn.request('DELETE', '%s/%s/segments1/%s' % (
- parsed.path, self.container, str(objnum)), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- for objnum in xrange(len(segments1)):
- resp = retry(delete, objnum)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- # Delete the extra container
- def delete(url, token, parsed, conn):
- conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
- {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
-
- def test_delete_content_type(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/hi' % (parsed.path, self.container),
- 'there', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- resp.read()
- self.assertEquals(resp.status, 201)
-
- def delete(url, token, parsed, conn):
- conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
- '', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(delete)
- resp.read()
- self.assertEquals(resp.status, 204)
- self.assertEquals(resp.getheader('Content-Type'),
- 'text/html; charset=UTF-8')
-
- def test_null_name(self):
- if skip:
- raise SkipTest
-
- def put(url, token, parsed, conn):
- conn.request('PUT', '%s/%s/abc%%00def' % (
- parsed.path,
- self.container), 'test', {'X-Auth-Token': token})
- return check_response(conn)
- resp = retry(put)
- if (web_front_end == 'apache2'):
- self.assertEquals(resp.status, 404)
- else:
- self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
- self.assertEquals(resp.status, 412)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tools/functional_tests.sh b/tools/functional_tests.sh
index 68b524f..94d5d9b 100755
--- a/tools/functional_tests.sh
+++ b/tools/functional_tests.sh
@@ -78,12 +78,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-generic-functional-result.html \
test/functional || fail "Functional tests failed"
-nosetests -v --exe \
- --with-xunit \
- --xunit-file functional_tests/gluster-swift-functionalnosetests-TC-report.xml \
- --with-html-output \
- --html-out-file functional_tests/gluster-swift-functionalnosetests-result.html \
- test/functionalnosetests || fail "Functional-nose tests failed"
cleanup
exit 0
diff --git a/tools/gswauth_functional_tests.sh b/tools/gswauth_functional_tests.sh
index 5e4b31a..dbe2248 100755
--- a/tools/gswauth_functional_tests.sh
+++ b/tools/gswauth_functional_tests.sh
@@ -70,12 +70,6 @@ run_generic_tests()
--with-html-output \
--html-out-file functional_tests/gluster-swift-gswauth-generic-functional-result.html \
test/functional || fail "Functional tests failed"
- nosetests -v --exe \
- --with-xunit \
- --xunit-file functional_tests/gluster-swift-gswauth-functionalnosetests-TC-report.xml \
- --with-html-output \
- --html-out-file functional_tests/gluster-swift-gswauth-functionalnosetests-result.html \
- test/functionalnosetests || fail "Functional-nose tests failed"
}
### MAIN ###
diff --git a/tools/keystone_functional_tests.sh b/tools/keystone_functional_tests.sh
index 6acf24a..620bcc7 100755
--- a/tools/keystone_functional_tests.sh
+++ b/tools/keystone_functional_tests.sh
@@ -97,12 +97,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-keystone-generic-functional-result.html \
test/functional || fail "Functional tests failed"
-nosetests -v --exe \
- --with-xunit \
- --xunit-file functional_tests/gluster-swift-keystone-functionalnosetests-TC-report.xml \
- --with-html-output \
- --html-out-file functional_tests/gluster-swift-keystone-functionalnosetests-result.html \
- test/functionalnosetests || fail "Functional-nose tests failed"
cleanup
exit 0
diff --git a/tools/swkrbath_functional_tests.sh b/tools/swkrbath_functional_tests.sh
index 7abfd18..9995f1d 100755
--- a/tools/swkrbath_functional_tests.sh
+++ b/tools/swkrbath_functional_tests.sh
@@ -91,12 +91,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-swiftkerbauth-generic-functional-result.html \
test/functional_auth/swiftkerbauth || fail "Functional tests failed"
-#nosetests -v --exe \
-# --with-xunit \
-# --xunit-file functional_tests/gluster-swift-swiftkerbauth-functionalnosetests-TC-report.xml \
-# --with-html-output \
-# --html-out-file functional_tests/gluster-swift-swiftkerbauth-functionalnosetests-result.html \
-# test/functional || fail "Functional-nose tests failed"
cleanup
exit 0