summaryrefslogtreecommitdiffstats
path: root/test/functional/tests.py
diff options
context:
space:
mode:
authorThiago da Silva <thiago@redhat.com>2014-04-22 14:15:02 -0400
committerPrashanth Pai <ppai@redhat.com>2016-01-06 07:53:12 -0800
commit2a8f9f0f530327039c32e444b6a27130b12666bd (patch)
treee24e38b5b3c0245a0acafc63fc50bacbf7de718a /test/functional/tests.py
parent4c6ca1db931377b75583f61a7bca262cfc27b0fa (diff)
Update repo
This is a squashed commit imported from this repo: https://github.com/openstack/swiftonfile/tree/icehouse Contains the follwing commits from above mentioned repo: eb50236 Merge "Backport: Fix metadata overall limits bug" into icehouse 79ea52a Backport: Fix metadata overall limits bug bc43f0b Fix inconsistent data being returned on GET ad0bb79 Import HTTPBadRequest from swift's module 74d02e6 Exclude .trashcan dir from container listing b2dbc15 Catch ESTALE in addition to ENOENT 8d60b48 Properly handle read_metadata() exceptions 6762fc6 Fix object server leaking file descriptors 2842e82 Fix API incompatibility in update_metadata() 2beeef6 Merge "Remove swiftkerbauth code" into icehouse 93dbcb5 Update object-expirer.conf with explanations c9d2f09 Merge "Check if /etc/swift exists in ring builder" into icehouse d66c14c Remove swiftkerbauth code 3142ed2 Add object expiration functests 97153d1 Merge "Cleanup functest and undo old patch" into icehouse bc234d0 Remove old travis config file and fix typo 260c8ef Check if /etc/swift exists in ring builder 637dac9 Cleanup functest and undo old patch 051e068 Merge pull request #35 from prashanthpai/backport-1 be104a3 Merge pull request #36 from prashanthpai/backport-2 ff76f42 fix issue with GET on large object (icehouse-backport) 04d0a99 Fix unlink call after successful rename 4c6ca1d updating README file with project name change 10b2680 Merge pull request #18 from thiagol11/icehouse 5bcab8f Updating version on __init__ file 5c2cba2 Merge pull request #15 from thiagol11/update_spec 52b00a8 updating spec file to add dependency on swift icehouse ae7c93b Merge pull request #6 from prashanthpai/rebase 191e55b Revert: allow non-root user to run functests cb7e968 Modify unit tests and func tests d23fd1b Sync with OpenStack Swift v1.13.1 b6d1671 Merge pull request #12 from pushpesh/functionalnosetestremove 962622b Merge pull request #8 from thiagol11/update_readme 4560857 Merge pull request #9 from prashanthpai/spec-expirer be0ae7e Minor update 65000f1 Removing functionalnosetests 8ab1069 Fix object-expirer.conf-gluster RPM build error afee30f added new support filesystem section 527b01f updated README.md to Swift-On-File 9a240c7 Merge pull request #3 from thiagol11/add_jenkins_to_travis 34b5a8b removing blank lines 3568b64 fixing missing fi d8f5b0f adding support to run jenkins triggered by travis 6f4a88c Removing functionalnosetests 8041944 Update README.md c015148 Merge pull request #2 from thiagol11/master 3ddd952 fixing travis file to run correct unit test c582669 adding travis status badge to README 8093096 adding py26 unit testing to travis 37835fd trigger travis build cb6332a adding travis ci testing All tests have been run sucessfully against this. tox -e p2p8,py27,functest Change-Id: I096b611da852d3eb3913844034b443b8272c2ac4 Signed-off-by: Prashanth Pai <ppai@redhat.com> Reviewed-on: http://review.gluster.org/13188
Diffstat (limited to 'test/functional/tests.py')
-rw-r--r--test/functional/tests.py470
1 files changed, 411 insertions, 59 deletions
diff --git a/test/functional/tests.py b/test/functional/tests.py
index 0d9a9ef..ad87d7e 100644
--- a/test/functional/tests.py
+++ b/test/functional/tests.py
@@ -19,14 +19,16 @@
from datetime import datetime
import os
import hashlib
+import hmac
import json
import locale
import random
import StringIO
import time
import threading
-import uuid
import unittest
+import urllib
+import uuid
from nose import SkipTest
from ConfigParser import ConfigParser
@@ -36,7 +38,7 @@ from test.functional.swift_test_client import Account, Connection, File, \
from swift.common.constraints import MAX_FILE_SIZE, MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_OBJECT_NAME_LENGTH, CONTAINER_LISTING_LIMIT, ACCOUNT_LISTING_LIMIT, \
- MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
+ MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_HEADER_SIZE
from gluster.swift.common.constraints import \
set_object_name_component_length, get_object_name_component_length
@@ -50,7 +52,8 @@ default_constraints = dict((
('container_listing_limit', CONTAINER_LISTING_LIMIT),
('account_listing_limit', ACCOUNT_LISTING_LIMIT),
('max_account_name_length', MAX_ACCOUNT_NAME_LENGTH),
- ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH)))
+ ('max_container_name_length', MAX_CONTAINER_NAME_LENGTH),
+ ('max_header_size', MAX_HEADER_SIZE)))
constraints_conf = ConfigParser()
conf_exists = constraints_conf.read('/etc/swift/swift.conf')
# Constraints are set first from the test config, then from
@@ -285,7 +288,7 @@ class TestAccount(Base):
if try_count < 5:
time.sleep(1)
- self.assertEquals(info['container_count'], len(self.env.containers))
+ self.assertEqual(info['container_count'], len(self.env.containers))
self.assert_status(204)
def testContainerSerializedInfo(self):
@@ -309,11 +312,11 @@ class TestAccount(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
def testListingLimit(self):
limit = load_constraint('account_listing_limit')
@@ -337,7 +340,7 @@ class TestAccount(Base):
if isinstance(b[0], dict):
b = [x['name'] for x in b]
- self.assertEquals(a, b)
+ self.assertEqual(a, b)
def testInvalidAuthToken(self):
hdrs = {'X-Auth-Token': 'bogus_auth_token'}
@@ -347,12 +350,12 @@ class TestAccount(Base):
def testLastContainerMarker(self):
for format_type in [None, 'json', 'xml']:
containers = self.env.account.containers({'format': format_type})
- self.assertEquals(len(containers), len(self.env.containers))
+ self.assertEqual(len(containers), len(self.env.containers))
self.assert_status(200)
containers = self.env.account.containers(
parms={'format': format_type, 'marker': containers[-1]})
- self.assertEquals(len(containers), 0)
+ self.assertEqual(len(containers), 0)
if format_type is None:
self.assert_status(204)
else:
@@ -380,8 +383,8 @@ class TestAccount(Base):
parms={'format': format_type})
if isinstance(containers[0], dict):
containers = [x['name'] for x in containers]
- self.assertEquals(sorted(containers, cmp=locale.strcoll),
- containers)
+ self.assertEqual(sorted(containers, cmp=locale.strcoll),
+ containers)
class TestAccountUTF8(Base2, TestAccount):
@@ -518,13 +521,13 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'prefix': prefix})
- self.assertEquals(files, sorted(prefix_files[prefix]))
+ self.assertEqual(files, sorted(prefix_files[prefix]))
for format_type in [None, 'json', 'xml']:
for prefix in prefixs:
files = cont.files(parms={'limit': limit_count,
'prefix': prefix})
- self.assertEquals(len(files), limit_count)
+ self.assertEqual(len(files), limit_count)
for file_item in files:
self.assert_(file_item.startswith(prefix))
@@ -548,7 +551,7 @@ class TestContainer(Base):
container = self.env.account.container(valid_utf8)
self.assert_(container.create(cfg={'no_path_quote': True}))
self.assert_(container.name in self.env.account.containers())
- self.assertEquals(container.files(), [])
+ self.assertEqual(container.files(), [])
self.assert_(container.delete())
container = self.env.account.container(invalid_utf8)
@@ -614,12 +617,12 @@ class TestContainer(Base):
def testLastFileMarker(self):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files({'format': format_type})
- self.assertEquals(len(files), len(self.env.files))
+ self.assertEqual(len(files), len(self.env.files))
self.assert_status(200)
files = self.env.container.files(
parms={'format': format_type, 'marker': files[-1]})
- self.assertEquals(len(files), 0)
+ self.assertEqual(len(files), 0)
if format_type is None:
self.assert_status(204)
@@ -665,14 +668,14 @@ class TestContainer(Base):
files = self.env.container.files(parms={'format': format_type})
if isinstance(files[0], dict):
files = [x['name'] for x in files]
- self.assertEquals(sorted(files, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(files, cmp=locale.strcoll), files)
def testContainerInfo(self):
info = self.env.container.info()
self.assert_status(204)
- self.assertEquals(info['object_count'], self.env.file_count)
- self.assertEquals(info['bytes_used'],
- self.env.file_count * self.env.file_size)
+ self.assertEqual(info['object_count'], self.env.file_count)
+ self.assertEqual(info['bytes_used'],
+ self.env.file_count * self.env.file_size)
def testContainerInfoOnContainerThatDoesNotExist(self):
container = self.env.account.container(Utils.create_name())
@@ -683,7 +686,7 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']:
files = self.env.container.files(parms={'format': format_type,
'limit': 2})
- self.assertEquals(len(files), 2)
+ self.assertEqual(len(files), 2)
def testTooLongName(self):
cont = self.env.account.container('x' * 257)
@@ -838,7 +841,7 @@ class TestContainerPaths(Base):
if isinstance(files[0], dict):
files = [str(x['name']) for x in files]
- self.assertEquals(files, self.env.stored_files)
+ self.assertEqual(files, self.env.stored_files)
for format_type in ('json', 'xml'):
for file_item in self.env.container.files(parms={'format':
@@ -846,13 +849,13 @@ class TestContainerPaths(Base):
self.assert_(int(file_item['bytes']) >= 0)
self.assert_('last_modified' in file_item)
if file_item['name'].endswith('/'):
- self.assertEquals(file_item['content_type'],
- 'application/directory')
+ self.assertEqual(file_item['content_type'],
+ 'application/directory')
def testStructure(self):
def assert_listing(path, file_list):
files = self.env.container.files(parms={'path': path})
- self.assertEquals(sorted(file_list, cmp=locale.strcoll), files)
+ self.assertEqual(sorted(file_list, cmp=locale.strcoll), files)
if not normalized_urls:
assert_listing('/', ['/dir1/', '/dir2/', '/file1', '/file A'])
assert_listing('/dir1',
@@ -1176,7 +1179,7 @@ class TestFile(Base):
for i in container.files(parms={'format': 'json'}):
file_types_read[i['name'].split('.')[1]] = i['content_type']
- self.assertEquals(file_types, file_types_read)
+ self.assertEqual(file_types, file_types_read)
def testRangedGets(self):
file_length = 10000
@@ -1201,7 +1204,7 @@ class TestFile(Base):
self.assertRaises(ResponseError, file_item.read, hdrs=hdrs)
self.assert_status(416)
else:
- self.assertEquals(file_item.read(hdrs=hdrs), data[-i:])
+ self.assertEqual(file_item.read(hdrs=hdrs), data[-i:])
range_string = 'bytes=%d-' % (i)
hdrs = {'Range': range_string}
@@ -1350,9 +1353,9 @@ class TestFile(Base):
info = file_item.info()
self.assert_status(200)
- self.assertEquals(info['content_length'], self.env.file_size)
- self.assertEquals(info['etag'], md5)
- self.assertEquals(info['content_type'], content_type)
+ self.assertEqual(info['content_length'], self.env.file_size)
+ self.assertEqual(info['etag'], md5)
+ self.assertEqual(info['content_type'], content_type)
self.assert_('last_modified' in info)
def testDeleteOfFileThatDoesNotExist(self):
@@ -1395,7 +1398,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testGetContentType(self):
file_name = Utils.create_name()
@@ -1408,7 +1411,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_name)
file_item.read()
- self.assertEquals(content_type, file_item.content_type)
+ self.assertEqual(content_type, file_item.content_type)
def testGetOnFileThatDoesNotExist(self):
# in container that exists
@@ -1449,7 +1452,7 @@ class TestFile(Base):
file_item = self.env.container.file(file_item.name)
self.assert_(file_item.initialize())
self.assert_status(200)
- self.assertEquals(file_item.metadata, metadata)
+ self.assertEqual(file_item.metadata, metadata)
def testSerialization(self):
container = self.env.account.container(Utils.create_name())
@@ -1478,9 +1481,9 @@ class TestFile(Base):
if f['name'] != file_item['name']:
continue
- self.assertEquals(file_item['content_type'],
- f['content_type'])
- self.assertEquals(int(file_item['bytes']), f['bytes'])
+ self.assertEqual(file_item['content_type'],
+ f['content_type'])
+ self.assertEqual(int(file_item['bytes']), f['bytes'])
d = datetime.strptime(
file_item['last_modified'].split('.')[0],
@@ -1488,7 +1491,7 @@ class TestFile(Base):
lm = time.mktime(d.timetuple())
if 'last_modified' in f:
- self.assertEquals(f['last_modified'], lm)
+ self.assertEqual(f['last_modified'], lm)
else:
f['last_modified'] = lm
@@ -1500,11 +1503,11 @@ class TestFile(Base):
headers = dict(self.env.conn.response.getheaders())
if format_type == 'json':
- self.assertEquals(headers['content-type'],
- 'application/json; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/json; charset=utf-8')
elif format_type == 'xml':
- self.assertEquals(headers['content-type'],
- 'application/xml; charset=utf-8')
+ self.assertEqual(headers['content-type'],
+ 'application/xml; charset=utf-8')
lm_diff = max([f['last_modified'] for f in files]) -\
min([f['last_modified'] for f in files])
@@ -1547,7 +1550,7 @@ class TestFile(Base):
self.assert_('etag' in headers.keys())
header_etag = headers['etag'].strip('"')
- self.assertEquals(etag, header_etag)
+ self.assertEqual(etag, header_etag)
def testChunkedPut(self):
if (web_front_end == 'apache2'):
@@ -1565,7 +1568,7 @@ class TestFile(Base):
self.assert_(data == file_item.read())
info = file_item.info()
- self.assertEquals(etag, info['etag'])
+ self.assertEqual(etag, info['etag'])
class TestFileUTF8(Base2, TestFile):
@@ -1677,12 +1680,30 @@ class TestDlo(Base):
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
+ def test_copy_manifest(self):
+ # Copying the manifest should result in another manifest
+ try:
+ man1_item = self.env.container.file('man1')
+ man1_item.copy(self.env.container.name, "copied-man1",
+ parms={'multipart-manifest': 'get'})
+
+ copied = self.env.container.file("copied-man1")
+ copied_contents = copied.read(parms={'multipart-manifest': 'get'})
+ self.assertEqual(copied_contents, "man1-contents")
+
+ copied_contents = copied.read()
+ self.assertEqual(
+ copied_contents,
+ "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee")
+ finally:
+ # try not to leave this around for other tests to stumble over
+ self.env.container.file("copied-man1").delete()
class TestDloUTF8(Base2, TestDlo):
set_up = False
-class TestFileComparisonEnv:
+class TestFileComparisonEnv(object):
@classmethod
def setUp(cls):
cls.conn = Connection(config)
@@ -1806,19 +1827,8 @@ class TestSloEnv(object):
cls.conn.authenticate()
if cls.slo_enabled is None:
- status = cls.conn.make_request('GET', '/info',
- cfg={'verbatim_path': True})
- if not (200 <= status <= 299):
- # Can't tell if SLO is enabled or not since we're running
- # against an old cluster, so let's skip the tests instead of
- # possibly having spurious failures.
- cls.slo_enabled = False
- else:
- # Don't bother looking for ValueError here. If something is
- # responding to a GET /info request with invalid JSON, then
- # the cluster is broken and a test failure will let us know.
- cluster_info = json.loads(cls.conn.response.read())
- cls.slo_enabled = 'slo' in cluster_info
+ cluster_info = cls.conn.cluster_info()
+ cls.slo_enabled = 'slo' in cluster_info
if not cls.slo_enabled:
return
@@ -2034,5 +2044,347 @@ class TestSloUTF8(Base2, TestSlo):
set_up = False
+class TestObjectVersioningEnv(object):
+ versioning_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ cls.account = Account(cls.conn, config.get('account',
+ config['username']))
+
+ # avoid getting a prefix that stops halfway through an encoded
+ # character
+ prefix = Utils.create_name().decode("utf-8")[:10].encode("utf-8")
+
+ cls.versions_container = cls.account.container(prefix + "-versions")
+ if not cls.versions_container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.container = cls.account.container(prefix + "-objs")
+ if not cls.container.create(
+ hdrs={'X-Versions-Location': cls.versions_container.name}):
+ raise ResponseError(cls.conn.response)
+
+ container_info = cls.container.info()
+ # if versioning is off, then X-Versions-Location won't persist
+ cls.versioning_enabled = 'versions' in container_info
+
+
+class TestObjectVersioning(Base):
+ env = TestObjectVersioningEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestObjectVersioning, self).setUp()
+ if self.env.versioning_enabled is False:
+ raise SkipTest("Object versioning not enabled")
+ elif self.env.versioning_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected versioning_enabled to be True/False, got %r" %
+ (self.env.versioning_enabled,))
+
+ def test_overwriting(self):
+ container = self.env.container
+ versions_container = self.env.versions_container
+ obj_name = Utils.create_name()
+
+ versioned_obj = container.file(obj_name)
+ versioned_obj.write("aaaaa")
+
+ self.assertEqual(0, versions_container.info()['object_count'])
+
+ versioned_obj.write("bbbbb")
+
+ # the old version got saved off
+ self.assertEqual(1, versions_container.info()['object_count'])
+ versioned_obj_name = versions_container.files()[0]
+ self.assertEqual(
+ "aaaaa", versions_container.file(versioned_obj_name).read())
+
+ # if we overwrite it again, there are two versions
+ versioned_obj.write("ccccc")
+ self.assertEqual(2, versions_container.info()['object_count'])
+
+ # as we delete things, the old contents return
+ self.assertEqual("ccccc", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("bbbbb", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertEqual("aaaaa", versioned_obj.read())
+ versioned_obj.delete()
+ self.assertRaises(ResponseError, versioned_obj.read)
+
+
+class TestObjectVersioningUTF8(Base2, TestObjectVersioning):
+ set_up = False
+
+
+class TestTempurlEnv(object):
+ tempurl_enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.tempurl_enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.tempurl_enabled = 'tempurl' in cluster_info
+ if not cls.tempurl_enabled:
+ return
+ cls.tempurl_methods = cluster_info['tempurl']['methods']
+
+ cls.tempurl_key = Utils.create_name()
+ cls.tempurl_key2 = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({
+ 'temp-url-key': cls.tempurl_key,
+ 'temp-url-key-2': cls.tempurl_key2
+ })
+
+ cls.container = cls.account.container(Utils.create_name())
+ if not cls.container.create():
+ raise ResponseError(cls.conn.response)
+
+ cls.obj = cls.container.file(Utils.create_name())
+ cls.obj.write("obj contents")
+ cls.other_obj = cls.container.file(Utils.create_name())
+ cls.other_obj.write("other obj contents")
+
+
+class TestTempurl(Base):
+ env = TestTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestTempurl, self).setUp()
+ if self.env.tempurl_enabled is False:
+ raise SkipTest("TempURL not enabled")
+ elif self.env.tempurl_enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected tempurl_enabled to be True/False, got %r" %
+ (self.env.tempurl_enabled,))
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ self.obj_tempurl_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.obj.info(parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_GET_with_key_2(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key2)
+ parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ def test_PUT(self):
+ new_obj = self.env.container.file(Utils.create_name())
+
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'PUT', expires, self.env.conn.make_path(new_obj.path),
+ self.env.tempurl_key)
+ put_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ new_obj.write('new obj contents',
+ parms=put_parms, cfg={'no_auth_token': True})
+ self.assertEqual(new_obj.read(), "new obj contents")
+
+ # PUT tempurls also allow HEAD requests
+ self.assert_(new_obj.info(parms=put_parms,
+ cfg={'no_auth_token': True}))
+
+ def test_HEAD(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'HEAD', expires, self.env.conn.make_path(self.env.obj.path),
+ self.env.tempurl_key)
+ head_parms = {'temp_url_sig': sig,
+ 'temp_url_expires': str(expires)}
+
+ self.assert_(self.env.obj.info(parms=head_parms,
+ cfg={'no_auth_token': True}))
+ # HEAD tempurls don't allow PUT or GET requests, despite the fact that
+ # PUT and GET tempurls both allow HEAD requests
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ self.assertRaises(ResponseError, self.env.other_obj.write,
+ 'new contents',
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_different_object(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ self.assertRaises(ResponseError, self.env.other_obj.read,
+ cfg={'no_auth_token': True},
+ parms=self.obj_tempurl_parms)
+ self.assert_status([401])
+
+ def test_changing_sig(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_sig'][0] == 'a':
+ parms['temp_url_sig'] = 'b' + parms['temp_url_sig'][1:]
+ else:
+ parms['temp_url_sig'] = 'a' + parms['temp_url_sig'][1:]
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+ def test_changing_expires(self):
+ contents = self.env.obj.read(
+ parms=self.obj_tempurl_parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(contents, "obj contents")
+
+ parms = self.obj_tempurl_parms.copy()
+ if parms['temp_url_expires'][-1] == '0':
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '1'
+ else:
+ parms['temp_url_expires'] = parms['temp_url_expires'][:-1] + '0'
+
+ self.assertRaises(ResponseError, self.env.obj.read,
+ cfg={'no_auth_token': True},
+ parms=parms)
+ self.assert_status([401])
+
+
+class TestTempurlUTF8(Base2, TestTempurl):
+ set_up = False
+
+
+class TestSloTempurlEnv(object):
+ enabled = None # tri-state: None initially, then True/False
+
+ @classmethod
+ def setUp(cls):
+ cls.conn = Connection(config)
+ cls.conn.authenticate()
+
+ if cls.enabled is None:
+ cluster_info = cls.conn.cluster_info()
+ cls.enabled = 'tempurl' in cluster_info and 'slo' in cluster_info
+
+ cls.tempurl_key = Utils.create_name()
+
+ cls.account = Account(
+ cls.conn, config.get('account', config['username']))
+ cls.account.delete_containers()
+ cls.account.update_metadata({'temp-url-key': cls.tempurl_key})
+
+ cls.manifest_container = cls.account.container(Utils.create_name())
+ cls.segments_container = cls.account.container(Utils.create_name())
+ if not cls.manifest_container.create():
+ raise ResponseError(cls.conn.response)
+ if not cls.segments_container.create():
+ raise ResponseError(cls.conn.response)
+
+ seg1 = cls.segments_container.file(Utils.create_name())
+ seg1.write('1' * 1024 * 1024)
+
+ seg2 = cls.segments_container.file(Utils.create_name())
+ seg2.write('2' * 1024 * 1024)
+
+ cls.manifest_data = [{'size_bytes': 1024 * 1024,
+ 'etag': seg1.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg1.name)},
+ {'size_bytes': 1024 * 1024,
+ 'etag': seg2.md5,
+ 'path': '/%s/%s' % (cls.segments_container.name,
+ seg2.name)}]
+
+ cls.manifest = cls.manifest_container.file(Utils.create_name())
+ cls.manifest.write(
+ json.dumps(cls.manifest_data),
+ parms={'multipart-manifest': 'put'})
+
+
+class TestSloTempurl(Base):
+ env = TestSloTempurlEnv
+ set_up = False
+
+ def setUp(self):
+ super(TestSloTempurl, self).setUp()
+ if self.env.enabled is False:
+ raise SkipTest("TempURL and SLO not both enabled")
+ elif self.env.enabled is not True:
+ # just some sanity checking
+ raise Exception(
+ "Expected enabled to be True/False, got %r" %
+ (self.env.enabled,))
+
+ def tempurl_sig(self, method, expires, path, key):
+ return hmac.new(
+ key,
+ '%s\n%s\n%s' % (method, expires, urllib.unquote(path)),
+ hashlib.sha1).hexdigest()
+
+ def test_GET(self):
+ expires = int(time.time()) + 86400
+ sig = self.tempurl_sig(
+ 'GET', expires, self.env.conn.make_path(self.env.manifest.path),
+ self.env.tempurl_key)
+ parms = {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
+
+ contents = self.env.manifest.read(
+ parms=parms,
+ cfg={'no_auth_token': True})
+ self.assertEqual(len(contents), 2 * 1024 * 1024)
+
+ # GET tempurls also allow HEAD requests
+ self.assert_(self.env.manifest.info(
+ parms=parms, cfg={'no_auth_token': True}))
+
+
+class TestSloTempurlUTF8(Base2, TestSloTempurl):
+ set_up = False
+
+
if __name__ == '__main__':
unittest.main()