diff options
| author | Raghavendra G <raghavendra@gluster.com> | 2010-06-18 02:18:02 +0000 | 
|---|---|---|
| committer | Anand V. Avati <avati@dev.gluster.com> | 2010-07-02 04:44:46 -0700 | 
| commit | 3dc79ca8e6119f5ff61058cc87f9a4fc251017ef (patch) | |
| tree | bc19de662216017cc8d8d5ab5d06d5972de812c4 | |
| parent | 01923eed1115e53c5be9fba3e72f75c7c631bf95 (diff) | |
performance/write-behind: explicitly enforce ordering of overlapping writes.
- If there are non-contiguous offsets (offsets which do not start where
    the previous write ended), wait for completion of the previous writes to
    the server before sending new ones.
  - Send flush call to server only when all writes are completed.
  - If a file is opened with O_APPEND, at any point in time at most one
    write call to the server should be in transit. This is to avoid reordering
    of writes in the presence of afr, which can result in data corruption.
    See bug #934 for more details.
Signed-off-by: Raghavendra G <raghavendra@gluster.com>
Signed-off-by: Anand V. Avati <avati@dev.gluster.com>
BUG: 970 (extracting kernel tarball hangs midway.)
URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=970
| -rw-r--r-- | xlators/performance/write-behind/src/write-behind.c | 452 | 
1 files changed, 279 insertions, 173 deletions
diff --git a/xlators/performance/write-behind/src/write-behind.c b/xlators/performance/write-behind/src/write-behind.c index bd66a7ad5..a71d3a378 100644 --- a/xlators/performance/write-behind/src/write-behind.c +++ b/xlators/performance/write-behind/src/write-behind.c @@ -52,6 +52,7 @@ typedef struct wb_file {          uint64_t     disable_till;          size_t       window_conf;          size_t       window_current; +        int32_t      flags;          size_t       aggregate_current;          int32_t      refcount;          int32_t      op_ret; @@ -79,6 +80,14 @@ typedef struct wb_request {                          char stack_wound;                          char got_reply;                          char virgin; +                        char flush_all;     /* while trying to sync to back-end, +                                             * don't wait till a data of size +                                             * equal to configured aggregate-size +                                             * is accumulated, instead sync +                                             * whatever data currently present in +                                             * request queue. 
+                                             */ +                                                              }write_request;                  struct { @@ -116,26 +125,28 @@ typedef struct wb_page wb_page_t;  int32_t  -wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all); +wb_process_queue (call_frame_t *frame, wb_file_t *file);  ssize_t  wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds);  ssize_t  __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_size, -                 char wind_all, char enable_trickling_writes); +                 char enable_trickling_writes); -static void +static int  __wb_request_unref (wb_request_t *this)  { +        int ret = -1; +          if (this->refcount <= 0) {                  gf_log ("wb-request", GF_LOG_DEBUG,                          "refcount(%d) is <= 0", this->refcount); -                return; +                goto out;          } -        this->refcount--; +        ret = --this->refcount;          if (this->refcount == 0) {                  list_del_init (&this->list);                  if (this->stub && this->stub->fop == GF_FOP_WRITE) { @@ -144,25 +155,33 @@ __wb_request_unref (wb_request_t *this)                  GF_FREE (this);          } + +out: +        return ret;  } -static void +static int  wb_request_unref (wb_request_t *this)  {          wb_file_t *file = NULL; +        int        ret  = 0; +          if (this == NULL) {                  gf_log ("wb-request", GF_LOG_DEBUG,                          "request is NULL"); -                return; +                goto out;          }          file = this->file;          LOCK (&file->lock);          { -                __wb_request_unref (this); +                ret = __wb_request_unref (this);          }          UNLOCK (&file->lock); + +out: +        return ret;  } @@ -204,11 +223,11 @@ wb_request_ref (wb_request_t *this)  wb_request_t *  wb_enqueue (wb_file_t *file, call_stub_t *stub)  { -        wb_request_t 
*request = NULL; -        call_frame_t *frame = NULL; -        wb_local_t   *local = NULL; -        struct iovec *vector = NULL; -        int32_t       count = 0; +        wb_request_t *request = NULL, *tmp = NULL; +        call_frame_t *frame   = NULL; +        wb_local_t   *local   = NULL; +        struct iovec *vector  = NULL; +        int32_t       count   = 0;          request = GF_CALLOC (1, sizeof (*request), gf_wb_mt_wb_request_t);          if (request == NULL) { @@ -254,6 +273,13 @@ wb_enqueue (wb_file_t *file, call_stub_t *stub)                          file->aggregate_current += request->write_size;                  } else { +                        list_for_each_entry (tmp, &file->request, list) { +                                if (tmp->stub && tmp->stub->fop +                                    == GF_FOP_WRITE) { +                                        tmp->flags.write_request.flush_all = 1; +                                } +                        } +                          /*reference for resuming */                          __wb_request_ref (request);                  } @@ -266,7 +292,7 @@ out:  wb_file_t * -wb_file_create (xlator_t *this, fd_t *fd) +wb_file_create (xlator_t *this, fd_t *fd, int32_t flags)  {          wb_file_t *file = NULL;          wb_conf_t *conf = this->private;  @@ -288,6 +314,7 @@ wb_file_create (xlator_t *this, fd_t *fd)          file->this = this;          file->refcount = 1;          file->window_conf = conf->window_size; +        file->flags = flags;          fd_ctx_set (fd, this, (uint64_t)(long)file); @@ -359,7 +386,7 @@ wb_sync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,          }          UNLOCK (&file->lock); -        ret = wb_process_queue (frame, file, 0);   +        ret = wb_process_queue (frame, file);            if ((ret == -1) && (errno == ENOMEM)) {                  LOCK (&file->lock);                  { @@ -393,6 +420,12 @@ wb_sync (call_frame_t *frame, wb_file_t *file, 
list_head_t *winds)          size_t          bytecount = 0;          wb_conf_t      *conf = NULL;          fd_t           *fd   = NULL; +        int32_t         op_errno = -1; + +        if (frame == NULL) { +                op_errno = EINVAL; +                goto out; +        }          conf = file->this->private;          list_for_each_entry (request, winds, winds) { @@ -403,6 +436,8 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)          }          if (total_count == 0) { +                gf_log (file->this->name, GF_LOG_DEBUG, "no vectors are to be" +                        "synced");                  goto out;          } @@ -412,12 +447,18 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)                                              gf_wb_mt_iovec);                          if (vector == NULL) {                                  bytes = -1; +                                op_errno = ENOMEM; +                                gf_log (file->this->name, GF_LOG_ERROR, +                                        "out of memory");                                  goto out;                          }                          iobref = iobref_new ();                          if (iobref == NULL) {                                  bytes = -1; +                                op_errno = ENOMEM; +                                gf_log (file->this->name, GF_LOG_ERROR, +                                        "out of memory");                                  goto out;                          } @@ -425,6 +466,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)                                             gf_wb_mt_wb_local_t);                          if (local == NULL) {                                  bytes = -1; +                                op_errno = ENOMEM; +                                gf_log (file->this->name, GF_LOG_ERROR, +                                        "out of memory");                      
            goto out;                          } @@ -466,6 +510,9 @@ wb_sync (call_frame_t *frame, wb_file_t *file, list_head_t *winds)                          sync_frame = copy_frame (frame);                            if (sync_frame == NULL) {                                  bytes = -1; +                                op_errno = ENOMEM; +                                gf_log (file->this->name, GF_LOG_ERROR, +                                        "out of memory");                                  goto out;                          } @@ -508,6 +555,14 @@ out:          }          if (local != NULL) { +                /* had we winded these requests, we would have unrefed +                 * in wb_sync_cbk. +                 */ +                list_for_each_entry_safe (request, dummy, &local->winds, +                                          winds) { +                        wb_request_unref (request); +                } +                  GF_FREE (local);          } @@ -519,6 +574,27 @@ out:                  GF_FREE (vector);          } +        if (bytes == -1) { +                /* +                 * had we winded these requests, we would have unrefed +                 * in wb_sync_cbk. 
+                 */ + +                list_for_each_entry_safe (request, dummy, &local->winds, +                                          winds) { +                        wb_request_unref (request); +                } + +                if (file != NULL) { +                        LOCK (&file->lock); +                        { +                                file->op_ret = -1; +                                file->op_errno = op_errno; +                        } +                        UNLOCK (&file->lock); +                } +        } +          return bytes;  } @@ -553,7 +629,7 @@ wb_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,          }          if (process_frame != NULL) { -                ret = wb_process_queue (process_frame, file, 0); +                ret = wb_process_queue (process_frame, file);                  if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {                          LOCK (&file->lock);                          { @@ -639,7 +715,7 @@ wb_stat (call_frame_t *frame, xlator_t *this, loc_t *loc)                          goto unwind;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_errno = ENOMEM;                          goto unwind; @@ -683,7 +759,7 @@ wb_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,          request = local->request;          if ((file != NULL) && (request != NULL)) {                  wb_request_unref (request); -                ret = wb_process_queue (frame, file, 0); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_ret = -1;                          op_errno = ENOMEM; @@ -757,7 +833,7 @@ wb_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd)                  /*                    
FIXME:should the request queue be emptied in case of error?                  */ -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_errno = ENOMEM;                          goto unwind; @@ -813,7 +889,7 @@ wb_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          }          if (process_frame != NULL) { -                ret = wb_process_queue (process_frame, file, 0); +                ret = wb_process_queue (process_frame, file);                  if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {                          LOCK (&file->lock);                          { @@ -906,7 +982,7 @@ wb_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset)                          goto unwind;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_errno = ENOMEM;                          goto unwind; @@ -949,7 +1025,7 @@ wb_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if ((request != NULL) && (file != NULL)) {                  wb_request_unref (request); -                ret = wb_process_queue (frame, file, 0); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_ret = -1;                          op_errno = ENOMEM; @@ -1026,7 +1102,7 @@ wb_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset)                          goto unwind;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_errno = ENOMEM;                          goto unwind; @@ 
-1055,7 +1131,8 @@ unwind:  int32_t   wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -                int32_t op_ret, int32_t op_errno, struct iatt *statpre, struct iatt *statpost) +                int32_t op_ret, int32_t op_errno, struct iatt *statpre, +                struct iatt *statpost)  {          wb_local_t   *local = NULL;                 wb_request_t *request = NULL; @@ -1076,14 +1153,15 @@ wb_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  }          } -        STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, statpost); +        STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, statpre, +                             statpost);          if (request) {                  wb_request_unref (request);          }          if (request && (process_frame != NULL)) { -                ret = wb_process_queue (process_frame, file, 0); +                ret = wb_process_queue (process_frame, file);                  if ((ret == -1) && (errno == ENOMEM) && (file != NULL)) {                          LOCK (&file->lock);                          { @@ -1188,7 +1266,7 @@ wb_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,                          goto unwind;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_errno = ENOMEM;                          goto unwind; @@ -1236,7 +1314,7 @@ wb_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,          wbflags = local->wbflags;          if (op_ret != -1) { -                file = wb_file_create (this, fd); +                file = wb_file_create (this, fd, flags);                  if (file == NULL) {                          op_ret = -1;                          op_errno = ENOMEM; @@ -1307,7 +1385,11 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t 
*this,          wb_conf_t *conf = this->private;          if (op_ret != -1) { -                file = wb_file_create (this, fd); +                if (frame->local) { +                        flags = (long) frame->local; +                } + +                file = wb_file_create (this, fd, flags);                  if (file == NULL) {                          op_ret = -1;                          op_errno = ENOMEM; @@ -1316,7 +1398,6 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                  /* If O_DIRECT then, we disable chaching */                  if (frame->local) { -                        flags = (long)frame->local;                          if (((flags & O_DIRECT) == O_DIRECT)                              || ((flags & O_ACCMODE) == O_RDONLY)                              || (((flags & O_SYNC) == O_SYNC) @@ -1331,8 +1412,8 @@ wb_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          frame->local = NULL;  out:         -        STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, preparent, -                      postparent); +        STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, buf, +                             preparent, postparent);          return 0;  } @@ -1351,14 +1432,22 @@ wb_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,          return 0;  } - +/* Mark all the contiguous write requests for winding starting from head of + * request list. Stops marking at the first non-write request found. If + * file is opened with O_APPEND, make sure all the writes marked for winding + * will fit into a single write call to server. 
+ */  size_t  __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)  { -        wb_request_t *request = NULL; -        size_t        size = 0; -        char          first_request = 1; +        wb_request_t *request         = NULL; +        size_t        size            = 0; +        char          first_request   = 1;          off_t         offset_expected = 0; +        wb_conf_t    *conf            = NULL; +        int           count           = 0; + +        conf = file->this->private;          list_for_each_entry (request, list, list)          { @@ -1377,9 +1466,18 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)                                  break;                          } +                        if ((file->flags & O_APPEND) +                            && (((size + request->write_size) +                                 > conf->aggregate_size) +                                || ((count + request->stub->args.writev.count) +                                    > MAX_VECTOR_COUNT))) { +                                break; +                        } +                          size += request->write_size;                          offset_expected += request->write_size;                          file->aggregate_current -= request->write_size; +                        count += request->stub->args.writev.count;                          request->flags.write_request.stack_wound = 1;                          list_add_tail (&request->winds, winds); @@ -1392,7 +1490,8 @@ __wb_mark_wind_all (wb_file_t *file, list_head_t *list, list_head_t *winds)  void  __wb_can_wind (list_head_t *list, char *other_fop_in_queue, -               char *non_contiguous_writes, char *incomplete_writes) +               char *non_contiguous_writes, char *incomplete_writes, +               char *wind_all)  {          wb_request_t *request = NULL;          char          first_request = 1; @@ -1418,7 +1517,11 @@ __wb_can_wind (list_head_t 
*list, char *other_fop_in_queue,                  if (!request->flags.write_request.stack_wound) {                          if (first_request) {                                  first_request = 0; -                                offset_expected = request->stub->args.writev.off; +                                offset_expected +                                        = request->stub->args.writev.off; +                                if (wind_all != NULL) { +                                        *wind_all = request->flags.write_request.flush_all; +                                }                          }                           if (offset_expected != request->stub->args.writev.off) { @@ -1438,7 +1541,7 @@ __wb_can_wind (list_head_t *list, char *other_fop_in_queue,  ssize_t  __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf, -                 char wind_all, char enable_trickling_writes) +                 char enable_trickling_writes)  {          size_t        size                   = 0;          char          other_fop_in_queue     = 0; @@ -1446,6 +1549,7 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,          char          non_contiguous_writes  = 0;          wb_request_t *request                = NULL;          wb_file_t    *file                   = NULL; +        char          wind_all               = 0;          if (list_empty (list)) {                  goto out; @@ -1454,17 +1558,16 @@ __wb_mark_winds (list_head_t *list, list_head_t *winds, size_t aggregate_conf,          request = list_entry (list->next, typeof (*request), list);          file = request->file; -        if (!wind_all && (file->aggregate_current < aggregate_conf)) { -                __wb_can_wind (list, &other_fop_in_queue, -                               &non_contiguous_writes, &incomplete_writes); -        } +        __wb_can_wind (list, &other_fop_in_queue, +                       &non_contiguous_writes, &incomplete_writes, 
&wind_all); -        if ((enable_trickling_writes && !incomplete_writes) -            || (wind_all) || (non_contiguous_writes) -            || (other_fop_in_queue) -            || (file->aggregate_current >= aggregate_conf)) { +        if (!incomplete_writes && ((enable_trickling_writes) +                                   || (wind_all) || (non_contiguous_writes) +                                   || (other_fop_in_queue) +                                   || (file->aggregate_current +                                       >= aggregate_conf))) {                  size = __wb_mark_wind_all (file, list, winds); -        }  +        }  out:          return size; @@ -1565,6 +1668,7 @@ wb_stack_unwind (list_head_t *unwinds)          wb_request_t *request = NULL, *dummy = NULL;          call_frame_t *frame = NULL;          wb_local_t   *local = NULL; +        int           ret   = 0, write_requests_removed = 0;          list_for_each_entry_safe (request, dummy, unwinds, unwinds)          { @@ -1574,10 +1678,13 @@ wb_stack_unwind (list_head_t *unwinds)                  STACK_UNWIND (frame, local->op_ret, local->op_errno, &buf,                                &buf); -                wb_request_unref (request); +                ret = wb_request_unref (request); +                if (ret == 0) { +                        write_requests_removed++; +                }          } -        return 0; +        return write_requests_removed;  } @@ -1615,7 +1722,7 @@ wb_resume_other_requests (call_frame_t *frame, wb_file_t *file,          }          if (fops_removed > 0) { -                ret = wb_process_queue (frame, file, 0); +                ret = wb_process_queue (frame, file);          }  out: @@ -1627,19 +1734,27 @@ int32_t  wb_do_ops (call_frame_t *frame, wb_file_t *file, list_head_t *winds,             list_head_t *unwinds, list_head_t *other_requests)  { -        int32_t ret = -1; +        int32_t ret = -1, write_requests_removed = 0;          ret = wb_stack_unwind (unwinds); 
-        if (ret == -1) { -                goto out; -        } + +        write_requests_removed = ret;          ret = wb_sync (frame, file, winds);          if (ret == -1) {                  goto out;          } -        ret = wb_resume_other_requests (frame, file, other_requests); +        wb_resume_other_requests (frame, file, other_requests); + +        /* wb_stack_unwind does wb_request_unref after unwinding a write +         * request. Hence if a write-request was just freed in wb_stack_unwind, +         * we have to process request queue once again to unblock requests +         * blocked on the writes just unwound. +         */ +        if (write_requests_removed > 0) { +                ret = wb_process_queue (frame, file); +        }  out:          return ret; @@ -1763,7 +1878,7 @@ __wb_collapse_write_bufs (list_head_t *requests, size_t page_size)  int32_t  -wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)  +wb_process_queue (call_frame_t *frame, wb_file_t *file)  {          list_head_t winds, unwinds, other_requests;          size_t      size = 0; @@ -1800,7 +1915,6 @@ wb_process_queue (call_frame_t *frame, wb_file_t *file, char flush_all)                  if (count == 0) {                          __wb_mark_winds (&file->request, &winds, size, -                                         flush_all,                                           conf->enable_trickling_writes);                  } @@ -1927,7 +2041,7 @@ wb_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,                  goto unwind;          } -        ret = wb_process_queue (process_frame, file, 0); +        ret = wb_process_queue (process_frame, file);          if ((ret == -1) && (errno == ENOMEM)) {                  op_errno = ENOMEM;                  goto unwind; @@ -1969,7 +2083,7 @@ wb_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,          if ((request != NULL) && (file != NULL)) {                  
wb_request_unref (request); -                ret = wb_process_queue (frame, file, 0); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          op_ret = -1;                          op_errno = ENOMEM; @@ -2048,7 +2162,7 @@ wb_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                          return 0;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          STACK_UNWIND_STRICT (readv, frame, -1, ENOMEM,                                               NULL, 0, NULL, NULL); @@ -2082,67 +2196,102 @@ wb_ffr_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,              int32_t op_errno)  {          wb_local_t *local = NULL; -        wb_file_t  *file = NULL; -        wb_conf_t  *conf = NULL; -        char        unwind = 0; -        int32_t     ret = -1; -        int         disabled = 0; -        int64_t     disable_till = 0; +        wb_file_t  *file  = NULL; +        wb_conf_t  *conf  = NULL;          conf = this->private;          local = frame->local; +        file = local->file; -        if ((local != NULL) && (local->file != NULL)) { -                file = local->file; - +        if (file != NULL) {                  LOCK (&file->lock);                  { -                        disabled = file->disabled; -                        disable_till = file->disable_till; +                        if (file->op_ret == -1) { +                                op_ret = file->op_ret; +                                op_errno = file->op_errno; + +                                file->op_ret = 0; +                        }                  }                  UNLOCK (&file->lock); +        } +                 +        STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno); -                if 
(conf->flush_behind -                    && (!disabled) && (disable_till == 0)) { -                        unwind = 1; -                } else { -                        local->reply_count++; -                        /*  -                         * without flush-behind, unwind should wait for replies -                         * of writes queued before and the flush  -                         */ -                        if (local->reply_count == 2) { -                                unwind = 1; -                        } +        return 0; +} + + +int32_t +wb_flush_helper (call_frame_t *frame, xlator_t *this, fd_t *fd) +{ +        wb_conf_t    *conf        = NULL; +        wb_local_t   *local       = NULL; +        wb_file_t    *file        = NULL; +        call_frame_t *flush_frame = NULL, *process_frame = NULL; +        int32_t       op_ret      = -1, op_errno = -1, ret = -1; + +        conf = this->private; + +        local = frame->local; +        file = local->file; + +        LOCK (&file->lock); +        { +                op_ret = file->op_ret; +                op_errno = file->op_errno; +        } +        UNLOCK (&file->lock); + +        if (local && local->request) { +                process_frame = copy_frame (frame); +                if (process_frame == NULL) { +                        gf_log (this->name, GF_LOG_ERROR, "out of memory"); +                        goto unwind; +                } + +                wb_request_unref (local->request); +        } +         +        if (conf->flush_behind) { +                flush_frame = copy_frame (frame); +                if (flush_frame == NULL) { +                        gf_log (this->name, GF_LOG_ERROR, "out of memory"); +                        goto unwind;                  } + +                STACK_WIND (flush_frame, +                            wb_ffr_bg_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->flush, +                            fd);   
       } else { -                unwind = 1; +                STACK_WIND (frame, +                            wb_ffr_cbk, +                            FIRST_CHILD(this), +                            FIRST_CHILD(this)->fops->flush, +                            fd);          } -        if (unwind) { -                if (file != NULL) { -                        LOCK (&file->lock); -                        { -                                if (file->op_ret == -1) { -                                        op_ret = file->op_ret; -                                        op_errno = file->op_errno; +        if (process_frame != NULL) { +                ret = wb_process_queue (process_frame, file); +                if ((ret == -1) && (errno == ENOMEM)) { +                        STACK_DESTROY (process_frame->root); +                        goto unwind; +                } -                                        file->op_ret = 0; -                                } -                        } -                        UNLOCK (&file->lock); +                STACK_DESTROY (process_frame->root); +        } -                        ret = wb_process_queue (frame, file, 0); -                        if ((ret == -1) && (errno == ENOMEM)) { -                                op_ret = -1; -                                op_errno = ENOMEM; -                        } -                } -                 +        if (conf->flush_behind) {                  STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);          }          return 0; + +unwind: +        STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); +        return 0;  } @@ -2154,12 +2303,9 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)          wb_local_t   *local = NULL;  	uint64_t      tmp_file = 0;          call_stub_t  *stub = NULL; -        call_frame_t *process_frame = NULL; -        wb_local_t   *tmp_local = NULL; +        call_frame_t *flush_frame = NULL;          wb_request_t *request = NULL;          int32_t     
  ret = 0; -        int           disabled = 0; -        int64_t       disable_till = 0;          conf = this->private; @@ -2176,97 +2322,57 @@ wb_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)  	file = (wb_file_t *)(long)tmp_file;          if (file != NULL) { -                local = GF_CALLOC (1, sizeof (*local), -                                   gf_wb_mt_wb_local_t); +                local = GF_CALLOC (1, sizeof (*local), gf_wb_mt_wb_local_t);                  if (local == NULL) { -                        STACK_UNWIND (frame, -1, ENOMEM, NULL); +                        STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM);                          return 0;                  }                  local->file = file;                  frame->local = local; -                stub = fop_flush_cbk_stub (frame, wb_ffr_cbk, 0, 0); -                if (stub == NULL) { -                        STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); -                        return 0; -                } -                process_frame = copy_frame (frame); -                if (process_frame == NULL) { +                stub = fop_flush_stub (frame, wb_flush_helper, fd); +                if (stub == NULL) {                          STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); -                        call_stub_destroy (stub);                          return 0;                  } -                LOCK (&file->lock); -                { -                        disabled = file->disabled; -                        disable_till = file->disable_till; -                } -                UNLOCK (&file->lock); -                 -                if (conf->flush_behind -                    && (!disabled) && (disable_till == 0)) { -                        tmp_local = GF_CALLOC (1, sizeof (*local), -                                               gf_wb_mt_wb_local_t); -                        if (tmp_local == NULL) { -                                STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - 
                                 -                                STACK_DESTROY (process_frame->root); -                                call_stub_destroy (stub); -                                return 0; -                        } -                        tmp_local->file = file; - -                        process_frame->local = tmp_local; -                } - -                fd_ref (fd); -                  request = wb_enqueue (file, stub);                  if (request == NULL) {                          STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - -                        fd_unref (fd);                          call_stub_destroy (stub); -                        STACK_DESTROY (process_frame->root);                          return 0;                  } -                ret = wb_process_queue (process_frame, file, 1);  +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); - -                        fd_unref (fd);                          call_stub_destroy (stub); -                        STACK_DESTROY (process_frame->root);                          return 0;                  } -        } -                 -        if ((file != NULL) && conf->flush_behind -            && (!disabled) && (disable_till == 0)) { -                STACK_WIND (process_frame, -                            wb_ffr_bg_cbk, -                            FIRST_CHILD(this), -                            FIRST_CHILD(this)->fops->flush, -                            fd);          } else { -                STACK_WIND (frame, -                            wb_ffr_cbk, -                            FIRST_CHILD(this), -                            FIRST_CHILD(this)->fops->flush, -                            fd); +                if (conf->flush_behind) { +                        flush_frame = copy_frame (frame); +                        if (flush_frame == NULL) { +             
                   STACK_UNWIND_STRICT (flush, frame, -1, ENOMEM); +                                return 0; +                        } -                if (process_frame != NULL) { -                        STACK_DESTROY (process_frame->root); -                } -        } +                        STACK_UNWIND_STRICT (flush, frame, 0, 0); -          -        if (file != NULL) { -                fd_unref (fd); +                        STACK_WIND (flush_frame, +                                    wb_ffr_bg_cbk, +                                    FIRST_CHILD(this), +                                    FIRST_CHILD(this)->fops->flush, +                                    fd); +                } else { +                        STACK_WIND (frame, +                                    wb_ffr_cbk, +                                    FIRST_CHILD(this), +                                    FIRST_CHILD(this)->fops->flush, +                                    fd); +                }          }          return 0; @@ -2300,7 +2406,7 @@ wb_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,                  if (request) {                          wb_request_unref (request); -                        ret = wb_process_queue (frame, file, 0); +                        ret = wb_process_queue (frame, file);                          if ((ret == -1) && (errno == ENOMEM)) {                                  op_ret = -1;                                  op_errno = ENOMEM; @@ -2377,7 +2483,7 @@ wb_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync)                          return 0;                  } -                ret = wb_process_queue (frame, file, 1); +                ret = wb_process_queue (frame, file);                  if ((ret == -1) && (errno == ENOMEM)) {                          STACK_UNWIND_STRICT (fsync, frame, -1, ENOMEM,                                               NULL, NULL);  | 
