diff options
Diffstat (limited to 'ufo/test/unit/common/test_diskfile.py')
-rw-r--r-- | ufo/test/unit/common/test_diskfile.py | 884 |
1 files changed, 884 insertions, 0 deletions
diff --git a/ufo/test/unit/common/test_diskfile.py b/ufo/test/unit/common/test_diskfile.py new file mode 100644 index 00000000000..8c4756a7b69 --- /dev/null +++ b/ufo/test/unit/common/test_diskfile.py @@ -0,0 +1,884 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Tests for gluster.swift.common.DiskFile """ + +import os +import stat +import errno +import unittest +import tempfile +import shutil +from hashlib import md5 +from swift.common.utils import normalize_timestamp +from swift.common.exceptions import DiskFileNotExist +import gluster.swift.common.DiskFile +import gluster.swift.common.utils +from gluster.swift.common.DiskFile import Gluster_DiskFile, \ + AlreadyExistsAsDir +from gluster.swift.common.utils import DEFAULT_UID, DEFAULT_GID, X_TYPE, \ + X_OBJECT_TYPE +from test_utils import _initxattr, _destroyxattr +from test.unit import FakeLogger + + +_metadata = {} + +def _mock_read_metadata(filename): + if filename in _metadata: + md = _metadata[filename] + else: + md = {} + return md + +def _mock_write_metadata(filename, metadata): + _metadata[filename] = metadata + +def _mock_clear_metadata(): + _metadata = {} + + +class MockException(Exception): + pass + + +def _mock_rmdirs(p): + raise MockException("gluster.swift.common.DiskFile.rmdirs() called") + +def _mock_do_listdir(p): + raise MockException("gluster.swift.common.DiskFile.do_listdir() called") + +def _mock_do_unlink(f): + ose = OSError() + ose.errno = errno.ENOENT + raise ose + + +class MockRenamerCalled(Exception): + pass + + +def _mock_renamer(a, b): + raise MockRenamerCalled() + + +class TestDiskFile(unittest.TestCase): + """ Tests for gluster.swift.common.DiskFile """ + + def setUp(self): + self.lg = FakeLogger() + _initxattr() + _mock_clear_metadata() + self._saved_df_wm = gluster.swift.common.DiskFile.write_metadata + self._saved_df_rm = gluster.swift.common.DiskFile.read_metadata + gluster.swift.common.DiskFile.write_metadata = _mock_write_metadata + gluster.swift.common.DiskFile.read_metadata = _mock_read_metadata + self._saved_ut_wm = gluster.swift.common.utils.write_metadata + self._saved_ut_rm = gluster.swift.common.utils.read_metadata + gluster.swift.common.utils.write_metadata = _mock_write_metadata + gluster.swift.common.utils.read_metadata = _mock_read_metadata + + def tearDown(self): + self.lg = None + _destroyxattr() + gluster.swift.common.DiskFile.write_metadata = self._saved_df_wm + gluster.swift.common.DiskFile.read_metadata = self._saved_df_rm + gluster.swift.common.utils.write_metadata = self._saved_ut_wm + gluster.swift.common.utils.read_metadata = self._saved_ut_rm + + def test_constructor_no_slash(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.obj_path == "" + assert gdf.name == "bar" + assert gdf.datadir == "/tmp/foo/vol0/bar" + assert gdf.device_path == "/tmp/foo/vol0" + assert gdf.container_path == "/tmp/foo/vol0/bar" + assert gdf.tmpdir == "/tmp/foo/vol0/tmp" + assert gdf.disk_chunk_size == 65536 + assert gdf.logger == self.lg + assert gdf.uid == DEFAULT_UID + assert gdf.gid == DEFAULT_GID + assert gdf.metadata == {} + assert gdf.data_file == None + assert gdf.fp == None + assert gdf.iter_etag == None + assert not gdf.started_at_0 + assert not gdf.read_to_eof + assert gdf.quarantined_dir == None + assert not gdf.keep_cache + assert not gdf.is_dir + assert gdf.is_valid + + def test_constructor_leadtrail_slash(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "/b/a/z/", self.lg) + assert gdf.obj == "z" + assert gdf.obj_path == "b/a" + assert gdf.name == "bar/b/a" + assert gdf.datadir == "/tmp/foo/vol0/bar/b/a" + assert gdf.device_path == "/tmp/foo/vol0" + assert gdf.tmpdir == "/tmp/foo/vol0/tmp" + + def test_constructor_no_metadata(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + stats = os.stat(the_file) + ts = normalize_timestamp(stats.st_ctime) + etag = md5() + etag.update("1234") + etag = etag.hexdigest() + exp_md = { + 'Content-Length': 4, + 'ETag': etag, + 'X-Timestamp': ts, + 'Content-Type': 'application/octet-stream'} + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert gdf.fp is None + assert gdf.metadata == exp_md + finally: + shutil.rmtree(td) + + def test_constructor_existing_metadata(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + ini_md = { + 'X-Type': 'Object', + 'X-Object-Type': 'file', + 'Content-Length': 5, + 'ETag': 'etag', + 'X-Timestamp': 'ts', + 'Content-Type': 'application/loctet-stream'} + _metadata[the_file] = ini_md + exp_md = ini_md.copy() + del exp_md['X-Type'] + del exp_md['X-Object-Type'] + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert gdf.fp is None + assert gdf.metadata == exp_md + finally: + shutil.rmtree(td) + + def test_constructor_invalid_existing_metadata(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + inv_md = { + 'Content-Length': 5, + 'ETag': 'etag', + 'X-Timestamp': 'ts', + 'Content-Type': 'application/loctet-stream'} + _metadata[the_file] = inv_md + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert gdf.fp is None + assert gdf.metadata != inv_md + finally: + shutil.rmtree(td) + + def test_constructor_isdir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "d") + try: + os.makedirs(the_dir) + ini_md = { + 'X-Type': 'Object', + 'X-Object-Type': 'dir', + 'Content-Length': 5, + 'ETag': 'etag', + 'X-Timestamp': 'ts', + 'Content-Type': 'application/loctet-stream'} + _metadata[the_dir] = ini_md + exp_md = ini_md.copy() + del exp_md['X-Type'] + del exp_md['X-Object-Type'] + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "d", self.lg, keep_data_fp=True) + assert gdf.obj == "d" + assert gdf.data_file == the_dir + assert gdf.is_dir + assert gdf.fp is None + assert gdf.metadata == exp_md + finally: + shutil.rmtree(td) + + def test_constructor_keep_data_fp(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg, keep_data_fp=True) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert gdf.fp is not None + finally: + shutil.rmtree(td) + + def test_constructor_chunk_size(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg, disk_chunk_size=8192) + assert gdf.disk_chunk_size == 8192 + + def test_close(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + # Should be a no-op, as by default is_dir is False, but fp is None + gdf.close() + + gdf.is_dir = True + gdf.fp = "123" + # Should still be a no-op as is_dir is True (marker directory) + gdf.close() + assert gdf.fp == "123" + + gdf.is_dir = False + saved_dc = gluster.swift.common.DiskFile.do_close + self.called = False + def our_do_close(fp): + self.called = True + gluster.swift.common.DiskFile.do_close = our_do_close + try: + gdf.close() + assert self.called + assert gdf.fp is None + finally: + gluster.swift.common.DiskFile.do_close = saved_dc + + def test_is_deleted(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.is_deleted() + gdf.data_file = "/tmp/foo/bar" + assert not gdf.is_deleted() + + def test_create_dir_object(self): + td = tempfile.mkdtemp() + the_dir = os.path.join(td, "vol0", "bar", "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + # Not created, dir object path is different, just checking + assert gdf.obj == "z" + gdf.create_dir_object(the_dir) + assert os.path.isdir(the_dir) + assert the_dir in _metadata + finally: + shutil.rmtree(td) + + def test_create_dir_object_exists(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + os.makedirs(the_path) + with open(the_dir, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + # Not created, dir object path is different, just checking + assert gdf.obj == "z" + def _mock_do_chown(p, u, g): + assert u == DEFAULT_UID + assert g == DEFAULT_GID + dc = gluster.swift.common.DiskFile.do_chown + gluster.swift.common.DiskFile.do_chown = _mock_do_chown + try: + gdf.create_dir_object(the_dir) + finally: + gluster.swift.common.DiskFile.do_chown = dc + assert os.path.isdir(the_dir) + assert the_dir in _metadata + finally: + shutil.rmtree(td) + + def test_put_metadata(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + md = { 'a': 'b' } + gdf.put_metadata(md) + assert gdf.metadata == md + assert _metadata[os.path.join(the_dir, "z")] == md + finally: + shutil.rmtree(td) + + def test_put_w_tombstone(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.metadata == {} + + gdf.put(None, '', {'x': '1'}, extension='.ts') + assert gdf.metadata == {} + + def test_put_w_meta_file(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + newmd = gdf.metadata.copy() + newmd['X-Object-Meta-test'] = '1234' + with gdf.mkstemp() as (fd, tmppath): + gdf.put(fd, tmppath, newmd, extension='.meta') + assert gdf.metadata == newmd + assert _metadata[the_file] == newmd + finally: + shutil.rmtree(td) + + def test_put_w_meta_file_no_content_type(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + newmd = gdf.metadata.copy() + newmd['Content-Type'] = '' + newmd['X-Object-Meta-test'] = '1234' + with gdf.mkstemp() as (fd, tmppath): + gdf.put(fd, tmppath, newmd, extension='.meta') + assert gdf.metadata == newmd + assert _metadata[the_file] == newmd + finally: + shutil.rmtree(td) + + def test_put_w_meta_dir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir", self.lg) + newmd = gdf.metadata.copy() + newmd['X-Object-Meta-test'] = '1234' + gdf.put(None, None, newmd, extension='.meta') + assert gdf.metadata == newmd + assert _metadata[the_dir] == newmd + finally: + shutil.rmtree(td) + + def test_put_w_marker_dir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir", self.lg) + newmd = gdf.metadata.copy() + newmd['X-Object-Meta-test'] = '1234' + gdf.put(None, None, newmd, extension='.data') + assert gdf.metadata == newmd + assert _metadata[the_dir] == newmd + finally: + shutil.rmtree(td) + + + def test_put_is_dir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir", self.lg) + origmd = gdf.metadata.copy() + origfmd = _metadata[the_dir] + newmd = gdf.metadata.copy() + # FIXME: This is a hack to get to the code-path; it is not clear + # how this can happen normally. + newmd['Content-Type'] = '' + newmd['X-Object-Meta-test'] = '1234' + try: + gdf.put(None, None, newmd, extension='.data') + except AlreadyExistsAsDir: + pass + else: + self.fail("Expected to encounter 'already-exists-as-dir' exception") + assert gdf.metadata == origmd + assert _metadata[the_dir] == origfmd + finally: + shutil.rmtree(td) + + def test_put(self): + td = tempfile.mkdtemp() + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.obj_path == "" + assert gdf.name == "bar" + assert gdf.datadir == os.path.join(td, "vol0", "bar") + assert gdf.data_file is None + + body = '1234\n' + etag = md5() + etag.update(body) + etag = etag.hexdigest() + metadata = { + 'X-Timestamp': '1234', + 'Content-Type': 'file', + 'ETag': etag, + 'Content-Length': '5', + } + + with gdf.mkstemp() as (fd, tmppath): + os.write(fd, body) + gdf.put(fd, tmppath, metadata) + + assert gdf.data_file == os.path.join(td, "vol0", "bar", "z") + assert os.path.exists(gdf.data_file) + assert not os.path.exists(tmppath) + finally: + shutil.rmtree(td) + + def test_put_obj_path(self): + the_obj_path = os.path.join("b", "a") + the_file = os.path.join(the_obj_path, "z") + td = tempfile.mkdtemp() + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + the_file, self.lg) + assert gdf.obj == "z" + assert gdf.obj_path == the_obj_path + assert gdf.name == os.path.join("bar", "b", "a") + assert gdf.datadir == os.path.join(td, "vol0", "bar", "b", "a") + assert gdf.data_file is None + + body = '1234\n' + etag = md5() + etag.update(body) + etag = etag.hexdigest() + metadata = { + 'X-Timestamp': '1234', + 'Content-Type': 'file', + 'ETag': etag, + 'Content-Length': '5', + } + + with gdf.mkstemp() as (fd, tmppath): + os.write(fd, body) + gdf.put(fd, tmppath, metadata) + + assert gdf.data_file == os.path.join(td, "vol0", "bar", "b", "a", "z") + assert os.path.exists(gdf.data_file) + assert not os.path.exists(tmppath) + finally: + shutil.rmtree(td) + + def test_unlinkold_no_metadata(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.metadata == {} + _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs + _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir + gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs + gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir + try: + gdf.unlinkold(None) + except MockException as exp: + self.fail(str(exp)) + finally: + gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs + gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir + + def test_unlinkold_same_timestamp(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.metadata == {} + gdf.metadata['X-Timestamp'] = 1 + _saved_rmdirs = gluster.swift.common.DiskFile.rmdirs + _saved_do_listdir = gluster.swift.common.DiskFile.do_listdir + gluster.swift.common.DiskFile.rmdirs = _mock_rmdirs + gluster.swift.common.DiskFile.do_listdir = _mock_do_listdir + try: + gdf.unlinkold(1) + except MockException as exp: + self.fail(str(exp)) + finally: + gluster.swift.common.DiskFile.rmdirs = _saved_rmdirs + gluster.swift.common.DiskFile.do_listdir = _saved_do_listdir + + def test_unlinkold_file(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + + later = float(gdf.metadata['X-Timestamp']) + 1 + gdf.unlinkold(normalize_timestamp(later)) + assert os.path.isdir(gdf.datadir) + assert not os.path.exists(os.path.join(gdf.datadir, gdf.obj)) + finally: + shutil.rmtree(td) + + def test_unlinkold_file_not_found(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + + # Handle the case the file is not in the directory listing. + os.unlink(the_file) + + later = float(gdf.metadata['X-Timestamp']) + 1 + gdf.unlinkold(normalize_timestamp(later)) + assert os.path.isdir(gdf.datadir) + assert not os.path.exists(os.path.join(gdf.datadir, gdf.obj)) + finally: + shutil.rmtree(td) + + def test_unlinkold_file_unlink_error(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + + later = float(gdf.metadata['X-Timestamp']) + 1 + + stats = os.stat(the_path) + os.chmod(the_path, stats.st_mode & (~stat.S_IWUSR)) + + # Handle the case do_unlink() raises an OSError + try: + gdf.unlinkold(normalize_timestamp(later)) + except OSError as e: + assert e.errno != errno.ENOENT + else: + self.fail("Excepted an OSError when unlinking file") + finally: + os.chmod(the_path, stats.st_mode) + + assert os.path.isdir(gdf.datadir) + assert os.path.exists(os.path.join(gdf.datadir, gdf.obj)) + finally: + shutil.rmtree(td) + + def test_unlinkold_is_dir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "d") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "d", self.lg, keep_data_fp=True) + assert gdf.data_file == the_dir + assert gdf.is_dir + + later = float(gdf.metadata['X-Timestamp']) + 1 + gdf.unlinkold(normalize_timestamp(later)) + assert os.path.isdir(gdf.datadir) + assert not os.path.exists(os.path.join(gdf.datadir, gdf.obj)) + finally: + shutil.rmtree(td) + + def test_unlinkold_is_dir_failure(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "d") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "d", self.lg, keep_data_fp=True) + assert gdf.data_file == the_dir + assert gdf.is_dir + + stats = os.stat(gdf.datadir) + os.chmod(gdf.datadir, 0) + try: + later = float(gdf.metadata['X-Timestamp']) + 1 + gdf.unlinkold(normalize_timestamp(later)) + finally: + os.chmod(gdf.datadir, stats.st_mode) + assert os.path.isdir(gdf.datadir) + assert os.path.isdir(gdf.data_file) + finally: + shutil.rmtree(td) + + def test_get_data_file_size(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert 4 == gdf.get_data_file_size() + finally: + shutil.rmtree(td) + + def test_get_data_file_size(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + assert 4 == gdf.metadata['Content-Length'] + gdf.metadata['Content-Length'] = 3 + assert 4 == gdf.get_data_file_size() + assert 4 == gdf.metadata['Content-Length'] + finally: + shutil.rmtree(td) + + def test_get_data_file_size_dne(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "/b/a/z/", self.lg) + try: + s = gdf.get_data_file_size() + except DiskFileNotExist: + pass + else: + self.fail("Expected DiskFileNotExist exception") + + def test_get_data_file_size_dne_os_err(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + gdf.data_file = gdf.data_file + ".dne" + try: + s = gdf.get_data_file_size() + except DiskFileNotExist: + pass + else: + self.fail("Expected DiskFileNotExist exception") + finally: + shutil.rmtree(td) + + def test_get_data_file_size_os_err(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_file = os.path.join(the_path, "z") + try: + os.makedirs(the_path) + with open(the_file, "wb") as fd: + fd.write("1234") + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.obj == "z" + assert gdf.data_file == the_file + assert not gdf.is_dir + stats = os.stat(the_path) + os.chmod(the_path, 0) + try: + s = gdf.get_data_file_size() + except OSError as err: + assert err.errno != errno.ENOENT + else: + self.fail("Expected OSError exception") + finally: + os.chmod(the_path, stats.st_mode) + finally: + shutil.rmtree(td) + + def test_get_data_file_size_dir(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "d") + try: + os.makedirs(the_dir) + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "d", self.lg, keep_data_fp=True) + assert gdf.obj == "d" + assert gdf.data_file == the_dir + assert gdf.is_dir + assert 0 == gdf.get_data_file_size() + finally: + shutil.rmtree(td) + + def test_update_object(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + md = { 'a': 'b' } + gdf.update_object(md) + assert gdf.metadata == md + assert _metadata[os.path.join(the_dir, "z")] == md + finally: + shutil.rmtree(td) + + def test_filter_metadata(self): + assert not os.path.exists("/tmp/foo") + gdf = Gluster_DiskFile("/tmp/foo", "vol0", "p57", "ufo47", "bar", + "z", self.lg) + assert gdf.metadata == {} + gdf.filter_metadata() + assert gdf.metadata == {} + + gdf.metadata[X_TYPE] = 'a' + gdf.metadata[X_OBJECT_TYPE] = 'b' + gdf.metadata['foobar'] = 'c' + gdf.filter_metadata() + assert X_TYPE not in gdf.metadata + assert X_OBJECT_TYPE not in gdf.metadata + assert 'foobar' in gdf.metadata + + def test_mkstemp(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + saved_tmppath = '' + with gdf.mkstemp() as (fd, tmppath): + assert gdf.tmpdir == os.path.join(td, "vol0", "tmp") + assert os.path.isdir(gdf.tmpdir) + assert os.path.dirname(tmppath) == gdf.tmpdir + assert os.path.exists(tmppath) + saved_tmppath = tmppath + os.write(fd, "123") + assert not os.path.exists(saved_tmppath) + finally: + shutil.rmtree(td) + + def test_mkstemp_err_on_close(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + saved_tmppath = '' + with gdf.mkstemp() as (fd, tmppath): + assert gdf.tmpdir == os.path.join(td, "vol0", "tmp") + assert os.path.isdir(gdf.tmpdir) + assert os.path.dirname(tmppath) == gdf.tmpdir + assert os.path.exists(tmppath) + saved_tmppath = tmppath + os.write(fd, "123") + os.close(fd) + assert not os.path.exists(saved_tmppath) + finally: + shutil.rmtree(td) + + def test_mkstemp_err_on_unlink(self): + td = tempfile.mkdtemp() + the_path = os.path.join(td, "vol0", "bar") + the_dir = os.path.join(the_path, "dir") + try: + gdf = Gluster_DiskFile(td, "vol0", "p57", "ufo47", "bar", + "dir/z", self.lg) + saved_tmppath = '' + with gdf.mkstemp() as (fd, tmppath): + assert gdf.tmpdir == os.path.join(td, "vol0", "tmp") + assert os.path.isdir(gdf.tmpdir) + assert os.path.dirname(tmppath) == gdf.tmpdir + assert os.path.exists(tmppath) + saved_tmppath = tmppath + os.write(fd, "123") + os.unlink(tmppath) + assert not os.path.exists(saved_tmppath) + finally: + shutil.rmtree(td) |