diff options
-rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-algorithm.c | 372 | ||||
-rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-algorithm.h | 5 |
2 files changed, 267 insertions, 110 deletions
diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.c b/xlators/cluster/afr/src/afr-self-heal-algorithm.c index 834b7171d69..e8fce966b3e 100644 --- a/xlators/cluster/afr/src/afr-self-heal-algorithm.c +++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.c @@ -414,9 +414,6 @@ sh_diff_private_cleanup (call_frame_t *frame, xlator_t *this) if (sh_priv->checksum) FREE (sh_priv->checksum); - if (sh_priv->write_needed) - FREE (sh_priv->write_needed); - FREE (sh_priv); } } @@ -437,48 +434,112 @@ sh_diff_number_of_writes_needed (unsigned char *write_needed, int child_count) } +struct sh_diff_loop_state { + off_t offset; + int32_t child_index; + unsigned char *write_needed; +}; + + +static int +sh_diff_loop_driver (call_frame_t *frame, xlator_t *this); + + static int -sh_diff_iter (call_frame_t *frame, xlator_t *this); +sh_diff_loop_return (call_frame_t *rw_frame, xlator_t *this, + struct sh_diff_loop_state *loop_state) +{ + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * rw_sh = NULL; + + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; + afr_sh_algo_diff_private_t *sh_priv = NULL; + + priv = this->private; + + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; + sh_priv = sh->private; + + LOCK (&sh_priv->lock); + { + sh_priv->loops_running--; + } + UNLOCK (&sh_priv->lock); + + gf_log (this->name, GF_LOG_TRACE, + "loop for offset %"PRId64" returned", loop_state->offset); + + AFR_STACK_DESTROY (rw_frame); + + if (loop_state) { + if (loop_state->write_needed) + FREE (loop_state->write_needed); + + FREE (loop_state); + } + + sh_diff_loop_driver (sh_frame, this); + + return 0; +} static int -sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +sh_diff_write_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *buf, struct stat *postbuf) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t * sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * rw_sh = NULL; - int child_index = (long) cookie; - int call_count = 0; + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; - priv = this->private; - local = frame->local; - sh = &local->self_heal; + struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie; + + int call_count = 0; + + priv = this->private; + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; gf_log (this->name, GF_LOG_TRACE, "wrote %d bytes of data from %s to child %d, offset %"PRId64"", - op_ret, local->loc.path, child_index, sh->offset - op_ret); + op_ret, sh_local->loc.path, loop_state->child_index, + loop_state->offset); - LOCK (&frame->lock); + LOCK (&sh_frame->lock); { if (op_ret == -1) { gf_log (this->name, GF_LOG_DEBUG, "write to %s failed on subvolume %s (%s)", - local->loc.path, - priv->children[child_index]->name, + sh_local->loc.path, + priv->children[loop_state->child_index]->name, strerror (op_errno)); sh->op_failed = 1; } } - UNLOCK (&frame->lock); + UNLOCK (&sh_frame->lock); - call_count = afr_frame_return (frame); + call_count = afr_frame_return (rw_frame); if (call_count == 0) { - sh_diff_iter (frame, this); + sh_diff_loop_return (rw_frame, this, loop_state); } return 0; @@ -486,70 +547,66 @@ sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this, static int -sh_diff_read_cbk (call_frame_t *frame, void *cookie, +sh_diff_read_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count, struct stat *buf, struct iobref *iobref) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t * sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * rw_sh = NULL; - afr_sh_algo_diff_private_t *sh_priv = NULL; + afr_sh_algo_diff_private_t * sh_priv = NULL; - int child_index = (long) cookie; + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; + + struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie; int i = 0; int call_count = 0; - off_t offset; + priv = this->private; + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; - priv = this->private; - local = frame->local; - sh = &local->self_heal; - sh_priv = sh->private; + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; + sh_priv = sh->private; - call_count = sh_diff_number_of_writes_needed (sh_priv->write_needed, + call_count = sh_diff_number_of_writes_needed (loop_state->write_needed, priv->child_count); - local->call_count = call_count; + rw_local->call_count = call_count; gf_log (this->name, GF_LOG_TRACE, - "read %d bytes of data from %s on child %d, offset %"PRId64"", - op_ret, local->loc.path, child_index, sh->offset); + "read %d bytes of data from %s, offset %"PRId64"", + op_ret, sh_local->loc.path, sh->offset); if (op_ret <= 0) { - local->self_heal.algo_completion_cbk (frame, this); + sh_diff_loop_return (rw_frame, this, loop_state); + return 0; } - /* what if we read less than block size? */ - offset = sh->offset; - sh->offset += sh_priv->block_size; - if (sh->file_has_holes) { if (iov_0filled (vector, count) == 0) { - /* - the iter function depends on the - sh->offset already being updated - above - */ - sh_diff_iter (frame, this); + sh_diff_loop_return (rw_frame, this, loop_state); goto out; } } for (i = 0; i < priv->child_count; i++) { - if (sh_priv->write_needed[i]) { - STACK_WIND_COOKIE (frame, sh_diff_write_cbk, - (void *) (long) i, + if (loop_state->write_needed[i]) { + STACK_WIND_COOKIE (rw_frame, sh_diff_write_cbk, + (void *) (long) loop_state, priv->children[i], priv->children[i]->fops->writev, - sh->healing_fd, vector, count, offset, - iobref); - - sh_priv->write_needed[i] = 0; + sh->healing_fd, vector, count, + loop_state->offset, iobref); if (!--call_count) break; @@ -562,73 +619,93 @@ out: static int -sh_diff_read (call_frame_t *frame, xlator_t *this) +sh_diff_read (call_frame_t *rw_frame, xlator_t *this, + struct sh_diff_loop_state *loop_state) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t * sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * rw_sh = NULL; afr_sh_algo_diff_private_t * sh_priv = NULL; - priv = this->private; - local = frame->local; - sh = &local->self_heal; - sh_priv = sh->private; + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; + + priv = this->private; + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; - STACK_WIND_COOKIE (frame, sh_diff_read_cbk, - (void *) (long) sh->source, + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; + sh_priv = sh->private; + + STACK_WIND_COOKIE (rw_frame, sh_diff_read_cbk, + (void *) (long) loop_state, priv->children[sh->source], priv->children[sh->source]->fops->readv, sh->healing_fd, sh_priv->block_size, - sh->offset); + loop_state->offset); return 0; } - static int -sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +sh_diff_checksum_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, uint32_t weak_checksum, uint8_t *strong_checksum) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t * sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t *rw_sh = NULL; + + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; + afr_sh_algo_diff_private_t * sh_priv = NULL; - int child_index = (long) cookie; + struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie; + int call_count = 0; int i = 0; int write_needed = 0; priv = this->private; - local = frame->local; - sh = &local->self_heal; + + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; sh_priv = sh->private; if (op_ret < 0) { gf_log (this->name, GF_LOG_ERROR, "checksum on %s failed on subvolume %s (%s)", - local->loc.path, priv->children[child_index]->name, + sh_local->loc.path, priv->children[loop_state->child_index]->name, strerror (op_errno)); sh->op_failed = 1; - local->self_heal.algo_abort_cbk (frame, this); + sh_diff_loop_return (rw_frame, this, loop_state); + return 0; } - memcpy ((void *) sh_priv->checksum + (child_index * MD5_DIGEST_LEN), + memcpy ((void *) sh_priv->checksum + loop_state->child_index * MD5_DIGEST_LEN, strong_checksum, MD5_DIGEST_LEN); - call_count = afr_frame_return (frame); + call_count = afr_frame_return (rw_frame); if (call_count == 0) { for (i = 0; i < priv->child_count; i++) { - if (sh->sources[i] || !local->child_up[i]) + if (sh->sources[i] || !sh_local->child_up[i]) continue; if (memcmp ((const void *) sh_priv->checksum + (i * MD5_DIGEST_LEN), @@ -642,18 +719,18 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, gf_log (this->name, GF_LOG_TRACE, "checksum on subvolume %s at offset %" PRId64" differs from that on source", - priv->children[i]->name, sh->offset); + priv->children[i]->name, loop_state->offset); - write_needed = sh_priv->write_needed[i] = 1; + write_needed = loop_state->write_needed[i] = 1; } } if (write_needed) { - sh_diff_read (frame, this); + sh_diff_read (rw_frame, this, loop_state); } else { sh->offset += sh_priv->block_size; - sh_diff_iter (frame, this); + sh_diff_loop_return (rw_frame, this, loop_state); } } @@ -662,28 +739,55 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, static int -sh_diff_checksum (call_frame_t *frame, xlator_t *this) +sh_diff_checksum (call_frame_t *frame, xlator_t *this, off_t offset) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t * sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * sh = NULL; + afr_self_heal_t * rw_sh = NULL; afr_sh_algo_diff_private_t * sh_priv = NULL; + call_frame_t *rw_frame = NULL; + + struct sh_diff_loop_state *loop_state = NULL; + + int32_t op_errno = 0; + + int call_count = 0; + int i = 0; + priv = this->private; local = frame->local; sh = &local->self_heal; + sh_priv = sh->private; - int call_count = 0; - int i = 0; + rw_frame = copy_frame (frame); + if (!rw_frame) + goto out; + + ALLOC_OR_GOTO (rw_local, afr_local_t, out); + + rw_frame->local = rw_local; + rw_sh = &rw_local->self_heal; + + rw_sh->offset = sh->offset; + rw_sh->sh_frame = frame; call_count = sh->active_sinks + 1; /* sinks and source */ - local->call_count = call_count; + rw_local->call_count = call_count; - STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk, - (void *) (long) i, + loop_state = CALLOC (1, sizeof (*loop_state)); + loop_state->child_index = sh->source; + loop_state->offset = offset; + loop_state->write_needed = CALLOC (priv->child_count, + sizeof (*loop_state->write_needed)); + + STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk, + (void *) (long) loop_state, priv->children[sh->source], priv->children[sh->source]->fops->rchecksum, sh->healing_fd, @@ -693,51 +797,100 @@ sh_diff_checksum (call_frame_t *frame, xlator_t *this) if (sh->sources[i] || !local->child_up[i]) continue; - STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk, - (void *) (long) i, + STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk, + (void *) (long) loop_state, priv->children[i], priv->children[i]->fops->rchecksum, sh->healing_fd, sh->offset, sh_priv->block_size); + if (!--call_count) break; } return 0; + +out: + sh->op_failed = 1; + + sh_diff_loop_driver (frame, this); + + return 0; } static int -sh_diff_iter (call_frame_t *frame, xlator_t *this) +sh_diff_loop_driver (call_frame_t *frame, xlator_t *this) { afr_private_t * priv = NULL; afr_local_t * local = NULL; afr_self_heal_t * sh = NULL; + afr_sh_algo_diff_private_t *sh_priv = NULL; - priv = this->private; - local = frame->local; - sh = &local->self_heal; + int loop = 0; + off_t offset = 0; + + priv = this->private; + local = frame->local; + sh = &local->self_heal; + sh_priv = sh->private; if (sh->op_failed) { - sh_diff_private_cleanup (frame, this); + if (sh_priv->loops_running == 0) { + gf_log (this->name, GF_LOG_TRACE, + "diff self-heal aborting on %s", + local->loc.path); + + sh_diff_private_cleanup (frame, this); + + local->self_heal.algo_abort_cbk (frame, this); + } - local->self_heal.algo_abort_cbk (frame, this); goto out; } - if (sh->offset >= sh->file_size) { - gf_log (this->name, GF_LOG_TRACE, - "closing fd's of %s", - local->loc.path); + if (sh_priv->offset >= sh->file_size) { + if (sh_priv->loops_running == 0) { + gf_log (this->name, GF_LOG_TRACE, + "full self-heal completed on %s", + local->loc.path); - sh_diff_private_cleanup (frame, this); - local->self_heal.algo_completion_cbk (frame, this); + sh_diff_private_cleanup (frame, this); + + local->self_heal.algo_completion_cbk (frame, this); + } goto out; } - sh_diff_checksum (frame, this); +spawn: + loop = 0; + + LOCK (&sh_priv->lock); + { + if ((sh_priv->loops_running < priv->data_self_heal_window_size) + && (sh_priv->offset < sh->file_size)) { + + gf_log (this->name, GF_LOG_TRACE, + "spawning a loop for offset %"PRId64, + sh_priv->offset); + + offset = sh_priv->offset; + sh_priv->offset += sh_priv->block_size; + + sh_priv->loops_running++; + + loop = 1; + } + } + UNLOCK (&sh_priv->lock); + + if (loop) { + sh_diff_checksum (frame, this, offset); + goto spawn; + } + out: return 0; } @@ -757,16 +910,17 @@ afr_sh_algo_diff (call_frame_t *frame, xlator_t *this) sh_priv = CALLOC (1, sizeof (*sh_priv)); - sh_priv->write_needed = CALLOC (priv->child_count, - sizeof (unsigned char)); - sh_priv->checksum = CALLOC (priv->child_count, MD5_DIGEST_LEN); sh_priv->block_size = this->ctx->page_size; sh->private = sh_priv; - sh_diff_checksum (frame, this); + LOCK_INIT (&sh_priv->lock); + + local->call_count = 0; + + sh_diff_loop_driver (frame, this); return 0; } diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.h b/xlators/cluster/afr/src/afr-self-heal-algorithm.h index 844d510c654..9995ee20bd3 100644 --- a/xlators/cluster/afr/src/afr-self-heal-algorithm.h +++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.h @@ -41,8 +41,11 @@ typedef struct { uint8_t *checksum; /* array of MD5 checksums for each child Each checksum is MD5_DIGEST_LEN bytes long */ - unsigned char *write_needed; size_t block_size; + + gf_lock_t lock; + unsigned int loops_running; + off_t offset; } afr_sh_algo_diff_private_t; #endif /* __AFR_SELF_HEAL_ALGORITHM_H__ */ |