diff options
| -rw-r--r-- | xlators/cluster/afr/src/afr-self-heal-data.c | 385 | ||||
| -rw-r--r-- | xlators/cluster/afr/src/afr.h | 6 | 
2 files changed, 248 insertions, 143 deletions
diff --git a/xlators/cluster/afr/src/afr-self-heal-data.c b/xlators/cluster/afr/src/afr-self-heal-data.c index bafb92fd218..46d0748318c 100644 --- a/xlators/cluster/afr/src/afr-self-heal-data.c +++ b/xlators/cluster/afr/src/afr-self-heal-data.c @@ -66,7 +66,9 @@ afr_sh_data_done (call_frame_t *frame, xlator_t *this)  	   TODO: cleanup sh->*   	 */ -        if (sh->healing_fd) { +        if (sh->healing_fd && !sh->healing_fd_opened) { +                /* unref only if we created the fd ourselves */ +  		fd_unref (sh->healing_fd);  		sh->healing_fd = NULL;          } @@ -158,6 +160,13 @@ afr_sh_data_close (call_frame_t *frame, xlator_t *this)  	stbuf.st_mtime = sh->buf[source].st_mtime;  #endif +        if (sh->healing_fd_opened) { +                /* not our job to close the fd */ + +                afr_sh_data_done (frame, this); +                return 0; +        } +  	if (!sh->healing_fd) {  		afr_sh_data_done (frame, this);  		return 0; @@ -268,6 +277,13 @@ afr_sh_data_unlock (call_frame_t *frame, xlator_t *this)  	sh = &local->self_heal;  	priv = this->private; +        if (sh->data_lock_held) { +                /* not our job to unlock, proceed to close */ + +                afr_sh_data_close (frame, this); +                return 0; +        } +          for (i = 0; i < priv->child_count; i++) {                  if (sh->locked_nodes[i])                          call_count++; @@ -393,8 +409,8 @@ afr_sh_data_erase_pending (call_frame_t *frame, xlator_t *this)  		STACK_WIND_COOKIE (frame, afr_sh_data_erase_pending_cbk,  				   (void *) (long) i,  				   priv->children[i], -				   priv->children[i]->fops->xattrop, -				   &local->loc, +				   priv->children[i]->fops->fxattrop, +				   sh->healing_fd,  				   GF_XATTROP_ADD_ARRAY, erase_xattr[i]);  		if (!--call_count)  			break; @@ -571,131 +587,6 @@ afr_sh_data_pick_algo (call_frame_t *frame, xlator_t *this)  int -afr_sh_data_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, -		      int32_t op_ret, int32_t op_errno, fd_t *fd) -{ -	afr_local_t     *local = NULL; -	afr_self_heal_t *sh = NULL; -	afr_private_t   *priv = NULL; -	int              call_count = 0; -	int              child_index = 0; - -        struct afr_sh_algorithm *sh_algo = NULL; - -	local = frame->local; -	sh = &local->self_heal; -	priv = this->private; - -	child_index = (long) cookie; - -	/* TODO: some of the open's might fail. -	   In that case, modify cleanup fn to send flush on those  -	   fd's which are already open */ - -	LOCK (&frame->lock); -	{ -		if (op_ret == -1) { -			gf_log (this->name, GF_LOG_TRACE, -				"open of %s failed on child %s (%s)", -				local->loc.path, -				priv->children[child_index]->name, -				strerror (op_errno)); -			sh->op_failed = 1; -		} - -	} -	UNLOCK (&frame->lock); - -	call_count = afr_frame_return (frame); - -	if (call_count == 0) { -		if (sh->op_failed) { -			afr_sh_data_finish (frame, this); -			return 0; -		} -		gf_log (this->name, GF_LOG_TRACE, -			"fd for %s opened, commencing sync", -			local->loc.path); - -		gf_log (this->name, GF_LOG_TRACE, -			"sourcing file %s from %s to other sinks", -			local->loc.path, priv->children[sh->source]->name); - -                sh->algo_completion_cbk = afr_sh_data_trim_sinks; -                sh->algo_abort_cbk      = afr_sh_data_finish; - -                sh_algo = afr_sh_data_pick_algo (frame, this); - -		sh_algo->fn (frame, this); -	} - -	return 0; -} - - -int -afr_sh_data_open (call_frame_t *frame, xlator_t *this) -{ -	int i = 0;				 -	int call_count = 0;		      - -	int source = -1; -	int *sources = NULL; - -	fd_t *fd = NULL; - -	afr_local_t *   local = NULL; -	afr_private_t * priv  = NULL; -	afr_self_heal_t *sh = NULL; - -	local = frame->local; -	sh = &local->self_heal; -	priv = this->private; - -	call_count = sh->active_sinks + 1; -	local->call_count = call_count; - -	fd = fd_create (local->loc.inode, frame->root->pid); -	sh->healing_fd = fd; - -	source  = local->self_heal.source; -	sources = local->self_heal.sources; - -	sh->block_size = 65536; -	sh->file_size  = sh->buf[source].st_size; - -	if (FILE_HAS_HOLES (&sh->buf[source])) -		sh->file_has_holes = 1; - -	/* open source */ -	STACK_WIND_COOKIE (frame, afr_sh_data_open_cbk, -			   (void *) (long) source, -			   priv->children[source], -			   priv->children[source]->fops->open, -			   &local->loc, O_RDWR|O_LARGEFILE, fd, 0); -	call_count--; - -	/* open sinks */ -	for (i = 0; i < priv->child_count; i++) { -		if(sources[i] || !local->child_up[i]) -			continue; - -		STACK_WIND_COOKIE (frame, afr_sh_data_open_cbk, -				   (void *) (long) i, -				   priv->children[i],  -				   priv->children[i]->fops->open, -				   &local->loc,  -				   O_RDWR|O_LARGEFILE, fd, 0);  - -		if (!--call_count) -			break; -	} - -	return 0; -} - - -int  afr_sh_data_sync_prepare (call_frame_t *frame, xlator_t *this)  {  	afr_local_t     *local = NULL; @@ -705,6 +596,8 @@ afr_sh_data_sync_prepare (call_frame_t *frame, xlator_t *this)  	int              source = 0;  	int              i = 0; +        struct afr_sh_algorithm *sh_algo = NULL; +  	local = frame->local;  	sh = &local->self_heal;  	priv = this->private; @@ -732,7 +625,12 @@ afr_sh_data_sync_prepare (call_frame_t *frame, xlator_t *this)  		"self-healing file %s from subvolume %s to %d other",  		local->loc.path, priv->children[source]->name, active_sinks); -	afr_sh_data_open (frame, this); +        sh->algo_completion_cbk = afr_sh_data_trim_sinks; +        sh->algo_abort_cbk      = afr_sh_data_finish; + +        sh_algo = afr_sh_data_pick_algo (frame, this); + +        sh_algo->fn (frame, this);  	return 0;  } @@ -809,7 +707,13 @@ afr_sh_data_fix (call_frame_t *frame, xlator_t *this)                  return 0;          } -	sh->source = source; +	sh->source     = source; +	sh->block_size = 65536; +	sh->file_size  = sh->buf[source].st_size; + +	if (FILE_HAS_HOLES (&sh->buf[source])) +		sh->file_has_holes = 1; +          local->cont.lookup.buf.st_size = sh->buf[source].st_size;  	/* detect changes not visible through pending flags -- JIC */ @@ -834,10 +738,9 @@ afr_sh_data_fix (call_frame_t *frame, xlator_t *this)  int -afr_sh_data_lookup_cbk (call_frame_t *frame, void *cookie, -			xlator_t *this, int32_t op_ret, int32_t op_errno, -                        inode_t *inode, struct stat *buf, dict_t *xattr, -                        struct stat *postparent) +afr_sh_data_fstat_cbk (call_frame_t *frame, void *cookie, +                       xlator_t *this, int32_t op_ret, int32_t op_errno, +                       struct stat *buf)  {  	afr_private_t   *priv  = NULL;  	afr_local_t     *local = NULL; @@ -853,7 +756,11 @@ afr_sh_data_lookup_cbk (call_frame_t *frame, void *cookie,  	LOCK (&frame->lock);  	{  		if (op_ret != -1) { -			sh->xattr[child_index] = dict_ref (xattr); +                        gf_log (this->name, GF_LOG_TRACE, +                                "fstat of %s on %s succeeded", +                                local->loc.path, +                                priv->children[child_index]->name); +  			sh->buf[child_index] = *buf;  		}  	} @@ -870,13 +777,89 @@ afr_sh_data_lookup_cbk (call_frame_t *frame, void *cookie,  int -afr_sh_data_lookup (call_frame_t *frame, xlator_t *this) +afr_sh_data_fstat (call_frame_t *frame, xlator_t *this) +{ +	afr_self_heal_t *sh    = NULL;  +	afr_local_t     *local = NULL; +	afr_private_t   *priv  = NULL; + +	int call_count = 0; +	int i = 0; + +	priv  = this->private; +	local = frame->local; +	sh    = &local->self_heal; + +	call_count = local->child_count; + +	local->call_count = call_count; + +	for (i = 0; i < priv->child_count; i++) { +		if (local->child_up[i]) { +			STACK_WIND_COOKIE (frame, afr_sh_data_fstat_cbk, +					   (void *) (long) i, +					   priv->children[i],  +					   priv->children[i]->fops->fstat, +					   sh->healing_fd); + +			if (!--call_count) +				break; +		} +	} + +	return 0; +} + + +int +afr_sh_data_fxattrop_cbk (call_frame_t *frame, void *cookie, +                          xlator_t *this, int32_t op_ret, int32_t op_errno, +                          dict_t *xattr) +{ +	afr_private_t   *priv  = NULL; +	afr_local_t     *local = NULL; +	afr_self_heal_t *sh = NULL; + +	int call_count  = -1; +	int child_index = (long) cookie; + +	local = frame->local; +	sh = &local->self_heal; +	priv = this->private; + +	LOCK (&frame->lock); +	{ +		if (op_ret != -1) { +                        gf_log (this->name, GF_LOG_TRACE, +                                "fxattrop of %s on %s succeeded", +                                local->loc.path, +                                priv->children[child_index]->name); + +			sh->xattr[child_index] = dict_ref (xattr); +		} +	} +	UNLOCK (&frame->lock); + +	call_count = afr_frame_return (frame); + +	if (call_count == 0) { +		afr_sh_data_fstat (frame, this); +	} + +	return 0; +} + + +int +afr_sh_data_fxattrop (call_frame_t *frame, xlator_t *this)  {  	afr_self_heal_t *sh    = NULL;   	afr_local_t     *local = NULL;  	afr_private_t   *priv  = NULL;  	dict_t          *xattr_req = NULL; +        int32_t zero_pending[3] = {0, 0, 0}; +  	int call_count = 0;  	int i = 0;  	int ret = 0; @@ -892,18 +875,20 @@ afr_sh_data_lookup (call_frame_t *frame, xlator_t *this)  	xattr_req = dict_new();  	if (xattr_req) {                  for (i = 0; i < priv->child_count; i++) { -                        ret = dict_set_uint64 (xattr_req, priv->pending_key[i], -                                               3 * sizeof(int32_t)); +                        ret = dict_set_static_bin (xattr_req, priv->pending_key[i], +                                                   zero_pending, 3 * sizeof(int32_t));                  }          }  	for (i = 0; i < priv->child_count; i++) {  		if (local->child_up[i]) { -			STACK_WIND_COOKIE (frame, afr_sh_data_lookup_cbk, +			STACK_WIND_COOKIE (frame, afr_sh_data_fxattrop_cbk,  					   (void *) (long) i,  					   priv->children[i],  -					   priv->children[i]->fops->lookup, -					   &local->loc, xattr_req); +					   priv->children[i]->fops->fxattrop, +					   sh->healing_fd, GF_XATTROP_ADD_ARRAY, +                                           xattr_req); +  			if (!--call_count)  				break;  		} @@ -993,7 +978,7 @@ afr_sh_data_lock_rec (call_frame_t *frame, xlator_t *this, int child_index)          if ((child_index == priv->child_count)              || (sh->lock_count == afr_lock_server_count (priv, AFR_DATA_TRANSACTION))) { -                afr_sh_data_lookup (frame, this); +                afr_sh_data_fxattrop (frame, this);                  return 0;          } @@ -1025,6 +1010,14 @@ afr_sh_data_lock (call_frame_t *frame, xlator_t *this)  	sh    = &local->self_heal;  	priv  = this->private; +        if (sh->data_lock_held) { +                /* caller has held the lock already, +                   so skip locking */ + +                afr_sh_data_fxattrop (frame, this); +                return 0; +        } +          for (i = 0; i < priv->child_count; i++)                  sh->locked_nodes[i] = 0; @@ -1033,6 +1026,112 @@ afr_sh_data_lock (call_frame_t *frame, xlator_t *this)  int +afr_sh_data_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +		      int32_t op_ret, int32_t op_errno, fd_t *fd) +{ +	afr_local_t     *local = NULL; +	afr_self_heal_t *sh = NULL; +	afr_private_t   *priv = NULL; +	int              call_count = 0; +	int              child_index = 0; + +	local = frame->local; +	sh = &local->self_heal; +	priv = this->private; + +	child_index = (long) cookie; + +	/* TODO: some of the open's might fail. +	   In that case, modify cleanup fn to send flush on those  +	   fd's which are already open */ + +	LOCK (&frame->lock); +	{ +		if (op_ret == -1) { +			gf_log (this->name, GF_LOG_TRACE, +				"open of %s failed on child %s (%s)", +				local->loc.path, +				priv->children[child_index]->name, +				strerror (op_errno)); +			sh->op_failed = 1; +		} + +                gf_log (this->name, GF_LOG_TRACE, +                        "open of %s succeeded on child %s", +                        local->loc.path, +                        priv->children[child_index]->name); +	} +	UNLOCK (&frame->lock); + +	call_count = afr_frame_return (frame); + +	if (call_count == 0) { +		if (sh->op_failed) { +			afr_sh_data_finish (frame, this); +			return 0; +		} + +		gf_log (this->name, GF_LOG_TRACE, +			"fd for %s opened, commencing sync", +			local->loc.path); + +                afr_sh_data_lock (frame, this); +	} + +	return 0; +} + + +int +afr_sh_data_open (call_frame_t *frame, xlator_t *this) +{ +	int i = 0; +	int call_count = 0; + +	fd_t *fd = NULL; + +	afr_local_t *   local = NULL; +	afr_private_t * priv  = NULL; +	afr_self_heal_t *sh = NULL; + +	local = frame->local; +	sh = &local->self_heal; +	priv = this->private; + +        if (sh->healing_fd_opened) { +                /* caller has opened the fd for us already, so skip open */ + +                afr_sh_data_lock (frame, this); +                return 0; +        } + +	call_count = afr_up_children_count (priv->child_count, local->child_up); +	local->call_count = call_count; + +	fd = fd_create (local->loc.inode, frame->root->pid); +	sh->healing_fd = fd; + +	/* open sinks */ +	for (i = 0; i < priv->child_count; i++) { +		if(!local->child_up[i]) +			continue; + +		STACK_WIND_COOKIE (frame, afr_sh_data_open_cbk, +				   (void *) (long) i, +				   priv->children[i],  +				   priv->children[i]->fops->open, +				   &local->loc,  +				   O_RDWR|O_LARGEFILE, fd, 0);  + +		if (!--call_count) +			break; +	} + +	return 0; +} + + +int  afr_self_heal_data (call_frame_t *frame, xlator_t *this)  {  	afr_local_t   *local = NULL; @@ -1044,7 +1143,7 @@ afr_self_heal_data (call_frame_t *frame, xlator_t *this)  	sh = &local->self_heal;  	if (local->need_data_self_heal && priv->data_self_heal) { -		afr_sh_data_lock (frame, this); +		afr_sh_data_open (frame, this);  	} else {  		gf_log (this->name, GF_LOG_TRACE,  			"not doing data self heal on %s", diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h index 2f57426621d..412f38afc79 100644 --- a/xlators/cluster/afr/src/afr.h +++ b/xlators/cluster/afr/src/afr.h @@ -118,6 +118,12 @@ typedef struct {          /* private data for the particular self-heal algorithm */          void *private; +        gf_boolean_t healing_fd_opened;   /* set by caller: true if caller +                                             has already opened fd */ + +        gf_boolean_t data_lock_held;      /* set by caller: true if caller +                                             has already acquired 0-0 lock */ +  	int (*completion_cbk) (call_frame_t *frame, xlator_t *this);          int (*algo_completion_cbk) (call_frame_t *frame, xlator_t *this);          int (*algo_abort_cbk) (call_frame_t *frame, xlator_t *this);  | 
