summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec/src/ec-inode-write.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/ec/src/ec-inode-write.c')
-rw-r--r--xlators/cluster/ec/src/ec-inode-write.c183
1 files changed, 91 insertions, 92 deletions
diff --git a/xlators/cluster/ec/src/ec-inode-write.c b/xlators/cluster/ec/src/ec-inode-write.c
index 6aeda5a2481..88145d98c83 100644
--- a/xlators/cluster/ec/src/ec-inode-write.c
+++ b/xlators/cluster/ec/src/ec-inode-write.c
@@ -11,12 +11,13 @@
#include "xlator.h"
#include "defaults.h"
+#include "ec.h"
+#include "ec-messages.h"
#include "ec-helpers.h"
#include "ec-common.h"
#include "ec-combine.h"
#include "ec-method.h"
#include "ec-fops.h"
-#include "ec-messages.h"
int
ec_inode_write_cbk (call_frame_t *frame, xlator_t *this, void *cookie,
@@ -1285,27 +1286,78 @@ out:
return -1;
}
+static int32_t
+ec_writev_prepare_buffers(ec_t *ec, ec_fop_data_t *fop)
+{
+ struct iobref *iobref = NULL;
+ struct iovec *iov;
+ void *ptr;
+ int32_t err;
+
+ fop->user_size = iov_length(fop->vector, fop->int32);
+ fop->head = ec_adjust_offset(ec, &fop->offset, 0);
+ fop->size = ec_adjust_size(ec, fop->user_size + fop->head, 0);
+
+ if ((fop->int32 != 1) || (fop->head != 0) ||
+ (fop->size > fop->user_size) ||
+ !EC_ALIGN_CHECK(fop->vector[0].iov_base, EC_METHOD_WORD_SIZE)) {
+ err = ec_buffer_alloc(ec->xl, fop->size, &iobref, &ptr);
+ if (err != 0) {
+ goto out;
+ }
+
+ ec_iov_copy_to(ptr + fop->head, fop->vector, fop->int32, 0,
+ fop->user_size);
+
+ fop->vector[0].iov_base = ptr;
+ fop->vector[0].iov_len = fop->size;
+
+ iobref_unref(fop->buffers);
+ fop->buffers = iobref;
+ }
+
+ if (fop->int32 != 2) {
+ iov = GF_MALLOC(VECTORSIZE(2), gf_common_mt_iovec);
+ if (iov == NULL) {
+ err = -ENOMEM;
+
+ goto out;
+ }
+ iov[0].iov_base = fop->vector[0].iov_base;
+ iov[0].iov_len = fop->vector[0].iov_len;
+
+ GF_FREE(fop->vector);
+ fop->vector = iov;
+ }
+
+ fop->vector[1].iov_len = fop->size / ec->fragments;
+ err = ec_buffer_alloc(ec->xl, fop->vector[1].iov_len * ec->nodes,
+ &fop->buffers, &fop->vector[1].iov_base);
+ if (err != 0) {
+ goto out;
+ }
+
+ err = 0;
+
+out:
+ return err;
+}
+
void ec_writev_start(ec_fop_data_t *fop)
{
ec_t *ec = fop->xl->private;
- struct iobref *iobref = NULL;
- struct iobuf *iobuf = NULL;
- void *ptr = NULL;
ec_fd_t *ctx;
fd_t *fd;
- size_t tail;
- uint64_t current;
+ dict_t *xdata = NULL;
+ uint64_t tail, current;
int32_t err = -ENOMEM;
- dict_t *xdata = NULL;
/* This shouldn't fail because we have the inode locked. */
GF_ASSERT(ec_get_inode_size(fop, fop->fd->inode, &current));
fd = fd_anonymous(fop->fd->inode);
if (fd == NULL) {
- ec_fop_set_error(fop, ENOMEM);
-
- return;
+ goto failed;
}
fop->frame->root->uid = 0;
@@ -1318,38 +1370,15 @@ void ec_writev_start(ec_fop_data_t *fop)
}
}
- fop->user_size = iov_length(fop->vector, fop->int32);
- fop->head = ec_adjust_offset(ec, &fop->offset, 0);
- fop->size = ec_adjust_size(ec, fop->user_size + fop->head, 0);
-
- iobref = iobref_new();
- if (iobref == NULL) {
- goto out;
- }
- iobuf = iobuf_get2(fop->xl->ctx->iobuf_pool, fop->size);
- if (iobuf == NULL) {
- goto out;
- }
- err = iobref_add(iobref, iobuf);
+ err = ec_writev_prepare_buffers(ec, fop);
if (err != 0) {
- goto out;
+ goto failed_fd;
}
- ptr = iobuf->ptr + fop->head;
- ec_iov_copy_to(ptr, fop->vector, fop->int32, 0, fop->user_size);
-
- fop->vector[0].iov_base = iobuf->ptr;
- fop->vector[0].iov_len = fop->size;
-
- iobuf_unref(iobuf);
-
- iobref_unref(fop->buffers);
- fop->buffers = iobref;
-
if (fop->head > 0) {
if (ec_make_internal_fop_xdata (&xdata)) {
err = -ENOMEM;
- goto out;
+ goto failed_xdata;
}
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN, ec_writev_merge_head,
NULL, fd, ec->stripe_size, fop->offset, 0, xdata);
@@ -1359,7 +1388,7 @@ void ec_writev_start(ec_fop_data_t *fop)
if (current > fop->offset + fop->head + fop->user_size) {
if (ec_make_internal_fop_xdata (&xdata)) {
err = -ENOMEM;
- goto out;
+ goto failed_xdata;
}
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN,
ec_writev_merge_tail, NULL, fd, ec->stripe_size,
@@ -1369,24 +1398,15 @@ void ec_writev_start(ec_fop_data_t *fop)
}
}
- fd_unref(fd);
- if (xdata)
- dict_unref (xdata);
-
- return;
+ err = 0;
-out:
- if (iobuf != NULL) {
- iobuf_unref(iobuf);
+failed_xdata:
+ if (xdata) {
+ dict_unref(xdata);
}
- if (iobref != NULL) {
- iobref_unref(iobref);
- }
-
+failed_fd:
fd_unref(fd);
- if (xdata)
- dict_unref (xdata);
-
+failed:
ec_fop_set_error(fop, -err);
}
@@ -1411,55 +1431,32 @@ void ec_wind_writev(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
ec_trace("WIND", fop, "idx=%d", idx);
struct iovec vector[1];
- struct iobref * iobref = NULL;
- struct iobuf * iobuf = NULL;
- ssize_t size = 0, bufsize = 0;
- int32_t err = -ENOMEM;
+ size_t size;
- iobref = iobref_new();
- if (iobref == NULL) {
- goto out;
- }
+ size = fop->vector[1].iov_len;
- size = fop->vector[0].iov_len;
- bufsize = size / ec->fragments;
-
- iobuf = iobuf_get2(fop->xl->ctx->iobuf_pool, bufsize);
- if (iobuf == NULL) {
- goto out;
- }
- err = iobref_add(iobref, iobuf);
- if (err != 0) {
- goto out;
- }
-
- ec_method_encode(size, ec->fragments, idx, fop->vector[0].iov_base,
- iobuf->ptr);
-
- vector[0].iov_base = iobuf->ptr;
- vector[0].iov_len = bufsize;
-
- iobuf_unref(iobuf);
+ vector[0].iov_base = fop->vector[1].iov_base + idx * size;
+ vector[0].iov_len = size;
STACK_WIND_COOKIE(fop->frame, ec_writev_cbk, (void *)(uintptr_t)idx,
ec->xl_list[idx], ec->xl_list[idx]->fops->writev,
fop->fd, vector, 1, fop->offset / ec->fragments,
- fop->uint32, iobref, fop->xdata);
-
- iobref_unref(iobref);
+ fop->uint32, fop->buffers, fop->xdata);
+}
- return;
+static void
+ec_writev_encode(ec_fop_data_t *fop)
+{
+ ec_t *ec = fop->xl->private;
+ void *blocks[ec->nodes];
+ uint32_t i;
-out:
- if (iobuf != NULL) {
- iobuf_unref(iobuf);
+ blocks[0] = fop->vector[1].iov_base;
+ for (i = 1; i < ec->nodes; i++) {
+ blocks[i] = blocks[i - 1] + fop->vector[1].iov_len;
}
- if (iobref != NULL) {
- iobref_unref(iobref);
- }
-
- ec_writev_cbk(fop->frame, (void *)(uintptr_t)idx, fop->xl, -1, -err, NULL,
- NULL, NULL);
+ ec_method_encode(&ec->matrix, fop->vector[0].iov_len,
+ fop->vector[0].iov_base, blocks);
}
int32_t ec_manager_writev(ec_fop_data_t *fop, int32_t state)
@@ -1488,6 +1485,8 @@ int32_t ec_manager_writev(ec_fop_data_t *fop, int32_t state)
fop->frame->root->uid = fop->uid;
fop->frame->root->gid = fop->gid;
+ ec_writev_encode(fop);
+
ec_dispatch_all(fop);
return EC_STATE_PREPARE_ANSWER;