diff options
Diffstat (limited to 'ufo/gluster')
-rw-r--r-- | ufo/gluster/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/__init__.py | 18 | ||||
-rw-r--r-- | ufo/gluster/swift/account/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/account/server.py | 45 | ||||
-rw-r--r-- | ufo/gluster/swift/common/DiskDir.py | 500 | ||||
-rw-r--r-- | ufo/gluster/swift/common/DiskFile.py | 323 | ||||
-rw-r--r-- | ufo/gluster/swift/common/Glusterfs.py | 126 | ||||
-rw-r--r-- | ufo/gluster/swift/common/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/common/constraints.py | 82 | ||||
-rw-r--r-- | ufo/gluster/swift/common/fs_utils.py | 156 | ||||
-rw-r--r-- | ufo/gluster/swift/common/middleware/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/common/middleware/gluster.py | 40 | ||||
-rw-r--r-- | ufo/gluster/swift/common/utils.py | 457 | ||||
-rw-r--r-- | ufo/gluster/swift/container/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/container/server.py | 46 | ||||
-rw-r--r-- | ufo/gluster/swift/obj/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/obj/server.py | 33 | ||||
-rw-r--r-- | ufo/gluster/swift/proxy/__init__.py | 0 | ||||
-rw-r--r-- | ufo/gluster/swift/proxy/server.py | 27 |
19 files changed, 1853 insertions, 0 deletions
diff --git a/ufo/gluster/__init__.py b/ufo/gluster/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/__init__.py diff --git a/ufo/gluster/swift/__init__.py b/ufo/gluster/swift/__init__.py new file mode 100644 index 00000000000..17578ebee40 --- /dev/null +++ b/ufo/gluster/swift/__init__.py @@ -0,0 +1,18 @@ +""" Gluster Swift UFO """ + +class Version(object): + def __init__(self, canonical_version, final): + self.canonical_version = canonical_version + self.final = final + + @property + def pretty_version(self): + if self.final: + return self.canonical_version + else: + return '%s-dev' % (self.canonical_version,) + + +_version = Version('1.1', True) +__version__ = _version.pretty_version +__canonical_version__ = _version.canonical_version diff --git a/ufo/gluster/swift/account/__init__.py b/ufo/gluster/swift/account/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/account/__init__.py diff --git a/ufo/gluster/swift/account/server.py b/ufo/gluster/swift/account/server.py new file mode 100644 index 00000000000..8b98315790a --- /dev/null +++ b/ufo/gluster/swift/account/server.py @@ -0,0 +1,45 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Account Server for Gluster Swift UFO """ + +# Simply importing this monkey patches the constraint handling to fit our +# needs +import gluster.swift.common.constraints + +from swift.account import server +from gluster.swift.common.DiskDir import DiskAccount + + +class AccountController(server.AccountController): + def _get_account_broker(self, drive, part, account): + """ + Overriden to provide the GlusterFS specific broker that talks to + Gluster for the information related to servicing a given request + instead of talking to a database. + + :param drive: drive that holds the container + :param part: partition the container is in + :param account: account name + :returns: DiskDir object + """ + return DiskAccount(self.root, account, self.logger) + + +def app_factory(global_conf, **local_conf): + """paste.deploy app factory for creating WSGI account server apps.""" + conf = global_conf.copy() + conf.update(local_conf) + return AccountController(conf) diff --git a/ufo/gluster/swift/common/DiskDir.py b/ufo/gluster/swift/common/DiskDir.py new file mode 100644 index 00000000000..8602fabae83 --- /dev/null +++ b/ufo/gluster/swift/common/DiskDir.py @@ -0,0 +1,500 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os, errno + +from gluster.swift.common.utils import clean_metadata, dir_empty, rmdirs, \ + mkdirs, validate_account, validate_container, is_marker, \ + get_container_details, get_account_details, get_container_metadata, \ + create_container_metadata, create_account_metadata, DEFAULT_GID, \ + DEFAULT_UID, validate_object, create_object_metadata, read_metadata, \ + write_metadata, X_CONTENT_TYPE, X_CONTENT_LENGTH, X_TIMESTAMP, \ + X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, X_BYTES_USED, \ + X_CONTAINER_COUNT, CONTAINER +from gluster.swift.common import Glusterfs + +from swift.common.constraints import CONTAINER_LISTING_LIMIT +from swift.common.utils import normalize_timestamp, TRUE_VALUES + + +DATADIR = 'containers' + +# Create a dummy db_file in /etc/swift +_unittests_enabled = os.getenv('GLUSTER_UNIT_TEST_ENABLED', 'no') +if _unittests_enabled in TRUE_VALUES: + _tmp_dir = '/tmp/gluster_unit_tests' + try: + os.mkdir(_tmp_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + _db_file = os.path.join(_tmp_dir, 'db_file.db') +else: + _db_file = '/etc/swift/db_file.db' +if not os.path.exists(_db_file): + file(_db_file, 'w+') + + +def _read_metadata(dd): + """ Filter read metadata so that it always returns a tuple that includes + some kind of timestamp. With 1.4.8 of the Swift integration the + timestamps were not stored. Here we fabricate timestamps for volumes + where the existing data has no timestamp (that is, stored data is not + a tuple), allowing us a measure of backward compatibility. + + FIXME: At this time it does not appear that the timestamps on each + metadata are used for much, so this should not hurt anything. + """ + metadata_i = read_metadata(dd) + metadata = {} + timestamp = 0 + for key, value in metadata_i.iteritems(): + if not isinstance(value, tuple): + value = (value, timestamp) + metadata[key] = value + return metadata + + +class DiskCommon(object): + def is_deleted(self): + return not os.path.exists(self.datadir) + + def filter_prefix(self, objects, prefix): + """ + Accept sorted list. + """ + found = 0 + filtered_objs = [] + for object_name in objects: + if object_name.startswith(prefix): + filtered_objs.append(object_name) + found = 1 + else: + if found: + break + return filtered_objs + + def filter_delimiter(self, objects, delimiter, prefix): + """ + Accept sorted list. + Objects should start with prefix. + """ + filtered_objs=[] + for object_name in objects: + tmp_obj = object_name.replace(prefix, '', 1) + sufix = tmp_obj.split(delimiter, 1) + new_obj = prefix + sufix[0] + if new_obj and new_obj not in filtered_objs: + filtered_objs.append(new_obj) + + return filtered_objs + + def filter_marker(self, objects, marker): + """ + TODO: We can traverse in reverse order to optimize. + Accept sorted list. + """ + filtered_objs=[] + found = 0 + if objects[-1] < marker: + return filtered_objs + for object_name in objects: + if object_name > marker: + filtered_objs.append(object_name) + + return filtered_objs + + def filter_end_marker(self, objects, end_marker): + """ + Accept sorted list. + """ + filtered_objs=[] + for object_name in objects: + if object_name < end_marker: + filtered_objs.append(object_name) + else: + break + + return filtered_objs + + def filter_limit(self, objects, limit): + filtered_objs=[] + for i in range(0, limit): + filtered_objs.append(objects[i]) + + return filtered_objs + + +class DiskDir(DiskCommon): + """ + Manage object files on disk. + + :param path: path to devices on the node + :param account: account name for the object + :param container: container name for the object + :param logger: account or container server logging object + :param uid: user ID container object should assume + :param gid: group ID container object should assume + """ + + def __init__(self, path, account, container, logger, + uid=DEFAULT_UID, gid=DEFAULT_GID): + self.root = path + if container: + self.container = container + else: + self.container = None + if self.container: + self.datadir = os.path.join(path, account, self.container) + else: + self.datadir = os.path.join(path, account) + # Note that the account name has a one-to-one mapping to the gluster + # mount point, or volume name. + self.account = account + assert logger is not None + self.logger = logger + self.metadata = {} + self.container_info = None + self.object_info = None + self.uid = int(uid) + self.gid = int(gid) + self.db_file = _db_file + self.dir_exists = os.path.exists(self.datadir) + if self.dir_exists: + try: + self.metadata = _read_metadata(self.datadir) + except EOFError: + create_container_metadata(self.datadir) + else: + return + if self.container: + if not self.metadata: + create_container_metadata(self.datadir) + self.metadata = _read_metadata(self.datadir) + else: + if not validate_container(self.metadata): + create_container_metadata(self.datadir) + self.metadata = _read_metadata(self.datadir) + else: + if not self.metadata: + create_account_metadata(self.datadir) + self.metadata = _read_metadata(self.datadir) + else: + if not validate_account(self.metadata): + create_account_metadata(self.datadir) + self.metadata = _read_metadata(self.datadir) + + def empty(self): + return dir_empty(self.datadir) + + def delete(self): + if self.empty(): + #For delete account. + if os.path.ismount(self.datadir): + clean_metadata(self.datadir) + else: + rmdirs(self.datadir) + self.dir_exists = False + + def put_metadata(self, metadata): + """ + Write metadata to directory/container. + """ + write_metadata(self.datadir, metadata) + self.metadata = metadata + + def put(self, metadata): + """ + Create and write metatdata to directory/container. + :param metadata: Metadata to write. + """ + if not self.dir_exists: + mkdirs(self.datadir) + + os.chown(self.datadir, self.uid, self.gid) + write_metadata(self.datadir, metadata) + self.metadata = metadata + self.dir_exists = True + + def put_obj(self, content_length, timestamp): + ocnt = self.metadata[X_OBJECTS_COUNT][0] + self.metadata[X_OBJECTS_COUNT] = (int(ocnt) + 1, timestamp) + self.metadata[X_PUT_TIMESTAMP] = timestamp + bused = self.metadata[X_BYTES_USED][0] + self.metadata[X_BYTES_USED] = (int(bused) + int(content_length), timestamp) + #TODO: define update_metadata instad of writing whole metadata again. + self.put_metadata(self.metadata) + + def delete_obj(self, content_length): + ocnt, timestamp = self.metadata[X_OBJECTS_COUNT][0] + self.metadata[X_OBJECTS_COUNT] = (int(ocnt) - 1, timestamp) + bused, timestamp = self.metadata[X_BYTES_USED] + self.metadata[X_BYTES_USED] = (int(bused) - int(content_length), timestamp) + self.put_metadata(self.metadata) + + def put_container(self, container, put_timestamp, del_timestamp, object_count, bytes_used): + """ + For account server. + """ + self.metadata[X_OBJECTS_COUNT] = (0, put_timestamp) + self.metadata[X_BYTES_USED] = (0, put_timestamp) + ccnt = self.metadata[X_CONTAINER_COUNT][0] + self.metadata[X_CONTAINER_COUNT] = (int(ccnt) + 1, put_timestamp) + self.metadata[X_PUT_TIMESTAMP] = (1, put_timestamp) + self.put_metadata(self.metadata) + + def delete_container(self, object_count, bytes_used): + """ + For account server. + """ + self.metadata[X_OBJECTS_COUNT] = (0, 0) + self.metadata[X_BYTES_USED] = (0, 0) + ccnt, timestamp = self.metadata[X_CONTAINER_COUNT] + self.metadata[X_CONTAINER_COUNT] = (int(ccnt) - 1, timestamp) + self.put_metadata(self.metadata) + + def unlink(self): + """ + Remove directory/container if empty. + """ + if dir_empty(self.datadir): + rmdirs(self.datadir) + + def list_objects_iter(self, limit, marker, end_marker, + prefix, delimiter, path): + """ + Returns tuple of name, created_at, size, content_type, etag. + """ + if path: + prefix = path = path.rstrip('/') + '/' + delimiter = '/' + if delimiter and not prefix: + prefix = '' + + self.update_object_count() + + objects, object_count, bytes_used = self.object_info + + if objects: + objects.sort() + + if objects and prefix: + objects = self.filter_prefix(objects, prefix) + + if objects and delimiter: + objects = self.filter_delimiter(objects, delimiter, prefix) + + if objects and marker: + objects = self.filter_marker(objects, marker) + + if objects and end_marker: + objects = self.filter_end_marker(objects, end_marker) + + if objects and limit: + if len(objects) > limit: + objects = self.filter_limit(objects, limit) + + container_list = [] + if objects: + for obj in objects: + list_item = [] + list_item.append(obj) + obj_path = os.path.join(self.datadir, obj) + metadata = read_metadata(obj_path) + if not metadata or not validate_object(metadata): + metadata = create_object_metadata(obj_path) + if metadata: + list_item.append(metadata[X_TIMESTAMP]) + list_item.append(int(metadata[X_CONTENT_LENGTH])) + list_item.append(metadata[X_CONTENT_TYPE]) + list_item.append(metadata[X_ETAG]) + container_list.append(list_item) + + return container_list + + def update_object_count(self): + if not self.object_info: + self.object_info = get_container_details(self.datadir) + + objects, object_count, bytes_used = self.object_info + + if X_OBJECTS_COUNT not in self.metadata \ + or int(self.metadata[X_OBJECTS_COUNT][0]) != object_count \ + or X_BYTES_USED not in self.metadata \ + or int(self.metadata[X_BYTES_USED][0]) != bytes_used: + self.metadata[X_OBJECTS_COUNT] = (object_count, 0) + self.metadata[X_BYTES_USED] = (bytes_used, 0) + write_metadata(self.datadir, self.metadata) + + def update_container_count(self): + if not self.container_info: + self.container_info = get_account_details(self.datadir) + + containers, container_count = self.container_info + + if X_CONTAINER_COUNT not in self.metadata \ + or int(self.metadata[X_CONTAINER_COUNT][0]) != container_count: + self.metadata[X_CONTAINER_COUNT] = (container_count, 0) + write_metadata(self.datadir, self.metadata) + + def get_info(self, include_metadata=False): + """ + Get global data for the container. + :returns: dict with keys: account, container, object_count, bytes_used, + hash, id, created_at, put_timestamp, delete_timestamp, + reported_put_timestamp, reported_delete_timestamp, + reported_object_count, and reported_bytes_used. + If include_metadata is set, metadata is included as a key + pointing to a dict of tuples of the metadata + """ + # TODO: delete_timestamp, reported_put_timestamp + # reported_delete_timestamp, reported_object_count, + # reported_bytes_used, created_at + if not Glusterfs.OBJECT_ONLY: + # If we are not configured for object only environments, we should + # update the object counts in case they changed behind our back. + self.update_object_count() + + data = {'account' : self.account, 'container' : self.container, + 'object_count' : self.metadata.get(X_OBJECTS_COUNT, ('0', 0))[0], + 'bytes_used' : self.metadata.get(X_BYTES_USED, ('0',0))[0], + 'hash': '', 'id' : '', 'created_at' : '1', + 'put_timestamp' : self.metadata.get(X_PUT_TIMESTAMP, ('0',0))[0], + 'delete_timestamp' : '1', + 'reported_put_timestamp' : '1', 'reported_delete_timestamp' : '1', + 'reported_object_count' : '1', 'reported_bytes_used' : '1'} + if include_metadata: + data['metadata'] = self.metadata + return data + + def put_object(self, name, timestamp, size, content_type, + etag, deleted=0): + # TODO: Implement the specifics of this func. + pass + + def initialize(self, timestamp): + pass + + def update_put_timestamp(self, timestamp): + """ + Create the container if it doesn't exist and update the timestamp + """ + if not os.path.exists(self.datadir): + self.put(self.metadata) + + def delete_object(self, name, timestamp): + # TODO: Implement the delete object + pass + + def delete_db(self, timestamp): + """ + Delete the container + """ + self.unlink() + + def update_metadata(self, metadata): + assert self.metadata, "Valid container/account metadata should have been created by now" + if metadata: + new_metadata = self.metadata.copy() + new_metadata.update(metadata) + if new_metadata != self.metadata: + write_metadata(self.datadir, new_metadata) + self.metadata = new_metadata + + +class DiskAccount(DiskDir): + def __init__(self, root, account, logger): + super(DiskAccount, self).__init__(root, account, None, logger) + assert self.dir_exists + + def list_containers_iter(self, limit, marker, end_marker, + prefix, delimiter): + """ + Return tuple of name, object_count, bytes_used, 0(is_subdir). + Used by account server. + """ + if delimiter and not prefix: + prefix = '' + + self.update_container_count() + + containers, container_count = self.container_info + + if containers: + containers.sort() + + if containers and prefix: + containers = self.filter_prefix(containers, prefix) + + if containers and delimiter: + containers = self.filter_delimiter(containers, delimiter, prefix) + + if containers and marker: + containers = self.filter_marker(containers, marker) + + if containers and end_marker: + containers = self.filter_end_marker(containers, end_marker) + + if containers and limit: + if len(containers) > limit: + containers = self.filter_limit(containers, limit) + + account_list = [] + if containers: + for cont in containers: + list_item = [] + metadata = None + list_item.append(cont) + cont_path = os.path.join(self.datadir, cont) + metadata = _read_metadata(cont_path) + if not metadata or not validate_container(metadata): + metadata = create_container_metadata(cont_path) + + if metadata: + list_item.append(metadata[X_OBJECTS_COUNT][0]) + list_item.append(metadata[X_BYTES_USED][0]) + list_item.append(0) + account_list.append(list_item) + + return account_list + + def get_info(self, include_metadata=False): + """ + Get global data for the account. + :returns: dict with keys: account, created_at, put_timestamp, + delete_timestamp, container_count, object_count, + bytes_used, hash, id + """ + if not Glusterfs.OBJECT_ONLY: + # If we are not configured for object only environments, we should + # update the container counts in case they changed behind our back. + self.update_container_count() + + data = {'account' : self.account, 'created_at' : '1', + 'put_timestamp' : '1', 'delete_timestamp' : '1', + 'container_count' : self.metadata.get(X_CONTAINER_COUNT, (0,0))[0], + 'object_count' : self.metadata.get(X_OBJECTS_COUNT, (0,0))[0], + 'bytes_used' : self.metadata.get(X_BYTES_USED, (0,0))[0], + 'hash' : '', 'id' : ''} + + if include_metadata: + data['metadata'] = self.metadata + return data + + def get_container_timestamp(self, container): + cont_path = os.path.join(self.datadir, container) + metadata = read_metadata(cont_path) + + return int(metadata.get(X_PUT_TIMESTAMP, ('0',0))[0]) or None diff --git a/ufo/gluster/swift/common/DiskFile.py b/ufo/gluster/swift/common/DiskFile.py new file mode 100644 index 00000000000..6404be6d68f --- /dev/null +++ b/ufo/gluster/swift/common/DiskFile.py @@ -0,0 +1,323 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from eventlet import tpool +from tempfile import mkstemp +from contextlib import contextmanager +from swift.common.utils import normalize_timestamp, renamer +from gluster.swift.common.utils import mkdirs, rmdirs, validate_object, \ + create_object_metadata, do_open, do_close, do_unlink, do_chown, \ + do_stat, do_listdir, read_metadata, write_metadata +from gluster.swift.common.utils import X_CONTENT_TYPE, X_CONTENT_LENGTH, \ + X_TIMESTAMP, X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, \ + X_BYTES_USED, X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, OBJECT, DIR_TYPE, \ + FILE_TYPE, DEFAULT_UID, DEFAULT_GID + +import logging +from swift.obj.server import DiskFile + + +DATADIR = 'objects' +ASYNCDIR = 'async_pending' +KEEP_CACHE_SIZE = (5 * 1024 * 1024) +# keep these lower-case +DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split()) + + +class Gluster_DiskFile(DiskFile): + """ + Manage object files on disk. + + :param path: path to devices on the node/mount path for UFO. + :param device: device name/account_name for UFO. + :param partition: partition on the device the object lives in + :param account: account name for the object + :param container: container name for the object + :param obj: object name for the object + :param keep_data_fp: if True, don't close the fp, otherwise close it + :param disk_chunk_Size: size of chunks on file reads + :param uid: user ID disk object should assume (file or directory) + :param gid: group ID disk object should assume (file or directory) + """ + + def __init__(self, path, device, partition, account, container, obj, + logger, keep_data_fp=False, disk_chunk_size=65536, + uid=DEFAULT_UID, gid=DEFAULT_GID): + self.disk_chunk_size = disk_chunk_size + device = account + #Don't support obj_name ending/begining with '/', like /a, a/, /a/b/ etc + obj = obj.strip('/') + if '/' in obj: + self.obj_path, self.obj = obj.rsplit('/', 1) + else: + self.obj_path = '' + self.obj = obj + + if self.obj_path: + self.name = '/'.join((container, self.obj_path)) + else: + self.name = container + #Absolute path for obj directory. + self.datadir = os.path.join(path, device, self.name) + self.device_path = os.path.join(path, device) + self.container_path = os.path.join(path, device, container) + self.tmpdir = os.path.join(path, device, 'tmp') + self.logger = logger + self.metadata = {} + self.data_file = None + self.fp = None + self.iter_etag = None + self.started_at_0 = False + self.read_to_eof = False + self.quarantined_dir = None + self.keep_cache = False + self.is_dir = False + self.is_valid = True + self.uid = int(uid) + self.gid = int(gid) + if not os.path.exists(self.datadir + '/' + self.obj): + return + + self.data_file = os.path.join(self.datadir, self.obj) + self.metadata = read_metadata(self.datadir + '/' + self.obj) + if not self.metadata: + create_object_metadata(self.datadir + '/' + self.obj) + self.metadata = read_metadata(self.datadir + '/' + self.obj) + + if not validate_object(self.metadata): + create_object_metadata(self.datadir + '/' + self.obj) + self.metadata = read_metadata(self.datadir + '/' + + self.obj) + + self.filter_metadata() + + if os.path.isdir(self.datadir + '/' + self.obj): + self.is_dir = True + else: + self.fp = do_open(self.data_file, 'rb') + if not keep_data_fp: + self.close(verify_file=False) + + def close(self, verify_file=True): + """ + Close the file. Will handle quarantining file if necessary. + + :param verify_file: Defaults to True. If false, will not check + file to see if it needs quarantining. + """ + #Marker directory + if self.is_dir: + return + if self.fp: + do_close(self.fp) + self.fp = None + + def is_deleted(self): + """ + Check if the file is deleted. + + :returns: True if the file doesn't exist or has been flagged as + deleted. + """ + return not self.data_file + + def create_dir_object(self, dir_path): + #TODO: if object already exists??? + if os.path.exists(dir_path) and not os.path.isdir(dir_path): + self.logger.error("Deleting file %s", dir_path) + do_unlink(dir_path) + #If dir aleady exist just override metadata. + mkdirs(dir_path) + do_chown(dir_path, self.uid, self.gid) + create_object_metadata(dir_path) + return True + + def put_metadata(self, metadata): + obj_path = self.datadir + '/' + self.obj + write_metadata(obj_path, metadata) + self.metadata = metadata + + def put(self, fd, tmppath, metadata, extension=''): + """ + Finalize writing the file on disk, and renames it from the temp file to + the real location. This should be called after the data has been + written to the temp file. + + :params fd: file descriptor of the temp file + :param tmppath: path to the temporary file being used + :param metadata: dictionary of metadata to be written + :param extention: extension to be used when making the file + """ + if extension == '.ts': + # TombStone marker (deleted) + return True + + # Fix up the metadata to ensure it has a proper value for the + # Content-Type metadata, as well as an X_TYPE and X_OBJECT_TYPE + # metadata values. + + content_type = metadata['Content-Type'] + if not content_type: + metadata['Content-Type'] = FILE_TYPE + x_object_type = FILE + else: + x_object_type = MARKER_DIR if content_type.lower() == DIR_TYPE else FILE + metadata[X_TYPE] = OBJECT + metadata[X_OBJECT_TYPE] = x_object_type + + if extension == '.meta': + # Metadata recorded separately from the file + self.put_metadata(metadata) + return True + + extension = '' + + if metadata[X_OBJECT_TYPE] == MARKER_DIR: + self.create_dir_object(os.path.join(self.datadir, self.obj)) + self.put_metadata(metadata) + self.data_file = self.datadir + '/' + self.obj + return True + + # Check if directory already exists. + if self.is_dir: + self.logger.error('Directory already exists %s/%s' % \ + (self.datadir , self.obj)) + return False + + timestamp = normalize_timestamp(metadata[X_TIMESTAMP]) + write_metadata(tmppath, metadata) + if X_CONTENT_LENGTH in metadata: + self.drop_cache(fd, 0, int(metadata[X_CONTENT_LENGTH])) + tpool.execute(os.fsync, fd) + if self.obj_path: + dir_objs = self.obj_path.split('/') + tmp_path = '' + if len(dir_objs): + for dir_name in dir_objs: + if tmp_path: + tmp_path = tmp_path + '/' + dir_name + else: + tmp_path = dir_name + if not self.create_dir_object(os.path.join(self.container_path, + tmp_path)): + self.logger.error("Failed in subdir %s",\ + os.path.join(self.container_path,tmp_path)) + return False + + renamer(tmppath, os.path.join(self.datadir, + self.obj + extension)) + do_chown(os.path.join(self.datadir, self.obj + extension), \ + self.uid, self.gid) + self.metadata = metadata + self.data_file = self.datadir + '/' + self.obj + extension + return True + + def unlinkold(self, timestamp): + """ + Remove any older versions of the object file. Any file that has an + older timestamp than timestamp will be deleted. + + :param timestamp: timestamp to compare with each file + """ + if self.metadata and self.metadata['X-Timestamp'] != timestamp: + self.unlink() + + def unlink(self): + """ + Remove the file. + """ + #Marker dir. + if self.is_dir: + rmdirs(os.path.join(self.datadir, self.obj)) + if not os.path.isdir(os.path.join(self.datadir, self.obj)): + self.metadata = {} + self.data_file = None + else: + logging.error('Unable to delete dir %s' % os.path.join(self.datadir, self.obj)) + return + + for fname in do_listdir(self.datadir): + if fname == self.obj: + try: + do_unlink(os.path.join(self.datadir, fname)) + except OSError, err: + if err.errno != errno.ENOENT: + raise + + #Remove entire path for object. + #remove_dir_path(self.obj_path, self.container_path) + + self.metadata = {} + self.data_file = None + + def get_data_file_size(self): + """ + Returns the os.path.getsize for the file. Raises an exception if this + file does not match the Content-Length stored in the metadata. Or if + self.data_file does not exist. + + :returns: file size as an int + :raises DiskFileError: on file size mismatch. + :raises DiskFileNotExist: on file not existing (including deleted) + """ + #Marker directory. + if self.is_dir: + return 0 + try: + file_size = 0 + if self.data_file: + file_size = os.path.getsize(self.data_file) + if X_CONTENT_LENGTH in self.metadata: + metadata_size = int(self.metadata[X_CONTENT_LENGTH]) + if file_size != metadata_size: + self.metadata[X_CONTENT_LENGTH] = file_size + self.update_object(self.metadata) + + return file_size + except OSError, err: + if err.errno != errno.ENOENT: + raise + raise DiskFileNotExist('Data File does not exist.') + + def update_object(self, metadata): + obj_path = self.datadir + '/' + self.obj + write_metadata(obj_path, metadata) + self.metadata = metadata + + def filter_metadata(self): + if X_TYPE in self.metadata: + self.metadata.pop(X_TYPE) + if X_OBJECT_TYPE in self.metadata: + self.metadata.pop(X_OBJECT_TYPE) + + @contextmanager + def mkstemp(self): + """Contextmanager to make a temporary file.""" + + if not os.path.exists(self.tmpdir): + mkdirs(self.tmpdir) + fd, tmppath = mkstemp(dir=self.tmpdir) + try: + yield fd, tmppath + finally: + try: + os.close(fd) + except OSError: + pass + try: + os.unlink(tmppath) + except OSError: + pass diff --git a/ufo/gluster/swift/common/Glusterfs.py b/ufo/gluster/swift/common/Glusterfs.py new file mode 100644 index 00000000000..2d3273ed429 --- /dev/null +++ b/ufo/gluster/swift/common/Glusterfs.py @@ -0,0 +1,126 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os, fcntl, time +from ConfigParser import ConfigParser +from swift.common.utils import TRUE_VALUES +from gluster.swift.common.fs_utils import mkdirs + + +# +# Read the fs.conf file once at startup (module load) +# +_fs_conf = ConfigParser() +MOUNT_IP = 'localhost' +REMOTE_CLUSTER = False +OBJECT_ONLY = False +if _fs_conf.read(os.path.join('/etc/swift', 'fs.conf')): + try: + MOUNT_IP = _fs_conf.get('DEFAULT', 'mount_ip', 'localhost') + except (NoSectionError, NoOptionError): + pass + try: + REMOTE_CLUSTER = _fs_conf.get('DEFAULT', 'remote_cluster', False) in TRUE_VALUES + except (NoSectionError, NoOptionError): + pass + try: + OBJECT_ONLY = _fs_conf.get('DEFAULT', 'object_only', "no") in TRUE_VALUES + except (NoSectionError, NoOptionError): + pass +NAME = 'glusterfs' + + +def _busy_wait(full_mount_path): + # Iterate for definite number of time over a given + # interval for successful mount + for i in range(0, 5): + if os.path.ismount(os.path.join(full_mount_path)): + return True + time.sleep(2) + logging.error('Busy wait for mount timed out for mount %s', full_mount_path) + return False + +def mount(root, drive): + # FIXME: Possible thundering herd problem here + + el = _get_export_list() + for export in el: + if drive == export: + break + else: + logging.error('No export found in %r matching drive %s', el, drive) + return False + + # NOTE: root is typically the default value of /mnt/gluster-object + full_mount_path = os.path.join(root, drive) + if not os.path.isdir(full_mount_path): + mkdirs(full_mount_path) + + pid_dir = "/var/lib/glusterd/vols/%s/run/" % drive + pid_file = os.path.join(pid_dir, 'swift.pid'); + + if not os.path.exists(pid_dir): + mkdirs(pid_dir) + + fd = os.open(pid_file, os.O_CREAT|os.O_RDWR) + with os.fdopen(fd, 'r+b') as f: + try: + fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB) + except: + ex = sys.exc_info()[1] + if isinstance(ex, IOError) and ex.errno in (EACCES, EAGAIN): + # This means that some other process is mounting the + # filesystem, so wait for the mount process to complete + return _busy_wait(full_mount_path) + + mnt_cmd = 'mount -t glusterfs %s:%s %s' % (MOUNT_IP, export, \ + full_mount_path) + if os.system(mnt_cmd) or not _busy_wait(full_mount_path): + logging.error('Mount failed %s: %s', NAME, mnt_cmd) + return False + return True + +def unmount(full_mount_path): + # FIXME: Possible thundering herd problem here + + umnt_cmd = 'umount %s 2>> /dev/null' % full_mount_path + if os.system(umnt_cmd): + logging.error('Unable to unmount %s %s' % (full_mount_path, NAME)) + +def _get_export_list(): + if REMOTE_CLUSTER: + cmnd = 'ssh %s gluster volume info' % MOUNT_IP + else: + cmnd = 'gluster volume info' + + export_list = [] + + if os.system(cmnd + ' >> /dev/null'): + if REMOTE_CLUSTER: + logging.error('Getting volume info failed %s, make sure to have '\ + 'passwordless ssh on %s', NAME, MOUNT_IP) + else: + logging.error('Getting volume failed %s', NAME) + else: + fp = os.popen(cmnd) + while True: + item = fp.readline() + if not item: + break + item = item.strip('\n').strip(' ') + if item.lower().startswith('volume name:'): + export_list.append(item.split(':')[1].strip(' ')) + + return export_list diff --git a/ufo/gluster/swift/common/__init__.py b/ufo/gluster/swift/common/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/common/__init__.py diff --git a/ufo/gluster/swift/common/constraints.py b/ufo/gluster/swift/common/constraints.py new file mode 100644 index 00000000000..a4fc8008c7e --- /dev/null +++ b/ufo/gluster/swift/common/constraints.py @@ -0,0 +1,82 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from webob.exc import HTTPBadRequest + +import swift.common.constraints +from gluster.swift.common import Glusterfs + + +MAX_OBJECT_NAME_COMPONENT_LENGTH = swift.common.constraints.constraints_conf_int( + 'max_object_name_component_length', 255) + +def validate_obj_name_component(obj): + if len(obj) > MAX_OBJECT_NAME_COMPONENT_LENGTH: + return 'too long (%d)' % len(obj) + if obj == '.' or obj == '..': + return 'cannot be . or ..' + return '' + +# Save the original check object creation +__check_object_creation = swift.common.constraints.check_object_creation + +# Define our new one which invokes the original +def gluster_check_object_creation(req, object_name): + """ + Check to ensure that everything is alright about an object to be created. + Monkey patches swift.common.constraints.check_object_creation, invoking + the original, and then adding an additional check for individual object + name components. + + :param req: HTTP request object + :param object_name: name of object to be created + :raises HTTPRequestEntityTooLarge: the object is too large + :raises HTTPLengthRequered: missing content-length header and not + a chunked request + :raises HTTPBadRequest: missing or bad content-type header, or + bad metadata + """ + ret = __check_object_creation(req, object_name) + + if ret is None: + for obj in object_name.split('/'): + reason = validate_obj_name_component(obj) + if reason: + bdy = 'Invalid object name "%s", component "%s" %s' \ + % (object_name, obj, reason) + ret = HTTPBadRequest(body=bdy, + request=req, + content_type='text/plain') + + return ret + +# Replace the original check object creation with ours +swift.common.constraints.check_object_creation = gluster_check_object_creation + +# Save the original check mount +__check_mount = swift.common.constraints.check_mount + +# Define our new one which invokes the original +def gluster_check_mount(root, drive): + # FIXME: Potential performance optimization here to not call the original + # check mount which makes two stat calls. We could do what they do with + # just one. + if __check_mount(root, drive): + return True + + return Glusterfs.mount(root, drive) + +# Replace the original check mount with ours +swift.common.constraints.check_mount = gluster_check_mount diff --git a/ufo/gluster/swift/common/fs_utils.py b/ufo/gluster/swift/common/fs_utils.py new file mode 100644 index 00000000000..7f5292c2bf1 --- /dev/null +++ b/ufo/gluster/swift/common/fs_utils.py @@ -0,0 +1,156 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import errno + +def do_mkdir(path): + try: + os.mkdir(path) + except Exception, err: + logging.exception("Mkdir failed on %s err: %s", path, str(err)) + if err.errno != errno.EEXIST: + raise + return True + +def do_makedirs(path): + try: + os.makedirs(path) + except Exception, err: + logging.exception("Makedirs failed on %s err: %s", path, str(err)) + if err.errno != errno.EEXIST: + raise + return True + +def do_listdir(path): + try: + buf = os.listdir(path) + except Exception, err: + logging.exception("Listdir failed on %s err: %s", path, str(err)) + raise + return buf + +def do_chown(path, uid, gid): + try: + os.chown(path, uid, gid) + except Exception, err: + logging.exception("Chown failed on %s err: %s", path, str(err)) + raise + return True + +def do_stat(path): + try: + #Check for fd. + if isinstance(path, int): + buf = os.fstat(path) + else: + buf = os.stat(path) + except Exception, err: + logging.exception("Stat failed on %s err: %s", path, str(err)) + raise + + return buf + +def do_open(path, mode): + try: + fd = open(path, mode) + except Exception, err: + logging.exception("Open failed on %s err: %s", path, str(err)) + raise + return fd + +def do_close(fd): + #fd could be file or int type. + try: + if isinstance(fd, int): + os.close(fd) + else: + fd.close() + except Exception, err: + logging.exception("Close failed on %s err: %s", fd, str(err)) + raise + return True + +def do_unlink(path, log = True): + try: + os.unlink(path) + except Exception, err: + if log: + logging.exception("Unlink failed on %s err: %s", path, str(err)) + if err.errno != errno.ENOENT: + raise + return True + +def do_rmdir(path): + try: + os.rmdir(path) + except Exception, err: + logging.exception("Rmdir failed on %s err: %s", path, str(err)) + if err.errno != errno.ENOENT: + raise + return True + +def do_rename(old_path, new_path): + try: + os.rename(old_path, new_path) + except Exception, err: + logging.exception("Rename failed on %s to %s err: %s", old_path, new_path, \ + str(err)) + raise + return True + +def mkdirs(path): + """ + Ensures the path is a directory or makes it if not. Errors if the path + exists but is a file or on permissions failure. + + :param path: path to create + """ + if not os.path.isdir(path): + try: + do_makedirs(path) + except OSError, err: + #TODO: check, isdir will fail if mounted and volume stopped. + #if err.errno != errno.EEXIST or not os.path.isdir(path) + if err.errno != errno.EEXIST: + raise + +def dir_empty(path): + """ + Return true if directory/container is empty. + :param path: Directory path. + :returns: True/False. + """ + if os.path.isdir(path): + try: + files = do_listdir(path) + except Exception, err: + logging.exception("listdir failed on %s err: %s", path, str(err)) + raise + if not files: + return True + else: + return False + else: + if not os.path.exists(path): + return True + +def rmdirs(path): + if os.path.isdir(path) and dir_empty(path): + do_rmdir(path) + else: + logging.error("rmdirs failed dir may not be empty or not valid dir") + return False diff --git a/ufo/gluster/swift/common/middleware/__init__.py b/ufo/gluster/swift/common/middleware/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/common/middleware/__init__.py diff --git a/ufo/gluster/swift/common/middleware/gluster.py b/ufo/gluster/swift/common/middleware/gluster.py new file mode 100644 index 00000000000..ab63c51e12f --- /dev/null +++ b/ufo/gluster/swift/common/middleware/gluster.py @@ -0,0 +1,40 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Noop Middleware that simply allows us to monkey patch the constraints +import gluster.swift.common.constraints + +class Gluster(object): + """ + Noop middleware for use with the proxy server to get paste.deploy to load + this middleware such that the plugin constraints monkey patch the common + constraints ahead of their use. + """ + def __init__(self, app, conf): + self.app = app + self.conf = conf + + def __call__(self, env, start_response): + return self.app(env, start_response) + + +def filter_factory(global_conf, **local_conf): + """Returns a WSGI filter app for use with paste.deploy.""" + conf = global_conf.copy() + conf.update(local_conf) + + def gluster_filter(app): + return Gluster(app, conf) + return gluster_filter diff --git a/ufo/gluster/swift/common/utils.py b/ufo/gluster/swift/common/utils.py new file mode 100644 index 00000000000..56376f8eed5 --- /dev/null +++ b/ufo/gluster/swift/common/utils.py @@ -0,0 +1,457 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import errno +import xattr +from hashlib import md5 +import cPickle as pickle +from ConfigParser import ConfigParser, NoSectionError, NoOptionError +from swift.common.utils import normalize_timestamp, TRUE_VALUES +from gluster.swift.common.fs_utils import * +from gluster.swift.common import Glusterfs + +X_CONTENT_TYPE = 'Content-Type' +X_CONTENT_LENGTH = 'Content-Length' +X_TIMESTAMP = 'X-Timestamp' +X_PUT_TIMESTAMP = 'X-PUT-Timestamp' +X_TYPE = 'X-Type' +X_ETAG = 'ETag' +X_OBJECTS_COUNT = 'X-Object-Count' +X_BYTES_USED = 'X-Bytes-Used' +X_CONTAINER_COUNT = 'X-Container-Count' +X_OBJECT_TYPE = 'X-Object-Type' +DIR_TYPE = 'application/directory' +ACCOUNT = 'Account' +METADATA_KEY = 'user.swift.metadata' +MAX_XATTR_SIZE = 65536 +CONTAINER = 'container' +DIR = 'dir' +MARKER_DIR = 'marker_dir' +TEMP_DIR = 'tmp' +ASYNCDIR = 'async_pending' # Keep in sync with swift.obj.server.ASYNCDIR +FILE = 'file' +FILE_TYPE = 'application/octet-stream' +OBJECT = 'Object' +OBJECT_TYPE = 'application/octet-stream' +DEFAULT_UID = -1 +DEFAULT_GID = -1 +PICKLE_PROTOCOL = 2 +CHUNK_SIZE = 65536 +MEMCACHE_KEY_PREFIX = 'gluster.swift.' +MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX = MEMCACHE_KEY_PREFIX + 'account.details.' +MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX = MEMCACHE_KEY_PREFIX + 'container.details.' + + +def read_metadata(path): + """ + Helper function to read the pickled metadata from a File/Directory. + + :param path: File/Directory to read metadata from. + + :returns: dictionary of metadata + """ + metadata = None + metadata_s = '' + key = 0 + while metadata is None: + try: + metadata_s += xattr.get(path, '%s%s' % (METADATA_KEY, (key or ''))) + except IOError as err: + if err.errno == errno.ENODATA: + if key > 0: + # No errors reading the xattr keys, but since we have not + # been able to find enough chunks to get a successful + # unpickle operation, we consider the metadata lost, and + # drop the existing data so that the internal state can be + # recreated. + clean_metadata(path) + # We either could not find any metadata key, or we could find + # some keys, but were not successful in performing the + # unpickling (missing keys perhaps)? Either way, just report + # to the caller we have no metadata. + metadata = {} + else: + logging.exception("xattr.get failed on %s key %s err: %s", + path, key, str(err)) + # Note that we don't touch the keys on errors fetching the + # data since it could be a transient state. + raise + else: + try: + # If this key provides all or the remaining part of the pickle + # data, we don't need to keep searching for more keys. This + # means if we only need to store data in N xattr key/value + # pair, we only need to invoke xattr get N times. With large + # keys sizes we are shooting for N = 1. + metadata = pickle.loads(metadata_s) + assert isinstance(metadata, dict) + except EOFError, pickle.UnpicklingError: + # We still are not able recognize this existing data collected + # as a pickled object. Make sure we loop around to try to get + # more from another xattr key. + metadata = None + key += 1 + return metadata + +def write_metadata(path, metadata): + """ + Helper function to write pickled metadata for a File/Directory. + + :param path: File/Directory path to write the metadata + :param metadata: dictionary to metadata write + """ + assert isinstance(metadata, dict) + metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) + key = 0 + while metastr: + try: + xattr.set(path, '%s%s' % (METADATA_KEY, key or ''), metastr[:MAX_XATTR_SIZE]) + except IOError as err: + logging.exception("xattr.set failed on %s key %s err: %s", path, key, str(err)) + raise + metastr = metastr[MAX_XATTR_SIZE:] + key += 1 + +def clean_metadata(path): + key = 0 + while True: + try: + xattr.remove(path, '%s%s' % (METADATA_KEY, (key or ''))) + except IOError as err: + if err.errno == errno.ENODATA: + break + raise + key += 1 + +def check_user_xattr(path): + if not os.path.exists(path): + return False + try: + xattr.set(path, 'user.test.key1', 'value1') + except IOError as err: + logging.exception("check_user_xattr: set failed on %s err: %s", path, str(err)) + raise + try: + xattr.remove(path, 'user.test.key1') + except IOError as err: + logging.exception("check_user_xattr: remove failed on %s err: %s", path, str(err)) + #Remove xattr may fail in case of concurrent remove. + return True + +def validate_container(metadata): + if not metadata: + logging.warn('validate_container: No metadata') + return False + + if X_TYPE not in metadata.keys() or \ + X_TIMESTAMP not in metadata.keys() or \ + X_PUT_TIMESTAMP not in metadata.keys() or \ + X_OBJECTS_COUNT not in metadata.keys() or \ + X_BYTES_USED not in metadata.keys(): + #logging.warn('validate_container: Metadata missing entries: %s' % metadata) + return False + + (value, timestamp) = metadata[X_TYPE] + if value == CONTAINER: + return True + + logging.warn('validate_container: metadata type is not CONTAINER (%r)' % (value,)) + return False + +def validate_account(metadata): + if not metadata: + logging.warn('validate_account: No metadata') + return False + + if X_TYPE not in metadata.keys() or \ + X_TIMESTAMP not in metadata.keys() or \ + X_PUT_TIMESTAMP not in metadata.keys() or \ + X_OBJECTS_COUNT not in metadata.keys() or \ + X_BYTES_USED not in metadata.keys() or \ + X_CONTAINER_COUNT not in metadata.keys(): + #logging.warn('validate_account: Metadata missing entries: %s' % metadata) + return False + + (value, timestamp) = metadata[X_TYPE] + if value == ACCOUNT: + return True + + logging.warn('validate_account: metadata type is not ACCOUNT (%r)' % (value,)) + return False + +def validate_object(metadata): + if not metadata: + logging.warn('validate_object: No metadata') + return False + + if X_TIMESTAMP not in metadata.keys() or \ + X_CONTENT_TYPE not in metadata.keys() or \ + X_ETAG not in metadata.keys() or \ + X_CONTENT_LENGTH not in metadata.keys() or \ + X_TYPE not in metadata.keys() or \ + X_OBJECT_TYPE not in metadata.keys(): + #logging.warn('validate_object: Metadata missing entries: %s' % metadata) + return False + + if metadata[X_TYPE] == OBJECT: + return True + + logging.warn('validate_object: metadata type is not OBJECT (%r)' % (metadata[X_TYPE],)) + return False + +def is_marker(metadata): + if not metadata: + logging.warn('is_marker: No metadata') + return False + + if X_OBJECT_TYPE not in metadata.keys(): + logging.warn('is_marker: X_OBJECT_TYPE missing from metadata: %s' % metadata) + return False + + if metadata[X_OBJECT_TYPE] == MARKER_DIR: + return True + else: + return False + +def _update_list(path, cont_path, src_list, reg_file=True, object_count=0, + bytes_used=0, obj_list=[]): + # strip the prefix off, also stripping the leading and trailing slashes + obj_path = path.replace(cont_path, '').strip(os.path.sep) + + for i in src_list: + if obj_path: + obj_list.append(os.path.join(obj_path, i)) + else: + obj_list.append(i) + + object_count += 1 + + if reg_file: + bytes_used += os.path.getsize(path + '/' + i) + + return object_count, bytes_used + +def update_list(path, cont_path, dirs=[], files=[], object_count=0, + bytes_used=0, obj_list=[]): + object_count, bytes_used = _update_list(path, cont_path, files, True, + object_count, bytes_used, + obj_list) + object_count, bytes_used = _update_list(path, cont_path, dirs, False, + object_count, bytes_used, + obj_list) + return object_count, bytes_used + + +class ContainerDetails(object): + def __init__(self, bytes_used, object_count, obj_list, dir_list): + self.bytes_used = bytes_used + self.object_count = object_count + self.obj_list = obj_list + self.dir_list = dir_list + + +def _get_container_details_from_fs(cont_path): + """ + get container details by traversing the filesystem + """ + bytes_used = 0 + object_count = 0 + obj_list = [] + dir_list = [] + + if os.path.isdir(cont_path): + for (path, dirs, files) in os.walk(cont_path): + object_count, bytes_used = update_list(path, cont_path, dirs, files, + object_count, bytes_used, + obj_list) + + dir_list.append((path, do_stat(path).st_mtime)) + + return ContainerDetails(bytes_used, object_count, obj_list, dir_list) + +def get_container_details(cont_path, memcache=None): + """ + Return object_list, object_count and bytes_used. + """ + mkey = '' + if memcache: + mkey = MEMCACHE_CONTAINER_DETAILS_KEY_PREFIX + cont_path + cd = memcache.get(mkey) + if cd: + if not cd.dir_list: + cd = None + else: + for (path, mtime) in cd.dir_list: + if mtime != do_stat(path).st_mtime: + cd = None + else: + cd = None + if not cd: + cd = _get_container_details_from_fs(cont_path) + if memcache: + memcache.set(mkey, cd) + return cd.obj_list, cd.object_count, cd.bytes_used + + +class AccountDetails(object): + """ A simple class to store the three pieces of information associated + with an account: + + 1. The last known modification time + 2. The count of containers in the following list + 3. The list of containers + """ + def __init__(self, mtime, container_count, container_list): + self.mtime = mtime + self.container_count = container_count + self.container_list = container_list + + +def _get_account_details_from_fs(acc_path, acc_stats): + container_list = [] + container_count = 0 + + if not acc_stats: + acc_stats = do_stat(acc_path) + is_dir = (acc_stats.st_mode & 0040000) != 0 + if is_dir: + for name in do_listdir(acc_path): + if name.lower() == TEMP_DIR \ + or name.lower() == ASYNCDIR \ + or not os.path.isdir(os.path.join(acc_path, name)): + continue + container_count += 1 + container_list.append(name) + + return AccountDetails(acc_stats.st_mtime, container_count, container_list) + +def get_account_details(acc_path, memcache=None): + """ + Return container_list and container_count. + """ + acc_stats = None + mkey = '' + if memcache: + mkey = MEMCACHE_ACCOUNT_DETAILS_KEY_PREFIX + acc_path + ad = memcache.get(mkey) + if ad: + # FIXME: Do we really need to stat the file? If we are object + # only, then we can track the other Swift HTTP APIs that would + # modify the account and invalidate the cached entry there. If we + # are not object only, are we even called on this path? + acc_stats = do_stat(acc_path) + if ad.mtime != acc_stats.st_mtime: + ad = None + else: + ad = None + if not ad: + ad = _get_account_details_from_fs(acc_path, acc_stats) + if memcache: + memcache.set(mkey, ad) + return ad.container_list, ad.container_count + +def _get_etag(path): + etag = md5() + with open(path, 'rb') as fp: + while True: + chunk = fp.read(CHUNK_SIZE) + if chunk: + etag.update(chunk) + else: + break + return etag.hexdigest() + +def get_object_metadata(obj_path): + """ + Return metadata of object. + """ + try: + stats = os.stat(obj_path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + metadata = {} + else: + is_dir = (stats.st_mode & 0040000) != 0 + metadata = { + X_TYPE: OBJECT, + X_TIMESTAMP: normalize_timestamp(stats.st_ctime), + X_CONTENT_TYPE: DIR_TYPE if is_dir else FILE_TYPE, + X_OBJECT_TYPE: DIR if is_dir else FILE, + X_CONTENT_LENGTH: 0 if is_dir else stats.st_size, + X_ETAG: md5().hexdigest() if is_dir else _get_etag(obj_path), + } + return metadata + +def _add_timestamp(metadata_i): + # At this point we have a simple key/value dictionary, turn it into + # key/(value,timestamp) pairs. + timestamp = 0 + metadata = {} + for key, value_i in metadata_i.iteritems(): + if not isinstance(value_i, tuple): + metadata[key] = (value_i, timestamp) + else: + metadata[key] = value_i + return metadata + +def get_container_metadata(cont_path, memcache=None): + objects = [] + object_count = 0 + bytes_used = 0 + objects, object_count, bytes_used = get_container_details(cont_path, memcache) + metadata = {X_TYPE: CONTAINER, + X_TIMESTAMP: normalize_timestamp(os.path.getctime(cont_path)), + X_PUT_TIMESTAMP: normalize_timestamp(os.path.getmtime(cont_path)), + X_OBJECTS_COUNT: object_count, + X_BYTES_USED: bytes_used} + return _add_timestamp(metadata) + +def get_account_metadata(acc_path, memcache=None): + containers = [] + container_count = 0 + containers, container_count = get_account_details(acc_path, memcache) + metadata = {X_TYPE: ACCOUNT, + X_TIMESTAMP: normalize_timestamp(os.path.getctime(acc_path)), + X_PUT_TIMESTAMP: normalize_timestamp(os.path.getmtime(acc_path)), + X_OBJECTS_COUNT: 0, + X_BYTES_USED: 0, + X_CONTAINER_COUNT: container_count} + return _add_timestamp(metadata) + +def restore_metadata(path, metadata): + meta_orig = read_metadata(path) + if meta_orig: + meta_new = meta_orig.copy() + meta_new.update(metadata) + else: + meta_new = metadata + if meta_orig != meta_new: + write_metadata(path, meta_new) + return meta_new + +def create_object_metadata(obj_path): + metadata = get_object_metadata(obj_path) + return restore_metadata(obj_path, metadata) + +def create_container_metadata(cont_path, memcache=None): + metadata = get_container_metadata(cont_path, memcache) + return restore_metadata(cont_path, metadata) + +def create_account_metadata(acc_path, memcache=None): + metadata = get_account_metadata(acc_path, memcache) + return restore_metadata(acc_path, metadata) diff --git a/ufo/gluster/swift/container/__init__.py b/ufo/gluster/swift/container/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/container/__init__.py diff --git a/ufo/gluster/swift/container/server.py b/ufo/gluster/swift/container/server.py new file mode 100644 index 00000000000..e2a19730220 --- /dev/null +++ b/ufo/gluster/swift/container/server.py @@ -0,0 +1,46 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Container Server for Gluster Swift UFO """ + +# Simply importing this monkey patches the constraint handling to fit our +# needs +import gluster.swift.common.constraints + +from swift.container import server +from gluster.swift.common.DiskDir import DiskDir + + +class ContainerController(server.ContainerController): + def _get_container_broker(self, drive, part, account, container): + """ + Overriden to provide the GlusterFS specific broker that talks to + Gluster for the information related to servicing a given request + instead of talking to a database. + + :param drive: drive that holds the container + :param part: partition the container is in + :param account: account name + :param container: container name + :returns: DiskDir object + """ + return DiskDir(self.root, account, container, self.logger) + + +def app_factory(global_conf, **local_conf): + """paste.deploy app factory for creating WSGI container server apps.""" + conf = global_conf.copy() + conf.update(local_conf) + return ContainerController(conf) diff --git a/ufo/gluster/swift/obj/__init__.py b/ufo/gluster/swift/obj/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/obj/__init__.py diff --git a/ufo/gluster/swift/obj/server.py b/ufo/gluster/swift/obj/server.py new file mode 100644 index 00000000000..43cdd8890d2 --- /dev/null +++ b/ufo/gluster/swift/obj/server.py @@ -0,0 +1,33 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Object Server for Gluster Swift UFO """ + +# Simply importing this monkey patches the constraint handling to fit our +# needs +import gluster.swift.common.constraints + +from swift.obj import server +from gluster.swift.common.DiskFile import Gluster_DiskFile + +# Monkey patch the object server module to use Gluster's DiskFile definition +server.DiskFile = Gluster_DiskFile + + +def app_factory(global_conf, **local_conf): + """paste.deploy app factory for creating WSGI object server apps""" + conf = global_conf.copy() + conf.update(local_conf) + return server.ObjectController(conf) diff --git a/ufo/gluster/swift/proxy/__init__.py b/ufo/gluster/swift/proxy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ufo/gluster/swift/proxy/__init__.py diff --git a/ufo/gluster/swift/proxy/server.py b/ufo/gluster/swift/proxy/server.py new file mode 100644 index 00000000000..792a97df9a2 --- /dev/null +++ b/ufo/gluster/swift/proxy/server.py @@ -0,0 +1,27 @@ +# Copyright (c) 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Simply importing this monkey patches the constraint handling to fit our +# needs +import gluster.swift.common.constraints + +from swift.proxy import server + +def app_factory(global_conf, **local_conf): + """paste.deploy app factory for creating WSGI proxy apps.""" + conf = global_conf.copy() + conf.update(local_conf) + return server.Application(conf) |