summaryrefslogtreecommitdiffstats
path: root/xlators/performance/write-behind/src/write-behind.c
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2011-01-21 07:47:08 +0000
committerAnand V. Avati <avati@dev.gluster.com>2011-01-27 12:18:17 -0800
commitbf5c0efdec755297a976a6253665431d700d0737 (patch)
tree166e033a51e498b0689e0c9e9a3f29072176e654 /xlators/performance/write-behind/src/write-behind.c
parent4acbecec9a605ba4f1ba360923d7903465183e1a (diff)
performance/write-behind: backport write-behind from 3.1
Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 934 (md5sum mismatch when files are transferred using vsftpd) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=934
Diffstat (limited to 'xlators/performance/write-behind/src/write-behind.c')
-rw-r--r--xlators/performance/write-behind/src/write-behind.c851
1 files changed, 620 insertions, 231 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c
index 91342700ee6..79fe056d327 100644
--- a/xlators/performance/write-behind/src/write-behind.c
+++ b/xlators/performance/write-behind/src/write-behind.c
@@ -1,18 +1,18 @@
/*
- Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2006-2010 Gluster, Inc. <http://www.gluster.com>
This file is part of GlusterFS.
GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
+ it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation; either version 3 of the License,
or (at your option) any later version.
GlusterFS is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
+ Affero General Public License for more details.
- You should have received a copy of the GNU General Public License
+ You should have received a copy of the GNU Affero General Public License
along with this program. If not, see
<http://www.gnu.org/licenses/>.
*/
@@ -35,6 +35,7 @@
#include "common-utils.h"
#include "call-stub.h"
#include "statedump.h"
+#include <assert.h>
#define MAX_VECTOR_COUNT 8
#define WB_AGGREGATE_SIZE 131072 /* 128 KB */
@@ -51,6 +52,7 @@ typedef struct wb_file {
uint64_t disable_till;
size_t window_conf;
size_t window_current;
+ int32_t flags;
size_t aggregate_current;
int32_t refcount;
int32_t op_ret;
@@ -64,20 +66,29 @@ typedef struct wb_file {
typedef struct wb_request {
- list_head_t list;
- list_head_t winds;
- list_head_t unwinds;
- list_head_t other_requests;
- call_stub_t *stub;
- size_t write_size;
- int32_t refcount;
- wb_file_t *file;
+ list_head_t list;
+ list_head_t winds;
+ list_head_t unwinds;
+ list_head_t other_requests;
+ call_stub_t *stub;
+ size_t write_size;
+ int32_t refcount;
+ wb_file_t *file;
+ glusterfs_fop_t fop;
union {
struct {
char write_behind;
char stack_wound;
char got_reply;
char virgin;
+ char flush_all; /* while trying to sync to back-end,
+ * don't wait till a data of size
+ * equal to configured aggregate-size
+ * is accumulated, instead sync
+ * whatever data currently present in
+ * request queue.
+ */
+
}write_request;
struct {
@@ -106,7 +117,6 @@ typedef struct wb_local {
int op_ret;
int op_errno;
call_frame_t *frame;
- fd_t *fd;
int32_t reply_count;
} wb_local_t;
@@ -116,26 +126,28 @@ typedef struct wb_page wb_page_t;
int32_t
-wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all);
+wb_process_queue (call_frame_t *frame, wb_file_t *file);
ssize_t
wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds);
ssize_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size,
- char wind_all, char enable_trickling_writes);
+ char enable_trickling_writes);
-static void
+static int
__wb_request_unref (wb_request_t *this)
{
+ int ret = -1;
+
if (this->refcount <= 0) {
gf_log ("wb-request", GF_LOG_DEBUG,
"refcount(%d) is <= 0", this->refcount);
- return;
+ goto out;
}
- this->refcount--;
+ ret = --this->refcount;
if (this->refcount == 0) {
list_del_init (&this->list);
if (this->stub && this->stub->fop == GF_FOP_WRITE) {
@@ -144,25 +156,33 @@ __wb_request_unref (wb_request_t *this)
FREE (this);
}
+
+out:
+ return ret;
}
-static void
+static int
wb_request_unref (wb_request_t *this)
{
wb_file_t *file = NULL;
+ int ret = 0;
+
if (this == NULL) {
gf_log ("wb-request", GF_LOG_DEBUG,
"request is NULL");
- return;
+ goto out;
}
file = this->file;
LOCK (&file->lock);
{
- __wb_request_unref (this);
+ ret = __wb_request_unref (this);
}
UNLOCK (&file->lock);
+
+out:
+ return ret;
}
@@ -204,11 +224,11 @@ wb_request_ref (wb_request_t *this)
wb_request_t *
wb_enqueue (wb_file_t *file, call_stub_t *stub)
{
- wb_request_t *request = NULL;
- call_frame_t *frame = NULL;
- wb_local_t *local = NULL;
- struct iovec *vector = NULL;
- int32_t count = 0;
+ wb_request_t *request = NULL, *tmp = NULL;
+ call_frame_t *frame = NULL;
+ wb_local_t *local = NULL;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
request = CALLOC (1, sizeof (*request));
if (request == NULL) {
@@ -222,6 +242,7 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub)
request->stub = stub;
request->file = file;
+ request->fop = stub->fop;
frame = stub->frame;
local = frame->local;
@@ -233,11 +254,11 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub)
vector = stub->args.writev.vector;
count = stub->args.writev.count;
- frame = stub->frame;
- local = frame->local;
request->write_size = iov_length (vector, count);
- local->op_ret = request->write_size;
- local->op_errno = 0;
+ if (local) {
+ local->op_ret = request->write_size;
+ local->op_errno = 0;
+ }
request->flags.write_request.virgin = 1;
}
@@ -254,6 +275,13 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub)
file->aggregate_current += request->write_size;
} else {
+ list_for_each_entry (tmp, &file->request, list) {
+ if (tmp->stub && tmp->stub->fop
+ == GF_FOP_WRITE) {
+ tmp->flags.write_request.flush_all = 1;
+ }
+ }
+
/*reference for resuming */
__wb_request_ref (request);
}
@@ -266,7 +294,7 @@ out:
wb_file_t *
-wb_file_create (xlator_t *this, fd_t *fd)
+wb_file_create (xlator_t *this, fd_t *fd, int32_t flags)
{
wb_file_t *file = NULL;
wb_conf_t *conf = this->private;
@@ -288,6 +316,7 @@ wb_file_create (xlator_t *this, fd_t *fd)
file->this = this;
file->refcount = 1;
file->window_conf = conf->window_size;
+ file->flags = flags;
fd_ctx_set (fd, this, (uint64_t)(long)file);
@@ -359,7 +388,7 @@ wb_sync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
}
UNLOCK (&file->lock);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
LOCK (&file->lock);
{
@@ -393,6 +422,12 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
size_t bytecount = 0;
wb_conf_t *conf = NULL;
fd_t *fd = NULL;
+ int32_t op_errno = -1;
+
+ if (frame == NULL) {
+ op_errno = EINVAL;
+ goto out;
+ }
conf = file->this->private;
list_for_each_entry (request, winds, winds) {
@@ -403,6 +438,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
}
if (total_count == 0) {
+ gf_log (file->this->name, GF_LOG_TRACE, "no vectors are to be"
+ "synced");
goto out;
}
@@ -411,18 +448,27 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
vector = MALLOC (VECTORSIZE (MAX_VECTOR_COUNT));
if (vector == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
iobref = iobref_new ();
if (iobref == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
local = CALLOC (1, sizeof (*local));
if (local == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
@@ -464,6 +510,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
sync_frame = copy_frame (frame);
if (sync_frame == NULL) {
bytes = -1;
+ op_errno = ENOMEM;
+ gf_log (file->this->name, GF_LOG_ERROR,
+ "out of memory");
goto out;
}
@@ -506,7 +555,16 @@ out:
}
if (local != NULL) {
+ /* had we winded these requests, we would have unrefed
+ * in wb_sync_cbk.
+ */
+ list_for_each_entry_safe (request, dummy, &local->winds,
+ winds) {
+ wb_request_unref (request);
+ }
+
FREE (local);
+ local = NULL;
}
if (iobref != NULL) {
@@ -517,6 +575,28 @@ out:
FREE (vector);
}
+ if (bytes == -1) {
+ /*
+ * had we winded these requests, we would have unrefed
+ * in wb_sync_cbk.
+ */
+ if (local) {
+ list_for_each_entry_safe (request, dummy, &local->winds,
+ winds) {
+ wb_request_unref (request);
+ }
+ }
+
+ if (file != NULL) {
+ LOCK (&file->lock);
+ {
+ file->op_ret = -1;
+ file->op_errno = op_errno;
+ }
+ UNLOCK (&file->lock);
+ }
+ }
+
return bytes;
}
@@ -551,7 +631,7 @@ wb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
}
if (process_frame != NULL) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -636,7 +716,7 @@ wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -680,7 +760,7 @@ wb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
request = local->request;
if ((file != NULL) && (request != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -718,14 +798,15 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd)
if ((!S_ISDIR (fd->inode->st_mode))
&& fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_DEBUG, "write behind file pointer is"
- " not stored in context of fd(%p), returning EBADFD",
+ gf_log (this->name, GF_LOG_DEBUG,
+ "write behind file pointer is not stored in "
+ "context of fd(%p), returning EBADFD",
fd);
STACK_UNWIND_STRICT (fstat, frame, -1, EBADFD, NULL);
return 0;
}
-
+
file = (wb_file_t *)(long)tmp_file;
local = CALLOC (1, sizeof (*local));
if (local == NULL) {
@@ -753,7 +834,7 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd)
/*
FIXME:should the request queue be emptied in case of error?
*/
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -802,14 +883,15 @@ wb_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
}
- STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf, postbuf);
+ STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf,
+ postbuf);
if (request) {
wb_request_unref (request);
}
if (process_frame != NULL) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -901,7 +983,7 @@ wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -944,14 +1026,15 @@ wb_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if ((request != NULL) && (file != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
}
}
- STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf, postbuf);
+ STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf,
+ postbuf);
return 0;
}
@@ -1020,7 +1103,7 @@ wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset)
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1049,7 +1132,8 @@ unwind:
int32_t
wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct stat *statpre, struct stat *statpost)
+ int32_t op_ret, int32_t op_errno, struct stat *statpre,
+ struct stat *statpost)
{
wb_local_t *local = NULL;
wb_request_t *request = NULL;
@@ -1070,14 +1154,15 @@ wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
}
- STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, statpost);
+ STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre,
+ statpost);
if (request) {
wb_request_unref (request);
}
if (request && (process_frame != NULL)) {
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {
LOCK (&file->lock);
{
@@ -1169,7 +1254,8 @@ wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
local->file = file;
if (file) {
- stub = fop_setattr_stub (frame, wb_setattr_helper, loc, stbuf, valid);
+ stub = fop_setattr_stub (frame, wb_setattr_helper, loc, stbuf,
+ valid);
if (stub == NULL) {
op_errno = ENOMEM;
goto unwind;
@@ -1181,7 +1267,7 @@ wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
goto unwind;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1229,22 +1315,13 @@ wb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
wbflags = local->wbflags;
if (op_ret != -1) {
- file = wb_file_create (this, fd);
+ file = wb_file_create (this, fd, flags);
if (file == NULL) {
op_ret = -1;
op_errno = ENOMEM;
goto out;
}
- /*
- If mandatory locking has been enabled on this file,
- we disable caching on it
- */
-
- if ((fd->inode->st_mode & S_ISGID)
- && !(fd->inode->st_mode & S_IXGRP))
- file->disabled = 1;
-
/* If O_DIRECT then, we disable chaching */
if (((flags & O_DIRECT) == O_DIRECT)
|| ((flags & O_ACCMODE) == O_RDONLY)
@@ -1308,23 +1385,19 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
wb_conf_t *conf = this->private;
if (op_ret != -1) {
- file = wb_file_create (this, fd);
+ if (frame->local) {
+ flags = (long) frame->local;
+ }
+
+ file = wb_file_create (this, fd, flags);
if (file == NULL) {
op_ret = -1;
op_errno = ENOMEM;
goto out;
}
- /*
- * If mandatory locking has been enabled on this file,
- * we disable caching on it
- */
- if ((fd->inode->st_mode & S_ISGID)
- && !(fd->inode->st_mode & S_IXGRP))
- file->disabled = 1;
/* If O_DIRECT then, we disable chaching */
if (frame->local) {
- flags = (long)frame->local;
if (((flags & O_DIRECT) == O_DIRECT)
|| ((flags & O_ACCMODE) == O_RDONLY)
|| (((flags & O_SYNC) == O_SYNC)
@@ -1339,8 +1412,8 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
frame->local = NULL;
out:
- STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, preparent,
- postparent);
+ STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf,
+ preparent, postparent);
return 0;
}
@@ -1351,22 +1424,29 @@ wb_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
{
frame->local = (void *)(long)flags;
- STACK_WIND (frame,
- wb_create_cbk,
+ STACK_WIND (frame, wb_create_cbk,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->create,
loc, flags, mode, fd);
return 0;
}
-
+/* Mark all the contiguous write requests for winding starting from head of
+ * request list. Stops marking at the first non-write request found. If
+ * file is opened with O_APPEND, make sure all the writes marked for winding
+ * will fit into a single write call to server.
+ */
size_t
__wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
{
- wb_request_t *request = NULL;
- size_t size = 0;
- char first_request = 1;
+ wb_request_t *request = NULL;
+ size_t size = 0;
+ char first_request = 1;
off_t offset_expected = 0;
+ wb_conf_t *conf = NULL;
+ int count = 0;
+
+ conf = file->this->private;
list_for_each_entry (request, list, list)
{
@@ -1385,9 +1465,18 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
break;
}
+ if ((file->flags & O_APPEND)
+ && (((size + request->write_size)
+ > conf->aggregate_size)
+ || ((count + request->stub->args.writev.count)
+ > MAX_VECTOR_COUNT))) {
+ break;
+ }
+
size += request->write_size;
offset_expected += request->write_size;
file->aggregate_current -= request->write_size;
+ count += request->stub->args.writev.count;
request->flags.write_request.stack_wound = 1;
list_add_tail (&request->winds, winds);
@@ -1400,7 +1489,8 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)
void
__wb_can_wind (list_head_t *list, char *other_fop_in_queue,
- char *non_contiguous_writes, char *incomplete_writes)
+ char *non_contiguous_writes, char *incomplete_writes,
+ char *wind_all)
{
wb_request_t *request = NULL;
char first_request = 1;
@@ -1426,7 +1516,11 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue,
if (!request->flags.write_request.stack_wound) {
if (first_request) {
first_request = 0;
- offset_expected = request->stub->args.writev.off;
+ offset_expected
+ = request->stub->args.writev.off;
+ if (wind_all != NULL) {
+ *wind_all = request->flags.write_request.flush_all;
+ }
}
if (offset_expected != request->stub->args.writev.off) {
@@ -1446,7 +1540,7 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue,
ssize_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
- char wind_all, char enable_trickling_writes)
+ char enable_trickling_writes)
{
size_t size = 0;
char other_fop_in_queue = 0;
@@ -1454,6 +1548,7 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
char non_contiguous_writes = 0;
wb_request_t *request = NULL;
wb_file_t *file = NULL;
+ char wind_all = 0;
if (list_empty (list)) {
goto out;
@@ -1462,17 +1557,16 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,
request = list_entry (list->next, typeof (*request), list);
file = request->file;
- if (!wind_all && (file->aggregate_current < aggregate_conf)) {
- __wb_can_wind (list, &other_fop_in_queue,
- &non_contiguous_writes, &incomplete_writes);
- }
+ __wb_can_wind (list, &other_fop_in_queue,
+ &non_contiguous_writes, &incomplete_writes, &wind_all);
- if ((enable_trickling_writes && !incomplete_writes)
- || (wind_all) || (non_contiguous_writes)
- || (other_fop_in_queue)
- || (file->aggregate_current >= aggregate_conf)) {
+ if (!incomplete_writes && ((enable_trickling_writes)
+ || (wind_all) || (non_contiguous_writes)
+ || (other_fop_in_queue)
+ || (file->aggregate_current
+ >= aggregate_conf))) {
size = __wb_mark_wind_all (file, list, winds);
- }
+ }
out:
return size;
@@ -1535,7 +1629,8 @@ __wb_mark_unwinds (list_head_t *list, list_head_t *unwinds)
if (file->window_current <= file->window_conf) {
__wb_mark_unwind_till (list, unwinds,
- file->window_conf - file->window_current);
+ file->window_conf
+ - file->window_current);
}
out:
@@ -1573,6 +1668,7 @@ wb_stack_unwind (list_head_t *unwinds)
wb_request_t *request = NULL, *dummy = NULL;
call_frame_t *frame = NULL;
wb_local_t *local = NULL;
+ int ret = 0, write_requests_removed = 0;
list_for_each_entry_safe (request, dummy, unwinds, unwinds)
{
@@ -1582,10 +1678,13 @@ wb_stack_unwind (list_head_t *unwinds)
STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf,
&buf);
- wb_request_unref (request);
+ ret = wb_request_unref (request);
+ if (ret == 0) {
+ write_requests_removed++;
+ }
}
- return 0;
+ return write_requests_removed;
}
@@ -1623,7 +1722,7 @@ wb_resume_other_requests (call_frame_t *frame, wb_file_t *file,
}
if (fops_removed > 0) {
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
}
out:
@@ -1635,19 +1734,27 @@ int32_t
wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds,
list_head_t *unwinds, list_head_t *other_requests)
{
- int32_t ret = -1;
+ int32_t ret = -1, write_requests_removed = 0;
ret = wb_stack_unwind (unwinds);
- if (ret == -1) {
- goto out;
- }
+
+ write_requests_removed = ret;
ret = wb_sync (frame, file, winds);
if (ret == -1) {
goto out;
}
- ret = wb_resume_other_requests (frame, file, other_requests);
+ wb_resume_other_requests (frame, file, other_requests);
+
+ /* wb_stack_unwind does wb_request_unref after unwinding a write
+ * request. Hence if a write-request was just freed in wb_stack_unwind,
+ * we have to process request queue once again to unblock requests
+ * blocked on the writes just unwound.
+ */
+ if (write_requests_removed > 0) {
+ ret = wb_process_queue (frame, file);
+ }
out:
return ret;
@@ -1771,7 +1878,7 @@ __wb_collapse_write_bufs (list_head_t *requests, size_t page_size)
int32_t
-wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
+wb_process_queue (call_frame_t *frame, wb_file_t *file)
{
list_head_t winds, unwinds, other_requests;
size_t size = 0;
@@ -1808,7 +1915,6 @@ wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
if (count == 0) {
__wb_mark_winds (&file->request, &winds, size,
- flush_all,
conf->enable_trickling_writes);
}
@@ -1934,7 +2040,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
goto unwind;
}
- ret = wb_process_queue (process_frame, file, 0);
+ ret = wb_process_queue (process_frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_errno = ENOMEM;
goto unwind;
@@ -1976,14 +2082,15 @@ wb_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
if ((request != NULL) && (file != NULL)) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
}
}
- STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count, stbuf, iobref);
+ STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count,
+ stbuf, iobref);
return 0;
}
@@ -2054,7 +2161,7 @@ wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
return 0;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
STACK_UNWIND_STRICT (readv, frame, -1, ENOMEM,
NULL, 0, NULL, NULL);
@@ -2074,123 +2181,118 @@ wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
}
-int
-wb_flush_bg_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+int32_t
+wb_ffr_bg_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
- wb_local_t *local = NULL;
- wb_file_t *file = NULL;
- wb_request_t *request = NULL;
- fd_t *fd = NULL;
-
- local = frame->local;
- if (local) {
- file = local->file;
- request = local->request;
- }
-
- if (request)
- wb_request_unref (request);
-
- if (file) {
- wb_process_queue (frame, file, 0);
- fd = file->fd;
- }
-
- if (fd)
- fd_unref (fd);
-
STACK_DESTROY (frame->root);
return 0;
}
-int
-wb_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+int32_t
+wb_ffr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
+ int32_t op_errno)
{
- wb_local_t *local = NULL;
- wb_request_t *request = NULL;
- wb_file_t *file = NULL;
- int fop_ret = 0;
- int fop_errno = 0;
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
local = frame->local;
- if (local) {
- file = local->file;
- request = local->request;
- }
-
- fop_ret = op_ret;
- fop_errno = op_errno;
-
- if (request)
- wb_request_unref (request);
+ file = local->file;
- if (!file)
- goto unwind;
+ if (file != NULL) {
+ LOCK (&file->lock);
+ {
+ if (file->op_ret == -1) {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
- if (file->op_ret < 0) {
- fop_ret = file->op_ret;
- fop_errno = file->op_errno;
+ file->op_ret = 0;
+ }
+ }
+ UNLOCK (&file->lock);
}
-
- wb_process_queue (frame, file, 0);
-
-unwind:
- STACK_UNWIND_STRICT (flush, frame, fop_ret, fop_errno);
+
+ STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
return 0;
}
-int
+int32_t
wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd)
{
- wb_conf_t *conf = NULL;
- call_frame_t *flush_frame = NULL;
- wb_file_t *file = NULL;
- wb_local_t *local = NULL;
- int op_ret = 0;
- int op_errno = 0;
+ wb_conf_t *conf = NULL;
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+ call_frame_t *flush_frame = NULL, *process_frame = NULL;
+ int32_t op_ret = -1, op_errno = -1, ret = -1;
conf = this->private;
+
local = frame->local;
- if (local)
- file = local->file;
+ file = local->file;
- if (conf->flush_behind)
- flush_frame = copy_frame (frame);
+ LOCK (&file->lock);
+ {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
+ }
+ UNLOCK (&file->lock);
- if (flush_frame) {
- flush_frame->local = frame->local;
- frame->local = NULL;
+ if (local && local->request) {
+ process_frame = copy_frame (frame);
+ if (process_frame == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "out of memory");
+ goto unwind;
+ }
- file->fd = fd_ref (fd);
+ wb_request_unref (local->request);
+ }
+
+ if (conf->flush_behind) {
+ flush_frame = copy_frame (frame);
+ if (flush_frame == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "out of memory");
+ goto unwind;
+ }
- STACK_WIND (flush_frame, wb_flush_bg_cbk,
+ STACK_WIND (flush_frame,
+ wb_ffr_bg_cbk,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->flush,
fd);
- if (file) {
- op_ret = file->op_ret;
- op_errno = file->op_errno;
- }
-
- STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
-
} else {
- STACK_WIND (frame, wb_flush_cbk,
+ STACK_WIND (frame,
+ wb_ffr_cbk,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->flush,
fd);
}
+ if (process_frame != NULL) {
+ ret = wb_process_queue (process_frame, file);
+ if ((ret == -1) && (errno == ENOMEM)) {
+ STACK_DESTROY (process_frame->root);
+ goto unwind;
+ }
+
+ STACK_DESTROY (process_frame->root);
+ }
+
+ if (conf->flush_behind) {
+ STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
+ }
+
+ return 0;
+
+unwind:
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
return 0;
}
-int
+int32_t
wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
{
wb_conf_t *conf = NULL;
@@ -2198,7 +2300,7 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
wb_local_t *local = NULL;
uint64_t tmp_file = 0;
call_stub_t *stub = NULL;
- call_frame_t *process_frame = NULL;
+ call_frame_t *flush_frame = NULL;
wb_request_t *request = NULL;
int32_t ret = 0;
@@ -2216,59 +2318,60 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
file = (wb_file_t *)(long)tmp_file;
- if (!file)
- goto nofile;
+ if (file != NULL) {
+ local = CALLOC (1, sizeof (*local));
+ if (local == NULL) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ return 0;
+ }
- if (file->disabled)
- goto nofile;
+ local->file = file;
- local = CALLOC (1, sizeof (*local));
- if (local == NULL) {
- STACK_UNWIND (frame, -1, ENOMEM, NULL);
- return 0;
- }
+ frame->local = local;
- local->file = file;
+ stub = fop_flush_stub (frame, wb_flush_helper, fd);
+ if (stub == NULL) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ return 0;
+ }
- frame->local = local;
- stub = fop_flush_stub (frame, wb_flush_helper, fd);
- if (stub == NULL) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- return 0;
- }
+ request = wb_enqueue (file, stub);
+ if (request == NULL) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ call_stub_destroy (stub);
+ return 0;
+ }
- process_frame = copy_frame (frame);
- if (process_frame == NULL) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- call_stub_destroy (stub);
- return 0;
- }
+ ret = wb_process_queue (frame, file);
+ if ((ret == -1) && (errno == ENOMEM)) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ call_stub_destroy (stub);
+ return 0;
+ }
+ } else {
+ if (conf->flush_behind) {
+ flush_frame = copy_frame (frame);
+ if (flush_frame == NULL) {
+ STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
+ return 0;
+ }
- request = wb_enqueue (file, stub);
- if (request == NULL) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- call_stub_destroy (stub);
- STACK_DESTROY (process_frame->root);
- return 0;
- }
+ STACK_UNWIND_STRICT (flush, frame, 0, 0);
- ret = wb_process_queue (process_frame, file, 1);
- if (ret == -1) {
- STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);
- call_stub_destroy (stub);
- STACK_DESTROY (process_frame->root);
- return 0;
+ STACK_WIND (flush_frame,
+ wb_ffr_bg_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ } else {
+ STACK_WIND (frame,
+ wb_ffr_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->flush,
+ fd);
+ }
}
- STACK_DESTROY (process_frame->root);
-
- return 0;
-
-nofile:
- STACK_WIND (frame, wb_flush_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->flush,
- fd);
return 0;
}
@@ -2300,7 +2403,7 @@ wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
if (request) {
wb_request_unref (request);
- ret = wb_process_queue (frame, file, 0);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
op_ret = -1;
op_errno = ENOMEM;
@@ -2376,7 +2479,7 @@ wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync)
return 0;
}
- ret = wb_process_queue (frame, file, 1);
+ ret = wb_process_queue (frame, file);
if ((ret == -1) && (errno == ENOMEM)) {
STACK_UNWIND_STRICT (fsync, frame, -1, ENOMEM,
NULL, NULL);
@@ -2457,6 +2560,288 @@ wb_priv_dump (xlator_t *this)
return 0;
}
+
+void
+__wb_dump_requests (struct list_head *head, char *prefix, char passive)
+{
+ char key[GF_DUMP_MAX_BUF_LEN];
+ char key_prefix[GF_DUMP_MAX_BUF_LEN];
+ wb_request_t *request = NULL;
+
+ list_for_each_entry (request, head, list) {
+ gf_proc_dump_build_key (key, prefix,
+ passive ? "passive-request"
+ : "active-request");
+ gf_proc_dump_build_key (key_prefix, key,
+ gf_fop_list[request->fop]);
+
+ gf_proc_dump_add_section(key_prefix);
+
+ gf_proc_dump_build_key (key, key_prefix, "request-ptr");
+ gf_proc_dump_write (key, "%p", request);
+
+ gf_proc_dump_build_key (key, key_prefix, "refcount");
+ gf_proc_dump_write (key, "%d", request->refcount);
+
+ if (request->fop == GF_FOP_WRITE) {
+ gf_proc_dump_build_key (key, key_prefix, "stack_wound");
+ gf_proc_dump_write (key, "%d",
+ request->flags.write_request.stack_wound);
+
+ gf_proc_dump_build_key (key, key_prefix, "size");
+ gf_proc_dump_write (key, "%"GF_PRI_SIZET,
+ request->write_size);
+
+ gf_proc_dump_build_key (key, key_prefix, "offset");
+ gf_proc_dump_write (key, "%"PRId64,
+ request->stub->args.writev.off);
+
+ gf_proc_dump_build_key (key, key_prefix,
+ "write_behind");
+ gf_proc_dump_write (key, "%d",
+ request->flags.write_request.write_behind);
+
+ gf_proc_dump_build_key (key, key_prefix, "got_reply");
+ gf_proc_dump_write (key, "%d",
+ request->flags.write_request.got_reply);
+
+ gf_proc_dump_build_key (key, key_prefix, "virgin");
+ gf_proc_dump_write (key, "%d",
+ request->flags.write_request.virgin);
+
+ gf_proc_dump_build_key (key, key_prefix, "flush_all");
+ gf_proc_dump_write (key, "%d",
+ request->flags.write_request.flush_all);
+ } else {
+ gf_proc_dump_build_key (key, key_prefix,
+ "marked_for_resume");
+ gf_proc_dump_write (key, "%d",
+ request->flags.other_requests.marked_for_resume);
+ }
+ }
+}
+
+
+int
+wb_file_dump (xlator_t *this, fd_t *fd)
+{
+ wb_file_t *file = NULL;
+ uint64_t tmp_file = 0;
+ int32_t ret = -1;
+ char key[GF_DUMP_MAX_BUF_LEN];
+ char key_prefix[GF_DUMP_MAX_BUF_LEN];
+
+ if ((fd == NULL) || (this == NULL)) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = fd_ctx_get (fd, this, &tmp_file);
+ if (ret == -1) {
+ ret = 0;
+ goto out;
+ }
+
+ file = (wb_file_t *)(long)tmp_file;
+ if (file == NULL) {
+ ret = 0;
+ goto out;
+ }
+
+ gf_proc_dump_build_key (key_prefix,
+ "xlator.performance.write-behind",
+ "file");
+
+ gf_proc_dump_add_section (key_prefix);
+
+ gf_proc_dump_build_key (key, key_prefix, "fd");
+ gf_proc_dump_write (key, "%p", fd);
+
+ gf_proc_dump_build_key (key, key_prefix, "disabled");
+ gf_proc_dump_write (key, "%d", file->disabled);
+
+ gf_proc_dump_build_key (key, key_prefix, "disable_till");
+ gf_proc_dump_write (key, "%lu", file->disable_till);
+
+ gf_proc_dump_build_key (key, key_prefix, "window_conf");
+ gf_proc_dump_write (key, "%"GF_PRI_SIZET, file->window_conf);
+
+ gf_proc_dump_build_key (key, key_prefix, "window_current");
+ gf_proc_dump_write (key, "%"GF_PRI_SIZET, file->window_current);
+
+ gf_proc_dump_build_key (key, key_prefix, "flags");
+ gf_proc_dump_write (key, "%s", (file->flags & O_APPEND) ? "O_APPEND"
+ : "!O_APPEND");
+
+ gf_proc_dump_build_key (key, key_prefix, "aggregate_current");
+ gf_proc_dump_write (key, "%"GF_PRI_SIZET, file->aggregate_current);
+
+ gf_proc_dump_build_key (key, key_prefix, "refcount");
+ gf_proc_dump_write (key, "%d", file->refcount);
+
+ gf_proc_dump_build_key (key, key_prefix, "op_ret");
+ gf_proc_dump_write (key, "%d", file->op_ret);
+
+ gf_proc_dump_build_key (key, key_prefix, "op_errno");
+ gf_proc_dump_write (key, "%d", file->op_errno);
+
+ LOCK (&file->lock);
+ {
+ if (!list_empty (&file->request)) {
+ __wb_dump_requests (&file->request, key_prefix, 0);
+ }
+
+ if (!list_empty (&file->passive_requests)) {
+ __wb_dump_requests (&file->passive_requests, key_prefix,
+ 1);
+ }
+ }
+ UNLOCK (&file->lock);
+
+out:
+ return ret;
+}
+
+
+int
+validate_options (xlator_t *this, dict_t *options, char **op_errstr)
+{
+ char *str=NULL;
+ uint64_t window_size;
+ gf_boolean_t flush_behind;
+
+ int ret = 0;
+
+
+
+ ret = dict_get_str (options, "cache-size",
+ &str);
+ if (ret == 0) {
+ ret = gf_string2bytesize (str, &window_size);
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_WARNING, "Validation"
+ "'option cache-size %s failed , Invalid"
+ " number format, ", str);
+ *op_errstr = strdup ("Error, Invalid num format");
+ ret = -1;
+ goto out;
+ }
+
+ if (window_size < (512 * GF_UNIT_KB)) {
+ gf_log(this->name, GF_LOG_WARNING, "Validation"
+ "'option cache-size %s' failed , Min value"
+ "should be 512KiB ", str);
+ *op_errstr = strdup ("Error, Should be min 512KB");
+ ret = -1;
+ goto out;
+ }
+
+ if (window_size > (1 * GF_UNIT_GB)) {
+ gf_log(this->name, GF_LOG_WARNING, "Reconfiguration"
+ "'option cache-size %s' failed , Max value"
+ "can be 1 GiB", str);
+ *op_errstr = strdup ("Error, Max Value is 1GB");
+ ret = -1;
+ goto out;
+ }
+
+
+ gf_log(this->name, GF_LOG_DEBUG, "Validated "
+ "'option cache-size %s '", str);
+ }
+ ret = dict_get_str (options, "flush-behind",
+ &str);
+ if (ret == 0) {
+ ret = gf_string2boolean (str,
+ &flush_behind);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "'flush-behind' takes only boolean arguments");
+ *op_errstr = strdup ("Error, should be boolean");
+ ret = -1;
+ goto out;
+ }
+ }
+ ret =0;
+out:
+ return ret;
+}
+
+int
+reconfigure (xlator_t *this, dict_t *options)
+{
+ char *str=NULL;
+ uint64_t window_size;
+ wb_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ ret = dict_get_str (options, "cache-size",
+ &str);
+ if (ret == 0) {
+ ret = gf_string2bytesize (str, &window_size);
+ if (ret != 0) {
+ gf_log(this->name, GF_LOG_ERROR, "Reconfiguration"
+ "'option cache-size %s failed , Invalid"
+ " number format, Defaulting to old value "
+ "(%"PRIu64")", str, conf->window_size);
+ ret = -1;
+ goto out;
+ }
+
+ if (window_size < (512 * GF_UNIT_KB)) {
+ gf_log(this->name, GF_LOG_ERROR, "Reconfiguration"
+ "'option cache-size %s' failed , Max value"
+ "can be 512KiB, Defaulting to old value "
+ "(%"PRIu64")", str, conf->window_size);
+ ret = -1;
+ goto out;
+ }
+
+ if (window_size > (2 * GF_UNIT_GB)) {
+ gf_log(this->name, GF_LOG_ERROR, "Reconfiguration"
+ "'option cache-size %s' failed , Max value"
+ "can be 1 GiB, Defaulting to old value "
+ "(%"PRIu64")", str, conf->window_size);
+ ret = -1;
+ goto out;
+ }
+
+ conf->window_size = window_size;
+ gf_log(this->name, GF_LOG_DEBUG, "Reconfiguring "
+ "'option cache-size %s ' to %"PRIu64, str,
+ conf->window_size);
+ }
+ else
+ conf->window_size = WB_WINDOW_SIZE;
+
+ ret = dict_get_str (options, "flush-behind",
+ &str);
+ if (ret == 0) {
+ ret = gf_string2boolean (str,
+ &conf->flush_behind);
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'flush-behind' takes only boolean arguments");
+ conf->flush_behind = 1;
+ return -1;
+ }
+ if (conf->flush_behind) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "enabling flush-behind");
+ }
+ else
+ gf_log (this->name, GF_LOG_DEBUG,
+ "disabling flush-behind");
+ }
+
+
+out:
+ return 0;
+
+}
+
int32_t
init (xlator_t *this)
{
@@ -2551,7 +2936,7 @@ init (xlator_t *this)
gf_log (this->name, GF_LOG_ERROR,
"aggregate-size(%"PRIu64") cannot be more than "
"window-size"
- "(%"PRIu64")", conf->window_size, conf->aggregate_size);
+ "(%"PRIu64")", conf->aggregate_size, conf->window_size);
FREE (conf);
return -1;
}
@@ -2599,10 +2984,16 @@ fini (xlator_t *this)
{
wb_conf_t *conf = this->private;
+ if (!conf)
+ return;
+ this->private = NULL;
FREE (conf);
return;
}
+struct xlator_mops mops = {
+
+};
struct xlator_fops fops = {
.writev = wb_writev,
@@ -2618,8 +3009,6 @@ struct xlator_fops fops = {
.setattr = wb_setattr,
};
-struct xlator_mops mops = {
-};
struct xlator_cbks cbks = {
.release = wb_release