diff options
Diffstat (limited to 'gluster/swift/common/DiskFile.py')
-rw-r--r-- | gluster/swift/common/DiskFile.py | 536 |
1 files changed, 465 insertions, 71 deletions
diff --git a/gluster/swift/common/DiskFile.py b/gluster/swift/common/DiskFile.py index 1ae5f7c..623248a 100644 --- a/gluster/swift/common/DiskFile.py +++ b/gluster/swift/common/DiskFile.py @@ -1,4 +1,4 @@ -# Copyright (c) 2012 Red Hat, Inc. +# Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,29 +15,241 @@ import os import stat +import fcntl import errno import random +import logging from hashlib import md5 +from eventlet import sleep from contextlib import contextmanager -from swift.common.utils import renamer +from swift.common.utils import TRUE_VALUES, fallocate from swift.common.exceptions import DiskFileNotExist, DiskFileError -from gluster.swift.common.exceptions import AlreadyExistsAsDir -from gluster.swift.common.fs_utils import mkdirs, do_open, do_close, \ +from gluster.swift.common.exceptions import GlusterFileSystemOSError +from gluster.swift.common.fs_utils import do_fstat, do_open, do_close, \ do_unlink, do_chown, os_path, do_fsync, do_fchown, do_stat from gluster.swift.common.utils import read_metadata, write_metadata, \ - validate_object, create_object_metadata, rmobjdir, dir_is_object + validate_object, create_object_metadata, rmobjdir, dir_is_object, \ + get_object_metadata from gluster.swift.common.utils import X_CONTENT_LENGTH, X_CONTENT_TYPE, \ X_TIMESTAMP, X_TYPE, X_OBJECT_TYPE, FILE, OBJECT, DIR_TYPE, \ FILE_TYPE, DEFAULT_UID, DEFAULT_GID, DIR_NON_OBJECT, DIR_OBJECT +from ConfigParser import ConfigParser, NoSectionError, NoOptionError from swift.obj.server import DiskFile +# FIXME: Hopefully we'll be able to move to Python 2.7+ where O_CLOEXEC will +# be back ported. 
# O_CLOEXEC is defined here by value because the os module in use may not
# export it; see http://www.python.org/dev/peps/pep-0433/
O_CLOEXEC = 0o2000000

DEFAULT_DISK_CHUNK_SIZE = 65536
# keep these lower-case
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())


def _random_sleep():
    """
    Sleep for a randomized interval centered on 1/10th of a second.

    The jitter keeps concurrent retriers from waking in lock-step.
    """
    sleep(random.uniform(0.05, 0.15))


def _lock_parent(full_path):
    """
    Take an exclusive flock() on the parent directory of full_path.

    :param full_path: path whose parent directory is to be locked; it must
                      contain at least one os.path.sep
    :returns: the open file descriptor holding the lock (the caller must
              unlock and close it), or False if the parent directory does
              not exist
    :raises OSError: if the parent cannot be opened for any reason other
                     than ENOENT
    :raises IOError: if flock() fails for any reason other than EAGAIN
    """
    parent_path, _ = full_path.rsplit(os.path.sep, 1)
    try:
        fd = os.open(parent_path, os.O_RDONLY | O_CLOEXEC)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Cannot lock the parent because it does not exist, let the
            # caller handle this situation.
            return False
        raise

    try:
        while True:
            # Spin, sleeping for about 1/10th of a second, until we get the
            # lock.
            # FIXME: Consider adding a final timeout to abort the operation.
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError as err:
                if err.errno != errno.EAGAIN:
                    raise
                _random_sleep()
            else:
                return fd
    except BaseException:
        # Don't leak the open descriptor on any failure, including one
        # raised while we were sleeping between attempts.
        os.close(fd)
        raise


def _make_directory_locked(full_path, uid, gid, metadata=None):
    """
    Create a directory while holding an exclusive lock on its parent.

    :param full_path: path of the directory to create
    :param uid: owner to apply if the directory is created
    :param gid: group to apply if the directory is created
    :param metadata: optional initial metadata for the new directory
    :returns: (False, metadata) if the parent directory does not exist,
              (True, metadata) if the directory already exists, otherwise
              the result of _make_directory_unlocked()
    :raises DiskFileError: if the path exists but is not a directory
    """
    fd = _lock_parent(full_path)
    if fd is False:
        # Parent does not exist either, pass this situation on to the caller
        # to handle.
        return False, metadata
    try:
        stats = do_stat(full_path)
        if stats:
            # It exists now that we hold the lock of its parent directory,
            # but verify it is actually a directory.
            if not stat.S_ISDIR(stats.st_mode):
                raise DiskFileError("_make_directory_locked: non-directory"
                                    " found at path %s when expecting a"
                                    " directory" % (full_path,))
            return True, metadata

        # We know the parent directory exists, and we have it locked, attempt
        # the creation of the target directory.
        return _make_directory_unlocked(full_path, uid, gid,
                                        metadata=metadata)
    finally:
        # We're done here, be sure to remove our lock and close our open FD.
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        except (IOError, OSError):
            # Best effort only: closing the descriptor below drops the lock.
            pass
        finally:
            os.close(fd)


def _make_directory_unlocked(full_path, uid, gid, metadata=None):
    """
    Make a directory, set its ownership as specified, and write the initial
    object metadata if any was supplied.

    :param full_path: path of the directory to create
    :param uid: owner to apply to a directory created by this call
    :param gid: group to apply to a directory created by this call
    :param metadata: optional metadata merged over the result of
                     get_object_metadata() and written to the new directory
    :returns: (False, metadata) if some parent of full_path does not exist;
              (True, metadata) if the directory was created or was found to
              already exist (metadata is the merged set when this call
              created the directory and metadata was supplied)
    :raises DiskFileError: for every other os.mkdir() failure
    """
    try:
        os.mkdir(full_path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Tell the caller some directory of the parent path does not
            # exist.
            return False, metadata

        if err.errno == errno.EEXIST:
            # Possible race, in that the caller invoked this method when it
            # had previously determined the file did not exist.
            #
            # FIXME: When we are confident, remove this stat() call as it is
            # not necessary.
            try:
                stats = os.stat(full_path)
            except OSError as serr:
                # FIXME: Ideally we'd want to return an appropriate error
                # message and code in the PUT Object REST API response.
                raise DiskFileError("_make_directory_unlocked: os.mkdir failed"
                                    " because path %s already exists, and"
                                    " a subsequent os.stat on that same"
                                    " path failed (%s)" % (full_path,
                                                           str(serr)))
            if not stat.S_ISDIR(stats.st_mode):
                # FIXME: Ideally we'd want to return an appropriate error
                # message and code in the PUT Object REST API response.
                raise DiskFileError("_make_directory_unlocked: os.mkdir"
                                    " failed on path %s because it already"
                                    " exists but not as a directory" % (
                                        full_path,))
            return True, metadata

        if err.errno == errno.ENOTDIR:
            # FIXME: Ideally we'd want to return an appropriate error
            # message and code in the PUT Object REST API response.
            raise DiskFileError("_make_directory_unlocked: os.mkdir failed"
                                " because some part of path %s is not in fact"
                                " a directory" % (full_path,))

        if err.errno == errno.EIO:
            # Sometimes Fuse will return an EIO error when it does not know
            # how to handle an unexpected, but transient situation. It is
            # possible the directory now exists, stat() it to find out after
            # a short period of time.
            _random_sleep()
            try:
                stats = os.stat(full_path)
            except OSError as serr:
                if serr.errno == errno.ENOENT:
                    errmsg = "_make_directory_unlocked: os.mkdir failed on" \
                             " path %s (EIO), and a subsequent os.stat on" \
                             " that same path did not find the file." % (
                                 full_path,)
                else:
                    errmsg = "_make_directory_unlocked: os.mkdir failed on" \
                             " path %s (%s), and a subsequent os.stat on" \
                             " that same path failed as well (%s)" % (
                                 full_path, str(err), str(serr))
                raise DiskFileError(errmsg)
            if not stat.S_ISDIR(stats.st_mode):
                raise DiskFileError("_make_directory_unlocked: os.mkdir"
                                    " initially failed on path %s (%s) but"
                                    " now we see that it exists but is not"
                                    " a directory (%r)" % (full_path,
                                                           str(err),
                                                           stats))
            # The directory exists now; dump the stats to the log with the
            # original exception and assume another entity took care of the
            # proper setup.
            logging.warning("_make_directory_unlocked: os.mkdir initially"
                            " failed on path %s (%s) but a stat()"
                            " following that succeeded: %r",
                            full_path, str(err), stats)
            return True, metadata

        # Some other potentially rare exception occurred that does not
        # currently warrant a special log entry to help diagnose.
        raise DiskFileError("_make_directory_unlocked: os.mkdir failed on"
                            " path %s (%s)" % (full_path, str(err)))

    if metadata:
        # We were asked to set the initial metadata for this object.
        metadata_orig = get_object_metadata(full_path)
        metadata_orig.update(metadata)
        write_metadata(full_path, metadata_orig)
        metadata = metadata_orig

    # We created it, so we are responsible for always setting the proper
    # ownership.
    do_chown(full_path, uid, gid)
    return True, metadata


def _fs_conf_flag(option):
    """
    Return True if `option` in the DEFAULT section of fs.conf holds one of
    TRUE_VALUES; return False if the option (or the file) is absent.
    """
    try:
        # raw=True matches what the previous positional "no" argument
        # actually did: ConfigParser.get() takes it as the `raw` flag, not
        # as a default value.
        return _fs_conf.get('DEFAULT', option, raw=True) in TRUE_VALUES
    except (NoSectionError, NoOptionError):
        return False


_fs_conf = ConfigParser()
# read() silently skips a missing file, in which case every flag is False.
_fs_conf.read(os.path.join('/etc/swift', 'fs.conf'))
_mkdir_locking = _fs_conf_flag('mkdir_locking')
_use_put_mount = _fs_conf_flag('use_put_mount')
_relaxed_writes = _fs_conf_flag('relaxed_writes')
_preallocate = _fs_conf_flag('preallocate')

if _mkdir_locking:
    make_directory = _make_directory_locked
else:
    make_directory = _make_directory_unlocked
def _create_dir_object(self, dir_path, metadata=None):
    """
    Create a directory object at the specified path (method of
    Gluster_DiskFile). No check is made to see if the directory object
    already exists, that is left to the caller (this avoids a potentially
    duplicate stat() system call).

    The algorithm used is as follows:

      1. An attempt is made to create the directory, assuming the parent
         directory already exists

         * Directory creation races are detected, returning success in
           those cases

      2. If the directory creation fails because some part of the path to
         the directory does not exist, then a search back up the path is
         performed to find the first existing ancestor directory, and then
         the missing parents are successively created, finally creating
         the target directory

    :param dir_path: directory path relative to its container,
                     self._container_path
    :param metadata: optional metadata to apply to the newly created
                     target directory; intermediate directories are
                     created without any
    :returns: (True, metadata) where metadata is what make_directory()
              returned for the target directory
    :raises DiskFileError: if the path cannot be created
    """
    full_path = os.path.join(self._container_path, dir_path)
    cur_path = full_path
    missing = []
    while True:
        # Only the target itself receives the caller's metadata.
        md = metadata if cur_path == full_path else None
        ret, newmd = make_directory(cur_path, self.uid, self.gid, md)
        if ret:
            break
        # Some path of the parent did not exist, so loop around and
        # create that, remembering this element for the way back down.
        if os.path.sep not in cur_path:
            raise DiskFileError("DiskFile._create_dir_object(): failed to"
                                " create directory path while exhausting"
                                " path elements to create: %s" % full_path)
        cur_path, child = cur_path.rsplit(os.path.sep, 1)
        if not child:
            # Raised explicitly rather than asserted: with assertions
            # disabled an empty element would end the rebuild below early
            # and report success without creating the target.
            raise DiskFileError("DiskFile._create_dir_object(): empty path"
                                " element found while creating directory"
                                " path: %s" % full_path)
        missing.append(child)

    # Walk back down from the first existing ancestor to the target.
    for child in reversed(missing):
        cur_path = os.path.join(cur_path, child)
        md = metadata if cur_path == full_path else None
        ret, newmd = make_directory(cur_path, self.uid, self.gid, md)
        if not ret:
            raise DiskFileError("DiskFile._create_dir_object(): failed to"
                                " create directory path to target, %s,"
                                " on subpath: %s" % (full_path, cur_path))
    return True, newmd
since the target, %s, already exists as' + ' a file' % self.data_file) return - # Check if directory already exists. if self._is_dir: # A pre-existing directory already exists on the file # system, perhaps gratuitously created when another # object was created, or created externally to Swift # REST API servicing (UFO use case). - msg = 'File object exists as a directory: %s' % self.data_file - raise AlreadyExistsAsDir(msg) + raise DiskFileError('DiskFile.put(): file creation failed since' + ' the target, %s, already exists as a' + ' directory' % self.data_file) + + # Write out metadata before fsync() to ensure it is also forced to + # disk. + write_metadata(fd, metadata) + + if not _relaxed_writes: + do_fsync(fd) + if X_CONTENT_LENGTH in metadata: + # Don't bother doing this before fsync in case the OS gets any + # ideas to issue partial writes. + fsize = int(metadata[X_CONTENT_LENGTH]) + self.drop_cache(fd, 0, fsize) + + # At this point we know that the object's full directory path exists, + # so we can just rename it directly without using Swift's + # swift.common.utils.renamer(), which makes the directory path and + # adds extra stat() calls. + data_file = os.path.join(self.put_datadir, self._obj) + while True: + try: + os.rename(self.tmppath, data_file) + except OSError as err: + if err.errno in (errno.ENOENT, errno.EIO): + # FIXME: Why either of these two error conditions is + # happening is unknown at this point. This might be a FUSE + # issue of some sort or a possible race condition. So + # let's sleep on it, and double check the environment + # after a good nap. + _random_sleep() + # Tease out why this error occurred. The man page for + # rename reads: + # "The link named by tmppath does not exist; or, a + # directory component in data_file does not exist; + # or, tmppath or data_file is an empty string." 
+ assert len(self.tmppath) > 0 and len(data_file) > 0 + tpstats = do_stat(self.tmppath) + tfstats = do_fstat(fd) + assert tfstats + if not tpstats or tfstats.st_ino != tpstats.st_ino: + # Temporary file name conflict + raise DiskFileError('DiskFile.put(): temporary file,' + ' %s, was already renamed' + ' (targeted for %s)' % ( + self.tmppath, data_file)) + else: + # Data file target name now has a bad path! + dfstats = do_stat(self.put_datadir) + if not dfstats: + raise DiskFileError('DiskFile.put(): path to' + ' object, %s, no longer exists' + ' (targeted for %s)' % ( + self.put_datadir, + data_file)) + else: + is_dir = stat.S_ISDIR(dfstats.st_mode) + if not is_dir: + raise DiskFileError('DiskFile.put(): path to' + ' object, %s, no longer a' + ' directory (targeted for' + ' %s)' % (self.put_datadir, + data_file)) + else: + # Let's retry since everything looks okay + logging.warn("DiskFile.put(): os.rename('%s'," + "'%s') initially failed (%s) but" + " a stat('%s') following that" + " succeeded: %r" % ( + self.tmppath, data_file, + str(err), self.put_datadir, + dfstats)) + continue + else: + raise GlusterFileSystemOSError( + err.errno, "%s, os.rename('%s', '%s')" % ( + err.strerror, self.tmppath, data_file)) + else: + # Success! 
@contextmanager
def mkstemp(self, size=None):
    """
    Contextmanager to make a temporary file, optionally of a specified
    initial size (method of Gluster_DiskFile).

    For Gluster, we first optimistically create the temporary file using
    the "rsync-friendly" .NAME.random naming. If we find that some path to
    the file does not exist, we then create that path and then create the
    temporary file again. If we get file name conflict, we'll retry using
    different random suffixes 1,000 times before giving up.

    :param size: if set, and preallocation is enabled in fs.conf, the
                 number of bytes to fallocate() for the temporary file
    :raises DiskFileError: if no temporary file could be created within
                           1,000 attempts
    :raises GlusterFileSystemOSError: if the open fails with an errno other
                                      than EEXIST, EIO or ENOENT
    """
    data_file = os.path.join(self.put_datadir, self._obj)
    # Tracks whether we have already tried to create the object's directory
    # path, so that it happens at most once and regardless of which attempt
    # first reports ENOENT.
    path_created = False

    # Assume the full directory path exists to the file already, and
    # construct the proper name for the temporary file.
    for _attempt in range(1000):
        tmpfile = '.' + self._obj + '.' + md5(self._obj +
                  str(random.random())).hexdigest()
        tmppath = os.path.join(self.put_datadir, tmpfile)
        try:
            fd = do_open(tmppath,
                         os.O_WRONLY | os.O_CREAT | os.O_EXCL | O_CLOEXEC)
        except GlusterFileSystemOSError as gerr:
            if gerr.errno == errno.EEXIST:
                # Retry with a different random number.
                continue
            if gerr.errno not in (errno.EIO, errno.ENOENT):
                # FIXME: Other cases we should handle?
                raise
            if gerr.errno == errno.ENOENT and self._obj_path \
                    and not path_created:
                # It looks like the path to the object does not already
                # exist, create it and try again right away.
                self._create_dir_object(self._obj_path)
                path_created = True
                continue
            # What remains is EIO, ENOENT with no directory hierarchy to
            # create, or ENOENT after the path was already made.
            # FIXME: Possible FUSE issue or race condition, let's sleep on
            # it and retry the operation.
            _random_sleep()
            logging.warning("DiskFile.mkstemp(): %s ... retrying in"
                            " 0.1 secs", gerr)
            continue
        else:
            break
    else:
        # We failed after 1,000 attempts to create the temporary file.
        raise DiskFileError('DiskFile.mkstemp(): failed to successfully'
                            ' create a temporary file without running'
                            ' into a name conflict after 1,000 attempts'
                            ' for: %s' % (data_file,))

    self.tmppath = tmppath
    try:
        # Ensure it is properly owned before we make it available.
        do_fchown(fd, self.uid, self.gid)
        if _preallocate and size:
            # For XFS, fallocate() turns off speculative pre-allocation
            # until a write is issued either to the last block of the file
            # before the EOF or beyond the EOF. This means that we are
            # less likely to fragment free space with pre-allocated
            # extents that get truncated back to the known file size.
            # However, this call also turns holes into allocated but
            # unwritten extents, so that allocation occurs before the
            # write, not during XFS writeback. This effectively defeats
            # any allocation optimizations the filesystem can make at
            # writeback time.
            fallocate(fd, size)
        yield fd
    finally:
        try:
            do_close(fd)
        except OSError:
            pass
        # self.tmppath is cleared by whoever renames the file into place;
        # only unlink when the temporary file is still ours to clean up.
        if self.tmppath:
            tmppath, self.tmppath = self.tmppath, None
            do_unlink(tmppath)