diff options
Diffstat (limited to 'xlators/performance/write-behind')
-rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 3486 |
1 files changed, 1068 insertions, 2418 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index ad1e5f03111..53506d948ba 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -8,8 +8,6 @@ cases as published by the Free Software Foundation. */ -/*TODO: check for non null wb_file_data before getting wb_file */ - #ifndef _CONFIG_H #define _CONFIG_H @@ -26,6 +24,7 @@ #include "common-utils.h" #include "call-stub.h" #include "statedump.h" +#include "defaults.h" #include "write-behind-mem-types.h" #define MAX_VECTOR_COUNT 8 @@ -34,96 +33,146 @@ typedef struct list_head list_head_t; struct wb_conf; -struct wb_page; struct wb_inode; typedef struct wb_inode { - size_t window_conf; - size_t window_current; - size_t aggregate_current; - int32_t op_ret; + ssize_t window_conf; + ssize_t window_current; + ssize_t transit; /* size of data stack_wound, and yet + to be fulfilled (wb_fulfill_cbk). + used for trickling_writes + */ + + int32_t op_ret; /* Last found op_ret and op_errno + while completing a liability + operation. Will be picked by + the next arriving writev/flush/fsync + */ int32_t op_errno; - list_head_t request; - list_head_t passive_requests; + + list_head_t all; /* All requests, from enqueue() till destroy(). + Used only for resetting generation + number when empty. + */ + list_head_t todo; /* Work to do (i.e, STACK_WIND to server). + Once we STACK_WIND, the entry is taken + off the list. If it is non-sync write, + then we continue to track it via @liability + or @temptation depending on the status + of its writeback. + */ + list_head_t liability; /* Non-sync writes which are lied + (STACK_UNWIND'ed to caller) but ack + from server not yet complete. This + is the "liability" which we hold, and + must guarantee that dependent operations + which arrive later (which overlap, etc.) + are issued only after their dependencies + in this list are "fulfilled". + + Server acks for entries in this list + shrinks the window. + + The sum total of all req->write_size + of entries in this list must be kept less + than the permitted window size. + */ + list_head_t temptation; /* Operations for which we are tempted + to 'lie' (write-behind), but temporarily + holding off (because of insufficient + window capacity, etc.) + + This is the list to look at to grow + the window (in __wb_pick_unwinds()). + + Entries typically get chosen from + write-behind from this list, and therefore + get "upgraded" to the "liability" list. + */ + uint64_t gen; /* Liability generation number. Represents + the current 'state' of liability. Every + new addition to the liability list bumps + the generation number. + + a newly arrived request is only required + to perform causal checks against the entries + in the liability list which were present + at the time of its addition. the generation + number at the time of its addition is stored + in the request and used during checks. + + the liability list can grow while the request + waits in the todo list waiting for its + dependent operations to complete. however + it is not of the request's concern to depend + itself on those new entries which arrived + after it arrived (i.e, those that have a + liability generation higher than itself) + */ gf_lock_t lock; xlator_t *this; -}wb_inode_t; - -typedef struct wb_file { - int32_t flags; - int disabled; - fd_t *fd; - size_t disable_till; - enum _gf_boolean dont_wind; -} wb_file_t; +} wb_inode_t; typedef struct wb_request { - list_head_t list; + list_head_t all; + list_head_t todo; + list_head_t lie; /* either in @liability or @temptation */ list_head_t winds; list_head_t unwinds; - list_head_t other_requests; + call_stub_t *stub; - size_t write_size; + + size_t write_size; /* currently held size + (after collapsing) */ + size_t orig_size; /* size which arrived with the request. + This is the size by which we grow + the window when unwinding the frame. + */ + size_t total_size; /* valid only in @head in wb_fulfill(). + This is the size with which we perform + STACK_WIND to server and therefore the + amount by which we shrink the window. + */ + + int op_ret; + int op_errno; + int32_t refcount; wb_inode_t *wb_inode; glusterfs_fop_t fop; gf_lkowner_t lk_owner; - union { - struct { - char write_behind; - char stack_wound; - char got_reply; - char virgin; - char flush_all; /* while trying to sync to back-end, - * don't wait till a data of size - * equal to configured aggregate-size - * is accumulated, instead sync - * whatever data currently present in - * request queue. - */ - - }write_request; - - struct { - char marked_for_resume; - }other_requests; - }flags; + struct iobref *iobref; + uint64_t gen; /* inode liability state at the time of + request arrival */ + + fd_t *fd; + struct { + size_t size; /* 0 size == till infinity */ + off_t off; + int append:1; /* offset is invalid. only one + outstanding append at a time */ + int tempted:1; /* true only for non-sync writes */ + int lied:1; /* sin committed */ + int fulfilled:1; /* got server acknowledgement */ + int go:1; /* enough aggregating, good to go */ + } ordering; } wb_request_t; -struct wb_conf { + +typedef struct wb_conf { uint64_t aggregate_size; uint64_t window_size; - uint64_t disable_till; - gf_boolean_t enable_O_SYNC; gf_boolean_t flush_behind; - gf_boolean_t enable_trickling_writes; -}; + gf_boolean_t trickling_writes; + gf_boolean_t strict_write_ordering; + gf_boolean_t strict_O_DIRECT; +} wb_conf_t; -typedef struct wb_local { - list_head_t winds; - int32_t flags; - fd_t *fd; - wb_request_t *request; - int op_ret; - int op_errno; - call_frame_t *frame; - int32_t reply_count; - wb_inode_t *wb_inode; -} wb_local_t; - -typedef struct wb_conf wb_conf_t; -typedef struct wb_page wb_page_t; -int32_t -wb_process_queue (call_frame_t *frame, wb_inode_t *wb_inode); - -ssize_t -wb_sync (call_frame_t *frame, wb_inode_t *wb_inode, list_head_t *winds); +void +wb_process_queue (wb_inode_t *wb_inode); -ssize_t -__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size, - char enable_trickling_writes); wb_inode_t * __wb_inode_ctx_get (xlator_t *this, inode_t *inode) @@ -156,37 +205,6 @@ out: } -wb_file_t * -__wb_fd_ctx_get (xlator_t *this, fd_t *fd) -{ - wb_file_t *wb_file = NULL; - uint64_t value = 0; - - __fd_ctx_get (fd, this, &value); - wb_file = (wb_file_t *)(unsigned long)value; - - return wb_file; -} - - -wb_file_t * -wb_fd_ctx_get (xlator_t *this, fd_t *fd) -{ - wb_file_t *wb_file = NULL; - - GF_VALIDATE_OR_GOTO ("write-behind", this, out); - GF_VALIDATE_OR_GOTO (this->name, fd, out); - - LOCK (&fd->lock); - { - wb_file = __wb_fd_ctx_get (this, fd); - } - UNLOCK (&fd->lock); - -out: - return wb_file; -} - /* Below is a succinct explanation of the code deciding whether two regions overlap, from Pavan <tcp@gluster.com>. @@ -211,19 +229,26 @@ out: } */ -static inline char -wb_requests_overlap (wb_request_t *request1, wb_request_t *request2) +gf_boolean_t +wb_requests_overlap (wb_request_t *req1, wb_request_t *req2) { - off_t r1_start = 0, r1_end = 0, r2_start = 0, r2_end = 0; + off_t r1_start = 0; + off_t r1_end = 0; + off_t r2_start = 0; + off_t r2_end = 0; enum _gf_boolean do_overlap = 0; - r1_start = request1->stub->args.writev.off; - r1_end = r1_start + iov_length (request1->stub->args.writev.vector, - request1->stub->args.writev.count); + r1_start = req1->ordering.off; + if (req1->ordering.size) + r1_end = r1_start + req1->ordering.size - 1; + else + r1_end = ULLONG_MAX; - r2_start = request2->stub->args.writev.off; - r2_end = r2_start + iov_length (request2->stub->args.writev.vector, - request2->stub->args.writev.count); + r2_start = req2->ordering.off; + if (req2->ordering.size) + r2_end = r2_start + req2->ordering.size - 1; + else + r2_end = ULLONG_MAX; do_overlap = ((r1_end >= r2_start) && (r2_end >= r1_start)); @@ -231,72 +256,112 @@ wb_requests_overlap (wb_request_t *request1, wb_request_t *request2) } -static inline char -wb_overlap (list_head_t *list, wb_request_t *request) +gf_boolean_t +wb_requests_conflict (wb_request_t *lie, wb_request_t *req) { - char overlap = 0; - wb_request_t *tmp = NULL; + wb_conf_t *conf = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", list, out); - GF_VALIDATE_OR_GOTO ("write-behind", request, out); + conf = req->wb_inode->this->private; - list_for_each_entry (tmp, list, list) { - if (tmp == request) { - break; - } + if (lie == req) + /* request cannot conflict with itself */ + return _gf_false; - overlap = wb_requests_overlap (tmp, request); - if (overlap) { - break; - } + if (lie->gen >= req->gen) + /* this liability entry was behind + us in the todo list */ + return _gf_false; + + if (lie->ordering.append) + /* all modifications wait for the completion + of outstanding append */ + return _gf_true; + + if (conf->strict_write_ordering) + /* We are sure (lie->gen < req->gen) by now. So + skip overlap check if strict write ordering is + requested and always return "conflict" against a + lower generation lie. */ + return _gf_true; + + return wb_requests_overlap (lie, req); +} + + +gf_boolean_t +wb_liability_has_conflict (wb_inode_t *wb_inode, wb_request_t *req) +{ + wb_request_t *each = NULL; + + list_for_each_entry (each, &wb_inode->liability, lie) { + if (wb_requests_conflict (each, req)) + return _gf_true; } -out: - return overlap; + return _gf_false; } static int -__wb_request_unref (wb_request_t *this) +__wb_request_unref (wb_request_t *req) { - int ret = -1; + int ret = -1; + wb_inode_t *wb_inode = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); + wb_inode = req->wb_inode; - if (this->refcount <= 0) { + if (req->refcount <= 0) { gf_log ("wb-request", GF_LOG_WARNING, - "refcount(%d) is <= 0", this->refcount); + "refcount(%d) is <= 0", req->refcount); goto out; } - ret = --this->refcount; - if (this->refcount == 0) { - list_del_init (&this->list); - if (this->stub && this->stub->fop == GF_FOP_WRITE) { - call_stub_destroy (this->stub); - } + ret = --req->refcount; + if (req->refcount == 0) { + list_del_init (&req->todo); + list_del_init (&req->lie); - GF_FREE (this); - } + list_del_init (&req->all); + if (list_empty (&wb_inode->all)) { + wb_inode->gen = 0; + /* in case of accounting errors? */ + wb_inode->window_current = 0; + } + + list_del_init (&req->winds); + list_del_init (&req->unwinds); + + if (req->stub && req->ordering.tempted) { + call_stub_destroy (req->stub); + req->stub = NULL; + } /* else we would have call_resume()'ed */ + + if (req->iobref) + iobref_unref (req->iobref); + if (req->fd) + fd_unref (req->fd); + + GF_FREE (req); + } out: return ret; } static int -wb_request_unref (wb_request_t *this) +wb_request_unref (wb_request_t *req) { wb_inode_t *wb_inode = NULL; int ret = -1; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); + GF_VALIDATE_OR_GOTO ("write-behind", req, out); - wb_inode = this->wb_inode; + wb_inode = req->wb_inode; LOCK (&wb_inode->lock); { - ret = __wb_request_unref (this); + ret = __wb_request_unref (req); } UNLOCK (&wb_inode->lock); @@ -306,117 +371,155 @@ out: static wb_request_t * -__wb_request_ref (wb_request_t *this) +__wb_request_ref (wb_request_t *req) { - GF_VALIDATE_OR_GOTO ("write-behind", this, out); + GF_VALIDATE_OR_GOTO ("write-behind", req, out); - if (this->refcount < 0) { + if (req->refcount < 0) { gf_log ("wb-request", GF_LOG_WARNING, - "refcount(%d) is < 0", this->refcount); - this = NULL; + "refcount(%d) is < 0", req->refcount); + req = NULL; goto out; } - this->refcount++; + req->refcount++; out: - return this; + return req; } wb_request_t * -wb_request_ref (wb_request_t *this) +wb_request_ref (wb_request_t *req) { wb_inode_t *wb_inode = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); + GF_VALIDATE_OR_GOTO ("write-behind", req, out); - wb_inode = this->wb_inode; + wb_inode = req->wb_inode; LOCK (&wb_inode->lock); { - this = __wb_request_ref (this); + req = __wb_request_ref (req); } UNLOCK (&wb_inode->lock); out: - return this; + return req; } -wb_request_t * -wb_enqueue (wb_inode_t *wb_inode, call_stub_t *stub) +gf_boolean_t +wb_enqueue_common (wb_inode_t *wb_inode, call_stub_t *stub, int tempted) { - wb_request_t *request = NULL, *tmp = NULL; - call_frame_t *frame = NULL; - wb_local_t *local = NULL; - struct iovec *vector = NULL; - int32_t count = 0; + wb_request_t *req = NULL; GF_VALIDATE_OR_GOTO ("write-behind", wb_inode, out); GF_VALIDATE_OR_GOTO (wb_inode->this->name, stub, out); - request = GF_CALLOC (1, sizeof (*request), gf_wb_mt_wb_request_t); - if (request == NULL) { + req = GF_CALLOC (1, sizeof (*req), gf_wb_mt_wb_request_t); + if (!req) goto out; - } - INIT_LIST_HEAD (&request->list); - INIT_LIST_HEAD (&request->winds); - INIT_LIST_HEAD (&request->unwinds); - INIT_LIST_HEAD (&request->other_requests); + INIT_LIST_HEAD (&req->all); + INIT_LIST_HEAD (&req->todo); + INIT_LIST_HEAD (&req->lie); + INIT_LIST_HEAD (&req->winds); + INIT_LIST_HEAD (&req->unwinds); - request->stub = stub; - request->wb_inode = wb_inode; - request->fop = stub->fop; - - frame = stub->frame; - local = frame->local; - if (local) { - local->request = request; - } + req->stub = stub; + req->wb_inode = wb_inode; + req->fop = stub->fop; + req->ordering.tempted = tempted; if (stub->fop == GF_FOP_WRITE) { - vector = stub->args.writev.vector; - count = stub->args.writev.count; + req->write_size = iov_length (stub->args.writev.vector, + stub->args.writev.count); - request->write_size = iov_length (vector, count); - if (local) { - local->op_ret = request->write_size; - local->op_errno = 0; - } + /* req->write_size can change as we collapse + small writes. But the window needs to grow + only by how much we acknowledge the app. so + copy the original size in orig_size for the + purpose of accounting. + */ + req->orig_size = req->write_size; + + /* Let's be optimistic that we can + lie about it + */ + req->op_ret = req->write_size; + req->op_errno = 0; - request->flags.write_request.virgin = 1; + if (stub->args.writev.fd->flags & O_APPEND) + req->ordering.append = 1; } - request->lk_owner = frame->root->lk_owner; + req->lk_owner = stub->frame->root->lk_owner; + + switch (stub->fop) { + case GF_FOP_WRITE: + req->ordering.off = stub->args.writev.off; + req->ordering.size = req->write_size; + + req->fd = fd_ref (stub->args.writev.fd); + + break; + case GF_FOP_READ: + req->ordering.off = stub->args.readv.off; + req->ordering.size = stub->args.readv.size; + + req->fd = fd_ref (stub->args.readv.fd); + + break; + case GF_FOP_TRUNCATE: + req->ordering.off = stub->args.truncate.off; + req->ordering.size = 0; /* till infinity */ + break; + case GF_FOP_FTRUNCATE: + req->ordering.off = stub->args.ftruncate.off; + req->ordering.size = 0; /* till infinity */ + + req->fd = fd_ref (stub->args.ftruncate.fd); + + break; + default: + break; + } LOCK (&wb_inode->lock); { - list_add_tail (&request->list, &wb_inode->request); - if (stub->fop == GF_FOP_WRITE) { - /* reference for stack winding */ - __wb_request_ref (request); - - /* reference for stack unwinding */ - __wb_request_ref (request); - - wb_inode->aggregate_current += request->write_size; - } else { - list_for_each_entry (tmp, &wb_inode->request, list) { - if (tmp->stub && tmp->stub->fop - == GF_FOP_WRITE) { - tmp->flags.write_request.flush_all = 1; - } - } - - /*reference for resuming */ - __wb_request_ref (request); - } + list_add_tail (&req->all, &wb_inode->all); + + req->gen = wb_inode->gen; + + list_add_tail (&req->todo, &wb_inode->todo); + __wb_request_ref (req); /* for wind */ + + if (req->ordering.tempted) { + list_add_tail (&req->lie, &wb_inode->temptation); + __wb_request_ref (req); /* for unwind */ + } } UNLOCK (&wb_inode->lock); out: - return request; + if (!req) + return _gf_false; + + return _gf_true; +} + + +gf_boolean_t +wb_enqueue (wb_inode_t *wb_inode, call_stub_t *stub) +{ + return wb_enqueue_common (wb_inode, stub, 0); +} + + +gf_boolean_t +wb_enqueue_tempted (wb_inode_t *wb_inode, call_stub_t *stub) +{ + return wb_enqueue_common (wb_inode, stub, 1); } @@ -426,18 +529,18 @@ __wb_inode_create (xlator_t *this, inode_t *inode) wb_inode_t *wb_inode = NULL; wb_conf_t *conf = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); GF_VALIDATE_OR_GOTO (this->name, inode, out); conf = this->private; wb_inode = GF_CALLOC (1, sizeof (*wb_inode), gf_wb_mt_wb_inode_t); - if (wb_inode == NULL) { + if (!wb_inode) goto out; - } - INIT_LIST_HEAD (&wb_inode->request); - INIT_LIST_HEAD (&wb_inode->passive_requests); + INIT_LIST_HEAD (&wb_inode->all); + INIT_LIST_HEAD (&wb_inode->todo); + INIT_LIST_HEAD (&wb_inode->liability); + INIT_LIST_HEAD (&wb_inode->temptation); wb_inode->this = this; @@ -452,58 +555,18 @@ out: } -wb_file_t * -wb_file_create (xlator_t *this, fd_t *fd, int32_t flags) -{ - wb_file_t *file = NULL; - wb_conf_t *conf = NULL; - - GF_VALIDATE_OR_GOTO ("write-behind", this, out); - GF_VALIDATE_OR_GOTO (this->name, fd, out); - - conf = this->private; - - file = GF_CALLOC (1, sizeof (*file), gf_wb_mt_wb_file_t); - if (file == NULL) { - goto out; - } - - /* - fd_ref() not required, file should never decide the existence of - an fd - */ - file->fd= fd; - /* If O_DIRECT then, we disable chaching */ - if (((flags & O_DIRECT) == O_DIRECT) - || ((flags & O_ACCMODE) == O_RDONLY) - || (((flags & O_SYNC) == O_SYNC) - && conf->enable_O_SYNC == _gf_true)) { - file->disabled = 1; - } - - file->flags = flags; - - fd_ctx_set (fd, this, (uint64_t)(unsigned long)file); - -out: - return file; -} - - wb_inode_t * wb_inode_create (xlator_t *this, inode_t *inode) { wb_inode_t *wb_inode = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); GF_VALIDATE_OR_GOTO (this->name, inode, out); LOCK (&inode->lock); { wb_inode = __wb_inode_ctx_get (this, inode); - if (wb_inode == NULL) { + if (!wb_inode) wb_inode = __wb_inode_create (this, inode); - } } UNLOCK (&inode->lock); @@ -524,2390 +587,1049 @@ out: } -int32_t -wb_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, inode_t *inode, - struct iatt *buf, dict_t *dict, struct iatt *postparent) +void +__wb_fulfill_request (wb_request_t *req) { - wb_inode_t *wb_inode = NULL; + wb_inode_t *wb_inode = NULL; - if (op_ret < 0) { - goto unwind; - } + wb_inode = req->wb_inode; - wb_inode = wb_inode_create (this, inode); - if (wb_inode == NULL) { - op_ret = -1; - op_errno = ENOMEM; - } + req->ordering.fulfilled = 1; + wb_inode->window_current -= req->total_size; + wb_inode->transit -= req->total_size; -unwind: - STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno, inode, buf, - dict, postparent); + if (!req->ordering.lied) { + /* TODO: fail the req->frame with error if + necessary + */ + } - return 0; + __wb_request_unref (req); } -int32_t -wb_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, - dict_t *xdata) +void +wb_head_done (wb_request_t *head) { - STACK_WIND (frame, wb_lookup_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->lookup, loc, xdata); - return 0; -} - + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; + wb_inode_t *wb_inode = NULL; -int32_t -wb_sync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, - dict_t *xdata) -{ - wb_local_t *local = NULL; - list_head_t *winds = NULL; - wb_inode_t *wb_inode = NULL; - wb_request_t *request = NULL, *dummy = NULL; - wb_local_t *per_request_local = NULL; - int32_t ret = -1; - int32_t total_write_size = 0; - fd_t *fd = NULL; + wb_inode = head->wb_inode; - GF_ASSERT (frame); - GF_ASSERT (this); + LOCK (&wb_inode->lock); + { + list_for_each_entry_safe (req, tmp, &head->winds, winds) { + __wb_fulfill_request (req); + } + __wb_fulfill_request (head); + } + UNLOCK (&wb_inode->lock); +} - local = frame->local; - winds = &local->winds; - fd = local->fd; +void +wb_inode_err (wb_inode_t *wb_inode, int op_errno) +{ + LOCK (&wb_inode->lock); + { + wb_inode->op_ret = -1; + wb_inode->op_errno = op_errno; + } + UNLOCK (&wb_inode->lock); +} - wb_inode = wb_inode_ctx_get (this, fd->inode); - GF_VALIDATE_OR_GOTO (this->name, wb_inode, out); - LOCK (&wb_inode->lock); - { - list_for_each_entry_safe (request, dummy, winds, winds) { - request->flags.write_request.got_reply = 1; - - if (!request->flags.write_request.write_behind - && (op_ret == -1)) { - per_request_local = request->stub->frame->local; - per_request_local->op_ret = op_ret; - per_request_local->op_errno = op_errno; - } - - if (request->flags.write_request.write_behind) { - wb_inode->window_current -= request->write_size; - total_write_size += request->write_size; - } - - __wb_request_unref (request); - } +int +wb_fulfill_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, struct iatt *prebuf, + struct iatt *postbuf, dict_t *xdata) +{ + wb_inode_t *wb_inode = NULL; + wb_request_t *head = NULL; - if (op_ret == -1) { - wb_inode->op_ret = op_ret; - wb_inode->op_errno = op_errno; - } else if (op_ret < total_write_size) { - /* - * We've encountered a short write, for whatever reason. - * Set an EIO error for the next fop. This should be - * valid for writev or flush (close). - * - * TODO: Retry the write so we can potentially capture - * a real error condition (i.e., ENOSPC). - */ - wb_inode->op_ret = -1; - wb_inode->op_errno = EIO; - } - } - UNLOCK (&wb_inode->lock); + head = frame->local; + frame->local = NULL; - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - if (errno == ENOMEM) { - LOCK (&wb_inode->lock); - { - wb_inode->op_ret = -1; - wb_inode->op_errno = ENOMEM; - } - UNLOCK (&wb_inode->lock); - } + wb_inode = head->wb_inode; - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } + if (op_ret == -1) { + wb_inode_err (wb_inode, op_errno); + } else if (op_ret < head->total_size) { + /* + * We've encountered a short write, for whatever reason. + * Set an EIO error for the next fop. This should be + * valid for writev or flush (close). + * + * TODO: Retry the write so we can potentially capture + * a real error condition (i.e., ENOSPC). + */ + wb_inode_err (wb_inode, EIO); + } - /* safe place to do fd_unref */ - fd_unref (fd); + wb_head_done (head); - frame->local = NULL; - - if (local != NULL) { - mem_put (frame->local); - } + wb_process_queue (wb_inode); STACK_DESTROY (frame->root); -out: return 0; } -ssize_t -wb_sync (call_frame_t *frame, wb_inode_t *wb_inode, list_head_t *winds) -{ - wb_request_t *dummy = NULL, *request = NULL; - wb_request_t *first_request = NULL, *next = NULL; - size_t total_count = 0, count = 0; - size_t copied = 0; - call_frame_t *sync_frame = NULL; - struct iobref *iobref = NULL; - wb_local_t *local = NULL; - struct iovec *vector = NULL; - ssize_t current_size = 0, bytes = 0; - size_t bytecount = 0; - wb_conf_t *conf = NULL; - fd_t *fd = NULL; - int32_t op_errno = -1; - off_t next_offset_expected = 0; - gf_lkowner_t lk_owner = {0, }; - - GF_VALIDATE_OR_GOTO_WITH_ERROR ((wb_inode ? wb_inode->this->name - : "write-behind"), frame, - out, bytes, -1); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, wb_inode, out, bytes, - -1); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, winds, out, bytes, - -1); - - conf = wb_inode->this->private; - list_for_each_entry (request, winds, winds) { - total_count += request->stub->args.writev.count; - if (total_count > 0) { - break; - } - } +#define WB_IOV_LOAD(vec, cnt, req, head) do { \ + memcpy (&vec[cnt], req->stub->args.writev.vector, \ + (req->stub->args.writev.count * sizeof(vec[0]))); \ + cnt += req->stub->args.writev.count; \ + head->total_size += req->write_size; \ + } while (0) - if (total_count == 0) { - gf_log (wb_inode->this->name, GF_LOG_TRACE, - "no vectors are to be synced"); - goto out; - } - list_for_each_entry_safe (request, dummy, winds, winds) { - if (!vector) { - vector = GF_MALLOC (VECTORSIZE (MAX_VECTOR_COUNT), - gf_wb_mt_iovec); - if (vector == NULL) { - bytes = -1; - op_errno = ENOMEM; - goto out; - } - - iobref = iobref_new (); - if (iobref == NULL) { - bytes = -1; - op_errno = ENOMEM; - goto out; - } - - local = mem_get0 (THIS->local_pool); - if (local == NULL) { - bytes = -1; - op_errno = ENOMEM; - goto out; - } - - INIT_LIST_HEAD (&local->winds); - - first_request = request; - current_size = 0; - - next_offset_expected = request->stub->args.writev.off - + request->write_size; - lk_owner = request->lk_owner; - } +void +wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head) +{ + struct iovec vector[MAX_VECTOR_COUNT]; + int count = 0; + wb_request_t *req = NULL; + call_frame_t *frame = NULL; - count += request->stub->args.writev.count; - bytecount = VECTORSIZE (request->stub->args.writev.count); - memcpy (((char *)vector)+copied, - request->stub->args.writev.vector, - bytecount); - copied += bytecount; + frame = create_frame (wb_inode->this, wb_inode->this->ctx->pool); + if (!frame) + goto enomem; - current_size += request->write_size; + WB_IOV_LOAD (vector, count, head, head); - if (request->stub->args.writev.iobref) { - iobref_merge (iobref, - request->stub->args.writev.iobref); - } + list_for_each_entry (req, &head->winds, winds) { + WB_IOV_LOAD (vector, count, req, head); - next = NULL; - if (request->winds.next != winds) { - next = list_entry (request->winds.next, - wb_request_t, winds); - } + iobref_merge (head->stub->args.writev.iobref, + req->stub->args.writev.iobref); + } - list_del_init (&request->winds); - list_add_tail (&request->winds, &local->winds); - - if ((!next) - || ((count + next->stub->args.writev.count) - > MAX_VECTOR_COUNT) - || ((current_size + next->write_size) - > conf->aggregate_size) - || (next_offset_expected != next->stub->args.writev.off) - || (!is_same_lkowner (&lk_owner, &next->lk_owner)) - || (request->stub->args.writev.fd - != next->stub->args.writev.fd)) { - - sync_frame = copy_frame (frame); - if (sync_frame == NULL) { - bytes = -1; - op_errno = ENOMEM; - goto out; - } - - frame->root->lk_owner = lk_owner; - - local->wb_inode = wb_inode; - sync_frame->local = local; - - local->fd = fd = fd_ref (request->stub->args.writev.fd); - - bytes += current_size; - STACK_WIND (sync_frame, wb_sync_cbk, - FIRST_CHILD(sync_frame->this), - FIRST_CHILD(sync_frame->this)->fops->writev, - fd, vector, count, - first_request->stub->args.writev.off, - first_request->stub->args.writev.flags, - iobref, NULL); + frame->root->lk_owner = head->lk_owner; + frame->local = head; - iobref_unref (iobref); - GF_FREE (vector); - first_request = NULL; - iobref = NULL; - vector = NULL; - sync_frame = NULL; - local = NULL; - copied = count = 0; - } - } + LOCK (&wb_inode->lock); + { + wb_inode->transit += head->total_size; + } + UNLOCK (&wb_inode->lock); -out: - if (sync_frame != NULL) { - sync_frame->local = NULL; - STACK_DESTROY (sync_frame->root); - } + STACK_WIND (frame, wb_fulfill_cbk, FIRST_CHILD (frame->this), + FIRST_CHILD (frame->this)->fops->writev, + head->fd, vector, count, + head->stub->args.writev.off, + head->stub->args.writev.flags, + head->stub->args.writev.iobref, NULL); - if (local != NULL) { - /* had we winded these requests, we would have unrefed - * in wb_sync_cbk. - */ - list_for_each_entry_safe (request, dummy, &local->winds, - winds) { - wb_request_unref (request); - } + return; +enomem: + wb_inode_err (wb_inode, ENOMEM); - mem_put (local); - local = NULL; - } + wb_head_done (head); - if (iobref != NULL) { - iobref_unref (iobref); - } + return; +} - GF_FREE (vector); - - if (bytes == -1) { - /* - * had we winded these requests, we would have unrefed - * in wb_sync_cbk. - */ - if (local) { - list_for_each_entry_safe (request, dummy, &local->winds, - winds) { - wb_request_unref (request); - } - } - if (wb_inode != NULL) { - LOCK (&wb_inode->lock); - { - wb_inode->op_ret = -1; - wb_inode->op_errno = op_errno; - } - UNLOCK (&wb_inode->lock); - } - } - - return bytes; -} +#define NEXT_HEAD(head, req) do { \ + if (head) \ + wb_fulfill_head (wb_inode, head); \ + head = req; \ + expected_offset = req->stub->args.writev.off + \ + req->write_size; \ + curr_aggregate = 0; \ + vector_count = 0; \ + } while (0) -int32_t -wb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, struct iatt *buf, dict_t *xdata) +void +wb_fulfill (wb_inode_t *wb_inode, list_head_t *liabilities) { - wb_local_t *local = NULL; - wb_request_t *request = NULL; - call_frame_t *process_frame = NULL; - wb_inode_t *wb_inode = NULL; - int32_t ret = -1; - - GF_ASSERT (frame); - GF_ASSERT (this); + wb_request_t *req = NULL; + wb_request_t *head = NULL; + wb_request_t *tmp = NULL; + wb_conf_t *conf = NULL; + off_t expected_offset = 0; + size_t curr_aggregate = 0; + size_t vector_count = 0; - local = frame->local; - wb_inode = local->wb_inode; + conf = wb_inode->this->private; - request = local->request; - if (request) { - process_frame = copy_frame (frame); - if (process_frame == NULL) { - op_ret = -1; - op_errno = ENOMEM; - } - } + list_for_each_entry_safe (req, tmp, liabilities, winds) { + list_del_init (&req->winds); - STACK_UNWIND_STRICT (stat, frame, op_ret, op_errno, buf, xdata); + if (!head) { + NEXT_HEAD (head, req); + continue; + } - if (request != NULL) { - wb_request_unref (request); - } + if (req->fd != head->fd) { + NEXT_HEAD (head, req); + continue; + } - if (process_frame != NULL) { - ret = wb_process_queue (process_frame, wb_inode); - if (ret == -1) { - if ((errno == ENOMEM) && (wb_inode != NULL)) { - LOCK (&wb_inode->lock); - { - wb_inode->op_ret = -1; - wb_inode->op_errno = ENOMEM; - } - UNLOCK (&wb_inode->lock); - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } + if (!is_same_lkowner (&req->lk_owner, &head->lk_owner)) { + NEXT_HEAD (head, req); + continue; + } - STACK_DESTROY (process_frame->root); - } + if (expected_offset != req->stub->args.writev.off) { + NEXT_HEAD (head, req); + continue; + } - return 0; -} + if ((curr_aggregate + req->write_size) > conf->aggregate_size) { + NEXT_HEAD (head, req); + continue; + } + if (vector_count + req->stub->args.writev.count > + MAX_VECTOR_COUNT) { + NEXT_HEAD (head, req); + continue; + } -static int32_t -wb_stat_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) -{ - GF_ASSERT (frame); - GF_ASSERT (this); + list_add_tail (&req->winds, &head->winds); + curr_aggregate += req->write_size; + vector_count += req->stub->args.writev.count; + } - STACK_WIND (frame, wb_stat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, loc, xdata); - return 0; + if (head) + wb_fulfill_head (wb_inode, head); + return; } -int32_t -wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) +void +wb_do_unwinds (wb_inode_t *wb_inode, list_head_t *lies) { - wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1, op_errno = EINVAL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, loc, unwind); - - if (loc->inode) { - wb_inode = wb_inode_ctx_get (this, loc->inode); - } - - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - local->wb_inode = wb_inode; - - frame->local = local; - - if (wb_inode) { - stub = fop_stat_stub (frame, wb_stat_helper, loc, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; - } + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; + call_frame_t *frame = NULL; + struct iatt buf = {0, }; - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_stat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, loc, xdata); - } + list_for_each_entry_safe (req, tmp, lies, unwinds) { + frame = req->stub->frame; - return 0; -unwind: - STACK_UNWIND_STRICT (stat, frame, -1, op_errno, NULL, NULL); + STACK_UNWIND_STRICT (writev, frame, req->op_ret, req->op_errno, + &buf, &buf, NULL); /* :O */ + req->stub->frame = NULL; - if (stub) { - call_stub_destroy (stub); + list_del_init (&req->unwinds); + wb_request_unref (req); } - return 0; + return; } -int32_t -wb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, struct iatt *buf, dict_t *xdata) +void +__wb_pick_unwinds (wb_inode_t *wb_inode, list_head_t *lies) { - wb_local_t *local = NULL; - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; - int32_t ret = -1; - - GF_ASSERT (frame); - - local = frame->local; - wb_inode = local->wb_inode; - - request = local->request; - if ((wb_inode != NULL) && (request != NULL)) { - wb_request_unref (request); - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - if (errno == ENOMEM) { - op_ret = -1; - op_errno = ENOMEM; - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } - - STACK_UNWIND_STRICT (fstat, frame, op_ret, op_errno, buf, xdata); - - return 0; -} + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; + list_for_each_entry_safe (req, tmp, &wb_inode->temptation, lie) { + if (!req->ordering.fulfilled && + wb_inode->window_current > wb_inode->window_conf) + continue; -int32_t -wb_fstat_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) -{ - GF_ASSERT (frame); - GF_ASSERT (this); - - STACK_WIND (frame, wb_fstat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fstat, fd, xdata); - return 0; -} + list_del_init (&req->lie); + list_move_tail (&req->unwinds, lies); + wb_inode->window_current += req->orig_size; -int32_t -wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) -{ - wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1; - int op_errno = EINVAL; + if (!req->ordering.fulfilled) { + /* burden increased */ + list_add_tail (&req->lie, &wb_inode->liability); - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, fd, unwind); + req->ordering.lied = 1; - wb_inode = wb_inode_ctx_get (this, fd->inode); - if ((!IA_ISDIR (fd->inode->ia_type)) && (wb_inode == NULL)) { - gf_log (this->name, GF_LOG_WARNING, - "wb_inode not found for fd %p", fd); - op_errno = EBADFD; - goto unwind; - } + wb_inode->gen++; + } + } - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } + return; +} - local->wb_inode = wb_inode; - frame->local = local; +int +__wb_collapse_small_writes (wb_request_t *holder, wb_request_t *req) +{ + char *ptr = NULL; + struct iobuf *iobuf = NULL; + struct iobref *iobref = NULL; + int ret = -1; - if (wb_inode) { - stub = fop_fstat_stub (frame, wb_fstat_helper, fd, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; + if (!holder->iobref) { + /* TODO: check the required size */ + iobuf = iobuf_get (req->wb_inode->this->ctx->iobuf_pool); + if (iobuf == NULL) { + goto out; } - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; + iobref = iobref_new (); + if (iobref == NULL) { + iobuf_unref (iobuf); + goto out; } - /* - FIXME:should the request queue be emptied in case of error? - */ - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); + ret = iobref_add (iobref, iobuf); + if (ret != 0) { + iobuf_unref (iobuf); + iobref_unref (iobref); + gf_log (req->wb_inode->this->name, GF_LOG_WARNING, + "cannot add iobuf (%p) into iobref (%p)", + iobuf, iobref); + goto out; } - } else { - STACK_WIND (frame, wb_fstat_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fstat, fd, xdata); - } - - return 0; - -unwind: - STACK_UNWIND_STRICT (fstat, frame, -1, op_errno, NULL, NULL); - - if (stub) { - call_stub_destroy (stub); - } - return 0; -} - - -int32_t -wb_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf, dict_t *xdata) -{ - wb_local_t *local = NULL; - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; - call_frame_t *process_frame = NULL; - int32_t ret = -1; + iov_unload (iobuf->ptr, holder->stub->args.writev.vector, + holder->stub->args.writev.count); + holder->stub->args.writev.vector[0].iov_base = iobuf->ptr; + holder->stub->args.writev.count = 1; - GF_ASSERT (frame); + iobref_unref (holder->stub->args.writev.iobref); + holder->stub->args.writev.iobref = iobref; - local = frame->local; - wb_inode = local->wb_inode; - request = local->request; + iobuf_unref (iobuf); - if ((request != NULL) && (wb_inode != NULL)) { - process_frame = copy_frame (frame); - if (process_frame == NULL) { - op_ret = -1; - op_errno = ENOMEM; - } + holder->iobref = iobref_ref (iobref); } - STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf, - postbuf, xdata); - - if (request) { - wb_request_unref (request); - } + ptr = holder->stub->args.writev.vector[0].iov_base + holder->write_size; - if (process_frame != NULL) { - ret = wb_process_queue (process_frame, wb_inode); - if (ret == -1) { - if ((errno == ENOMEM) && (wb_inode != NULL)) { - LOCK (&wb_inode->lock); - { - wb_inode->op_ret = -1; - wb_inode->op_errno = ENOMEM; - } - UNLOCK (&wb_inode->lock); - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } + iov_unload (ptr, req->stub->args.writev.vector, + req->stub->args.writev.count); - STACK_DESTROY (process_frame->root); - } + holder->stub->args.writev.vector[0].iov_len += req->write_size; + holder->write_size += req->write_size; + holder->ordering.size += req->write_size; - return 0; + ret = 0; +out: + return ret; } -static int32_t -wb_truncate_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, - off_t offset, dict_t *xdata) +void +__wb_preprocess_winds (wb_inode_t *wb_inode) { - GF_ASSERT (frame); - GF_ASSERT (this); - - STACK_WIND (frame, wb_truncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); + off_t offset_expected = 0; + size_t space_left = 0; + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; + wb_request_t *holder = NULL; + wb_conf_t *conf = NULL; + int ret = 0; + size_t page_size = 0; + + /* With asynchronous IO from a VM guest (as a file), there + can be two sequential writes happening in two regions + of the file. But individual (broken down) IO requests + can arrive interleaved. + + TODO: cycle for each such sequence sifting + through the interleaved ops + */ + + page_size = wb_inode->this->ctx->page_size; + conf = wb_inode->this->private; + + list_for_each_entry_safe (req, tmp, &wb_inode->todo, todo) { + if (!req->ordering.tempted) { + if (holder) { + if (wb_requests_conflict (holder, req)) + /* do not hold on write if a + dependent write is in queue */ + holder->ordering.go = 1; + } + /* collapse only non-sync writes */ + continue; + } else if (!holder) { + /* holder is always a non-sync write */ + holder = req; + continue; + } - return 0; -} + offset_expected = holder->stub->args.writev.off + + holder->write_size; + if (req->stub->args.writev.off != offset_expected) { + holder->ordering.go = 1; + holder = req; + continue; + } -int32_t -wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, - dict_t *xdata) -{ - wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1, op_errno = EINVAL; + if (!is_same_lkowner (&req->lk_owner, &holder->lk_owner)) { + holder->ordering.go = 1; + holder = req; + continue; + } - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, loc, unwind); + space_left = page_size - holder->write_size; - if (loc->inode) { - wb_inode = wb_inode_ctx_get (this, loc->inode); - } + if (space_left < req->write_size) { + holder->ordering.go = 1; + holder = req; + continue; + } - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } + ret = __wb_collapse_small_writes (holder, req); + if (ret) + continue; - local->wb_inode = wb_inode; + /* collapsed request is as good as wound + (from its p.o.v) + */ + list_del_init (&req->todo); + __wb_fulfill_request (req); - frame->local = local; - if (wb_inode) { - stub = fop_truncate_stub (frame, wb_truncate_helper, loc, - offset, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } + /* Only the last @holder in queue which - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; - } + - does not have any non-buffered-writes following it + - has not yet filled its capacity - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_truncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, loc, offset, - xdata); + does not get its 'go' set, in anticipation of the arrival + of consecutive smaller writes. + */ } - return 0; - -unwind: - STACK_UNWIND_STRICT (truncate, frame, -1, op_errno, NULL, NULL, NULL); + /* but if trickling writes are enabled, then do not hold back + writes if there are no outstanding requests + */ - if (stub) { - call_stub_destroy (stub); - } + if (conf->trickling_writes && !wb_inode->transit && holder) + holder->ordering.go = 1; - return 0; + return; } -int32_t -wb_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf, dict_t *xdata) +void +__wb_pick_winds (wb_inode_t *wb_inode, list_head_t *tasks, + list_head_t *liabilities) { - wb_local_t *local = NULL; - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; - int32_t ret = -1; - - GF_ASSERT (frame); - - local = frame->local; - wb_inode = local->wb_inode; - request = local->request; - - if ((request != NULL) && (wb_inode != NULL)) { - wb_request_unref (request); - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - if (errno == ENOMEM) { - op_ret = -1; - op_errno = ENOMEM; - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } - - STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf, - postbuf, xdata); + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; - return 0; -} + list_for_each_entry_safe (req, tmp, &wb_inode->todo, todo) { + if (wb_liability_has_conflict (wb_inode, req)) + continue; + if (req->ordering.tempted && !req->ordering.go) + /* wait some more */ + continue; -static int32_t -wb_ftruncate_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, - off_t offset, dict_t *xdata) -{ - GF_ASSERT (frame); - GF_ASSERT (this); + list_del_init (&req->todo); - STACK_WIND (frame, wb_ftruncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); - return 0; + if (req->ordering.tempted) + list_add_tail (&req->winds, liabilities); + else + list_add_tail (&req->winds, tasks); + } } -int32_t -wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, - dict_t *xdata) +void +wb_do_winds (wb_inode_t *wb_inode, list_head_t *tasks) { - wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1; - int op_errno = EINVAL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, fd, unwind); - - wb_inode = wb_inode_ctx_get (this, fd->inode); - if ((!IA_ISDIR (fd->inode->ia_type)) && (wb_inode == NULL)) { - gf_log (this->name, GF_LOG_WARNING, - "wb_inode not found for fd %p", fd); - op_errno = EBADFD; - goto unwind; - } - - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - local->wb_inode = wb_inode; - - frame->local = local; - - if (wb_inode) { - stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd, - offset, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_ftruncate_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); - } - - return 0; + wb_request_t *req = NULL; + wb_request_t *tmp = NULL; -unwind: - STACK_UNWIND_STRICT (ftruncate, frame, -1, op_errno, NULL, NULL, NULL); + list_for_each_entry_safe (req, tmp, tasks, winds) { + list_del_init (&req->winds); - if (stub) { - call_stub_destroy (stub); - } + call_resume (req->stub); - return 0; + wb_request_unref (req); + } } -int32_t -wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, - struct iatt *statpost, dict_t *xdata) +void +wb_process_queue (wb_inode_t *wb_inode) { - wb_local_t *local = NULL; - wb_request_t *request = NULL; - call_frame_t *process_frame = NULL; - wb_inode_t *wb_inode = NULL; - int32_t ret = -1; + list_head_t tasks = {0, }; + list_head_t lies = {0, }; + list_head_t liabilities = {0, }; - GF_ASSERT (frame); + INIT_LIST_HEAD (&tasks); + INIT_LIST_HEAD (&lies); + INIT_LIST_HEAD (&liabilities); - local = frame->local; - wb_inode = local->wb_inode; - request = local->request; + LOCK (&wb_inode->lock); + { + __wb_preprocess_winds (wb_inode); - if (request) { - process_frame = copy_frame (frame); - if (process_frame == NULL) { - op_ret = -1; - op_errno = ENOMEM; - } - } + __wb_pick_winds (wb_inode, &tasks, &liabilities); - STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, - statpost, xdata); + __wb_pick_unwinds (wb_inode, &lies); - if (request) { - wb_request_unref (request); } + UNLOCK (&wb_inode->lock); - if (request && (process_frame != NULL)) { - ret = wb_process_queue (process_frame, wb_inode); - if (ret == -1) { - if ((errno == ENOMEM) && (wb_inode != NULL)) { - LOCK (&wb_inode->lock); - { - wb_inode->op_ret = -1; - wb_inode->op_errno = ENOMEM; - } - UNLOCK (&wb_inode->lock); - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } + wb_do_unwinds (wb_inode, &lies); - STACK_DESTROY (process_frame->root); - } + wb_do_winds (wb_inode, &tasks); - return 0; + wb_fulfill (wb_inode, &liabilities); + + return; } -static int32_t -wb_setattr_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct iatt *stbuf, int32_t valid, dict_t *xdata) +int +wb_writev_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iovec *vector, int32_t count, off_t offset, + uint32_t flags, struct iobref *iobref, dict_t *xdata) { - GF_ASSERT (frame); - GF_ASSERT (this); - - STACK_WIND (frame, wb_setattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); - return 0; + STACK_WIND (frame, default_writev_cbk, + FIRST_CHILD (this), FIRST_CHILD (this)->fops->writev, + fd, vector, count, offset, flags, iobref, xdata); + return 0; } -int32_t -wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, - struct iatt *stbuf, int32_t valid, dict_t *xdata) +int +wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, + dict_t *xdata) { - wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1, op_errno = EINVAL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, loc, unwind); + wb_inode_t *wb_inode = NULL; + wb_conf_t *conf = NULL; + gf_boolean_t wb_disabled = 0; + call_stub_t *stub = NULL; + int ret = -1; + int op_errno = EINVAL; + int o_direct = O_DIRECT; + + conf = this->private; + wb_inode = wb_inode_create (this, fd->inode); + if (!wb_inode) { + op_errno = ENOMEM; + goto unwind; + } + + if (!conf->strict_O_DIRECT) + o_direct = 0; + + if (fd->flags & (O_SYNC|O_DSYNC|o_direct)) + wb_disabled = 1; + + if (flags & (O_SYNC|O_DSYNC|O_DIRECT)) + /* O_DIRECT flag in params of writev must _always_ be honored */ + wb_disabled = 1; + + op_errno = 0; + LOCK (&wb_inode->lock); + { + /* pick up a previous error in fulfillment */ + if (wb_inode->op_ret < 0) + op_errno = wb_inode->op_errno; + + wb_inode->op_ret = 0; + } + UNLOCK (&wb_inode->lock); + + if (op_errno) + goto unwind; - local = mem_get0 (this->local_pool); - if (local == NULL) { + if (wb_disabled) + stub = fop_writev_stub (frame, wb_writev_helper, fd, vector, + count, offset, flags, iobref, xdata); + else + stub = fop_writev_stub (frame, NULL, fd, vector, count, offset, + flags, iobref, xdata); + if (!stub) { op_errno = ENOMEM; goto unwind; } - frame->local = local; - - if (loc->inode) { - wb_inode = wb_inode_ctx_get (this, loc->inode); - } + if (wb_disabled) + ret = wb_enqueue (wb_inode, stub); + else + ret = wb_enqueue_tempted (wb_inode, stub); - local->wb_inode = wb_inode; + if (!ret) { + op_errno = ENOMEM; + goto unwind; + } - if (wb_inode) { - stub = fop_setattr_stub (frame, wb_setattr_helper, loc, stbuf, - valid, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_setattr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->setattr, loc, stbuf, - valid, xdata); - } + wb_process_queue (wb_inode); return 0; + unwind: - STACK_UNWIND_STRICT (setattr, frame, -1, op_errno, NULL, NULL, NULL); + STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); - if (stub) { + if (stub) call_stub_destroy (stub); - } return 0; } -void -wb_disable_all (xlator_t *this, fd_t *origfd) -{ - inode_t *inode = NULL; - fd_t *otherfd = NULL; - wb_file_t *wb_file = NULL; - - inode = origfd->inode; - - LOCK(&inode->lock); - { - list_for_each_entry (otherfd, &inode->fd_list, inode_list) { - if (otherfd == origfd) { - continue; - } - - wb_file = wb_fd_ctx_get (this, otherfd); - if (wb_file == NULL) { - continue; - } - - gf_log(this->name,GF_LOG_DEBUG, - "disabling wb on %p because %p is O_SYNC", - otherfd, origfd); - wb_file->disabled = 1; - } - } - UNLOCK(&inode->lock); -} -int32_t -wb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, fd_t *fd, dict_t *xdata) +int +wb_readv_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, uint32_t flags, dict_t *xdata) { - int32_t flags = 0; - wb_file_t *file = NULL; - wb_local_t *local = NULL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, out, op_errno, - EINVAL); - local = frame->local; - GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, local, out, op_errno, - EINVAL); - - flags = local->flags; - - if (op_ret != -1) { - file = wb_file_create (this, fd, flags); - if (file == NULL) { - op_ret = -1; - op_errno = ENOMEM; - goto out; - } - } - -out: - frame->local = NULL; - if (local != NULL) { - mem_put (local); - } - - STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd, xdata); + STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, + xdata); return 0; } -int32_t -wb_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - fd_t *fd, dict_t *xdata) +int +wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, uint32_t flags, dict_t *xdata) { - wb_local_t *local = NULL; - int32_t op_errno = EINVAL; + wb_inode_t *wb_inode = NULL; + call_stub_t *stub = NULL; - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } + wb_inode = wb_inode_ctx_get (this, fd->inode); + if (!wb_inode) + goto noqueue; - local->flags = flags; + stub = fop_readv_stub (frame, wb_readv_helper, fd, size, + offset, flags, xdata); + if (!stub) + goto unwind; - frame->local = local; + if (!wb_enqueue (wb_inode, stub)) + goto unwind; + + wb_process_queue (wb_inode); - STACK_WIND (frame, wb_open_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->open, loc, flags, fd, xdata); return 0; unwind: - STACK_UNWIND_STRICT (open, frame, -1, op_errno, NULL, NULL); + STACK_UNWIND_STRICT (readv, frame, -1, ENOMEM, NULL, 0, NULL, NULL, + NULL); return 0; -} - - -int32_t -wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, - struct iatt *buf, struct iatt *preparent, - struct iatt *postparent, dict_t *xdata) -{ - long flags = 0; - wb_inode_t *wb_inode = NULL; - wb_file_t *file = NULL; - wb_local_t *local = NULL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, out, - op_errno, EINVAL); - - if (op_ret != -1) { - if (frame->local) { - flags = (long) frame->local; - } - - file = wb_file_create (this, fd, flags); - if (file == NULL) { - op_ret = -1; - op_errno = ENOMEM; - goto out; - } - - LOCK (&inode->lock); - { - wb_inode = __wb_inode_create (this, inode); - if (wb_inode == NULL) { - op_ret = -1; - op_errno = ENOMEM; - } - } - UNLOCK (&inode->lock); - } - - frame->local = NULL; - -out: - STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, - preparent, postparent, xdata); - - if (local != NULL) { - mem_put (local); - } +noqueue: + STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, + xdata); return 0; } -int32_t -wb_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, - mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) +int +wb_flush_bg_cbk (call_frame_t *frame, void *cookie, xlator_t *this, + int32_t op_ret, int32_t op_errno, dict_t *xdata) { - int32_t op_errno = EINVAL; - wb_local_t *local = NULL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO (frame->this->name, this, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, fd, unwind); - GF_VALIDATE_OR_GOTO (frame->this->name, loc, unwind); - - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - local->flags = flags; - - frame->local = local; - - STACK_WIND (frame, wb_create_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->create, - loc, flags, mode, umask, fd, xdata); - return 0; - -unwind: - STACK_UNWIND_STRICT (create, frame, -1, op_errno, NULL, NULL, NULL, - NULL, NULL, NULL); + STACK_DESTROY (frame->root); return 0; } -/* Mark all the contiguous write requests for winding starting from head of - * request list. Stops marking at the first non-write request found. If - * file is opened with O_APPEND, make sure all the writes marked for winding - * will fit into a single write call to server. - */ -size_t -__wb_mark_wind_all (wb_inode_t *wb_inode, list_head_t *list, list_head_t *winds) +int +wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - wb_request_t *request = NULL, *prev_request = NULL; - wb_file_t *wb_file = NULL, *prev_wb_file = NULL; - wb_file_t *last_wb_file = NULL; - size_t size = 0; - char first_request = 1, overlap = 0; - wb_conf_t *conf = NULL; - int count = 0; - enum _gf_boolean dont_wind_set = 0; - - GF_VALIDATE_OR_GOTO ("write-behind", wb_inode, out); - GF_VALIDATE_OR_GOTO (wb_inode->this->name, list, out); - GF_VALIDATE_OR_GOTO (wb_inode->this->name, winds, out); - - conf = wb_inode->this->private; - - list_for_each_entry (request, list, list) - { - if ((request->stub == NULL) - || (request->stub->fop != GF_FOP_WRITE)) { - break; - } - - wb_file = wb_fd_ctx_get (wb_inode->this, - request->stub->args.writev.fd); - if (wb_file == NULL) { - gf_log (wb_inode->this->name, GF_LOG_WARNING, - "write behind wb_file pointer is" - " not stored in context of fd(%p)", - request->stub->args.writev.fd); - goto out; - } - - /* If write requests from two fds are interleaved, for - * each of them, we can only send first set of adjacent - * requests that are on same fd. This is because, fds - * with O_APPEND cannot have more than one write fop in - * progress while syncing, so that order is not messed - * up. Since we group adjacent requests with same fd into - * single write call whenever possible, we need the above said - * measure. - */ - if ((prev_wb_file != NULL) && (prev_wb_file->flags & O_APPEND) - && (prev_request->stub->args.writev.fd - != request->stub->args.writev.fd) - && (!prev_wb_file->dont_wind)) { - prev_wb_file->dont_wind = 1; - dont_wind_set = 1; - last_wb_file = prev_wb_file; - } - - prev_request = request; - prev_wb_file = wb_file; - - if (!request->flags.write_request.stack_wound) { - if (first_request) { - first_request = 0; - } else { - overlap = wb_overlap (list, request); - if (overlap) { - continue; - } - } - - if ((wb_file->flags & O_APPEND) - && (((size + request->write_size) - > conf->aggregate_size) - || ((count + request->stub->args.writev.count) - > MAX_VECTOR_COUNT) - || (wb_file->dont_wind))) { - continue; - } - - size += request->write_size; - - wb_inode->aggregate_current -= request->write_size; - - count += request->stub->args.writev.count; - - request->flags.write_request.stack_wound = 1; - list_add_tail (&request->winds, winds); - } - } - -out: - if (wb_inode != NULL) { - wb_inode->aggregate_current -= size; - } - - if (dont_wind_set && (list != NULL)) { - list_for_each_entry (request, list, list) { - wb_file = wb_fd_ctx_get (wb_inode->this, - request->stub->args.writev.fd); - if (wb_file != NULL) { - wb_file->dont_wind = 0; - } - - if (wb_file == last_wb_file) { - break; - } - } - } - - return size; -} - + wb_conf_t *conf = NULL; + wb_inode_t *wb_inode = NULL; + call_frame_t *bg_frame = NULL; + int op_errno = 0; + int op_ret = 0; -int32_t -__wb_can_wind (list_head_t *list, char *other_fop_in_queue, - char *overlapping_writes, char *incomplete_writes, - char *wind_all) -{ - wb_request_t *request = NULL; - char first_request = 1; - int32_t ret = -1; - char overlap = 0; + conf = this->private; - GF_VALIDATE_OR_GOTO ("write-behind", list, out); + wb_inode = wb_inode_ctx_get (this, fd->inode); + if (!wb_inode) { + op_ret = -1; + op_errno = EINVAL; + goto unwind; + } - list_for_each_entry (request, list, list) + LOCK (&wb_inode->lock); { - if ((request->stub == NULL) - || (request->stub->fop != GF_FOP_WRITE)) { - if (request->stub && other_fop_in_queue) { - *other_fop_in_queue = 1; - } - break; - } - - if (request->flags.write_request.stack_wound - && !request->flags.write_request.got_reply - && (incomplete_writes != NULL)) { - *incomplete_writes = 1; - break; - } + if (wb_inode->op_ret < 0) { + op_ret = -1; + op_errno = wb_inode->op_errno; + } - if (!request->flags.write_request.stack_wound) { - if (first_request) { - char flush = 0; - first_request = 0; - - flush = request->flags.write_request.flush_all; - if (wind_all != NULL) { - *wind_all = flush; - } - } - - overlap = wb_overlap (list, request); - if (overlap) { - if (overlapping_writes != NULL) { - *overlapping_writes = 1; - } - - break; - } - } + wb_inode->op_ret = 0; } + UNLOCK (&wb_inode->lock); - ret = 0; -out: - return ret; -} + if (op_errno) + goto unwind; + if (conf->flush_behind) + goto flushbehind; -ssize_t -__wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf, - char enable_trickling_writes) -{ - size_t size = 0; - char other_fop_in_queue = 0; - char incomplete_writes = 0; - char overlapping_writes = 0; - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; - char wind_all = 0; - int32_t ret = 0; - - GF_VALIDATE_OR_GOTO ("write-behind", list, out); - GF_VALIDATE_OR_GOTO ("write-behind", winds, out); - - if (list_empty (list)) { - goto out; - } + STACK_WIND (frame, default_flush_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, fd, xdata); + return 0; - request = list_entry (list->next, typeof (*request), list); - wb_inode = request->wb_inode; +flushbehind: + bg_frame = copy_frame (frame); + if (!bg_frame) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind; + } - ret = __wb_can_wind (list, &other_fop_in_queue, - &overlapping_writes, &incomplete_writes, - &wind_all); - if (ret == -1) { - gf_log (wb_inode->this->name, GF_LOG_WARNING, - "cannot decide whether to wind or not"); - goto out; - } - - if (!incomplete_writes && ((enable_trickling_writes) - || (wind_all) || (overlapping_writes) - || (other_fop_in_queue) - || (wb_inode->aggregate_current - >= aggregate_conf))) { - size = __wb_mark_wind_all (wb_inode, list, winds); - } + STACK_WIND (bg_frame, wb_flush_bg_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, fd, xdata); + /* fall through */ +unwind: + STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno, NULL); -out: - return size; + return 0; } -size_t -__wb_mark_unwind_till (list_head_t *list, list_head_t *unwinds, size_t size) +int +wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - size_t written_behind = 0; - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; - - if (list_empty (list)) { - goto out; - } - - request = list_entry (list->next, typeof (*request), list); - wb_inode = request->wb_inode; - - list_for_each_entry (request, list, list) - { - if ((request->stub == NULL) - || (request->stub->fop != GF_FOP_WRITE)) { - continue; - } - - if (written_behind <= size) { - if (!request->flags.write_request.write_behind) { - written_behind += request->write_size; - request->flags.write_request.write_behind = 1; - list_add_tail (&request->unwinds, unwinds); - - if (!request->flags.write_request.got_reply) { - wb_inode->window_current - += request->write_size; - } - } - } else { - break; - } - } + wb_inode_t *wb_inode = NULL; + call_stub_t *stub = NULL; -out: - return written_behind; -} + wb_inode = wb_inode_ctx_get (this, fd->inode); + if (!wb_inode) + goto noqueue; + stub = fop_flush_stub (frame, wb_flush_helper, fd, xdata); + if (!stub) + goto unwind; -void -__wb_mark_unwinds (list_head_t *list, list_head_t *unwinds) -{ - wb_request_t *request = NULL; - wb_inode_t *wb_inode = NULL; + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - GF_VALIDATE_OR_GOTO ("write-behind", list, out); - GF_VALIDATE_OR_GOTO ("write-behind", unwinds, out); + wb_process_queue (wb_inode); - if (list_empty (list)) { - goto out; - } + return 0; - request = list_entry (list->next, typeof (*request), list); - wb_inode = request->wb_inode; +unwind: + STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM, NULL); - if (wb_inode->window_current <= wb_inode->window_conf) { - __wb_mark_unwind_till (list, unwinds, - wb_inode->window_conf - - wb_inode->window_current); - } + return 0; -out: - return; +noqueue: + STACK_WIND (frame, default_flush_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, fd, xdata); + return 0; } -uint32_t -__wb_get_other_requests (list_head_t *list, list_head_t *other_requests) -{ - wb_request_t *request = NULL; - uint32_t count = 0; - - GF_VALIDATE_OR_GOTO ("write-behind", list, out); - GF_VALIDATE_OR_GOTO ("write-behind", other_requests, out); - - list_for_each_entry (request, list, list) { - if ((request->stub == NULL) - || (request->stub->fop == GF_FOP_WRITE)) { - break; - } - - if (!request->flags.other_requests.marked_for_resume) { - request->flags.other_requests.marked_for_resume = 1; - list_add_tail (&request->other_requests, - other_requests); - count++; - } - } -out: - return count; -} - - -int32_t -wb_stack_unwind (list_head_t *unwinds) +int +wb_fsync_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, + int32_t datasync, dict_t *xdata) { - struct iatt buf = {0,}; - wb_request_t *request = NULL, *dummy = NULL; - call_frame_t *frame = NULL; - wb_local_t *local = NULL; - int ret = 0, write_requests_removed = 0; - - GF_VALIDATE_OR_GOTO ("write-behind", unwinds, out); - - list_for_each_entry_safe (request, dummy, unwinds, unwinds) { - frame = request->stub->frame; - local = frame->local; - - STACK_UNWIND (frame, local->op_ret, local->op_errno, - &buf, &buf, NULL, NULL); - - ret = wb_request_unref (request); - if (ret == 0) { - write_requests_removed++; - } - } - -out: - return write_requests_removed; + STACK_WIND (frame, default_fsync_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, fd, datasync, xdata); + return 0; } -int32_t -wb_resume_other_requests (call_frame_t *frame, wb_inode_t *wb_inode, - list_head_t *other_requests) +int +wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, + dict_t *xdata) { - int32_t ret = -1; - wb_request_t *request = NULL, *dummy = NULL; - int32_t fops_removed = 0; - char wind = 0; + wb_inode_t *wb_inode = NULL; call_stub_t *stub = NULL; - GF_VALIDATE_OR_GOTO ((wb_inode ? wb_inode->this->name : "write-behind"), - frame, out); - GF_VALIDATE_OR_GOTO (frame->this->name, wb_inode, out); - GF_VALIDATE_OR_GOTO (frame->this->name, other_requests, out); - - if (list_empty (other_requests)) { - ret = 0; - goto out; - } - - list_for_each_entry_safe (request, dummy, other_requests, - other_requests) { - wind = request->stub->wind; - stub = request->stub; - - LOCK (&wb_inode->lock); - { - request->stub = NULL; - } - UNLOCK (&wb_inode->lock); - - if (!wind) { - wb_request_unref (request); - fops_removed++; - } - - call_resume (stub); - } - - ret = 0; - - if (fops_removed > 0) { - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (frame->this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } - -out: - return ret; -} - - -int32_t -wb_do_ops (call_frame_t *frame, wb_inode_t *wb_inode, list_head_t *winds, - list_head_t *unwinds, list_head_t *other_requests) -{ - int32_t ret = -1, write_requests_removed = 0; + wb_inode = wb_inode_ctx_get (this, fd->inode); + if (!wb_inode) + goto noqueue; - GF_VALIDATE_OR_GOTO ((wb_inode ? wb_inode->this->name : "write-behind"), - frame, out); - GF_VALIDATE_OR_GOTO (frame->this->name, wb_inode, out); + stub = fop_fsync_stub (frame, wb_fsync_helper, fd, datasync, xdata); + if (!stub) + goto unwind; - ret = wb_stack_unwind (unwinds); + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - write_requests_removed = ret; + wb_process_queue (wb_inode); - ret = wb_sync (frame, wb_inode, winds); - if (ret == -1) { - gf_log (frame->this->name, GF_LOG_WARNING, - "syncing of write requests failed"); - } + return 0; - ret = wb_resume_other_requests (frame, wb_inode, other_requests); - if (ret == -1) { - gf_log (frame->this->name, GF_LOG_WARNING, - "cannot resume non-write requests in request queue"); - } +unwind: + STACK_UNWIND_STRICT (fsync, frame, -1, ENOMEM, NULL, NULL, NULL); - /* wb_stack_unwind does wb_request_unref after unwinding a write - * request. Hence if a write-request was just freed in wb_stack_unwind, - * we have to process request queue once again to unblock requests - * blocked on the writes just unwound. - */ - if (write_requests_removed > 0) { - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (frame->this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } + return 0; -out: - return ret; +noqueue: + STACK_WIND (frame, default_fsync_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsync, fd, datasync, xdata); + return 0; } -inline int -__wb_copy_into_holder (wb_request_t *holder, wb_request_t *request) +int +wb_stat_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - char *ptr = NULL; - struct iobuf *iobuf = NULL; - struct iobref *iobref = NULL; - int ret = -1; - - if (holder->flags.write_request.virgin) { - /* TODO: check the required size */ - iobuf = iobuf_get (request->wb_inode->this->ctx->iobuf_pool); - if (iobuf == NULL) { - goto out; - } - - iobref = iobref_new (); - if (iobref == NULL) { - iobuf_unref (iobuf); - goto out; - } - - ret = iobref_add (iobref, iobuf); - if (ret != 0) { - iobuf_unref (iobuf); - iobref_unref (iobref); - gf_log (request->wb_inode->this->name, GF_LOG_WARNING, - "cannot add iobuf (%p) into iobref (%p)", - iobuf, iobref); - goto out; - } - - iov_unload (iobuf->ptr, holder->stub->args.writev.vector, - holder->stub->args.writev.count); - holder->stub->args.writev.vector[0].iov_base = iobuf->ptr; - - iobref_unref (holder->stub->args.writev.iobref); - holder->stub->args.writev.iobref = iobref; - - iobuf_unref (iobuf); - - holder->flags.write_request.virgin = 0; - } - - ptr = holder->stub->args.writev.vector[0].iov_base + holder->write_size; - - iov_unload (ptr, request->stub->args.writev.vector, - request->stub->args.writev.count); - - holder->stub->args.writev.vector[0].iov_len += request->write_size; - holder->write_size += request->write_size; - - request->flags.write_request.stack_wound = 1; - list_move_tail (&request->list, &request->wb_inode->passive_requests); - - ret = 0; -out: - return ret; + STACK_WIND (frame, default_stat_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, loc, xdata); + return 0; } -/* this procedure assumes that write requests have only one vector to write */ -void -__wb_collapse_write_bufs (list_head_t *requests, size_t page_size) +int +wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - off_t offset_expected = 0; - size_t space_left = 0; - wb_request_t *request = NULL, *tmp = NULL, *holder = NULL; - int ret = 0; - - GF_VALIDATE_OR_GOTO ("write-behind", requests, out); - - list_for_each_entry_safe (request, tmp, requests, list) { - if ((request->stub == NULL) - || (request->stub->fop != GF_FOP_WRITE) - || (request->flags.write_request.stack_wound)) { - holder = NULL; - continue; - } - - if (request->flags.write_request.write_behind) { - if (holder == NULL) { - holder = request; - continue; - } - - offset_expected = holder->stub->args.writev.off - + holder->write_size; - - if ((request->stub->args.writev.off != offset_expected) - || (!is_same_lkowner (&request->lk_owner, - &holder->lk_owner)) - || (holder->stub->args.writev.fd - != request->stub->args.writev.fd)) { - holder = request; - continue; - } - - space_left = page_size - holder->write_size; - - if (space_left >= request->write_size) { - ret = __wb_copy_into_holder (holder, request); - if (ret != 0) { - break; - } - - __wb_request_unref (request); - } else { - holder = request; - } - } else { - break; - } - } - -out: - return; -} + wb_inode_t *wb_inode = NULL; + call_stub_t *stub = NULL; -int32_t -wb_process_queue (call_frame_t *frame, wb_inode_t *wb_inode) -{ - list_head_t winds = {0, }, unwinds = {0, }, other_requests = {0, }; - size_t size = 0; - wb_conf_t *conf = NULL; - uint32_t count = 0; - int32_t ret = -1; + wb_inode = wb_inode_ctx_get (this, loc->inode); + if (!wb_inode) + goto noqueue; - INIT_LIST_HEAD (&winds); - INIT_LIST_HEAD (&unwinds); - INIT_LIST_HEAD (&other_requests); + stub = fop_stat_stub (frame, wb_stat_helper, loc, xdata); + if (!stub) + goto unwind; - GF_VALIDATE_OR_GOTO ((wb_inode ? wb_inode->this->name : "write-behind"), - frame, out); - GF_VALIDATE_OR_GOTO (wb_inode->this->name, frame, out); + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - conf = wb_inode->this->private; - GF_VALIDATE_OR_GOTO (wb_inode->this->name, conf, out); + wb_process_queue (wb_inode); - size = conf->aggregate_size; - LOCK (&wb_inode->lock); - { - /* - * make sure requests are marked for unwinding and adjacent - * contiguous write buffers (each of size less than that of - * an iobuf) are packed properly so that iobufs are filled to - * their maximum capacity, before calling __wb_mark_winds. - */ - __wb_mark_unwinds (&wb_inode->request, &unwinds); - - __wb_collapse_write_bufs (&wb_inode->request, - wb_inode->this->ctx->page_size); - - count = __wb_get_other_requests (&wb_inode->request, - &other_requests); - - if (count == 0) { - __wb_mark_winds (&wb_inode->request, &winds, size, - conf->enable_trickling_writes); - } + return 0; - } - UNLOCK (&wb_inode->lock); +unwind: + STACK_UNWIND_STRICT (stat, frame, -1, ENOMEM, NULL, NULL); - ret = wb_do_ops (frame, wb_inode, &winds, &unwinds, &other_requests); + if (stub) + call_stub_destroy (stub); + return 0; -out: - return ret; +noqueue: + STACK_WIND (frame, default_stat_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->stat, loc, xdata); + return 0; } -int32_t -wb_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *prebuf, - struct iatt *postbuf, dict_t *xdata) +int +wb_fstat_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - GF_ASSERT (frame); - - STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf, - xdata); + STACK_WIND (frame, default_fstat_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; } -int32_t -wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, - int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, - dict_t *xdata) +int +wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) { - wb_inode_t *wb_inode = NULL; - wb_file_t *wb_file = NULL; - char wb_disabled = 0; - call_frame_t *process_frame = NULL; - call_stub_t *stub = NULL; - wb_local_t *local = NULL; - wb_request_t *request = NULL; - int32_t ret = -1; - size_t size = 0; - int32_t op_ret = -1, op_errno = EINVAL; - - GF_ASSERT (frame); - - GF_VALIDATE_OR_GOTO_WITH_ERROR ("write-behind", this, unwind, op_errno, - EINVAL); - GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, fd, unwind, op_errno, - EINVAL); + wb_inode_t *wb_inode = NULL; + call_stub_t *stub = NULL; - if (vector != NULL) { - size = iov_length (vector, count); - } wb_inode = wb_inode_ctx_get (this, fd->inode); - if ((!IA_ISDIR (fd->inode->ia_type)) && (wb_inode == NULL)) { - gf_log (this->name, GF_LOG_WARNING, - "write behind wb_inode pointer is" - " not stored in context of inode(%p), returning EBADFD", - fd->inode); - op_errno = EBADFD; - goto unwind; - } - - if (wb_file != NULL) { - if (wb_file->disabled || wb_file->disable_till) { - if (size > wb_file->disable_till) { - wb_file->disable_till = 0; - } else { - wb_file->disable_till -= size; - } - wb_disabled = 1; - } - } else { - wb_disabled = 1; - } - - if (wb_inode != NULL) { - LOCK (&wb_inode->lock); - { - op_ret = wb_inode->op_ret; - op_errno = wb_inode->op_errno; - } - UNLOCK (&wb_inode->lock); - } - - if (op_ret == -1) { - goto unwind; - } - - if (wb_disabled) { - STACK_WIND (frame, wb_writev_cbk, FIRST_CHILD (frame->this), - FIRST_CHILD (frame->this)->fops->writev, - fd, vector, count, offset, flags, iobref, xdata); - return 0; - } - - process_frame = copy_frame (frame); - if (process_frame == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } + if (!wb_inode) + goto noqueue; - frame->local = local; - local->wb_inode = wb_inode; + stub = fop_fstat_stub (frame, wb_fstat_helper, fd, xdata); + if (!stub) + goto unwind; - stub = fop_writev_stub (frame, NULL, fd, vector, count, offset, flags, - iobref, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - ret = wb_process_queue (process_frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - - STACK_DESTROY (process_frame->root); + wb_process_queue (wb_inode); return 0; unwind: - local = frame->local; - frame->local = NULL; - mem_put (local); - - STACK_UNWIND_STRICT (writev, frame, -1, op_errno, NULL, NULL, NULL); + STACK_UNWIND_STRICT (fstat, frame, -1, ENOMEM, NULL, NULL); - if (process_frame) { - STACK_DESTROY (process_frame->root); - } - - if (stub) { + if (stub) call_stub_destroy (stub); - } - return 0; -} - - -int32_t -wb_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, struct iovec *vector, int32_t count, - struct iatt *stbuf, struct iobref *iobref, dict_t *xdata) -{ - wb_local_t *local = NULL; - wb_inode_t *wb_inode = NULL; - wb_request_t *request = NULL; - int32_t ret = 0; - - GF_ASSERT (frame); - - local = frame->local; - wb_inode = local->wb_inode; - request = local->request; - - if ((request != NULL) && (wb_inode != NULL)) { - wb_request_unref (request); - - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - if (errno == ENOMEM) { - op_ret = -1; - op_errno = ENOMEM; - } - - gf_log (frame->this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } - - STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count, - stbuf, iobref, xdata); +noqueue: + STACK_WIND (frame, default_fstat_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fstat, fd, xdata); return 0; } -static int32_t -wb_readv_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset, uint32_t flags, dict_t *xdata) +int +wb_truncate_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, + off_t offset, dict_t *xdata) { - STACK_WIND (frame, wb_readv_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, - xdata); - + STACK_WIND (frame, default_truncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->truncate, loc, offset, xdata); return 0; } -int32_t -wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, - off_t offset, uint32_t flags, dict_t *xdata) +int +wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) { wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; - call_stub_t *stub = NULL; - int32_t ret = -1, op_errno = 0; - wb_request_t *request = NULL; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, unwind, - op_errno, EINVAL); - GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, fd, unwind, op_errno, - EINVAL); - - wb_inode = wb_inode_ctx_get (this, fd->inode); - if ((!IA_ISDIR (fd->inode->ia_type)) && (wb_inode == NULL)) { - gf_log (this->name, GF_LOG_WARNING, - "write behind wb_inode pointer is" - " not stored in context of inode(%p), returning " - "EBADFD", fd->inode); - op_errno = EBADFD; - goto unwind; - } - - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } + call_stub_t *stub = NULL; - local->wb_inode = wb_inode; + wb_inode = wb_inode_create (this, loc->inode); + if (!wb_inode) + goto unwind; - frame->local = local; - if (wb_inode) { - stub = fop_readv_stub (frame, wb_readv_helper, fd, size, - offset, flags, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } + stub = fop_truncate_stub (frame, wb_truncate_helper, loc, + offset, xdata); + if (!stub) + goto unwind; - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - call_stub_destroy (stub); - op_errno = ENOMEM; - goto unwind; - } + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_readv_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readv, - fd, size, offset, flags, xdata); - } + wb_process_queue (wb_inode); return 0; unwind: - STACK_UNWIND_STRICT (readv, frame, -1, op_errno, NULL, 0, NULL, NULL, - NULL); - return 0; -} - - -int32_t -wb_ffr_bg_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, dict_t *xdata) -{ - STACK_DESTROY (frame->root); - return 0; -} - + STACK_UNWIND_STRICT (truncate, frame, -1, ENOMEM, NULL, NULL, NULL); -int32_t -wb_ffr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, dict_t *xdata) -{ - wb_local_t *local = NULL; - wb_inode_t *wb_inode = NULL; - - GF_ASSERT (frame); - - local = frame->local; - wb_inode = local->wb_inode; - - if (wb_inode != NULL) { - LOCK (&wb_inode->lock); - { - if (wb_inode->op_ret == -1) { - op_ret = wb_inode->op_ret; - op_errno = wb_inode->op_errno; - - wb_inode->op_ret = 0; - } - } - UNLOCK (&wb_inode->lock); - } - - STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno, xdata); + if (stub) + call_stub_destroy (stub); return 0; } -int32_t -wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) +int +wb_ftruncate_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, + off_t offset, dict_t *xdata) { - wb_conf_t *conf = NULL; - wb_local_t *local = NULL; - wb_inode_t *wb_inode = NULL; - call_frame_t *flush_frame = NULL, *process_frame = NULL; - int32_t op_ret = -1, op_errno = -1, ret = -1; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, unwind, - op_errno, EINVAL); - - conf = this->private; - - local = frame->local; - wb_inode = local->wb_inode; - - LOCK (&wb_inode->lock); - { - op_ret = wb_inode->op_ret; - op_errno = wb_inode->op_errno; - } - UNLOCK (&wb_inode->lock); - - if (local && local->request) { - process_frame = copy_frame (frame); - if (process_frame == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - wb_request_unref (local->request); - } - - if (conf->flush_behind) { - flush_frame = copy_frame (frame); - if (flush_frame == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - STACK_WIND (flush_frame, wb_ffr_bg_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, fd, xdata); - } else { - STACK_WIND (frame, wb_ffr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, fd, xdata); - } - - if (process_frame != NULL) { - ret = wb_process_queue (process_frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - - STACK_DESTROY (process_frame->root); - } - - if (conf->flush_behind) { - STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno, NULL); - } - - return 0; - -unwind: - STACK_UNWIND_STRICT (flush, frame, -1, op_errno, NULL); + STACK_WIND (frame, default_ftruncate_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata); return 0; } -int32_t -wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) +int +wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) { - wb_conf_t *conf = NULL; wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; call_stub_t *stub = NULL; - call_frame_t *flush_frame = NULL; - wb_request_t *request = NULL; - int32_t ret = 0, op_errno = 0; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, unwind, - op_errno, EINVAL); - GF_VALIDATE_OR_GOTO_WITH_ERROR (this->name, fd, unwind, op_errno, - EINVAL); - - conf = this->private; - - wb_inode = wb_inode_ctx_get (this, fd->inode); - if ((!IA_ISDIR (fd->inode->ia_type)) && (wb_inode == NULL)) { - gf_log (this->name, GF_LOG_WARNING, - "write behind wb_inode pointer is" - " not stored in context of inode(%p), " - "returning EBADFD", fd->inode); - op_errno = EBADFD; - goto unwind; - } - - if (wb_inode != NULL) { - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - local->wb_inode = wb_inode; - frame->local = local; + wb_inode = wb_inode_create (this, fd->inode); + if (!wb_inode) + goto unwind; - stub = fop_flush_stub (frame, wb_flush_helper, fd, xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } + stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd, + offset, xdata); + if (!stub) + goto unwind; - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - call_stub_destroy (stub); - op_errno = ENOMEM; - goto unwind; - } + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - if (conf->flush_behind) { - flush_frame = copy_frame (frame); - if (flush_frame == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - STACK_UNWIND_STRICT (flush, frame, 0, 0, NULL); - - STACK_WIND (flush_frame, wb_ffr_bg_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, fd, xdata); - } else { - STACK_WIND (frame, wb_ffr_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, fd, xdata); - } - } + wb_process_queue (wb_inode); return 0; unwind: - STACK_UNWIND_STRICT (flush, frame, -1, op_errno, NULL); - return 0; -} - - -static int32_t -wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, - int32_t op_errno, struct iatt *prebuf, struct iatt *postbuf, - dict_t *xdata) -{ - wb_local_t *local = NULL; - wb_inode_t *wb_inode = NULL; - wb_request_t *request = NULL; - int32_t ret = -1; - - GF_ASSERT (frame); - - local = frame->local; - wb_inode = local->wb_inode; - request = local->request; - - if (wb_inode != NULL) { - LOCK (&wb_inode->lock); - { - if (wb_inode->op_ret == -1) { - op_ret = wb_inode->op_ret; - op_errno = wb_inode->op_errno; - - wb_inode->op_ret = 0; - } - } - UNLOCK (&wb_inode->lock); - - if (request) { - wb_request_unref (request); - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - if (errno == ENOMEM) { - op_ret = -1; - op_errno = ENOMEM; - } - - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } - - } - - STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf, postbuf, - xdata); + STACK_UNWIND_STRICT (ftruncate, frame, -1, ENOMEM, NULL, NULL, NULL); + if (stub) + call_stub_destroy (stub); return 0; } -static int32_t -wb_fsync_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, - int32_t datasync, dict_t *xdata) +int +wb_setattr_helper (call_frame_t *frame, xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata) { - STACK_WIND (frame, wb_fsync_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsync, fd, datasync, xdata); + STACK_WIND (frame, default_setattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } -int32_t -wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, - dict_t *xdata) +int +wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, + struct iatt *stbuf, int32_t valid, dict_t *xdata) { wb_inode_t *wb_inode = NULL; - wb_local_t *local = NULL; call_stub_t *stub = NULL; - wb_request_t *request = NULL; - int32_t ret = -1, op_errno = 0; - - GF_ASSERT (frame); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, this, unwind, - op_errno, EINVAL); - GF_VALIDATE_OR_GOTO_WITH_ERROR (frame->this->name, fd, unwind, - op_errno, EINVAL); - wb_inode = wb_inode_ctx_get (this, fd->inode); - if (wb_inode == NULL && (!IA_ISDIR (fd->inode->ia_type))) { - gf_log (this->name, GF_LOG_WARNING, - "write behind wb_inode pointer is" - " not stored in context of inode(%p), " - "returning EBADFD", fd->inode); - op_errno = EBADFD; - goto unwind; - } + wb_inode = wb_inode_ctx_get (this, loc->inode); + if (!wb_inode) + goto noqueue; - local = mem_get0 (this->local_pool); - if (local == NULL) { - op_errno = ENOMEM; - goto unwind; - } - - frame->local = local; - local->wb_inode = wb_inode; - - if (wb_inode) { - stub = fop_fsync_stub (frame, wb_fsync_helper, fd, datasync, - xdata); - if (stub == NULL) { - op_errno = ENOMEM; - goto unwind; - } + stub = fop_setattr_stub (frame, wb_setattr_helper, loc, stbuf, + valid, xdata); + if (!stub) + goto unwind; - request = wb_enqueue (wb_inode, stub); - if (request == NULL) { - op_errno = ENOMEM; - call_stub_destroy (stub); - goto unwind; - } + if (!wb_enqueue (wb_inode, stub)) + goto unwind; - ret = wb_process_queue (frame, wb_inode); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "request queue processing failed"); - } - } else { - STACK_WIND (frame, wb_fsync_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fsync, fd, datasync, - xdata); - } + wb_process_queue (wb_inode); return 0; - unwind: - STACK_UNWIND_STRICT (fsync, frame, -1, op_errno, NULL, NULL, NULL); - return 0; -} - - -int32_t -wb_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, gf_dirent_t *entries, - dict_t *xdata) -{ - gf_dirent_t *entry = NULL; - - if (op_ret <= 0) { - goto unwind; - } + STACK_UNWIND_STRICT (setattr, frame, -1, ENOMEM, NULL, NULL, NULL); - list_for_each_entry (entry, &entries->list, list) { - if (!entry->inode) - continue; - wb_inode_create (this, entry->inode); - } + if (stub) + call_stub_destroy (stub); + return 0; -unwind: - STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries, xdata); +noqueue: + STACK_WIND (frame, default_setattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata); return 0; } -int32_t -wb_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, - size_t size, off_t off, dict_t *xdata) +int +wb_fsetattr_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iatt *stbuf, int32_t valid, dict_t *xdata) { - STACK_WIND (frame, wb_readdirp_cbk, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata); + STACK_WIND (frame, default_fsetattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } -int32_t -wb_release (xlator_t *this, fd_t *fd) +int +wb_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd, + struct iatt *stbuf, int32_t valid, dict_t *xdata) { - uint64_t wb_file_ptr = 0; - wb_file_t *wb_file = NULL; + wb_inode_t *wb_inode = NULL; + call_stub_t *stub = NULL; - GF_VALIDATE_OR_GOTO ("write-behind", this, out); - GF_VALIDATE_OR_GOTO (this->name, fd, out); + wb_inode = wb_inode_ctx_get (this, fd->inode); + if (!wb_inode) + goto noqueue; - fd_ctx_del (fd, this, &wb_file_ptr); - wb_file = (wb_file_t *)(long) wb_file_ptr; + stub = fop_fsetattr_stub (frame, wb_fsetattr_helper, fd, stbuf, + valid, xdata); + if (!stub) + goto unwind; - GF_FREE (wb_file); + if (!wb_enqueue (wb_inode, stub)) + goto unwind; -out: + wb_process_queue (wb_inode); + + return 0; +unwind: + STACK_UNWIND_STRICT (fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL); + + if (stub) + call_stub_destroy (stub); + return 0; + +noqueue: + STACK_WIND (frame, default_fsetattr_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata); return 0; } -int32_t +int wb_forget (xlator_t *this, inode_t *inode) { uint64_t tmp = 0; @@ -2917,15 +1639,16 @@ wb_forget (xlator_t *this, inode_t *inode) wb_inode = (wb_inode_t *)(long)tmp; - if (wb_inode != NULL) { - LOCK (&wb_inode->lock); - { - GF_ASSERT (list_empty (&wb_inode->request)); - } - UNLOCK (&wb_inode->lock); + if (!wb_inode) + return 0; - wb_inode_destroy (wb_inode); - } + LOCK (&wb_inode->lock); + { + GF_ASSERT (list_empty (&wb_inode->todo)); + GF_ASSERT (list_empty (&wb_inode->liability)); + GF_ASSERT (list_empty (&wb_inode->temptation)); + } + UNLOCK (&wb_inode->lock); return 0; } @@ -2950,10 +1673,8 @@ wb_priv_dump (xlator_t *this) gf_proc_dump_write ("aggregate_size", "%d", conf->aggregate_size); gf_proc_dump_write ("window_size", "%d", conf->window_size); - gf_proc_dump_write ("enable_O_SYNC", "%d", conf->enable_O_SYNC); gf_proc_dump_write ("flush_behind", "%d", conf->flush_behind); - gf_proc_dump_write ("enable_trickling_writes", "%d", - conf->enable_trickling_writes); + gf_proc_dump_write ("trickling_writes", "%d", conf->trickling_writes); ret = 0; out: @@ -2962,48 +1683,45 @@ out: void -__wb_dump_requests (struct list_head *head, char *prefix, char passive) +__wb_dump_requests (struct list_head *head, char *prefix) { char key[GF_DUMP_MAX_BUF_LEN] = {0, }; char key_prefix[GF_DUMP_MAX_BUF_LEN] = {0, }, flag = 0; - wb_request_t *request = NULL; + wb_request_t *req = NULL; - list_for_each_entry (request, head, list) { - gf_proc_dump_build_key (key, prefix, passive ? "passive-request" - : "active-request"); + list_for_each_entry (req, head, all) { gf_proc_dump_build_key (key_prefix, key, - (char *)gf_fop_list[request->fop]); + (char *)gf_fop_list[req->fop]); gf_proc_dump_add_section(key_prefix); - gf_proc_dump_write ("request-ptr", "%p", request); + gf_proc_dump_write ("request-ptr", "%p", req); - gf_proc_dump_write ("refcount", "%d", request->refcount); + gf_proc_dump_write ("refcount", "%d", req->refcount); - if (request->fop == GF_FOP_WRITE) { - flag = request->flags.write_request.stack_wound; - gf_proc_dump_write ("stack_wound", "%d", flag); + if (list_empty (&req->todo)) + gf_proc_dump_write ("wound", "yes"); + else + gf_proc_dump_write ("wound", "no"); + if (req->fop == GF_FOP_WRITE) { gf_proc_dump_write ("size", "%"GF_PRI_SIZET, - request->write_size); + req->write_size); gf_proc_dump_write ("offset", "%"PRId64, - request->stub->args.writev.off); + req->stub->args.writev.off); - flag = request->flags.write_request.write_behind; - gf_proc_dump_write ("write_behind", "%d", flag); + flag = req->ordering.lied; + gf_proc_dump_write ("lied", "%d", flag); - flag = request->flags.write_request.got_reply; - gf_proc_dump_write ("got_reply", "%d", flag); + flag = req->ordering.append; + gf_proc_dump_write ("append", "%d", flag); - flag = request->flags.write_request.virgin; - gf_proc_dump_write ("virgin", "%d", flag); + flag = req->ordering.fulfilled; + gf_proc_dump_write ("fulfilled", "%d", flag); - flag = request->flags.write_request.flush_all; - gf_proc_dump_write ("flush_all", "%d", flag); - } else { - flag = request->flags.other_requests.marked_for_resume; - gf_proc_dump_write ("marked_for_resume", "%d", flag); + flag = req->ordering.go; + gf_proc_dump_write ("go", "%d", flag); } } } @@ -3047,22 +1765,14 @@ wb_inode_dump (xlator_t *this, inode_t *inode) gf_proc_dump_write ("window_current", "%"GF_PRI_SIZET, wb_inode->window_current); - gf_proc_dump_write ("aggregate_current", "%"GF_PRI_SIZET, - wb_inode->aggregate_current); - gf_proc_dump_write ("op_ret", "%d", wb_inode->op_ret); gf_proc_dump_write ("op_errno", "%d", wb_inode->op_errno); LOCK (&wb_inode->lock); { - if (!list_empty (&wb_inode->request)) { - __wb_dump_requests (&wb_inode->request, key_prefix, 0); - } - - if (!list_empty (&wb_inode->passive_requests)) { - __wb_dump_requests (&wb_inode->passive_requests, - key_prefix, 1); + if (!list_empty (&wb_inode->all)) { + __wb_dump_requests (&wb_inode->all, key_prefix); } } UNLOCK (&wb_inode->lock); @@ -3074,65 +1784,6 @@ out: int -wb_fd_dump (xlator_t *this, fd_t *fd) -{ - wb_file_t *wb_file = NULL; - char *path = NULL; - char key_prefix[GF_DUMP_MAX_BUF_LEN] = {0, }; - int ret = -1; - gf_boolean_t section_added = _gf_false; - - gf_proc_dump_build_key (key_prefix, "xlator.performance.write-behind", - "wb_file"); - - if ((fd == NULL) || (this == NULL)) { - goto out; - } - - ret = TRY_LOCK(&fd->lock); - if (ret) - goto out; - { - wb_file = __wb_fd_ctx_get (this, fd); - } - UNLOCK(&fd->lock); - - if (wb_file == NULL) { - goto out; - } - - gf_proc_dump_add_section (key_prefix); - section_added = _gf_true; - - __inode_path (fd->inode, NULL, &path); - if (path != NULL) { - gf_proc_dump_write ("path", "%s", path); - GF_FREE (path); - } - - gf_proc_dump_write ("fd", "%p", fd); - - gf_proc_dump_write ("flags", "%d", wb_file->flags); - - gf_proc_dump_write ("flags", "%s", - (wb_file->flags & O_APPEND) ? "O_APPEND" - : "!O_APPEND"); - - gf_proc_dump_write ("disabled", "%d", wb_file->disabled); - -out: - if (ret && fd && this) { - if (_gf_false == section_added) - gf_proc_dump_add_section (key_prefix); - gf_proc_dump_write ("Unable to dump the fd", - "(Lock acquisition failed) %s", - uuid_utoa (fd->inode->gfid)); - } - return 0; -} - - -int32_t mem_acct_init (xlator_t *this) { int ret = -1; @@ -3166,6 +1817,14 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("flush-behind", conf->flush_behind, options, bool, out); + GF_OPTION_RECONF ("trickling-writes", conf->trickling_writes, options, + bool, out); + + GF_OPTION_RECONF ("strict-O_DIRECT", conf->strict_O_DIRECT, options, + bool, out); + + GF_OPTION_RECONF ("strict-write-ordering", conf->strict_write_ordering, + options, bool, out); ret = 0; out: return ret; @@ -3196,8 +1855,6 @@ init (xlator_t *this) goto out; } - GF_OPTION_INIT("enable-O_SYNC", conf->enable_O_SYNC, bool, out); - /* configure 'options aggregate-size <size>' */ conf->aggregate_size = WB_AGGREGATE_SIZE; @@ -3223,16 +1880,12 @@ init (xlator_t *this) /* configure 'option flush-behind <on/off>' */ GF_OPTION_INIT ("flush-behind", conf->flush_behind, bool, out); - GF_OPTION_INIT ("enable-trickling-writes", - conf->enable_trickling_writes, bool, out); + GF_OPTION_INIT ("trickling-writes", conf->trickling_writes, bool, out); - this->local_pool = mem_pool_new (wb_local_t, 64); - if (!this->local_pool) { - ret = -1; - gf_log (this->name, GF_LOG_ERROR, - "failed to create local_t's memory pool"); - goto out; - } + GF_OPTION_INIT ("strict-O_DIRECT", conf->strict_O_DIRECT, bool, out); + + GF_OPTION_INIT ("strict-write-ordering", conf->strict_write_ordering, + bool, out); this->private = conf; ret = 0; @@ -3266,10 +1919,7 @@ out: struct xlator_fops fops = { - .lookup = wb_lookup, .writev = wb_writev, - .open = wb_open, - .create = wb_create, .readv = wb_readv, .flush = wb_flush, .fsync = wb_fsync, @@ -3278,20 +1928,21 @@ struct xlator_fops fops = { .truncate = wb_truncate, .ftruncate = wb_ftruncate, .setattr = wb_setattr, - .readdirp = wb_readdirp, + .fsetattr = wb_fsetattr, }; + struct xlator_cbks cbks = { .forget = wb_forget, - .release = wb_release, }; + struct xlator_dumpops dumpops = { .priv = wb_priv_dump, .inodectx = wb_inode_dump, - .fdctx = wb_fd_dump, }; + struct volume_options options[] = { { .key = {"flush-behind"}, .type = GF_OPTION_TYPE_BOOL, @@ -3300,7 +1951,7 @@ struct volume_options options[] = { "translator to perform flush in background, by " "returning success (or any errors, if any of " "previous writes were failed) to application even " - "before flush is sent to backend filesystem. " + "before flush FOP is sent to backend filesystem. " }, { .key = {"cache-size", "window-size"}, .type = GF_OPTION_TYPE_SIZET, @@ -3309,21 +1960,20 @@ struct volume_options options[] = { .default_value = "1MB", .description = "Size of the write-behind buffer for a single file " "(inode)." - - }, - { .key = {"disable-for-first-nbytes"}, - .type = GF_OPTION_TYPE_SIZET, - .min = 0, - .max = 1 * GF_UNIT_MB, - .default_value = "0", }, - { .key = {"enable-O_SYNC"}, + { .key = {"trickling-writes"}, .type = GF_OPTION_TYPE_BOOL, .default_value = "on", }, - { .key = {"enable-trickling-writes"}, + { .key = {"strict-O_DIRECT"}, .type = GF_OPTION_TYPE_BOOL, - .default_value = "on", + .default_value = "off", + }, + { .key = {"strict-write-ordering"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .description = "Do not let later writes overtake earlier writes even " + "if they do not overlap", }, { .key = {NULL} }, }; |