diff options
Diffstat (limited to 'xlators/cluster/afr/src/afr-self-heal-algorithm.c')
| -rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-algorithm.c | 372 | 
1 files changed, 263 insertions, 109 deletions
diff --git a/xlators/cluster/afr/src/afr-self-heal-algorithm.c b/xlators/cluster/afr/src/afr-self-heal-algorithm.c index 834b7171d69..e8fce966b3e 100644 --- a/xlators/cluster/afr/src/afr-self-heal-algorithm.c +++ b/xlators/cluster/afr/src/afr-self-heal-algorithm.c @@ -414,9 +414,6 @@ sh_diff_private_cleanup (call_frame_t *frame, xlator_t *this)                  if (sh_priv->checksum)                          FREE (sh_priv->checksum); -                if (sh_priv->write_needed) -                        FREE (sh_priv->write_needed); -                  FREE (sh_priv);          }  } @@ -437,48 +434,112 @@ sh_diff_number_of_writes_needed (unsigned char *write_needed, int child_count)  } +struct sh_diff_loop_state { +        off_t   offset; +        int32_t child_index; +        unsigned char *write_needed; +}; + + +static int +sh_diff_loop_driver (call_frame_t *frame, xlator_t *this); + +  static int -sh_diff_iter (call_frame_t *frame, xlator_t *this); +sh_diff_loop_return (call_frame_t *rw_frame, xlator_t *this, +                     struct sh_diff_loop_state *loop_state) +{ +        afr_private_t *             priv       = NULL; +        afr_local_t *               rw_local   = NULL; +        afr_self_heal_t *           rw_sh      = NULL; + +        call_frame_t *sh_frame              = NULL; +	afr_local_t * sh_local              = NULL; +	afr_self_heal_t *sh                 = NULL; +        afr_sh_algo_diff_private_t *sh_priv = NULL; + +        priv  = this->private; + +        rw_local = rw_frame->local; +        rw_sh    = &rw_local->self_heal; + +        sh_frame = rw_sh->sh_frame; +        sh_local = sh_frame->local; +        sh       = &sh_local->self_heal; +        sh_priv  = sh->private; + +        LOCK (&sh_priv->lock); +        { +                sh_priv->loops_running--; +        } +        UNLOCK (&sh_priv->lock); + +        gf_log (this->name, GF_LOG_TRACE, +                "loop for offset %"PRId64" returned", loop_state->offset); + +        AFR_STACK_DESTROY (rw_frame); + +        if (loop_state) { +                if (loop_state->write_needed) +                        FREE (loop_state->write_needed); + +                FREE (loop_state); +        } + +        sh_diff_loop_driver (sh_frame, this); + +        return 0; +}  static int -sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +sh_diff_write_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this,                     int32_t op_ret, int32_t op_errno, struct stat *buf,                     struct stat *postbuf)  { -	afr_private_t *   priv  = NULL; -	afr_local_t *     local = NULL; -	afr_self_heal_t * sh    = NULL; +	afr_private_t *   priv     = NULL; +	afr_local_t *     rw_local = NULL; +	afr_self_heal_t * rw_sh    = NULL; -	int child_index = (long) cookie; -	int call_count  = 0; +        call_frame_t *sh_frame  = NULL; +	afr_local_t * sh_local  = NULL; +	afr_self_heal_t *sh     = NULL; -	priv  = this->private; -	local = frame->local; -	sh    = &local->self_heal; +        struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie; + +	int call_count = 0; + +	priv     = this->private; +	rw_local = rw_frame->local; +	rw_sh    = &rw_local->self_heal; + +        sh_frame = rw_sh->sh_frame; +        sh_local = sh_frame->local; +        sh       = &sh_local->self_heal;  	gf_log (this->name, GF_LOG_TRACE,  		"wrote %d bytes of data from %s to child %d, offset %"PRId64"", -		op_ret, local->loc.path, child_index, sh->offset - op_ret); +		op_ret, sh_local->loc.path, loop_state->child_index, +                loop_state->offset); -	LOCK (&frame->lock); +	LOCK (&sh_frame->lock);  	{  		if (op_ret == -1) {  			gf_log (this->name, GF_LOG_DEBUG,  				"write to %s failed on subvolume %s (%s)", -				local->loc.path, -				priv->children[child_index]->name, +				sh_local->loc.path, +				priv->children[loop_state->child_index]->name,  				strerror (op_errno));  			sh->op_failed = 1;  		}  	} -	UNLOCK (&frame->lock); +	UNLOCK (&sh_frame->lock); -	call_count = afr_frame_return (frame); +	call_count = afr_frame_return (rw_frame);  	if (call_count == 0) { -		sh_diff_iter (frame, this); +		sh_diff_loop_return (rw_frame, this, loop_state);  	}  	return 0; @@ -486,70 +547,66 @@ sh_diff_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  static int -sh_diff_read_cbk (call_frame_t *frame, void *cookie, +sh_diff_read_cbk (call_frame_t *rw_frame, void *cookie,                    xlator_t *this, int32_t op_ret, int32_t op_errno,                    struct iovec *vector, int32_t count, struct stat *buf,                    struct iobref *iobref)  { -	afr_private_t *   priv  = NULL; -	afr_local_t *     local = NULL; -	afr_self_heal_t * sh    = NULL; +	afr_private_t *   priv     = NULL; +	afr_local_t *     rw_local = NULL; +	afr_self_heal_t * rw_sh    = NULL; -        afr_sh_algo_diff_private_t *sh_priv = NULL; +        afr_sh_algo_diff_private_t * sh_priv = NULL; -	int child_index = (long) cookie; +        call_frame_t *sh_frame  = NULL; +	afr_local_t * sh_local  = NULL; +	afr_self_heal_t *sh     = NULL; + +        struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie;  	int i = 0;  	int call_count = 0; -	off_t offset; +	priv     = this->private; +	rw_local = rw_frame->local; +	rw_sh    = &rw_local->self_heal; -	priv    = this->private; -	local   = frame->local; -	sh      = &local->self_heal; -        sh_priv = sh->private; +        sh_frame = rw_sh->sh_frame; +        sh_local = sh_frame->local; +        sh       = &sh_local->self_heal; +        sh_priv  = sh->private; -	call_count = sh_diff_number_of_writes_needed (sh_priv->write_needed, +	call_count = sh_diff_number_of_writes_needed (loop_state->write_needed,                                                        priv->child_count); -	local->call_count = call_count; +	rw_local->call_count = call_count;  	gf_log (this->name, GF_LOG_TRACE, -		"read %d bytes of data from %s on child %d, offset %"PRId64"", -		op_ret, local->loc.path, child_index, sh->offset); +		"read %d bytes of data from %s, offset %"PRId64"", +		op_ret, sh_local->loc.path, sh->offset);  	if (op_ret <= 0) { -		local->self_heal.algo_completion_cbk (frame, this); +                sh_diff_loop_return (rw_frame, this, loop_state); +  		return 0;  	} -	/* what if we read less than block size? */ -	offset      = sh->offset; -	sh->offset += sh_priv->block_size; -  	if (sh->file_has_holes) {  		if (iov_0filled (vector, count) == 0) { -			/* -                           the iter function depends on the -			   sh->offset already being updated -			   above -			*/ -                        sh_diff_iter (frame, this); +                        sh_diff_loop_return (rw_frame, this, loop_state);  			goto out;  		}  	}  	for (i = 0; i < priv->child_count; i++) { -                if (sh_priv->write_needed[i]) { -                        STACK_WIND_COOKIE (frame, sh_diff_write_cbk, -                                           (void *) (long) i, +                if (loop_state->write_needed[i]) { +                        STACK_WIND_COOKIE (rw_frame, sh_diff_write_cbk, +                                           (void *) (long) loop_state,                                             priv->children[i],                                             priv->children[i]->fops->writev, -                                           sh->healing_fd, vector, count, offset, -                                           iobref); - -                        sh_priv->write_needed[i] = 0; +                                           sh->healing_fd, vector, count, +                                           loop_state->offset, iobref);                          if (!--call_count)                                  break; @@ -562,73 +619,93 @@ out:  static int -sh_diff_read (call_frame_t *frame, xlator_t *this) +sh_diff_read (call_frame_t *rw_frame, xlator_t *this, +              struct sh_diff_loop_state *loop_state)  { -	afr_private_t *   priv  = NULL; -	afr_local_t *     local = NULL; -	afr_self_heal_t * sh    = NULL; +	afr_private_t *   priv     = NULL; +	afr_local_t *     rw_local = NULL; +	afr_self_heal_t * rw_sh    = NULL;          afr_sh_algo_diff_private_t * sh_priv = NULL; -	priv    = this->private; -	local   = frame->local; -	sh      = &local->self_heal; -        sh_priv = sh->private; +        call_frame_t *sh_frame  = NULL; +	afr_local_t * sh_local  = NULL; +	afr_self_heal_t *sh     = NULL; + +	priv     = this->private; +	rw_local = rw_frame->local; +	rw_sh    = &rw_local->self_heal; -	STACK_WIND_COOKIE (frame, sh_diff_read_cbk, -			   (void *) (long) sh->source, +        sh_frame = rw_sh->sh_frame; +        sh_local = sh_frame->local; +        sh       = &sh_local->self_heal; +        sh_priv  = sh->private; + +	STACK_WIND_COOKIE (rw_frame, sh_diff_read_cbk, +			   (void *) (long) loop_state,  			   priv->children[sh->source],  			   priv->children[sh->source]->fops->readv,  			   sh->healing_fd, sh_priv->block_size, -			   sh->offset); +			   loop_state->offset);  	return 0;  } -  static int -sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +sh_diff_checksum_cbk (call_frame_t *rw_frame, void *cookie, xlator_t *this,                        int32_t op_ret, int32_t op_errno,                        uint32_t weak_checksum, uint8_t *strong_checksum)  { -	afr_private_t *              priv    = NULL; -	afr_local_t *                local   = NULL; -	afr_self_heal_t *            sh      = NULL; +	afr_private_t * priv    = NULL; +	afr_local_t * rw_local  = NULL; +	afr_self_heal_t *rw_sh  = NULL; + +        call_frame_t *sh_frame  = NULL; +	afr_local_t * sh_local  = NULL; +	afr_self_heal_t *sh     = NULL; +          afr_sh_algo_diff_private_t * sh_priv = NULL; -        int child_index  = (long) cookie; +        struct sh_diff_loop_state *loop_state = (struct sh_diff_loop_state *) cookie; +          int call_count   = 0;          int i            = 0;          int write_needed = 0;  	priv  = this->private; -	local = frame->local; -	sh    = &local->self_heal; + +	rw_local = rw_frame->local; +	rw_sh    = &rw_local->self_heal; + +        sh_frame = rw_sh->sh_frame; +        sh_local = sh_frame->local; +        sh       = &sh_local->self_heal;          sh_priv = sh->private;          if (op_ret < 0) {                  gf_log (this->name, GF_LOG_ERROR,                          "checksum on %s failed on subvolume %s (%s)", -                        local->loc.path, priv->children[child_index]->name, +                        sh_local->loc.path, priv->children[loop_state->child_index]->name,                          strerror (op_errno));                  sh->op_failed = 1; -                local->self_heal.algo_abort_cbk (frame, this); +                sh_diff_loop_return (rw_frame, this, loop_state); +                  return 0;          } -        memcpy ((void *) sh_priv->checksum + (child_index * MD5_DIGEST_LEN), +        memcpy ((void *) sh_priv->checksum + loop_state->child_index * MD5_DIGEST_LEN,                  strong_checksum,                  MD5_DIGEST_LEN); -        call_count = afr_frame_return (frame); +        call_count = afr_frame_return (rw_frame);          if (call_count == 0) {                  for (i = 0; i < priv->child_count; i++) { -                        if (sh->sources[i] || !local->child_up[i]) +                        if (sh->sources[i] || !sh_local->child_up[i])                                  continue;                          if (memcmp ((const void *) sh_priv->checksum + (i * MD5_DIGEST_LEN), @@ -642,18 +719,18 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                                  gf_log (this->name, GF_LOG_TRACE,                                          "checksum on subvolume %s at offset %"                                          PRId64" differs from that on source", -                                        priv->children[i]->name, sh->offset); +                                        priv->children[i]->name, loop_state->offset); -                                write_needed = sh_priv->write_needed[i] = 1; +                                write_needed = loop_state->write_needed[i] = 1;                          }                  }                  if (write_needed) { -                        sh_diff_read (frame, this); +                        sh_diff_read (rw_frame, this, loop_state);                  } else {                          sh->offset += sh_priv->block_size; -                        sh_diff_iter (frame, this); +                        sh_diff_loop_return (rw_frame, this, loop_state);                  }          } @@ -662,28 +739,55 @@ sh_diff_checksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,  static int -sh_diff_checksum (call_frame_t *frame, xlator_t *this) +sh_diff_checksum (call_frame_t *frame, xlator_t *this, off_t offset)  { -	afr_private_t *   priv  = NULL; -	afr_local_t *     local = NULL; -	afr_self_heal_t * sh    = NULL; +	afr_private_t *   priv     = NULL; +	afr_local_t *     local    = NULL; +	afr_local_t *     rw_local = NULL; +	afr_self_heal_t * sh       = NULL; +	afr_self_heal_t * rw_sh    = NULL;          afr_sh_algo_diff_private_t * sh_priv = NULL; +        call_frame_t *rw_frame = NULL; + +        struct sh_diff_loop_state *loop_state = NULL; + +        int32_t op_errno = 0; + +        int call_count = 0; +        int i          = 0; +  	priv    = this->private;  	local   = frame->local;  	sh      = &local->self_heal; +          sh_priv = sh->private; -        int call_count = 0; -        int i          = 0; +        rw_frame = copy_frame (frame); +        if (!rw_frame) +                goto out; + +        ALLOC_OR_GOTO (rw_local, afr_local_t, out); + +        rw_frame->local = rw_local; +	rw_sh           = &rw_local->self_heal; + +        rw_sh->offset       = sh->offset; +        rw_sh->sh_frame     = frame;          call_count = sh->active_sinks + 1;  /* sinks and source */ -        local->call_count = call_count; +        rw_local->call_count = call_count; -        STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk, -                           (void *) (long) i, +        loop_state = CALLOC (1, sizeof (*loop_state)); +        loop_state->child_index  = sh->source; +        loop_state->offset       = offset; +        loop_state->write_needed = CALLOC (priv->child_count, +                                           sizeof (*loop_state->write_needed)); + +        STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk, +                           (void *) (long) loop_state,                             priv->children[sh->source],                             priv->children[sh->source]->fops->rchecksum,                             sh->healing_fd, @@ -693,51 +797,100 @@ sh_diff_checksum (call_frame_t *frame, xlator_t *this)                  if (sh->sources[i] || !local->child_up[i])                          continue; -                STACK_WIND_COOKIE (frame, sh_diff_checksum_cbk, -                                   (void *) (long) i, +                STACK_WIND_COOKIE (rw_frame, sh_diff_checksum_cbk, +                                   (void *) (long) loop_state,                                     priv->children[i],                                     priv->children[i]->fops->rchecksum,                                     sh->healing_fd,                                     sh->offset, sh_priv->block_size); +                  if (!--call_count)                          break;          }          return 0; + +out: +        sh->op_failed = 1; + +        sh_diff_loop_driver (frame, this); + +        return 0;  }  static int -sh_diff_iter (call_frame_t *frame, xlator_t *this) +sh_diff_loop_driver (call_frame_t *frame, xlator_t *this)  {  	afr_private_t *   priv  = NULL;  	afr_local_t *     local = NULL;  	afr_self_heal_t * sh    = NULL; +        afr_sh_algo_diff_private_t *sh_priv = NULL; -	priv  = this->private; -	local = frame->local; -	sh    = &local->self_heal; +        int   loop   = 0; +        off_t offset = 0; +  +	priv    = this->private; +	local   = frame->local; +	sh      = &local->self_heal; +        sh_priv = sh->private;  	if (sh->op_failed) { -                sh_diff_private_cleanup (frame, this); +                if (sh_priv->loops_running == 0) { +                        gf_log (this->name, GF_LOG_TRACE, +                                "diff self-heal aborting on %s", +                                local->loc.path); + +                        sh_diff_private_cleanup (frame, this); + +                        local->self_heal.algo_abort_cbk (frame, this); +                } -		local->self_heal.algo_abort_cbk (frame, this);  		goto out;  	} -	if (sh->offset >= sh->file_size) { -		gf_log (this->name, GF_LOG_TRACE, -			"closing fd's of %s", -			local->loc.path); +	if (sh_priv->offset >= sh->file_size) { +                if (sh_priv->loops_running == 0) { +                        gf_log (this->name, GF_LOG_TRACE, +                                "full self-heal completed on %s", +                                local->loc.path); -                sh_diff_private_cleanup (frame, this); -		local->self_heal.algo_completion_cbk (frame, this); +                        sh_diff_private_cleanup (frame, this); + +                        local->self_heal.algo_completion_cbk (frame, this); +                }  		goto out;  	} -        sh_diff_checksum (frame, this); +spawn: +        loop = 0; + +        LOCK (&sh_priv->lock); +        { +                if ((sh_priv->loops_running < priv->data_self_heal_window_size) +                    && (sh_priv->offset < sh->file_size)) { + +                        gf_log (this->name, GF_LOG_TRACE, +                                "spawning a loop for offset %"PRId64, +                                sh_priv->offset); + +                        offset           = sh_priv->offset; +                        sh_priv->offset += sh_priv->block_size; + +                        sh_priv->loops_running++; + +                        loop = 1; +                } +        } +        UNLOCK (&sh_priv->lock); + +        if (loop) { +                sh_diff_checksum (frame, this, offset); +                goto spawn; +        } +  out:          return 0;  } @@ -757,16 +910,17 @@ afr_sh_algo_diff (call_frame_t *frame, xlator_t *this)          sh_priv = CALLOC (1, sizeof (*sh_priv)); -        sh_priv->write_needed = CALLOC (priv->child_count, -                                        sizeof (unsigned char)); -          sh_priv->checksum = CALLOC (priv->child_count, MD5_DIGEST_LEN);          sh_priv->block_size = this->ctx->page_size;          sh->private = sh_priv; -        sh_diff_checksum (frame, this); +        LOCK_INIT (&sh_priv->lock); + +        local->call_count = 0; + +        sh_diff_loop_driver (frame, this);          return 0;  }  | 
