summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKrutika Dhananjay <kdhananj@redhat.com>2015-04-25 18:50:46 +0530
committerPranith Kumar Karampuri <pkarampu@redhat.com>2015-05-05 18:44:23 -0700
commitebf068c4ca4cfe7abcf56a816de7561130ffabf7 (patch)
tree9ee581ee8727c5610489d15c0e2f4e281dc910bf
parentfa0ad231745846918b2625d0e1a89c0a5c3c24dc (diff)
features/shard: Implement readv() fop
Change-Id: I4cc060710482de8633141170dd35f669f01f639b BUG: 1207615 Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com> Reviewed-on: http://review.gluster.org/10528 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
-rw-r--r--xlators/features/shard/src/shard.c897
-rw-r--r--xlators/features/shard/src/shard.h7
2 files changed, 635 insertions, 269 deletions
diff --git a/xlators/features/shard/src/shard.c b/xlators/features/shard/src/shard.c
index 4e78b49ee56..9fa2d3bf6b3 100644
--- a/xlators/features/shard/src/shard.c
+++ b/xlators/features/shard/src/shard.c
@@ -1517,180 +1517,103 @@ shard_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
return 0;
}
-/* - Incomplete - TBD */
-int
-shard_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
- off_t offset, uint32_t flags, dict_t *xdata)
+static int
+shard_init_dot_shard_loc (xlator_t *this, shard_local_t *local)
{
-/*
- int i = 0;
- int32_t op_errno = ENOMEM;
- uint64_t block_size = 0;
- int highest_block = 0;
- int num_blocks = 0;
- int cur_block = 0;
- char shard_abspath[PATH_MAX] = {0,};
- off_t cur_offset = 0;
- size_t total_size = 0;
- fd_t *cur_fd = NULL;
- inode_t *inode = NULL;
- shard_local_t *local = NULL;
-
- ret = shard_inode_ctx_get_block_size (fd->inode, this, &block_size);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to get inode ctx for "
- "%s", uuid_utoa(fd->inode->gfid));
- goto out;
- }
+ int ret = -1;
+ loc_t *dot_shard_loc = NULL;
- if (!block_size) {
- STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->readv, fd, size, offset,
- flags, xdata);
- return 0;
- }
-
- local = mem_get0 (this->local_pool);
if (!local)
- goto err;
-
- frame->local = local;
+ return -1;
- local->block_size = block_size;
- local->offset = offset;
- local->len = size;
- local->first_block = get_lowest_block (offset, block_size);
- highest_block = get_highest_block (offset, size, block_size);
- num_blocks = local->num_blocks = highest_block - local->first_block + 1;
-
- while (num_blocks--) {
- cur_fd = (local->first_block == 0) ? fd_ref (fd) :
- fd_anonymous (inode);
- cur_offset = (cur_block == local->first_block) ?
- get_shard_offset(offset, block_size):0;
- STACK_WIND_COOKIE (frame, shard_readv_cbk, cur_fd,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->readv, cur_fd,
- cur_size, cur_offset, flags, xdata);
+ dot_shard_loc = &local->dot_shard_loc;
+ dot_shard_loc->inode = inode_new (this->itable);
+ dot_shard_loc->parent = inode_ref (this->itable->root);
+ ret = inode_path (dot_shard_loc->parent, GF_SHARD_DIR,
+ (char **)&dot_shard_loc->path);
+ if (ret < 0 || !(dot_shard_loc->inode)) {
+ gf_log (this->name, GF_LOG_ERROR, "Inode path failed on %s",
+ GF_SHARD_DIR);
+ goto out;
}
- return 0;
-
-err:
- SHARD_STACK_UNWIND (readv, frame, -1, op_errno, NULL, 0, NULL, NULL,
- NULL);
-*/
- SHARD_STACK_UNWIND (readv, frame, -1, ENOTCONN, NULL, 0, NULL, NULL,
- NULL);
- return 0;
+ dot_shard_loc->name = strrchr (dot_shard_loc->path, '/');
+ if (dot_shard_loc->name)
+ dot_shard_loc->name++;
+ ret = 0;
+out:
+ return ret;
}
int
-shard_update_file_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xdata)
+shard_readv_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iovec *vector,
+ int32_t count, struct iatt *stbuf, struct iobref *iobref,
+ dict_t *xdata)
{
- shard_local_t *local = NULL;
+ int i = 0;
+ int call_count = 0;
+ void *address = NULL;
+ uint64_t block_num = 0;
+ off_t off = 0;
+ struct iovec vec = {0,};
+ shard_local_t *local = NULL;
+ fd_t *anon_fd = cookie;
local = frame->local;
if (op_ret < 0) {
local->op_ret = op_ret;
local->op_errno = op_errno;
- goto err;
+ goto out;
}
- SHARD_STACK_UNWIND (writev, frame, local->written_size, local->op_errno,
- &local->prebuf, &local->postbuf, local->xattr_rsp);
- return 0;
+ if (local->op_ret >= 0)
+ local->op_ret += op_ret;
-err:
- SHARD_STACK_UNWIND (writev, frame, -1, local->op_errno, NULL,
- NULL, NULL);
- return 0;
-}
+ fd_ctx_get (anon_fd, this, &block_num);
-int
-shard_update_file_size (call_frame_t *frame, xlator_t *this)
-{
- int ret = -1;
- uint64_t *size_attr = NULL;
- fd_t *fd = NULL;
- shard_local_t *local = NULL;
- dict_t *xattr_req = NULL;
-
- local = frame->local;
- fd = local->fd;
-
- xattr_req = dict_new ();
- if (!xattr_req)
- goto err;
-
- ret = shard_set_size_attrs (local->postbuf.ia_size + local->hole_size,
- local->postbuf.ia_blocks, &size_attr);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to set size attrs for"
- " %s", uuid_utoa (fd->inode->gfid));
- goto err;
- }
-
- ret = dict_set_bin (xattr_req, GF_XATTR_SHARD_FILE_SIZE, size_attr,
- 8 * 4);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Failed to set key %s into "
- "dict. gfid=%s", GF_XATTR_SHARD_FILE_SIZE,
- uuid_utoa (fd->inode->gfid));
- GF_FREE (size_attr);
- goto err;
+ if (block_num == local->first_block) {
+ address = local->iobuf->ptr;
+ } else {
+ /* else
+ * address to start writing to = beginning of buffer +
+ * number of bytes until end of first block +
+ * + block_size times number of blocks
+ * between the current block and the first
+ */
+ address = (char *) local->iobuf->ptr + (local->block_size -
+ (local->offset % local->block_size)) +
+ ((block_num - local->first_block - 1) *
+ local->block_size);
}
- STACK_WIND (frame, shard_update_file_size_cbk, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->fsetxattr, fd, xattr_req, 0, NULL);
-
- dict_unref (xattr_req);
- return 0;
-
-err:
- if (xattr_req)
- dict_unref (xattr_req);
- SHARD_STACK_UNWIND (writev, frame, -1, ENOMEM, NULL, NULL, NULL);
- return 0;
-
-}
-
-int
-shard_writev_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
- struct iatt *postbuf, dict_t *xdata)
-{
- int call_count = 0;
- fd_t *anon_fd = cookie;
- shard_local_t *local = NULL;
-
- local = frame->local;
-
- if (op_ret < 0) {
- local->op_ret = op_ret;
- local->op_errno = op_errno;
- } else {
- local->written_size += op_ret;
- local->postbuf.ia_blocks += (postbuf->ia_blocks -
- prebuf->ia_blocks);
- local->postbuf.ia_size += (postbuf->ia_size - prebuf->ia_size);
+ for (i = 0; i < count; i++) {
+ address = (char *) address + off;
+ memcpy (address, vector[i].iov_base, vector[i].iov_len);
+ off += vector[i].iov_len;
}
+out:
if (anon_fd)
fd_unref (anon_fd);
-
call_count = shard_call_count_return (frame);
if (call_count == 0) {
if (local->op_ret < 0) {
- SHARD_STACK_UNWIND (writev, frame, local->written_size,
- local->op_errno, NULL, NULL, NULL);
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, NULL, 0, NULL,
+ NULL, NULL);
} else {
if (xdata)
local->xattr_rsp = dict_ref (xdata);
- shard_update_file_size (frame, this);
+ vec.iov_base = local->iobuf->ptr;
+ vec.iov_len = local->op_ret;
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, &vec, 1,
+ &local->prebuf, local->iobref,
+ local->xattr_rsp);
+ return 0;
}
}
@@ -1698,64 +1621,44 @@ shard_writev_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-shard_writev_do (call_frame_t *frame, xlator_t *this)
+shard_readv_do (call_frame_t *frame, xlator_t *this)
{
- int i = 0;
- int count = 0;
- int call_count = 0;
- int last_block = 0;
- uint32_t cur_block = 0;
- fd_t *fd = NULL;
- fd_t *anon_fd = NULL;
- shard_local_t *local = NULL;
- struct iovec *vec = NULL;
- gf_boolean_t wind_failed = _gf_false;
- off_t orig_offset = 0;
- off_t shard_offset = 0;
- off_t vec_offset = 0;
- size_t remaining_size = 0;
- size_t write_size = 0;
+ int i = 0;
+ int ret = 0;
+ int call_count = 0;
+ int last_block = 0;
+ int cur_block = 0;
+ off_t orig_offset = 0;
+ off_t shard_offset = 0;
+ size_t read_size = 0;
+ size_t remaining_size = 0;
+ fd_t *fd = NULL;
+ fd_t *anon_fd = NULL;
+ shard_local_t *local = NULL;
+ gf_boolean_t wind_failed = _gf_false;
local = frame->local;
fd = local->fd;
orig_offset = local->offset;
- remaining_size = local->total_size;
cur_block = local->first_block;
- local->call_count = call_count = local->num_blocks;
last_block = local->last_block;
+ remaining_size = local->total_size;
+ local->call_count = call_count = local->num_blocks;
while (cur_block <= last_block) {
if (wind_failed) {
- shard_writev_do_cbk (frame, (void *) (long) 0, this, -1,
- ENOMEM, NULL, NULL, NULL);
+ shard_readv_do_cbk (frame, (void *) (long) 0, this, -1,
+ ENOMEM, NULL, 0, NULL, NULL, NULL);
goto next;
}
shard_offset = orig_offset % local->block_size;
- write_size = local->block_size - shard_offset;
- if (write_size > remaining_size)
- write_size = remaining_size;
-
- remaining_size -= write_size;
-
- count = iov_subset (local->vector, local->count, vec_offset,
- vec_offset + write_size, NULL);
-
- vec = GF_CALLOC (count, sizeof (struct iovec),
- gf_shard_mt_iovec);
- if (!vec) {
- local->op_ret = -1;
- local->op_errno = ENOMEM;
- wind_failed = _gf_true;
- GF_FREE (vec);
- shard_writev_do_cbk (frame, (void *) (long) 0, this, -1,
- ENOMEM, NULL, NULL, NULL);
- goto next;
- }
+ read_size = local->block_size - shard_offset;
+ if (read_size > remaining_size)
+ read_size = remaining_size;
- count = iov_subset (local->vector, local->count, vec_offset,
- vec_offset + write_size, vec);
+ remaining_size -= read_size;
if (cur_block == 0) {
anon_fd = fd_ref (fd);
@@ -1765,24 +1668,35 @@ shard_writev_do (call_frame_t *frame, xlator_t *this)
local->op_ret = -1;
local->op_errno = ENOMEM;
wind_failed = _gf_true;
- GF_FREE (vec);
- shard_writev_do_cbk (frame,
- (void *) (long) anon_fd,
- this, -1, ENOMEM, NULL,
- NULL, NULL);
+ shard_readv_do_cbk (frame,
+ (void *) (long) anon_fd,
+ this, -1, ENOMEM, NULL, 0,
+ NULL, NULL, NULL);
goto next;
}
}
- STACK_WIND_COOKIE (frame, shard_writev_do_cbk, anon_fd,
+ ret = fd_ctx_set (anon_fd, this, cur_block);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to set fd "
+ "ctx for block %d, gfid=%s", cur_block,
+ uuid_utoa (local->inode_list[i]->gfid));
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ wind_failed = _gf_true;
+ shard_readv_do_cbk (frame, (void *) (long) anon_fd,
+ this, -1, ENOMEM, NULL, 0, NULL,
+ NULL, NULL);
+ goto next;
+ }
+
+ STACK_WIND_COOKIE (frame, shard_readv_do_cbk, anon_fd,
FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->writev, anon_fd,
- vec, count, shard_offset, local->flags,
- local->iobref, local->xattr_req);
- GF_FREE (vec);
- vec = NULL;
- orig_offset += write_size;
- vec_offset += write_size;
+ FIRST_CHILD(this)->fops->readv, anon_fd,
+ read_size, shard_offset, local->flags,
+ local->xattr_req);
+
+ orig_offset += read_size;
next:
cur_block++;
i++;
@@ -1792,51 +1706,46 @@ next:
}
int
-shard_post_lookup_writev_handler (call_frame_t *frame, xlator_t *this)
+shard_post_lookup_shards_readv_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if (local->op_ret < 0) {
- SHARD_STACK_UNWIND (writev, frame, local->op_ret,
- local->op_errno, NULL, NULL, NULL);
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, NULL, 0, NULL, NULL, NULL);
return 0;
}
- local->postbuf = local->prebuf;
-
- /* At this point, calculate the size of the hole if it is going to be
- * created as part of this write.
- */
- if (local->offset > local->prebuf.ia_size)
- local->hole_size = local->offset - local->prebuf.ia_size;
-
- shard_writev_do (frame, this);
+ shard_readv_do (frame, this);
return 0;
}
int
-shard_post_lookup_shards_writev_handler (call_frame_t *frame, xlator_t *this)
+shard_post_mknod_readv_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if (local->op_ret < 0) {
- SHARD_STACK_UNWIND (writev, frame, local->op_ret,
- local->op_errno, NULL, NULL, NULL);
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, NULL, 0, NULL, NULL, NULL);
return 0;
}
- shard_lookup_base_file (frame, this, &local->loc,
- shard_post_lookup_writev_handler);
+ if (!local->eexist_count)
+ shard_readv_do (frame, this);
+ else
+ shard_common_lookup_shards (frame, this, local->loc.inode,
+ shard_post_lookup_shards_readv_handler);
return 0;
}
int
-shard_writev_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+shard_common_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, inode_t *inode,
struct iatt *buf, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
@@ -1854,9 +1763,8 @@ shard_writev_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->op_ret = op_ret;
local->op_errno = op_errno;
}
- gf_log (this->name, GF_LOG_DEBUG, "SHARD WRITEV: mknod of "
- "shard %d failed: %s", shard_block_num,
- strerror (op_errno));
+ gf_log (this->name, GF_LOG_DEBUG, "mknod of shard %d "
+ "failed: %s", shard_block_num, strerror (op_errno));
goto done;
}
@@ -1864,32 +1772,15 @@ shard_writev_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
done:
call_count = shard_call_count_return (frame);
- if (call_count == 0) {
- if (local->op_ret < 0) {
- goto unwind;
- } else {
- if (!local->eexist_count) {
- shard_lookup_base_file (frame, this,
- &local->loc,
- shard_post_lookup_writev_handler);
- } else {
- local->call_count = local->eexist_count;
- shard_common_lookup_shards (frame, this,
- local->loc.inode,
- shard_post_lookup_shards_writev_handler);
- }
- }
- }
- return 0;
+ if (call_count == 0)
+ local->post_mknod_handler (frame, this);
-unwind:
- SHARD_STACK_UNWIND (writev, frame, local->op_ret, local->op_errno, NULL,
- NULL, NULL);
return 0;
}
int
-shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
+shard_common_resume_mknod (call_frame_t *frame, xlator_t *this,
+ shard_post_mknod_fop_handler_t post_mknod_handler)
{
int i = 0;
int shard_idx_iter = 0;
@@ -1912,11 +1803,14 @@ shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
shard_idx_iter = local->first_block;
last_block = local->last_block;
call_count = local->call_count;
+ local->post_mknod_handler = post_mknod_handler;
ret = shard_inode_ctx_get_all (fd->inode, this, &ctx_tmp);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "Failed to get inode ctx for"
" %s", uuid_utoa (fd->inode->gfid));
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
goto err;
}
@@ -1928,7 +1822,7 @@ shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
}
if (wind_failed) {
- shard_writev_mknod_cbk (frame,
+ shard_common_mknod_cbk (frame,
(void *) (long) shard_idx_iter,
this, -1, ENOMEM, NULL, NULL,
NULL, NULL, NULL);
@@ -1943,7 +1837,7 @@ shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
local->op_ret = -1;
local->op_errno = ENOMEM;
wind_failed = _gf_true;
- shard_writev_mknod_cbk (frame,
+ shard_common_mknod_cbk (frame,
(void *) (long) shard_idx_iter,
this, -1, ENOMEM, NULL, NULL,
NULL, NULL, NULL);
@@ -1963,7 +1857,7 @@ shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
wind_failed = _gf_true;
loc_wipe (&loc);
dict_unref (xattr_req);
- shard_writev_mknod_cbk (frame,
+ shard_common_mknod_cbk (frame,
(void *) (long) shard_idx_iter,
this, -1, ENOMEM, NULL, NULL,
NULL, NULL, NULL);
@@ -1974,7 +1868,7 @@ shard_writev_resume_mknod (call_frame_t *frame, xlator_t *this)
if (loc.name)
loc.name++;
- STACK_WIND_COOKIE (frame, shard_writev_mknod_cbk,
+ STACK_WIND_COOKIE (frame, shard_common_mknod_cbk,
(void *) (long) shard_idx_iter,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->mknod, &loc,
@@ -1995,22 +1889,29 @@ err:
* This block is for handling failure in shard_inode_ctx_get_all().
* Failures in the while-loop are handled within the loop.
*/
- SHARD_STACK_UNWIND (writev, frame, -1, ENOMEM, NULL, NULL, NULL);
+ post_mknod_handler (frame, this);
return 0;
}
int
-shard_post_resolve_writev_handler (call_frame_t *frame, xlator_t *this)
+shard_post_resolve_readv_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, NULL, 0, NULL, NULL, NULL);
+ return 0;
+ }
+
if (local->call_count)
- shard_writev_resume_mknod (frame, this);
+ shard_common_resume_mknod (frame, this,
+ shard_post_mknod_readv_handler);
else
- shard_lookup_base_file (frame, this, &local->loc,
- shard_post_lookup_writev_handler);
+ shard_readv_do (frame, this);
+
return 0;
}
@@ -2039,29 +1940,34 @@ shard_lookup_dot_shard_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local = frame->local;
- if (op_ret)
+ if (op_ret) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
goto unwind;
+ }
if (!IA_ISDIR (buf->ia_type)) {
gf_log (this->name, GF_LOG_CRITICAL, "/.shard already exists "
"and is not a directory. Please remove /.shard from all"
" bricks and try again");
- op_errno = EIO;
+ local->op_ret = -1;
+ local->op_errno = EIO;
goto unwind;
}
shard_link_dot_shard_inode (local, inode, buf);
shard_common_resolve_shards (frame, this, local->loc.inode,
- shard_post_resolve_writev_handler);
+ local->post_res_handler);
return 0;
unwind:
- SHARD_STACK_UNWIND (writev, frame, -1, op_errno, NULL, NULL, NULL);
+ local->post_res_handler (frame, this);
return 0;
}
int
-shard_lookup_dot_shard (call_frame_t *frame, xlator_t *this)
+shard_lookup_dot_shard (call_frame_t *frame, xlator_t *this,
+ shard_post_resolve_fop_handler_t post_res_handler)
{
int ret = -1;
dict_t *xattr_req = NULL;
@@ -2070,16 +1976,22 @@ shard_lookup_dot_shard (call_frame_t *frame, xlator_t *this)
local = frame->local;
priv = this->private;
+ local->post_res_handler = post_res_handler;
xattr_req = dict_new ();
- if (!xattr_req)
+ if (!xattr_req) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
goto err;
+ }
ret = dict_set_static_bin (xattr_req, "gfid-req", priv->dot_shard_gfid,
16);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "Failed to set gfid of "
"/.shard into dict");
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
goto err;
}
@@ -2093,8 +2005,467 @@ shard_lookup_dot_shard (call_frame_t *frame, xlator_t *this)
err:
if (xattr_req)
dict_unref (xattr_req);
+ post_res_handler (frame, this);
+ return 0;
+}
+
+int
+shard_post_lookup_readv_handler (call_frame_t *frame, xlator_t *this)
+{
+ int ret = 0;
+ size_t read_size = 0;
+ size_t actual_size = 0;
+ struct iobuf *iobuf = NULL;
+ shard_local_t *local = NULL;
+ shard_priv_t *priv = NULL;
+
+ priv = this->private;
+ local = frame->local;
+
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (readv, frame, local->op_ret,
+ local->op_errno, NULL, 0, NULL, NULL, NULL);
+ return 0;
+ }
+
+ if (local->offset >= local->prebuf.ia_size) {
+ /* If the read is being performed past the end of the file,
+ * unwind the FOP with 0 bytes read as status.
+ */
+ struct iovec vec = {0,};
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, local->req_size);
+ if (!iobuf)
+ goto err;
+
+ vec.iov_base = iobuf->ptr;
+ vec.iov_len = 0;
+ local->iobref = iobref_new ();
+ iobref_add (local->iobref, iobuf);
+ iobuf_unref (iobuf);
+
+ SHARD_STACK_UNWIND (readv, frame, 0, 0, &vec, 1, &local->prebuf,
+ local->iobref, NULL);
+ return 0;
+ }
+
+ read_size = (local->offset + local->req_size);
+ actual_size = local->prebuf.ia_size;
+
+ local->first_block = get_lowest_block (local->offset,
+ local->block_size);
+
+ /* If the end of read surpasses the file size, only resolve and read
+ * till the end of the file size. If the read is confined within the
+ * size of the file, read only the requested size.
+ */
+
+ if (read_size >= actual_size)
+ local->total_size = actual_size - local->offset;
+ else
+ local->total_size = local->req_size;
+
+ local->last_block = get_highest_block (local->offset, local->total_size,
+ local->block_size);
+
+ local->num_blocks = local->last_block - local->first_block + 1;
+
+ local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
+ gf_shard_mt_inode_list);
+ if (!local->inode_list)
+ goto err;
+
+ iobuf = iobuf_get2 (this->ctx->iobuf_pool, local->total_size);
+ if (!iobuf)
+ goto err;
+
+ local->iobref = iobref_new ();
+ if (!local->iobref) {
+ iobuf_unref (iobuf);
+ goto err;
+ }
+
+ if (iobref_add (local->iobref, iobuf) != 0) {
+ iobuf_unref (iobuf);
+ goto err;
+ }
+
+ iobuf_unref (iobuf);
+ local->iobuf = iobuf;
+ memset (iobuf->ptr, 0, local->total_size);
+
+ local->dot_shard_loc.inode = inode_find (this->itable,
+ priv->dot_shard_gfid);
+ if (!local->dot_shard_loc.inode) {
+ ret = shard_init_dot_shard_loc (this, local);
+ if (ret)
+ goto err;
+ shard_lookup_dot_shard (frame, this,
+ shard_post_resolve_readv_handler);
+ } else {
+ shard_common_resolve_shards (frame, this, local->loc.inode,
+ shard_post_resolve_readv_handler);
+ }
+ return 0;
+
+err:
+ SHARD_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, 0, NULL, NULL,
+ NULL);
+ return 0;
+}
+
+int
+shard_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, uint32_t flags, dict_t *xdata)
+{
+ int ret = 0;
+ uint64_t block_size = 0;
+ shard_local_t *local = NULL;
+
+ ret = shard_inode_ctx_get_block_size (fd->inode, this, &block_size);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to get block size for"
+ "%s from its inode ctx", uuid_utoa (fd->inode->gfid));
+ goto err;
+ }
+
+ if (!block_size) {
+ /* block_size = 0 means that the file was created before
+ * sharding was enabled on the volume.
+ */
+ STACK_WIND (frame, default_readv_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv, fd, size, offset,
+ flags, xdata);
+ return 0;
+ }
+
+ if (!this->itable)
+ this->itable = fd->inode->table;
+
+ local = mem_get0 (this->local_pool);
+ if (!local)
+ goto err;
+
+ frame->local = local;
+
+ local->fd = fd_ref (fd);
+ local->block_size = block_size;
+ local->offset = offset;
+ local->req_size = size;
+ local->flags = flags;
+ local->xattr_req = (xdata) ? dict_ref (xdata) : dict_new ();
+ if (!local->xattr_req)
+ goto err;
+
+ local->loc.inode = inode_ref (fd->inode);
+ gf_uuid_copy (local->loc.gfid, fd->inode->gfid);
+
+ shard_lookup_base_file (frame, this, &local->loc,
+ shard_post_lookup_readv_handler);
+
+ return 0;
+
+err:
+ SHARD_STACK_UNWIND (readv, frame, -1, ENOMEM, NULL, 0, NULL, NULL,
+ NULL);
+ return 0;
+
+}
+
+int
+shard_update_file_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ goto err;
+ }
+
+ SHARD_STACK_UNWIND (writev, frame, local->written_size, local->op_errno,
+ &local->prebuf, &local->postbuf, local->xattr_rsp);
+ return 0;
+
+err:
+ SHARD_STACK_UNWIND (writev, frame, -1, local->op_errno, NULL,
+ NULL, NULL);
+ return 0;
+}
+
+int
+shard_update_file_size (call_frame_t *frame, xlator_t *this)
+{
+ int ret = -1;
+ uint64_t *size_attr = NULL;
+ fd_t *fd = NULL;
+ shard_local_t *local = NULL;
+ dict_t *xattr_req = NULL;
+
+ local = frame->local;
+ fd = local->fd;
+
+ xattr_req = dict_new ();
+ if (!xattr_req)
+ goto err;
+
+ ret = shard_set_size_attrs (local->postbuf.ia_size + local->hole_size,
+ local->postbuf.ia_blocks, &size_attr);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to set size attrs for"
+ " %s", uuid_utoa (fd->inode->gfid));
+ goto err;
+ }
+
+ ret = dict_set_bin (xattr_req, GF_XATTR_SHARD_FILE_SIZE, size_attr,
+ 8 * 4);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to set key %s into "
+ "dict. gfid=%s", GF_XATTR_SHARD_FILE_SIZE,
+ uuid_utoa (fd->inode->gfid));
+ GF_FREE (size_attr);
+ goto err;
+ }
+
+ STACK_WIND (frame, shard_update_file_size_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->fsetxattr, fd, xattr_req, 0, NULL);
+
+ dict_unref (xattr_req);
+ return 0;
+
+err:
+ if (xattr_req)
+ dict_unref (xattr_req);
SHARD_STACK_UNWIND (writev, frame, -1, ENOMEM, NULL, NULL, NULL);
return 0;
+
+}
+
+int
+shard_writev_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
+ struct iatt *postbuf, dict_t *xdata)
+{
+ int call_count = 0;
+ fd_t *anon_fd = cookie;
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ } else {
+ local->written_size += op_ret;
+ local->postbuf.ia_blocks += (postbuf->ia_blocks -
+ prebuf->ia_blocks);
+ local->postbuf.ia_size += (postbuf->ia_size - prebuf->ia_size);
+ }
+
+ if (anon_fd)
+ fd_unref (anon_fd);
+
+ call_count = shard_call_count_return (frame);
+ if (call_count == 0) {
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (writev, frame, local->written_size,
+ local->op_errno, NULL, NULL, NULL);
+ } else {
+ if (xdata)
+ local->xattr_rsp = dict_ref (xdata);
+ shard_update_file_size (frame, this);
+ }
+ }
+
+ return 0;
+}
+
+int
+shard_writev_do (call_frame_t *frame, xlator_t *this)
+{
+ int i = 0;
+ int count = 0;
+ int call_count = 0;
+ int last_block = 0;
+ uint32_t cur_block = 0;
+ fd_t *fd = NULL;
+ fd_t *anon_fd = NULL;
+ shard_local_t *local = NULL;
+ struct iovec *vec = NULL;
+ gf_boolean_t wind_failed = _gf_false;
+ off_t orig_offset = 0;
+ off_t shard_offset = 0;
+ off_t vec_offset = 0;
+ size_t remaining_size = 0;
+ size_t write_size = 0;
+
+ local = frame->local;
+ fd = local->fd;
+
+ orig_offset = local->offset;
+ remaining_size = local->total_size;
+ cur_block = local->first_block;
+ local->call_count = call_count = local->num_blocks;
+ last_block = local->last_block;
+
+ while (cur_block <= last_block) {
+ if (wind_failed) {
+ shard_writev_do_cbk (frame, (void *) (long) 0, this, -1,
+ ENOMEM, NULL, NULL, NULL);
+ goto next;
+ }
+
+ shard_offset = orig_offset % local->block_size;
+ write_size = local->block_size - shard_offset;
+ if (write_size > remaining_size)
+ write_size = remaining_size;
+
+ remaining_size -= write_size;
+
+ count = iov_subset (local->vector, local->count, vec_offset,
+ vec_offset + write_size, NULL);
+
+ vec = GF_CALLOC (count, sizeof (struct iovec),
+ gf_shard_mt_iovec);
+ if (!vec) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ wind_failed = _gf_true;
+ GF_FREE (vec);
+ shard_writev_do_cbk (frame, (void *) (long) 0, this, -1,
+ ENOMEM, NULL, NULL, NULL);
+ goto next;
+ }
+
+ count = iov_subset (local->vector, local->count, vec_offset,
+ vec_offset + write_size, vec);
+
+ if (cur_block == 0) {
+ anon_fd = fd_ref (fd);
+ } else {
+ anon_fd = fd_anonymous (local->inode_list[i]);
+ if (!anon_fd) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ wind_failed = _gf_true;
+ GF_FREE (vec);
+ shard_writev_do_cbk (frame,
+ (void *) (long) anon_fd,
+ this, -1, ENOMEM, NULL,
+ NULL, NULL);
+ goto next;
+ }
+ }
+
+ STACK_WIND_COOKIE (frame, shard_writev_do_cbk, anon_fd,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->writev, anon_fd,
+ vec, count, shard_offset, local->flags,
+ local->iobref, local->xattr_req);
+ GF_FREE (vec);
+ vec = NULL;
+ orig_offset += write_size;
+ vec_offset += write_size;
+next:
+ cur_block++;
+ i++;
+ call_count--;
+ }
+ return 0;
+}
+
+int
+shard_post_lookup_writev_handler (call_frame_t *frame, xlator_t *this)
+{
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (writev, frame, local->op_ret,
+ local->op_errno, NULL, NULL, NULL);
+ return 0;
+ }
+
+ local->postbuf = local->prebuf;
+
+ /* At this point, calculate the size of the hole if it is going to be
+ * created as part of this write.
+ */
+ if (local->offset > local->prebuf.ia_size)
+ local->hole_size = local->offset - local->prebuf.ia_size;
+
+ shard_writev_do (frame, this);
+
+ return 0;
+}
+
+int
+shard_post_lookup_shards_writev_handler (call_frame_t *frame, xlator_t *this)
+{
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (writev, frame, local->op_ret,
+ local->op_errno, NULL, NULL, NULL);
+ return 0;
+ }
+
+ shard_lookup_base_file (frame, this, &local->loc,
+ shard_post_lookup_writev_handler);
+ return 0;
+}
+
+int
+shard_post_mknod_writev_handler (call_frame_t *frame, xlator_t *this)
+{
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (writev, frame, local->op_ret,
+ local->op_errno, NULL, NULL, NULL);
+ return 0;
+ }
+
+ if (!local->eexist_count) {
+ shard_lookup_base_file (frame, this, &local->loc,
+ shard_post_lookup_writev_handler);
+ } else {
+ local->call_count = local->eexist_count;
+ shard_common_lookup_shards (frame, this, local->loc.inode,
+ shard_post_lookup_shards_writev_handler);
+ }
+
+ return 0;
+}
+
+int
+shard_post_resolve_writev_handler (call_frame_t *frame, xlator_t *this)
+{
+ shard_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (local->op_ret < 0) {
+ SHARD_STACK_UNWIND (writev, frame, local->op_ret,
+ local->op_errno, NULL, NULL, NULL);
+ return 0;
+ }
+
+ if (local->call_count)
+ shard_common_resume_mknod (frame, this,
+ shard_post_mknod_writev_handler);
+ else
+ shard_lookup_base_file (frame, this, &local->loc,
+ shard_post_lookup_writev_handler);
+ return 0;
}
int
@@ -2114,7 +2485,8 @@ shard_writev_mkdir_dot_shard_cbk (call_frame_t *frame, void *cookie,
} else {
gf_log (this->name, GF_LOG_DEBUG, "mkdir on /.shard "
"failed with EEXIST. Attempting lookup now");
- shard_lookup_dot_shard (frame, this);
+ shard_lookup_dot_shard (frame, this,
+ shard_post_resolve_writev_handler);
return 0;
}
}
@@ -2135,7 +2507,6 @@ shard_writev_mkdir_dot_shard (call_frame_t *frame, xlator_t *this)
int ret = -1;
shard_local_t *local = NULL;
shard_priv_t *priv = NULL;
- loc_t *dot_shard_loc = NULL;
dict_t *xattr_req = NULL;
local = frame->local;
@@ -2145,21 +2516,9 @@ shard_writev_mkdir_dot_shard (call_frame_t *frame, xlator_t *this)
if (!xattr_req)
goto err;
- dot_shard_loc = &local->dot_shard_loc;
-
- dot_shard_loc->inode = inode_new (this->itable);
- dot_shard_loc->parent = inode_ref (this->itable->root);
- ret = inode_path (dot_shard_loc->parent, GF_SHARD_DIR,
- (char **)&dot_shard_loc->path);
- if (ret < 0 || !(dot_shard_loc->inode)) {
- gf_log (this->name, GF_LOG_ERROR, "Inode path failed on"
- " %s", GF_SHARD_DIR);
+ ret = shard_init_dot_shard_loc (this, local);
+ if (ret)
goto err;
- }
-
- dot_shard_loc->name = strrchr (dot_shard_loc->path, '/');
- if (dot_shard_loc->name)
- dot_shard_loc->name++;
ret = dict_set_static_bin (xattr_req, "gfid-req", priv->dot_shard_gfid,
16);
diff --git a/xlators/features/shard/src/shard.h b/xlators/features/shard/src/shard.h
index 365616c108c..09af56a17ca 100644
--- a/xlators/features/shard/src/shard.h
+++ b/xlators/features/shard/src/shard.h
@@ -135,6 +135,9 @@ typedef int32_t (*shard_post_resolve_fop_handler_t) (call_frame_t *frame,
typedef int32_t (*shard_post_lookup_shards_fop_handler_t) (call_frame_t *frame,
xlator_t *this);
+typedef int32_t (*shard_post_mknod_fop_handler_t) (call_frame_t *frame,
+ xlator_t *this);
+
typedef struct shard_local {
int op_ret;
int op_errno;
@@ -152,6 +155,7 @@ typedef struct shard_local {
size_t total_size;
size_t written_size;
size_t hole_size;
+ size_t req_size;
loc_t loc;
loc_t dot_shard_loc;
loc_t loc2;
@@ -169,8 +173,11 @@ typedef struct shard_local {
struct iatt postnewparent;
struct iovec *vector;
struct iobref *iobref;
+ struct iobuf *iobuf;
shard_post_fop_handler_t handler;
shard_post_lookup_shards_fop_handler_t pls_fop_handler;
+ shard_post_resolve_fop_handler_t post_res_handler;
+ shard_post_mknod_fop_handler_t post_mknod_handler;
struct {
int lock_count;
fop_inodelk_cbk_t inodelk_cbk;