diff options
-rw-r--r-- | xlators/cluster/stripe/src/stripe-mem-types.h | 2 | ||||
-rw-r--r-- | xlators/cluster/stripe/src/stripe.c | 136 | ||||
-rw-r--r-- | xlators/cluster/stripe/src/stripe.h | 6 |
3 files changed, 106 insertions, 38 deletions
diff --git a/xlators/cluster/stripe/src/stripe-mem-types.h b/xlators/cluster/stripe/src/stripe-mem-types.h index c8781d7d..e05ba0c2 100644 --- a/xlators/cluster/stripe/src/stripe-mem-types.h +++ b/xlators/cluster/stripe/src/stripe-mem-types.h @@ -16,7 +16,7 @@ enum gf_stripe_mem_types_ { gf_stripe_mt_iovec = gf_common_mt_end + 1, - gf_stripe_mt_readv_replies, + gf_stripe_mt_stripe_replies, gf_stripe_mt_stripe_fd_ctx_t, gf_stripe_mt_char, gf_stripe_mt_int8_t, diff --git a/xlators/cluster/stripe/src/stripe.c b/xlators/cluster/stripe/src/stripe.c index e2176eea..6588a449 100644 --- a/xlators/cluster/stripe/src/stripe.c +++ b/xlators/cluster/stripe/src/stripe.c @@ -3482,8 +3482,8 @@ stripe_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, frame->local = local; /* This is where all the vectors should be copied. */ - local->replies = GF_CALLOC (num_stripe, sizeof (struct readv_replies), - gf_stripe_mt_readv_replies); + local->replies = GF_CALLOC (num_stripe, sizeof (struct stripe_replies), + gf_stripe_mt_stripe_replies); if (!local->replies) { op_errno = ENOMEM; goto err; @@ -3543,7 +3543,11 @@ stripe_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, { int32_t callcnt = 0; stripe_local_t *local = NULL; + stripe_local_t *mlocal = NULL; call_frame_t *prev = NULL; + call_frame_t *mframe = NULL; + struct stripe_replies *reply = NULL; + int32_t i = 0; if (!this || !frame || !frame->local || !cookie) { gf_log ("stripe", GF_LOG_DEBUG, "possible NULL deref"); @@ -3552,48 +3556,75 @@ stripe_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, prev = cookie; local = frame->local; + mframe = local->orig_frame; + mlocal = mframe->local; LOCK(&frame->lock); { - callcnt = ++local->call_count; + callcnt = ++mlocal->call_count; + + mlocal->replies[local->node_index].op_ret = op_ret; + mlocal->replies[local->node_index].op_errno = op_errno; - if (op_ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "%s returned error %s", - prev->this->name, strerror (op_errno)); - local->op_errno = op_errno; - local->op_ret = -1; - } if (op_ret >= 0) { - local->op_ret += op_ret; - local->post_buf = *postbuf; - local->pre_buf = *prebuf; + mlocal->post_buf = *postbuf; + mlocal->pre_buf = *prebuf; - local->prebuf_blocks += prebuf->ia_blocks; - local->postbuf_blocks += postbuf->ia_blocks; + mlocal->prebuf_blocks += prebuf->ia_blocks; + mlocal->postbuf_blocks += postbuf->ia_blocks; - correct_file_size(prebuf, local->fctx, prev); - correct_file_size(postbuf, local->fctx, prev); + correct_file_size(prebuf, mlocal->fctx, prev); + correct_file_size(postbuf, mlocal->fctx, prev); - if (local->prebuf_size < prebuf->ia_size) - local->prebuf_size = prebuf->ia_size; - if (local->postbuf_size < postbuf->ia_size) - local->postbuf_size = postbuf->ia_size; + if (mlocal->prebuf_size < prebuf->ia_size) + mlocal->prebuf_size = prebuf->ia_size; + if (mlocal->postbuf_size < postbuf->ia_size) + mlocal->postbuf_size = postbuf->ia_size; } } UNLOCK (&frame->lock); - if ((callcnt == local->wind_count) && local->unwind) { - local->pre_buf.ia_size = local->prebuf_size; - local->pre_buf.ia_blocks = local->prebuf_blocks; - local->post_buf.ia_size = local->postbuf_size; - local->post_buf.ia_blocks = local->postbuf_blocks; + if ((callcnt == mlocal->wind_count) && mlocal->unwind) { + mlocal->pre_buf.ia_size = mlocal->prebuf_size; + mlocal->pre_buf.ia_blocks = mlocal->prebuf_blocks; + mlocal->post_buf.ia_size = mlocal->postbuf_size; + mlocal->post_buf.ia_blocks = mlocal->postbuf_blocks; + + /* + * Only return the number of consecutively written bytes up until + * the first error. Only return an error if it occurs first. + * + * When a short write occurs, the application should retry at the + * appropriate offset, at which point we'll potentially pass back + * the error. + */ + for (i = 0, reply = mlocal->replies; i < mlocal->wind_count; + i++, reply++) { + if (reply->op_ret == -1) { + gf_log(this->name, GF_LOG_DEBUG, "reply %d " + "returned error %s", i, + strerror(reply->op_errno)); + if (!mlocal->op_ret) { + mlocal->op_ret = -1; + mlocal->op_errno = reply->op_errno; + } + break; + } - STRIPE_STACK_UNWIND (writev, frame, local->op_ret, - local->op_errno, &local->pre_buf, - &local->post_buf, NULL); + mlocal->op_ret += reply->op_ret; + + if (reply->op_ret < reply->requested_size) + break; + } + + GF_FREE(mlocal->replies); + + STRIPE_STACK_UNWIND (writev, mframe, mlocal->op_ret, + mlocal->op_errno, &mlocal->pre_buf, + &mlocal->post_buf, NULL); } out: + STRIPE_STACK_DESTROY(frame); return 0; } @@ -3615,6 +3646,11 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, uint64_t stripe_size = 0; uint64_t tmp_fctx = 0; off_t dest_offset = 0; + off_t rounded_start = 0; + off_t rounded_end = 0; + int32_t total_chunks = 0; + call_frame_t *wframe = NULL; + stripe_local_t *wlocal = NULL; VALIDATE_OR_GOTO (frame, err); VALIDATE_OR_GOTO (this, err); @@ -3653,7 +3689,27 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, goto err; } + rounded_start = floor(offset, stripe_size); + rounded_end = roof(offset + total_size, stripe_size); + total_chunks = (rounded_end - rounded_start) / stripe_size; + local->replies = GF_CALLOC(total_chunks, sizeof(struct stripe_replies), + gf_stripe_mt_stripe_replies); + if (!local->replies) { + op_errno = ENOMEM; + goto err; + } + + total_chunks = 0; while (1) { + wframe = copy_frame(frame); + wlocal = mem_get0(this->local_pool); + if (!wlocal) { + op_errno = ENOMEM; + goto err; + } + wlocal->orig_frame = frame; + wframe->local = wlocal; + /* Send striped chunk of the vector to child nodes appropriately. */ idx = (((offset + offset_offset) / @@ -3681,25 +3737,37 @@ stripe_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, if (remaining_size == 0) local->unwind = 1; + /* + * Store off the request index (with respect to the chunk of the + * initial offset) and the size of the request. This is required + * in the callback to calculate an appropriate return value in + * the event of a write failure in one or more requests. + */ + wlocal->node_index = total_chunks; + local->replies[total_chunks].requested_size = fill_size; + + dest_offset = offset + offset_offset; if (fctx->stripe_coalesce) - dest_offset = coalesced_offset(offset + offset_offset, - local->stripe_size, fctx->stripe_count); - else - dest_offset = offset + offset_offset; + dest_offset = coalesced_offset(dest_offset, + local->stripe_size, fctx->stripe_count); - STACK_WIND (frame, stripe_writev_cbk, fctx->xl_array[idx], + STACK_WIND (wframe, stripe_writev_cbk, fctx->xl_array[idx], fctx->xl_array[idx]->fops->writev, fd, tmp_vec, tmp_count, dest_offset, flags, iobref, xdata); GF_FREE (tmp_vec); offset_offset += fill_size; + total_chunks++; if (remaining_size == 0) break; } return 0; err: + if (wframe) + STRIPE_STACK_DESTROY(wframe); + STRIPE_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL); return 0; } diff --git a/xlators/cluster/stripe/src/stripe.h b/xlators/cluster/stripe/src/stripe.h index 1b9e660c..a440f87b 100644 --- a/xlators/cluster/stripe/src/stripe.h +++ b/xlators/cluster/stripe/src/stripe.h @@ -106,9 +106,9 @@ struct stripe_private { }; /** - * Used to keep info about the replies received from fops->readv calls + * Used to keep info about the replies received from readv/writev calls */ -struct readv_replies { +struct stripe_replies { struct iovec *vector; int32_t count; //count of vector int32_t op_ret; //op_ret of readv @@ -156,7 +156,7 @@ struct stripe_local { blkcnt_t preparent_blocks; blkcnt_t postparent_blocks; - struct readv_replies *replies; + struct stripe_replies *replies; struct statvfs statvfs_buf; dir_entry_t *entry; |