diff options
author | Raghavendra G <raghavendra@gluster.com> | 2010-06-18 02:18:02 +0000 |
---|---|---|
committer | Anand V. Avati <avati@dev.gluster.com> | 2010-07-02 04:44:46 -0700 |
commit | 3dc79ca8e6119f5ff61058cc87f9a4fc251017ef (patch) | |
tree | bc19de662216017cc8d8d5ab5d06d5972de812c4 /xlators/performance/write-behind/src/write-behind.c | |
parent | 01923eed1115e53c5be9fba3e72f75c7c631bf95 (diff) |
performance/write-behind: explicitly enforce ordering of overlapping writes.
- If there are non-contiguous offsets (offsets which do not start where
previous write ended), wait for completion of previous writes to server,
before sending new ones.
- Send flush call to server only when all writes are completed.
- If a file is opened with O_APPEND, at any point of time a maximum only one
write call to server should be in transit. This is to avoid reordering of
writes in the presence of afr which can result in data corruption.
See bug #934 for more details.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 970 (extracting kernel tarball hangs midway.)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=970
Diffstat (limited to 'xlators/performance/write-behind/src/write-behind.c')
-rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 452 |
1 files changed, 279 insertions, 173 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index bd66a7ad5..a71d3a378 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -52,6 +52,7 @@ typedef struct wb_file { uint64_t disable_till; size_t window_conf; size_t window_current; + int32_t flags; size_t aggregate_current; int32_t refcount; int32_t op_ret; @@ -79,6 +80,14 @@ typedef struct wb_request { char stack_wound; char got_reply; char virgin; + char flush_all; /* while trying to sync to back-end, + * don't wait till a data of size + * equal to configured aggregate-size + * is accumulated, instead sync + * whatever data currently present in + * request queue. + */ + }write_request; struct { @@ -116,26 +125,28 @@ typedef struct wb_page wb_page_t; int32_t -wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); +wb_process_queue (call_frame_t *frame, wb_file_t *file); ssize_t wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds); ssize_t __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size, - char wind_all, char enable_trickling_writes); + char enable_trickling_writes); -static void +static int __wb_request_unref (wb_request_t *this) { + int ret = -1; + if (this->refcount <= 0) { gf_log ("wb-request", GF_LOG_DEBUG, "refcount(%d) is <= 0", this->refcount); - return; + goto out; } - this->refcount--; + ret = --this->refcount; if (this->refcount == 0) { list_del_init (&this->list); if (this->stub && this->stub->fop == GF_FOP_WRITE) { @@ -144,25 +155,33 @@ __wb_request_unref (wb_request_t *this) GF_FREE (this); } + +out: + return ret; } -static void +static int wb_request_unref (wb_request_t *this) { wb_file_t *file = NULL; + int ret = 0; + if (this == NULL) { gf_log ("wb-request", GF_LOG_DEBUG, "request is NULL"); - return; + goto out; } file = this->file; LOCK (&file->lock); { - __wb_request_unref (this); + ret = __wb_request_unref (this); } UNLOCK (&file->lock); + +out: + return ret; } @@ -204,11 +223,11 @@ wb_request_ref (wb_request_t *this) wb_request_t * wb_enqueue (wb_file_t *file, call_stub_t *stub) { - wb_request_t *request = NULL; - call_frame_t *frame = NULL; - wb_local_t *local = NULL; - struct iovec *vector = NULL; - int32_t count = 0; + wb_request_t *request = NULL, *tmp = NULL; + call_frame_t *frame = NULL; + wb_local_t *local = NULL; + struct iovec *vector = NULL; + int32_t count = 0; request = GF_CALLOC (1, sizeof (*request), gf_wb_mt_wb_request_t); if (request == NULL) { @@ -254,6 +273,13 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub) file->aggregate_current += request->write_size; } else { + list_for_each_entry (tmp, &file->request, list) { + if (tmp->stub && tmp->stub->fop + == GF_FOP_WRITE) { + tmp->flags.write_request.flush_all = 1; + } + } + /*reference for resuming */ __wb_request_ref (request); } @@ -266,7 +292,7 @@ out: wb_file_t * -wb_file_create (xlator_t *this, fd_t *fd) +wb_file_create (xlator_t *this, fd_t *fd, int32_t flags) { wb_file_t *file = NULL; wb_conf_t *conf = this->private; @@ -288,6 +314,7 @@ wb_file_create (xlator_t *this, fd_t *fd) file->this = this; file->refcount = 1; file->window_conf = conf->window_size; + file->flags = flags; fd_ctx_set (fd, this, (uint64_t)(long)file); @@ -359,7 +386,7 @@ wb_sync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, } UNLOCK (&file->lock); - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { LOCK (&file->lock); { @@ -393,6 +420,12 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) size_t bytecount = 0; wb_conf_t *conf = NULL; fd_t *fd = NULL; + int32_t op_errno = -1; + + if (frame == NULL) { + op_errno = EINVAL; + goto out; + } conf = file->this->private; list_for_each_entry (request, winds, winds) { @@ -403,6 +436,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) } if (total_count == 0) { + gf_log (file->this->name, GF_LOG_DEBUG, "no vectors are to be" + "synced"); goto out; } @@ -412,12 +447,18 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) gf_wb_mt_iovec); if (vector == NULL) { bytes = -1; + op_errno = ENOMEM; + gf_log (file->this->name, GF_LOG_ERROR, + "out of memory"); goto out; } iobref = iobref_new (); if (iobref == NULL) { bytes = -1; + op_errno = ENOMEM; + gf_log (file->this->name, GF_LOG_ERROR, + "out of memory"); goto out; } @@ -425,6 +466,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) gf_wb_mt_wb_local_t); if (local == NULL) { bytes = -1; + op_errno = ENOMEM; + gf_log (file->this->name, GF_LOG_ERROR, + "out of memory"); goto out; } @@ -466,6 +510,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds) sync_frame = copy_frame (frame); if (sync_frame == NULL) { bytes = -1; + op_errno = ENOMEM; + gf_log (file->this->name, GF_LOG_ERROR, + "out of memory"); goto out; } @@ -508,6 +555,14 @@ out: } if (local != NULL) { + /* had we winded these requests, we would have unrefed + * in wb_sync_cbk. + */ + list_for_each_entry_safe (request, dummy, &local->winds, + winds) { + wb_request_unref (request); + } + GF_FREE (local); } @@ -519,6 +574,27 @@ out: GF_FREE (vector); } + if (bytes == -1) { + /* + * had we winded these requests, we would have unrefed + * in wb_sync_cbk. + */ + + list_for_each_entry_safe (request, dummy, &local->winds, + winds) { + wb_request_unref (request); + } + + if (file != NULL) { + LOCK (&file->lock); + { + file->op_ret = -1; + file->op_errno = op_errno; + } + UNLOCK (&file->lock); + } + } + return bytes; } @@ -553,7 +629,7 @@ wb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, } if (process_frame != NULL) { - ret = wb_process_queue (process_frame, file, 0); + ret = wb_process_queue (process_frame, file); if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) { LOCK (&file->lock); { @@ -639,7 +715,7 @@ wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc) goto unwind; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -683,7 +759,7 @@ wb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, request = local->request; if ((file != NULL) && (request != NULL)) { wb_request_unref (request); - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_ret = -1; op_errno = ENOMEM; @@ -757,7 +833,7 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd) /* FIXME:should the request queue be emptied in case of error? */ - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -813,7 +889,7 @@ wb_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } if (process_frame != NULL) { - ret = wb_process_queue (process_frame, file, 0); + ret = wb_process_queue (process_frame, file); if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) { LOCK (&file->lock); { @@ -906,7 +982,7 @@ wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset) goto unwind; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -949,7 +1025,7 @@ wb_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, if ((request != NULL) && (file != NULL)) { wb_request_unref (request); - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_ret = -1; op_errno = ENOMEM; @@ -1026,7 +1102,7 @@ wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset) goto unwind; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -1055,7 +1131,8 @@ unwind: int32_t wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, - int32_t op_ret, int32_t op_errno, struct iatt *statpre, struct iatt *statpost) + int32_t op_ret, int32_t op_errno, struct iatt *statpre, + struct iatt *statpost) { wb_local_t *local = NULL; wb_request_t *request = NULL; @@ -1076,14 +1153,15 @@ wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } } - STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, statpost); + STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, + statpost); if (request) { wb_request_unref (request); } if (request && (process_frame != NULL)) { - ret = wb_process_queue (process_frame, file, 0); + ret = wb_process_queue (process_frame, file); if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) { LOCK (&file->lock); { @@ -1188,7 +1266,7 @@ wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc, goto unwind; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -1236,7 +1314,7 @@ wb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, wbflags = local->wbflags; if (op_ret != -1) { - file = wb_file_create (this, fd); + file = wb_file_create (this, fd, flags); if (file == NULL) { op_ret = -1; op_errno = ENOMEM; @@ -1307,7 +1385,11 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, wb_conf_t *conf = this->private; if (op_ret != -1) { - file = wb_file_create (this, fd); + if (frame->local) { + flags = (long) frame->local; + } + + file = wb_file_create (this, fd, flags); if (file == NULL) { op_ret = -1; op_errno = ENOMEM; @@ -1316,7 +1398,6 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, /* If O_DIRECT then, we disable chaching */ if (frame->local) { - flags = (long)frame->local; if (((flags & O_DIRECT) == O_DIRECT) || ((flags & O_ACCMODE) == O_RDONLY) || (((flags & O_SYNC) == O_SYNC) @@ -1331,8 +1412,8 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, frame->local = NULL; out: - STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, preparent, - postparent); + STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, + preparent, postparent); return 0; } @@ -1351,14 +1432,22 @@ wb_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, return 0; } - +/* Mark all the contiguous write requests for winding starting from head of + * request list. Stops marking at the first non-write request found. If + * file is opened with O_APPEND, make sure all the writes marked for winding + * will fit into a single write call to server. + */ size_t __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds) { - wb_request_t *request = NULL; - size_t size = 0; - char first_request = 1; + wb_request_t *request = NULL; + size_t size = 0; + char first_request = 1; off_t offset_expected = 0; + wb_conf_t *conf = NULL; + int count = 0; + + conf = file->this->private; list_for_each_entry (request, list, list) { @@ -1377,9 +1466,18 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds) break; } + if ((file->flags & O_APPEND) + && (((size + request->write_size) + > conf->aggregate_size) + || ((count + request->stub->args.writev.count) + > MAX_VECTOR_COUNT))) { + break; + } + size += request->write_size; offset_expected += request->write_size; file->aggregate_current -= request->write_size; + count += request->stub->args.writev.count; request->flags.write_request.stack_wound = 1; list_add_tail (&request->winds, winds); @@ -1392,7 +1490,8 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds) void __wb_can_wind (list_head_t *list, char *other_fop_in_queue, - char *non_contiguous_writes, char *incomplete_writes) + char *non_contiguous_writes, char *incomplete_writes, + char *wind_all) { wb_request_t *request = NULL; char first_request = 1; @@ -1418,7 +1517,11 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue, if (!request->flags.write_request.stack_wound) { if (first_request) { first_request = 0; - offset_expected = request->stub->args.writev.off; + offset_expected + = request->stub->args.writev.off; + if (wind_all != NULL) { + *wind_all = request->flags.write_request.flush_all; + } } if (offset_expected != request->stub->args.writev.off) { @@ -1438,7 +1541,7 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue, ssize_t __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf, - char wind_all, char enable_trickling_writes) + char enable_trickling_writes) { size_t size = 0; char other_fop_in_queue = 0; @@ -1446,6 +1549,7 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf, char non_contiguous_writes = 0; wb_request_t *request = NULL; wb_file_t *file = NULL; + char wind_all = 0; if (list_empty (list)) { goto out; @@ -1454,17 +1558,16 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf, request = list_entry (list->next, typeof (*request), list); file = request->file; - if (!wind_all && (file->aggregate_current < aggregate_conf)) { - __wb_can_wind (list, &other_fop_in_queue, - &non_contiguous_writes, &incomplete_writes); - } + __wb_can_wind (list, &other_fop_in_queue, + &non_contiguous_writes, &incomplete_writes, &wind_all); - if ((enable_trickling_writes && !incomplete_writes) - || (wind_all) || (non_contiguous_writes) - || (other_fop_in_queue) - || (file->aggregate_current >= aggregate_conf)) { + if (!incomplete_writes && ((enable_trickling_writes) + || (wind_all) || (non_contiguous_writes) + || (other_fop_in_queue) + || (file->aggregate_current + >= aggregate_conf))) { size = __wb_mark_wind_all (file, list, winds); - } + } out: return size; @@ -1565,6 +1668,7 @@ wb_stack_unwind (list_head_t *unwinds) wb_request_t *request = NULL, *dummy = NULL; call_frame_t *frame = NULL; wb_local_t *local = NULL; + int ret = 0, write_requests_removed = 0; list_for_each_entry_safe (request, dummy, unwinds, unwinds) { @@ -1574,10 +1678,13 @@ wb_stack_unwind (list_head_t *unwinds) STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf, &buf); - wb_request_unref (request); + ret = wb_request_unref (request); + if (ret == 0) { + write_requests_removed++; + } } - return 0; + return write_requests_removed; } @@ -1615,7 +1722,7 @@ wb_resume_other_requests (call_frame_t *frame, wb_file_t *file, } if (fops_removed > 0) { - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); } out: @@ -1627,19 +1734,27 @@ int32_t wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds, list_head_t *unwinds, list_head_t *other_requests) { - int32_t ret = -1; + int32_t ret = -1, write_requests_removed = 0; ret = wb_stack_unwind (unwinds); - if (ret == -1) { - goto out; - } + + write_requests_removed = ret; ret = wb_sync (frame, file, winds); if (ret == -1) { goto out; } - ret = wb_resume_other_requests (frame, file, other_requests); + wb_resume_other_requests (frame, file, other_requests); + + /* wb_stack_unwind does wb_request_unref after unwinding a write + * request. Hence if a write-request was just freed in wb_stack_unwind, + * we have to process request queue once again to unblock requests + * blocked on the writes just unwound. + */ + if (write_requests_removed > 0) { + ret = wb_process_queue (frame, file); + } out: return ret; @@ -1763,7 +1878,7 @@ __wb_collapse_write_bufs (list_head_t *requests, size_t page_size) int32_t -wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all) +wb_process_queue (call_frame_t *frame, wb_file_t *file) { list_head_t winds, unwinds, other_requests; size_t size = 0; @@ -1800,7 +1915,6 @@ wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all) if (count == 0) { __wb_mark_winds (&file->request, &winds, size, - flush_all, conf->enable_trickling_writes); } @@ -1927,7 +2041,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, goto unwind; } - ret = wb_process_queue (process_frame, file, 0); + ret = wb_process_queue (process_frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_errno = ENOMEM; goto unwind; @@ -1969,7 +2083,7 @@ wb_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, if ((request != NULL) && (file != NULL)) { wb_request_unref (request); - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_ret = -1; op_errno = ENOMEM; @@ -2048,7 +2162,7 @@ wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, return 0; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { STACK_UNWIND_STRICT (readv, frame, -1, ENOMEM, NULL, 0, NULL, NULL); @@ -2082,67 +2196,102 @@ wb_ffr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno) { wb_local_t *local = NULL; - wb_file_t *file = NULL; - wb_conf_t *conf = NULL; - char unwind = 0; - int32_t ret = -1; - int disabled = 0; - int64_t disable_till = 0; + wb_file_t *file = NULL; + wb_conf_t *conf = NULL; conf = this->private; local = frame->local; + file = local->file; - if ((local != NULL) && (local->file != NULL)) { - file = local->file; - + if (file != NULL) { LOCK (&file->lock); { - disabled = file->disabled; - disable_till = file->disable_till; + if (file->op_ret == -1) { + op_ret = file->op_ret; + op_errno = file->op_errno; + + file->op_ret = 0; + } } UNLOCK (&file->lock); + } + + STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno); - if (conf->flush_behind - && (!disabled) && (disable_till == 0)) { - unwind = 1; - } else { - local->reply_count++; - /* - * without flush-behind, unwind should wait for replies - * of writes queued before and the flush - */ - if (local->reply_count == 2) { - unwind = 1; - } + return 0; +} + + +int32_t +wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd) +{ + wb_conf_t *conf = NULL; + wb_local_t *local = NULL; + wb_file_t *file = NULL; + call_frame_t *flush_frame = NULL, *process_frame = NULL; + int32_t op_ret = -1, op_errno = -1, ret = -1; + + conf = this->private; + + local = frame->local; + file = local->file; + + LOCK (&file->lock); + { + op_ret = file->op_ret; + op_errno = file->op_errno; + } + UNLOCK (&file->lock); + + if (local && local->request) { + process_frame = copy_frame (frame); + if (process_frame == NULL) { + gf_log (this->name, GF_LOG_ERROR, "out of memory"); + goto unwind; + } + + wb_request_unref (local->request); + } + + if (conf->flush_behind) { + flush_frame = copy_frame (frame); + if (flush_frame == NULL) { + gf_log (this->name, GF_LOG_ERROR, "out of memory"); + goto unwind; } + + STACK_WIND (flush_frame, + wb_ffr_bg_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); } else { - unwind = 1; + STACK_WIND (frame, + wb_ffr_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); } - if (unwind) { - if (file != NULL) { - LOCK (&file->lock); - { - if (file->op_ret == -1) { - op_ret = file->op_ret; - op_errno = file->op_errno; + if (process_frame != NULL) { + ret = wb_process_queue (process_frame, file); + if ((ret == -1) && (errno == ENOMEM)) { + STACK_DESTROY (process_frame->root); + goto unwind; + } - file->op_ret = 0; - } - } - UNLOCK (&file->lock); + STACK_DESTROY (process_frame->root); + } - ret = wb_process_queue (frame, file, 0); - if ((ret == -1) && (errno == ENOMEM)) { - op_ret = -1; - op_errno = ENOMEM; - } - } - + if (conf->flush_behind) { STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno); } return 0; + +unwind: + STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); + return 0; } @@ -2154,12 +2303,9 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd) wb_local_t *local = NULL; uint64_t tmp_file = 0; call_stub_t *stub = NULL; - call_frame_t *process_frame = NULL; - wb_local_t *tmp_local = NULL; + call_frame_t *flush_frame = NULL; wb_request_t *request = NULL; int32_t ret = 0; - int disabled = 0; - int64_t disable_till = 0; conf = this->private; @@ -2176,97 +2322,57 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd) file = (wb_file_t *)(long)tmp_file; if (file != NULL) { - local = GF_CALLOC (1, sizeof (*local), - gf_wb_mt_wb_local_t); + local = GF_CALLOC (1, sizeof (*local), gf_wb_mt_wb_local_t); if (local == NULL) { - STACK_UNWIND (frame, -1, ENOMEM, NULL); + STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); return 0; } local->file = file; frame->local = local; - stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0); - if (stub == NULL) { - STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - return 0; - } - process_frame = copy_frame (frame); - if (process_frame == NULL) { + stub = fop_flush_stub (frame, wb_flush_helper, fd); + if (stub == NULL) { STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - call_stub_destroy (stub); return 0; } - LOCK (&file->lock); - { - disabled = file->disabled; - disable_till = file->disable_till; - } - UNLOCK (&file->lock); - - if (conf->flush_behind - && (!disabled) && (disable_till == 0)) { - tmp_local = GF_CALLOC (1, sizeof (*local), - gf_wb_mt_wb_local_t); - if (tmp_local == NULL) { - STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - - STACK_DESTROY (process_frame->root); - call_stub_destroy (stub); - return 0; - } - tmp_local->file = file; - - process_frame->local = tmp_local; - } - - fd_ref (fd); - request = wb_enqueue (file, stub); if (request == NULL) { STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - - fd_unref (fd); call_stub_destroy (stub); - STACK_DESTROY (process_frame->root); return 0; } - ret = wb_process_queue (process_frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - - fd_unref (fd); call_stub_destroy (stub); - STACK_DESTROY (process_frame->root); return 0; } - } - - if ((file != NULL) && conf->flush_behind - && (!disabled) && (disable_till == 0)) { - STACK_WIND (process_frame, - wb_ffr_bg_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, - fd); } else { - STACK_WIND (frame, - wb_ffr_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, - fd); + if (conf->flush_behind) { + flush_frame = copy_frame (frame); + if (flush_frame == NULL) { + STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); + return 0; + } - if (process_frame != NULL) { - STACK_DESTROY (process_frame->root); - } - } + STACK_UNWIND_STRICT (flush, frame, 0, 0); - - if (file != NULL) { - fd_unref (fd); + STACK_WIND (flush_frame, + wb_ffr_bg_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); + } else { + STACK_WIND (frame, + wb_ffr_cbk, + FIRST_CHILD(this), + FIRST_CHILD(this)->fops->flush, + fd); + } } return 0; @@ -2300,7 +2406,7 @@ wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, if (request) { wb_request_unref (request); - ret = wb_process_queue (frame, file, 0); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { op_ret = -1; op_errno = ENOMEM; @@ -2377,7 +2483,7 @@ wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync) return 0; } - ret = wb_process_queue (frame, file, 1); + ret = wb_process_queue (frame, file); if ((ret == -1) && (errno == ENOMEM)) { STACK_UNWIND_STRICT (fsync, frame, -1, ENOMEM, NULL, NULL); |