diff options
Diffstat (limited to 'ufo/test')
21 files changed, 0 insertions, 2526 deletions
diff --git a/ufo/test/__init__.py b/ufo/test/__init__.py deleted file mode 100644 index 50b24ed1..00000000 --- a/ufo/test/__init__.py +++ /dev/null @@ -1,49 +0,0 @@ -# See http://code.google.com/p/python-nose/issues/detail?id=373 -# The code below enables nosetests to work with i18n _() blocks - -import __builtin__ -import sys -import os -from ConfigParser import MissingSectionHeaderError -from StringIO import StringIO - -from swift.common.utils import readconf - -setattr(__builtin__, '_', lambda x: x) - - -# Work around what seems to be a Python bug. -# c.f. https://bugs.launchpad.net/swift/+bug/820185. -import logging -logging.raiseExceptions = False - - -def get_config(section_name=None, defaults=None): -    """ -    Attempt to get a test config dictionary. - -    :param section_name: the section to read (all sections if not defined) -    :param defaults: an optional dictionary namespace of defaults -    """ -    config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE', -                                 '/etc/swift/test.conf') -    config = {} -    if defaults is not None: -        config.update(defaults) - -    try: -        config = readconf(config_file, section_name) -    except SystemExit: -        if not os.path.exists(config_file): -            print >>sys.stderr, \ -                'Unable to read test config %s - file not found' \ -                % config_file -        elif not os.access(config_file, os.R_OK): -            print >>sys.stderr, \ -                'Unable to read test config %s - permission denied' \ -                % config_file -        else: -            print >>sys.stderr, \ -                'Unable to read test config %s - section %s not found' \ -                % (config_file, section_name) -    return config diff --git a/ufo/test/unit/__init__.py b/ufo/test/unit/__init__.py deleted file mode 100644 index cb247643..00000000 --- a/ufo/test/unit/__init__.py +++ /dev/null @@ -1,95 +0,0 @@ -""" Gluster Swift Unit Tests """ - -import logging -from collections import defaultdict -from test import get_config -from swift.common.utils import TRUE_VALUES - - -class NullLoggingHandler(logging.Handler): - -    def emit(self, record): -        pass - - -class FakeLogger(object): -    # a thread safe logger - -    def __init__(self, *args, **kwargs): -        self._clear() -        self.level = logging.NOTSET -        if 'facility' in kwargs: -            self.facility = kwargs['facility'] - -    def _clear(self): -        self.log_dict = defaultdict(list) - -    def _store_in(store_name): -        def stub_fn(self, *args, **kwargs): -            self.log_dict[store_name].append((args, kwargs)) -        return stub_fn - -    error = _store_in('error') -    info = _store_in('info') -    warning = _store_in('warning') -    debug = _store_in('debug') - -    def exception(self, *args, **kwargs): -        self.log_dict['exception'].append((args, kwargs, str(exc_info()[1]))) - -    # mock out the StatsD logging methods: -    increment = _store_in('increment') -    decrement = _store_in('decrement') -    timing = _store_in('timing') -    timing_since = _store_in('timing_since') -    update_stats = _store_in('update_stats') -    set_statsd_prefix = _store_in('set_statsd_prefix') - -    def setFormatter(self, obj): -        self.formatter = obj - -    def close(self): -        self._clear() - -    def set_name(self, name): -        # don't touch _handlers -        self._name = name - -    def acquire(self): -        pass - -    def release(self): -        pass - -    def createLock(self): -        pass - -    def emit(self, record): -        pass - -    def handle(self, record): -        pass - -    def flush(self): -        pass - -    def handleError(self, record): -        pass - - -original_syslog_handler = logging.handlers.SysLogHandler - - -def fake_syslog_handler(): -    for attr in dir(original_syslog_handler): -        if attr.startswith('LOG'): -            setattr(FakeLogger, attr, -                    copy.copy(getattr(logging.handlers.SysLogHandler, attr))) -    FakeLogger.priority_map = \ -        copy.deepcopy(logging.handlers.SysLogHandler.priority_map) - -    logging.handlers.SysLogHandler = FakeLogger - - -if get_config('unit_test').get('fake_syslog', 'False').lower() in TRUE_VALUES: -    fake_syslog_handler() diff --git a/ufo/test/unit/common/__init__.py b/ufo/test/unit/common/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/ufo/test/unit/common/__init__.py +++ /dev/null diff --git a/ufo/test/unit/common/data/README.rings b/ufo/test/unit/common/data/README.rings deleted file mode 100644 index 6457501f..00000000 --- a/ufo/test/unit/common/data/README.rings +++ /dev/null @@ -1,3 +0,0 @@ -The unit tests expect certain ring data built using the following command: - -    ../../../../bin/gluster-swift-gen-builders test iops
\ No newline at end of file diff --git a/ufo/test/unit/common/data/account.builder b/ufo/test/unit/common/data/account.builder Binary files differdeleted file mode 100644 index 090ba4b7..00000000 --- a/ufo/test/unit/common/data/account.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/account.ring.gz b/ufo/test/unit/common/data/account.ring.gz Binary files differdeleted file mode 100644 index 6d4c8547..00000000 --- a/ufo/test/unit/common/data/account.ring.gz +++ /dev/null diff --git a/ufo/test/unit/common/data/account_tree.tar.bz2 b/ufo/test/unit/common/data/account_tree.tar.bz2 Binary files differdeleted file mode 100644 index cb23e4dd..00000000 --- a/ufo/test/unit/common/data/account_tree.tar.bz2 +++ /dev/null diff --git a/ufo/test/unit/common/data/backups/1365124498.account.builder b/ufo/test/unit/common/data/backups/1365124498.account.builder Binary files differdeleted file mode 100644 index 090ba4b7..00000000 --- a/ufo/test/unit/common/data/backups/1365124498.account.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/backups/1365124498.container.builder b/ufo/test/unit/common/data/backups/1365124498.container.builder Binary files differdeleted file mode 100644 index 733d27dd..00000000 --- a/ufo/test/unit/common/data/backups/1365124498.container.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/backups/1365124498.object.builder b/ufo/test/unit/common/data/backups/1365124498.object.builder Binary files differdeleted file mode 100644 index ff877ec9..00000000 --- a/ufo/test/unit/common/data/backups/1365124498.object.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/backups/1365124499.object.builder b/ufo/test/unit/common/data/backups/1365124499.object.builder Binary files differdeleted file mode 100644 index 8b8cd6c1..00000000 --- a/ufo/test/unit/common/data/backups/1365124499.object.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/container.builder b/ufo/test/unit/common/data/container.builder Binary files differdeleted file mode 100644 index 733d27dd..00000000 --- a/ufo/test/unit/common/data/container.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/container.ring.gz b/ufo/test/unit/common/data/container.ring.gz Binary files differdeleted file mode 100644 index 592b84ba..00000000 --- a/ufo/test/unit/common/data/container.ring.gz +++ /dev/null diff --git a/ufo/test/unit/common/data/container_tree.tar.bz2 b/ufo/test/unit/common/data/container_tree.tar.bz2 Binary files differdeleted file mode 100644 index b4a14928..00000000 --- a/ufo/test/unit/common/data/container_tree.tar.bz2 +++ /dev/null diff --git a/ufo/test/unit/common/data/object.builder b/ufo/test/unit/common/data/object.builder Binary files differdeleted file mode 100644 index 8b8cd6c1..00000000 --- a/ufo/test/unit/common/data/object.builder +++ /dev/null diff --git a/ufo/test/unit/common/data/object.ring.gz b/ufo/test/unit/common/data/object.ring.gz Binary files differdeleted file mode 100644 index d2f7192b..00000000 --- a/ufo/test/unit/common/data/object.ring.gz +++ /dev/null diff --git a/ufo/test/unit/common/test_Glusterfs.py b/ufo/test/unit/common/test_Glusterfs.py deleted file mode 100644 index 7de060ae..00000000 --- a/ufo/test/unit/common/test_Glusterfs.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import os, fcntl, errno, shutil -from tempfile import mkdtemp -import gluster.swift.common.Glusterfs as gfs - -def mock_os_path_ismount(path): -    return True - -def mock_get_export_list(): -    return ['test', 'test2'] - -def mock_os_system(cmd): -    return False - -def mock_fcntl_lockf(f, *a, **kw): -    raise IOError(errno.EAGAIN) - -def _init(): -    global _RUN_DIR, _OS_SYSTEM, _FCNTL_LOCKF -    global _OS_PATH_ISMOUNT, __GET_EXPORT_LIST - -    _RUN_DIR          = gfs.RUN_DIR -    _OS_SYSTEM        = os.system -    _FCNTL_LOCKF      = fcntl.lockf -    _OS_PATH_ISMOUNT  = os.path.ismount -    __GET_EXPORT_LIST = gfs._get_export_list - -def _init_mock_variables(tmpdir): -    os.system            = mock_os_system -    os.path.ismount      = mock_os_path_ismount -    gfs.RUN_DIR          = os.path.join(tmpdir, 'var/run/swift') -    gfs._get_export_list = mock_get_export_list - -def _reset_mock_variables(): -    gfs.RUN_DIR          = _RUN_DIR -    gfs._get_export_list = __GET_EXPORT_LIST - -    os.system       = _OS_SYSTEM -    fcntl.lockf     = _FCNTL_LOCKF -    os.path.ismount = _OS_PATH_ISMOUNT - -class TestGlusterfs(unittest.TestCase): -    """ Tests for common.GlusterFS """ - -    def setUp(self): -        _init() - -    def test_mount(self): -        try: -            tmpdir = mkdtemp() -            root   = os.path.join(tmpdir, 'mnt/gluster-object') -            drive  = 'test' - -            _init_mock_variables(tmpdir) -            assert gfs.mount(root, drive) -        finally: -            _reset_mock_variables() -            shutil.rmtree(tmpdir) - -    def test_mount_egain(self): -        try: -            tmpdir = mkdtemp() -            root   = os.path.join(tmpdir, 'mnt/gluster-object') -            drive  = 'test' - -            _init_mock_variables(tmpdir) -            assert gfs.mount(root, drive) -            fcntl.lockf  = mock_fcntl_lockf -            assert gfs.mount(root, drive) -        finally: -            _reset_mock_variables() -            shutil.rmtree(tmpdir) - -    def test_mount_get_export_list_err(self): -        gfs._get_export_list = mock_get_export_list -        assert not gfs.mount(None, 'drive') -        _reset_mock_variables() - -    def tearDown(self): -        _reset_mock_variables() diff --git a/ufo/test/unit/common/test_diskfile.py b/ufo/test/unit/common/test_diskfile.py deleted file mode 100644 index 85d539a2..00000000 --- a/ufo/test/unit/common/test_diskfile.py +++ /dev/null @@ -1,932 +0,0 @@ -# Copyright (c) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Tests for gluster.swift.common.DiskFile """ - -import os -import stat -import errno -import unittest -import tempfile -import shutil -from hashlib import md5 -from swift.common.utils import normalize_timestamp -from swift.common.exceptions import DiskFileNotExist -import gluster.swift.common.DiskFile -import gluster.swift.common.utils -from gluster.swift.common.DiskFile import Gluster_DiskFile, \ -    AlreadyExistsAsDir -from gluster.swift.common.utils import DEFAULT_UID, DEFAULT_GID, X_TYPE, \ -    X_OBJECT_TYPE -from test_utils import _initxattr, _destroyxattr -from test.unit import FakeLogger - - -_metadata = {} - -def _mock_read_metadata(filename): -    if filename in _metadata: -        md = _metadata[filename] -    else: -        md = {} -    return md - -def _mock_write_metadata(filename, metadata): -    _metadata[filename] = metadata - -def _mock_clear_metadata(): -    _metadata = {} - - -class MockException(Exception): -    pass - - -def _mock_rmdirs(p): -    raise MockException("gluster.swift.common.DiskFile.rmdirs() called") - -def _mock_do_listdir(p): -    raise MockException("gluster.swift.common.DiskFile.do_listdir() called") - -def _mock_do_unlink(f): -    ose = OSError() -    ose.errno = errno.ENOENT -    raise ose - - -def _mock_do_unlink_eacces_err(f): -    ose = OSError() -    ose.errno = errno.EACCES -    raise ose - -def _mock_getsize_eaccess_err(f): -    ose = OSError() -    ose.errno = errno.EACCES -    raise ose - -def _mock_do_rmdir_eacces_err(f): -    ose = OSError() -    ose.errno = errno.EACCES -    raise ose - -class MockRenamerCalled(Exception): -    pass - - -def _mock_renamer(a, b): -    raise MockRenamerCalled() - - -class TestDiskFile(unittest.TestCase): -    """ Tests for gluster.swift.common.DiskFile """ - -    def setUp(self): -        self.lg = FakeLogger() -        _initxattr() -        _mock_clear_metadata() -        self._saved_df_wm = gluster.swift.common.DiskFile.write_metadata -        self._saved_df_rm = gluster.swift.common.DiskFile.read_metadata -        gluster.swift.common.DiskFile.write_metadata = _mock_write_metadata -        gluster.swift.common.DiskFile.read_metadata = _mock_read_metadata -        self._saved_ut_wm = gluster.swift.common.utils.write_metadata -        self._saved_ut_rm = gluster.swift.common.utils.read_metadata -        gluster.swift.common.utils.write_metadata = _mock_write_metadata -        gluster.swift.common.utils.read_metadata = _mock_read_metadata - -    def tearDown(self): -        self.lg = None -        _destroyxattr() -        gluster.swift.common.DiskFile.write_metadata = self._saved_df_wm -        gluster.swift.common.DiskFile.read_metadata = self._saved_df_rm -        gluster.swift.common.utils.write_metadata = self._saved_ut_wm -        gluster.swift.common.utils.read_metadata = self._saved_ut_rm - -    def test_constructor_no_slash(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf._obj == "z" -        assert gdf._obj_path == "" -        assert gdf.name == "bar" -        assert gdf.datadir == "/tmp/foo/vol0/bar" -        assert gdf.device_path == "/tmp/foo/vol0" -        assert gdf._container_path == "/tmp/foo/vol0/bar" -        assert gdf.disk_chunk_size == 65536 -        assert gdf.iter_hook == None -        assert gdf.logger == self.lg -        assert gdf.uid == DEFAULT_UID -        assert gdf.gid == DEFAULT_GID -        assert gdf.metadata == {} -        assert gdf.meta_file == None -        assert gdf.data_file == None -        assert gdf.fp == None -        assert gdf.iter_etag == None -        assert not gdf.started_at_0 -        assert not gdf.read_to_eof -        assert gdf.quarantined_dir == None -        assert not gdf.keep_cache -        assert not gdf._is_dir - -    def test_constructor_leadtrail_slash(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "/b/a/z/", self.lg) -        assert gdf._obj == "z" -        assert gdf._obj_path == "b/a" -        assert gdf.name == "bar/b/a" -        assert gdf.datadir == "/tmp/foo/vol0/bar/b/a" -        assert gdf.device_path == "/tmp/foo/vol0" - -    def test_constructor_no_metadata(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            stats = os.stat(the_file) -            ts = normalize_timestamp(stats.st_ctime) -            etag = md5() -            etag.update("1234") -            etag = etag.hexdigest() -            exp_md = { -                'Content-Length': 4, -                'ETag': etag, -                'X-Timestamp': ts, -                'Content-Type': 'application/octet-stream'} -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert gdf.fp is None -            assert gdf.metadata == exp_md -        finally: -            shutil.rmtree(td) - -    def test_constructor_existing_metadata(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            ini_md = { -                'X-Type': 'Object', -                'X-Object-Type': 'file', -                'Content-Length': 5, -                'ETag': 'etag', -                'X-Timestamp': 'ts', -                'Content-Type': 'application/loctet-stream'} -            _metadata[the_file] = ini_md -            exp_md = ini_md.copy() -            del exp_md['X-Type'] -            del exp_md['X-Object-Type'] -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert gdf.fp is None -            assert gdf.metadata == exp_md -        finally: -            shutil.rmtree(td) - -    def test_constructor_invalid_existing_metadata(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        inv_md = { -            'Content-Length': 5, -            'ETag': 'etag', -            'X-Timestamp': 'ts', -            'Content-Type': 'application/loctet-stream'} -        _metadata[the_file] = inv_md -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert gdf.fp is None -            assert gdf.metadata != inv_md -        finally: -            shutil.rmtree(td) - -    def test_constructor_isdir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "d") -        try: -            os.makedirs(the_dir) -            ini_md = { -                'X-Type': 'Object', -                'X-Object-Type': 'dir', -                'Content-Length': 5, -                'ETag': 'etag', -                'X-Timestamp': 'ts', -                'Content-Type': 'application/loctet-stream'} -            _metadata[the_dir] = ini_md -            exp_md = ini_md.copy() -            del exp_md['X-Type'] -            del exp_md['X-Object-Type'] -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "d", self.lg, keep_data_fp=True) -            assert gdf._obj == "d" -            assert gdf.data_file == the_dir -            assert gdf._is_dir -            assert gdf.fp is None -            assert gdf.metadata == exp_md -        finally: -            shutil.rmtree(td) - -    def test_constructor_keep_data_fp(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg, keep_data_fp=True) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert gdf.fp is not None -        finally: -            shutil.rmtree(td) - -    def test_constructor_chunk_size(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg, disk_chunk_size=8192) -        assert gdf.disk_chunk_size == 8192 - -    def test_constructor_iter_hook(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg, iter_hook='hook') -        assert gdf.iter_hook == 'hook' - -    def test_close(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        # Should be a no-op, as by default is_dir is False, but fp is None -        gdf.close() - -        gdf._is_dir = True -        gdf.fp = "123" -        # Should still be a no-op as is_dir is True (marker directory) -        gdf.close() -        assert gdf.fp == "123" - -        gdf._is_dir = False -        saved_dc = gluster.swift.common.DiskFile.do_close -        self.called = False -        def our_do_close(fp): -            self.called = True -        gluster.swift.common.DiskFile.do_close = our_do_close -        try: -            gdf.close() -            assert self.called -            assert gdf.fp is None -        finally: -            gluster.swift.common.DiskFile.do_close = saved_dc - -    def test_is_deleted(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf.is_deleted() -        gdf.data_file = "/tmp/foo/bar" -        assert not gdf.is_deleted() - -    def test_create_dir_object(self): -        td = tempfile.mkdtemp() -        the_dir = os.path.join(td, "vol0", "bar", "dir") -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir/z", self.lg) -            # Not created, dir object path is different, just checking -            assert gdf._obj == "z" -            gdf._create_dir_object(the_dir) -            assert os.path.isdir(the_dir) -            assert the_dir in _metadata -        finally: -            shutil.rmtree(td) - -    def test_create_dir_object_exists(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            os.makedirs(the_path) -            with open(the_dir, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir/z", self.lg) -            # Not created, dir object path is different, just checking -            assert gdf._obj == "z" -            def _mock_do_chown(p, u, g): -                assert u == DEFAULT_UID -                assert g == DEFAULT_GID -            dc = gluster.swift.common.DiskFile.do_chown -            gluster.swift.common.DiskFile.do_chown = _mock_do_chown -            try: -                gdf._create_dir_object(the_dir) -            finally: -                gluster.swift.common.DiskFile.do_chown = dc -            assert os.path.isdir(the_dir) -            assert the_dir in _metadata -        finally: -            shutil.rmtree(td) - -    def test_put_metadata(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "z") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            md = { 'Content-Type': 'application/octet-stream', 'a': 'b' } -            gdf.put_metadata(md.copy()) -            assert gdf.metadata == md, "gdf.metadata = %r, md = %r" % (gdf.metadata, md) -            assert _metadata[the_dir] == md -        finally: -            shutil.rmtree(td) - -    def test_put_w_tombstone(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf.metadata == {} - -        gdf.put_metadata({'x': '1'}, tombstone=True) -        assert gdf.metadata == {} - -    def test_put_w_meta_file(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            newmd = gdf.metadata.copy() -            newmd['X-Object-Meta-test'] = '1234' -            gdf.put_metadata(newmd) -            assert gdf.metadata == newmd -            assert _metadata[the_file] == newmd -        finally: -            shutil.rmtree(td) - -    def test_put_w_meta_file_no_content_type(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            newmd = gdf.metadata.copy() -            newmd['Content-Type'] = '' -            newmd['X-Object-Meta-test'] = '1234' -            gdf.put_metadata(newmd) -            assert gdf.metadata == newmd -            assert _metadata[the_file] == newmd -        finally: -            shutil.rmtree(td) - -    def test_put_w_meta_dir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir", self.lg) -            newmd = gdf.metadata.copy() -            newmd['X-Object-Meta-test'] = '1234' -            gdf.put_metadata(newmd) -            assert gdf.metadata == newmd -            assert _metadata[the_dir] == newmd -        finally: -            shutil.rmtree(td) - -    def test_put_w_marker_dir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir", self.lg) -            newmd = gdf.metadata.copy() -            newmd['X-Object-Meta-test'] = '1234' -            gdf.put_metadata(newmd) -            assert gdf.metadata == newmd -            assert _metadata[the_dir] == newmd -        finally: -            shutil.rmtree(td) - -    def test_put_w_marker_dir_create(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir", self.lg) -            assert gdf.metadata == {} -            newmd = { -                'Content-Length': 0, -                'ETag': 'etag', -                'X-Timestamp': 'ts', -                'Content-Type': 'application/directory'} -            gdf.put(None, newmd, extension='.dir') -            assert gdf.data_file == the_dir -            assert gdf.metadata == newmd -            assert _metadata[the_dir] == newmd -        finally: -            shutil.rmtree(td) - -    def test_put_is_dir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir", self.lg) -            origmd = gdf.metadata.copy() -            origfmd = _metadata[the_dir] -            newmd = gdf.metadata.copy() -            # FIXME: This is a hack to get to the code-path; it is not clear -            # how this can happen normally. -            newmd['Content-Type'] = '' -            newmd['X-Object-Meta-test'] = '1234' -            try: -                gdf.put(None, newmd, extension='.data') -            except AlreadyExistsAsDir: -                pass -            else: -                self.fail("Expected to encounter 'already-exists-as-dir' exception") -            assert gdf.metadata == origmd -            assert _metadata[the_dir] == origfmd -        finally: -            shutil.rmtree(td) - -    def test_put(self): -        td = tempfile.mkdtemp() -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf._obj_path == "" -            assert gdf.name == "bar" -            assert gdf.datadir == os.path.join(td, "vol0", "bar") -            assert gdf.data_file is None - -            body = '1234\n' -            etag = md5() -            etag.update(body) -            etag = etag.hexdigest() -            metadata = { -                'X-Timestamp': '1234', -                'Content-Type': 'file', -                'ETag': etag, -                'Content-Length': '5', -                } - -            with gdf.mkstemp() as fd: -                assert gdf.tmppath is not None -                tmppath = gdf.tmppath -                os.write(fd, body) -                gdf.put(fd, metadata) - -            assert gdf.data_file == os.path.join(td, "vol0", "bar", "z") -            assert os.path.exists(gdf.data_file) -            assert not os.path.exists(tmppath) -        finally: -            shutil.rmtree(td) - -    def test_put_obj_path(self): -        the_obj_path = os.path.join("b", "a") -        the_file = os.path.join(the_obj_path, "z") -        td = tempfile.mkdtemp() -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   the_file, self.lg) -            assert gdf._obj == "z" -            assert gdf._obj_path == the_obj_path -            assert gdf.name == os.path.join("bar", "b", "a") -            assert gdf.datadir == os.path.join(td, "vol0", "bar", "b", "a") -            assert gdf.data_file is None - -            body = '1234\n' -            etag = md5() -            etag.update(body) -            etag = etag.hexdigest() -            metadata = { -                'X-Timestamp': '1234', -                'Content-Type': 'file', -                'ETag': etag, -                'Content-Length': '5', -                } - -            with gdf.mkstemp() as fd: -                assert gdf.tmppath is not None -                tmppath = gdf.tmppath -                os.write(fd, body) -                gdf.put(fd, metadata) - -            assert gdf.data_file == os.path.join(td, "vol0", "bar", "b", "a", "z") -            assert os.path.exists(gdf.data_file) -            assert not os.path.exists(tmppath) -        finally: -            shutil.rmtree(td) - -    def test_unlinkold_no_metadata(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf.metadata == {} -        _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs -        _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir -        gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs -        gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir -        try: -            gdf.unlinkold(None) -        except MockException as exp: -            self.fail(str(exp)) -        finally: -            gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs -            gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir - -    def test_unlinkold_same_timestamp(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf.metadata == {} -        gdf.metadata['X-Timestamp'] = 1 -        _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs -        _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir -        gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs -        gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir -        try: -            gdf.unlinkold(1) -        except MockException as exp: -            self.fail(str(exp)) -        finally: -            gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs -            gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir - -    def test_unlinkold_file(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir - -            later = float(gdf.metadata['X-Timestamp']) + 1 -            gdf.unlinkold(normalize_timestamp(later)) -            assert os.path.isdir(gdf.datadir) -            assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) -        finally: -            shutil.rmtree(td) - -    def test_unlinkold_file_not_found(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir - -            # Handle the case the file is not in the directory listing. -            os.unlink(the_file) - -            later = float(gdf.metadata['X-Timestamp']) + 1 -            gdf.unlinkold(normalize_timestamp(later)) -            assert os.path.isdir(gdf.datadir) -            assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) -        finally: -            shutil.rmtree(td) - -    def test_unlinkold_file_unlink_error(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir - -            later = float(gdf.metadata['X-Timestamp']) + 1 - -            stats = os.stat(the_path) -            os.chmod(the_path, stats.st_mode & (~stat.S_IWUSR)) - -            # Handle the case do_unlink() raises an OSError -            __os_unlink = os.unlink -            os.unlink = _mock_do_unlink_eacces_err -            try: -                gdf.unlinkold(normalize_timestamp(later)) -            except OSError as e: -                assert e.errno == errno.EACCES -            else: -                self.fail("Excepted an OSError when unlinking file") -            finally: -                os.unlink = __os_unlink -                os.chmod(the_path, stats.st_mode) - -            assert os.path.isdir(gdf.datadir) -            assert os.path.exists(os.path.join(gdf.datadir, gdf._obj)) -        finally: -            shutil.rmtree(td) - -    def test_unlinkold_is_dir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "d") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "d", self.lg, keep_data_fp=True) -            assert gdf.data_file == the_dir -            assert gdf._is_dir - -            later = float(gdf.metadata['X-Timestamp']) + 1 -            gdf.unlinkold(normalize_timestamp(later)) -            assert os.path.isdir(gdf.datadir) -            assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj)) -        finally: -            shutil.rmtree(td) - -    def test_unlinkold_is_dir_failure(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "d") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "d", self.lg, keep_data_fp=True) -            assert gdf.data_file == the_dir -            assert gdf._is_dir - -            stats = os.stat(gdf.datadir) -            os.chmod(gdf.datadir, 0) -            __os_rmdir = os.rmdir -            os.rmdir = _mock_do_rmdir_eacces_err -            try: -                later = float(gdf.metadata['X-Timestamp']) + 1 -                gdf.unlinkold(normalize_timestamp(later)) -            finally: -                os.chmod(gdf.datadir, stats.st_mode) -                os.rmdir = __os_rmdir -            assert os.path.isdir(gdf.datadir) -            assert os.path.isdir(gdf.data_file) -        finally: -            shutil.rmtree(td) - -    def test_get_data_file_size(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert 4 == gdf.get_data_file_size() -        finally: -            shutil.rmtree(td) - -    def test_get_data_file_size(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            assert 4 == gdf.metadata['Content-Length'] -            gdf.metadata['Content-Length'] = 3 -            assert 4 == gdf.get_data_file_size() -            assert 4 == gdf.metadata['Content-Length'] -        finally: -            shutil.rmtree(td) - -    def test_get_data_file_size_dne(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "/b/a/z/", self.lg) -        try: -            s = gdf.get_data_file_size() -        except DiskFileNotExist: -            pass -        else: -            self.fail("Expected DiskFileNotExist exception") - -    def test_get_data_file_size_dne_os_err(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            gdf.data_file = gdf.data_file + ".dne" -            try: -                s = gdf.get_data_file_size() -            except DiskFileNotExist: -                pass -            else: -                self.fail("Expected DiskFileNotExist exception") -        finally: -            shutil.rmtree(td) - -    def test_get_data_file_size_os_err(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_file = os.path.join(the_path, "z") -        try: -            os.makedirs(the_path) -            with open(the_file, "wb") as fd: -                fd.write("1234") -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "z", self.lg) -            assert gdf._obj == "z" -            assert gdf.data_file == the_file -            assert not gdf._is_dir -            stats = os.stat(the_path) -            os.chmod(the_path, 0) -            __os_path_getsize = os.path.getsize -            os.path.getsize = _mock_getsize_eaccess_err -            try: -                s = gdf.get_data_file_size() -            except OSError as err: -                assert err.errno == errno.EACCES -            else: -                self.fail("Expected OSError exception") -            finally: -                os.path.getsize = __os_path_getsize -                os.chmod(the_path, stats.st_mode) -        finally: -            shutil.rmtree(td) - -    def test_get_data_file_size_dir(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "d") -        try: -            os.makedirs(the_dir) -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "d", self.lg, keep_data_fp=True) -            assert gdf._obj == "d" -            assert gdf.data_file == the_dir -            assert gdf._is_dir -            assert 0 == gdf.get_data_file_size() -        finally: -            shutil.rmtree(td) - -    def test_filter_metadata(self): -        assert not os.path.exists("/tmp/foo") -        gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", -                               "z", self.lg) -        assert gdf.metadata == {} -        gdf.filter_metadata() -        assert gdf.metadata == {} - -        gdf.metadata[X_TYPE] = 'a' -        gdf.metadata[X_OBJECT_TYPE] = 'b' -        gdf.metadata['foobar'] = 'c' -        gdf.filter_metadata() -        assert X_TYPE not in gdf.metadata -        assert X_OBJECT_TYPE not in gdf.metadata -        assert 'foobar' in gdf.metadata - -    def test_mkstemp(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir/z", self.lg) -            saved_tmppath = '' -            with gdf.mkstemp() as fd: -                assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") -                assert os.path.isdir(gdf.datadir) -                saved_tmppath = gdf.tmppath -                assert os.path.dirname(saved_tmppath) == gdf.datadir -                assert os.path.basename(saved_tmppath)[:3] == '.z.' -                assert os.path.exists(saved_tmppath) -                os.write(fd, "123") -            assert not os.path.exists(saved_tmppath) -        finally: -            shutil.rmtree(td) - -    def test_mkstemp_err_on_close(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir/z", self.lg) -            saved_tmppath = '' -            with gdf.mkstemp() as fd: -                assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") -                assert os.path.isdir(gdf.datadir) -                saved_tmppath = gdf.tmppath -                assert os.path.dirname(saved_tmppath) == gdf.datadir -                assert os.path.basename(saved_tmppath)[:3] == '.z.' -                assert os.path.exists(saved_tmppath) -                os.write(fd, "123") -            # At the end of previous with block a close on fd is called. -            # Calling os.close on the same fd will raise an OSError -            # exception and we must catch it. -            try: -                os.close(fd) -            except OSError as err: -                pass -            else: -                self.fail("Exception expected") -            assert not os.path.exists(saved_tmppath) -        finally: -            shutil.rmtree(td) - -    def test_mkstemp_err_on_unlink(self): -        td = tempfile.mkdtemp() -        the_path = os.path.join(td, "vol0", "bar") -        the_dir = os.path.join(the_path, "dir") -        try: -            gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", -                                   "dir/z", self.lg) -            saved_tmppath = '' -            with gdf.mkstemp() as fd: -                assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir") -                assert os.path.isdir(gdf.datadir) -                saved_tmppath = gdf.tmppath -                assert os.path.dirname(saved_tmppath) == gdf.datadir -                assert os.path.basename(saved_tmppath)[:3] == '.z.' -                assert os.path.exists(saved_tmppath) -                os.write(fd, "123") -                os.unlink(saved_tmppath) -            assert not os.path.exists(saved_tmppath) -        finally: -            shutil.rmtree(td) diff --git a/ufo/test/unit/common/test_fs_utils.py b/ufo/test/unit/common/test_fs_utils.py deleted file mode 100644 index 186e07d5..00000000 --- a/ufo/test/unit/common/test_fs_utils.py +++ /dev/null @@ -1,277 +0,0 @@ -# Copyright (c) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import shutil -import random -import unittest -from tempfile import mkdtemp, mkstemp -from gluster.swift.common import fs_utils as fs -from gluster.swift.common.exceptions import NotDirectoryError, \ -    FileOrDirNotFoundError - -class TestUtils(unittest.TestCase): -    """ Tests for common.utils """ - -    def test_do_walk(self): -        try: -            # create directory structure -            tmpparent = mkdtemp() -            tmpdirs = [] -            tmpfiles = [] -            for i in range(5): -                tmpdirs.append(mkdtemp(dir=tmpparent).rsplit(os.path.sep, 1)[1]) -                tmpfiles.append(mkstemp(dir=tmpparent)[1].rsplit(os.path.sep, \ -                                                                     1)[1]) - -                for path, dirnames, filenames in fs.do_walk(tmpparent): -                    assert path == tmpparent -                    assert dirnames.sort() == tmpdirs.sort() -                    assert filenames.sort() == tmpfiles.sort() -                    break -        finally: -            shutil.rmtree(tmpparent) - -    def test_do_open(self): -        try: -            fd, tmpfile = mkstemp() -            f = fs.do_open(tmpfile, 'r') -            try: -                f.write('test') -            except IOError as err: -                pass -            else: -                self.fail("IOError expected") -        finally: -            f.close() -            os.close(fd) -            os.remove(tmpfile) - -    def test_do_open_err(self): -        try: -            fs.do_open(os.path.join('/tmp', str(random.random())), 'r') -        except IOError: -            pass -        else: -            self.fail("IOError expected") - -    def test_do_write(self): -        try: -            fd, tmpfile = mkstemp() -            cnt = fs.do_write(fd, "test") -            assert cnt == len("test") -        finally: -            os.close(fd) -            os.remove(tmpfile) - -    def test_do_write_err(self): -        try: -            fd, tmpfile = mkstemp() -            fd1 = os.open(tmpfile, os.O_RDONLY) -            fs.do_write(fd1, "test") -        except OSError: -            pass -        else: -            self.fail("OSError expected") -        finally: -            os.close(fd) -            os.close(fd1) - -    def test_do_mkdir(self): -        try: -            path = os.path.join('/tmp', str(random.random())) -            fs.do_mkdir(path) -            assert os.path.exists(path) -            assert fs.do_mkdir(path) -        finally: -            os.rmdir(path) - -    def test_do_mkdir_err(self): -        try: -            path = os.path.join('/tmp', str(random.random()), str(random.random())) -            fs.do_mkdir(path) -        except OSError: -            pass -        else: -            self.fail("OSError expected") - - -    def test_do_makedirs(self): -        try: -            subdir = os.path.join('/tmp', str(random.random())) -            path = os.path.join(subdir, str(random.random())) -            fs.do_makedirs(path) -            assert os.path.exists(path) -            assert fs.do_makedirs(path) -        finally: -            shutil.rmtree(subdir) - -    def test_do_listdir(self): -        try: -            tmpdir = mkdtemp() -            subdir = [] -            for i in range(5): -                subdir.append(mkdtemp(dir=tmpdir).rsplit(os.path.sep, 1)[1]) - -            assert subdir.sort() == fs.do_listdir(tmpdir).sort() -        finally: -            shutil.rmtree(tmpdir) - -    def test_do_listdir_err(self): -        try: -            path = os.path.join('/tmp', str(random.random())) -            fs.do_listdir(path) -        except OSError: -            pass -        else: -            self.fail("OSError expected") - -    def test_do_stat(self): -        try: -            tmpdir = mkdtemp() -            fd, tmpfile = mkstemp(dir=tmpdir) -            buf1 = os.stat(tmpfile) -            buf2 = fs.do_stat(fd) -            buf3 = fs.do_stat(tmpfile) - -            assert buf1 == buf2 -            assert buf1 == buf3 -        finally: -            os.close(fd) -            os.remove(tmpfile) -            os.rmdir(tmpdir) - -    def test_do_stat_err(self): -        try: -            fs.do_stat(os.path.join('/tmp', str(random.random()))) -        except OSError: -            pass -        else: -            self.fail("OSError expected") - -    def test_do_close(self): -        try: -            fd, tmpfile = mkstemp() -            fs.do_close(fd); -            try: -                os.write(fd, "test") -            except OSError: -                pass -            else: -                self.fail("OSError expected") -            fp = open(tmpfile) -            fs.do_close(fp) -        finally: -            os.remove(tmpfile) - -    def test_do_unlink(self): -        try: -            fd, tmpfile = mkstemp() -            fs.do_unlink(tmpfile) -            assert not os.path.exists(tmpfile) -            assert fs.do_unlink(os.path.join('/tmp', str(random.random()))) -        finally: -            os.close(fd) - -    def test_do_unlink_err(self): -        try: -            tmpdir = mkdtemp() -            fs.do_unlink(tmpdir) -        except OSError: -            pass -        else: -            self.fail('OSError expected') -        finally: -            os.rmdir(tmpdir) - -    def test_do_rmdir(self): -        tmpdir = mkdtemp() -        fs.do_rmdir(tmpdir) -        assert not os.path.exists(tmpdir) -        assert not fs.do_rmdir(os.path.join('/tmp', str(random.random()))) - -    def test_do_rmdir_err(self): -        try: -            fd, tmpfile = mkstemp() -            fs.do_rmdir(tmpfile) -        except OSError: -            pass -        else: -            self.fail('OSError expected') -        finally: -            os.close(fd) -            os.remove(tmpfile) - -    def test_do_rename(self): -        try: -            srcpath = mkdtemp() -            destpath = os.path.join('/tmp', str(random.random())) -            fs.do_rename(srcpath, destpath) -            assert not os.path.exists(srcpath) -            assert os.path.exists(destpath) -        finally: -            os.rmdir(destpath) - -    def test_do_rename_err(self): -        try: -            srcpath = os.path.join('/tmp', str(random.random())) -            destpath = os.path.join('/tmp', str(random.random())) -            fs.do_rename(srcpath, destpath) -        except OSError: -            pass -        else: -            self.fail("OSError expected") - -    def test_dir_empty(self): -        try: -            tmpdir = mkdtemp() -            subdir = mkdtemp(dir=tmpdir) -            assert not fs.dir_empty(tmpdir) -            assert fs.dir_empty(subdir) -        finally: -            shutil.rmtree(tmpdir) - -    def test_dir_empty_err(self): -        try: -            try: -                assert fs.dir_empty(os.path.join('/tmp', str(random.random()))) -            except FileOrDirNotFoundError: -                pass -            else: -                self.fail("FileOrDirNotFoundError exception expected") - -            fd, tmpfile = mkstemp() -            try: -                fs.dir_empty(tmpfile) -            except NotDirectoryError: -                pass -            else: -                self.fail("NotDirectoryError exception expected") -        finally: -            os.close(fd) -            os.unlink(tmpfile) - -    def test_rmdirs(self): -        try: -            tmpdir = mkdtemp() -            subdir = mkdtemp(dir=tmpdir) -            fd, tmpfile = mkstemp(dir=tmpdir) -            assert not fs.rmdirs(tmpfile) -            assert not fs.rmdirs(tmpdir) -            assert fs.rmdirs(subdir) -            assert not os.path.exists(subdir) -        finally: -            os.close(fd) -            shutil.rmtree(tmpdir) diff --git a/ufo/test/unit/common/test_ring.py b/ufo/test/unit/common/test_ring.py deleted file mode 100644 index 8b7509cc..00000000 --- a/ufo/test/unit/common/test_ring.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (c) 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import unittest -import gluster.swift.common.constraints -import swift.common.utils -from gluster.swift.common.ring import Ring - - -class TestRing(unittest.TestCase): -    """ Tests for common.utils """ - -    def setUp(self): -        swift.common.utils.HASH_PATH_SUFFIX = 'endcap' -        swiftdir = os.path.join(os.getcwd(), "common", "data") -        self.ring = Ring(swiftdir, ring_name='object') - -    def test_first_device(self): -        part, node = self.ring.get_nodes('test') -        assert node[0]['device'] == 'test' -        node = self.ring.get_part_nodes(0) -        assert node[0]['device'] == 'test' -        for node in self.ring.get_more_nodes(0): -            assert node['device'] == 'volume_not_in_ring' - -    def test_invalid_device(self): -        part, node = self.ring.get_nodes('test2') -        assert node[0]['device'] == 'volume_not_in_ring' -        node = self.ring.get_part_nodes(0) -        assert node[0]['device'] == 'volume_not_in_ring' - -    def test_second_device(self): -        part, node = self.ring.get_nodes('iops') -        assert node[0]['device'] == 'iops' -        node = self.ring.get_part_nodes(0) -        assert node[0]['device'] == 'iops' -        for node in self.ring.get_more_nodes(0): -            assert node['device'] == 'volume_not_in_ring' - -    def test_second_device_with_reseller_prefix(self): -        part, node = self.ring.get_nodes('AUTH_iops') -        assert node[0]['device'] == 'iops' diff --git a/ufo/test/unit/common/test_utils.py b/ufo/test/unit/common/test_utils.py deleted file mode 100644 index c645509f..00000000 --- a/ufo/test/unit/common/test_utils.py +++ /dev/null @@ -1,1020 +0,0 @@ -# Copyright (c) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -#    http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Tests for common.utils """ - -import os -import unittest -import errno -import xattr -import cPickle as pickle -import tempfile -import hashlib -import tarfile -import shutil -from collections import defaultdict -from swift.common.utils import normalize_timestamp -from gluster.swift.common import utils, Glusterfs - -# -# Somewhat hacky way of emulating the operation of xattr calls. They are made -# against a dictionary that stores the xattr key/value pairs. -# -_xattrs = {} -_xattr_op_cnt = defaultdict(int) -_xattr_set_err = {} -_xattr_get_err = {} -_xattr_rem_err = {} - -def _xkey(path, key): -    return "%s:%s" % (path, key) - -def _setxattr(path, key, value, *args, **kwargs): -    _xattr_op_cnt['set'] += 1 -    xkey = _xkey(path, key) -    if xkey in _xattr_set_err: -        e = IOError() -        e.errno = _xattr_set_err[xkey] -        raise e -    global _xattrs -    _xattrs[xkey] = value - -def _getxattr(path, key, *args, **kwargs): -    _xattr_op_cnt['get'] += 1 -    xkey = _xkey(path, key) -    if xkey in _xattr_get_err: -        e = IOError() -        e.errno = _xattr_get_err[xkey] -        raise e -    global _xattrs -    if xkey in _xattrs: -        ret_val = _xattrs[xkey] -    else: -        e = IOError("Fake IOError") -        e.errno = errno.ENODATA -        raise e -    return ret_val - -def _removexattr(path, key, *args, **kwargs): -    _xattr_op_cnt['remove'] += 1 -    xkey = _xkey(path, key) -    if xkey in _xattr_rem_err: -        e = IOError() -        e.errno = _xattr_rem_err[xkey] -        raise e -    global _xattrs -    if xkey in _xattrs: -        del _xattrs[xkey] -    else: -        e = IOError("Fake IOError") -        e.errno = errno.ENODATA -        raise e - -def _initxattr(): -    global _xattrs -    _xattrs = {} -    global _xattr_op_cnt -    _xattr_op_cnt = defaultdict(int) -    global _xattr_set_err, _xattr_get_err, _xattr_rem_err -    _xattr_set_err = {} -    _xattr_get_err = {} -    _xattr_rem_err = {} - -    # Save the current methods -    global _xattr_set;    _xattr_set    = xattr.setxattr -    global _xattr_get;    _xattr_get    = xattr.getxattr -    global _xattr_remove; _xattr_remove = xattr.removexattr - -    # Monkey patch the calls we use with our internal unit test versions -    xattr.setxattr    = _setxattr -    xattr.getxattr    = _getxattr -    xattr.removexattr = _removexattr - -def _destroyxattr(): -    # Restore the current methods just in case -    global _xattr_set;    xattr.setxattr    = _xattr_set -    global _xattr_get;    xattr.getxattr    = _xattr_get -    global _xattr_remove; xattr.removexattr = _xattr_remove -    # Destroy the stored values and -    global _xattrs; _xattrs = None - - -class SimMemcache(object): -    def __init__(self): -        self._d = {} - -    def get(self, key): -        return self._d.get(key, None) - -    def set(self, key, value): -        self._d[key] = value - - -class TestUtils(unittest.TestCase): -    """ Tests for common.utils """ - -    def setUp(self): -        _initxattr() - -    def tearDown(self): -        _destroyxattr() - -    def test_write_metadata(self): -        path = "/tmp/foo/w" -        orig_d = { 'bar' : 'foo' } -        utils.write_metadata(path, orig_d) -        xkey = _xkey(path, utils.METADATA_KEY) -        assert len(_xattrs.keys()) == 1 -        assert xkey in _xattrs -        assert orig_d == pickle.loads(_xattrs[xkey]) -        assert _xattr_op_cnt['set'] == 1 - -    def test_write_metadata_err(self): -        path = "/tmp/foo/w" -        orig_d = { 'bar' : 'foo' } -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattr_set_err[xkey] = errno.EOPNOTSUPP -        try: -            utils.write_metadata(path, orig_d) -        except IOError as e: -            assert e.errno == errno.EOPNOTSUPP -            assert len(_xattrs.keys()) == 0 -            assert _xattr_op_cnt['set'] == 1 -        else: -            self.fail("Expected an IOError exception on write") - -    def test_write_metadata_multiple(self): -        # At 64 KB an xattr key/value pair, this should generate three keys. -        path = "/tmp/foo/w" -        orig_d = { 'bar' : 'x' * 150000 } -        utils.write_metadata(path, orig_d) -        assert len(_xattrs.keys()) == 3, "Expected 3 keys, found %d" % len(_xattrs.keys()) -        payload = '' -        for i in range(0,3): -            xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) -            assert xkey in _xattrs -            assert len(_xattrs[xkey]) <= utils.MAX_XATTR_SIZE -            payload += _xattrs[xkey] -        assert orig_d == pickle.loads(payload) -        assert _xattr_op_cnt['set'] == 3, "%r" % _xattr_op_cnt - -    def test_clean_metadata(self): -        path = "/tmp/foo/c" -        expected_d = { 'a': 'y' * 150000 } -        expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) -        for i in range(0,3): -            xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) -            _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] -            expected_p = expected_p[utils.MAX_XATTR_SIZE:] -        assert not expected_p -        utils.clean_metadata(path) -        assert _xattr_op_cnt['remove'] == 4, "%r" % _xattr_op_cnt - -    def test_clean_metadata_err(self): -        path = "/tmp/foo/c" -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattrs[xkey] = pickle.dumps({ 'a': 'y' }, utils.PICKLE_PROTOCOL) -        _xattr_rem_err[xkey] = errno.EOPNOTSUPP -        try: -            utils.clean_metadata(path) -        except IOError as e: -            assert e.errno == errno.EOPNOTSUPP -            assert _xattr_op_cnt['remove'] == 1, "%r" % _xattr_op_cnt -        else: -            self.fail("Expected an IOError exception on remove") - -    def test_read_metadata(self): -        path = "/tmp/foo/r" -        expected_d = { 'a': 'y' } -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) -        res_d = utils.read_metadata(path) -        assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) -        assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - -    def test_read_metadata_notfound(self): -        path = "/tmp/foo/r" -        res_d = utils.read_metadata(path) -        assert res_d == {} -        assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt - -    def test_read_metadata_err(self): -        path = "/tmp/foo/r" -        expected_d = { 'a': 'y' } -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) -        _xattr_get_err[xkey] = errno.EOPNOTSUPP -        try: -            res_d = utils.read_metadata(path) -        except IOError as e: -            assert e.errno == errno.EOPNOTSUPP -            assert (_xattr_op_cnt['get'] == 1), "%r" % _xattr_op_cnt -        else: -            self.fail("Expected an IOError exception on get") - -    def test_read_metadata_multiple(self): -        path = "/tmp/foo/r" -        expected_d = { 'a': 'y' * 150000 } -        expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) -        for i in range(0,3): -            xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) -            _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] -            expected_p = expected_p[utils.MAX_XATTR_SIZE:] -        assert not expected_p -        res_d = utils.read_metadata(path) -        assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) -        assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt - -    def test_read_metadata_multiple_one_missing(self): -        path = "/tmp/foo/r" -        expected_d = { 'a': 'y' * 150000 } -        expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL) -        for i in range(0,2): -            xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or '')) -            _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE] -            expected_p = expected_p[utils.MAX_XATTR_SIZE:] -        assert len(expected_p) <= utils.MAX_XATTR_SIZE -        res_d = utils.read_metadata(path) -        assert res_d == {} -        assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt -        assert len(_xattrs.keys()) == 0, "Expected 0 keys, found %d" % len(_xattrs.keys()) - -    def test_restore_metadata_none(self): -        # No initial metadata -        path = "/tmp/foo/i" -        res_d = utils.restore_metadata(path, { 'b': 'y' }) -        expected_d = { 'b': 'y' } -        assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) -        assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt -        assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt - -    def test_restore_metadata(self): -        # Initial metadata -        path = "/tmp/foo/i" -        initial_d = { 'a': 'z' } -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL) -        res_d = utils.restore_metadata(path, { 'b': 'y' }) -        expected_d = { 'a': 'z', 'b': 'y' } -        assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) -        assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt -        assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt - -    def test_restore_metadata_nochange(self): -        # Initial metadata but no changes -        path = "/tmp/foo/i" -        initial_d = { 'a': 'z' } -        xkey = _xkey(path, utils.METADATA_KEY) -        _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL) -        res_d = utils.restore_metadata(path, {}) -        expected_d = { 'a': 'z' } -        assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d) -        assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt -        assert _xattr_op_cnt['set'] == 0, "%r" % _xattr_op_cnt - -    def test_add_timestamp_empty(self): -        orig = {} -        res = utils._add_timestamp(orig) -        assert res == {} - -    def test_add_timestamp_none(self): -        orig = { 'a': 1, 'b': 2, 'c': 3 } -        exp = { 'a': (1, 0), 'b': (2, 0), 'c': (3, 0) } -        res = utils._add_timestamp(orig) -        assert res == exp - -    def test_add_timestamp_mixed(self): -        orig = { 'a': 1, 'b': (2, 1), 'c': 3 } -        exp = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) } -        res = utils._add_timestamp(orig) -        assert res == exp - -    def test_add_timestamp_all(self): -        orig = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) } -        res = utils._add_timestamp(orig) -        assert res == orig - -    def test_get_etag_empty(self): -        tf = tempfile.NamedTemporaryFile() -        hd = utils._get_etag(tf.name) -        assert hd == hashlib.md5().hexdigest() - -    def test_get_etag(self): -        tf = tempfile.NamedTemporaryFile() -        tf.file.write('123' * utils.CHUNK_SIZE) -        tf.file.flush() -        hd = utils._get_etag(tf.name) -        tf.file.seek(0) -        md5 = hashlib.md5() -        while True: -            chunk = tf.file.read(utils.CHUNK_SIZE) -            if not chunk: -                break -            md5.update(chunk) -        assert hd == md5.hexdigest() - -    def test_get_object_metadata_dne(self): -        md = utils.get_object_metadata("/tmp/doesNotEx1st") -        assert md == {} - -    def test_get_object_metadata_err(self): -        tf = tempfile.NamedTemporaryFile() -        try: -            md = utils.get_object_metadata(os.path.join(tf.name,"doesNotEx1st")) -        except OSError as e: -            assert e.errno != errno.ENOENT -        else: -            self.fail("Expected exception") - -    obj_keys = (utils.X_TIMESTAMP, utils.X_CONTENT_TYPE, utils.X_ETAG, -                utils.X_CONTENT_LENGTH, utils.X_TYPE, utils.X_OBJECT_TYPE) - -    def test_get_object_metadata_file(self): -        tf = tempfile.NamedTemporaryFile() -        tf.file.write('123'); tf.file.flush() -        md = utils.get_object_metadata(tf.name) -        for key in self.obj_keys: -            assert key in md, "Expected key %s in %r" % (key, md) -        assert md[utils.X_TYPE] == utils.OBJECT -        assert md[utils.X_OBJECT_TYPE] == utils.FILE -        assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE -        assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name) -        assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name)) -        assert md[utils.X_ETAG] == utils._get_etag(tf.name) - -    def test_get_object_metadata_dir(self): -        td = tempfile.mkdtemp() -        try: -            md = utils.get_object_metadata(td) -            for key in self.obj_keys: -                assert key in md, "Expected key %s in %r" % (key, md) -            assert md[utils.X_TYPE] == utils.OBJECT -            assert md[utils.X_OBJECT_TYPE] == utils.DIR -            assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE -            assert md[utils.X_CONTENT_LENGTH] == 0 -            assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td)) -            assert md[utils.X_ETAG] == hashlib.md5().hexdigest() -        finally: -            os.rmdir(td) - -    def test_create_object_metadata_file(self): -        tf = tempfile.NamedTemporaryFile() -        tf.file.write('4567'); tf.file.flush() -        r_md = utils.create_object_metadata(tf.name) - -        xkey = _xkey(tf.name, utils.METADATA_KEY) -        assert len(_xattrs.keys()) == 1 -        assert xkey in _xattrs -        assert _xattr_op_cnt['get'] == 1 -        assert _xattr_op_cnt['set'] == 1 -        md = pickle.loads(_xattrs[xkey]) -        assert r_md == md - -        for key in self.obj_keys: -            assert key in md, "Expected key %s in %r" % (key, md) -        assert md[utils.X_TYPE] == utils.OBJECT -        assert md[utils.X_OBJECT_TYPE] == utils.FILE -        assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE -        assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name) -        assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name)) -        assert md[utils.X_ETAG] == utils._get_etag(tf.name) - -    def test_create_object_metadata_dir(self): -        td = tempfile.mkdtemp() -        try: -            r_md = utils.create_object_metadata(td) - -            xkey = _xkey(td, utils.METADATA_KEY) -            assert len(_xattrs.keys()) == 1 -            assert xkey in _xattrs -            assert _xattr_op_cnt['get'] == 1 -            assert _xattr_op_cnt['set'] == 1 -            md = pickle.loads(_xattrs[xkey]) -            assert r_md == md - -            for key in self.obj_keys: -                assert key in md, "Expected key %s in %r" % (key, md) -            assert md[utils.X_TYPE] == utils.OBJECT -            assert md[utils.X_OBJECT_TYPE] == utils.DIR -            assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE -            assert md[utils.X_CONTENT_LENGTH] == 0 -            assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td)) -            assert md[utils.X_ETAG] == hashlib.md5().hexdigest() -        finally: -            os.rmdir(td) - -    def test_get_container_metadata(self): -        def _mock_get_container_details(path, memcache=None): -            o_list = [ 'a', 'b', 'c' ] -            o_count = 3 -            b_used = 47 -            return o_list, o_count, b_used -        orig_gcd = utils.get_container_details -        utils.get_container_details = _mock_get_container_details -        td = tempfile.mkdtemp() -        try: -            exp_md = { -                utils.X_TYPE: (utils.CONTAINER, 0), -                utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0), -                utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0), -                utils.X_OBJECTS_COUNT: (3, 0), -                utils.X_BYTES_USED: (47, 0), -                } -            md = utils.get_container_metadata(td) -            assert md == exp_md -        finally: -            utils.get_container_details = orig_gcd -            os.rmdir(td) - -    def test_get_account_metadata(self): -        def _mock_get_account_details(path, memcache=None): -            c_list = [ '123', 'abc' ] -            c_count = 2 -            return c_list, c_count -        orig_gad = utils.get_account_details -        utils.get_account_details = _mock_get_account_details -        td = tempfile.mkdtemp() -        try: -            exp_md = { -                utils.X_TYPE: (utils.ACCOUNT, 0), -                utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0), -                utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0), -                utils.X_OBJECTS_COUNT: (0, 0), -                utils.X_BYTES_USED: (0, 0), -                utils.X_CONTAINER_COUNT: (2, 0), -                } -            md = utils.get_account_metadata(td) -            assert md == exp_md -        finally: -            utils.get_account_details = orig_gad -            os.rmdir(td) - -    cont_keys = [utils.X_TYPE, utils.X_TIMESTAMP, utils.X_PUT_TIMESTAMP, -                 utils.X_OBJECTS_COUNT, utils.X_BYTES_USED] - -    def test_create_container_metadata(self): -        td = tempfile.mkdtemp() -        try: -            r_md = utils.create_container_metadata(td) - -            xkey = _xkey(td, utils.METADATA_KEY) -            assert len(_xattrs.keys()) == 1 -            assert xkey in _xattrs -            assert _xattr_op_cnt['get'] == 1 -            assert _xattr_op_cnt['set'] == 1 -            md = pickle.loads(_xattrs[xkey]) -            assert r_md == md - -            for key in self.cont_keys: -                assert key in md, "Expected key %s in %r" % (key, md) -            assert md[utils.X_TYPE] == (utils.CONTAINER, 0) -            assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0) -            assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0) -            assert md[utils.X_OBJECTS_COUNT] == (0, 0) -            assert md[utils.X_BYTES_USED] == (0, 0) -        finally: -            os.rmdir(td) - -    acct_keys = [val for val in cont_keys] -    acct_keys.append(utils.X_CONTAINER_COUNT) - -    def test_create_account_metadata(self): -        td = tempfile.mkdtemp() -        try: -            r_md = utils.create_account_metadata(td) - -            xkey = _xkey(td, utils.METADATA_KEY) -            assert len(_xattrs.keys()) == 1 -            assert xkey in _xattrs -            assert _xattr_op_cnt['get'] == 1 -            assert _xattr_op_cnt['set'] == 1 -            md = pickle.loads(_xattrs[xkey]) -            assert r_md == md - -            for key in self.acct_keys: -                assert key in md, "Expected key %s in %r" % (key, md) -            assert md[utils.X_TYPE] == (utils.ACCOUNT, 0) -            assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0) -            assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0) -            assert md[utils.X_OBJECTS_COUNT] == (0, 0) -            assert md[utils.X_BYTES_USED] == (0, 0) -            assert md[utils.X_CONTAINER_COUNT] == (0, 0) -        finally: -            os.rmdir(td) - -    def test_container_details_uncached(self): -        the_path = "/tmp/bar" -        def mock_get_container_details_from_fs(cont_path): -            bu = 5 -            oc = 1 -            ol = ['foo',] -            dl = [('a',100),] -            return utils.ContainerDetails(bu, oc, ol, dl) -        orig_gcdff = utils._get_container_details_from_fs -        utils._get_container_details_from_fs = mock_get_container_details_from_fs -        try: -            retval = utils.get_container_details(the_path) -            cd = mock_get_container_details_from_fs(the_path) -            assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) -        finally: -            utils._get_container_details_from_fs = orig_gcdff - -    def test_container_details_cached_hit(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_container_details_from_fs(cont_path, bu_p=5): -            bu = bu_p -            oc = 1 -            ol = ['foo',] -            dl = [('a',100),] -            return utils.ContainerDetails(bu, oc, ol, dl) -        def mock_do_stat(path): -            class MockStat(object): -                def __init__(self, mtime): -                    self.st_mtime = mtime -            return MockStat(100) -        cd = mock_get_container_details_from_fs(the_path, bu_p=6) -        mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) -        orig_gcdff = utils._get_container_details_from_fs -        utils._get_container_details_from_fs = mock_get_container_details_from_fs -        orig_ds = utils.do_stat -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_container_details(the_path, memcache=mc) -            # If it did not properly use memcache, the default mocked version -            # of get details from fs would return 5 bytes used instead of the -            # 6 we specified above. -            cd = mock_get_container_details_from_fs(the_path, bu_p=6) -            assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) -        finally: -            utils._get_container_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_container_details_cached_miss_key(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_container_details_from_fs(cont_path, bu_p=5): -            bu = bu_p -            oc = 1 -            ol = ['foo',] -            dl = [('a',100),] -            return utils.ContainerDetails(bu, oc, ol, dl) -        def mock_do_stat(path): -            # Be sure we don't miss due to mtimes not matching -            self.fail("do_stat should not have been called") -        cd = mock_get_container_details_from_fs(the_path + "u", bu_p=6) -        mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path + "u", cd) -        orig_gcdff = utils._get_container_details_from_fs -        utils._get_container_details_from_fs = mock_get_container_details_from_fs -        orig_ds = utils.do_stat -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_container_details(the_path, memcache=mc) -            cd = mock_get_container_details_from_fs(the_path) -            assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) -            mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path -            assert mkey in mc._d -        finally: -            utils._get_container_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_container_details_cached_miss_dir_list(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_container_details_from_fs(cont_path, bu_p=5): -            bu = bu_p -            oc = 1 -            ol = ['foo',] -            dl = [] -            return utils.ContainerDetails(bu, oc, ol, dl) -        def mock_do_stat(path): -            # Be sure we don't miss due to mtimes not matching -            self.fail("do_stat should not have been called") -        cd = mock_get_container_details_from_fs(the_path, bu_p=6) -        mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) -        orig_gcdff = utils._get_container_details_from_fs -        utils._get_container_details_from_fs = mock_get_container_details_from_fs -        orig_ds = utils.do_stat -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_container_details(the_path, memcache=mc) -            cd = mock_get_container_details_from_fs(the_path) -            assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) -            mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path -            assert mkey in mc._d -            assert 5 == mc._d[mkey].bytes_used -        finally: -            utils._get_container_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_container_details_cached_miss_mtime(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_container_details_from_fs(cont_path, bu_p=5): -            bu = bu_p -            oc = 1 -            ol = ['foo',] -            dl = [('a',100),] -            return utils.ContainerDetails(bu, oc, ol, dl) -        def mock_do_stat(path): -            # Be sure we miss due to mtimes not matching -            class MockStat(object): -                def __init__(self, mtime): -                    self.st_mtime = mtime -            return MockStat(200) -        cd = mock_get_container_details_from_fs(the_path, bu_p=6) -        mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd) -        orig_gcdff = utils._get_container_details_from_fs -        utils._get_container_details_from_fs = mock_get_container_details_from_fs -        orig_ds = utils.do_stat -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_container_details(the_path, memcache=mc) -            cd = mock_get_container_details_from_fs(the_path) -            assert retval == (cd.obj_list, cd.object_count, cd.bytes_used) -            mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path -            assert mkey in mc._d -            assert 5 == mc._d[mkey].bytes_used -        finally: -            utils._get_container_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_account_details_uncached(self): -        the_path = "/tmp/bar" -        def mock_get_account_details_from_fs(acc_path, acc_stats): -            mt = 100 -            cc = 2 -            cl = ['a', 'b'] -            return utils.AccountDetails(mt, cc, cl) -        orig_gcdff = utils._get_account_details_from_fs -        utils._get_account_details_from_fs = mock_get_account_details_from_fs -        try: -            retval = utils.get_account_details(the_path) -            ad = mock_get_account_details_from_fs(the_path, None) -            assert retval == (ad.container_list, ad.container_count) -        finally: -            utils._get_account_details_from_fs = orig_gcdff - -    def test_account_details_cached_hit(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_account_details_from_fs(acc_path, acc_stats): -            mt = 100 -            cc = 2 -            cl = ['a', 'b'] -            return utils.AccountDetails(mt, cc, cl) -        def mock_do_stat(path): -            class MockStat(object): -                def __init__(self, mtime): -                    self.st_mtime = mtime -            return MockStat(100) -        ad = mock_get_account_details_from_fs(the_path, None) -        ad.container_list = ['x', 'y'] -        mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad) -        orig_gcdff = utils._get_account_details_from_fs -        orig_ds = utils.do_stat -        utils._get_account_details_from_fs = mock_get_account_details_from_fs -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_account_details(the_path, memcache=mc) -            assert retval == (ad.container_list, ad.container_count) -            wrong_ad = mock_get_account_details_from_fs(the_path, None) -            assert wrong_ad != ad -        finally: -            utils._get_account_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_account_details_cached_miss(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_account_details_from_fs(acc_path, acc_stats): -            mt = 100 -            cc = 2 -            cl = ['a', 'b'] -            return utils.AccountDetails(mt, cc, cl) -        def mock_do_stat(path): -            class MockStat(object): -                def __init__(self, mtime): -                    self.st_mtime = mtime -            return MockStat(100) -        ad = mock_get_account_details_from_fs(the_path, None) -        ad.container_list = ['x', 'y'] -        mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path + 'u', ad) -        orig_gcdff = utils._get_account_details_from_fs -        orig_ds = utils.do_stat -        utils._get_account_details_from_fs = mock_get_account_details_from_fs -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_account_details(the_path, memcache=mc) -            correct_ad = mock_get_account_details_from_fs(the_path, None) -            assert retval == (correct_ad.container_list, correct_ad.container_count) -            assert correct_ad != ad -        finally: -            utils._get_account_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_account_details_cached_miss_mtime(self): -        mc = SimMemcache() -        the_path = "/tmp/bar" -        def mock_get_account_details_from_fs(acc_path, acc_stats): -            mt = 100 -            cc = 2 -            cl = ['a', 'b'] -            return utils.AccountDetails(mt, cc, cl) -        def mock_do_stat(path): -            class MockStat(object): -                def __init__(self, mtime): -                    self.st_mtime = mtime -            return MockStat(100) -        ad = mock_get_account_details_from_fs(the_path, None) -        ad.container_list = ['x', 'y'] -        ad.mtime = 200 -        mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad) -        orig_gcdff = utils._get_account_details_from_fs -        orig_ds = utils.do_stat -        utils._get_account_details_from_fs = mock_get_account_details_from_fs -        utils.do_stat = mock_do_stat -        try: -            retval = utils.get_account_details(the_path, memcache=mc) -            correct_ad = mock_get_account_details_from_fs(the_path, None) -            assert retval == (correct_ad.container_list, correct_ad.container_count) -            assert correct_ad != ad -        finally: -            utils._get_account_details_from_fs = orig_gcdff -            utils.do_stat = orig_ds - -    def test_get_account_details_from_fs(self): -        orig_cwd = os.getcwd() -        td = tempfile.mkdtemp() -        try: -            tf = tarfile.open("common/data/account_tree.tar.bz2", "r:bz2") -            os.chdir(td) -            tf.extractall() - -            ad = utils._get_account_details_from_fs(td, None) -            assert ad.mtime == os.path.getmtime(td) -            assert ad.container_count == 3 -            assert set(ad.container_list) == set(['c1', 'c2', 'c3']) -        finally: -            os.chdir(orig_cwd) -            shutil.rmtree(td) - -    def test_get_container_details_from_fs_notadir(self): -        tf = tempfile.NamedTemporaryFile() -        cd = utils._get_container_details_from_fs(tf.name) -        assert cd.bytes_used == 0 -        assert cd.object_count == 0 -        assert cd.obj_list == [] -        assert cd.dir_list == [] - -    def test_get_container_details_from_fs(self): -        orig_cwd = os.getcwd() -        td = tempfile.mkdtemp() -        try: -            tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2") -            os.chdir(td) -            tf.extractall() - -            cd = utils._get_container_details_from_fs(td) -            assert cd.bytes_used == 0, repr(cd.bytes_used) -            assert cd.object_count == 8, repr(cd.object_count) -            assert set(cd.obj_list) == set(['file1', 'file3', 'file2', -                                   'dir3', 'dir1', 'dir2', -                                   'dir1/file1', 'dir1/file2' -                                   ]), repr(cd.obj_list) - -            full_dir1 = os.path.join(td, 'dir1') -            full_dir2 = os.path.join(td, 'dir2') -            full_dir3 = os.path.join(td, 'dir3') -            exp_dir_dict = { td:        os.path.getmtime(td), -                             full_dir1: os.path.getmtime(full_dir1), -                             full_dir2: os.path.getmtime(full_dir2), -                             full_dir3: os.path.getmtime(full_dir3), -                             } -            for d,m in cd.dir_list: -                assert d in exp_dir_dict -                assert exp_dir_dict[d] == m -        finally: -            os.chdir(orig_cwd) -            shutil.rmtree(td) - - -    def test_get_container_details_from_fs_do_getsize_true(self): -        orig_cwd = os.getcwd() -        td = tempfile.mkdtemp() -        try: -            tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2") -            os.chdir(td) -            tf.extractall() - -            __do_getsize = Glusterfs._do_getsize -            Glusterfs._do_getsize = True - -            cd = utils._get_container_details_from_fs(td) -            assert cd.bytes_used == 30, repr(cd.bytes_used) -            assert cd.object_count == 8, repr(cd.object_count) -            assert set(cd.obj_list) == set(['file1', 'file3', 'file2', -                                   'dir3', 'dir1', 'dir2', -                                   'dir1/file1', 'dir1/file2' -                                   ]), repr(cd.obj_list) - -            full_dir1 = os.path.join(td, 'dir1') -            full_dir2 = os.path.join(td, 'dir2') -            full_dir3 = os.path.join(td, 'dir3') -            exp_dir_dict = { td:        os.path.getmtime(td), -                             full_dir1: os.path.getmtime(full_dir1), -                             full_dir2: os.path.getmtime(full_dir2), -                             full_dir3: os.path.getmtime(full_dir3), -                             } -            for d,m in cd.dir_list: -                assert d in exp_dir_dict -                assert exp_dir_dict[d] == m -        finally: -            Glusterfs._do_getsize = __do_getsize -            os.chdir(orig_cwd) -            shutil.rmtree(td) - -    def test_get_account_details_from_fs_notadir_w_stats(self): -        tf = tempfile.NamedTemporaryFile() -        ad = utils._get_account_details_from_fs(tf.name, os.stat(tf.name)) -        assert ad.mtime == os.path.getmtime(tf.name) -        assert ad.container_count == 0 -        assert ad.container_list == [] - -    def test_get_account_details_from_fs_notadir(self): -        tf = tempfile.NamedTemporaryFile() -        ad = utils._get_account_details_from_fs(tf.name, None) -        assert ad.mtime == os.path.getmtime(tf.name) -        assert ad.container_count == 0 -        assert ad.container_list == [] - -    def test_write_pickle(self): -        td = tempfile.mkdtemp() -        try: -            fpp = os.path.join(td, 'pp') -            utils.write_pickle('pickled peppers', fpp) -            with open(fpp, "rb") as f: -                contents = f.read() -            s = pickle.loads(contents) -            assert s == 'pickled peppers', repr(s) -        finally: -            shutil.rmtree(td) - -    def test_write_pickle_ignore_tmp(self): -        tf = tempfile.NamedTemporaryFile() -        td = tempfile.mkdtemp() -        try: -            fpp = os.path.join(td, 'pp') -            # Also test an explicity pickle protocol -            utils.write_pickle('pickled peppers', fpp, tmp=tf.name, pickle_protocol=2) -            with open(fpp, "rb") as f: -                contents = f.read() -            s = pickle.loads(contents) -            assert s == 'pickled peppers', repr(s) -            with open(tf.name, "rb") as f: -                contents = f.read() -            assert contents == '' -        finally: -            shutil.rmtree(td) - -    def test_check_user_xattr_bad_path(self): -        assert False == utils.check_user_xattr("/tmp/foo/bar/check/user/xattr") - -    def test_check_user_xattr_bad_set(self): -        td = tempfile.mkdtemp() -        xkey = _xkey(td, 'user.test.key1') -        _xattr_set_err[xkey] = errno.EOPNOTSUPP -        try: -            assert False == utils.check_user_xattr(td) -        except IOError: -            pass -        else: -            self.fail("Expected IOError") -        finally: -            shutil.rmtree(td) - -    def test_check_user_xattr_bad_remove(self): -        td = tempfile.mkdtemp() -        xkey = _xkey(td, 'user.test.key1') -        _xattr_rem_err[xkey] = errno.EOPNOTSUPP -        try: -            utils.check_user_xattr(td) -        except IOError: -            self.fail("Unexpected IOError") -        finally: -            shutil.rmtree(td) - -    def test_check_user_xattr(self): -        td = tempfile.mkdtemp() -        try: -            assert utils.check_user_xattr(td) -        finally: -            shutil.rmtree(td) - -    def test_validate_container_empty(self): -        ret = utils.validate_container({}) -        assert ret == False - -    def test_validate_container_missing_keys(self): -        ret = utils.validate_container({ 'foo': 'bar' }) -        assert ret == False - -    def test_validate_container_bad_type(self): -        md = { utils.X_TYPE: ('bad', 0), -               utils.X_TIMESTAMP: ('na', 0), -               utils.X_PUT_TIMESTAMP: ('na', 0), -               utils.X_OBJECTS_COUNT: ('na', 0), -               utils.X_BYTES_USED: ('na', 0) } -        ret = utils.validate_container(md) -        assert ret == False - -    def test_validate_container_good_type(self): -        md = { utils.X_TYPE: (utils.CONTAINER, 0), -               utils.X_TIMESTAMP: ('na', 0), -               utils.X_PUT_TIMESTAMP: ('na', 0), -               utils.X_OBJECTS_COUNT: ('na', 0), -               utils.X_BYTES_USED: ('na', 0) } -        ret = utils.validate_container(md) -        assert ret - -    def test_validate_account_empty(self): -        ret = utils.validate_account({}) -        assert ret == False - -    def test_validate_account_missing_keys(self): -        ret = utils.validate_account({ 'foo': 'bar' }) -        assert ret == False - -    def test_validate_account_bad_type(self): -        md = { utils.X_TYPE: ('bad', 0), -               utils.X_TIMESTAMP: ('na', 0), -               utils.X_PUT_TIMESTAMP: ('na', 0), -               utils.X_OBJECTS_COUNT: ('na', 0), -               utils.X_BYTES_USED: ('na', 0), -               utils.X_CONTAINER_COUNT: ('na', 0) } -        ret = utils.validate_account(md) -        assert ret == False - -    def test_validate_account_good_type(self): -        md = { utils.X_TYPE: (utils.ACCOUNT, 0), -               utils.X_TIMESTAMP: ('na', 0), -               utils.X_PUT_TIMESTAMP: ('na', 0), -               utils.X_OBJECTS_COUNT: ('na', 0), -               utils.X_BYTES_USED: ('na', 0), -               utils.X_CONTAINER_COUNT: ('na', 0) } -        ret = utils.validate_account(md) -        assert ret - -    def test_validate_object_empty(self): -        ret = utils.validate_object({}) -        assert ret == False - -    def test_validate_object_missing_keys(self): -        ret = utils.validate_object({ 'foo': 'bar' }) -        assert ret == False - -    def test_validate_object_bad_type(self): -        md = { utils.X_TIMESTAMP: 'na', -               utils.X_CONTENT_TYPE: 'na', -               utils.X_ETAG: 'bad', -               utils.X_CONTENT_LENGTH: 'na', -               utils.X_TYPE: 'bad', -               utils.X_OBJECT_TYPE: 'na' } -        ret = utils.validate_object(md) -        assert ret == False - -    def test_validate_object_good_type(self): -        md = { utils.X_TIMESTAMP: 'na', -               utils.X_CONTENT_TYPE: 'na', -               utils.X_ETAG: 'bad', -               utils.X_CONTENT_LENGTH: 'na', -               utils.X_TYPE: utils.OBJECT, -               utils.X_OBJECT_TYPE: 'na' } -        ret = utils.validate_object(md) -        assert ret - -    def test_is_marker_empty(self): -        assert False == utils.is_marker(None) - -    def test_is_marker_missing(self): -        assert False == utils.is_marker( { 'foo': 'bar' } ) - -    def test_is_marker_not_marker(self): -        md = { utils.X_OBJECT_TYPE: utils.DIR } -        assert False == utils.is_marker(md) - -    def test_is_marker(self): -        md = { utils.X_OBJECT_TYPE: utils.MARKER_DIR } -        assert utils.is_marker(md)  | 
