diff options
-rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-algorithm.c | 267 | ||||
-rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-algorithm.h | 6 | ||||
-rw-r--r-- | xlators/cluster/afr/src/afr.c | 19 | ||||
-rw-r--r-- | xlators/cluster/afr/src/afr.h | 6 |
4 files changed, 243 insertions, 55 deletions
diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.c b/xlators/cluster/afr/src/afr-self-heal-algorithm.c index 79edb1c00..834b7171d 100644 --- a/xlators/cluster/afr/src/afr-self-heal-algorithm.c +++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.c @@ -50,46 +50,116 @@ source to sinks. */ + +static void +sh_full_private_cleanup (call_frame_t *frame, xlator_t *this) +{ + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + afr_self_heal_t * sh = NULL; + afr_sh_algo_full_private_t *sh_priv = NULL; + + priv = this->private; + local = frame->local; + sh = &local->self_heal; + + sh_priv = sh->private; + + if (sh_priv) + FREE (sh_priv); +} + + +static int +sh_full_loop_driver (call_frame_t *frame, xlator_t *this); + static int -sh_full_read_write_iter (call_frame_t *frame, xlator_t *this); +sh_full_loop_return (call_frame_t *rw_frame, xlator_t *this, off_t offset) +{ + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t * rw_sh = NULL; + + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; + afr_sh_algo_full_private_t *sh_priv = NULL; + + priv = this->private; + + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; + sh_priv = sh->private; + + LOCK (&sh_priv->lock); + { + sh_priv->loops_running--; + } + UNLOCK (&sh_priv->lock); + + gf_log (this->name, GF_LOG_TRACE, + "loop for offset %"PRId64" returned", offset); + + AFR_STACK_DESTROY (rw_frame); + + sh_full_loop_driver (sh_frame, this); + + return 0; +} + static int -sh_full_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +sh_full_write_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *prebuf, struct stat *postbuf) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t *sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t *rw_sh = NULL; + + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; int child_index = (long) cookie; int call_count = 0; priv = this->private; - local = frame->local; - sh = &local->self_heal; + + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; gf_log (this->name, GF_LOG_TRACE, "wrote %d bytes of data from %s to child %d, offset %"PRId64"", - op_ret, local->loc.path, child_index, sh->offset - op_ret); + op_ret, sh_local->loc.path, child_index, + rw_sh->offset - op_ret); - LOCK (&frame->lock); + LOCK (&sh_frame->lock); { if (op_ret == -1) { gf_log (this->name, GF_LOG_DEBUG, "write to %s failed on subvolume %s (%s)", - local->loc.path, + sh_local->loc.path, priv->children[child_index]->name, strerror (op_errno)); + sh->op_failed = 1; } } - UNLOCK (&frame->lock); + UNLOCK (&sh_frame->lock); - call_count = afr_frame_return (frame); + call_count = afr_frame_return (rw_frame); if (call_count == 0) { - sh_full_read_write_iter (frame, this); + sh_full_loop_return (rw_frame, this, rw_sh->offset - op_ret); } return 0; @@ -97,41 +167,48 @@ sh_full_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this, static int -sh_full_read_cbk (call_frame_t *frame, void *cookie, +sh_full_read_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count, struct stat *buf, struct iobref *iobref) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t *sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t *rw_sh = NULL; + + call_frame_t *sh_frame = NULL; + afr_local_t * sh_local = NULL; + afr_self_heal_t *sh = NULL; - int child_index = (long) cookie; int i = 0; int call_count = 0; - off_t offset; + off_t offset = (long) cookie; priv = this->private; - local = frame->local; - sh = &local->self_heal; + rw_local = rw_frame->local; + rw_sh = &rw_local->self_heal; + + sh_frame = rw_sh->sh_frame; + sh_local = sh_frame->local; + sh = &sh_local->self_heal; call_count = sh->active_sinks; - local->call_count = call_count; + rw_local->call_count = call_count; gf_log (this->name, GF_LOG_TRACE, - "read %d bytes of data from %s on child %d, offset %"PRId64"", - op_ret, local->loc.path, child_index, sh->offset); + "read %d bytes of data from %s, offset %"PRId64"", + op_ret, sh_local->loc.path, offset); if (op_ret <= 0) { - local->self_heal.algo_completion_cbk (frame, this); + sh->op_failed = 1; + + sh_full_loop_return (rw_frame, this, offset); return 0; } - /* what if we read less than block size? */ - offset = sh->offset; - sh->offset += op_ret; + rw_sh->offset += op_ret; if (sh->file_has_holes) { if (iov_0filled (vector, count) == 0) { @@ -140,17 +217,18 @@ sh_full_read_cbk (call_frame_t *frame, void *cookie, above */ - sh_full_read_write_iter (frame, this); + sh_full_loop_return (rw_frame, this, offset); goto out; } } for (i = 0; i < priv->child_count; i++) { - if (sh->sources[i] || !local->child_up[i]) + if (sh->sources[i] || !sh_local->child_up[i]) continue; /* this is a sink, so write to it */ - STACK_WIND_COOKIE (frame, sh_full_write_cbk, + + STACK_WIND_COOKIE (rw_frame, sh_full_write_cbk, (void *) (long) i, priv->children[i], priv->children[i]->fops->writev, @@ -167,54 +245,120 @@ out: static int -sh_full_read_write (call_frame_t *frame, xlator_t *this) +sh_full_read_write (call_frame_t *frame, xlator_t *this, off_t offset) { - afr_private_t * priv = NULL; - afr_local_t * local = NULL; - afr_self_heal_t *sh = NULL; + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + afr_local_t * rw_local = NULL; + afr_self_heal_t *rw_sh = NULL; + afr_self_heal_t *sh = NULL; - priv = this->private; - local = frame->local; - sh = &local->self_heal; + call_frame_t *rw_frame = NULL; - STACK_WIND_COOKIE (frame, sh_full_read_cbk, - (void *) (long) sh->source, + int32_t op_errno = 0; + + priv = this->private; + local = frame->local; + sh = &local->self_heal; + + rw_frame = copy_frame (frame); + if (!rw_frame) + goto out; + + ALLOC_OR_GOTO (rw_local, afr_local_t, out); + + rw_frame->local = rw_local; + rw_sh = &rw_local->self_heal; + + rw_sh->offset = sh->offset; + rw_sh->sh_frame = frame; + + STACK_WIND_COOKIE (rw_frame, sh_full_read_cbk, + (void *) (long) offset, priv->children[sh->source], priv->children[sh->source]->fops->readv, sh->healing_fd, sh->block_size, - sh->offset); + offset); + return 0; + +out: + sh->op_failed = 1; + + sh_full_loop_driver (frame, this); return 0; } static int -sh_full_read_write_iter (call_frame_t *frame, xlator_t *this) +sh_full_loop_driver (call_frame_t *frame, xlator_t *this) { afr_private_t * priv = NULL; afr_local_t * local = NULL; afr_self_heal_t *sh = NULL; + afr_sh_algo_full_private_t *sh_priv = NULL; - priv = this->private; - local = frame->local; - sh = &local->self_heal; + int loop = 0; + off_t offset = 0; + + priv = this->private; + local = frame->local; + sh = &local->self_heal; + sh_priv = sh->private; if (sh->op_failed) { - local->self_heal.algo_abort_cbk (frame, this); + if (sh_priv->loops_running == 0) { + gf_log (this->name, GF_LOG_TRACE, + "full self-heal aborting on %s", + local->loc.path); + + sh_full_private_cleanup (frame, this); + local->self_heal.algo_abort_cbk (frame, this); + } + goto out; } - if (sh->offset >= sh->file_size) { - gf_log (this->name, GF_LOG_TRACE, - "closing fd's of %s", - local->loc.path); + if (sh_priv->offset >= sh->file_size) { + if (sh_priv->loops_running == 0) { - local->self_heal.algo_completion_cbk (frame, this); + gf_log (this->name, GF_LOG_TRACE, + "full self-heal completed on %s", + local->loc.path); + + sh_full_private_cleanup (frame, this); + local->self_heal.algo_completion_cbk (frame, this); + } goto out; } - sh_full_read_write (frame, this); +spawn: + loop = 0; + + LOCK (&sh_priv->lock); + { + if ((sh_priv->loops_running < priv->data_self_heal_window_size) + && (sh_priv->offset < sh->file_size)) { + + gf_log (this->name, GF_LOG_TRACE, + "spawning a loop for offset %"PRId64, + sh_priv->offset); + + offset = sh_priv->offset; + sh_priv->offset += sh->block_size; + + sh_priv->loops_running++; + + loop = 1; + } + } + UNLOCK (&sh_priv->lock); + + if (loop) { + sh_full_read_write (frame, this, offset); + goto spawn; + } out: return 0; @@ -224,7 +368,24 @@ out: int afr_sh_algo_full (call_frame_t *frame, xlator_t *this) { - sh_full_read_write (frame, this); + afr_private_t * priv = NULL; + afr_local_t * local = NULL; + afr_self_heal_t * sh = NULL; + afr_sh_algo_full_private_t *sh_priv = NULL; + + priv = this->private; + local = frame->local; + sh = &local->self_heal; + + sh_priv = CALLOC (1, sizeof (*sh_priv)); + + LOCK_INIT (&sh_priv->lock); + + sh->private = sh_priv; + + local->call_count = 0; + + sh_full_loop_driver (frame, this); return 0; } diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.h b/xlators/cluster/afr/src/afr-self-heal-algorithm.h index 8552b19d0..844d510c6 100644 --- a/xlators/cluster/afr/src/afr-self-heal-algorithm.h +++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.h @@ -32,6 +32,12 @@ struct afr_sh_algorithm { struct afr_sh_algorithm afr_self_heal_algorithms[3]; typedef struct { + gf_lock_t lock; + unsigned int loops_running; + off_t offset; +} afr_sh_algo_full_private_t; + +typedef struct { uint8_t *checksum; /* array of MD5 checksums for each child Each checksum is MD5_DIGEST_LEN bytes long */ diff --git a/xlators/cluster/afr/src/afr.c b/xlators/cluster/afr/src/afr.c index eea030ad1..bf023ce27 100644 --- a/xlators/cluster/afr/src/afr.c +++ b/xlators/cluster/afr/src/afr.c @@ -2338,6 +2338,7 @@ init (xlator_t *this) char * change_log = NULL; int32_t lock_server_count = 1; + int32_t window_size; int fav_ret = -1; int read_ret = -1; @@ -2391,6 +2392,19 @@ init (xlator_t *this) priv->data_self_heal_algorithm = strdup (algo); } + + priv->data_self_heal_window_size = 16; + + dict_ret = dict_get_int32 (this->options, "data-self-heal-window-size", + &window_size); + if (dict_ret == 0) { + gf_log (this->name, GF_LOG_DEBUG, + "Setting data self-heal window size to %d.", + window_size); + + priv->data_self_heal_window_size = window_size; + } + dict_ret = dict_get_str (this->options, "metadata-self-heal", &self_heal); if (dict_ret == 0) { @@ -2665,6 +2679,11 @@ struct volume_options options[] = { { .key = {"data-self-heal-algorithm"}, .type = GF_OPTION_TYPE_STR }, + { .key = {"data-self-heal-window-size"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 1024 + }, { .key = {"metadata-self-heal"}, .type = GF_OPTION_TYPE_BOOL }, diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h index 4e937fb17..e3ab4ebe4 100644 --- a/xlators/cluster/afr/src/afr.h +++ b/xlators/cluster/afr/src/afr.h @@ -45,8 +45,10 @@ typedef struct _afr_private { char **pending_key; - gf_boolean_t data_self_heal; /* on/off */ - char * data_self_heal_algorithm; /* name of algorithm */ + gf_boolean_t data_self_heal; /* on/off */ + char * data_self_heal_algorithm; /* name of algorithm */ + unsigned int data_self_heal_window_size; /* max number of pipelined + read/writes */ gf_boolean_t metadata_self_heal; /* on/off */ gf_boolean_t entry_self_heal; /* on/off */ |