diff options
Diffstat (limited to 'xlators/performance/write-behind/src/write-behind.c')
-rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 1351 |
1 files changed, 947 insertions, 404 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index 86752cc946e..c41bf3d58e0 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -33,6 +33,7 @@ #include "compat.h" #include "compat-errno.h" #include "common-utils.h" +#include "call-stub.h" #define MAX_VECTOR_COUNT 8 @@ -42,75 +43,207 @@ struct wb_page; struct wb_file; +typedef struct wb_file { + int disabled; + uint64_t disable_till; + size_t window_size; + int32_t refcount; + int32_t op_ret; + int32_t op_errno; + list_head_t request; + fd_t *fd; + gf_lock_t lock; + xlator_t *this; +}wb_file_t; + + +typedef struct wb_request { + list_head_t list; + list_head_t winds; + list_head_t unwinds; + list_head_t other_requests; + call_stub_t *stub; + int32_t refcount; + wb_file_t *file; + union { + struct { + char write_behind; + char stack_wound; + char got_reply; + }write_request; + + struct { + char marked_for_resume; + }other_requests; + }flags; +} wb_request_t; + + struct wb_conf { - uint64_t aggregate_size; - uint64_t window_size; - uint64_t disable_till; + uint64_t aggregate_size; + uint64_t window_size; + uint64_t disable_till; gf_boolean_t enable_O_SYNC; gf_boolean_t flush_behind; }; typedef struct wb_local { - list_head_t winds; + list_head_t winds; struct wb_file *file; - list_head_t unwind_frames; - int op_ret; - int op_errno; - call_frame_t *frame; + wb_request_t *request; + int op_ret; + int op_errno; + call_frame_t *frame; + int32_t reply_count; } wb_local_t; -typedef struct write_request { - call_frame_t *frame; - off_t offset; - /* int32_t op_ret; - int32_t op_errno; */ - struct iovec *vector; - int32_t count; - dict_t *refs; - char write_behind; - char stack_wound; - char got_reply; - list_head_t list; - list_head_t winds; - /* list_head_t unwinds;*/ -} wb_write_request_t; - - -struct wb_file { - int disabled; - uint64_t disable_till; - off_t offset; - size_t window_size; - int32_t refcount; - int32_t op_ret; - int32_t op_errno; - list_head_t request; - fd_t *fd; - gf_lock_t lock; - xlator_t *this; -}; - - typedef struct wb_conf wb_conf_t; typedef struct wb_page wb_page_t; -typedef struct wb_file wb_file_t; int32_t wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); -int32_t +size_t wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds); -int32_t -wb_sync_all (call_frame_t *frame, wb_file_t *file); - -int32_t +size_t __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size); +static void +__wb_request_unref (wb_request_t *this) +{ + if (this->refcount <= 0) { + gf_log ("wb-request", GF_LOG_ERROR, + "refcount(%d) is <= 0", this->refcount); + return; + } + + this->refcount--; + if (this->refcount == 0) { + list_del_init (&this->list); + if (this->stub && this->stub->fop == GF_FOP_WRITE) { + call_stub_destroy (this->stub); + } + + FREE (this); + } +} + + +static void +wb_request_unref (wb_request_t *this) +{ + wb_file_t *file = NULL; + if (this == NULL) { + gf_log ("wb-request", GF_LOG_DEBUG, + "request is NULL"); + return; + } + + file = this->file; + LOCK (&file->lock); + { + __wb_request_unref (this); + } + UNLOCK (&file->lock); +} + + +static wb_request_t * +__wb_request_ref (wb_request_t *this) +{ + if (this->refcount < 0) { + gf_log ("wb-request", GF_LOG_DEBUG, + "refcount(%d) is < 0", this->refcount); + return NULL; + } + + this->refcount++; + return this; +} + + +wb_request_t * +wb_request_ref (wb_request_t *this) +{ + wb_file_t *file = NULL; + if (this == NULL) { + gf_log ("wb-request", GF_LOG_DEBUG, + "request is NULL"); + return NULL; + } + + file = this->file; + LOCK (&file->lock); + { + this = __wb_request_ref (this); + } + UNLOCK (&file->lock); + + return this; +} + + +wb_request_t * +wb_enqueue (wb_file_t *file, + call_stub_t *stub) +{ + wb_request_t *request = NULL; + call_frame_t *frame = NULL; + wb_local_t *local = NULL; + struct iovec *vector = NULL; + int32_t count = 0; + + request = CALLOC (1, sizeof (*request)); + + INIT_LIST_HEAD (&request->list); + INIT_LIST_HEAD (&request->winds); + INIT_LIST_HEAD (&request->unwinds); + INIT_LIST_HEAD (&request->other_requests); + + request->stub = stub; + request->file = file; + + frame = stub->frame; + local = frame->local; + if (local) { + local->request = request; + } + + if (stub->fop == GF_FOP_WRITE) { + vector = stub->args.writev.vector; + count = stub->args.writev.count; + + frame = stub->frame; + local = frame->local; + local->op_ret = iov_length (vector, count); + local->op_errno = 0; + } + + LOCK (&file->lock); + { + list_add_tail (&request->list, &file->request); + if (stub->fop == GF_FOP_WRITE) { + /* reference for stack winding */ + __wb_request_ref (request); + + /* reference for stack unwinding */ + __wb_request_ref (request); + } else { + /*reference for resuming */ + __wb_request_ref (request); + } + } + UNLOCK (&file->lock); + + return request; +} + + wb_file_t * wb_file_create (xlator_t *this, fd_t *fd) @@ -161,10 +294,12 @@ wb_sync_cbk (call_frame_t *frame, int32_t op_errno, struct stat *stbuf) { - wb_local_t *local = NULL; - list_head_t *winds = NULL; - wb_file_t *file = NULL; - wb_write_request_t *request = NULL, *dummy = NULL; + wb_local_t *local = NULL; + list_head_t *winds = NULL; + wb_file_t *file = NULL; + wb_request_t *request = NULL, *dummy = NULL; + wb_local_t *per_request_local = NULL; + local = frame->local; winds = &local->winds; @@ -173,27 +308,25 @@ wb_sync_cbk (call_frame_t *frame, LOCK (&file->lock); { list_for_each_entry_safe (request, dummy, winds, winds) { - request->got_reply = 1; - if (!request->write_behind && (op_ret == -1)) { - wb_local_t *per_request_local = request->frame->local; + request->flags.write_request.got_reply = 1; + + if (!request->flags.write_request.write_behind + && (op_ret == -1)) { + per_request_local = request->stub->frame->local; per_request_local->op_ret = op_ret; per_request_local->op_errno = op_errno; } - /* - request->op_ret = op_ret; - request->op_errno = op_errno; - */ + __wb_request_unref (request); + } + + if (op_ret == -1) { + file->op_ret = op_ret; + file->op_errno = op_errno; } } UNLOCK (&file->lock); - if (op_ret == -1) - { - file->op_ret = op_ret; - file->op_errno = op_errno; - } - wb_process_queue (frame, file, 0); /* safe place to do fd_unref */ @@ -204,43 +337,25 @@ wb_sync_cbk (call_frame_t *frame, return 0; } -int32_t -wb_sync_all (call_frame_t *frame, wb_file_t *file) -{ - list_head_t winds; - int32_t bytes = 0; - - INIT_LIST_HEAD (&winds); - - LOCK (&file->lock); - { - bytes = __wb_mark_winds (&file->request, &winds, 0); - } - UNLOCK (&file->lock); - - wb_sync (frame, file, &winds); - - return bytes; -} - -int32_t +size_t wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) { - wb_write_request_t *dummy = NULL, *request = NULL, *first_request = NULL, *next = NULL; - size_t total_count = 0, count = 0; - size_t copied = 0; - call_frame_t *sync_frame = NULL; - dict_t *refs = NULL; - wb_local_t *local = NULL; - struct iovec *vector = NULL; - int32_t bytes = 0; - size_t bytecount = 0; - - list_for_each_entry (request, winds, winds) - { - total_count += request->count; - bytes += iov_length (request->vector, request->count); + wb_request_t *dummy = NULL, *request = NULL; + wb_request_t *first_request = NULL, *next = NULL; + size_t total_count = 0, count = 0; + size_t copied = 0; + call_frame_t *sync_frame = NULL; + dict_t *refs = NULL; + wb_local_t *local = NULL; + struct iovec *vector = NULL; + size_t bytes = 0; + size_t bytecount = 0; + + list_for_each_entry (request, winds, winds) { + total_count += request->stub->args.writev.count; + bytes += iov_length (request->stub->args.writev.vector, + request->stub->args.writev.count); } if (!total_count) { @@ -258,26 +373,29 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) first_request = request; } - count += request->count; - bytecount = VECTORSIZE (request->count); + count += request->stub->args.writev.count; + bytecount = VECTORSIZE (request->stub->args.writev.count); memcpy (((char *)vector)+copied, - request->vector, + request->stub->args.writev.vector, bytecount); copied += bytecount; - if (request->refs) { - dict_copy (request->refs, refs); + if (request->stub->args.writev.req_refs) { + dict_copy (request->stub->args.writev.req_refs, refs); } next = NULL; if (request->winds.next != winds) { - next = list_entry (request->winds.next, struct write_request, winds); + next = list_entry (request->winds.next, + wb_request_t, winds); } list_del_init (&request->winds); list_add_tail (&request->winds, &local->winds); - if (!next || ((count + next->count) > MAX_VECTOR_COUNT)) { + if (!next + || ((count + next->stub->args.writev.count) > MAX_VECTOR_COUNT)) + { sync_frame = copy_frame (frame); sync_frame->local = local; local->file = file; @@ -288,7 +406,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) FIRST_CHILD(sync_frame->this), FIRST_CHILD(sync_frame->this)->fops->writev, file->fd, vector, - count, first_request->offset); + count, + first_request->stub->args.writev.off); dict_unref (refs); FREE (vector); @@ -311,15 +430,44 @@ wb_stat_cbk (call_frame_t *frame, int32_t op_errno, struct stat *buf) { - wb_local_t *local = NULL; + wb_local_t *local = NULL; + wb_request_t *request = NULL; + call_frame_t *process_frame = NULL; + wb_file_t *file = NULL; local = frame->local; - - if (local->file) - fd_unref (local->file->fd); + file = local->file; + + request = local->request; + if (request) { + process_frame = copy_frame (frame); + } STACK_UNWIND (frame, op_ret, op_errno, buf); + if (request) { + wb_request_unref (request); + wb_process_queue (process_frame, file, 0); + STACK_DESTROY (process_frame->root); + } + + if (file) { + fd_unref (file->fd); + } + + return 0; +} + + +static int32_t +wb_stat_helper (call_frame_t *frame, + xlator_t *this, + loc_t *loc) +{ + STACK_WIND (frame, wb_stat_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, + loc); return 0; } @@ -329,13 +477,14 @@ wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc) { - wb_file_t *file = NULL; - fd_t *iter_fd = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; - if (loc->inode) - { + if (loc->inode) { + /* FIXME: fd_lookup extends life of fd till stat returns */ iter_fd = fd_lookup (loc->inode, frame->root->pid); if (iter_fd) { if (!fd_ctx_get (iter_fd, this, &tmp_file)) { @@ -344,9 +493,6 @@ wb_stat (call_frame_t *frame, fd_unref (iter_fd); } } - if (file) { - wb_sync_all (frame, file); - } } local = CALLOC (1, sizeof (*local)); @@ -354,10 +500,64 @@ wb_stat (call_frame_t *frame, frame->local = local; - STACK_WIND (frame, wb_stat_cbk, + if (file) { + stub = fop_stat_stub (frame, wb_stat_helper, loc); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, wb_stat_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, + loc); + } + + return 0; +} + + +int32_t +wb_fstat_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + wb_local_t *local = NULL; + wb_request_t *request = NULL; + wb_file_t *file = NULL; + + local = frame->local; + file = local->file; + + request = local->request; + if (request) { + wb_request_unref (request); + wb_process_queue (frame, file, 0); + } + + STACK_UNWIND (frame, op_ret, op_errno, buf); + + return 0; +} + + +int32_t +wb_fstat_helper (call_frame_t *frame, + xlator_t *this, + fd_t *fd) +{ + STACK_WIND (frame, + wb_fstat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, - loc); + FIRST_CHILD(this)->fops->fstat, + fd); return 0; } @@ -367,9 +567,10 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd) { - wb_file_t *file = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (fd_ctx_get (fd, this, &tmp_file)) { gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -378,21 +579,29 @@ wb_fstat (call_frame_t *frame, } file = (wb_file_t *)(long)tmp_file; - if (file) { - fd_ref (file->fd); - wb_sync_all (frame, file); - } - local = CALLOC (1, sizeof (*local)); local->file = file; frame->local = local; - - STACK_WIND (frame, - wb_stat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fstat, - fd); + + if (file) { + stub = fop_fstat_stub (frame, wb_fstat_helper, fd); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, + wb_fstat_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fstat, + fd); + } + return 0; } @@ -405,13 +614,48 @@ wb_truncate_cbk (call_frame_t *frame, int32_t op_errno, struct stat *buf) { - wb_local_t *local = NULL; - + wb_local_t *local = NULL; + wb_request_t *request = NULL; + wb_file_t *file = NULL; + call_frame_t *process_frame = NULL; + local = frame->local; - if (local->file) - fd_unref (local->file->fd); + file = local->file; + request = local->request; + + if (request) { + process_frame = copy_frame (frame); + } STACK_UNWIND (frame, op_ret, op_errno, buf); + + if (request) { + wb_request_unref (request); + wb_process_queue (process_frame, file, 0); + STACK_DESTROY (process_frame->root); + } + + if (file) { + fd_unref (file->fd); + } + + return 0; +} + + +static int32_t +wb_truncate_helper (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + off_t offset) +{ + STACK_WIND (frame, + wb_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, + offset); + return 0; } @@ -422,13 +666,16 @@ wb_truncate (call_frame_t *frame, loc_t *loc, off_t offset) { - wb_file_t *file = NULL; - fd_t *iter_fd = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (loc->inode) { + /* FIXME: fd_lookup extends life of fd till the execution of + truncate_cbk */ iter_fd = fd_lookup (loc->inode, frame->root->pid); if (iter_fd) { if (!fd_ctx_get (iter_fd, this, &tmp_file)){ @@ -437,37 +684,90 @@ wb_truncate (call_frame_t *frame, fd_unref (iter_fd); } } - - if (file) - { - wb_sync_all (frame, file); - } } local = CALLOC (1, sizeof (*local)); local->file = file; - + frame->local = local; + if (file) { + stub = fop_truncate_stub (frame, wb_truncate_helper, loc, + offset); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + + } else { + STACK_WIND (frame, + wb_truncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, + loc, + offset); + } + return 0; +} + + +int32_t +wb_ftruncate_cbk (call_frame_t *frame, + void *cookie, + xlator_t *this, + int32_t op_ret, + int32_t op_errno, + struct stat *buf) +{ + wb_local_t *local = NULL; + wb_request_t *request = NULL; + wb_file_t *file = NULL; + + local = frame->local; + file = local->file; + request = local->request; + + if (request) { + wb_request_unref (request); + wb_process_queue (frame, file, 0); + } + + STACK_UNWIND (frame, op_ret, op_errno, buf); + + return 0; +} + + +static int32_t +wb_ftruncate_helper (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + off_t offset) +{ STACK_WIND (frame, - wb_truncate_cbk, + wb_ftruncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, + FIRST_CHILD(this)->fops->ftruncate, + fd, offset); return 0; } - + int32_t wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) { - wb_file_t *file = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (fd_ctx_get (fd, this, &tmp_file)) { gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -476,23 +776,32 @@ wb_ftruncate (call_frame_t *frame, } file = (wb_file_t *)(long)tmp_file; - if (file) - wb_sync_all (frame, file); local = CALLOC (1, sizeof (*local)); local->file = file; - if (file) - fd_ref (file->fd); - frame->local = local; - STACK_WIND (frame, - wb_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, - fd, - offset); + if (file) { + stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd, + offset); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, + wb_ftruncate_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, + fd, + offset); + } + return 0; } @@ -505,13 +814,48 @@ wb_utimens_cbk (call_frame_t *frame, int32_t op_errno, struct stat *buf) { - wb_local_t *local = NULL; + wb_local_t *local = NULL; + wb_request_t *request = NULL; + call_frame_t *process_frame = NULL; + wb_file_t *file = NULL; local = frame->local; - if (local->file) - fd_unref (local->file->fd); + file = local->file; + request = local->request; + + if (request) { + process_frame = copy_frame (frame); + } STACK_UNWIND (frame, op_ret, op_errno, buf); + + if (request) { + wb_request_unref (request); + wb_process_queue (process_frame, file, 0); + STACK_DESTROY (process_frame->root); + } + + if (file) { + fd_unref (file->fd); + } + + return 0; +} + + +static int32_t +wb_utimens_helper (call_frame_t *frame, + xlator_t *this, + loc_t *loc, + struct timespec tv[2]) +{ + STACK_WIND (frame, + wb_utimens_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->utimens, + loc, + tv); + return 0; } @@ -522,12 +866,15 @@ wb_utimens (call_frame_t *frame, loc_t *loc, struct timespec tv[2]) { - wb_file_t *file = NULL; - fd_t *iter_fd = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + fd_t *iter_fd = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (loc->inode) { + /* FIXME: fd_lookup extends life of fd till the execution + of wb_utimens_cbk */ iter_fd = fd_lookup (loc->inode, frame->root->pid); if (iter_fd) { if (!fd_ctx_get (iter_fd, this, &tmp_file)) { @@ -537,8 +884,6 @@ wb_utimens (call_frame_t *frame, } } - if (file) - wb_sync_all (frame, file); } local = CALLOC (1, sizeof (*local)); @@ -546,12 +891,25 @@ wb_utimens (call_frame_t *frame, frame->local = local; - STACK_WIND (frame, - wb_utimens_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->utimens, - loc, - tv); + if (file) { + stub = fop_utimens_stub (frame, wb_utimens_helper, loc, tv); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, + wb_utimens_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->utimens, + loc, + tv); + } + return 0; } @@ -563,7 +921,7 @@ wb_open_cbk (call_frame_t *frame, int32_t op_errno, fd_t *fd) { - int32_t flags = 0; + int32_t flags = 0; wb_file_t *file = NULL; wb_conf_t *conf = this->private; @@ -574,17 +932,18 @@ wb_open_cbk (call_frame_t *frame, /* If mandatory locking has been enabled on this file, we disable caching on it */ - if ((fd->inode->st_mode & S_ISGID) && !(fd->inode->st_mode & S_IXGRP)) + if ((fd->inode->st_mode & S_ISGID) + && !(fd->inode->st_mode & S_IXGRP)) file->disabled = 1; /* If O_DIRECT then, we disable chaching */ if (frame->local) { flags = *((int32_t *)frame->local); - if (((flags & O_DIRECT) == O_DIRECT) || - ((flags & O_RDONLY) == O_RDONLY) || - (((flags & O_SYNC) == O_SYNC) && - conf->enable_O_SYNC == _gf_true)) { + if (((flags & O_DIRECT) == O_DIRECT) + || ((flags & O_RDONLY) == O_RDONLY) + || (((flags & O_SYNC) == O_SYNC) + && conf->enable_O_SYNC == _gf_true)) { file->disabled = 1; } } @@ -635,8 +994,8 @@ wb_create_cbk (call_frame_t *frame, * If mandatory locking has been enabled on this file, * we disable caching on it */ - if ((fd->inode->st_mode & S_ISGID) && - !(fd->inode->st_mode & S_IXGRP)) + if ((fd->inode->st_mode & S_ISGID) + && !(fd->inode->st_mode & S_IXGRP)) { file->disabled = 1; } @@ -666,44 +1025,43 @@ wb_create (call_frame_t *frame, } -int32_t -__wb_cleanup_queue (wb_file_t *file) +size_t +__wb_mark_wind_all (list_head_t *list, list_head_t *winds) { - wb_write_request_t *request = NULL, *dummy = NULL; - int32_t bytes = 0; + wb_request_t *request = NULL; + size_t size = 0; + struct iovec *vector = NULL; + int32_t count = 0; + char first_request = 1; + off_t offset_expected = 0; + size_t length = 0; - list_for_each_entry_safe (request, dummy, &file->request, list) + list_for_each_entry (request, list, list) { - if (request->got_reply && request->write_behind) - { - bytes += iov_length (request->vector, request->count); - list_del_init (&request->list); - - FREE (request->vector); - dict_unref (request->refs); - - FREE (request); + if ((request->stub == NULL) + || (request->stub->fop != GF_FOP_WRITE)) { + break; } - } - return bytes; -} + vector = request->stub->args.writev.vector; + count = request->stub->args.writev.count; + if (!request->flags.write_request.stack_wound) { + if (first_request) { + first_request = 0; + offset_expected = request->stub->args.writev.off; + } + + if (request->stub->args.writev.off != offset_expected) { + break; + } + length = iov_length (vector, count); + size += length; + offset_expected += length; -int32_t -__wb_mark_wind_all (list_head_t *list, list_head_t *winds) -{ - wb_write_request_t *request = NULL; - size_t size = 0; - - list_for_each_entry (request, list, list) - { - if (!request->stack_wound) - { - size += iov_length (request->vector, request->count); - request->stack_wound = 1; + request->flags.write_request.stack_wound = 1; list_add_tail (&request->winds, winds); - } + } } return size; @@ -711,32 +1069,66 @@ __wb_mark_wind_all (list_head_t *list, list_head_t *winds) size_t -__wb_get_aggregate_size (list_head_t *list) +__wb_get_aggregate_size (list_head_t *list, char *other_fop_in_queue, + char *non_contiguous_writes) { - wb_write_request_t *request = NULL; - size_t size = 0; + wb_request_t *request = NULL; + size_t size = 0, length = 0; + struct iovec *vector = NULL; + int32_t count = 0; + char first_request = 1; + off_t offset_expected = 0; list_for_each_entry (request, list, list) { - if (!request->stack_wound) - { - size += iov_length (request->vector, request->count); + if ((request->stub == NULL) + || (request->stub->fop != GF_FOP_WRITE)) { + if (request->stub && other_fop_in_queue) { + *other_fop_in_queue = 1; + } + break; + } + + vector = request->stub->args.writev.vector; + count = request->stub->args.writev.count; + if (!request->flags.write_request.stack_wound) { + if (first_request) { + first_request = 0; + offset_expected = request->stub->args.writev.off; + } + + if (offset_expected != request->stub->args.writev.off) { + if (non_contiguous_writes) { + *non_contiguous_writes = 1; + } + break; + } + + length = iov_length (vector, count); + size += length; + offset_expected += length; } } return size; } + uint32_t __wb_get_incomplete_writes (list_head_t *list) { - wb_write_request_t *request = NULL; - uint32_t count = 0; + wb_request_t *request = NULL; + uint32_t count = 0; list_for_each_entry (request, list, list) { - if (request->stack_wound && !request->got_reply) - { + if ((request->stub == NULL) + || (request->stub->fop != GF_FOP_WRITE)) { + break; + } + + if (request->flags.write_request.stack_wound + && !request->flags.write_request.got_reply) { count++; } } @@ -744,18 +1136,22 @@ __wb_get_incomplete_writes (list_head_t *list) return count; } -int32_t + +size_t __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf) { - size_t aggregate_current = 0; + size_t aggregate_current = 0; uint32_t incomplete_writes = 0; + char other_fop_in_queue = 0; + char non_contiguous_writes = 0; incomplete_writes = __wb_get_incomplete_writes (list); - aggregate_current = __wb_get_aggregate_size (list); + aggregate_current = __wb_get_aggregate_size (list, &other_fop_in_queue, + &non_contiguous_writes); - if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf)) - { + if ((incomplete_writes == 0) || (aggregate_current >= aggregate_conf) + || other_fop_in_queue || non_contiguous_writes) { __wb_mark_wind_all (list, winds); } @@ -766,14 +1162,25 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf) size_t __wb_get_window_size (list_head_t *list) { - wb_write_request_t *request = NULL; - size_t size = 0; + wb_request_t *request = NULL; + size_t size = 0; + struct iovec *vector = NULL; + int32_t count = 0; list_for_each_entry (request, list, list) { - if (request->write_behind && !request->got_reply) + if ((request->stub == NULL) + || (request->stub->fop != GF_FOP_WRITE)) { + continue; + } + + vector = request->stub->args.writev.vector; + count = request->stub->args.writev.count; + + if (request->flags.write_request.write_behind + && !request->flags.write_request.got_reply) { - size += iov_length (request->vector, request->count); + size += iov_length (vector, count); } } @@ -784,23 +1191,28 @@ __wb_get_window_size (list_head_t *list) size_t __wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size) { - size_t written_behind = 0; - wb_write_request_t *request = NULL; + size_t written_behind = 0; + wb_request_t *request = NULL; + struct iovec *vector = NULL; + int32_t count = 0; list_for_each_entry (request, list, list) { - if (written_behind <= size) - { - if (!request->write_behind) - { - wb_local_t *local = request->frame->local; - written_behind += iov_length (request->vector, request->count); - request->write_behind = 1; - list_add_tail (&local->unwind_frames, unwinds); - } + if ((request->stub == NULL) + || (request->stub->fop != GF_FOP_WRITE)) { + continue; } - else - { + + vector = request->stub->args.writev.vector; + count = request->stub->args.writev.count; + + if (written_behind <= size) { + if (!request->flags.write_request.write_behind) { + written_behind += iov_length (vector, count); + request->flags.write_request.write_behind = 1; + list_add_tail (&request->unwinds, unwinds); + } + } else { break; } } @@ -825,16 +1237,48 @@ __wb_mark_unwinds (list_head_t *list, list_head_t *unwinds, size_t window_conf) } +uint32_t +__wb_get_other_requests (list_head_t *list, list_head_t *other_requests) +{ + wb_request_t *request = NULL; + uint32_t count = 0; + list_for_each_entry (request, list, list) { + if ((request->stub == NULL) + || (request->stub->fop == GF_FOP_WRITE)) { + break; + } + + if (!request->flags.other_requests.marked_for_resume) { + request->flags.other_requests.marked_for_resume = 1; + list_add_tail (&request->other_requests, + other_requests); + count++; + + /* lets handle one at a time */ + break; + } + } + + return count; +} + + int32_t wb_stack_unwind (list_head_t *unwinds) { - struct stat buf = {0,}; - wb_local_t *local = NULL, *dummy = NULL; + struct stat buf = {0,}; + wb_request_t *request = NULL, *dummy = NULL; + call_frame_t *frame = NULL; + wb_local_t *local = NULL; - list_for_each_entry_safe (local, dummy, unwinds, unwind_frames) + list_for_each_entry_safe (request, dummy, unwinds, unwinds) { - list_del_init (&local->unwind_frames); - STACK_UNWIND (local->frame, local->op_ret, local->op_errno, &buf); + frame = request->stub->frame; + local = frame->local; + + STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf); + + wb_request_unref (request); } return 0; @@ -842,13 +1286,54 @@ wb_stack_unwind (list_head_t *unwinds) int32_t -wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds) +wb_resume_other_requests (call_frame_t *frame, wb_file_t *file, + list_head_t *other_requests) +{ + int32_t ret = 0; + wb_request_t *request = NULL, *dummy = NULL; + int32_t fops_removed = 0; + char wind = 0; + call_stub_t *stub = NULL; + + if (list_empty (other_requests)) { + goto out; + } + + list_for_each_entry_safe (request, dummy, other_requests, + other_requests) { + wind = request->stub->wind; + stub = request->stub; + + LOCK (&file->lock); + { + request->stub = NULL; + } + UNLOCK (&file->lock); + + if (!wind) { + wb_request_unref (request); + fops_removed++; + } + + call_resume (stub); + } + + if (fops_removed > 0) { + wb_process_queue (frame, file, 0); + } + +out: + return ret; +} + + +int32_t +wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, + list_head_t *unwinds, list_head_t *other_requests) { - /* copy the frame before calling wb_stack_unwind, since this request containing current frame might get unwound */ - /* call_frame_t *sync_frame = copy_frame (frame); */ - wb_stack_unwind (unwinds); wb_sync (frame, file, winds); + wb_resume_other_requests (frame, file, other_requests); return 0; } @@ -857,67 +1342,35 @@ wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t int32_t wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all) { - list_head_t winds, unwinds; - size_t size = 0; - wb_conf_t *conf = file->this->private; + list_head_t winds, unwinds, other_requests; + size_t size = 0; + wb_conf_t *conf = file->this->private; + uint32_t count = 0; INIT_LIST_HEAD (&winds); INIT_LIST_HEAD (&unwinds); - - if (!file) - { + INIT_LIST_HEAD (&other_requests); + + if (!file) { return -1; } size = flush_all ? 0 : conf->aggregate_size; LOCK (&file->lock); { - __wb_cleanup_queue (file); - __wb_mark_winds (&file->request, &winds, size); - __wb_mark_unwinds (&file->request, &unwinds, conf->window_size); - } - UNLOCK (&file->lock); - - wb_do_ops (frame, file, &winds, &unwinds); - return 0; -} - - -wb_write_request_t * -wb_enqueue (wb_file_t *file, - call_frame_t *frame, - struct iovec *vector, - int32_t count, - off_t offset) -{ - wb_write_request_t *request = NULL; - wb_local_t *local = CALLOC (1, sizeof (*local)); + count = __wb_get_other_requests (&file->request, + &other_requests); - request = CALLOC (1, sizeof (*request)); - - INIT_LIST_HEAD (&request->list); - INIT_LIST_HEAD (&request->winds); - - request->frame = frame; - request->vector = iov_dup (vector, count); - request->count = count; - request->offset = offset; - request->refs = dict_ref (frame->root->req_refs); - - frame->local = local; - local->frame = frame; - local->op_ret = iov_length (vector, count); - local->op_errno = 0; - INIT_LIST_HEAD (&local->unwind_frames); + if (count == 0) { + __wb_mark_winds (&file->request, &winds, size); + } - LOCK (&file->lock); - { - list_add_tail (&request->list, &file->request); - file->offset = offset + iov_length (vector, count); + __wb_mark_unwinds (&file->request, &unwinds, conf->window_size); } UNLOCK (&file->lock); - return request; + wb_do_ops (frame, file, &winds, &unwinds, &other_requests); + return 0; } @@ -942,11 +1395,13 @@ wb_writev (call_frame_t *frame, int32_t count, off_t offset) { - wb_file_t *file = NULL; - char offset_expected = 1, wb_disabled = 0; + wb_file_t *file = NULL; + char wb_disabled = 0; call_frame_t *process_frame = NULL; - size_t size = 0; - uint64_t tmp_file = 0; + size_t size = 0; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; + wb_local_t *local = NULL; if (vector != NULL) size = iov_length (vector, count); @@ -975,9 +1430,6 @@ wb_writev (call_frame_t *frame, } wb_disabled = 1; } - - if (file->offset != offset) - offset_expected = 0; } UNLOCK (&file->lock); @@ -986,7 +1438,7 @@ wb_writev (call_frame_t *frame, wb_writev_cbk, FIRST_CHILD (frame->this), FIRST_CHILD (frame->this)->fops->writev, - file->fd, + fd, vector, count, offset); @@ -995,10 +1447,17 @@ wb_writev (call_frame_t *frame, process_frame = copy_frame (frame); - if (!offset_expected) - wb_process_queue (process_frame, file, 1); + local = CALLOC (1, sizeof (*local)); + frame->local = local; + local->file = file; + + stub = fop_writev_stub (frame, NULL, fd, vector, count, offset); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } - wb_enqueue (file, frame, vector, count, offset); + wb_enqueue (file, stub); wb_process_queue (process_frame, file, 0); STACK_DESTROY (process_frame->root); @@ -1017,11 +1476,38 @@ wb_readv_cbk (call_frame_t *frame, int32_t count, struct stat *stbuf) { - wb_local_t *local = NULL; + wb_local_t *local = NULL; + wb_file_t *file = NULL; + wb_request_t *request = NULL; local = frame->local; + file = local->file; + request = local->request; + + if (request) { + wb_request_unref (request); + wb_process_queue (frame, file, 0); + } STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + + return 0; +} + + +static int32_t +wb_readv_helper (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + size_t size, + off_t offset) +{ + STACK_WIND (frame, + wb_readv_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, + fd, size, offset); + return 0; } @@ -1033,9 +1519,10 @@ wb_readv (call_frame_t *frame, size_t size, off_t offset) { - wb_file_t *file = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (fd_ctx_get (fd, this, &tmp_file)) { gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -1044,19 +1531,29 @@ wb_readv (call_frame_t *frame, } file = (wb_file_t *)(long)tmp_file; - if (file) - wb_sync_all (frame, file); local = CALLOC (1, sizeof (*local)); local->file = file; frame->local = local; + if (file) { + stub = fop_readv_stub (frame, wb_readv_helper, fd, size, + offset); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM, NULL); + return 0; + } - STACK_WIND (frame, - wb_readv_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readv, - fd, size, offset); + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, + wb_readv_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, + fd, size, offset); + } return 0; } @@ -1069,24 +1566,6 @@ wb_ffr_bg_cbk (call_frame_t *frame, int32_t op_ret, int32_t op_errno) { - wb_local_t *local = NULL; - wb_file_t *file = NULL; - - local = frame->local; - file = local->file; - - if (file) { - fd_unref (file->fd); - } - - if (file->op_ret == -1) - { - op_ret = file->op_ret; - op_errno = file->op_errno; - - file->op_ret = 0; - } - STACK_DESTROY (frame->root); return 0; } @@ -1100,24 +1579,39 @@ wb_ffr_cbk (call_frame_t *frame, int32_t op_errno) { wb_local_t *local = NULL; - wb_file_t *file = NULL; + wb_file_t *file = NULL; + wb_conf_t *conf = NULL; + char unwind = 0; + conf = this->private; local = frame->local; file = local->file; - if (file) { - /* corresponds to the fd_ref() done during wb_file_create() */ - fd_unref (file->fd); + + if (conf->flush_behind + && (!file->disabled) && (file->disable_till == 0)) { + unwind = 1; + } else { + local->reply_count++; + /* without flush-behind, unwind should wait for replies of + writes queued before and the flush */ + if (local->reply_count == 2) { + unwind = 1; + } } - if (file->op_ret == -1) - { - op_ret = file->op_ret; - op_errno = file->op_errno; + if (unwind) { + if (file->op_ret == -1) { + op_ret = file->op_ret; + op_errno = file->op_errno; - file->op_ret = 0; + file->op_ret = 0; + } + + wb_process_queue (frame, file, 0); + + STACK_UNWIND (frame, op_ret, op_errno); } - STACK_UNWIND (frame, op_ret, op_errno); return 0; } @@ -1127,11 +1621,13 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd) { - wb_conf_t *conf = NULL; - wb_file_t *file = NULL; - call_frame_t *flush_frame = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_conf_t *conf = NULL; + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; + call_frame_t *process_frame = NULL; + wb_local_t *tmp_local = NULL; conf = this->private; @@ -1145,32 +1641,35 @@ wb_flush (call_frame_t *frame, local = CALLOC (1, sizeof (*local)); local->file = file; - if (file) - fd_ref (file->fd); - if (&file->request != file->request.next) { - gf_log (this->name, GF_LOG_DEBUG, - "request queue is not empty, it has to be synced"); + frame->local = local; + stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM); + return 0; } - if (conf->flush_behind && - (!file->disabled) && (file->disable_till == 0)) { - flush_frame = copy_frame (frame); - STACK_UNWIND (frame, file->op_ret, - file->op_errno); // liar! liar! :O + if (conf->flush_behind + && (!file->disabled) && (file->disable_till == 0)) { + tmp_local = CALLOC (1, sizeof (*local)); + tmp_local->file = file; - flush_frame->local = local; - wb_sync_all (flush_frame, file); + process_frame = copy_frame (frame); + process_frame->local = tmp_local; + } + + wb_enqueue (file, stub); - STACK_WIND (flush_frame, + wb_process_queue (process_frame, file, 1); + + if (conf->flush_behind + && (!file->disabled) && (file->disable_till == 0)) { + STACK_WIND (process_frame, wb_ffr_bg_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd); } else { - wb_sync_all (frame, file); - - frame->local = local; STACK_WIND (frame, wb_ffr_cbk, FIRST_CHILD(this), @@ -1182,40 +1681,64 @@ wb_flush (call_frame_t *frame, } -int32_t +static int32_t wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { - wb_local_t *local = NULL; - wb_file_t *file = NULL; + wb_local_t *local = NULL; + wb_file_t *file = NULL; + wb_request_t *request = NULL; local = frame->local; file = local->file; + request = local->request; - if (file->op_ret == -1) - { + if (file->op_ret == -1) { op_ret = file->op_ret; op_errno = file->op_errno; file->op_ret = 0; } + if (request) { + wb_request_unref (request); + wb_process_queue (frame, file, 0); + } + STACK_UNWIND (frame, op_ret, op_errno); + return 0; } + +static int32_t +wb_fsync_helper (call_frame_t *frame, + xlator_t *this, + fd_t *fd, + int32_t datasync) +{ + STACK_WIND (frame, + wb_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, datasync); + return 0; +} + + int32_t wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync) { - wb_file_t *file = NULL; - wb_local_t *local = NULL; - uint64_t tmp_file = 0; + wb_file_t *file = NULL; + wb_local_t *local = NULL; + uint64_t tmp_file = 0; + call_stub_t *stub = NULL; if (fd_ctx_get (fd, this, &tmp_file)) { gf_log (this->name, GF_LOG_ERROR, "returning EBADFD"); @@ -1224,19 +1747,30 @@ wb_fsync (call_frame_t *frame, } file = (wb_file_t *)(long)tmp_file; - if (file) - wb_sync_all (frame, file); local = CALLOC (1, sizeof (*local)); local->file = file; frame->local = local; - STACK_WIND (frame, - wb_fsync_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsync, - fd, datasync); + if (file) { + stub = fop_fsync_stub (frame, wb_fsync_helper, fd, datasync); + if (stub == NULL) { + STACK_UNWIND (frame, -1, ENOMEM); + return 0; + } + + wb_enqueue (file, stub); + + wb_process_queue (frame, file, 1); + } else { + STACK_WIND (frame, + wb_fsync_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, + fd, datasync); + } + return 0; } @@ -1245,10 +1779,19 @@ int32_t wb_release (xlator_t *this, fd_t *fd) { - uint64_t file = 0; + uint64_t file_ptr = 0; + wb_file_t *file = NULL; + + fd_ctx_get (fd, this, &file_ptr); + file = (wb_file_t *) (long) file_ptr; + + LOCK (&file->lock); + { + assert (list_empty (&file->request)); + } + UNLOCK (&file->lock); - fd_ctx_get (fd, this, &file); - wb_file_destroy ((wb_file_t *)(long)file); + wb_file_destroy (file); return 0; } @@ -1257,14 +1800,14 @@ wb_release (xlator_t *this, int32_t init (xlator_t *this) { - dict_t *options = NULL; + dict_t *options = NULL; wb_conf_t *conf = NULL; - char *aggregate_size_string = NULL; - char *window_size_string = NULL; - char *flush_behind_string = NULL; - char *disable_till_string = NULL; - char *enable_O_SYNC_string = NULL; - int32_t ret = -1; + char *aggregate_size_string = NULL; + char *window_size_string = NULL; + char *flush_behind_string = NULL; + char *disable_till_string = NULL; + char *enable_O_SYNC_string = NULL; + int32_t ret = -1; if ((this->children == NULL) || this->children->next) { |