diff options
Diffstat (limited to 'ufo/test/unit/common/test_utils.py')
-rw-r--r-- | ufo/test/unit/common/test_utils.py | 1020 |
1 files changed, 0 insertions, 1020 deletions
diff --git a/ufo/test/unit/common/test_utils.py b/ufo/test/unit/common/test_utils.py deleted file mode 100644 index c645509f..00000000 --- a/ufo/test/unit/common/test_utils.py +++ /dev/null @@ -1,1020 +0,0 @@ -# Copyright (c) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Tests for common.utils """ - -import os -import unittest -import errno -import xattr -import cPickle as pickle -import tempfile -import hashlib -import tarfile -import shutil -from collections import defaultdict -from swift.common.utils import normalize_timestamp -from gluster.swift.common import utils, Glusterfs - -# -# Somewhat hacky way of emulating the operation of xattr calls. They are made -# against a dictionary that stores the xattr key/value pairs. -# -_xattrs = {} -_xattr_op_cnt = defaultdict(int) -_xattr_set_err = {} -_xattr_get_err = {} -_xattr_rem_err = {} - -def _xkey(path, key): - return "%s:%s" % (path, key) - -def _setxattr(path, key, value, *args, **kwargs): - _xattr_op_cnt['set'] += 1 - xkey = _xkey(path, key) - if xkey in _xattr_set_err: - e = IOError() - e.errno = _xattr_set_err[xkey] - raise e - global _xattrs - _xattrs[xkey] = value - -def _getxattr(path, key, *args, **kwargs): - _xattr_op_cnt['get'] += 1 - xkey = _xkey(path, key) - if xkey in _xattr_get_err: - e = IOError() - e.errno = _xattr_get_err[xkey] - raise e - global _xattrs - if xkey in _xattrs: - ret_val = _xattrs[xkey] - else: - e = IOError("Fake IOError") - e.errno = errno.ENODATA - raise e - return ret_val - -def _removexattr(path, key, *args, **kwargs): - _xattr_op_cnt['remove'] += 1 - xkey = _xkey(path, key) - if xkey in _xattr_rem_err: - e = IOError() - e.errno = _xattr_rem_err[xkey] - raise e - global _xattrs - if xkey in _xattrs: - del _xattrs[xkey] - else: - e = IOError("Fake IOError") - e.errno = errno.ENODATA - raise e - -def _initxattr(): - global _xattrs - _xattrs = {} - global _xattr_op_cnt - _xattr_op_cnt = defaultdict(int) - global _xattr_set_err, _xattr_get_err, _xattr_rem_err - _xattr_set_err = {} - _xattr_get_err = {} - _xattr_rem_err = {} - - # Save the current methods - global _xattr_set; _xattr_set = xattr.setxattr - global _xattr_get; _xattr_get = xattr.getxattr - global _xattr_remove; _xattr_remove = xattr.removexattr - - # Monkey patch the calls we use with our internal unit test versions - xattr.setxattr = _setxattr - xattr.getxattr = _getxattr - xattr.removexattr = _removexattr - -def _destroyxattr(): - # Restore the current methods just in case - global _xattr_set; xattr.setxattr = _xattr_set - global _xattr_get; xattr.getxattr = _xattr_get - global _xattr_remove; xattr.removexattr = _xattr_remove - # Destroy the stored values and - global _xattrs; _xattrs = None - - -class SimMemcache(object): - def __init__(self): - self._d = {} - - def get(self, key): - return self._d.get(key, None) - - def set(self, key, value): - self._d[key] = value - - -class TestUtils(unittest.TestCase): - """ Tests for common.utils """ - - def setUp(self): - _initxattr() - - def tearDown(self): - _destroyxattr() - - def test_write_metadata(self): - path = "/tmp/foo/w" - orig_d = { 'bar' : 'foo' } - utils.write_metadata(path, orig_d) - xkey = _xkey(path, utils.METADATA_KEY) - assert len(_xattrs.keys()) == 1 - assert xkey in _xattrs - assert orig_d == pickle.loads(_xattrs[xkey]) - assert _xattr_op_cnt['set'] == 1 - - def test_write_metadata_err(self): - path = "/tmp/foo/w" - orig_d = { 'bar' : 'foo' } - xkey = _xkey(path, utils.METADATA_KEY) - _xattr_set_err[xkey] = errno.EOPNOTSUPP - try: - utils.write_metadata(path, orig_d) - except IOError as e: - assert e.errno == errno.EOPNOTSUPP - assert len(_xattrs.keys()) == 0 - assert _xattr_op_cnt['set'] == 1 - else: - self.fail("Expected an IOError exception on write") - - def test_write_metadata_multiple(self): - # At 64 KB an xattr key/value pair, this should generate three keys. - path = "/tmp/foo/w" - orig_d = { 'bar' : 'x' * 150000 } - utils.write_metadata(path, orig_d) - assert len(_xattrs.keys()) == 3, "Expected 3 keys, found %d" % len(_xattrs.keys()) - payload = '' - for i in range(0,3): - xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) - assert xkey in _xattrs - assert len(_xattrs[xkey]) <= utils.MAX_XATTR_SIZE - payload += _xattrs[xkey] - assert orig_d == pickle.loads(payload) - assert _xattr_op_cnt['set'] == 3, "%r" % _xattr_op_cnt - - def test_clean_metadata(self): - path = "/tmp/foo/c" - expected_d = { 'a': 'y' * 150000 } - expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) - for i in range(0,3): - xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) - _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] - expected_p = expected_p[utils.MAX_XATTR_SIZE:] - assert not expected_p - utils.clean_metadata(path) - assert _xattr_op_cnt['remove'] == 4, "%r" % _xattr_op_cnt - - def test_clean_metadata_err(self): - path = "/tmp/foo/c" - xkey = _xkey(path, utils.METADATA_KEY) - _xattrs[xkey] = pickle.dumps({ 'a': 'y' }, utils.PICKLE_PROTOCOL) - _xattr_rem_err[xkey] = errno.EOPNOTSUPP - try: - utils.clean_metadata(path) - except IOError as e: - assert e.errno == errno.EOPNOTSUPP - assert _xattr_op_cnt['remove'] == 1, "%r" % _xattr_op_cnt - else: - self.fail("Expected an IOError exception on remove") - - def test_read_metadata(self): - path = "/tmp/foo/r" - expected_d = { 'a': 'y' } - xkey = _xkey(path, utils.METADATA_KEY) - _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) - res_d = utils.read_metadata(path) - assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) - assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - - def test_read_metadata_notfound(self): - path = "/tmp/foo/r" - res_d = utils.read_metadata(path) - assert res_d == {} - assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - - def test_read_metadata_err(self): - path = "/tmp/foo/r" - expected_d = { 'a': 'y' } - xkey = _xkey(path, utils.METADATA_KEY) - _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) - _xattr_get_err[xkey] = errno.EOPNOTSUPP - try: - res_d = utils.read_metadata(path) - except IOError as e: - assert e.errno == errno.EOPNOTSUPP - assert (_xattr_op_cnt['get'] == 1), "%r" % _xattr_op_cnt - else: - self.fail("Expected an IOError exception on get") - - def test_read_metadata_multiple(self): - path = "/tmp/foo/r" - expected_d = { 'a': 'y' * 150000 } - expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) - for i in range(0,3): - xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) - _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] - expected_p = expected_p[utils.MAX_XATTR_SIZE:] - assert not expected_p - res_d = utils.read_metadata(path) - assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) - assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt - - def test_read_metadata_multiple_one_missing(self): - path = "/tmp/foo/r" - expected_d = { 'a': 'y' * 150000 } - expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) - for i in range(0,2): - xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) - _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] - expected_p = expected_p[utils.MAX_XATTR_SIZE:] - assert len(expected_p) <= utils.MAX_XATTR_SIZE - res_d = utils.read_metadata(path) - assert res_d == {} - assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt - assert len(_xattrs.keys()) == 0, "Expected 0 keys, found %d" % len(_xattrs.keys()) - - def test_restore_metadata_none(self): - # No initial metadata - path = "/tmp/foo/i" - res_d = utils.restore_metadata(path, { 'b': 'y' }) - expected_d = { 'b': 'y' } - assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) - assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt - - def test_restore_metadata(self): - # Initial metadata - path = "/tmp/foo/i" - initial_d = { 'a': 'z' } - xkey = _xkey(path, utils.METADATA_KEY) - _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL) - res_d = utils.restore_metadata(path, { 'b': 'y' }) - expected_d = { 'a': 'z', 'b': 'y' } - assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) - assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt - - def test_restore_metadata_nochange(self): - # Initial metadata but no changes - path = "/tmp/foo/i" - initial_d = { 'a': 'z' } - xkey = _xkey(path, utils.METADATA_KEY) - _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL) - res_d = utils.restore_metadata(path, {}) - expected_d = { 'a': 'z' } - assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) - assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - assert _xattr_op_cnt['set'] == 0, "%r" % _xattr_op_cnt - - def test_add_timestamp_empty(self): - orig = {} - res = utils._add_timestamp(orig) - assert res == {} - - def test_add_timestamp_none(self): - orig = { 'a': 1, 'b': 2, 'c': 3 } - exp = { 'a': (1, 0), 'b': (2, 0), 'c': (3, 0) } - res = utils._add_timestamp(orig) - assert res == exp - - def test_add_timestamp_mixed(self): - orig = { 'a': 1, 'b': (2, 1), 'c': 3 } - exp = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) } - res = utils._add_timestamp(orig) - assert res == exp - - def test_add_timestamp_all(self): - orig = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) } - res = utils._add_timestamp(orig) - assert res == orig - - def test_get_etag_empty(self): - tf = tempfile.NamedTemporaryFile() - hd = utils._get_etag(tf.name) - assert hd == hashlib.md5().hexdigest() - - def test_get_etag(self): - tf = tempfile.NamedTemporaryFile() - tf.file.write('123' * utils.CHUNK_SIZE) - tf.file.flush() - hd = utils._get_etag(tf.name) - tf.file.seek(0) - md5 = hashlib.md5() - while True: - chunk = tf.file.read(utils.CHUNK_SIZE) - if not chunk: - break - md5.update(chunk) - assert hd == md5.hexdigest() - - def test_get_object_metadata_dne(self): - md = utils.get_object_metadata("/tmp/doesNotEx1st") - assert md == {} - - def test_get_object_metadata_err(self): - tf = tempfile.NamedTemporaryFile() - try: - md = utils.get_object_metadata(os.path.join(tf.name,"doesNotEx1st")) - except OSError as e: - assert e.errno != errno.ENOENT - else: - self.fail("Expected exception") - - obj_keys = (utils.X_TIMESTAMP, utils.X_CONTENT_TYPE, utils.X_ETAG, - utils.X_CONTENT_LENGTH, utils.X_TYPE, utils.X_OBJECT_TYPE) - - def test_get_object_metadata_file(self): - tf = tempfile.NamedTemporaryFile() - tf.file.write('123'); tf.file.flush() - md = utils.get_object_metadata(tf.name) - for key in self.obj_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == utils.OBJECT - assert md[utils.X_OBJECT_TYPE] == utils.FILE - assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE - assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name) - assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name)) - assert md[utils.X_ETAG] == utils._get_etag(tf.name) - - def test_get_object_metadata_dir(self): - td = tempfile.mkdtemp() - try: - md = utils.get_object_metadata(td) - for key in self.obj_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == utils.OBJECT - assert md[utils.X_OBJECT_TYPE] == utils.DIR - assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE - assert md[utils.X_CONTENT_LENGTH] == 0 - assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td)) - assert md[utils.X_ETAG] == hashlib.md5().hexdigest() - finally: - os.rmdir(td) - - def test_create_object_metadata_file(self): - tf = tempfile.NamedTemporaryFile() - tf.file.write('4567'); tf.file.flush() - r_md = utils.create_object_metadata(tf.name) - - xkey = _xkey(tf.name, utils.METADATA_KEY) - assert len(_xattrs.keys()) == 1 - assert xkey in _xattrs - assert _xattr_op_cnt['get'] == 1 - assert _xattr_op_cnt['set'] == 1 - md = pickle.loads(_xattrs[xkey]) - assert r_md == md - - for key in self.obj_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == utils.OBJECT - assert md[utils.X_OBJECT_TYPE] == utils.FILE - assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE - assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name) - assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name)) - assert md[utils.X_ETAG] == utils._get_etag(tf.name) - - def test_create_object_metadata_dir(self): - td = tempfile.mkdtemp() - try: - r_md = utils.create_object_metadata(td) - - xkey = _xkey(td, utils.METADATA_KEY) - assert len(_xattrs.keys()) == 1 - assert xkey in _xattrs - assert _xattr_op_cnt['get'] == 1 - assert _xattr_op_cnt['set'] == 1 - md = pickle.loads(_xattrs[xkey]) - assert r_md == md - - for key in self.obj_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == utils.OBJECT - assert md[utils.X_OBJECT_TYPE] == utils.DIR - assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE - assert md[utils.X_CONTENT_LENGTH] == 0 - assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td)) - assert md[utils.X_ETAG] == hashlib.md5().hexdigest() - finally: - os.rmdir(td) - - def test_get_container_metadata(self): - def _mock_get_container_details(path, memcache=None): - o_list = [ 'a', 'b', 'c' ] - o_count = 3 - b_used = 47 - return o_list, o_count, b_used - orig_gcd = utils.get_container_details - utils.get_container_details = _mock_get_container_details - td = tempfile.mkdtemp() - try: - exp_md = { - utils.X_TYPE: (utils.CONTAINER, 0), - utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0), - utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0), - utils.X_OBJECTS_COUNT: (3, 0), - utils.X_BYTES_USED: (47, 0), - } - md = utils.get_container_metadata(td) - assert md == exp_md - finally: - utils.get_container_details = orig_gcd - os.rmdir(td) - - def test_get_account_metadata(self): - def _mock_get_account_details(path, memcache=None): - c_list = [ '123', 'abc' ] - c_count = 2 - return c_list, c_count - orig_gad = utils.get_account_details - utils.get_account_details = _mock_get_account_details - td = tempfile.mkdtemp() - try: - exp_md = { - utils.X_TYPE: (utils.ACCOUNT, 0), - utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0), - utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0), - utils.X_OBJECTS_COUNT: (0, 0), - utils.X_BYTES_USED: (0, 0), - utils.X_CONTAINER_COUNT: (2, 0), - } - md = utils.get_account_metadata(td) - assert md == exp_md - finally: - utils.get_account_details = orig_gad - os.rmdir(td) - - cont_keys = [utils.X_TYPE, utils.X_TIMESTAMP, utils.X_PUT_TIMESTAMP, - utils.X_OBJECTS_COUNT, utils.X_BYTES_USED] - - def test_create_container_metadata(self): - td = tempfile.mkdtemp() - try: - r_md = utils.create_container_metadata(td) - - xkey = _xkey(td, utils.METADATA_KEY) - assert len(_xattrs.keys()) == 1 - assert xkey in _xattrs - assert _xattr_op_cnt['get'] == 1 - assert _xattr_op_cnt['set'] == 1 - md = pickle.loads(_xattrs[xkey]) - assert r_md == md - - for key in self.cont_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == (utils.CONTAINER, 0) - assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0) - assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0) - assert md[utils.X_OBJECTS_COUNT] == (0, 0) - assert md[utils.X_BYTES_USED] == (0, 0) - finally: - os.rmdir(td) - - acct_keys = [val for val in cont_keys] - acct_keys.append(utils.X_CONTAINER_COUNT) - - def test_create_account_metadata(self): - td = tempfile.mkdtemp() - try: - r_md = utils.create_account_metadata(td) - - xkey = _xkey(td, utils.METADATA_KEY) - assert len(_xattrs.keys()) == 1 - assert xkey in _xattrs - assert _xattr_op_cnt['get'] == 1 - assert _xattr_op_cnt['set'] == 1 - md = pickle.loads(_xattrs[xkey]) - assert r_md == md - - for key in self.acct_keys: - assert key in md, "Expected key %s in %r" % (key, md) - assert md[utils.X_TYPE] == (utils.ACCOUNT, 0) - assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0) - assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0) - assert md[utils.X_OBJECTS_COUNT] == (0, 0) - assert md[utils.X_BYTES_USED] == (0, 0) - assert md[utils.X_CONTAINER_COUNT] == (0, 0) - finally: - os.rmdir(td) - - def test_container_details_uncached(self): - the_path = "/tmp/bar" - def mock_get_container_details_from_fs(cont_path): - bu = 5 - oc = 1 - ol = ['foo',] - dl = [('a',100),] - return utils.ContainerDetails(bu, oc, ol, dl) - orig_gcdff = utils._get_container_details_from_fs - utils._get_container_details_from_fs = mock_get_container_details_from_fs - try: - retval = utils.get_container_details(the_path) - cd = mock_get_container_details_from_fs(the_path) - assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) - finally: - utils._get_container_details_from_fs = orig_gcdff - - def test_container_details_cached_hit(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_container_details_from_fs(cont_path, bu_p=5): - bu = bu_p - oc = 1 - ol = ['foo',] - dl = [('a',100),] - return utils.ContainerDetails(bu, oc, ol, dl) - def mock_do_stat(path): - class MockStat(object): - def __init__(self, mtime): - self.st_mtime = mtime - return MockStat(100) - cd = mock_get_container_details_from_fs(the_path, bu_p=6) - mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) - orig_gcdff = utils._get_container_details_from_fs - utils._get_container_details_from_fs = mock_get_container_details_from_fs - orig_ds = utils.do_stat - utils.do_stat = mock_do_stat - try: - retval = utils.get_container_details(the_path, memcache=mc) - # If it did not properly use memcache, the default mocked version - # of get details from fs would return 5 bytes used instead of the - # 6 we specified above. - cd = mock_get_container_details_from_fs(the_path, bu_p=6) - assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) - finally: - utils._get_container_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_container_details_cached_miss_key(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_container_details_from_fs(cont_path, bu_p=5): - bu = bu_p - oc = 1 - ol = ['foo',] - dl = [('a',100),] - return utils.ContainerDetails(bu, oc, ol, dl) - def mock_do_stat(path): - # Be sure we don't miss due to mtimes not matching - self.fail("do_stat should not have been called") - cd = mock_get_container_details_from_fs(the_path + "u", bu_p=6) - mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path + "u", cd) - orig_gcdff = utils._get_container_details_from_fs - utils._get_container_details_from_fs = mock_get_container_details_from_fs - orig_ds = utils.do_stat - utils.do_stat = mock_do_stat - try: - retval = utils.get_container_details(the_path, memcache=mc) - cd = mock_get_container_details_from_fs(the_path) - assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) - mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path - assert mkey in mc._d - finally: - utils._get_container_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_container_details_cached_miss_dir_list(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_container_details_from_fs(cont_path, bu_p=5): - bu = bu_p - oc = 1 - ol = ['foo',] - dl = [] - return utils.ContainerDetails(bu, oc, ol, dl) - def mock_do_stat(path): - # Be sure we don't miss due to mtimes not matching - self.fail("do_stat should not have been called") - cd = mock_get_container_details_from_fs(the_path, bu_p=6) - mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) - orig_gcdff = utils._get_container_details_from_fs - utils._get_container_details_from_fs = mock_get_container_details_from_fs - orig_ds = utils.do_stat - utils.do_stat = mock_do_stat - try: - retval = utils.get_container_details(the_path, memcache=mc) - cd = mock_get_container_details_from_fs(the_path) - assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) - mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path - assert mkey in mc._d - assert 5 == mc._d[mkey].bytes_used - finally: - utils._get_container_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_container_details_cached_miss_mtime(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_container_details_from_fs(cont_path, bu_p=5): - bu = bu_p - oc = 1 - ol = ['foo',] - dl = [('a',100),] - return utils.ContainerDetails(bu, oc, ol, dl) - def mock_do_stat(path): - # Be sure we miss due to mtimes not matching - class MockStat(object): - def __init__(self, mtime): - self.st_mtime = mtime - return MockStat(200) - cd = mock_get_container_details_from_fs(the_path, bu_p=6) - mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) - orig_gcdff = utils._get_container_details_from_fs - utils._get_container_details_from_fs = mock_get_container_details_from_fs - orig_ds = utils.do_stat - utils.do_stat = mock_do_stat - try: - retval = utils.get_container_details(the_path, memcache=mc) - cd = mock_get_container_details_from_fs(the_path) - assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) - mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path - assert mkey in mc._d - assert 5 == mc._d[mkey].bytes_used - finally: - utils._get_container_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_account_details_uncached(self): - the_path = "/tmp/bar" - def mock_get_account_details_from_fs(acc_path, acc_stats): - mt = 100 - cc = 2 - cl = ['a', 'b'] - return utils.AccountDetails(mt, cc, cl) - orig_gcdff = utils._get_account_details_from_fs - utils._get_account_details_from_fs = mock_get_account_details_from_fs - try: - retval = utils.get_account_details(the_path) - ad = mock_get_account_details_from_fs(the_path, None) - assert retval == (ad.container_list, ad.container_count) - finally: - utils._get_account_details_from_fs = orig_gcdff - - def test_account_details_cached_hit(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_account_details_from_fs(acc_path, acc_stats): - mt = 100 - cc = 2 - cl = ['a', 'b'] - return utils.AccountDetails(mt, cc, cl) - def mock_do_stat(path): - class MockStat(object): - def __init__(self, mtime): - self.st_mtime = mtime - return MockStat(100) - ad = mock_get_account_details_from_fs(the_path, None) - ad.container_list = ['x', 'y'] - mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad) - orig_gcdff = utils._get_account_details_from_fs - orig_ds = utils.do_stat - utils._get_account_details_from_fs = mock_get_account_details_from_fs - utils.do_stat = mock_do_stat - try: - retval = utils.get_account_details(the_path, memcache=mc) - assert retval == (ad.container_list, ad.container_count) - wrong_ad = mock_get_account_details_from_fs(the_path, None) - assert wrong_ad != ad - finally: - utils._get_account_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_account_details_cached_miss(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_account_details_from_fs(acc_path, acc_stats): - mt = 100 - cc = 2 - cl = ['a', 'b'] - return utils.AccountDetails(mt, cc, cl) - def mock_do_stat(path): - class MockStat(object): - def __init__(self, mtime): - self.st_mtime = mtime - return MockStat(100) - ad = mock_get_account_details_from_fs(the_path, None) - ad.container_list = ['x', 'y'] - mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path + 'u', ad) - orig_gcdff = utils._get_account_details_from_fs - orig_ds = utils.do_stat - utils._get_account_details_from_fs = mock_get_account_details_from_fs - utils.do_stat = mock_do_stat - try: - retval = utils.get_account_details(the_path, memcache=mc) - correct_ad = mock_get_account_details_from_fs(the_path, None) - assert retval == (correct_ad.container_list, correct_ad.container_count) - assert correct_ad != ad - finally: - utils._get_account_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_account_details_cached_miss_mtime(self): - mc = SimMemcache() - the_path = "/tmp/bar" - def mock_get_account_details_from_fs(acc_path, acc_stats): - mt = 100 - cc = 2 - cl = ['a', 'b'] - return utils.AccountDetails(mt, cc, cl) - def mock_do_stat(path): - class MockStat(object): - def __init__(self, mtime): - self.st_mtime = mtime - return MockStat(100) - ad = mock_get_account_details_from_fs(the_path, None) - ad.container_list = ['x', 'y'] - ad.mtime = 200 - mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad) - orig_gcdff = utils._get_account_details_from_fs - orig_ds = utils.do_stat - utils._get_account_details_from_fs = mock_get_account_details_from_fs - utils.do_stat = mock_do_stat - try: - retval = utils.get_account_details(the_path, memcache=mc) - correct_ad = mock_get_account_details_from_fs(the_path, None) - assert retval == (correct_ad.container_list, correct_ad.container_count) - assert correct_ad != ad - finally: - utils._get_account_details_from_fs = orig_gcdff - utils.do_stat = orig_ds - - def test_get_account_details_from_fs(self): - orig_cwd = os.getcwd() - td = tempfile.mkdtemp() - try: - tf = tarfile.open("common/data/account_tree.tar.bz2", "r:bz2") - os.chdir(td) - tf.extractall() - - ad = utils._get_account_details_from_fs(td, None) - assert ad.mtime == os.path.getmtime(td) - assert ad.container_count == 3 - assert set(ad.container_list) == set(['c1', 'c2', 'c3']) - finally: - os.chdir(orig_cwd) - shutil.rmtree(td) - - def test_get_container_details_from_fs_notadir(self): - tf = tempfile.NamedTemporaryFile() - cd = utils._get_container_details_from_fs(tf.name) - assert cd.bytes_used == 0 - assert cd.object_count == 0 - assert cd.obj_list == [] - assert cd.dir_list == [] - - def test_get_container_details_from_fs(self): - orig_cwd = os.getcwd() - td = tempfile.mkdtemp() - try: - tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2") - os.chdir(td) - tf.extractall() - - cd = utils._get_container_details_from_fs(td) - assert cd.bytes_used == 0, repr(cd.bytes_used) - assert cd.object_count == 8, repr(cd.object_count) - assert set(cd.obj_list) == set(['file1', 'file3', 'file2', - 'dir3', 'dir1', 'dir2', - 'dir1/file1', 'dir1/file2' - ]), repr(cd.obj_list) - - full_dir1 = os.path.join(td, 'dir1') - full_dir2 = os.path.join(td, 'dir2') - full_dir3 = os.path.join(td, 'dir3') - exp_dir_dict = { td: os.path.getmtime(td), - full_dir1: os.path.getmtime(full_dir1), - full_dir2: os.path.getmtime(full_dir2), - full_dir3: os.path.getmtime(full_dir3), - } - for d,m in cd.dir_list: - assert d in exp_dir_dict - assert exp_dir_dict[d] == m - finally: - os.chdir(orig_cwd) - shutil.rmtree(td) - - - def test_get_container_details_from_fs_do_getsize_true(self): - orig_cwd = os.getcwd() - td = tempfile.mkdtemp() - try: - tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2") - os.chdir(td) - tf.extractall() - - __do_getsize = Glusterfs._do_getsize - Glusterfs._do_getsize = True - - cd = utils._get_container_details_from_fs(td) - assert cd.bytes_used == 30, repr(cd.bytes_used) - assert cd.object_count == 8, repr(cd.object_count) - assert set(cd.obj_list) == set(['file1', 'file3', 'file2', - 'dir3', 'dir1', 'dir2', - 'dir1/file1', 'dir1/file2' - ]), repr(cd.obj_list) - - full_dir1 = os.path.join(td, 'dir1') - full_dir2 = os.path.join(td, 'dir2') - full_dir3 = os.path.join(td, 'dir3') - exp_dir_dict = { td: os.path.getmtime(td), - full_dir1: os.path.getmtime(full_dir1), - full_dir2: os.path.getmtime(full_dir2), - full_dir3: os.path.getmtime(full_dir3), - } - for d,m in cd.dir_list: - assert d in exp_dir_dict - assert exp_dir_dict[d] == m - finally: - Glusterfs._do_getsize = __do_getsize - os.chdir(orig_cwd) - shutil.rmtree(td) - - def test_get_account_details_from_fs_notadir_w_stats(self): - tf = tempfile.NamedTemporaryFile() - ad = utils._get_account_details_from_fs(tf.name, os.stat(tf.name)) - assert ad.mtime == os.path.getmtime(tf.name) - assert ad.container_count == 0 - assert ad.container_list == [] - - def test_get_account_details_from_fs_notadir(self): - tf = tempfile.NamedTemporaryFile() - ad = utils._get_account_details_from_fs(tf.name, None) - assert ad.mtime == os.path.getmtime(tf.name) - assert ad.container_count == 0 - assert ad.container_list == [] - - def test_write_pickle(self): - td = tempfile.mkdtemp() - try: - fpp = os.path.join(td, 'pp') - utils.write_pickle('pickled peppers', fpp) - with open(fpp, "rb") as f: - contents = f.read() - s = pickle.loads(contents) - assert s == 'pickled peppers', repr(s) - finally: - shutil.rmtree(td) - - def test_write_pickle_ignore_tmp(self): - tf = tempfile.NamedTemporaryFile() - td = tempfile.mkdtemp() - try: - fpp = os.path.join(td, 'pp') - # Also test an explicity pickle protocol - utils.write_pickle('pickled peppers', fpp, tmp=tf.name, pickle_protocol=2) - with open(fpp, "rb") as f: - contents = f.read() - s = pickle.loads(contents) - assert s == 'pickled peppers', repr(s) - with open(tf.name, "rb") as f: - contents = f.read() - assert contents == '' - finally: - shutil.rmtree(td) - - def test_check_user_xattr_bad_path(self): - assert False == utils.check_user_xattr("/tmp/foo/bar/check/user/xattr") - - def test_check_user_xattr_bad_set(self): - td = tempfile.mkdtemp() - xkey = _xkey(td, 'user.test.key1') - _xattr_set_err[xkey] = errno.EOPNOTSUPP - try: - assert False == utils.check_user_xattr(td) - except IOError: - pass - else: - self.fail("Expected IOError") - finally: - shutil.rmtree(td) - - def test_check_user_xattr_bad_remove(self): - td = tempfile.mkdtemp() - xkey = _xkey(td, 'user.test.key1') - _xattr_rem_err[xkey] = errno.EOPNOTSUPP - try: - utils.check_user_xattr(td) - except IOError: - self.fail("Unexpected IOError") - finally: - shutil.rmtree(td) - - def test_check_user_xattr(self): - td = tempfile.mkdtemp() - try: - assert utils.check_user_xattr(td) - finally: - shutil.rmtree(td) - - def test_validate_container_empty(self): - ret = utils.validate_container({}) - assert ret == False - - def test_validate_container_missing_keys(self): - ret = utils.validate_container({ 'foo': 'bar' }) - assert ret == False - - def test_validate_container_bad_type(self): - md = { utils.X_TYPE: ('bad', 0), - utils.X_TIMESTAMP: ('na', 0), - utils.X_PUT_TIMESTAMP: ('na', 0), - utils.X_OBJECTS_COUNT: ('na', 0), - utils.X_BYTES_USED: ('na', 0) } - ret = utils.validate_container(md) - assert ret == False - - def test_validate_container_good_type(self): - md = { utils.X_TYPE: (utils.CONTAINER, 0), - utils.X_TIMESTAMP: ('na', 0), - utils.X_PUT_TIMESTAMP: ('na', 0), - utils.X_OBJECTS_COUNT: ('na', 0), - utils.X_BYTES_USED: ('na', 0) } - ret = utils.validate_container(md) - assert ret - - def test_validate_account_empty(self): - ret = utils.validate_account({}) - assert ret == False - - def test_validate_account_missing_keys(self): - ret = utils.validate_account({ 'foo': 'bar' }) - assert ret == False - - def test_validate_account_bad_type(self): - md = { utils.X_TYPE: ('bad', 0), - utils.X_TIMESTAMP: ('na', 0), - utils.X_PUT_TIMESTAMP: ('na', 0), - utils.X_OBJECTS_COUNT: ('na', 0), - utils.X_BYTES_USED: ('na', 0), - utils.X_CONTAINER_COUNT: ('na', 0) } - ret = utils.validate_account(md) - assert ret == False - - def test_validate_account_good_type(self): - md = { utils.X_TYPE: (utils.ACCOUNT, 0), - utils.X_TIMESTAMP: ('na', 0), - utils.X_PUT_TIMESTAMP: ('na', 0), - utils.X_OBJECTS_COUNT: ('na', 0), - utils.X_BYTES_USED: ('na', 0), - utils.X_CONTAINER_COUNT: ('na', 0) } - ret = utils.validate_account(md) - assert ret - - def test_validate_object_empty(self): - ret = utils.validate_object({}) - assert ret == False - - def test_validate_object_missing_keys(self): - ret = utils.validate_object({ 'foo': 'bar' }) - assert ret == False - - def test_validate_object_bad_type(self): - md = { utils.X_TIMESTAMP: 'na', - utils.X_CONTENT_TYPE: 'na', - utils.X_ETAG: 'bad', - utils.X_CONTENT_LENGTH: 'na', - utils.X_TYPE: 'bad', - utils.X_OBJECT_TYPE: 'na' } - ret = utils.validate_object(md) - assert ret == False - - def test_validate_object_good_type(self): - md = { utils.X_TIMESTAMP: 'na', - utils.X_CONTENT_TYPE: 'na', - utils.X_ETAG: 'bad', - utils.X_CONTENT_LENGTH: 'na', - utils.X_TYPE: utils.OBJECT, - utils.X_OBJECT_TYPE: 'na' } - ret = utils.validate_object(md) - assert ret - - def test_is_marker_empty(self): - assert False == utils.is_marker(None) - - def test_is_marker_missing(self): - assert False == utils.is_marker( { 'foo': 'bar' } ) - - def test_is_marker_not_marker(self): - md = { utils.X_OBJECT_TYPE: utils.DIR } - assert False == utils.is_marker(md) - - def test_is_marker(self): - md = { utils.X_OBJECT_TYPE: utils.MARKER_DIR } - assert utils.is_marker(md) |