Diffstat (limited to 'xlators/performance/write-behind')
-rw-r--r--   xlators/performance/write-behind/src/write-behind.c | 480
1 file changed, 376 insertions(+), 104 deletions(-)
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c
index 1adda4eaff4..285420526f4 100644
--- a/xlators/performance/write-behind/src/write-behind.c
+++ b/xlators/performance/write-behind/src/write-behind.c
@@ -107,6 +107,14 @@ typedef struct wb_inode {
         size_t       size; /* Size of the file to catch write after EOF. */
         gf_lock_t    lock;
         xlator_t    *this;
+        int          dontsync; /* If positive, don't pick lies for
+                                * winding. This is needed to break infinite
+                                * recursion during invocation of
+                                * wb_process_queue from
+                                * wb_fulfill_cbk in case of an
+                                * error during fulfill.
+                                */
+
 } wb_inode_t;
 
 
@@ -144,6 +152,8 @@ typedef struct wb_request {
                               request arrival */
 
         fd_t        *fd;
+        int          wind_count; /* number of sync-attempts. Only
+                                  * for debug purposes */
         struct {
                 size_t  size;          /* 0 size == till infinity */
                 off_t   off;
@@ -164,6 +174,7 @@ typedef struct wb_conf {
         gf_boolean_t trickling_writes;
         gf_boolean_t strict_write_ordering;
         gf_boolean_t strict_O_DIRECT;
+        gf_boolean_t resync_after_fsync;
 } wb_conf_t;
 
 
@@ -202,26 +213,6 @@ out:
 }
 
 
-gf_boolean_t
-wb_fd_err (fd_t *fd, xlator_t *this, int32_t *op_errno)
-{
-        gf_boolean_t err = _gf_false;
-        uint64_t     value = 0;
-        int32_t      tmp = 0;
-
-        if (fd_ctx_get (fd, this, &value) == 0) {
-                if (op_errno) {
-                        tmp = value;
-                        *op_errno = tmp;
-                }
-
-                err = _gf_true;
-        }
-
-        return err;
-}
-
-
 /* Below is a succinct explanation of the code deciding whether two regions
    overlap, from Pavan <tcp@gluster.com>.
 
@@ -305,17 +296,17 @@ wb_requests_conflict (wb_request_t *lie, wb_request_t *req)
 }
 
 
-gf_boolean_t
+wb_request_t *
 wb_liability_has_conflict (wb_inode_t *wb_inode, wb_request_t *req)
 {
         wb_request_t *each = NULL;
 
         list_for_each_entry (each, &wb_inode->liability, lie) {
                 if (wb_requests_conflict (each, req))
-                        return _gf_true;
+                        return each;
         }
 
-        return _gf_false;
+        return NULL;
 }
 
 
@@ -552,6 +543,9 @@ wb_enqueue_common (wb_inode_t *wb_inode, call_stub_t *stub, int tempted)
                 break;
         default:
+                if (stub && stub->args.fd)
+                        req->fd = fd_ref (stub->args.fd);
+
                 break;
         }
 
@@ -679,6 +673,88 @@ __wb_fulfill_request (wb_request_t *req)
 }
 
 
+/* get a flush/fsync waiting on req */
+wb_request_t *
+__wb_request_waiting_on (wb_request_t *req)
+{
+        wb_inode_t   *wb_inode = NULL;
+        wb_request_t *trav     = NULL;
+
+        wb_inode = req->wb_inode;
+
+        list_for_each_entry (trav, &wb_inode->todo, todo) {
+                if ((trav->fd == req->fd)
+                    && ((trav->stub->fop == GF_FOP_FLUSH)
+                        || (trav->stub->fop == GF_FOP_FSYNC))
+                    && (trav->gen >= req->gen))
+                        return trav;
+        }
+
+        return NULL;
+}
+
+
+void
+__wb_fulfill_request_err (wb_request_t *req, int32_t op_errno)
+{
+        wb_inode_t   *wb_inode = NULL;
+        wb_request_t *waiter   = NULL;
+        wb_conf_t    *conf     = NULL;
+
+        wb_inode = req->wb_inode;
+
+        conf = wb_inode->this->private;
+
+        req->op_ret = -1;
+        req->op_errno = op_errno;
+
+        if (req->ordering.lied)
+                waiter = __wb_request_waiting_on (req);
+
+        if (!req->ordering.lied || waiter) {
+                if (!req->ordering.lied) {
+                        /* response to app is still pending, send failure in
+                         * response.
+                         */
+                } else {
+                        /* response was sent, store the error in a
+                         * waiter (either an fsync or flush).
+                         */
+                        waiter->op_ret = -1;
+                        waiter->op_errno = op_errno;
+                }
+
+                if (!req->ordering.lied
+                    || (waiter->stub->fop == GF_FOP_FLUSH)
+                    || ((waiter->stub->fop == GF_FOP_FSYNC)
+                        && !conf->resync_after_fsync)) {
+                        /* No retry needed, forget the request */
+                        __wb_fulfill_request (req);
+                        return;
+                }
+        }
+
+        /* response was unwound and no waiter is waiting on this request; retry
+           till a flush or fsync (subject to conf->resync_after_fsync).
+         */
+        wb_inode->transit -= req->total_size;
+
+        req->total_size = 0;
+
+        list_del_init (&req->winds);
+        list_del_init (&req->todo);
+        list_del_init (&req->wip);
+
+        /* sanitize ordering flags to retry */
+        req->ordering.go = 0;
+
+        /* Add back to todo list to retry */
+        list_add (&req->todo, &wb_inode->todo);
+
+        return;
+}
+
+
 void
 wb_head_done (wb_request_t *head)
 {
@@ -693,6 +769,7 @@ wb_head_done (wb_request_t *head)
                 list_for_each_entry_safe (req, tmp, &head->winds, winds) {
                         __wb_fulfill_request (req);
                 }
+                __wb_fulfill_request (head);
         }
         UNLOCK (&wb_inode->lock);
 }
@@ -700,29 +777,130 @@
 
 
 void
+__wb_fulfill_err (wb_request_t *head, int op_errno)
+{
+        wb_request_t *req = NULL, *tmp = NULL;
+
+        if (!head)
+                goto out;
+
+        head->wb_inode->dontsync++;
+
+        list_for_each_entry_safe_reverse (req, tmp, &head->winds,
+                                          winds) {
+                __wb_fulfill_request_err (req, op_errno);
+        }
+
+        __wb_fulfill_request_err (head, op_errno);
+
+out:
+        return;
+}
+
+
+void
 wb_fulfill_err (wb_request_t *head, int op_errno)
 {
-        wb_inode_t *wb_inode;
-        wb_request_t *req;
+        wb_inode_t *wb_inode = NULL;
 
         wb_inode = head->wb_inode;
 
-        /* for all future requests yet to arrive */
-        fd_ctx_set (head->fd, THIS, op_errno);
-
         LOCK (&wb_inode->lock);
         {
-                /* for all requests already arrived */
-                list_for_each_entry (req, &wb_inode->all, all) {
-                        if (req->fd != head->fd)
-                                continue;
-                        req->op_ret = -1;
-                        req->op_errno = op_errno;
-                }
+                __wb_fulfill_err (head, op_errno);
         }
         UNLOCK (&wb_inode->lock);
 }
 
 
+inline void
+__wb_modify_write_request (wb_request_t *req, int synced_size,
+                           int head_total_size)
+{
+        struct iovec *vector = NULL;
+        int           count  = 0;
+
+        if (!req || synced_size == 0)
+                goto out;
+
+        req->write_size        -= synced_size;
+        req->stub->args.offset += synced_size;
+        req->total_size         = head_total_size;
+
+        vector = req->stub->args.vector;
+        count  = req->stub->args.count;
+
+        req->stub->args.count = iov_subset (vector, count, synced_size,
+                                            iov_length (vector, count),
+                                            vector);
+
+out:
+        return;
+}
+
+int
+__wb_fulfill_short_write (wb_request_t *req, int size, int total_size)
+{
+        int accounted_size = 0;
+
+        if (req == NULL)
+                goto out;
+
+        if (req->write_size <= size) {
+                accounted_size = req->write_size;
+                __wb_fulfill_request (req);
+        } else {
+                accounted_size = size;
+                __wb_modify_write_request (req, size, total_size);
+        }
+
+out:
+        return accounted_size;
+}
+
+void
+wb_fulfill_short_write (wb_request_t *head, int size)
+{
+        wb_inode_t   *wb_inode = NULL;
+        wb_request_t *req = NULL, *tmp = NULL;
+        int           total_size = 0, accounted_size = 0;
+
+        if (!head)
+                goto out;
+
+        wb_inode = head->wb_inode;
+
+        total_size = head->total_size - size;
+        head->total_size = size;
+
+        req = head;
+
+        LOCK (&wb_inode->lock);
+        {
+                accounted_size = __wb_fulfill_short_write (head, size,
+                                                           total_size);
+
+                size -= accounted_size;
+
+                if (size == 0)
+                        goto done;
+
+                list_for_each_entry_safe (req, tmp, &head->winds, winds) {
+                        accounted_size = __wb_fulfill_short_write (req, size,
+                                                                   total_size);
+                        size -= accounted_size;
+
+                        if (size == 0)
+                                break;
+
+                }
+        }
+done:
+        UNLOCK (&wb_inode->lock);
+
+        wb_fulfill_err (req, EIO);
+out:
+        return;
+}
 
 int
 wb_fulfill_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -740,18 +918,10 @@ wb_fulfill_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         if (op_ret == -1) {
                 wb_fulfill_err (head, op_errno);
         } else if (op_ret < head->total_size) {
-                /*
-                 * We've encountered a short write, for whatever reason.
-                 * Set an EIO error for the next fop. This should be
-                 * valid for writev or flush (close).
-                 *
-                 * TODO: Retry the write so we can potentially capture
-                 * a real error condition (i.e., ENOSPC).
-                 */
-                wb_fulfill_err (head, EIO);
-        }
-
-        wb_head_done (head);
+                wb_fulfill_short_write (head, op_ret);
+        } else {
+                wb_head_done (head);
+        }
 
         wb_process_queue (wb_inode);
 
@@ -776,10 +946,6 @@ wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head)
         int           count = 0;
         wb_request_t *req   = NULL;
         call_frame_t *frame = NULL;
-        gf_boolean_t  fderr = _gf_false;
-        xlator_t     *this  = NULL;
-
-        this = THIS;
 
         /* make sure head->total_size is updated before we run into any
          * errors
@@ -795,11 +961,6 @@ wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head)
                 goto err;
         }
 
-        if (wb_fd_err (head->fd, this, NULL)) {
-                fderr = _gf_true;
-                goto err;
-        }
-
         frame = create_frame (wb_inode->this, wb_inode->this->ctx->pool);
         if (!frame)
                 goto err;
@@ -822,26 +983,21 @@ wb_fulfill_head (wb_inode_t *wb_inode, wb_request_t *head)
         return 0;
 
 err:
-        if (!fderr) {
-                /* frame creation failure */
-                fderr = ENOMEM;
-                wb_fulfill_err (head, fderr);
-        }
-
-        wb_head_done (head);
+        /* frame creation failure */
+        wb_fulfill_err (head, ENOMEM);
 
-        return fderr;
+        return ENOMEM;
 }
 
 
-#define NEXT_HEAD(head, req) do {                                       \
-        if (head)                                                       \
-                ret |= wb_fulfill_head (wb_inode, head);                \
-        head = req;                                                     \
-        expected_offset = req->stub->args.offset +                      \
-                req->write_size;                                        \
-        curr_aggregate = 0;                                             \
-        vector_count = 0;                                               \
+#define NEXT_HEAD(head, req) do {                                      \
+        if (head)                                                      \
+                ret |= wb_fulfill_head (wb_inode, head);               \
+        head = req;                                                    \
+        expected_offset = req->stub->args.offset +                     \
+                req->write_size;                                       \
+        curr_aggregate = 0;                                            \
+        vector_count = 0;                                              \
 } while (0)
 
 
@@ -1053,6 +1209,17 @@ __wb_preprocess_winds (wb_inode_t *wb_inode)
         conf = wb_inode->this->private;
 
         list_for_each_entry_safe (req, tmp, &wb_inode->todo, todo) {
+                if (wb_inode->dontsync && req->ordering.lied) {
+                        /* sync has failed. Don't pick lies _again_ for winding
+                         * as winding these lies again will trigger an infinite
+                         * recursion of wb_process_queue being called from a
+                         * failed fulfill. However, pick non-lied requests for
+                         * winding so that the application won't block
+                         * indefinitely waiting for the write result.
+                         */
+                        continue;
+                }
+
                 if (!req->ordering.tempted) {
                         if (holder) {
                                 if (wb_requests_conflict (holder, req))
@@ -1124,20 +1291,96 @@ __wb_preprocess_winds (wb_inode_t *wb_inode)
         if (conf->trickling_writes && !wb_inode->transit && holder)
                 holder->ordering.go = 1;
 
+        if (wb_inode->dontsync > 0)
+                wb_inode->dontsync--;
+
         return;
 }
 
 
+int
+__wb_handle_failed_conflict (wb_request_t *req, wb_request_t *conflict,
+                             list_head_t *tasks)
+{
+        wb_conf_t *conf = NULL;
+
+        conf = req->wb_inode->this->private;
+
+        if ((req->stub->fop != GF_FOP_FLUSH)
+            && ((req->stub->fop != GF_FOP_FSYNC)
+                || conf->resync_after_fsync)) {
+                if (!req->ordering.lied && list_empty (&conflict->wip)) {
+                        /* If request itself is in liability queue,
+                         * 1. We cannot unwind as the response has already been
+                         *    sent.
+                         * 2. We cannot wind till conflict clears up.
+                         * 3. So, skip the request for now.
+                         * 4. Otherwise, resume (unwind) it with error.
+                         */
+                        req->op_ret = -1;
+                        req->op_errno = conflict->op_errno;
+
+                        list_del_init (&req->todo);
+                        list_add_tail (&req->winds, tasks);
+                }
+        } else {
+                /* flush and fsync (without conf->resync_after_fsync) act as
+                   barriers. We cannot unwind them out of order, when there are
+                   earlier generation writes, just because there is a
+                   conflicting liability with an error. So, wait for our turn
+                   till there are no conflicting liabilities.
+
+                   This situation can arise when liabilities are spread across
+                   multiple generations. For example, consider two writes with
+                   the following characteristics:
+
+                   1. they belong to different generations gen1, gen2 and
+                      (gen1 > gen2).
+                   2. they overlap.
+                   3. both are liabilities.
+                   4. gen1 write was attempted to sync, but the attempt failed.
+                   5. there was no attempt to sync gen2 write yet.
+                   6. A flush (as part of close) is issued and gets a
+                      generation number gen3.
+
+                   In the above scenario, if flush is unwound without waiting
+                   for gen1 and gen2 writes either to be successfully synced or
+                   purged, we end up with these two writes in the
+                   wb_inode->todo list forever, as there will be no attempt to
+                   process the queue since flush is the last operation.
+                 */
+        }
+
+        return 0;
+}
+
 
-void
+int
 __wb_pick_winds (wb_inode_t *wb_inode, list_head_t *tasks,
                  list_head_t *liabilities)
 {
-        wb_request_t *req = NULL;
-        wb_request_t *tmp = NULL;
+        wb_request_t *req      = NULL;
+        wb_request_t *tmp      = NULL;
+        wb_request_t *conflict = NULL;
 
         list_for_each_entry_safe (req, tmp, &wb_inode->todo, todo) {
-                if (wb_liability_has_conflict (wb_inode, req))
-                        continue;
+                conflict = wb_liability_has_conflict (wb_inode, req);
+                if (conflict) {
+                        if (conflict->op_ret == -1) {
+                                /* There is a conflicting liability which failed
+                                 * to sync in previous attempts; resume the req
+                                 * and fail, unless it's an fsync/flush.
+                                 */
+
+                                __wb_handle_failed_conflict (req, conflict,
+                                                             tasks);
+                        } else {
+                                /* There is a conflicting liability which was
+                                 * not attempted to sync even once. Wait till
+                                 * at least one attempt to sync is made.
+                                 */
+                        }
+
+                        continue;
+                }
 
                 if (req->ordering.tempted && !req->ordering.go)
                         /* wait some more */
@@ -1148,6 +1391,7 @@ __wb_pick_winds (wb_inode_t *wb_inode, list_head_t *tasks,
                         continue;
 
                 list_add_tail (&req->wip, &wb_inode->wip);
+                req->wind_count++;
 
                 if (!req->ordering.tempted)
                         /* unrefed in wb_writev_cbk */
@@ -1162,6 +1406,8 @@ __wb_pick_winds (wb_inode_t *wb_inode, list_head_t *tasks,
                 else
                         list_add_tail (&req->winds, tasks);
         }
+
+        return 0;
 }
 
 
@@ -1174,7 +1420,12 @@ wb_do_winds (wb_inode_t *wb_inode, list_head_t *tasks)
         list_for_each_entry_safe (req, tmp, tasks, winds) {
                 list_del_init (&req->winds);
 
-                call_resume (req->stub);
+                if (req->op_ret == -1) {
+                        call_unwind_error (req->stub, req->op_ret,
+                                           req->op_errno);
+                } else {
+                        call_resume (req->stub);
+                }
 
                 wb_request_unref (req);
         }
@@ -1184,10 +1435,10 @@
 void
 wb_process_queue (wb_inode_t *wb_inode)
 {
-        list_head_t tasks = {0, };
-        list_head_t lies = {0, };
-        list_head_t liabilities = {0, };
-        int retry = 0;
+        list_head_t tasks        = {0, };
+        list_head_t lies         = {0, };
+        list_head_t liabilities  = {0, };
+        int         wind_failure = 0;
 
         INIT_LIST_HEAD (&tasks);
         INIT_LIST_HEAD (&lies);
@@ -1209,13 +1460,12 @@ wb_process_queue (wb_inode_t *wb_inode)
 
                 wb_do_winds (wb_inode, &tasks);
 
-                /* fd might've been marked bad due to previous errors.
-                 * Since, caller of wb_process_queue might be the last fop on
-                 * inode, make sure we keep processing request queue, till there
-                 * are no requests left.
+                /* If there is an error in wb_fulfill before winding write
+                 * requests, we would miss invocation of wb_process_queue
+                 * from wb_fulfill_cbk. So, retry processing again.
                  */
-                retry = wb_fulfill (wb_inode, &liabilities);
-        } while (retry);
+                wind_failure = wb_fulfill (wb_inode, &liabilities);
+        } while (wind_failure);
 
         return;
 }
@@ -1285,10 +1535,6 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
 
         conf = this->private;
 
-        if (wb_fd_err (fd, this, &op_errno)) {
-                goto unwind;
-        }
-
         wb_inode = wb_inode_create (this, fd->inode);
         if (!wb_inode) {
                 op_errno = ENOMEM;
@@ -1309,7 +1555,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
                                          count, offset, flags, iobref, xdata);
         else
                 stub = fop_writev_stub (frame, NULL, fd, vector, count, offset,
-                                       flags, iobref, xdata);
+                                        flags, iobref, xdata);
         if (!stub) {
                 op_errno = ENOMEM;
                 goto unwind;
@@ -1413,10 +1659,6 @@ wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
                 goto unwind;
         }
 
-        if (wb_fd_err (fd, this, &op_errno)) {
-                op_ret = -1;
-                goto unwind;
-        }
 
         if (conf->flush_behind)
                 goto flushbehind;
@@ -1495,9 +1737,6 @@ wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync,
         call_stub_t *stub     = NULL;
         int32_t      op_errno = EINVAL;
 
-        if (wb_fd_err (fd, this, &op_errno))
-                goto unwind;
-
         wb_inode = wb_inode_ctx_get (this, fd->inode);
         if (!wb_inode)
                 goto noqueue;
@@ -1720,9 +1959,6 @@ wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
                 goto unwind;
         }
 
-        if (wb_fd_err (fd, this, &op_errno))
-                goto unwind;
-
         frame->local = wb_inode;
 
         stub = fop_ftruncate_stub (frame, wb_ftruncate_helper, fd,
@@ -2003,7 +2239,18 @@ __wb_dump_requests (struct list_head *head, char *prefix)
                 else
                         gf_proc_dump_write ("wound", "no");
 
+                gf_proc_dump_write ("generation-number", "%d", req->gen);
+
+                gf_proc_dump_write ("req->op_ret", "%d", req->op_ret);
+                gf_proc_dump_write ("req->op_errno", "%d", req->op_errno);
+                gf_proc_dump_write ("sync-attempts", "%d", req->wind_count);
+                if (req->fop == GF_FOP_WRITE) {
+                        if (list_empty (&req->wip))
+                                gf_proc_dump_write ("sync-in-progress", "no");
+                        else
+                                gf_proc_dump_write ("sync-in-progress", "yes");
+
                 gf_proc_dump_write ("size", "%"GF_PRI_SIZET,
                                     req->write_size);
@@ -2021,6 +2268,7 @@ __wb_dump_requests (struct list_head *head, char *prefix)
 
                         flag = req->ordering.go;
                         gf_proc_dump_write ("go", "%d", flag);
+                }
         }
 }
@@ -2066,6 +2314,11 @@ wb_inode_dump (xlator_t *this, inode_t *inode)
                             wb_inode->window_current);
 
+        gf_proc_dump_write ("transit-size", "%"GF_PRI_SIZET,
+                            wb_inode->transit);
+
+        gf_proc_dump_write ("dontsync", "%d", wb_inode->dontsync);
+
         ret = TRY_LOCK (&wb_inode->lock);
         if (!ret) {
@@ -2117,7 +2370,8 @@ reconfigure (xlator_t *this, dict_t *options)
 
         conf = this->private;
 
-        GF_OPTION_RECONF ("cache-size", conf->window_size, options, size_uint64, out);
+        GF_OPTION_RECONF ("cache-size", conf->window_size, options, size_uint64,
+                          out);
 
         GF_OPTION_RECONF ("flush-behind", conf->flush_behind, options, bool,
                           out);
@@ -2130,6 +2384,9 @@ reconfigure (xlator_t *this, dict_t *options)
         GF_OPTION_RECONF ("strict-write-ordering", conf->strict_write_ordering,
                           options, bool, out);
 
+        GF_OPTION_RECONF ("resync-failed-syncs-after-fsync",
+                          conf->resync_after_fsync, options, bool, out);
+
         ret = 0;
 out:
         return ret;
@@ -2196,6 +2453,9 @@ init (xlator_t *this)
         GF_OPTION_INIT ("strict-write-ordering", conf->strict_write_ordering,
                         bool, out);
 
+        GF_OPTION_INIT ("resync-failed-syncs-after-fsync",
+                        conf->resync_after_fsync, bool, out);
+
         this->private = conf;
         ret = 0;
 
@@ -2287,5 +2547,17 @@ struct volume_options options[] = {
           .description = "Do not let later writes overtake earlier writes even "
                          "if they do not overlap",
         },
+        { .key = {"resync-failed-syncs-after-fsync"},
+          .type = GF_OPTION_TYPE_BOOL,
+          .default_value = "off",
+          .description = "If sync of \"cached-writes issued before fsync\" "
+                         "(to backend) fails, this option configures whether "
+                         "to retry syncing them after fsync or forget them. "
+                         "If set to on, cached-writes are retried "
+                         "till a \"flush\" fop (or a successful sync) on sync "
+                         "failures. "
+                         "fsync itself fails irrespective of the value of "
+                         "this option.",
+        },
         { .key = {NULL} },
 };
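
A note on the new short-write path: when the backend reports fewer bytes written than head->total_size, wb_fulfill_cbk() now hands the synced byte count to wb_fulfill_short_write(). That function fully retires every request that falls inside the synced range, trims the first partially-synced request with __wb_modify_write_request() (which uses iov_subset() to drop the already-written bytes from the request's iovec and advance its file offset), and marks the remaining tail with EIO so it is retried. The sketch below illustrates only the iovec-trimming step in isolation; iov_trim_front() is a hypothetical helper written for this note, not the xlator's iov_subset(), and it assumes the synced count never exceeds the iovec's total length.

    #include <stdio.h>
    #include <sys/uio.h>

    /* Drop `synced` bytes from the front of an iovec array in place and
     * return the new chunk count -- the same bookkeeping
     * __wb_modify_write_request() performs so that only the unwritten
     * tail of a short write is retried. */
    static int
    iov_trim_front (struct iovec *vector, int count, size_t synced)
    {
            int i = 0, out = 0;

            for (i = 0; i < count; i++) {
                    if (synced >= vector[i].iov_len) {
                            /* chunk was written completely: drop it */
                            synced -= vector[i].iov_len;
                            continue;
                    }

                    /* partially written (or untouched) chunk: keep tail */
                    vector[out].iov_base = (char *)vector[i].iov_base + synced;
                    vector[out].iov_len  = vector[i].iov_len - synced;
                    synced = 0;
                    out++;
            }

            return out;
    }

    int
    main (void)
    {
            char a[] = "hello", b[] = "world";
            struct iovec vec[] = {
                    { .iov_base = a, .iov_len = 5 },
                    { .iov_base = b, .iov_len = 5 },
            };
            /* pretend the backend wrote only 7 of the 10 bytes */
            int count = iov_trim_front (vec, 2, 7);

            /* prints: 1 chunk(s) left, 3 bytes: "rld" */
            printf ("%d chunk(s) left, %zu bytes: \"%.*s\"\n", count,
                    vec[0].iov_len, (int)vec[0].iov_len,
                    (char *)vec[0].iov_base);
            return 0;
    }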
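On using the new option: it is declared in options[] and read in both init() and reconfigure() above, so it can be set in the write-behind section of a volfile and, because of the GF_OPTION_RECONF hook, also changed on a live volume. A minimal volfile fragment is sketched below, assuming the standard GlusterFS volfile syntax; the volume and subvolume names are placeholders:

    volume test-write-behind
        type performance/write-behind
        option resync-failed-syncs-after-fsync on
        subvolumes test-client-0
    end-volume

With the option at its default of "off", retries of failed cached writes stop (and the stored error is handed over) at the first fsync or flush on that fd; with it "on", an fsync still returns the error, but the writes stay on the todo queue and keep being retried until a flush arrives or a sync succeeds.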