summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorVikas Gorur <vikas@gluster.com>2009-10-23 09:31:52 +0000
committerAnand V. Avati <avati@dev.gluster.com>2009-10-23 08:29:25 -0700
commit6490122f107c992f2600fc7d3214a43c3f50df70 (patch)
treef577fd821df91355d70fb889552dbb4ada375dc9 /xlators
parent10dea439359238aed4dbad0e2c25df9fb2d5f28e (diff)
cluster/afr: Pipeline the "diff" data self-heal read-write loop.
Start upto "data-self-heal-window-size" instances of the read-write loop of the "diff" data self-heal algorithm simultaneously. Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 320 (Improve self-heal performance) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=320
Diffstat (limited to 'xlators')
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-algorithm.c372
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-algorithm.h5
2 files changed, 267 insertions, 110 deletions
diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.c b/xlators/cluster/afr/src/afr-self-heal-algorithm.c
index 834b7171d..e8fce966b 100644
--- a/xlators/cluster/afr/src/afr-self-heal-algorithm.c
+++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.c
@@ -414,9 +414,6 @@ sh_diff_private_cleanup (call_frame_t *frame, xlator_t *this)
if (sh_priv->checksum)
FREE (sh_priv->checksum);
- if (sh_priv->write_needed)
- FREE (sh_priv->write_needed);
-
FREE (sh_priv);
}
}
@@ -437,48 +434,112 @@ sh_diff_number_of_writes_needed (unsigned char *write_needed, int child_count)
}
+struct sh_diff_loop_state {
+ off_t offset;
+ int32_t child_index;
+ unsigned char *write_needed;
+};
+
+
+static int
+sh_diff_loop_driver (call_frame_t *frame, xlator_t *this);
+
+
static int
-sh_diff_iter (call_frame_t *frame, xlator_t *this);
+sh_diff_loop_return (call_frame_t *rw_frame, xlator_t *this,
+ struct sh_diff_loop_state *loop_state)
+{
+ afr_private_t * priv = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t * rw_sh = NULL;
+
+ call_frame_t *sh_frame = NULL;
+ afr_local_t * sh_local = NULL;
+ afr_self_heal_t *sh = NULL;
+ afr_sh_algo_diff_private_t *sh_priv = NULL;
+
+ priv = this->private;
+
+ rw_local = rw_frame->local;
+ rw_sh = &rw_local->self_heal;
+
+ sh_frame = rw_sh->sh_frame;
+ sh_local = sh_frame->local;
+ sh = &sh_local->self_heal;
+ sh_priv = sh->private;
+
+ LOCK (&sh_priv->lock);
+ {
+ sh_priv->loops_running--;
+ }
+ UNLOCK (&sh_priv->lock);
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "loop for offset %"PRId64" returned", loop_state->offset);
+
+ AFR_STACK_DESTROY (rw_frame);
+
+ if (loop_state) {
+ if (loop_state->write_needed)
+ FREE (loop_state->write_needed);
+
+ FREE (loop_state);
+ }
+
+ sh_diff_loop_driver (sh_frame, this);
+
+ return 0;
+}
static int
-sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+sh_diff_write_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct stat *buf,
struct stat *postbuf)
{
- afr_private_t * priv = NULL;
- afr_local_t * local = NULL;
- afr_self_heal_t * sh = NULL;
+ afr_private_t * priv = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t * rw_sh = NULL;
- int child_index = (long) cookie;
- int call_count = 0;
+ call_frame_t *sh_frame = NULL;
+ afr_local_t * sh_local = NULL;
+ afr_self_heal_t *sh = NULL;
- priv = this->private;
- local = frame->local;
- sh = &local->self_heal;
+ struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie;
+
+ int call_count = 0;
+
+ priv = this->private;
+ rw_local = rw_frame->local;
+ rw_sh = &rw_local->self_heal;
+
+ sh_frame = rw_sh->sh_frame;
+ sh_local = sh_frame->local;
+ sh = &sh_local->self_heal;
gf_log (this->name, GF_LOG_TRACE,
"wrote %d bytes of data from %s to child %d, offset %"PRId64"",
- op_ret, local->loc.path, child_index, sh->offset - op_ret);
+ op_ret, sh_local->loc.path, loop_state->child_index,
+ loop_state->offset);
- LOCK (&frame->lock);
+ LOCK (&sh_frame->lock);
{
if (op_ret == -1) {
gf_log (this->name, GF_LOG_DEBUG,
"write to %s failed on subvolume %s (%s)",
- local->loc.path,
- priv->children[child_index]->name,
+ sh_local->loc.path,
+ priv->children[loop_state->child_index]->name,
strerror (op_errno));
sh->op_failed = 1;
}
}
- UNLOCK (&frame->lock);
+ UNLOCK (&sh_frame->lock);
- call_count = afr_frame_return (frame);
+ call_count = afr_frame_return (rw_frame);
if (call_count == 0) {
- sh_diff_iter (frame, this);
+ sh_diff_loop_return (rw_frame, this, loop_state);
}
return 0;
@@ -486,70 +547,66 @@ sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
static int
-sh_diff_read_cbk (call_frame_t *frame, void *cookie,
+sh_diff_read_cbk (call_frame_t *rw_frame, void *cookie,
xlator_t *this, int32_t op_ret, int32_t op_errno,
struct iovec *vector, int32_t count, struct stat *buf,
struct iobref *iobref)
{
- afr_private_t * priv = NULL;
- afr_local_t * local = NULL;
- afr_self_heal_t * sh = NULL;
+ afr_private_t * priv = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t * rw_sh = NULL;
- afr_sh_algo_diff_private_t *sh_priv = NULL;
+ afr_sh_algo_diff_private_t * sh_priv = NULL;
- int child_index = (long) cookie;
+ call_frame_t *sh_frame = NULL;
+ afr_local_t * sh_local = NULL;
+ afr_self_heal_t *sh = NULL;
+
+ struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie;
int i = 0;
int call_count = 0;
- off_t offset;
+ priv = this->private;
+ rw_local = rw_frame->local;
+ rw_sh = &rw_local->self_heal;
- priv = this->private;
- local = frame->local;
- sh = &local->self_heal;
- sh_priv = sh->private;
+ sh_frame = rw_sh->sh_frame;
+ sh_local = sh_frame->local;
+ sh = &sh_local->self_heal;
+ sh_priv = sh->private;
- call_count = sh_diff_number_of_writes_needed (sh_priv->write_needed,
+ call_count = sh_diff_number_of_writes_needed (loop_state->write_needed,
priv->child_count);
- local->call_count = call_count;
+ rw_local->call_count = call_count;
gf_log (this->name, GF_LOG_TRACE,
- "read %d bytes of data from %s on child %d, offset %"PRId64"",
- op_ret, local->loc.path, child_index, sh->offset);
+ "read %d bytes of data from %s, offset %"PRId64"",
+ op_ret, sh_local->loc.path, sh->offset);
if (op_ret <= 0) {
- local->self_heal.algo_completion_cbk (frame, this);
+ sh_diff_loop_return (rw_frame, this, loop_state);
+
return 0;
}
- /* what if we read less than block size? */
- offset = sh->offset;
- sh->offset += sh_priv->block_size;
-
if (sh->file_has_holes) {
if (iov_0filled (vector, count) == 0) {
- /*
- the iter function depends on the
- sh->offset already being updated
- above
- */
- sh_diff_iter (frame, this);
+ sh_diff_loop_return (rw_frame, this, loop_state);
goto out;
}
}
for (i = 0; i < priv->child_count; i++) {
- if (sh_priv->write_needed[i]) {
- STACK_WIND_COOKIE (frame, sh_diff_write_cbk,
- (void *) (long) i,
+ if (loop_state->write_needed[i]) {
+ STACK_WIND_COOKIE (rw_frame, sh_diff_write_cbk,
+ (void *) (long) loop_state,
priv->children[i],
priv->children[i]->fops->writev,
- sh->healing_fd, vector, count, offset,
- iobref);
-
- sh_priv->write_needed[i] = 0;
+ sh->healing_fd, vector, count,
+ loop_state->offset, iobref);
if (!--call_count)
break;
@@ -562,73 +619,93 @@ out:
static int
-sh_diff_read (call_frame_t *frame, xlator_t *this)
+sh_diff_read (call_frame_t *rw_frame, xlator_t *this,
+ struct sh_diff_loop_state *loop_state)
{
- afr_private_t * priv = NULL;
- afr_local_t * local = NULL;
- afr_self_heal_t * sh = NULL;
+ afr_private_t * priv = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t * rw_sh = NULL;
afr_sh_algo_diff_private_t * sh_priv = NULL;
- priv = this->private;
- local = frame->local;
- sh = &local->self_heal;
- sh_priv = sh->private;
+ call_frame_t *sh_frame = NULL;
+ afr_local_t * sh_local = NULL;
+ afr_self_heal_t *sh = NULL;
+
+ priv = this->private;
+ rw_local = rw_frame->local;
+ rw_sh = &rw_local->self_heal;
- STACK_WIND_COOKIE (frame, sh_diff_read_cbk,
- (void *) (long) sh->source,
+ sh_frame = rw_sh->sh_frame;
+ sh_local = sh_frame->local;
+ sh = &sh_local->self_heal;
+ sh_priv = sh->private;
+
+ STACK_WIND_COOKIE (rw_frame, sh_diff_read_cbk,
+ (void *) (long) loop_state,
priv->children[sh->source],
priv->children[sh->source]->fops->readv,
sh->healing_fd, sh_priv->block_size,
- sh->offset);
+ loop_state->offset);
return 0;
}
-
static int
-sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+sh_diff_checksum_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
uint32_t weak_checksum, uint8_t *strong_checksum)
{
- afr_private_t * priv = NULL;
- afr_local_t * local = NULL;
- afr_self_heal_t * sh = NULL;
+ afr_private_t * priv = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t *rw_sh = NULL;
+
+ call_frame_t *sh_frame = NULL;
+ afr_local_t * sh_local = NULL;
+ afr_self_heal_t *sh = NULL;
+
afr_sh_algo_diff_private_t * sh_priv = NULL;
- int child_index = (long) cookie;
+ struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie;
+
int call_count = 0;
int i = 0;
int write_needed = 0;
priv = this->private;
- local = frame->local;
- sh = &local->self_heal;
+
+ rw_local = rw_frame->local;
+ rw_sh = &rw_local->self_heal;
+
+ sh_frame = rw_sh->sh_frame;
+ sh_local = sh_frame->local;
+ sh = &sh_local->self_heal;
sh_priv = sh->private;
if (op_ret < 0) {
gf_log (this->name, GF_LOG_ERROR,
"checksum on %s failed on subvolume %s (%s)",
- local->loc.path, priv->children[child_index]->name,
+ sh_local->loc.path, priv->children[loop_state->child_index]->name,
strerror (op_errno));
sh->op_failed = 1;
- local->self_heal.algo_abort_cbk (frame, this);
+ sh_diff_loop_return (rw_frame, this, loop_state);
+
return 0;
}
- memcpy ((void *) sh_priv->checksum + (child_index * MD5_DIGEST_LEN),
+ memcpy ((void *) sh_priv->checksum + loop_state->child_index * MD5_DIGEST_LEN,
strong_checksum,
MD5_DIGEST_LEN);
- call_count = afr_frame_return (frame);
+ call_count = afr_frame_return (rw_frame);
if (call_count == 0) {
for (i = 0; i < priv->child_count; i++) {
- if (sh->sources[i] || !local->child_up[i])
+ if (sh->sources[i] || !sh_local->child_up[i])
continue;
if (memcmp ((const void *) sh_priv->checksum + (i * MD5_DIGEST_LEN),
@@ -642,18 +719,18 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
gf_log (this->name, GF_LOG_TRACE,
"checksum on subvolume %s at offset %"
PRId64" differs from that on source",
- priv->children[i]->name, sh->offset);
+ priv->children[i]->name, loop_state->offset);
- write_needed = sh_priv->write_needed[i] = 1;
+ write_needed = loop_state->write_needed[i] = 1;
}
}
if (write_needed) {
- sh_diff_read (frame, this);
+ sh_diff_read (rw_frame, this, loop_state);
} else {
sh->offset += sh_priv->block_size;
- sh_diff_iter (frame, this);
+ sh_diff_loop_return (rw_frame, this, loop_state);
}
}
@@ -662,28 +739,55 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
static int
-sh_diff_checksum (call_frame_t *frame, xlator_t *this)
+sh_diff_checksum (call_frame_t *frame, xlator_t *this, off_t offset)
{
- afr_private_t * priv = NULL;
- afr_local_t * local = NULL;
- afr_self_heal_t * sh = NULL;
+ afr_private_t * priv = NULL;
+ afr_local_t * local = NULL;
+ afr_local_t * rw_local = NULL;
+ afr_self_heal_t * sh = NULL;
+ afr_self_heal_t * rw_sh = NULL;
afr_sh_algo_diff_private_t * sh_priv = NULL;
+ call_frame_t *rw_frame = NULL;
+
+ struct sh_diff_loop_state *loop_state = NULL;
+
+ int32_t op_errno = 0;
+
+ int call_count = 0;
+ int i = 0;
+
priv = this->private;
local = frame->local;
sh = &local->self_heal;
+
sh_priv = sh->private;
- int call_count = 0;
- int i = 0;
+ rw_frame = copy_frame (frame);
+ if (!rw_frame)
+ goto out;
+
+ ALLOC_OR_GOTO (rw_local, afr_local_t, out);
+
+ rw_frame->local = rw_local;
+ rw_sh = &rw_local->self_heal;
+
+ rw_sh->offset = sh->offset;
+ rw_sh->sh_frame = frame;
call_count = sh->active_sinks + 1; /* sinks and source */
- local->call_count = call_count;
+ rw_local->call_count = call_count;
- STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk,
- (void *) (long) i,
+ loop_state = CALLOC (1, sizeof (*loop_state));
+ loop_state->child_index = sh->source;
+ loop_state->offset = offset;
+ loop_state->write_needed = CALLOC (priv->child_count,
+ sizeof (*loop_state->write_needed));
+
+ STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk,
+ (void *) (long) loop_state,
priv->children[sh->source],
priv->children[sh->source]->fops->rchecksum,
sh->healing_fd,
@@ -693,51 +797,100 @@ sh_diff_checksum (call_frame_t *frame, xlator_t *this)
if (sh->sources[i] || !local->child_up[i])
continue;
- STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk,
- (void *) (long) i,
+ STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk,
+ (void *) (long) loop_state,
priv->children[i],
priv->children[i]->fops->rchecksum,
sh->healing_fd,
sh->offset, sh_priv->block_size);
+
if (!--call_count)
break;
}
return 0;
+
+out:
+ sh->op_failed = 1;
+
+ sh_diff_loop_driver (frame, this);
+
+ return 0;
}
static int
-sh_diff_iter (call_frame_t *frame, xlator_t *this)
+sh_diff_loop_driver (call_frame_t *frame, xlator_t *this)
{
afr_private_t * priv = NULL;
afr_local_t * local = NULL;
afr_self_heal_t * sh = NULL;
+ afr_sh_algo_diff_private_t *sh_priv = NULL;
- priv = this->private;
- local = frame->local;
- sh = &local->self_heal;
+ int loop = 0;
+ off_t offset = 0;
+
+ priv = this->private;
+ local = frame->local;
+ sh = &local->self_heal;
+ sh_priv = sh->private;
if (sh->op_failed) {
- sh_diff_private_cleanup (frame, this);
+ if (sh_priv->loops_running == 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "diff self-heal aborting on %s",
+ local->loc.path);
+
+ sh_diff_private_cleanup (frame, this);
+
+ local->self_heal.algo_abort_cbk (frame, this);
+ }
- local->self_heal.algo_abort_cbk (frame, this);
goto out;
}
- if (sh->offset >= sh->file_size) {
- gf_log (this->name, GF_LOG_TRACE,
- "closing fd's of %s",
- local->loc.path);
+ if (sh_priv->offset >= sh->file_size) {
+ if (sh_priv->loops_running == 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "full self-heal completed on %s",
+ local->loc.path);
- sh_diff_private_cleanup (frame, this);
- local->self_heal.algo_completion_cbk (frame, this);
+ sh_diff_private_cleanup (frame, this);
+
+ local->self_heal.algo_completion_cbk (frame, this);
+ }
goto out;
}
- sh_diff_checksum (frame, this);
+spawn:
+ loop = 0;
+
+ LOCK (&sh_priv->lock);
+ {
+ if ((sh_priv->loops_running < priv->data_self_heal_window_size)
+ && (sh_priv->offset < sh->file_size)) {
+
+ gf_log (this->name, GF_LOG_TRACE,
+ "spawning a loop for offset %"PRId64,
+ sh_priv->offset);
+
+ offset = sh_priv->offset;
+ sh_priv->offset += sh_priv->block_size;
+
+ sh_priv->loops_running++;
+
+ loop = 1;
+ }
+ }
+ UNLOCK (&sh_priv->lock);
+
+ if (loop) {
+ sh_diff_checksum (frame, this, offset);
+ goto spawn;
+ }
+
out:
return 0;
}
@@ -757,16 +910,17 @@ afr_sh_algo_diff (call_frame_t *frame, xlator_t *this)
sh_priv = CALLOC (1, sizeof (*sh_priv));
- sh_priv->write_needed = CALLOC (priv->child_count,
- sizeof (unsigned char));
-
sh_priv->checksum = CALLOC (priv->child_count, MD5_DIGEST_LEN);
sh_priv->block_size = this->ctx->page_size;
sh->private = sh_priv;
- sh_diff_checksum (frame, this);
+ LOCK_INIT (&sh_priv->lock);
+
+ local->call_count = 0;
+
+ sh_diff_loop_driver (frame, this);
return 0;
}
diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.h b/xlators/cluster/afr/src/afr-self-heal-algorithm.h
index 844d510c6..9995ee20b 100644
--- a/xlators/cluster/afr/src/afr-self-heal-algorithm.h
+++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.h
@@ -41,8 +41,11 @@ typedef struct {
uint8_t *checksum; /* array of MD5 checksums for each child
Each checksum is MD5_DIGEST_LEN bytes long */
- unsigned char *write_needed;
size_t block_size;
+
+ gf_lock_t lock;
+ unsigned int loops_running;
+ off_t offset;
} afr_sh_algo_diff_private_t;
#endif /* __AFR_SELF_HEAL_ALGORITHM_H__ */