summaryrefslogtreecommitdiffstats
path: root/xlators/performance
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@zresearch.com>2009-03-12 04:41:23 -0700
committerAnand V. Avati <avati@amp.gluster.com>2009-03-13 20:36:08 +0530
commit7d61f9d69309ccb0f9aa787caacfef77bc4e32d2 (patch)
tree39c6c60d93af32187d3141513a6908e70eece013 /xlators/performance
parent473d02d1698259b4a0a6c22fdf70071e69c6e987 (diff)
write behind preserves order of fops with respect to writes
- the execution order of fops like read, stat, fsync, truncate etc whose results are affected by writes, are preserved. Signed-off-by: Anand V. Avati <avati@amp.gluster.com>
Diffstat (limited to 'xlators/performance')
-rw-r--r--xlators/performance/write-behind/src/write-behind.c1351
1 files changed, 947 insertions, 404 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c
index 86752cc94..c41bf3d58 100644
--- a/xlators/performance/write-behind/src/write-behind.c
+++ b/xlators/performance/write-behind/src/write-behind.c
@@ -33,6 +33,7 @@
#include "compat.h"
#include "compat-errno.h"
#include "common-utils.h"
+#include "call-stub.h"
#define MAX_VECTOR_COUNT 8
@@ -42,75 +43,207 @@ struct wb_page;
struct wb_file;
+typedef struct wb_file {
+ int disabled;
+ uint64_t disable_till;
+ size_t window_size;
+ int32_t refcount;
+ int32_t op_ret;
+ int32_t op_errno;
+ list_head_t request;
+ fd_t *fd;
+ gf_lock_t lock;
+ xlator_t *this;
+}wb_file_t;
+
+
+typedef struct wb_request {
+ list_head_t list;
+ list_head_t winds;
+ list_head_t unwinds;
+ list_head_t other_requests;
+ call_stub_t *stub;
+ int32_t refcount;
+ wb_file_t *file;
+ union {
+ struct {
+ char write_behind;
+ char stack_wound;
+ char got_reply;
+ }write_request;
+
+ struct {
+ char marked_for_resume;
+ }other_requests;
+ }flags;
+} wb_request_t;
+
+
struct wb_conf {
- uint64_t aggregate_size;
- uint64_t window_size;
- uint64_t disable_till;
+ uint64_t aggregate_size;
+ uint64_t window_size;
+ uint64_t disable_till;
gf_boolean_t enable_O_SYNC;
gf_boolean_t flush_behind;
};
typedef struct wb_local {
- list_head_t winds;
+ list_head_t winds;
struct wb_file *file;
- list_head_t unwind_frames;
- int op_ret;
- int op_errno;
- call_frame_t *frame;
+ wb_request_t *request;
+ int op_ret;
+ int op_errno;
+ call_frame_t *frame;
+ int32_t reply_count;
} wb_local_t;
-typedef struct write_request {
- call_frame_t *frame;
- off_t offset;
- /* int32_t op_ret;
- int32_t op_errno; */
- struct iovec *vector;
- int32_t count;
- dict_t *refs;
- char write_behind;
- char stack_wound;
- char got_reply;
- list_head_t list;
- list_head_t winds;
- /* list_head_t unwinds;*/
-} wb_write_request_t;
-
-
-struct wb_file {
- int disabled;
- uint64_t disable_till;
- off_t offset;
- size_t window_size;
- int32_t refcount;
- int32_t op_ret;
- int32_t op_errno;
- list_head_t request;
- fd_t *fd;
- gf_lock_t lock;
- xlator_t *this;
-};
-
-
typedef struct wb_conf wb_conf_t;
typedef struct wb_page wb_page_t;
-typedef struct wb_file wb_file_t;
int32_t
wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all);
-int32_t
+size_t
wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds);
-int32_t
-wb_sync_all (call_frame_t *frame, wb_file_t *file);
-
-int32_t
+size_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size);
+static void
+__wb_request_unref (wb_request_t *this)
+{
+ if (this->refcount <= 0) {
+ gf_log ("wb-request", GF_LOG_ERROR,
+ "refcount(%d) is <= 0", this->refcount);
+ return;
+ }
+
+ this->refcount--;
+ if (this->refcount == 0) {
+ list_del_init (&this->list);
+ if (this->stub && this->stub->fop == GF_FOP_WRITE) {
+ call_stub_destroy (this->stub);
+ }
+
+ FREE (this);
+ }
+}
+
+
+static void
+wb_request_unref (wb_request_t *this)
+{
+ wb_file_t *file = NULL;
+ if (this == NULL) {
+ gf_log ("wb-request", GF_LOG_DEBUG,
+ "request is NULL");
+ return;
+ }
+
+ file = this->file;
+ LOCK (&file->lock);
+ {
+ __wb_request_unref (this);
+ }
+ UNLOCK (&file->lock);
+}
+
+
+static wb_request_t *
+__wb_request_ref (wb_request_t *this)
+{
+ if (this->refcount < 0) {
+ gf_log ("wb-request", GF_LOG_DEBUG,
+ "refcount(%d) is < 0", this->refcount);
+ return NULL;
+ }
+
+ this->refcount++;
+ return this;
+}
+
+
+wb_request_t *
+wb_request_ref (wb_request_t *this)
+{
+ wb_file_t *file = NULL;
+ if (this == NULL) {
+ gf_log ("wb-request", GF_LOG_DEBUG,
+ "request is NULL");
+ return NULL;
+ }
+
+ file = this->file;
+ LOCK (&file->lock);
+ {
+ this = __wb_request_ref (this);
+ }
+ UNLOCK (&file->lock);
+
+ return this;
+}
+
+
+wb_request_t *
+wb_enqueue (wb_file_t *file,
+ call_stub_t *stub)
+{
+ wb_request_t *request = NULL;
+ call_frame_t *frame = NULL;
+ wb_local_t *local = NULL;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
+
+ request = CALLOC (1, sizeof (*request));
+
+ INIT_LIST_HEAD (&request->list);
+ INIT_LIST_HEAD (&request->winds);
+ INIT_LIST_HEAD (&request->unwinds);
+ INIT_LIST_HEAD (&request->other_requests);
+
+ request->stub = stub;
+ request->file = file;
+
+ frame = stub->frame;
+ local = frame->local;
+ if (local) {
+ local->request = request;
+ }
+
+ if (stub->fop == GF_FOP_WRITE) {
+ vector = stub->args.writev.vector;
+ count = stub->args.writev.count;
+
+ frame = stub->frame;
+ local = frame->local;
+ local->op_ret = iov_length (vector, count);
+ local->op_errno = 0;
+ }
+
+ LOCK (&file->lock);
+ {
+ list_add_tail (&request->list, &file->request);
+ if (stub->fop == GF_FOP_WRITE) {
+ /* reference for stack winding */
+ __wb_request_ref (request);
+
+ /* reference for stack unwinding */
+ __wb_request_ref (request);
+ } else {
+ /*reference for resuming */
+ __wb_request_ref (request);
+ }
+ }
+ UNLOCK (&file->lock);
+
+ return request;
+}
+
+
wb_file_t *
wb_file_create (xlator_t *this,
fd_t *fd)
@@ -161,10 +294,12 @@ wb_sync_cbk (call_frame_t *frame,
int32_t op_errno,
struct stat *stbuf)
{
- wb_local_t *local = NULL;
- list_head_t *winds = NULL;
- wb_file_t *file = NULL;
- wb_write_request_t *request = NULL, *dummy = NULL;
+ wb_local_t *local = NULL;
+ list_head_t *winds = NULL;
+ wb_file_t *file = NULL;
+ wb_request_t *request = NULL, *dummy = NULL;
+ wb_local_t *per_request_local = NULL;
+
local = frame->local;
winds = &local->winds;
@@ -173,27 +308,25 @@ wb_sync_cbk (call_frame_t *frame,
LOCK (&file->lock);
{
list_for_each_entry_safe (request, dummy, winds, winds) {
- request->got_reply = 1;
- if (!request->write_behind && (op_ret == -1)) {
- wb_local_t *per_request_local = request->frame->local;
+ request->flags.write_request.got_reply = 1;
+
+ if (!request->flags.write_request.write_behind
+ && (op_ret == -1)) {
+ per_request_local = request->stub->frame->local;
per_request_local->op_ret = op_ret;
per_request_local->op_errno = op_errno;
}
- /*
- request->op_ret = op_ret;
- request->op_errno = op_errno;
- */
+ __wb_request_unref (request);
+ }
+
+ if (op_ret == -1) {
+ file->op_ret = op_ret;
+ file->op_errno = op_errno;
}
}
UNLOCK (&file->lock);
- if (op_ret == -1)
- {
- file->op_ret = op_ret;
- file->op_errno = op_errno;
- }
-
wb_process_queue (frame, file, 0);
/* safe place to do fd_unref */
@@ -204,43 +337,25 @@ wb_sync_cbk (call_frame_t *frame,
return 0;
}
-int32_t
-wb_sync_all (call_frame_t *frame, wb_file_t *file)
-{
- list_head_t winds;
- int32_t bytes = 0;
-
- INIT_LIST_HEAD (&winds);
-
- LOCK (&file->lock);
- {
- bytes = __wb_mark_winds (&file->request, &winds, 0);
- }
- UNLOCK (&file->lock);
-
- wb_sync (frame, file, &winds);
-
- return bytes;
-}
-
-int32_t
+size_t
wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
{
- wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL;
- size_t total_count = 0, count = 0;
- size_t copied = 0;
- call_frame_t *sync_frame = NULL;
- dict_t *refs = NULL;
- wb_local_t *local = NULL;
- struct iovec *vector = NULL;
- int32_t bytes = 0;
- size_t bytecount = 0;
-
- list_for_each_entry (request, winds, winds)
- {
- total_count += request->count;
- bytes += iov_length (request->vector, request->count);
+ wb_request_t *dummy = NULL, *request = NULL;
+ wb_request_t *first_request = NULL, *next = NULL;
+ size_t total_count = 0, count = 0;
+ size_t copied = 0;
+ call_frame_t *sync_frame = NULL;
+ dict_t *refs = NULL;
+ wb_local_t *local = NULL;
+ struct iovec *vector = NULL;
+ size_t bytes = 0;
+ size_t bytecount = 0;
+
+ list_for_each_entry (request, winds, winds) {
+ total_count += request->stub->args.writev.count;
+ bytes += iov_length (request->stub->args.writev.vector,
+ request->stub->args.writev.count);
}
if (!total_count) {
@@ -258,26 +373,29 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
first_request = request;
}
- count += request->count;
- bytecount = VECTORSIZE (request->count);
+ count += request->stub->args.writev.count;
+ bytecount = VECTORSIZE (request->stub->args.writev.count);
memcpy (((char *)vector)+copied,
- request->vector,
+ request->stub->args.writev.vector,
bytecount);
copied += bytecount;
- if (request->refs) {
- dict_copy (request->refs, refs);
+ if (request->stub->args.writev.req_refs) {
+ dict_copy (request->stub->args.writev.req_refs, refs);
}
next = NULL;
if (request->winds.next != winds) {
- next = list_entry (request->winds.next, struct write_request, winds);
+ next = list_entry (request->winds.next,
+ wb_request_t, winds);
}
list_del_init (&request->winds);
list_add_tail (&request->winds, &local->winds);
- if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) {
+ if (!next
+ || ((count + next->stub->args.writev.count) > MAX_VECTOR_COUNT))
+ {
sync_frame = copy_frame (frame);
sync_frame->local = local;
local->file = file;
@@ -288,7 +406,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)
FIRST_CHILD(sync_frame->this),
FIRST_CHILD(sync_frame->this)->fops->writev,
file->fd, vector,
- count, first_request->offset);
+ count,
+ first_request->stub->args.writev.off);
dict_unref (refs);
FREE (vector);
@@ -311,15 +430,44 @@ wb_stat_cbk (call_frame_t *frame,
int32_t op_errno,
struct stat *buf)
{
- wb_local_t *local = NULL;
+ wb_local_t *local = NULL;
+ wb_request_t *request = NULL;
+ call_frame_t *process_frame = NULL;
+ wb_file_t *file = NULL;
local = frame->local;
-
- if (local->file)
- fd_unref (local->file->fd);
+ file = local->file;
+
+ request = local->request;
+ if (request) {
+ process_frame = copy_frame (frame);
+ }
STACK_UNWIND (frame, op_ret, op_errno, buf);
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (process_frame, file, 0);
+ STACK_DESTROY (process_frame->root);
+ }
+
+ if (file) {
+ fd_unref (file->fd);
+ }
+
+ return 0;
+}
+
+
+static int32_t
+wb_stat_helper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc)
+{
+ STACK_WIND (frame, wb_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat,
+ loc);
return 0;
}
@@ -329,13 +477,14 @@ wb_stat (call_frame_t *frame,
xlator_t *this,
loc_t *loc)
{
- wb_file_t *file = NULL;
- fd_t *iter_fd = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
- if (loc->inode)
- {
+ if (loc->inode) {
+ /* FIXME: fd_lookup extends life of fd till stat returns */
iter_fd = fd_lookup (loc->inode, frame->root->pid);
if (iter_fd) {
if (!fd_ctx_get (iter_fd, this, &tmp_file)) {
@@ -344,9 +493,6 @@ wb_stat (call_frame_t *frame,
fd_unref (iter_fd);
}
}
- if (file) {
- wb_sync_all (frame, file);
- }
}
local = CALLOC (1, sizeof (*local));
@@ -354,10 +500,64 @@ wb_stat (call_frame_t *frame,
frame->local = local;
- STACK_WIND (frame, wb_stat_cbk,
+ if (file) {
+ stub = fop_stat_stub (frame, wb_stat_helper, loc);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame, wb_stat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->stat,
+ loc);
+ }
+
+ return 0;
+}
+
+
+int32_t
+wb_fstat_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ wb_local_t *local = NULL;
+ wb_request_t *request = NULL;
+ wb_file_t *file = NULL;
+
+ local = frame->local;
+ file = local->file;
+
+ request = local->request;
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (frame, file, 0);
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ return 0;
+}
+
+
+int32_t
+wb_fstat_helper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd)
+{
+ STACK_WIND (frame,
+ wb_fstat_cbk,
FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->stat,
- loc);
+ FIRST_CHILD(this)->fops->fstat,
+ fd);
return 0;
}
@@ -367,9 +567,10 @@ wb_fstat (call_frame_t *frame,
xlator_t *this,
fd_t *fd)
{
- wb_file_t *file = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (fd_ctx_get (fd, this, &tmp_file)) {
gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
@@ -378,21 +579,29 @@ wb_fstat (call_frame_t *frame,
}
file = (wb_file_t *)(long)tmp_file;
- if (file) {
- fd_ref (file->fd);
- wb_sync_all (frame, file);
- }
-
local = CALLOC (1, sizeof (*local));
local->file = file;
frame->local = local;
-
- STACK_WIND (frame,
- wb_stat_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->fstat,
- fd);
+
+ if (file) {
+ stub = fop_fstat_stub (frame, wb_fstat_helper, fd);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame,
+ wb_fstat_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fstat,
+ fd);
+ }
+
return 0;
}
@@ -405,13 +614,48 @@ wb_truncate_cbk (call_frame_t *frame,
int32_t op_errno,
struct stat *buf)
{
- wb_local_t *local = NULL;
-
+ wb_local_t *local = NULL;
+ wb_request_t *request = NULL;
+ wb_file_t *file = NULL;
+ call_frame_t *process_frame = NULL;
+
local = frame->local;
- if (local->file)
- fd_unref (local->file->fd);
+ file = local->file;
+ request = local->request;
+
+ if (request) {
+ process_frame = copy_frame (frame);
+ }
STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (process_frame, file, 0);
+ STACK_DESTROY (process_frame->root);
+ }
+
+ if (file) {
+ fd_unref (file->fd);
+ }
+
+ return 0;
+}
+
+
+static int32_t
+wb_truncate_helper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ wb_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate,
+ loc,
+ offset);
+
return 0;
}
@@ -422,13 +666,16 @@ wb_truncate (call_frame_t *frame,
loc_t *loc,
off_t offset)
{
- wb_file_t *file = NULL;
- fd_t *iter_fd = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (loc->inode)
{
+ /* FIXME: fd_lookup extends life of fd till the execution of
+ truncate_cbk */
iter_fd = fd_lookup (loc->inode, frame->root->pid);
if (iter_fd) {
if (!fd_ctx_get (iter_fd, this, &tmp_file)){
@@ -437,37 +684,90 @@ wb_truncate (call_frame_t *frame,
fd_unref (iter_fd);
}
}
-
- if (file)
- {
- wb_sync_all (frame, file);
- }
}
local = CALLOC (1, sizeof (*local));
local->file = file;
-
+
frame->local = local;
+ if (file) {
+ stub = fop_truncate_stub (frame, wb_truncate_helper, loc,
+ offset);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+
+ } else {
+ STACK_WIND (frame,
+ wb_truncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->truncate,
+ loc,
+ offset);
+ }
+ return 0;
+}
+
+
+int32_t
+wb_ftruncate_cbk (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct stat *buf)
+{
+ wb_local_t *local = NULL;
+ wb_request_t *request = NULL;
+ wb_file_t *file = NULL;
+
+ local = frame->local;
+ file = local->file;
+ request = local->request;
+
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (frame, file, 0);
+ }
+
+ STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ return 0;
+}
+
+
+static int32_t
+wb_ftruncate_helper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ off_t offset)
+{
STACK_WIND (frame,
- wb_truncate_cbk,
+ wb_ftruncate_cbk,
FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->truncate,
- loc,
+ FIRST_CHILD(this)->fops->ftruncate,
+ fd,
offset);
return 0;
}
-
+
int32_t
wb_ftruncate (call_frame_t *frame,
xlator_t *this,
fd_t *fd,
off_t offset)
{
- wb_file_t *file = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (fd_ctx_get (fd, this, &tmp_file)) {
gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
@@ -476,23 +776,32 @@ wb_ftruncate (call_frame_t *frame,
}
file = (wb_file_t *)(long)tmp_file;
- if (file)
- wb_sync_all (frame, file);
local = CALLOC (1, sizeof (*local));
local->file = file;
- if (file)
- fd_ref (file->fd);
-
frame->local = local;
- STACK_WIND (frame,
- wb_truncate_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->ftruncate,
- fd,
- offset);
+ if (file) {
+ stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd,
+ offset);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame,
+ wb_ftruncate_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->ftruncate,
+ fd,
+ offset);
+ }
+
return 0;
}
@@ -505,13 +814,48 @@ wb_utimens_cbk (call_frame_t *frame,
int32_t op_errno,
struct stat *buf)
{
- wb_local_t *local = NULL;
+ wb_local_t *local = NULL;
+ wb_request_t *request = NULL;
+ call_frame_t *process_frame = NULL;
+ wb_file_t *file = NULL;
local = frame->local;
- if (local->file)
- fd_unref (local->file->fd);
+ file = local->file;
+ request = local->request;
+
+ if (request) {
+ process_frame = copy_frame (frame);
+ }
STACK_UNWIND (frame, op_ret, op_errno, buf);
+
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (process_frame, file, 0);
+ STACK_DESTROY (process_frame->root);
+ }
+
+ if (file) {
+ fd_unref (file->fd);
+ }
+
+ return 0;
+}
+
+
+static int32_t
+wb_utimens_helper (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ struct timespec tv[2])
+{
+ STACK_WIND (frame,
+ wb_utimens_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->utimens,
+ loc,
+ tv);
+
return 0;
}
@@ -522,12 +866,15 @@ wb_utimens (call_frame_t *frame,
loc_t *loc,
struct timespec tv[2])
{
- wb_file_t *file = NULL;
- fd_t *iter_fd = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ fd_t *iter_fd = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (loc->inode) {
+ /* FIXME: fd_lookup extends life of fd till the execution
+ of wb_utimens_cbk */
iter_fd = fd_lookup (loc->inode, frame->root->pid);
if (iter_fd) {
if (!fd_ctx_get (iter_fd, this, &tmp_file)) {
@@ -537,8 +884,6 @@ wb_utimens (call_frame_t *frame,
}
}
- if (file)
- wb_sync_all (frame, file);
}
local = CALLOC (1, sizeof (*local));
@@ -546,12 +891,25 @@ wb_utimens (call_frame_t *frame,
frame->local = local;
- STACK_WIND (frame,
- wb_utimens_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->utimens,
- loc,
- tv);
+ if (file) {
+ stub = fop_utimens_stub (frame, wb_utimens_helper, loc, tv);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame,
+ wb_utimens_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->utimens,
+ loc,
+ tv);
+ }
+
return 0;
}
@@ -563,7 +921,7 @@ wb_open_cbk (call_frame_t *frame,
int32_t op_errno,
fd_t *fd)
{
- int32_t flags = 0;
+ int32_t flags = 0;
wb_file_t *file = NULL;
wb_conf_t *conf = this->private;
@@ -574,17 +932,18 @@ wb_open_cbk (call_frame_t *frame,
/* If mandatory locking has been enabled on this file,
we disable caching on it */
- if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP))
+ if ((fd->inode->st_mode & S_ISGID)
+ && !(fd->inode->st_mode & S_IXGRP))
file->disabled = 1;
/* If O_DIRECT then, we disable chaching */
if (frame->local)
{
flags = *((int32_t *)frame->local);
- if (((flags & O_DIRECT) == O_DIRECT) ||
- ((flags & O_RDONLY) == O_RDONLY) ||
- (((flags & O_SYNC) == O_SYNC) &&
- conf->enable_O_SYNC == _gf_true)) {
+ if (((flags & O_DIRECT) == O_DIRECT)
+ || ((flags & O_RDONLY) == O_RDONLY)
+ || (((flags & O_SYNC) == O_SYNC)
+ && conf->enable_O_SYNC == _gf_true)) {
file->disabled = 1;
}
}
@@ -635,8 +994,8 @@ wb_create_cbk (call_frame_t *frame,
* If mandatory locking has been enabled on this file,
* we disable caching on it
*/
- if ((fd->inode->st_mode & S_ISGID) &&
- !(fd->inode->st_mode & S_IXGRP))
+ if ((fd->inode->st_mode & S_ISGID)
+ && !(fd->inode->st_mode & S_IXGRP))
{
file->disabled = 1;
}
@@ -666,44 +1025,43 @@ wb_create (call_frame_t *frame,
}
-int32_t
-__wb_cleanup_queue (wb_file_t *file)
+size_t
+__wb_mark_wind_all (list_head_t *list, list_head_t *winds)
{
- wb_write_request_t *request = NULL, *dummy = NULL;
- int32_t bytes = 0;
+ wb_request_t *request = NULL;
+ size_t size = 0;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
+ char first_request = 1;
+ off_t offset_expected = 0;
+ size_t length = 0;
- list_for_each_entry_safe (request, dummy, &file->request, list)
+ list_for_each_entry (request, list, list)
{
- if (request->got_reply && request->write_behind)
- {
- bytes += iov_length (request->vector, request->count);
- list_del_init (&request->list);
-
- FREE (request->vector);
- dict_unref (request->refs);
-
- FREE (request);
+ if ((request->stub == NULL)
+ || (request->stub->fop != GF_FOP_WRITE)) {
+ break;
}
- }
- return bytes;
-}
+ vector = request->stub->args.writev.vector;
+ count = request->stub->args.writev.count;
+ if (!request->flags.write_request.stack_wound) {
+ if (first_request) {
+ first_request = 0;
+ offset_expected = request->stub->args.writev.off;
+ }
+
+ if (request->stub->args.writev.off != offset_expected) {
+ break;
+ }
+ length = iov_length (vector, count);
+ size += length;
+ offset_expected += length;
-int32_t
-__wb_mark_wind_all (list_head_t *list, list_head_t *winds)
-{
- wb_write_request_t *request = NULL;
- size_t size = 0;
-
- list_for_each_entry (request, list, list)
- {
- if (!request->stack_wound)
- {
- size += iov_length (request->vector, request->count);
- request->stack_wound = 1;
+ request->flags.write_request.stack_wound = 1;
list_add_tail (&request->winds, winds);
- }
+ }
}
return size;
@@ -711,32 +1069,66 @@ __wb_mark_wind_all (list_head_t *list, list_head_t *winds)
size_t
-__wb_get_aggregate_size (list_head_t *list)
+__wb_get_aggregate_size (list_head_t *list, char *other_fop_in_queue,
+ char *non_contiguous_writes)
{
- wb_write_request_t *request = NULL;
- size_t size = 0;
+ wb_request_t *request = NULL;
+ size_t size = 0, length = 0;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
+ char first_request = 1;
+ off_t offset_expected = 0;
list_for_each_entry (request, list, list)
{
- if (!request->stack_wound)
- {
- size += iov_length (request->vector, request->count);
+ if ((request->stub == NULL)
+ || (request->stub->fop != GF_FOP_WRITE)) {
+ if (request->stub && other_fop_in_queue) {
+ *other_fop_in_queue = 1;
+ }
+ break;
+ }
+
+ vector = request->stub->args.writev.vector;
+ count = request->stub->args.writev.count;
+ if (!request->flags.write_request.stack_wound) {
+ if (first_request) {
+ first_request = 0;
+ offset_expected = request->stub->args.writev.off;
+ }
+
+ if (offset_expected != request->stub->args.writev.off) {
+ if (non_contiguous_writes) {
+ *non_contiguous_writes = 1;
+ }
+ break;
+ }
+
+ length = iov_length (vector, count);
+ size += length;
+ offset_expected += length;
}
}
return size;
}
+
uint32_t
__wb_get_incomplete_writes (list_head_t *list)
{
- wb_write_request_t *request = NULL;
- uint32_t count = 0;
+ wb_request_t *request = NULL;
+ uint32_t count = 0;
list_for_each_entry (request, list, list)
{
- if (request->stack_wound && !request->got_reply)
- {
+ if ((request->stub == NULL)
+ || (request->stub->fop != GF_FOP_WRITE)) {
+ break;
+ }
+
+ if (request->flags.write_request.stack_wound
+ && !request->flags.write_request.got_reply) {
count++;
}
}
@@ -744,18 +1136,22 @@ __wb_get_incomplete_writes (list_head_t *list)
return count;
}
-int32_t
+
+size_t
__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf)
{
- size_t aggregate_current = 0;
+ size_t aggregate_current = 0;
uint32_t incomplete_writes = 0;
+ char other_fop_in_queue = 0;
+ char non_contiguous_writes = 0;
incomplete_writes = __wb_get_incomplete_writes (list);
- aggregate_current = __wb_get_aggregate_size (list);
+ aggregate_current = __wb_get_aggregate_size (list, &other_fop_in_queue,
+ &non_contiguous_writes);
- if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf))
- {
+ if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf)
+ || other_fop_in_queue || non_contiguous_writes) {
__wb_mark_wind_all (list, winds);
}
@@ -766,14 +1162,25 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf)
size_t
__wb_get_window_size (list_head_t *list)
{
- wb_write_request_t *request = NULL;
- size_t size = 0;
+ wb_request_t *request = NULL;
+ size_t size = 0;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
list_for_each_entry (request, list, list)
{
- if (request->write_behind && !request->got_reply)
+ if ((request->stub == NULL)
+ || (request->stub->fop != GF_FOP_WRITE)) {
+ continue;
+ }
+
+ vector = request->stub->args.writev.vector;
+ count = request->stub->args.writev.count;
+
+ if (request->flags.write_request.write_behind
+ && !request->flags.write_request.got_reply)
{
- size += iov_length (request->vector, request->count);
+ size += iov_length (vector, count);
}
}
@@ -784,23 +1191,28 @@ __wb_get_window_size (list_head_t *list)
size_t
__wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size)
{
- size_t written_behind = 0;
- wb_write_request_t *request = NULL;
+ size_t written_behind = 0;
+ wb_request_t *request = NULL;
+ struct iovec *vector = NULL;
+ int32_t count = 0;
list_for_each_entry (request, list, list)
{
- if (written_behind <= size)
- {
- if (!request->write_behind)
- {
- wb_local_t *local = request->frame->local;
- written_behind += iov_length (request->vector, request->count);
- request->write_behind = 1;
- list_add_tail (&local->unwind_frames, unwinds);
- }
+ if ((request->stub == NULL)
+ || (request->stub->fop != GF_FOP_WRITE)) {
+ continue;
}
- else
- {
+
+ vector = request->stub->args.writev.vector;
+ count = request->stub->args.writev.count;
+
+ if (written_behind <= size) {
+ if (!request->flags.write_request.write_behind) {
+ written_behind += iov_length (vector, count);
+ request->flags.write_request.write_behind = 1;
+ list_add_tail (&request->unwinds, unwinds);
+ }
+ } else {
break;
}
}
@@ -825,16 +1237,48 @@ __wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf)
}
+uint32_t
+__wb_get_other_requests (list_head_t *list, list_head_t *other_requests)
+{
+ wb_request_t *request = NULL;
+ uint32_t count = 0;
+ list_for_each_entry (request, list, list) {
+ if ((request->stub == NULL)
+ || (request->stub->fop == GF_FOP_WRITE)) {
+ break;
+ }
+
+ if (!request->flags.other_requests.marked_for_resume) {
+ request->flags.other_requests.marked_for_resume = 1;
+ list_add_tail (&request->other_requests,
+ other_requests);
+ count++;
+
+ /* lets handle one at a time */
+ break;
+ }
+ }
+
+ return count;
+}
+
+
int32_t
wb_stack_unwind (list_head_t *unwinds)
{
- struct stat buf = {0,};
- wb_local_t *local = NULL, *dummy = NULL;
+ struct stat buf = {0,};
+ wb_request_t *request = NULL, *dummy = NULL;
+ call_frame_t *frame = NULL;
+ wb_local_t *local = NULL;
- list_for_each_entry_safe (local, dummy, unwinds, unwind_frames)
+ list_for_each_entry_safe (request, dummy, unwinds, unwinds)
{
- list_del_init (&local->unwind_frames);
- STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf);
+ frame = request->stub->frame;
+ local = frame->local;
+
+ STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf);
+
+ wb_request_unref (request);
}
return 0;
@@ -842,13 +1286,54 @@ wb_stack_unwind (list_head_t *unwinds)
int32_t
-wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds)
+wb_resume_other_requests (call_frame_t *frame, wb_file_t *file,
+ list_head_t *other_requests)
+{
+ int32_t ret = 0;
+ wb_request_t *request = NULL, *dummy = NULL;
+ int32_t fops_removed = 0;
+ char wind = 0;
+ call_stub_t *stub = NULL;
+
+ if (list_empty (other_requests)) {
+ goto out;
+ }
+
+ list_for_each_entry_safe (request, dummy, other_requests,
+ other_requests) {
+ wind = request->stub->wind;
+ stub = request->stub;
+
+ LOCK (&file->lock);
+ {
+ request->stub = NULL;
+ }
+ UNLOCK (&file->lock);
+
+ if (!wind) {
+ wb_request_unref (request);
+ fops_removed++;
+ }
+
+ call_resume (stub);
+ }
+
+ if (fops_removed > 0) {
+ wb_process_queue (frame, file, 0);
+ }
+
+out:
+ return ret;
+}
+
+
+int32_t
+wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds,
+ list_head_t *unwinds, list_head_t *other_requests)
{
- /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */
- /* call_frame_t *sync_frame = copy_frame (frame); */
-
wb_stack_unwind (unwinds);
wb_sync (frame, file, winds);
+ wb_resume_other_requests (frame, file, other_requests);
return 0;
}
@@ -857,67 +1342,35 @@ wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t
int32_t
wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)
{
- list_head_t winds, unwinds;
- size_t size = 0;
- wb_conf_t *conf = file->this->private;
+ list_head_t winds, unwinds, other_requests;
+ size_t size = 0;
+ wb_conf_t *conf = file->this->private;
+ uint32_t count = 0;
INIT_LIST_HEAD (&winds);
INIT_LIST_HEAD (&unwinds);
-
- if (!file)
- {
+ INIT_LIST_HEAD (&other_requests);
+
+ if (!file) {
return -1;
}
size = flush_all ? 0 : conf->aggregate_size;
LOCK (&file->lock);
{
- __wb_cleanup_queue (file);
- __wb_mark_winds (&file->request, &winds, size);
- __wb_mark_unwinds (&file->request, &unwinds, conf->window_size);
- }
- UNLOCK (&file->lock);
-
- wb_do_ops (frame, file, &winds, &unwinds);
- return 0;
-}
-
-
-wb_write_request_t *
-wb_enqueue (wb_file_t *file,
- call_frame_t *frame,
- struct iovec *vector,
- int32_t count,
- off_t offset)
-{
- wb_write_request_t *request = NULL;
- wb_local_t *local = CALLOC (1, sizeof (*local));
+ count = __wb_get_other_requests (&file->request,
+ &other_requests);
- request = CALLOC (1, sizeof (*request));
-
- INIT_LIST_HEAD (&request->list);
- INIT_LIST_HEAD (&request->winds);
-
- request->frame = frame;
- request->vector = iov_dup (vector, count);
- request->count = count;
- request->offset = offset;
- request->refs = dict_ref (frame->root->req_refs);
-
- frame->local = local;
- local->frame = frame;
- local->op_ret = iov_length (vector, count);
- local->op_errno = 0;
- INIT_LIST_HEAD (&local->unwind_frames);
+ if (count == 0) {
+ __wb_mark_winds (&file->request, &winds, size);
+ }
- LOCK (&file->lock);
- {
- list_add_tail (&request->list, &file->request);
- file->offset = offset + iov_length (vector, count);
+ __wb_mark_unwinds (&file->request, &unwinds, conf->window_size);
}
UNLOCK (&file->lock);
- return request;
+ wb_do_ops (frame, file, &winds, &unwinds, &other_requests);
+ return 0;
}
@@ -942,11 +1395,13 @@ wb_writev (call_frame_t *frame,
int32_t count,
off_t offset)
{
- wb_file_t *file = NULL;
- char offset_expected = 1, wb_disabled = 0;
+ wb_file_t *file = NULL;
+ char wb_disabled = 0;
call_frame_t *process_frame = NULL;
- size_t size = 0;
- uint64_t tmp_file = 0;
+ size_t size = 0;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
+ wb_local_t *local = NULL;
if (vector != NULL)
size = iov_length (vector, count);
@@ -975,9 +1430,6 @@ wb_writev (call_frame_t *frame,
}
wb_disabled = 1;
}
-
- if (file->offset != offset)
- offset_expected = 0;
}
UNLOCK (&file->lock);
@@ -986,7 +1438,7 @@ wb_writev (call_frame_t *frame,
wb_writev_cbk,
FIRST_CHILD (frame->this),
FIRST_CHILD (frame->this)->fops->writev,
- file->fd,
+ fd,
vector,
count,
offset);
@@ -995,10 +1447,17 @@ wb_writev (call_frame_t *frame,
process_frame = copy_frame (frame);
- if (!offset_expected)
- wb_process_queue (process_frame, file, 1);
+ local = CALLOC (1, sizeof (*local));
+ frame->local = local;
+ local->file = file;
+
+ stub = fop_writev_stub (frame, NULL, fd, vector, count, offset);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
- wb_enqueue (file, frame, vector, count, offset);
+ wb_enqueue (file, stub);
wb_process_queue (process_frame, file, 0);
STACK_DESTROY (process_frame->root);
@@ -1017,11 +1476,38 @@ wb_readv_cbk (call_frame_t *frame,
int32_t count,
struct stat *stbuf)
{
- wb_local_t *local = NULL;
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+ wb_request_t *request = NULL;
local = frame->local;
+ file = local->file;
+ request = local->request;
+
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (frame, file, 0);
+ }
STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf);
+
+ return 0;
+}
+
+
+static int32_t
+wb_readv_helper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ size_t size,
+ off_t offset)
+{
+ STACK_WIND (frame,
+ wb_readv_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd, size, offset);
+
return 0;
}
@@ -1033,9 +1519,10 @@ wb_readv (call_frame_t *frame,
size_t size,
off_t offset)
{
- wb_file_t *file = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (fd_ctx_get (fd, this, &tmp_file)) {
gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
@@ -1044,19 +1531,29 @@ wb_readv (call_frame_t *frame,
}
file = (wb_file_t *)(long)tmp_file;
- if (file)
- wb_sync_all (frame, file);
local = CALLOC (1, sizeof (*local));
local->file = file;
frame->local = local;
+ if (file) {
+ stub = fop_readv_stub (frame, wb_readv_helper, fd, size,
+ offset);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM, NULL);
+ return 0;
+ }
- STACK_WIND (frame,
- wb_readv_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->readv,
- fd, size, offset);
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame,
+ wb_readv_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv,
+ fd, size, offset);
+ }
return 0;
}
@@ -1069,24 +1566,6 @@ wb_ffr_bg_cbk (call_frame_t *frame,
int32_t op_ret,
int32_t op_errno)
{
- wb_local_t *local = NULL;
- wb_file_t *file = NULL;
-
- local = frame->local;
- file = local->file;
-
- if (file) {
- fd_unref (file->fd);
- }
-
- if (file->op_ret == -1)
- {
- op_ret = file->op_ret;
- op_errno = file->op_errno;
-
- file->op_ret = 0;
- }
-
STACK_DESTROY (frame->root);
return 0;
}
@@ -1100,24 +1579,39 @@ wb_ffr_cbk (call_frame_t *frame,
int32_t op_errno)
{
wb_local_t *local = NULL;
- wb_file_t *file = NULL;
+ wb_file_t *file = NULL;
+ wb_conf_t *conf = NULL;
+ char unwind = 0;
+ conf = this->private;
local = frame->local;
file = local->file;
- if (file) {
- /* corresponds to the fd_ref() done during wb_file_create() */
- fd_unref (file->fd);
+
+ if (conf->flush_behind
+ && (!file->disabled) && (file->disable_till == 0)) {
+ unwind = 1;
+ } else {
+ local->reply_count++;
+ /* without flush-behind, unwind should wait for replies of
+ writes queued before and the flush */
+ if (local->reply_count == 2) {
+ unwind = 1;
+ }
}
- if (file->op_ret == -1)
- {
- op_ret = file->op_ret;
- op_errno = file->op_errno;
+ if (unwind) {
+ if (file->op_ret == -1) {
+ op_ret = file->op_ret;
+ op_errno = file->op_errno;
- file->op_ret = 0;
+ file->op_ret = 0;
+ }
+
+ wb_process_queue (frame, file, 0);
+
+ STACK_UNWIND (frame, op_ret, op_errno);
}
- STACK_UNWIND (frame, op_ret, op_errno);
return 0;
}
@@ -1127,11 +1621,13 @@ wb_flush (call_frame_t *frame,
xlator_t *this,
fd_t *fd)
{
- wb_conf_t *conf = NULL;
- wb_file_t *file = NULL;
- call_frame_t *flush_frame = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_conf_t *conf = NULL;
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
+ call_frame_t *process_frame = NULL;
+ wb_local_t *tmp_local = NULL;
conf = this->private;
@@ -1145,32 +1641,35 @@ wb_flush (call_frame_t *frame,
local = CALLOC (1, sizeof (*local));
local->file = file;
- if (file)
- fd_ref (file->fd);
- if (&file->request != file->request.next) {
- gf_log (this->name, GF_LOG_DEBUG,
- "request queue is not empty, it has to be synced");
+ frame->local = local;
+ stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
}
- if (conf->flush_behind &&
- (!file->disabled) && (file->disable_till == 0)) {
- flush_frame = copy_frame (frame);
- STACK_UNWIND (frame, file->op_ret,
- file->op_errno); // liar! liar! :O
+ if (conf->flush_behind
+ && (!file->disabled) && (file->disable_till == 0)) {
+ tmp_local = CALLOC (1, sizeof (*local));
+ tmp_local->file = file;
- flush_frame->local = local;
- wb_sync_all (flush_frame, file);
+ process_frame = copy_frame (frame);
+ process_frame->local = tmp_local;
+ }
+
+ wb_enqueue (file, stub);
- STACK_WIND (flush_frame,
+ wb_process_queue (process_frame, file, 1);
+
+ if (conf->flush_behind
+ && (!file->disabled) && (file->disable_till == 0)) {
+ STACK_WIND (process_frame,
wb_ffr_bg_cbk,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->flush,
fd);
} else {
- wb_sync_all (frame, file);
-
- frame->local = local;
STACK_WIND (frame,
wb_ffr_cbk,
FIRST_CHILD(this),
@@ -1182,40 +1681,64 @@ wb_flush (call_frame_t *frame,
}
-int32_t
+static int32_t
wb_fsync_cbk (call_frame_t *frame,
void *cookie,
xlator_t *this,
int32_t op_ret,
int32_t op_errno)
{
- wb_local_t *local = NULL;
- wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ wb_file_t *file = NULL;
+ wb_request_t *request = NULL;
local = frame->local;
file = local->file;
+ request = local->request;
- if (file->op_ret == -1)
- {
+ if (file->op_ret == -1) {
op_ret = file->op_ret;
op_errno = file->op_errno;
file->op_ret = 0;
}
+ if (request) {
+ wb_request_unref (request);
+ wb_process_queue (frame, file, 0);
+ }
+
STACK_UNWIND (frame, op_ret, op_errno);
+
return 0;
}
+
+static int32_t
+wb_fsync_helper (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ int32_t datasync)
+{
+ STACK_WIND (frame,
+ wb_fsync_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync,
+ fd, datasync);
+ return 0;
+}
+
+
int32_t
wb_fsync (call_frame_t *frame,
xlator_t *this,
fd_t *fd,
int32_t datasync)
{
- wb_file_t *file = NULL;
- wb_local_t *local = NULL;
- uint64_t tmp_file = 0;
+ wb_file_t *file = NULL;
+ wb_local_t *local = NULL;
+ uint64_t tmp_file = 0;
+ call_stub_t *stub = NULL;
if (fd_ctx_get (fd, this, &tmp_file)) {
gf_log (this->name, GF_LOG_ERROR, "returning EBADFD");
@@ -1224,19 +1747,30 @@ wb_fsync (call_frame_t *frame,
}
file = (wb_file_t *)(long)tmp_file;
- if (file)
- wb_sync_all (frame, file);
local = CALLOC (1, sizeof (*local));
local->file = file;
frame->local = local;
- STACK_WIND (frame,
- wb_fsync_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->fsync,
- fd, datasync);
+ if (file) {
+ stub = fop_fsync_stub (frame, wb_fsync_helper, fd, datasync);
+ if (stub == NULL) {
+ STACK_UNWIND (frame, -1, ENOMEM);
+ return 0;
+ }
+
+ wb_enqueue (file, stub);
+
+ wb_process_queue (frame, file, 1);
+ } else {
+ STACK_WIND (frame,
+ wb_fsync_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsync,
+ fd, datasync);
+ }
+
return 0;
}
@@ -1245,10 +1779,19 @@ int32_t
wb_release (xlator_t *this,
fd_t *fd)
{
- uint64_t file = 0;
+ uint64_t file_ptr = 0;
+ wb_file_t *file = NULL;
+
+ fd_ctx_get (fd, this, &file_ptr);
+ file = (wb_file_t *) (long) file_ptr;
+
+ LOCK (&file->lock);
+ {
+ assert (list_empty (&file->request));
+ }
+ UNLOCK (&file->lock);
- fd_ctx_get (fd, this, &file);
- wb_file_destroy ((wb_file_t *)(long)file);
+ wb_file_destroy (file);
return 0;
}
@@ -1257,14 +1800,14 @@ wb_release (xlator_t *this,
int32_t
init (xlator_t *this)
{
- dict_t *options = NULL;
+ dict_t *options = NULL;
wb_conf_t *conf = NULL;
- char *aggregate_size_string = NULL;
- char *window_size_string = NULL;
- char *flush_behind_string = NULL;
- char *disable_till_string = NULL;
- char *enable_O_SYNC_string = NULL;
- int32_t ret = -1;
+ char *aggregate_size_string = NULL;
+ char *window_size_string = NULL;
+ char *flush_behind_string = NULL;
+ char *disable_till_string = NULL;
+ char *enable_O_SYNC_string = NULL;
+ int32_t ret = -1;
if ((this->children == NULL)
|| this->children->next) {