diff options
author | Vikas Gorur <vikas@zresearch.com> | 2009-02-18 17:36:07 +0530 |
---|---|---|
committer | Vikas Gorur <vikas@zresearch.com> | 2009-02-18 17:36:07 +0530 |
commit | 77adf4cd648dce41f89469dd185deec6b6b53a0b (patch) | |
tree | 02e155a5753b398ee572b45793f889b538efab6b /xlators/performance/write-behind | |
parent | f3b2e6580e5663292ee113c741343c8a43ee133f (diff) |
Added all files
Diffstat (limited to 'xlators/performance/write-behind')
-rw-r--r-- | xlators/performance/write-behind/Makefile.am | 3 | ||||
-rw-r--r-- | xlators/performance/write-behind/src/Makefile.am | 12 | ||||
-rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 1444 |
3 files changed, 1459 insertions, 0 deletions
diff --git a/xlators/performance/write-behind/Makefile.am b/xlators/performance/write-behind/Makefile.am new file mode 100644 index 00000000000..d471a3f9243 --- /dev/null +++ b/xlators/performance/write-behind/Makefile.am @@ -0,0 +1,3 @@ +SUBDIRS = src + +CLEANFILES = diff --git a/xlators/performance/write-behind/src/Makefile.am b/xlators/performance/write-behind/src/Makefile.am new file mode 100644 index 00000000000..f800abad50d --- /dev/null +++ b/xlators/performance/write-behind/src/Makefile.am @@ -0,0 +1,12 @@ +xlator_LTLIBRARIES = write-behind.la +xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance + +write_behind_la_LDFLAGS = -module -avoidversion + +write_behind_la_SOURCES = write-behind.c +write_behind_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ + -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) + +CLEANFILES = diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c new file mode 100644 index 00000000000..04a447d49e9 --- /dev/null +++ b/xlators/performance/write-behind/src/write-behind.c @@ -0,0 +1,1444 @@ +/* + Copyright (c) 2006, 2007, 2008, 2009 Z RESEARCH, Inc. <http://www.zresearch.com> + This file is part of GlusterFS. + + GlusterFS is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + GlusterFS is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see + <http://www.gnu.org/licenses/>. +*/ + +/*TODO: check for non null wb_file_data before getting wb_file */ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "glusterfs.h" +#include "logging.h" +#include "dict.h" +#include "xlator.h" +#include "list.h" +#include "compat.h" +#include "compat-errno.h" +#include "common-utils.h" + +#define MAX_VECTOR_COUNT 8 + +typedef struct list_head list_head_t; +struct wb_conf; +struct wb_page; +struct wb_file; + + +struct wb_conf { + uint64_t aggregate_size; + uint64_t window_size; + uint64_t disable_till; + gf_boolean_t enable_O_SYNC; + gf_boolean_t flush_behind; +}; + + +typedef struct wb_local { + list_head_t winds; + struct wb_file *file; + list_head_t unwind_frames; + int op_ret; + int op_errno; + call_frame_t *frame; +} wb_local_t; + + +typedef struct write_request { + call_frame_t *frame; + off_t offset; + /* int32_t op_ret; + int32_t op_errno; */ + struct iovec *vector; + int32_t count; + dict_t *refs; + char write_behind; + char stack_wound; + char got_reply; + list_head_t list; + list_head_t winds; + /* list_head_t unwinds;*/ +} wb_write_request_t; + + +struct wb_file { + int disabled; + uint64_t disable_till; + off_t offset; + size_t window_size; + int32_t refcount; + int32_t op_ret; + int32_t op_errno; + list_head_t request; + fd_t *fd; + gf_lock_t lock; + xlator_t *this; +}; + + +typedef struct wb_conf wb_conf_t; +typedef struct wb_page wb_page_t; +typedef struct wb_file wb_file_t; + + +int32_t +wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); + +int32_t +wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds); + +int32_t +wb_sync_all (call_frame_t *frame, wb_file_t *file); + +int32_t +__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size); + + +wb_file_t * +wb_file_create (xlator_t *this, + fd_t *fd) +{ + wb_file_t *file = NULL; + wb_conf_t *conf = this->private; + + file = CALLOC (1, sizeof (*file)); + INIT_LIST_HEAD (&file->request); + + /* fd_ref() not required, file should never decide the existance of + * an fd */ + file->fd= fd; + file->disable_till = conf->disable_till; + file->this = this; + file->refcount = 1; + + fd_ctx_set (fd, this, (uint64_t)(long)file); + + return file; +} + +void +wb_file_destroy (wb_file_t *file) +{ + int32_t refcount = 0; + + LOCK (&file->lock); + { + refcount = --file->refcount; + } + UNLOCK (&file->lock); + + if (!refcount){ + LOCK_DESTROY (&file->lock); + FREE (file); + } + + return; +} + + +int32_t +wb_sync_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *stbuf) +{ + wb_local_t *local = NULL; + list_head_t *winds = NULL; + wb_file_t *file = NULL; + wb_write_request_t *request = NULL, *dummy = NULL; + + local = frame->local; + winds = &local->winds; + file = local->file; + + LOCK (&file->lock); + { + list_for_each_entry_safe (request, dummy, winds, winds) { + request->got_reply = 1; + if (!request->write_behind && (op_ret == -1)) { + wb_local_t *per_request_local = request->frame->local; + per_request_local->op_ret = op_ret; + per_request_local->op_errno = op_errno; + } + + /* + request->op_ret = op_ret; + request->op_errno = op_errno; + */ + } + } + UNLOCK (&file->lock); + + if (op_ret == -1) + { + file->op_ret = op_ret; + file->op_errno = op_errno; + } + + wb_process_queue (frame, file, 0); + + /* safe place to do fd_unref */ + fd_unref (file->fd); + + STACK_DESTROY (frame->root); + + return 0; +} + +int32_t +wb_sync_all (call_frame_t *frame, wb_file_t *file) +{ + list_head_t winds; + int32_t bytes = 0; + + INIT_LIST_HEAD (&winds); + + LOCK (&file->lock); + { + bytes = __wb_mark_winds (&file->request, &winds, 0); + } + UNLOCK (&file->lock); + + wb_sync (frame, file, &winds); + + return bytes; +} + + +int32_t +wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) +{ + wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL; + size_t total_count = 0, count = 0; + size_t copied = 0; + call_frame_t *sync_frame = NULL; + dict_t *refs = NULL; + wb_local_t *local = NULL; + struct iovec *vector = NULL; + int32_t bytes = 0; + size_t bytecount = 0; + + list_for_each_entry (request, winds, winds) + { + total_count += request->count; + bytes += iov_length (request->vector, request->count); + } + + if (!total_count) { + return 0; + } + + list_for_each_entry_safe (request, dummy, winds, winds) { + if (!vector) { + vector = MALLOC (VECTORSIZE (MAX_VECTOR_COUNT)); + refs = get_new_dict (); + + local = CALLOC (1, sizeof (*local)); + INIT_LIST_HEAD (&local->winds); + + first_request = request; + } + + count += request->count; + bytecount = VECTORSIZE (request->count); + memcpy (((char *)vector)+copied, + request->vector, + bytecount); + copied += bytecount; + + if (request->refs) { + dict_copy (request->refs, refs); + } + + next = NULL; + if (request->winds.next != winds) { + next = list_entry (request->winds.next, struct write_request, winds); + } + + list_del_init (&request->winds); + list_add_tail (&request->winds, &local->winds); + + if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) { + sync_frame = copy_frame (frame); + sync_frame->local = local; + local->file = file; + sync_frame->root->req_refs = dict_ref (refs); + fd_ref (file->fd); + STACK_WIND (sync_frame, + wb_sync_cbk, + FIRST_CHILD(sync_frame->this), + FIRST_CHILD(sync_frame->this)->fops->writev, + file->fd, vector, + count, first_request->offset); + + dict_unref (refs); + FREE (vector); + first_request = NULL; + refs = NULL; + vector = NULL; + copied = count = 0; + } + } + + return bytes; +} + + +int32_t +wb_stat_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + wb_local_t *local = NULL; + + local = frame->local; + + if (local->file) + fd_unref (local->file->fd); + + STACK_UNWIND (frame, op_ret, op_errno, buf); + + return 0; +} + + +int32_t +wb_stat (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (loc->inode) + { + iter_fd = fd_lookup (loc->inode, frame->root->pid); + if (iter_fd) { + if (!fd_ctx_get (iter_fd, this, &tmp_file)) { + file = (wb_file_t *)(long)tmp_file; + } else { + fd_unref (iter_fd); + } + } + if (file) { + wb_sync_all (frame, file); + } + } + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, wb_stat_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, + loc); + return 0; +} + + +int32_t +wb_fstat (call_frame_t *frame, + xlator_t *this, + fd_t *fd) +{ + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD, NULL); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + if (file) { + fd_ref (file->fd); + wb_sync_all (frame, file); + } + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, + wb_stat_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fstat, + fd); + return 0; +} + + +int32_t +wb_truncate_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + wb_local_t *local = NULL; + + local = frame->local; + if (local->file) + fd_unref (local->file->fd); + + STACK_UNWIND (frame, op_ret, op_errno, buf); + return 0; +} + + +int32_t +wb_truncate (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + off_t offset) +{ + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (loc->inode) + { + iter_fd = fd_lookup (loc->inode, frame->root->pid); + if (iter_fd) { + if (!fd_ctx_get (iter_fd, this, &tmp_file)){ + file = (wb_file_t *)(long)tmp_file; + } else { + fd_unref (iter_fd); + } + } + + if (file) + { + wb_sync_all (frame, file); + } + } + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, + wb_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, + offset); + return 0; +} + + +int32_t +wb_ftruncate (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + off_t offset) +{ + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD, NULL); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + if (file) + wb_sync_all (frame, file); + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + if (file) + fd_ref (file->fd); + + frame->local = local; + + STACK_WIND (frame, + wb_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, + fd, + offset); + return 0; +} + + +int32_t +wb_utimens_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + wb_local_t *local = NULL; + + local = frame->local; + if (local->file) + fd_unref (local->file->fd); + + STACK_UNWIND (frame, op_ret, op_errno, buf); + return 0; +} + + +int32_t +wb_utimens (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + struct timespec tv[2]) +{ + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (loc->inode) { + iter_fd = fd_lookup (loc->inode, frame->root->pid); + if (iter_fd) { + if (!fd_ctx_get (iter_fd, this, &tmp_file)) { + file = (wb_file_t *)(long)tmp_file; + } else { + fd_unref (iter_fd); + } + } + + if (file) + wb_sync_all (frame, file); + } + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, + wb_utimens_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->utimens, + loc, + tv); + return 0; +} + +int32_t +wb_open_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd) +{ + int32_t flags = 0; + wb_file_t *file = NULL; + wb_conf_t *conf = this->private; + + if (op_ret != -1) + { + file = wb_file_create (this, fd); + + /* If mandatory locking has been enabled on this file, + we disable caching on it */ + + if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) + file->disabled = 1; + + /* If O_DIRECT then, we disable chaching */ + if (frame->local) + { + flags = *((int32_t *)frame->local); + if (((flags & O_DIRECT) == O_DIRECT) || + ((flags & O_RDONLY) == O_RDONLY) || + (((flags & O_SYNC) == O_SYNC) && + conf->enable_O_SYNC == _gf_true)) { + file->disabled = 1; + } + } + + LOCK_INIT (&file->lock); + } + + STACK_UNWIND (frame, op_ret, op_errno, fd); + return 0; +} + + +int32_t +wb_open (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, + fd_t *fd) +{ + frame->local = CALLOC (1, sizeof(int32_t)); + *((int32_t *)frame->local) = flags; + + STACK_WIND (frame, + wb_open_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->open, + loc, flags, fd); + return 0; +} + + +int32_t +wb_create_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + fd_t *fd, + inode_t *inode, + struct stat *buf) +{ + wb_file_t *file = NULL; + + if (op_ret != -1) + { + file = wb_file_create (this, fd); + /* + * If mandatory locking has been enabled on this file, + * we disable caching on it + */ + if ((fd->inode->st_mode & S_ISGID) && + !(fd->inode->st_mode & S_IXGRP)) + { + file->disabled = 1; + } + + LOCK_INIT (&file->lock); + } + + STACK_UNWIND (frame, op_ret, op_errno, fd, inode, buf); + return 0; +} + + +int32_t +wb_create (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + int32_t flags, + mode_t mode, + fd_t *fd) +{ + STACK_WIND (frame, + wb_create_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->create, + loc, flags, mode, fd); + return 0; +} + + +int32_t +__wb_cleanup_queue (wb_file_t *file) +{ + wb_write_request_t *request = NULL, *dummy = NULL; + int32_t bytes = 0; + + list_for_each_entry_safe (request, dummy, &file->request, list) + { + if (request->got_reply && request->write_behind) + { + bytes += iov_length (request->vector, request->count); + list_del_init (&request->list); + + FREE (request->vector); + dict_unref (request->refs); + + FREE (request); + } + } + + return bytes; +} + + +int32_t +__wb_mark_wind_all (list_head_t *list, list_head_t *winds) +{ + wb_write_request_t *request = NULL; + size_t size = 0; + + list_for_each_entry (request, list, list) + { + if (!request->stack_wound) + { + size += iov_length (request->vector, request->count); + request->stack_wound = 1; + list_add_tail (&request->winds, winds); + } + } + + return size; +} + + +size_t +__wb_get_aggregate_size (list_head_t *list) +{ + wb_write_request_t *request = NULL; + size_t size = 0; + + list_for_each_entry (request, list, list) + { + if (!request->stack_wound) + { + size += iov_length (request->vector, request->count); + } + } + + return size; +} + +uint32_t +__wb_get_incomplete_writes (list_head_t *list) +{ + wb_write_request_t *request = NULL; + uint32_t count = 0; + + list_for_each_entry (request, list, list) + { + if (request->stack_wound && !request->got_reply) + { + count++; + } + } + + return count; +} + +int32_t +__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf) +{ + size_t aggregate_current = 0; + uint32_t incomplete_writes = 0; + + incomplete_writes = __wb_get_incomplete_writes (list); + + aggregate_current = __wb_get_aggregate_size (list); + + if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf)) + { + __wb_mark_wind_all (list, winds); + } + + return aggregate_current; +} + + +size_t +__wb_get_window_size (list_head_t *list) +{ + wb_write_request_t *request = NULL; + size_t size = 0; + + list_for_each_entry (request, list, list) + { + if (request->write_behind && !request->got_reply) + { + size += iov_length (request->vector, request->count); + } + } + + return size; +} + + +size_t +__wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size) +{ + size_t written_behind = 0; + wb_write_request_t *request = NULL; + + list_for_each_entry (request, list, list) + { + if (written_behind <= size) + { + if (!request->write_behind) + { + wb_local_t *local = request->frame->local; + written_behind += iov_length (request->vector, request->count); + request->write_behind = 1; + list_add_tail (&local->unwind_frames, unwinds); + } + } + else + { + break; + } + } + + return written_behind; +} + + +int32_t +__wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf) +{ + size_t window_current = 0; + + window_current = __wb_get_window_size (list); + if (window_current <= window_conf) + { + window_current += __wb_mark_unwind_till (list, unwinds, + window_conf - window_current); + } + + return window_current; +} + + +int32_t +wb_stack_unwind (list_head_t *unwinds) +{ + struct stat buf = {0,}; + wb_local_t *local = NULL, *dummy = NULL; + + list_for_each_entry_safe (local, dummy, unwinds, unwind_frames) + { + list_del_init (&local->unwind_frames); + STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf); + } + + return 0; +} + + +int32_t +wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds) +{ + /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */ + /* call_frame_t *sync_frame = copy_frame (frame); */ + + wb_stack_unwind (unwinds); + wb_sync (frame, file, winds); + + return 0; +} + + +int32_t +wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all) +{ + list_head_t winds, unwinds; + size_t size = 0; + wb_conf_t *conf = file->this->private; + + INIT_LIST_HEAD (&winds); + INIT_LIST_HEAD (&unwinds); + + if (!file) + { + return -1; + } + + size = flush_all ? 0 : conf->aggregate_size; + LOCK (&file->lock); + { + __wb_cleanup_queue (file); + __wb_mark_winds (&file->request, &winds, size); + __wb_mark_unwinds (&file->request, &unwinds, conf->window_size); + } + UNLOCK (&file->lock); + + wb_do_ops (frame, file, &winds, &unwinds); + return 0; +} + + +wb_write_request_t * +wb_enqueue (wb_file_t *file, + call_frame_t *frame, + struct iovec *vector, + int32_t count, + off_t offset) +{ + wb_write_request_t *request = NULL; + wb_local_t *local = CALLOC (1, sizeof (*local)); + + request = CALLOC (1, sizeof (*request)); + + INIT_LIST_HEAD (&request->list); + INIT_LIST_HEAD (&request->winds); + + request->frame = frame; + request->vector = iov_dup (vector, count); + request->count = count; + request->offset = offset; + request->refs = dict_ref (frame->root->req_refs); + + frame->local = local; + local->frame = frame; + local->op_ret = iov_length (vector, count); + local->op_errno = 0; + INIT_LIST_HEAD (&local->unwind_frames); + + LOCK (&file->lock); + { + list_add_tail (&request->list, &file->request); + file->offset = offset + iov_length (vector, count); + } + UNLOCK (&file->lock); + + return request; +} + + +int32_t +wb_writev_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *stbuf) +{ + STACK_UNWIND (frame, op_ret, op_errno, stbuf); + return 0; +} + + +int32_t +wb_writev (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + struct iovec *vector, + int32_t count, + off_t offset) +{ + wb_file_t *file = NULL; + char offset_expected = 1, wb_disabled = 0; + call_frame_t *process_frame = NULL; + size_t size = 0; + uint64_t tmp_file = 0; + + if (vector != NULL) + size = iov_length (vector, count); + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD, NULL); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + if (!file) { + gf_log (this->name, GF_LOG_ERROR, + "wb_file not found for fd %p", fd); + STACK_UNWIND (frame, -1, EBADFD, NULL); + return 0; + } + + LOCK (&file->lock); + { + if (file->disabled || file->disable_till) { + if (size > file->disable_till) { + file->disable_till = 0; + } else { + file->disable_till -= size; + } + wb_disabled = 1; + } + + if (file->offset != offset) + offset_expected = 0; + } + UNLOCK (&file->lock); + + if (wb_disabled) { + STACK_WIND (frame, + wb_writev_cbk, + FIRST_CHILD (frame->this), + FIRST_CHILD (frame->this)->fops->writev, + file->fd, + vector, + count, + offset); + return 0; + } + + process_frame = copy_frame (frame); + + if (!offset_expected) + wb_process_queue (process_frame, file, 1); + + wb_enqueue (file, frame, vector, count, offset); + wb_process_queue (process_frame, file, 0); + + STACK_DESTROY (process_frame->root); + + return 0; +} + + +int32_t +wb_readv_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct iovec *vector, + int32_t count, + struct stat *stbuf) +{ + wb_local_t *local = NULL; + + local = frame->local; + + STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + return 0; +} + + +int32_t +wb_readv (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t offset) +{ + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD, NULL); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + if (file) + wb_sync_all (frame, file); + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, + wb_readv_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, + fd, size, offset); + + return 0; +} + + +int32_t +wb_ffr_bg_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + wb_local_t *local = NULL; + wb_file_t *file = NULL; + + local = frame->local; + file = local->file; + + if (file) { + fd_unref (file->fd); + } + + if (file->op_ret == -1) + { + op_ret = file->op_ret; + op_errno = file->op_errno; + + file->op_ret = 0; + } + + STACK_DESTROY (frame->root); + return 0; +} + + +int32_t +wb_ffr_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + wb_local_t *local = NULL; + wb_file_t *file = NULL; + + local = frame->local; + file = local->file; + if (file) { + /* corresponds to the fd_ref() done during wb_file_create() */ + fd_unref (file->fd); + } + + if (file->op_ret == -1) + { + op_ret = file->op_ret; + op_errno = file->op_errno; + + file->op_ret = 0; + } + + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + + +int32_t +wb_flush (call_frame_t *frame, + xlator_t *this, + fd_t *fd) +{ + wb_conf_t *conf = NULL; + wb_file_t *file = NULL; + call_frame_t *flush_frame = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + conf = this->private; + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + + local = CALLOC (1, sizeof (*local)); + local->file = file; + if (file) + fd_ref (file->fd); + + if (&file->request != file->request.next) { + gf_log (this->name, GF_LOG_DEBUG, + "request queue is not empty, it has to be synced"); + } + + if (conf->flush_behind && + (!file->disabled) && (file->disable_till == 0)) { + flush_frame = copy_frame (frame); + STACK_UNWIND (frame, file->op_ret, + file->op_errno); // liar! liar! :O + + flush_frame->local = local; + wb_sync_all (flush_frame, file); + + STACK_WIND (flush_frame, + wb_ffr_bg_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); + } else { + wb_sync_all (frame, file); + + frame->local = local; + STACK_WIND (frame, + wb_ffr_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); + } + + return 0; +} + + +int32_t +wb_fsync_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno) +{ + wb_local_t *local = NULL; + wb_file_t *file = NULL; + + local = frame->local; + file = local->file; + + if (file->op_ret == -1) + { + op_ret = file->op_ret; + op_errno = file->op_errno; + + file->op_ret = 0; + } + + STACK_UNWIND (frame, op_ret, op_errno); + return 0; +} + +int32_t +wb_fsync (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t datasync) +{ + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + + if (fd_ctx_get (fd, this, &tmp_file)) { + gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); + STACK_UNWIND (frame, -1, EBADFD); + return 0; + } + + file = (wb_file_t *)(long)tmp_file; + if (file) + wb_sync_all (frame, file); + + local = CALLOC (1, sizeof (*local)); + local->file = file; + + frame->local = local; + + STACK_WIND (frame, + wb_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, datasync); + return 0; +} + + +int32_t +wb_release (xlator_t *this, + fd_t *fd) +{ + uint64_t file = 0; + + fd_ctx_get (fd, this, &file); + wb_file_destroy ((wb_file_t *)(long)file); + + return 0; +} + + +int32_t +init (xlator_t *this) +{ + dict_t *options = NULL; + wb_conf_t *conf = NULL; + char *aggregate_size_string = NULL; + char *window_size_string = NULL; + char *flush_behind_string = NULL; + char *disable_till_string = NULL; + char *enable_O_SYNC_string = NULL; + int32_t ret = -1; + + if ((this->children == NULL) + || this->children->next) { + gf_log (this->name, GF_LOG_ERROR, + "FATAL: write-behind (%s) not configured with exactly one child", + this->name); + return -1; + } + + if (this->parents == NULL) { + gf_log (this->name, GF_LOG_WARNING, + "dangling volume. check volfile"); + } + + options = this->options; + + conf = CALLOC (1, sizeof (*conf)); + + conf->enable_O_SYNC = _gf_false; + ret = dict_get_str (options, "enable-O_SYNC", + &enable_O_SYNC_string); + if (ret == 0) { + ret = gf_string2boolean (enable_O_SYNC_string, + &conf->enable_O_SYNC); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'enable-O_SYNC' takes only boolean arguments"); + return -1; + } + } + + /* configure 'options aggregate-size <size>' */ + conf->aggregate_size = 0; + ret = dict_get_str (options, "block-size", + &aggregate_size_string); + if (ret == 0) { + ret = gf_string2bytesize (aggregate_size_string, + &conf->aggregate_size); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format \"%s\" of \"option aggregate-size\"", + aggregate_size_string); + return -1; + } + } + + gf_log (this->name, GF_LOG_DEBUG, + "using aggregate-size = %"PRIu64"", + conf->aggregate_size); + + conf->disable_till = 1; + ret = dict_get_str (options, "disable-for-first-nbytes", + &disable_till_string); + if (ret == 0) { + ret = gf_string2bytesize (disable_till_string, + &conf->disable_till); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format \"%s\" of \"option disable-for-first-nbytes\"", + disable_till_string); + return -1; + } + } + + gf_log (this->name, GF_LOG_DEBUG, + "disabling write-behind for first %"PRIu64" bytes", + conf->disable_till); + + /* configure 'option window-size <size>' */ + conf->window_size = 0; + ret = dict_get_str (options, "cache-size", + &window_size_string); + if (ret == 0) { + ret = gf_string2bytesize (window_size_string, + &conf->window_size); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "invalid number format \"%s\" of \"option window-size\"", + window_size_string); + FREE (conf); + return -1; + } + } + + if (!conf->window_size && conf->aggregate_size) { + gf_log (this->name, GF_LOG_WARNING, + "setting window-size to be equal to aggregate-size(%"PRIu64")", + conf->aggregate_size); + conf->window_size = conf->aggregate_size; + } + + if (conf->window_size < conf->aggregate_size) { + gf_log (this->name, GF_LOG_ERROR, + "aggregate-size(%"PRIu64") cannot be more than window-size" + "(%"PRIu64")", conf->window_size, conf->aggregate_size); + FREE (conf); + return -1; + } + + /* configure 'option flush-behind <on/off>' */ + conf->flush_behind = 0; + ret = dict_get_str (options, "flush-behind", + &flush_behind_string); + if (ret == 0) { + ret = gf_string2boolean (flush_behind_string, + &conf->flush_behind); + if (ret == -1) { + gf_log (this->name, GF_LOG_ERROR, + "'flush-behind' takes only boolean arguments"); + return -1; + } + + if (conf->flush_behind) { + gf_log (this->name, GF_LOG_DEBUG, + "enabling flush-behind"); + } + } + this->private = conf; + return 0; +} + + +void +fini (xlator_t *this) +{ + wb_conf_t *conf = this->private; + + FREE (conf); + return; +} + + +struct xlator_fops fops = { + .writev = wb_writev, + .open = wb_open, + .create = wb_create, + .readv = wb_readv, + .flush = wb_flush, + .fsync = wb_fsync, + .stat = wb_stat, + .fstat = wb_fstat, + .truncate = wb_truncate, + .ftruncate = wb_ftruncate, + .utimens = wb_utimens, +}; + +struct xlator_mops mops = { +}; + +struct xlator_cbks cbks = { + .release = wb_release +}; + +struct volume_options options[] = { + { .key = {"flush-behind"}, + .type = GF_OPTION_TYPE_BOOL + }, + { .key = {"block-size", "aggregate-size"}, + .type = GF_OPTION_TYPE_SIZET, + .min = 128 * GF_UNIT_KB, + .max = 4 * GF_UNIT_MB + }, + { .key = {"cache-size", "window-size"}, + .type = GF_OPTION_TYPE_SIZET, + .min = 512 * GF_UNIT_KB, + .max = 1 * GF_UNIT_GB + }, + { .key = {"disable-for-first-nbytes"}, + .type = GF_OPTION_TYPE_SIZET, + .min = 1, + .max = 1 * GF_UNIT_MB, + }, + { .key = {"enable-O_SYNC"}, + .type = GF_OPTION_TYPE_BOOL, + }, + { .key = {NULL} }, +}; |