summaryrefslogtreecommitdiffstats
path: root/ufo/test/unit
diff options
context:
space:
mode:
Diffstat (limited to 'ufo/test/unit')
-rw-r--r--ufo/test/unit/__init__.py95
-rw-r--r--ufo/test/unit/common/__init__.py0
-rw-r--r--ufo/test/unit/common/data/README.rings3
-rw-r--r--ufo/test/unit/common/data/account.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/account.ring.gzbin183 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/account_tree.tar.bz2bin228 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/backups/1365124498.account.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/backups/1365124498.container.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/backups/1365124498.object.builderbin228 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/backups/1365124499.object.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/container.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/container.ring.gzbin185 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/container_tree.tar.bz2bin282 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/object.builderbin537 -> 0 bytes
-rw-r--r--ufo/test/unit/common/data/object.ring.gzbin182 -> 0 bytes
-rw-r--r--ufo/test/unit/common/test_Glusterfs.py95
-rw-r--r--ufo/test/unit/common/test_diskfile.py932
-rw-r--r--ufo/test/unit/common/test_fs_utils.py277
-rw-r--r--ufo/test/unit/common/test_ring.py55
-rw-r--r--ufo/test/unit/common/test_utils.py1020
20 files changed, 0 insertions, 2477 deletions
diff --git a/ufo/test/unit/__init__.py b/ufo/test/unit/__init__.py
deleted file mode 100644
index cb247643b..000000000
--- a/ufo/test/unit/__init__.py
+++ /dev/null
@@ -1,95 +0,0 @@
-""" Gluster Swift Unit Tests """
-
-import logging
-from collections import defaultdict
-from test import get_config
-from swift.common.utils import TRUE_VALUES
-
-
-class NullLoggingHandler(logging.Handler):
-
- def emit(self, record):
- pass
-
-
-class FakeLogger(object):
- # a thread safe logger
-
- def __init__(self, *args, **kwargs):
- self._clear()
- self.level = logging.NOTSET
- if 'facility' in kwargs:
- self.facility = kwargs['facility']
-
- def _clear(self):
- self.log_dict = defaultdict(list)
-
- def _store_in(store_name):
- def stub_fn(self, *args, **kwargs):
- self.log_dict[store_name].append((args, kwargs))
- return stub_fn
-
- error = _store_in('error')
- info = _store_in('info')
- warning = _store_in('warning')
- debug = _store_in('debug')
-
- def exception(self, *args, **kwargs):
- self.log_dict['exception'].append((args, kwargs, str(exc_info()[1])))
-
- # mock out the StatsD logging methods:
- increment = _store_in('increment')
- decrement = _store_in('decrement')
- timing = _store_in('timing')
- timing_since = _store_in('timing_since')
- update_stats = _store_in('update_stats')
- set_statsd_prefix = _store_in('set_statsd_prefix')
-
- def setFormatter(self, obj):
- self.formatter = obj
-
- def close(self):
- self._clear()
-
- def set_name(self, name):
- # don't touch _handlers
- self._name = name
-
- def acquire(self):
- pass
-
- def release(self):
- pass
-
- def createLock(self):
- pass
-
- def emit(self, record):
- pass
-
- def handle(self, record):
- pass
-
- def flush(self):
- pass
-
- def handleError(self, record):
- pass
-
-
-original_syslog_handler = logging.handlers.SysLogHandler
-
-
-def fake_syslog_handler():
- for attr in dir(original_syslog_handler):
- if attr.startswith('LOG'):
- setattr(FakeLogger, attr,
- copy.copy(getattr(logging.handlers.SysLogHandler, attr)))
- FakeLogger.priority_map = \
- copy.deepcopy(logging.handlers.SysLogHandler.priority_map)
-
- logging.handlers.SysLogHandler = FakeLogger
-
-
-if get_config('unit_test').get('fake_syslog', 'False').lower() in TRUE_VALUES:
- fake_syslog_handler()
diff --git a/ufo/test/unit/common/__init__.py b/ufo/test/unit/common/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/ufo/test/unit/common/__init__.py
+++ /dev/null
diff --git a/ufo/test/unit/common/data/README.rings b/ufo/test/unit/common/data/README.rings
deleted file mode 100644
index 6457501fb..000000000
--- a/ufo/test/unit/common/data/README.rings
+++ /dev/null
@@ -1,3 +0,0 @@
-The unit tests expect certain ring data built using the following command:
-
- ../../../../bin/gluster-swift-gen-builders test iops \ No newline at end of file
diff --git a/ufo/test/unit/common/data/account.builder b/ufo/test/unit/common/data/account.builder
deleted file mode 100644
index 090ba4b74..000000000
--- a/ufo/test/unit/common/data/account.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/account.ring.gz b/ufo/test/unit/common/data/account.ring.gz
deleted file mode 100644
index 6d4c85474..000000000
--- a/ufo/test/unit/common/data/account.ring.gz
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/account_tree.tar.bz2 b/ufo/test/unit/common/data/account_tree.tar.bz2
deleted file mode 100644
index cb23e4dd7..000000000
--- a/ufo/test/unit/common/data/account_tree.tar.bz2
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/backups/1365124498.account.builder b/ufo/test/unit/common/data/backups/1365124498.account.builder
deleted file mode 100644
index 090ba4b74..000000000
--- a/ufo/test/unit/common/data/backups/1365124498.account.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/backups/1365124498.container.builder b/ufo/test/unit/common/data/backups/1365124498.container.builder
deleted file mode 100644
index 733d27dd9..000000000
--- a/ufo/test/unit/common/data/backups/1365124498.container.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/backups/1365124498.object.builder b/ufo/test/unit/common/data/backups/1365124498.object.builder
deleted file mode 100644
index ff877ec95..000000000
--- a/ufo/test/unit/common/data/backups/1365124498.object.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/backups/1365124499.object.builder b/ufo/test/unit/common/data/backups/1365124499.object.builder
deleted file mode 100644
index 8b8cd6c1e..000000000
--- a/ufo/test/unit/common/data/backups/1365124499.object.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/container.builder b/ufo/test/unit/common/data/container.builder
deleted file mode 100644
index 733d27dd9..000000000
--- a/ufo/test/unit/common/data/container.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/container.ring.gz b/ufo/test/unit/common/data/container.ring.gz
deleted file mode 100644
index 592b84bab..000000000
--- a/ufo/test/unit/common/data/container.ring.gz
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/container_tree.tar.bz2 b/ufo/test/unit/common/data/container_tree.tar.bz2
deleted file mode 100644
index b4a149285..000000000
--- a/ufo/test/unit/common/data/container_tree.tar.bz2
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/object.builder b/ufo/test/unit/common/data/object.builder
deleted file mode 100644
index 8b8cd6c1e..000000000
--- a/ufo/test/unit/common/data/object.builder
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/data/object.ring.gz b/ufo/test/unit/common/data/object.ring.gz
deleted file mode 100644
index d2f7192b6..000000000
--- a/ufo/test/unit/common/data/object.ring.gz
+++ /dev/null
Binary files differ
diff --git a/ufo/test/unit/common/test_Glusterfs.py b/ufo/test/unit/common/test_Glusterfs.py
deleted file mode 100644
index 7de060aef..000000000
--- a/ufo/test/unit/common/test_Glusterfs.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import os, fcntl, errno, shutil
-from tempfile import mkdtemp
-import gluster.swift.common.Glusterfs as gfs
-
-def mock_os_path_ismount(path):
- return True
-
-def mock_get_export_list():
- return ['test', 'test2']
-
-def mock_os_system(cmd):
- return False
-
-def mock_fcntl_lockf(f, *a, **kw):
- raise IOError(errno.EAGAIN)
-
-def _init():
- global _RUN_DIR, _OS_SYSTEM, _FCNTL_LOCKF
- global _OS_PATH_ISMOUNT, __GET_EXPORT_LIST
-
- _RUN_DIR = gfs.RUN_DIR
- _OS_SYSTEM = os.system
- _FCNTL_LOCKF = fcntl.lockf
- _OS_PATH_ISMOUNT = os.path.ismount
- __GET_EXPORT_LIST = gfs._get_export_list
-
-def _init_mock_variables(tmpdir):
- os.system = mock_os_system
- os.path.ismount = mock_os_path_ismount
- gfs.RUN_DIR = os.path.join(tmpdir, 'var/run/swift')
- gfs._get_export_list = mock_get_export_list
-
-def _reset_mock_variables():
- gfs.RUN_DIR = _RUN_DIR
- gfs._get_export_list = __GET_EXPORT_LIST
-
- os.system = _OS_SYSTEM
- fcntl.lockf = _FCNTL_LOCKF
- os.path.ismount = _OS_PATH_ISMOUNT
-
-class TestGlusterfs(unittest.TestCase):
- """ Tests for common.GlusterFS """
-
- def setUp(self):
- _init()
-
- def test_mount(self):
- try:
- tmpdir = mkdtemp()
- root = os.path.join(tmpdir, 'mnt/gluster-object')
- drive = 'test'
-
- _init_mock_variables(tmpdir)
- assert gfs.mount(root, drive)
- finally:
- _reset_mock_variables()
- shutil.rmtree(tmpdir)
-
- def test_mount_egain(self):
- try:
- tmpdir = mkdtemp()
- root = os.path.join(tmpdir, 'mnt/gluster-object')
- drive = 'test'
-
- _init_mock_variables(tmpdir)
- assert gfs.mount(root, drive)
- fcntl.lockf = mock_fcntl_lockf
- assert gfs.mount(root, drive)
- finally:
- _reset_mock_variables()
- shutil.rmtree(tmpdir)
-
- def test_mount_get_export_list_err(self):
- gfs._get_export_list = mock_get_export_list
- assert not gfs.mount(None, 'drive')
- _reset_mock_variables()
-
- def tearDown(self):
- _reset_mock_variables()
diff --git a/ufo/test/unit/common/test_diskfile.py b/ufo/test/unit/common/test_diskfile.py
deleted file mode 100644
index 85d539a29..000000000
--- a/ufo/test/unit/common/test_diskfile.py
+++ /dev/null
@@ -1,932 +0,0 @@
-# Copyright (c) 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-""" Tests for gluster.swift.common.DiskFile """
-
-import os
-import stat
-import errno
-import unittest
-import tempfile
-import shutil
-from hashlib import md5
-from swift.common.utils import normalize_timestamp
-from swift.common.exceptions import DiskFileNotExist
-import gluster.swift.common.DiskFile
-import gluster.swift.common.utils
-from gluster.swift.common.DiskFile import Gluster_DiskFile, \
- AlreadyExistsAsDir
-from gluster.swift.common.utils import DEFAULT_UID, DEFAULT_GID, X_TYPE, \
- X_OBJECT_TYPE
-from test_utils import _initxattr, _destroyxattr
-from test.unit import FakeLogger
-
-
-_metadata = {}
-
-def _mock_read_metadata(filename):
- if filename in _metadata:
- md = _metadata[filename]
- else:
- md = {}
- return md
-
-def _mock_write_metadata(filename, metadata):
- _metadata[filename] = metadata
-
-def _mock_clear_metadata():
- _metadata = {}
-
-
-class MockException(Exception):
- pass
-
-
-def _mock_rmdirs(p):
- raise MockException("gluster.swift.common.DiskFile.rmdirs() called")
-
-def _mock_do_listdir(p):
- raise MockException("gluster.swift.common.DiskFile.do_listdir() called")
-
-def _mock_do_unlink(f):
- ose = OSError()
- ose.errno = errno.ENOENT
- raise ose
-
-
-def _mock_do_unlink_eacces_err(f):
- ose = OSError()
- ose.errno = errno.EACCES
- raise ose
-
-def _mock_getsize_eaccess_err(f):
- ose = OSError()
- ose.errno = errno.EACCES
- raise ose
-
-def _mock_do_rmdir_eacces_err(f):
- ose = OSError()
- ose.errno = errno.EACCES
- raise ose
-
-class MockRenamerCalled(Exception):
- pass
-
-
-def _mock_renamer(a, b):
- raise MockRenamerCalled()
-
-
-class TestDiskFile(unittest.TestCase):
- """ Tests for gluster.swift.common.DiskFile """
-
- def setUp(self):
- self.lg = FakeLogger()
- _initxattr()
- _mock_clear_metadata()
- self._saved_df_wm = gluster.swift.common.DiskFile.write_metadata
- self._saved_df_rm = gluster.swift.common.DiskFile.read_metadata
- gluster.swift.common.DiskFile.write_metadata = _mock_write_metadata
- gluster.swift.common.DiskFile.read_metadata = _mock_read_metadata
- self._saved_ut_wm = gluster.swift.common.utils.write_metadata
- self._saved_ut_rm = gluster.swift.common.utils.read_metadata
- gluster.swift.common.utils.write_metadata = _mock_write_metadata
- gluster.swift.common.utils.read_metadata = _mock_read_metadata
-
- def tearDown(self):
- self.lg = None
- _destroyxattr()
- gluster.swift.common.DiskFile.write_metadata = self._saved_df_wm
- gluster.swift.common.DiskFile.read_metadata = self._saved_df_rm
- gluster.swift.common.utils.write_metadata = self._saved_ut_wm
- gluster.swift.common.utils.read_metadata = self._saved_ut_rm
-
- def test_constructor_no_slash(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf._obj_path == ""
- assert gdf.name == "bar"
- assert gdf.datadir == "/tmp/foo/vol0/bar"
- assert gdf.device_path == "/tmp/foo/vol0"
- assert gdf._container_path == "/tmp/foo/vol0/bar"
- assert gdf.disk_chunk_size == 65536
- assert gdf.iter_hook == None
- assert gdf.logger == self.lg
- assert gdf.uid == DEFAULT_UID
- assert gdf.gid == DEFAULT_GID
- assert gdf.metadata == {}
- assert gdf.meta_file == None
- assert gdf.data_file == None
- assert gdf.fp == None
- assert gdf.iter_etag == None
- assert not gdf.started_at_0
- assert not gdf.read_to_eof
- assert gdf.quarantined_dir == None
- assert not gdf.keep_cache
- assert not gdf._is_dir
-
- def test_constructor_leadtrail_slash(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "/b/a/z/", self.lg)
- assert gdf._obj == "z"
- assert gdf._obj_path == "b/a"
- assert gdf.name == "bar/b/a"
- assert gdf.datadir == "/tmp/foo/vol0/bar/b/a"
- assert gdf.device_path == "/tmp/foo/vol0"
-
- def test_constructor_no_metadata(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- stats = os.stat(the_file)
- ts = normalize_timestamp(stats.st_ctime)
- etag = md5()
- etag.update("1234")
- etag = etag.hexdigest()
- exp_md = {
- 'Content-Length': 4,
- 'ETag': etag,
- 'X-Timestamp': ts,
- 'Content-Type': 'application/octet-stream'}
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert gdf.fp is None
- assert gdf.metadata == exp_md
- finally:
- shutil.rmtree(td)
-
- def test_constructor_existing_metadata(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- ini_md = {
- 'X-Type': 'Object',
- 'X-Object-Type': 'file',
- 'Content-Length': 5,
- 'ETag': 'etag',
- 'X-Timestamp': 'ts',
- 'Content-Type': 'application/loctet-stream'}
- _metadata[the_file] = ini_md
- exp_md = ini_md.copy()
- del exp_md['X-Type']
- del exp_md['X-Object-Type']
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert gdf.fp is None
- assert gdf.metadata == exp_md
- finally:
- shutil.rmtree(td)
-
- def test_constructor_invalid_existing_metadata(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- inv_md = {
- 'Content-Length': 5,
- 'ETag': 'etag',
- 'X-Timestamp': 'ts',
- 'Content-Type': 'application/loctet-stream'}
- _metadata[the_file] = inv_md
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert gdf.fp is None
- assert gdf.metadata != inv_md
- finally:
- shutil.rmtree(td)
-
- def test_constructor_isdir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "d")
- try:
- os.makedirs(the_dir)
- ini_md = {
- 'X-Type': 'Object',
- 'X-Object-Type': 'dir',
- 'Content-Length': 5,
- 'ETag': 'etag',
- 'X-Timestamp': 'ts',
- 'Content-Type': 'application/loctet-stream'}
- _metadata[the_dir] = ini_md
- exp_md = ini_md.copy()
- del exp_md['X-Type']
- del exp_md['X-Object-Type']
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "d", self.lg, keep_data_fp=True)
- assert gdf._obj == "d"
- assert gdf.data_file == the_dir
- assert gdf._is_dir
- assert gdf.fp is None
- assert gdf.metadata == exp_md
- finally:
- shutil.rmtree(td)
-
- def test_constructor_keep_data_fp(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg, keep_data_fp=True)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert gdf.fp is not None
- finally:
- shutil.rmtree(td)
-
- def test_constructor_chunk_size(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg, disk_chunk_size=8192)
- assert gdf.disk_chunk_size == 8192
-
- def test_constructor_iter_hook(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg, iter_hook='hook')
- assert gdf.iter_hook == 'hook'
-
- def test_close(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- # Should be a no-op, as by default is_dir is False, but fp is None
- gdf.close()
-
- gdf._is_dir = True
- gdf.fp = "123"
- # Should still be a no-op as is_dir is True (marker directory)
- gdf.close()
- assert gdf.fp == "123"
-
- gdf._is_dir = False
- saved_dc = gluster.swift.common.DiskFile.do_close
- self.called = False
- def our_do_close(fp):
- self.called = True
- gluster.swift.common.DiskFile.do_close = our_do_close
- try:
- gdf.close()
- assert self.called
- assert gdf.fp is None
- finally:
- gluster.swift.common.DiskFile.do_close = saved_dc
-
- def test_is_deleted(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf.is_deleted()
- gdf.data_file = "/tmp/foo/bar"
- assert not gdf.is_deleted()
-
- def test_create_dir_object(self):
- td = tempfile.mkdtemp()
- the_dir = os.path.join(td, "vol0", "bar", "dir")
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir/z", self.lg)
- # Not created, dir object path is different, just checking
- assert gdf._obj == "z"
- gdf._create_dir_object(the_dir)
- assert os.path.isdir(the_dir)
- assert the_dir in _metadata
- finally:
- shutil.rmtree(td)
-
- def test_create_dir_object_exists(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- os.makedirs(the_path)
- with open(the_dir, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir/z", self.lg)
- # Not created, dir object path is different, just checking
- assert gdf._obj == "z"
- def _mock_do_chown(p, u, g):
- assert u == DEFAULT_UID
- assert g == DEFAULT_GID
- dc = gluster.swift.common.DiskFile.do_chown
- gluster.swift.common.DiskFile.do_chown = _mock_do_chown
- try:
- gdf._create_dir_object(the_dir)
- finally:
- gluster.swift.common.DiskFile.do_chown = dc
- assert os.path.isdir(the_dir)
- assert the_dir in _metadata
- finally:
- shutil.rmtree(td)
-
- def test_put_metadata(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "z")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- md = { 'Content-Type': 'application/octet-stream', 'a': 'b' }
- gdf.put_metadata(md.copy())
- assert gdf.metadata == md, "gdf.metadata = %r, md = %r" % (gdf.metadata, md)
- assert _metadata[the_dir] == md
- finally:
- shutil.rmtree(td)
-
- def test_put_w_tombstone(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf.metadata == {}
-
- gdf.put_metadata({'x': '1'}, tombstone=True)
- assert gdf.metadata == {}
-
- def test_put_w_meta_file(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- newmd = gdf.metadata.copy()
- newmd['X-Object-Meta-test'] = '1234'
- gdf.put_metadata(newmd)
- assert gdf.metadata == newmd
- assert _metadata[the_file] == newmd
- finally:
- shutil.rmtree(td)
-
- def test_put_w_meta_file_no_content_type(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- newmd = gdf.metadata.copy()
- newmd['Content-Type'] = ''
- newmd['X-Object-Meta-test'] = '1234'
- gdf.put_metadata(newmd)
- assert gdf.metadata == newmd
- assert _metadata[the_file] == newmd
- finally:
- shutil.rmtree(td)
-
- def test_put_w_meta_dir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir", self.lg)
- newmd = gdf.metadata.copy()
- newmd['X-Object-Meta-test'] = '1234'
- gdf.put_metadata(newmd)
- assert gdf.metadata == newmd
- assert _metadata[the_dir] == newmd
- finally:
- shutil.rmtree(td)
-
- def test_put_w_marker_dir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir", self.lg)
- newmd = gdf.metadata.copy()
- newmd['X-Object-Meta-test'] = '1234'
- gdf.put_metadata(newmd)
- assert gdf.metadata == newmd
- assert _metadata[the_dir] == newmd
- finally:
- shutil.rmtree(td)
-
- def test_put_w_marker_dir_create(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir", self.lg)
- assert gdf.metadata == {}
- newmd = {
- 'Content-Length': 0,
- 'ETag': 'etag',
- 'X-Timestamp': 'ts',
- 'Content-Type': 'application/directory'}
- gdf.put(None, newmd, extension='.dir')
- assert gdf.data_file == the_dir
- assert gdf.metadata == newmd
- assert _metadata[the_dir] == newmd
- finally:
- shutil.rmtree(td)
-
- def test_put_is_dir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir", self.lg)
- origmd = gdf.metadata.copy()
- origfmd = _metadata[the_dir]
- newmd = gdf.metadata.copy()
- # FIXME: This is a hack to get to the code-path; it is not clear
- # how this can happen normally.
- newmd['Content-Type'] = ''
- newmd['X-Object-Meta-test'] = '1234'
- try:
- gdf.put(None, newmd, extension='.data')
- except AlreadyExistsAsDir:
- pass
- else:
- self.fail("Expected to encounter 'already-exists-as-dir' exception")
- assert gdf.metadata == origmd
- assert _metadata[the_dir] == origfmd
- finally:
- shutil.rmtree(td)
-
- def test_put(self):
- td = tempfile.mkdtemp()
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf._obj_path == ""
- assert gdf.name == "bar"
- assert gdf.datadir == os.path.join(td, "vol0", "bar")
- assert gdf.data_file is None
-
- body = '1234\n'
- etag = md5()
- etag.update(body)
- etag = etag.hexdigest()
- metadata = {
- 'X-Timestamp': '1234',
- 'Content-Type': 'file',
- 'ETag': etag,
- 'Content-Length': '5',
- }
-
- with gdf.mkstemp() as fd:
- assert gdf.tmppath is not None
- tmppath = gdf.tmppath
- os.write(fd, body)
- gdf.put(fd, metadata)
-
- assert gdf.data_file == os.path.join(td, "vol0", "bar", "z")
- assert os.path.exists(gdf.data_file)
- assert not os.path.exists(tmppath)
- finally:
- shutil.rmtree(td)
-
- def test_put_obj_path(self):
- the_obj_path = os.path.join("b", "a")
- the_file = os.path.join(the_obj_path, "z")
- td = tempfile.mkdtemp()
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- the_file, self.lg)
- assert gdf._obj == "z"
- assert gdf._obj_path == the_obj_path
- assert gdf.name == os.path.join("bar", "b", "a")
- assert gdf.datadir == os.path.join(td, "vol0", "bar", "b", "a")
- assert gdf.data_file is None
-
- body = '1234\n'
- etag = md5()
- etag.update(body)
- etag = etag.hexdigest()
- metadata = {
- 'X-Timestamp': '1234',
- 'Content-Type': 'file',
- 'ETag': etag,
- 'Content-Length': '5',
- }
-
- with gdf.mkstemp() as fd:
- assert gdf.tmppath is not None
- tmppath = gdf.tmppath
- os.write(fd, body)
- gdf.put(fd, metadata)
-
- assert gdf.data_file == os.path.join(td, "vol0", "bar", "b", "a", "z")
- assert os.path.exists(gdf.data_file)
- assert not os.path.exists(tmppath)
- finally:
- shutil.rmtree(td)
-
- def test_unlinkold_no_metadata(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf.metadata == {}
- _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs
- _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir
- gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs
- gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir
- try:
- gdf.unlinkold(None)
- except MockException as exp:
- self.fail(str(exp))
- finally:
- gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs
- gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir
-
- def test_unlinkold_same_timestamp(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf.metadata == {}
- gdf.metadata['X-Timestamp'] = 1
- _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs
- _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir
- gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs
- gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir
- try:
- gdf.unlinkold(1)
- except MockException as exp:
- self.fail(str(exp))
- finally:
- gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs
- gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir
-
- def test_unlinkold_file(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
-
- later = float(gdf.metadata['X-Timestamp']) + 1
- gdf.unlinkold(normalize_timestamp(later))
- assert os.path.isdir(gdf.datadir)
- assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj))
- finally:
- shutil.rmtree(td)
-
- def test_unlinkold_file_not_found(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
-
- # Handle the case the file is not in the directory listing.
- os.unlink(the_file)
-
- later = float(gdf.metadata['X-Timestamp']) + 1
- gdf.unlinkold(normalize_timestamp(later))
- assert os.path.isdir(gdf.datadir)
- assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj))
- finally:
- shutil.rmtree(td)
-
- def test_unlinkold_file_unlink_error(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
-
- later = float(gdf.metadata['X-Timestamp']) + 1
-
- stats = os.stat(the_path)
- os.chmod(the_path, stats.st_mode & (~stat.S_IWUSR))
-
- # Handle the case do_unlink() raises an OSError
- __os_unlink = os.unlink
- os.unlink = _mock_do_unlink_eacces_err
- try:
- gdf.unlinkold(normalize_timestamp(later))
- except OSError as e:
- assert e.errno == errno.EACCES
- else:
- self.fail("Excepted an OSError when unlinking file")
- finally:
- os.unlink = __os_unlink
- os.chmod(the_path, stats.st_mode)
-
- assert os.path.isdir(gdf.datadir)
- assert os.path.exists(os.path.join(gdf.datadir, gdf._obj))
- finally:
- shutil.rmtree(td)
-
- def test_unlinkold_is_dir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "d")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "d", self.lg, keep_data_fp=True)
- assert gdf.data_file == the_dir
- assert gdf._is_dir
-
- later = float(gdf.metadata['X-Timestamp']) + 1
- gdf.unlinkold(normalize_timestamp(later))
- assert os.path.isdir(gdf.datadir)
- assert not os.path.exists(os.path.join(gdf.datadir, gdf._obj))
- finally:
- shutil.rmtree(td)
-
- def test_unlinkold_is_dir_failure(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "d")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "d", self.lg, keep_data_fp=True)
- assert gdf.data_file == the_dir
- assert gdf._is_dir
-
- stats = os.stat(gdf.datadir)
- os.chmod(gdf.datadir, 0)
- __os_rmdir = os.rmdir
- os.rmdir = _mock_do_rmdir_eacces_err
- try:
- later = float(gdf.metadata['X-Timestamp']) + 1
- gdf.unlinkold(normalize_timestamp(later))
- finally:
- os.chmod(gdf.datadir, stats.st_mode)
- os.rmdir = __os_rmdir
- assert os.path.isdir(gdf.datadir)
- assert os.path.isdir(gdf.data_file)
- finally:
- shutil.rmtree(td)
-
- def test_get_data_file_size(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert 4 == gdf.get_data_file_size()
- finally:
- shutil.rmtree(td)
-
- def test_get_data_file_size(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- assert 4 == gdf.metadata['Content-Length']
- gdf.metadata['Content-Length'] = 3
- assert 4 == gdf.get_data_file_size()
- assert 4 == gdf.metadata['Content-Length']
- finally:
- shutil.rmtree(td)
-
- def test_get_data_file_size_dne(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "/b/a/z/", self.lg)
- try:
- s = gdf.get_data_file_size()
- except DiskFileNotExist:
- pass
- else:
- self.fail("Expected DiskFileNotExist exception")
-
- def test_get_data_file_size_dne_os_err(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- gdf.data_file = gdf.data_file + ".dne"
- try:
- s = gdf.get_data_file_size()
- except DiskFileNotExist:
- pass
- else:
- self.fail("Expected DiskFileNotExist exception")
- finally:
- shutil.rmtree(td)
-
- def test_get_data_file_size_os_err(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_file = os.path.join(the_path, "z")
- try:
- os.makedirs(the_path)
- with open(the_file, "wb") as fd:
- fd.write("1234")
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf._obj == "z"
- assert gdf.data_file == the_file
- assert not gdf._is_dir
- stats = os.stat(the_path)
- os.chmod(the_path, 0)
- __os_path_getsize = os.path.getsize
- os.path.getsize = _mock_getsize_eaccess_err
- try:
- s = gdf.get_data_file_size()
- except OSError as err:
- assert err.errno == errno.EACCES
- else:
- self.fail("Expected OSError exception")
- finally:
- os.path.getsize = __os_path_getsize
- os.chmod(the_path, stats.st_mode)
- finally:
- shutil.rmtree(td)
-
- def test_get_data_file_size_dir(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "d")
- try:
- os.makedirs(the_dir)
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "d", self.lg, keep_data_fp=True)
- assert gdf._obj == "d"
- assert gdf.data_file == the_dir
- assert gdf._is_dir
- assert 0 == gdf.get_data_file_size()
- finally:
- shutil.rmtree(td)
-
- def test_filter_metadata(self):
- assert not os.path.exists("/tmp/foo")
- gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar",
- "z", self.lg)
- assert gdf.metadata == {}
- gdf.filter_metadata()
- assert gdf.metadata == {}
-
- gdf.metadata[X_TYPE] = 'a'
- gdf.metadata[X_OBJECT_TYPE] = 'b'
- gdf.metadata['foobar'] = 'c'
- gdf.filter_metadata()
- assert X_TYPE not in gdf.metadata
- assert X_OBJECT_TYPE not in gdf.metadata
- assert 'foobar' in gdf.metadata
-
- def test_mkstemp(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir/z", self.lg)
- saved_tmppath = ''
- with gdf.mkstemp() as fd:
- assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir")
- assert os.path.isdir(gdf.datadir)
- saved_tmppath = gdf.tmppath
- assert os.path.dirname(saved_tmppath) == gdf.datadir
- assert os.path.basename(saved_tmppath)[:3] == '.z.'
- assert os.path.exists(saved_tmppath)
- os.write(fd, "123")
- assert not os.path.exists(saved_tmppath)
- finally:
- shutil.rmtree(td)
-
- def test_mkstemp_err_on_close(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir/z", self.lg)
- saved_tmppath = ''
- with gdf.mkstemp() as fd:
- assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir")
- assert os.path.isdir(gdf.datadir)
- saved_tmppath = gdf.tmppath
- assert os.path.dirname(saved_tmppath) == gdf.datadir
- assert os.path.basename(saved_tmppath)[:3] == '.z.'
- assert os.path.exists(saved_tmppath)
- os.write(fd, "123")
- # At the end of previous with block a close on fd is called.
- # Calling os.close on the same fd will raise an OSError
- # exception and we must catch it.
- try:
- os.close(fd)
- except OSError as err:
- pass
- else:
- self.fail("Exception expected")
- assert not os.path.exists(saved_tmppath)
- finally:
- shutil.rmtree(td)
-
- def test_mkstemp_err_on_unlink(self):
- td = tempfile.mkdtemp()
- the_path = os.path.join(td, "vol0", "bar")
- the_dir = os.path.join(the_path, "dir")
- try:
- gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar",
- "dir/z", self.lg)
- saved_tmppath = ''
- with gdf.mkstemp() as fd:
- assert gdf.datadir == os.path.join(td, "vol0", "bar", "dir")
- assert os.path.isdir(gdf.datadir)
- saved_tmppath = gdf.tmppath
- assert os.path.dirname(saved_tmppath) == gdf.datadir
- assert os.path.basename(saved_tmppath)[:3] == '.z.'
- assert os.path.exists(saved_tmppath)
- os.write(fd, "123")
- os.unlink(saved_tmppath)
- assert not os.path.exists(saved_tmppath)
- finally:
- shutil.rmtree(td)
diff --git a/ufo/test/unit/common/test_fs_utils.py b/ufo/test/unit/common/test_fs_utils.py
deleted file mode 100644
index 186e07d59..000000000
--- a/ufo/test/unit/common/test_fs_utils.py
+++ /dev/null
@@ -1,277 +0,0 @@
-# Copyright (c) 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import shutil
-import random
-import unittest
-from tempfile import mkdtemp, mkstemp
-from gluster.swift.common import fs_utils as fs
-from gluster.swift.common.exceptions import NotDirectoryError, \
- FileOrDirNotFoundError
-
-class TestUtils(unittest.TestCase):
- """ Tests for common.utils """
-
- def test_do_walk(self):
- try:
- # create directory structure
- tmpparent = mkdtemp()
- tmpdirs = []
- tmpfiles = []
- for i in range(5):
- tmpdirs.append(mkdtemp(dir=tmpparent).rsplit(os.path.sep, 1)[1])
- tmpfiles.append(mkstemp(dir=tmpparent)[1].rsplit(os.path.sep, \
- 1)[1])
-
- for path, dirnames, filenames in fs.do_walk(tmpparent):
- assert path == tmpparent
- assert dirnames.sort() == tmpdirs.sort()
- assert filenames.sort() == tmpfiles.sort()
- break
- finally:
- shutil.rmtree(tmpparent)
-
- def test_do_open(self):
- try:
- fd, tmpfile = mkstemp()
- f = fs.do_open(tmpfile, 'r')
- try:
- f.write('test')
- except IOError as err:
- pass
- else:
- self.fail("IOError expected")
- finally:
- f.close()
- os.close(fd)
- os.remove(tmpfile)
-
- def test_do_open_err(self):
- try:
- fs.do_open(os.path.join('/tmp', str(random.random())), 'r')
- except IOError:
- pass
- else:
- self.fail("IOError expected")
-
- def test_do_write(self):
- try:
- fd, tmpfile = mkstemp()
- cnt = fs.do_write(fd, "test")
- assert cnt == len("test")
- finally:
- os.close(fd)
- os.remove(tmpfile)
-
- def test_do_write_err(self):
- try:
- fd, tmpfile = mkstemp()
- fd1 = os.open(tmpfile, os.O_RDONLY)
- fs.do_write(fd1, "test")
- except OSError:
- pass
- else:
- self.fail("OSError expected")
- finally:
- os.close(fd)
- os.close(fd1)
-
- def test_do_mkdir(self):
- try:
- path = os.path.join('/tmp', str(random.random()))
- fs.do_mkdir(path)
- assert os.path.exists(path)
- assert fs.do_mkdir(path)
- finally:
- os.rmdir(path)
-
- def test_do_mkdir_err(self):
- try:
- path = os.path.join('/tmp', str(random.random()), str(random.random()))
- fs.do_mkdir(path)
- except OSError:
- pass
- else:
- self.fail("OSError expected")
-
-
- def test_do_makedirs(self):
- try:
- subdir = os.path.join('/tmp', str(random.random()))
- path = os.path.join(subdir, str(random.random()))
- fs.do_makedirs(path)
- assert os.path.exists(path)
- assert fs.do_makedirs(path)
- finally:
- shutil.rmtree(subdir)
-
- def test_do_listdir(self):
- try:
- tmpdir = mkdtemp()
- subdir = []
- for i in range(5):
- subdir.append(mkdtemp(dir=tmpdir).rsplit(os.path.sep, 1)[1])
-
- assert subdir.sort() == fs.do_listdir(tmpdir).sort()
- finally:
- shutil.rmtree(tmpdir)
-
- def test_do_listdir_err(self):
- try:
- path = os.path.join('/tmp', str(random.random()))
- fs.do_listdir(path)
- except OSError:
- pass
- else:
- self.fail("OSError expected")
-
- def test_do_stat(self):
- try:
- tmpdir = mkdtemp()
- fd, tmpfile = mkstemp(dir=tmpdir)
- buf1 = os.stat(tmpfile)
- buf2 = fs.do_stat(fd)
- buf3 = fs.do_stat(tmpfile)
-
- assert buf1 == buf2
- assert buf1 == buf3
- finally:
- os.close(fd)
- os.remove(tmpfile)
- os.rmdir(tmpdir)
-
- def test_do_stat_err(self):
- try:
- fs.do_stat(os.path.join('/tmp', str(random.random())))
- except OSError:
- pass
- else:
- self.fail("OSError expected")
-
- def test_do_close(self):
- try:
- fd, tmpfile = mkstemp()
- fs.do_close(fd);
- try:
- os.write(fd, "test")
- except OSError:
- pass
- else:
- self.fail("OSError expected")
- fp = open(tmpfile)
- fs.do_close(fp)
- finally:
- os.remove(tmpfile)
-
- def test_do_unlink(self):
- try:
- fd, tmpfile = mkstemp()
- fs.do_unlink(tmpfile)
- assert not os.path.exists(tmpfile)
- assert fs.do_unlink(os.path.join('/tmp', str(random.random())))
- finally:
- os.close(fd)
-
- def test_do_unlink_err(self):
- try:
- tmpdir = mkdtemp()
- fs.do_unlink(tmpdir)
- except OSError:
- pass
- else:
- self.fail('OSError expected')
- finally:
- os.rmdir(tmpdir)
-
- def test_do_rmdir(self):
- tmpdir = mkdtemp()
- fs.do_rmdir(tmpdir)
- assert not os.path.exists(tmpdir)
- assert not fs.do_rmdir(os.path.join('/tmp', str(random.random())))
-
- def test_do_rmdir_err(self):
- try:
- fd, tmpfile = mkstemp()
- fs.do_rmdir(tmpfile)
- except OSError:
- pass
- else:
- self.fail('OSError expected')
- finally:
- os.close(fd)
- os.remove(tmpfile)
-
- def test_do_rename(self):
- try:
- srcpath = mkdtemp()
- destpath = os.path.join('/tmp', str(random.random()))
- fs.do_rename(srcpath, destpath)
- assert not os.path.exists(srcpath)
- assert os.path.exists(destpath)
- finally:
- os.rmdir(destpath)
-
- def test_do_rename_err(self):
- try:
- srcpath = os.path.join('/tmp', str(random.random()))
- destpath = os.path.join('/tmp', str(random.random()))
- fs.do_rename(srcpath, destpath)
- except OSError:
- pass
- else:
- self.fail("OSError expected")
-
- def test_dir_empty(self):
- try:
- tmpdir = mkdtemp()
- subdir = mkdtemp(dir=tmpdir)
- assert not fs.dir_empty(tmpdir)
- assert fs.dir_empty(subdir)
- finally:
- shutil.rmtree(tmpdir)
-
- def test_dir_empty_err(self):
- try:
- try:
- assert fs.dir_empty(os.path.join('/tmp', str(random.random())))
- except FileOrDirNotFoundError:
- pass
- else:
- self.fail("FileOrDirNotFoundError exception expected")
-
- fd, tmpfile = mkstemp()
- try:
- fs.dir_empty(tmpfile)
- except NotDirectoryError:
- pass
- else:
- self.fail("NotDirectoryError exception expected")
- finally:
- os.close(fd)
- os.unlink(tmpfile)
-
- def test_rmdirs(self):
- try:
- tmpdir = mkdtemp()
- subdir = mkdtemp(dir=tmpdir)
- fd, tmpfile = mkstemp(dir=tmpdir)
- assert not fs.rmdirs(tmpfile)
- assert not fs.rmdirs(tmpdir)
- assert fs.rmdirs(subdir)
- assert not os.path.exists(subdir)
- finally:
- os.close(fd)
- shutil.rmtree(tmpdir)
diff --git a/ufo/test/unit/common/test_ring.py b/ufo/test/unit/common/test_ring.py
deleted file mode 100644
index 8b7509cce..000000000
--- a/ufo/test/unit/common/test_ring.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Copyright (c) 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-import gluster.swift.common.constraints
-import swift.common.utils
-from gluster.swift.common.ring import Ring
-
-
-class TestRing(unittest.TestCase):
- """ Tests for common.utils """
-
- def setUp(self):
- swift.common.utils.HASH_PATH_SUFFIX = 'endcap'
- swiftdir = os.path.join(os.getcwd(), "common", "data")
- self.ring = Ring(swiftdir, ring_name='object')
-
- def test_first_device(self):
- part, node = self.ring.get_nodes('test')
- assert node[0]['device'] == 'test'
- node = self.ring.get_part_nodes(0)
- assert node[0]['device'] == 'test'
- for node in self.ring.get_more_nodes(0):
- assert node['device'] == 'volume_not_in_ring'
-
- def test_invalid_device(self):
- part, node = self.ring.get_nodes('test2')
- assert node[0]['device'] == 'volume_not_in_ring'
- node = self.ring.get_part_nodes(0)
- assert node[0]['device'] == 'volume_not_in_ring'
-
- def test_second_device(self):
- part, node = self.ring.get_nodes('iops')
- assert node[0]['device'] == 'iops'
- node = self.ring.get_part_nodes(0)
- assert node[0]['device'] == 'iops'
- for node in self.ring.get_more_nodes(0):
- assert node['device'] == 'volume_not_in_ring'
-
- def test_second_device_with_reseller_prefix(self):
- part, node = self.ring.get_nodes('AUTH_iops')
- assert node[0]['device'] == 'iops'
diff --git a/ufo/test/unit/common/test_utils.py b/ufo/test/unit/common/test_utils.py
deleted file mode 100644
index c645509fa..000000000
--- a/ufo/test/unit/common/test_utils.py
+++ /dev/null
@@ -1,1020 +0,0 @@
-# Copyright (c) 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-""" Tests for common.utils """
-
-import os
-import unittest
-import errno
-import xattr
-import cPickle as pickle
-import tempfile
-import hashlib
-import tarfile
-import shutil
-from collections import defaultdict
-from swift.common.utils import normalize_timestamp
-from gluster.swift.common import utils, Glusterfs
-
-#
-# Somewhat hacky way of emulating the operation of xattr calls. They are made
-# against a dictionary that stores the xattr key/value pairs.
-#
-_xattrs = {}
-_xattr_op_cnt = defaultdict(int)
-_xattr_set_err = {}
-_xattr_get_err = {}
-_xattr_rem_err = {}
-
-def _xkey(path, key):
- return "%s:%s" % (path, key)
-
-def _setxattr(path, key, value, *args, **kwargs):
- _xattr_op_cnt['set'] += 1
- xkey = _xkey(path, key)
- if xkey in _xattr_set_err:
- e = IOError()
- e.errno = _xattr_set_err[xkey]
- raise e
- global _xattrs
- _xattrs[xkey] = value
-
-def _getxattr(path, key, *args, **kwargs):
- _xattr_op_cnt['get'] += 1
- xkey = _xkey(path, key)
- if xkey in _xattr_get_err:
- e = IOError()
- e.errno = _xattr_get_err[xkey]
- raise e
- global _xattrs
- if xkey in _xattrs:
- ret_val = _xattrs[xkey]
- else:
- e = IOError("Fake IOError")
- e.errno = errno.ENODATA
- raise e
- return ret_val
-
-def _removexattr(path, key, *args, **kwargs):
- _xattr_op_cnt['remove'] += 1
- xkey = _xkey(path, key)
- if xkey in _xattr_rem_err:
- e = IOError()
- e.errno = _xattr_rem_err[xkey]
- raise e
- global _xattrs
- if xkey in _xattrs:
- del _xattrs[xkey]
- else:
- e = IOError("Fake IOError")
- e.errno = errno.ENODATA
- raise e
-
-def _initxattr():
- global _xattrs
- _xattrs = {}
- global _xattr_op_cnt
- _xattr_op_cnt = defaultdict(int)
- global _xattr_set_err, _xattr_get_err, _xattr_rem_err
- _xattr_set_err = {}
- _xattr_get_err = {}
- _xattr_rem_err = {}
-
- # Save the current methods
- global _xattr_set; _xattr_set = xattr.setxattr
- global _xattr_get; _xattr_get = xattr.getxattr
- global _xattr_remove; _xattr_remove = xattr.removexattr
-
- # Monkey patch the calls we use with our internal unit test versions
- xattr.setxattr = _setxattr
- xattr.getxattr = _getxattr
- xattr.removexattr = _removexattr
-
-def _destroyxattr():
- # Restore the current methods just in case
- global _xattr_set; xattr.setxattr = _xattr_set
- global _xattr_get; xattr.getxattr = _xattr_get
- global _xattr_remove; xattr.removexattr = _xattr_remove
- # Destroy the stored values and
- global _xattrs; _xattrs = None
-
-
-class SimMemcache(object):
- def __init__(self):
- self._d = {}
-
- def get(self, key):
- return self._d.get(key, None)
-
- def set(self, key, value):
- self._d[key] = value
-
-
-class TestUtils(unittest.TestCase):
- """ Tests for common.utils """
-
- def setUp(self):
- _initxattr()
-
- def tearDown(self):
- _destroyxattr()
-
- def test_write_metadata(self):
- path = "/tmp/foo/w"
- orig_d = { 'bar' : 'foo' }
- utils.write_metadata(path, orig_d)
- xkey = _xkey(path, utils.METADATA_KEY)
- assert len(_xattrs.keys()) == 1
- assert xkey in _xattrs
- assert orig_d == pickle.loads(_xattrs[xkey])
- assert _xattr_op_cnt['set'] == 1
-
- def test_write_metadata_err(self):
- path = "/tmp/foo/w"
- orig_d = { 'bar' : 'foo' }
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattr_set_err[xkey] = errno.EOPNOTSUPP
- try:
- utils.write_metadata(path, orig_d)
- except IOError as e:
- assert e.errno == errno.EOPNOTSUPP
- assert len(_xattrs.keys()) == 0
- assert _xattr_op_cnt['set'] == 1
- else:
- self.fail("Expected an IOError exception on write")
-
- def test_write_metadata_multiple(self):
- # At 64 KB an xattr key/value pair, this should generate three keys.
- path = "/tmp/foo/w"
- orig_d = { 'bar' : 'x' * 150000 }
- utils.write_metadata(path, orig_d)
- assert len(_xattrs.keys()) == 3, "Expected 3 keys, found %d" % len(_xattrs.keys())
- payload = ''
- for i in range(0,3):
- xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or ''))
- assert xkey in _xattrs
- assert len(_xattrs[xkey]) <= utils.MAX_XATTR_SIZE
- payload += _xattrs[xkey]
- assert orig_d == pickle.loads(payload)
- assert _xattr_op_cnt['set'] == 3, "%r" % _xattr_op_cnt
-
- def test_clean_metadata(self):
- path = "/tmp/foo/c"
- expected_d = { 'a': 'y' * 150000 }
- expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL)
- for i in range(0,3):
- xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or ''))
- _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE]
- expected_p = expected_p[utils.MAX_XATTR_SIZE:]
- assert not expected_p
- utils.clean_metadata(path)
- assert _xattr_op_cnt['remove'] == 4, "%r" % _xattr_op_cnt
-
- def test_clean_metadata_err(self):
- path = "/tmp/foo/c"
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattrs[xkey] = pickle.dumps({ 'a': 'y' }, utils.PICKLE_PROTOCOL)
- _xattr_rem_err[xkey] = errno.EOPNOTSUPP
- try:
- utils.clean_metadata(path)
- except IOError as e:
- assert e.errno == errno.EOPNOTSUPP
- assert _xattr_op_cnt['remove'] == 1, "%r" % _xattr_op_cnt
- else:
- self.fail("Expected an IOError exception on remove")
-
- def test_read_metadata(self):
- path = "/tmp/foo/r"
- expected_d = { 'a': 'y' }
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL)
- res_d = utils.read_metadata(path)
- assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d)
- assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt
-
- def test_read_metadata_notfound(self):
- path = "/tmp/foo/r"
- res_d = utils.read_metadata(path)
- assert res_d == {}
- assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt
-
- def test_read_metadata_err(self):
- path = "/tmp/foo/r"
- expected_d = { 'a': 'y' }
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattrs[xkey] = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL)
- _xattr_get_err[xkey] = errno.EOPNOTSUPP
- try:
- res_d = utils.read_metadata(path)
- except IOError as e:
- assert e.errno == errno.EOPNOTSUPP
- assert (_xattr_op_cnt['get'] == 1), "%r" % _xattr_op_cnt
- else:
- self.fail("Expected an IOError exception on get")
-
- def test_read_metadata_multiple(self):
- path = "/tmp/foo/r"
- expected_d = { 'a': 'y' * 150000 }
- expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL)
- for i in range(0,3):
- xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or ''))
- _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE]
- expected_p = expected_p[utils.MAX_XATTR_SIZE:]
- assert not expected_p
- res_d = utils.read_metadata(path)
- assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d)
- assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt
-
- def test_read_metadata_multiple_one_missing(self):
- path = "/tmp/foo/r"
- expected_d = { 'a': 'y' * 150000 }
- expected_p = pickle.dumps(expected_d, utils.PICKLE_PROTOCOL)
- for i in range(0,2):
- xkey = _xkey(path, "%s%s" % (utils.METADATA_KEY, i or ''))
- _xattrs[xkey] = expected_p[:utils.MAX_XATTR_SIZE]
- expected_p = expected_p[utils.MAX_XATTR_SIZE:]
- assert len(expected_p) <= utils.MAX_XATTR_SIZE
- res_d = utils.read_metadata(path)
- assert res_d == {}
- assert _xattr_op_cnt['get'] == 3, "%r" % _xattr_op_cnt
- assert len(_xattrs.keys()) == 0, "Expected 0 keys, found %d" % len(_xattrs.keys())
-
- def test_restore_metadata_none(self):
- # No initial metadata
- path = "/tmp/foo/i"
- res_d = utils.restore_metadata(path, { 'b': 'y' })
- expected_d = { 'b': 'y' }
- assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d)
- assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt
- assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt
-
- def test_restore_metadata(self):
- # Initial metadata
- path = "/tmp/foo/i"
- initial_d = { 'a': 'z' }
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL)
- res_d = utils.restore_metadata(path, { 'b': 'y' })
- expected_d = { 'a': 'z', 'b': 'y' }
- assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d)
- assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt
- assert _xattr_op_cnt['set'] == 1, "%r" % _xattr_op_cnt
-
- def test_restore_metadata_nochange(self):
- # Initial metadata but no changes
- path = "/tmp/foo/i"
- initial_d = { 'a': 'z' }
- xkey = _xkey(path, utils.METADATA_KEY)
- _xattrs[xkey] = pickle.dumps(initial_d, utils.PICKLE_PROTOCOL)
- res_d = utils.restore_metadata(path, {})
- expected_d = { 'a': 'z' }
- assert res_d == expected_d, "Expected %r, result %r" % (expected_d, res_d)
- assert _xattr_op_cnt['get'] == 1, "%r" % _xattr_op_cnt
- assert _xattr_op_cnt['set'] == 0, "%r" % _xattr_op_cnt
-
- def test_add_timestamp_empty(self):
- orig = {}
- res = utils._add_timestamp(orig)
- assert res == {}
-
- def test_add_timestamp_none(self):
- orig = { 'a': 1, 'b': 2, 'c': 3 }
- exp = { 'a': (1, 0), 'b': (2, 0), 'c': (3, 0) }
- res = utils._add_timestamp(orig)
- assert res == exp
-
- def test_add_timestamp_mixed(self):
- orig = { 'a': 1, 'b': (2, 1), 'c': 3 }
- exp = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) }
- res = utils._add_timestamp(orig)
- assert res == exp
-
- def test_add_timestamp_all(self):
- orig = { 'a': (1, 0), 'b': (2, 1), 'c': (3, 0) }
- res = utils._add_timestamp(orig)
- assert res == orig
-
- def test_get_etag_empty(self):
- tf = tempfile.NamedTemporaryFile()
- hd = utils._get_etag(tf.name)
- assert hd == hashlib.md5().hexdigest()
-
- def test_get_etag(self):
- tf = tempfile.NamedTemporaryFile()
- tf.file.write('123' * utils.CHUNK_SIZE)
- tf.file.flush()
- hd = utils._get_etag(tf.name)
- tf.file.seek(0)
- md5 = hashlib.md5()
- while True:
- chunk = tf.file.read(utils.CHUNK_SIZE)
- if not chunk:
- break
- md5.update(chunk)
- assert hd == md5.hexdigest()
-
- def test_get_object_metadata_dne(self):
- md = utils.get_object_metadata("/tmp/doesNotEx1st")
- assert md == {}
-
- def test_get_object_metadata_err(self):
- tf = tempfile.NamedTemporaryFile()
- try:
- md = utils.get_object_metadata(os.path.join(tf.name,"doesNotEx1st"))
- except OSError as e:
- assert e.errno != errno.ENOENT
- else:
- self.fail("Expected exception")
-
- obj_keys = (utils.X_TIMESTAMP, utils.X_CONTENT_TYPE, utils.X_ETAG,
- utils.X_CONTENT_LENGTH, utils.X_TYPE, utils.X_OBJECT_TYPE)
-
- def test_get_object_metadata_file(self):
- tf = tempfile.NamedTemporaryFile()
- tf.file.write('123'); tf.file.flush()
- md = utils.get_object_metadata(tf.name)
- for key in self.obj_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == utils.OBJECT
- assert md[utils.X_OBJECT_TYPE] == utils.FILE
- assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE
- assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name)
- assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name))
- assert md[utils.X_ETAG] == utils._get_etag(tf.name)
-
- def test_get_object_metadata_dir(self):
- td = tempfile.mkdtemp()
- try:
- md = utils.get_object_metadata(td)
- for key in self.obj_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == utils.OBJECT
- assert md[utils.X_OBJECT_TYPE] == utils.DIR
- assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE
- assert md[utils.X_CONTENT_LENGTH] == 0
- assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td))
- assert md[utils.X_ETAG] == hashlib.md5().hexdigest()
- finally:
- os.rmdir(td)
-
- def test_create_object_metadata_file(self):
- tf = tempfile.NamedTemporaryFile()
- tf.file.write('4567'); tf.file.flush()
- r_md = utils.create_object_metadata(tf.name)
-
- xkey = _xkey(tf.name, utils.METADATA_KEY)
- assert len(_xattrs.keys()) == 1
- assert xkey in _xattrs
- assert _xattr_op_cnt['get'] == 1
- assert _xattr_op_cnt['set'] == 1
- md = pickle.loads(_xattrs[xkey])
- assert r_md == md
-
- for key in self.obj_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == utils.OBJECT
- assert md[utils.X_OBJECT_TYPE] == utils.FILE
- assert md[utils.X_CONTENT_TYPE] == utils.FILE_TYPE
- assert md[utils.X_CONTENT_LENGTH] == os.path.getsize(tf.name)
- assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(tf.name))
- assert md[utils.X_ETAG] == utils._get_etag(tf.name)
-
- def test_create_object_metadata_dir(self):
- td = tempfile.mkdtemp()
- try:
- r_md = utils.create_object_metadata(td)
-
- xkey = _xkey(td, utils.METADATA_KEY)
- assert len(_xattrs.keys()) == 1
- assert xkey in _xattrs
- assert _xattr_op_cnt['get'] == 1
- assert _xattr_op_cnt['set'] == 1
- md = pickle.loads(_xattrs[xkey])
- assert r_md == md
-
- for key in self.obj_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == utils.OBJECT
- assert md[utils.X_OBJECT_TYPE] == utils.DIR
- assert md[utils.X_CONTENT_TYPE] == utils.DIR_TYPE
- assert md[utils.X_CONTENT_LENGTH] == 0
- assert md[utils.X_TIMESTAMP] == normalize_timestamp(os.path.getctime(td))
- assert md[utils.X_ETAG] == hashlib.md5().hexdigest()
- finally:
- os.rmdir(td)
-
- def test_get_container_metadata(self):
- def _mock_get_container_details(path, memcache=None):
- o_list = [ 'a', 'b', 'c' ]
- o_count = 3
- b_used = 47
- return o_list, o_count, b_used
- orig_gcd = utils.get_container_details
- utils.get_container_details = _mock_get_container_details
- td = tempfile.mkdtemp()
- try:
- exp_md = {
- utils.X_TYPE: (utils.CONTAINER, 0),
- utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0),
- utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0),
- utils.X_OBJECTS_COUNT: (3, 0),
- utils.X_BYTES_USED: (47, 0),
- }
- md = utils.get_container_metadata(td)
- assert md == exp_md
- finally:
- utils.get_container_details = orig_gcd
- os.rmdir(td)
-
- def test_get_account_metadata(self):
- def _mock_get_account_details(path, memcache=None):
- c_list = [ '123', 'abc' ]
- c_count = 2
- return c_list, c_count
- orig_gad = utils.get_account_details
- utils.get_account_details = _mock_get_account_details
- td = tempfile.mkdtemp()
- try:
- exp_md = {
- utils.X_TYPE: (utils.ACCOUNT, 0),
- utils.X_TIMESTAMP: (normalize_timestamp(os.path.getctime(td)), 0),
- utils.X_PUT_TIMESTAMP: (normalize_timestamp(os.path.getmtime(td)), 0),
- utils.X_OBJECTS_COUNT: (0, 0),
- utils.X_BYTES_USED: (0, 0),
- utils.X_CONTAINER_COUNT: (2, 0),
- }
- md = utils.get_account_metadata(td)
- assert md == exp_md
- finally:
- utils.get_account_details = orig_gad
- os.rmdir(td)
-
- cont_keys = [utils.X_TYPE, utils.X_TIMESTAMP, utils.X_PUT_TIMESTAMP,
- utils.X_OBJECTS_COUNT, utils.X_BYTES_USED]
-
- def test_create_container_metadata(self):
- td = tempfile.mkdtemp()
- try:
- r_md = utils.create_container_metadata(td)
-
- xkey = _xkey(td, utils.METADATA_KEY)
- assert len(_xattrs.keys()) == 1
- assert xkey in _xattrs
- assert _xattr_op_cnt['get'] == 1
- assert _xattr_op_cnt['set'] == 1
- md = pickle.loads(_xattrs[xkey])
- assert r_md == md
-
- for key in self.cont_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == (utils.CONTAINER, 0)
- assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0)
- assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0)
- assert md[utils.X_OBJECTS_COUNT] == (0, 0)
- assert md[utils.X_BYTES_USED] == (0, 0)
- finally:
- os.rmdir(td)
-
- acct_keys = [val for val in cont_keys]
- acct_keys.append(utils.X_CONTAINER_COUNT)
-
- def test_create_account_metadata(self):
- td = tempfile.mkdtemp()
- try:
- r_md = utils.create_account_metadata(td)
-
- xkey = _xkey(td, utils.METADATA_KEY)
- assert len(_xattrs.keys()) == 1
- assert xkey in _xattrs
- assert _xattr_op_cnt['get'] == 1
- assert _xattr_op_cnt['set'] == 1
- md = pickle.loads(_xattrs[xkey])
- assert r_md == md
-
- for key in self.acct_keys:
- assert key in md, "Expected key %s in %r" % (key, md)
- assert md[utils.X_TYPE] == (utils.ACCOUNT, 0)
- assert md[utils.X_TIMESTAMP] == (normalize_timestamp(os.path.getctime(td)), 0)
- assert md[utils.X_PUT_TIMESTAMP] == (normalize_timestamp(os.path.getmtime(td)), 0)
- assert md[utils.X_OBJECTS_COUNT] == (0, 0)
- assert md[utils.X_BYTES_USED] == (0, 0)
- assert md[utils.X_CONTAINER_COUNT] == (0, 0)
- finally:
- os.rmdir(td)
-
- def test_container_details_uncached(self):
- the_path = "/tmp/bar"
- def mock_get_container_details_from_fs(cont_path):
- bu = 5
- oc = 1
- ol = ['foo',]
- dl = [('a',100),]
- return utils.ContainerDetails(bu, oc, ol, dl)
- orig_gcdff = utils._get_container_details_from_fs
- utils._get_container_details_from_fs = mock_get_container_details_from_fs
- try:
- retval = utils.get_container_details(the_path)
- cd = mock_get_container_details_from_fs(the_path)
- assert retval == (cd.obj_list, cd.object_count, cd.bytes_used)
- finally:
- utils._get_container_details_from_fs = orig_gcdff
-
- def test_container_details_cached_hit(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_container_details_from_fs(cont_path, bu_p=5):
- bu = bu_p
- oc = 1
- ol = ['foo',]
- dl = [('a',100),]
- return utils.ContainerDetails(bu, oc, ol, dl)
- def mock_do_stat(path):
- class MockStat(object):
- def __init__(self, mtime):
- self.st_mtime = mtime
- return MockStat(100)
- cd = mock_get_container_details_from_fs(the_path, bu_p=6)
- mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd)
- orig_gcdff = utils._get_container_details_from_fs
- utils._get_container_details_from_fs = mock_get_container_details_from_fs
- orig_ds = utils.do_stat
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_container_details(the_path, memcache=mc)
- # If it did not properly use memcache, the default mocked version
- # of get details from fs would return 5 bytes used instead of the
- # 6 we specified above.
- cd = mock_get_container_details_from_fs(the_path, bu_p=6)
- assert retval == (cd.obj_list, cd.object_count, cd.bytes_used)
- finally:
- utils._get_container_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_container_details_cached_miss_key(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_container_details_from_fs(cont_path, bu_p=5):
- bu = bu_p
- oc = 1
- ol = ['foo',]
- dl = [('a',100),]
- return utils.ContainerDetails(bu, oc, ol, dl)
- def mock_do_stat(path):
- # Be sure we don't miss due to mtimes not matching
- self.fail("do_stat should not have been called")
- cd = mock_get_container_details_from_fs(the_path + "u", bu_p=6)
- mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path + "u", cd)
- orig_gcdff = utils._get_container_details_from_fs
- utils._get_container_details_from_fs = mock_get_container_details_from_fs
- orig_ds = utils.do_stat
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_container_details(the_path, memcache=mc)
- cd = mock_get_container_details_from_fs(the_path)
- assert retval == (cd.obj_list, cd.object_count, cd.bytes_used)
- mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path
- assert mkey in mc._d
- finally:
- utils._get_container_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_container_details_cached_miss_dir_list(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_container_details_from_fs(cont_path, bu_p=5):
- bu = bu_p
- oc = 1
- ol = ['foo',]
- dl = []
- return utils.ContainerDetails(bu, oc, ol, dl)
- def mock_do_stat(path):
- # Be sure we don't miss due to mtimes not matching
- self.fail("do_stat should not have been called")
- cd = mock_get_container_details_from_fs(the_path, bu_p=6)
- mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd)
- orig_gcdff = utils._get_container_details_from_fs
- utils._get_container_details_from_fs = mock_get_container_details_from_fs
- orig_ds = utils.do_stat
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_container_details(the_path, memcache=mc)
- cd = mock_get_container_details_from_fs(the_path)
- assert retval == (cd.obj_list, cd.object_count, cd.bytes_used)
- mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path
- assert mkey in mc._d
- assert 5 == mc._d[mkey].bytes_used
- finally:
- utils._get_container_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_container_details_cached_miss_mtime(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_container_details_from_fs(cont_path, bu_p=5):
- bu = bu_p
- oc = 1
- ol = ['foo',]
- dl = [('a',100),]
- return utils.ContainerDetails(bu, oc, ol, dl)
- def mock_do_stat(path):
- # Be sure we miss due to mtimes not matching
- class MockStat(object):
- def __init__(self, mtime):
- self.st_mtime = mtime
- return MockStat(200)
- cd = mock_get_container_details_from_fs(the_path, bu_p=6)
- mc.set(utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path, cd)
- orig_gcdff = utils._get_container_details_from_fs
- utils._get_container_details_from_fs = mock_get_container_details_from_fs
- orig_ds = utils.do_stat
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_container_details(the_path, memcache=mc)
- cd = mock_get_container_details_from_fs(the_path)
- assert retval == (cd.obj_list, cd.object_count, cd.bytes_used)
- mkey = utils.MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + the_path
- assert mkey in mc._d
- assert 5 == mc._d[mkey].bytes_used
- finally:
- utils._get_container_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_account_details_uncached(self):
- the_path = "/tmp/bar"
- def mock_get_account_details_from_fs(acc_path, acc_stats):
- mt = 100
- cc = 2
- cl = ['a', 'b']
- return utils.AccountDetails(mt, cc, cl)
- orig_gcdff = utils._get_account_details_from_fs
- utils._get_account_details_from_fs = mock_get_account_details_from_fs
- try:
- retval = utils.get_account_details(the_path)
- ad = mock_get_account_details_from_fs(the_path, None)
- assert retval == (ad.container_list, ad.container_count)
- finally:
- utils._get_account_details_from_fs = orig_gcdff
-
- def test_account_details_cached_hit(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_account_details_from_fs(acc_path, acc_stats):
- mt = 100
- cc = 2
- cl = ['a', 'b']
- return utils.AccountDetails(mt, cc, cl)
- def mock_do_stat(path):
- class MockStat(object):
- def __init__(self, mtime):
- self.st_mtime = mtime
- return MockStat(100)
- ad = mock_get_account_details_from_fs(the_path, None)
- ad.container_list = ['x', 'y']
- mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad)
- orig_gcdff = utils._get_account_details_from_fs
- orig_ds = utils.do_stat
- utils._get_account_details_from_fs = mock_get_account_details_from_fs
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_account_details(the_path, memcache=mc)
- assert retval == (ad.container_list, ad.container_count)
- wrong_ad = mock_get_account_details_from_fs(the_path, None)
- assert wrong_ad != ad
- finally:
- utils._get_account_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_account_details_cached_miss(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_account_details_from_fs(acc_path, acc_stats):
- mt = 100
- cc = 2
- cl = ['a', 'b']
- return utils.AccountDetails(mt, cc, cl)
- def mock_do_stat(path):
- class MockStat(object):
- def __init__(self, mtime):
- self.st_mtime = mtime
- return MockStat(100)
- ad = mock_get_account_details_from_fs(the_path, None)
- ad.container_list = ['x', 'y']
- mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path + 'u', ad)
- orig_gcdff = utils._get_account_details_from_fs
- orig_ds = utils.do_stat
- utils._get_account_details_from_fs = mock_get_account_details_from_fs
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_account_details(the_path, memcache=mc)
- correct_ad = mock_get_account_details_from_fs(the_path, None)
- assert retval == (correct_ad.container_list, correct_ad.container_count)
- assert correct_ad != ad
- finally:
- utils._get_account_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_account_details_cached_miss_mtime(self):
- mc = SimMemcache()
- the_path = "/tmp/bar"
- def mock_get_account_details_from_fs(acc_path, acc_stats):
- mt = 100
- cc = 2
- cl = ['a', 'b']
- return utils.AccountDetails(mt, cc, cl)
- def mock_do_stat(path):
- class MockStat(object):
- def __init__(self, mtime):
- self.st_mtime = mtime
- return MockStat(100)
- ad = mock_get_account_details_from_fs(the_path, None)
- ad.container_list = ['x', 'y']
- ad.mtime = 200
- mc.set(utils.MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + the_path, ad)
- orig_gcdff = utils._get_account_details_from_fs
- orig_ds = utils.do_stat
- utils._get_account_details_from_fs = mock_get_account_details_from_fs
- utils.do_stat = mock_do_stat
- try:
- retval = utils.get_account_details(the_path, memcache=mc)
- correct_ad = mock_get_account_details_from_fs(the_path, None)
- assert retval == (correct_ad.container_list, correct_ad.container_count)
- assert correct_ad != ad
- finally:
- utils._get_account_details_from_fs = orig_gcdff
- utils.do_stat = orig_ds
-
- def test_get_account_details_from_fs(self):
- orig_cwd = os.getcwd()
- td = tempfile.mkdtemp()
- try:
- tf = tarfile.open("common/data/account_tree.tar.bz2", "r:bz2")
- os.chdir(td)
- tf.extractall()
-
- ad = utils._get_account_details_from_fs(td, None)
- assert ad.mtime == os.path.getmtime(td)
- assert ad.container_count == 3
- assert set(ad.container_list) == set(['c1', 'c2', 'c3'])
- finally:
- os.chdir(orig_cwd)
- shutil.rmtree(td)
-
- def test_get_container_details_from_fs_notadir(self):
- tf = tempfile.NamedTemporaryFile()
- cd = utils._get_container_details_from_fs(tf.name)
- assert cd.bytes_used == 0
- assert cd.object_count == 0
- assert cd.obj_list == []
- assert cd.dir_list == []
-
- def test_get_container_details_from_fs(self):
- orig_cwd = os.getcwd()
- td = tempfile.mkdtemp()
- try:
- tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2")
- os.chdir(td)
- tf.extractall()
-
- cd = utils._get_container_details_from_fs(td)
- assert cd.bytes_used == 0, repr(cd.bytes_used)
- assert cd.object_count == 8, repr(cd.object_count)
- assert set(cd.obj_list) == set(['file1', 'file3', 'file2',
- 'dir3', 'dir1', 'dir2',
- 'dir1/file1', 'dir1/file2'
- ]), repr(cd.obj_list)
-
- full_dir1 = os.path.join(td, 'dir1')
- full_dir2 = os.path.join(td, 'dir2')
- full_dir3 = os.path.join(td, 'dir3')
- exp_dir_dict = { td: os.path.getmtime(td),
- full_dir1: os.path.getmtime(full_dir1),
- full_dir2: os.path.getmtime(full_dir2),
- full_dir3: os.path.getmtime(full_dir3),
- }
- for d,m in cd.dir_list:
- assert d in exp_dir_dict
- assert exp_dir_dict[d] == m
- finally:
- os.chdir(orig_cwd)
- shutil.rmtree(td)
-
-
- def test_get_container_details_from_fs_do_getsize_true(self):
- orig_cwd = os.getcwd()
- td = tempfile.mkdtemp()
- try:
- tf = tarfile.open("common/data/container_tree.tar.bz2", "r:bz2")
- os.chdir(td)
- tf.extractall()
-
- __do_getsize = Glusterfs._do_getsize
- Glusterfs._do_getsize = True
-
- cd = utils._get_container_details_from_fs(td)
- assert cd.bytes_used == 30, repr(cd.bytes_used)
- assert cd.object_count == 8, repr(cd.object_count)
- assert set(cd.obj_list) == set(['file1', 'file3', 'file2',
- 'dir3', 'dir1', 'dir2',
- 'dir1/file1', 'dir1/file2'
- ]), repr(cd.obj_list)
-
- full_dir1 = os.path.join(td, 'dir1')
- full_dir2 = os.path.join(td, 'dir2')
- full_dir3 = os.path.join(td, 'dir3')
- exp_dir_dict = { td: os.path.getmtime(td),
- full_dir1: os.path.getmtime(full_dir1),
- full_dir2: os.path.getmtime(full_dir2),
- full_dir3: os.path.getmtime(full_dir3),
- }
- for d,m in cd.dir_list:
- assert d in exp_dir_dict
- assert exp_dir_dict[d] == m
- finally:
- Glusterfs._do_getsize = __do_getsize
- os.chdir(orig_cwd)
- shutil.rmtree(td)
-
- def test_get_account_details_from_fs_notadir_w_stats(self):
- tf = tempfile.NamedTemporaryFile()
- ad = utils._get_account_details_from_fs(tf.name, os.stat(tf.name))
- assert ad.mtime == os.path.getmtime(tf.name)
- assert ad.container_count == 0
- assert ad.container_list == []
-
- def test_get_account_details_from_fs_notadir(self):
- tf = tempfile.NamedTemporaryFile()
- ad = utils._get_account_details_from_fs(tf.name, None)
- assert ad.mtime == os.path.getmtime(tf.name)
- assert ad.container_count == 0
- assert ad.container_list == []
-
- def test_write_pickle(self):
- td = tempfile.mkdtemp()
- try:
- fpp = os.path.join(td, 'pp')
- utils.write_pickle('pickled peppers', fpp)
- with open(fpp, "rb") as f:
- contents = f.read()
- s = pickle.loads(contents)
- assert s == 'pickled peppers', repr(s)
- finally:
- shutil.rmtree(td)
-
- def test_write_pickle_ignore_tmp(self):
- tf = tempfile.NamedTemporaryFile()
- td = tempfile.mkdtemp()
- try:
- fpp = os.path.join(td, 'pp')
- # Also test an explicity pickle protocol
- utils.write_pickle('pickled peppers', fpp, tmp=tf.name, pickle_protocol=2)
- with open(fpp, "rb") as f:
- contents = f.read()
- s = pickle.loads(contents)
- assert s == 'pickled peppers', repr(s)
- with open(tf.name, "rb") as f:
- contents = f.read()
- assert contents == ''
- finally:
- shutil.rmtree(td)
-
- def test_check_user_xattr_bad_path(self):
- assert False == utils.check_user_xattr("/tmp/foo/bar/check/user/xattr")
-
- def test_check_user_xattr_bad_set(self):
- td = tempfile.mkdtemp()
- xkey = _xkey(td, 'user.test.key1')
- _xattr_set_err[xkey] = errno.EOPNOTSUPP
- try:
- assert False == utils.check_user_xattr(td)
- except IOError:
- pass
- else:
- self.fail("Expected IOError")
- finally:
- shutil.rmtree(td)
-
- def test_check_user_xattr_bad_remove(self):
- td = tempfile.mkdtemp()
- xkey = _xkey(td, 'user.test.key1')
- _xattr_rem_err[xkey] = errno.EOPNOTSUPP
- try:
- utils.check_user_xattr(td)
- except IOError:
- self.fail("Unexpected IOError")
- finally:
- shutil.rmtree(td)
-
- def test_check_user_xattr(self):
- td = tempfile.mkdtemp()
- try:
- assert utils.check_user_xattr(td)
- finally:
- shutil.rmtree(td)
-
- def test_validate_container_empty(self):
- ret = utils.validate_container({})
- assert ret == False
-
- def test_validate_container_missing_keys(self):
- ret = utils.validate_container({ 'foo': 'bar' })
- assert ret == False
-
- def test_validate_container_bad_type(self):
- md = { utils.X_TYPE: ('bad', 0),
- utils.X_TIMESTAMP: ('na', 0),
- utils.X_PUT_TIMESTAMP: ('na', 0),
- utils.X_OBJECTS_COUNT: ('na', 0),
- utils.X_BYTES_USED: ('na', 0) }
- ret = utils.validate_container(md)
- assert ret == False
-
- def test_validate_container_good_type(self):
- md = { utils.X_TYPE: (utils.CONTAINER, 0),
- utils.X_TIMESTAMP: ('na', 0),
- utils.X_PUT_TIMESTAMP: ('na', 0),
- utils.X_OBJECTS_COUNT: ('na', 0),
- utils.X_BYTES_USED: ('na', 0) }
- ret = utils.validate_container(md)
- assert ret
-
- def test_validate_account_empty(self):
- ret = utils.validate_account({})
- assert ret == False
-
- def test_validate_account_missing_keys(self):
- ret = utils.validate_account({ 'foo': 'bar' })
- assert ret == False
-
- def test_validate_account_bad_type(self):
- md = { utils.X_TYPE: ('bad', 0),
- utils.X_TIMESTAMP: ('na', 0),
- utils.X_PUT_TIMESTAMP: ('na', 0),
- utils.X_OBJECTS_COUNT: ('na', 0),
- utils.X_BYTES_USED: ('na', 0),
- utils.X_CONTAINER_COUNT: ('na', 0) }
- ret = utils.validate_account(md)
- assert ret == False
-
- def test_validate_account_good_type(self):
- md = { utils.X_TYPE: (utils.ACCOUNT, 0),
- utils.X_TIMESTAMP: ('na', 0),
- utils.X_PUT_TIMESTAMP: ('na', 0),
- utils.X_OBJECTS_COUNT: ('na', 0),
- utils.X_BYTES_USED: ('na', 0),
- utils.X_CONTAINER_COUNT: ('na', 0) }
- ret = utils.validate_account(md)
- assert ret
-
- def test_validate_object_empty(self):
- ret = utils.validate_object({})
- assert ret == False
-
- def test_validate_object_missing_keys(self):
- ret = utils.validate_object({ 'foo': 'bar' })
- assert ret == False
-
- def test_validate_object_bad_type(self):
- md = { utils.X_TIMESTAMP: 'na',
- utils.X_CONTENT_TYPE: 'na',
- utils.X_ETAG: 'bad',
- utils.X_CONTENT_LENGTH: 'na',
- utils.X_TYPE: 'bad',
- utils.X_OBJECT_TYPE: 'na' }
- ret = utils.validate_object(md)
- assert ret == False
-
- def test_validate_object_good_type(self):
- md = { utils.X_TIMESTAMP: 'na',
- utils.X_CONTENT_TYPE: 'na',
- utils.X_ETAG: 'bad',
- utils.X_CONTENT_LENGTH: 'na',
- utils.X_TYPE: utils.OBJECT,
- utils.X_OBJECT_TYPE: 'na' }
- ret = utils.validate_object(md)
- assert ret
-
- def test_is_marker_empty(self):
- assert False == utils.is_marker(None)
-
- def test_is_marker_missing(self):
- assert False == utils.is_marker( { 'foo': 'bar' } )
-
- def test_is_marker_not_marker(self):
- md = { utils.X_OBJECT_TYPE: utils.DIR }
- assert False == utils.is_marker(md)
-
- def test_is_marker(self):
- md = { utils.X_OBJECT_TYPE: utils.MARKER_DIR }
- assert utils.is_marker(md)