summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/afr/src
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/afr/src')
-rw-r--r--xlators/cluster/afr/src/afr-common.c82
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-common.c143
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-common.h6
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-data.c49
-rw-r--r--xlators/cluster/afr/src/afr-self-heal-entry.c416
-rw-r--r--xlators/cluster/afr/src/afr-transaction.c18
-rw-r--r--xlators/cluster/afr/src/afr-transaction.h2
-rw-r--r--xlators/cluster/afr/src/afr.h11
8 files changed, 358 insertions, 369 deletions
diff --git a/xlators/cluster/afr/src/afr-common.c b/xlators/cluster/afr/src/afr-common.c
index c9a8b5955..b0f7b38f3 100644
--- a/xlators/cluster/afr/src/afr-common.c
+++ b/xlators/cluster/afr/src/afr-common.c
@@ -721,8 +721,6 @@ afr_local_sh_cleanup (afr_local_t *local, xlator_t *this)
{
afr_self_heal_t *sh = NULL;
afr_private_t *priv = NULL;
- int i = 0;
-
sh = &local->self_heal;
priv = this->private;
@@ -744,19 +742,8 @@ afr_local_sh_cleanup (afr_local_t *local, xlator_t *this)
if (sh->child_errno)
GF_FREE (sh->child_errno);
- if (sh->pending_matrix) {
- for (i = 0; i < priv->child_count; i++) {
- GF_FREE (sh->pending_matrix[i]);
- }
- GF_FREE (sh->pending_matrix);
- }
-
- if (sh->delta_matrix) {
- for (i = 0; i < priv->child_count; i++) {
- GF_FREE (sh->delta_matrix[i]);
- }
- GF_FREE (sh->delta_matrix);
- }
+ afr_matrix_cleanup (sh->pending_matrix, priv->child_count);
+ afr_matrix_cleanup (sh->delta_matrix, priv->child_count);
if (sh->sources)
GF_FREE (sh->sources);
@@ -800,17 +787,11 @@ afr_local_sh_cleanup (afr_local_t *local, xlator_t *this)
void
afr_local_transaction_cleanup (afr_local_t *local, xlator_t *this)
{
- int i = 0;
afr_private_t * priv = NULL;
priv = this->private;
- for (i = 0; i < priv->child_count; i++) {
- if (local->pending && local->pending[i])
- GF_FREE (local->pending[i]);
- }
-
- GF_FREE (local->pending);
+ afr_matrix_cleanup (local->pending, priv->child_count);
if (local->internal_lock.locked_nodes)
GF_FREE (local->internal_lock.locked_nodes);
@@ -3670,10 +3651,47 @@ out:
return ret;
}
+void
+afr_matrix_cleanup (int32_t **matrix, unsigned int m)
+{
+ int i = 0;
+
+ if (!matrix)
+ goto out;
+ for (i = 0; i < m; i++) {
+ GF_FREE (matrix[i]);
+ }
+
+ GF_FREE (matrix);
+out:
+ return;
+}
+
+int32_t**
+afr_matrix_create (unsigned int m, unsigned int n)
+{
+ int32_t **matrix = NULL;
+ int i = 0;
+
+ matrix = GF_CALLOC (sizeof (*matrix), m, gf_afr_mt_int32_t);
+ if (!matrix)
+ goto out;
+
+ for (i = 0; i < m; i++) {
+ matrix[i] = GF_CALLOC (sizeof (*matrix[i]), n,
+ gf_afr_mt_int32_t);
+ if (!matrix[i])
+ goto out;
+ }
+ return matrix;
+out:
+ afr_matrix_cleanup (matrix, m);
+ return NULL;
+}
+
int
afr_transaction_local_init (afr_local_t *local, xlator_t *this)
{
- int i = 0;
int child_up_count = 0;
int ret = -ENOMEM;
afr_private_t *priv = NULL;
@@ -3707,13 +3725,6 @@ afr_transaction_local_init (afr_local_t *local, xlator_t *this)
if (!local->transaction.eager_lock)
goto out;
- local->pending = GF_CALLOC (sizeof (*local->pending),
- priv->child_count,
- gf_afr_mt_int32_t);
-
- if (!local->pending)
- goto out;
-
local->fresh_children = afr_children_create (priv->child_count);
if (!local->fresh_children)
goto out;
@@ -3732,13 +3743,10 @@ afr_transaction_local_init (afr_local_t *local, xlator_t *this)
if (!local->transaction.pre_op)
goto out;
- for (i = 0; i < priv->child_count; i++) {
- local->pending[i] = GF_CALLOC (sizeof (*local->pending[i]),
- 3, /* data + metadata + entry */
- gf_afr_mt_int32_t);
- if (!local->pending[i])
- goto out;
- }
+ local->pending = afr_matrix_create (priv->child_count,
+ AFR_NUM_CHANGE_LOGS);
+ if (!local->pending)
+ goto out;
local->transaction.child_errno =
GF_CALLOC (sizeof (*local->transaction.child_errno),
diff --git a/xlators/cluster/afr/src/afr-self-heal-common.c b/xlators/cluster/afr/src/afr-self-heal-common.c
index 5acbf90aa..8fbea8c9d 100644
--- a/xlators/cluster/afr/src/afr-self-heal-common.c
+++ b/xlators/cluster/afr/src/afr-self-heal-common.c
@@ -1023,8 +1023,7 @@ afr_valid_ia_type (ia_type_t ia_type)
int
afr_impunge_frame_create (call_frame_t *frame, xlator_t *this,
- int active_source, int ret_child, mode_t entry_mode,
- call_frame_t **impunge_frame)
+ int active_source, call_frame_t **impunge_frame)
{
afr_local_t *local = NULL;
afr_local_t *impunge_local = NULL;
@@ -1048,14 +1047,17 @@ afr_impunge_frame_create (call_frame_t *frame, xlator_t *this,
impunge_sh = &impunge_local->self_heal;
impunge_sh->sh_frame = frame;
impunge_sh->active_source = active_source;
- impunge_sh->impunge_ret_child = ret_child;
- impunge_sh->impunging_entry_mode = entry_mode;
impunge_local->child_up = memdup (local->child_up,
sizeof (*local->child_up) *
priv->child_count);
if (!impunge_local->child_up)
goto out;
+ impunge_local->pending = afr_matrix_create (priv->child_count,
+ AFR_NUM_CHANGE_LOGS);
+ if (!impunge_local->pending)
+ goto out;
+
ret = afr_sh_common_create (impunge_sh, priv->child_count);
if (ret) {
op_errno = -ret;
@@ -1070,54 +1072,83 @@ out:
}
void
-afr_sh_call_entry_impunge_recreate (call_frame_t *frame, xlator_t *this,
- int child_index, struct iatt *buf,
- struct iatt *postparent,
- afr_impunge_done_cbk_t impunge_done)
+afr_sh_missing_entry_call_impunge_recreate (call_frame_t *frame, xlator_t *this,
+ struct iatt *buf,
+ struct iatt *postparent,
+ afr_impunge_done_cbk_t impunge_done)
{
call_frame_t *impunge_frame = NULL;
afr_local_t *local = NULL;
afr_local_t *impunge_local = NULL;
afr_self_heal_t *sh = NULL;
+ afr_self_heal_t *impunge_sh = NULL;
int ret = 0;
- mode_t mode = 0;
+ unsigned int enoent_count = 0;
+ afr_private_t *priv = NULL;
+ int i = 0;
local = frame->local;
sh = &local->self_heal;
- mode = st_mode_from_ia (buf->ia_prot, buf->ia_type);
- ret = afr_impunge_frame_create (frame, this, sh->source, child_index,
- mode, &impunge_frame);
+ priv = this->private;
+
+ enoent_count = afr_errno_count (NULL, sh->child_errno,
+ priv->child_count, ENOENT);
+ if (!enoent_count) {
+ gf_log (this->name, GF_LOG_INFO,
+ "no missing files - %s. proceeding to metadata check",
+ local->loc.path);
+ goto out;
+ }
+ sh->impunge_done = impunge_done;
+ ret = afr_impunge_frame_create (frame, this, sh->source, &impunge_frame);
if (ret)
goto out;
impunge_local = impunge_frame->local;
+ impunge_sh = &impunge_local->self_heal;
loc_copy (&impunge_local->loc, &local->loc);
- sh->impunge_done = impunge_done;
- impunge_local->call_count = 1;
- afr_sh_entry_impunge_create (impunge_frame, this, child_index, buf,
- postparent);
+ afr_build_parent_loc (&impunge_sh->parent_loc, &impunge_local->loc);
+ impunge_local->call_count = enoent_count;
+ impunge_sh->entrybuf = sh->buf[sh->source];
+ impunge_sh->parentbuf = sh->parentbufs[sh->source];
+ for (i = 0; i < priv->child_count; i++) {
+ if (!impunge_local->child_up[i]) {
+ impunge_sh->child_errno[i] = ENOTCONN;
+ continue;
+ }
+ if (sh->child_errno[i] != ENOENT) {
+ impunge_sh->child_errno[i] = EEXIST;
+ continue;
+ }
+ }
+ for (i = 0; i < priv->child_count; i++) {
+ if (sh->child_errno[i] != ENOENT)
+ continue;
+ afr_sh_entry_impunge_create (impunge_frame, this, i);
+ enoent_count--;
+ }
+ GF_ASSERT (!enoent_count);
return;
out:
- gf_log (this->name, GF_LOG_ERROR, "impunge of %s failed, reason: %s",
- local->loc.path, strerror (-ret));
- impunge_done (frame, this, child_index, -1, -ret);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "impunge of %s failed, "
+ "reason: %s", local->loc.path, strerror (-ret));
+ sh->op_failed = 1;
+ }
+ afr_sh_missing_entries_finish (frame, this);
}
int
-afr_sh_create_entry_cbk (call_frame_t *frame, xlator_t *this, int child,
+afr_sh_create_entry_cbk (call_frame_t *frame, xlator_t *this,
int32_t op_ret, int32_t op_errno)
{
- int call_count = 0;
afr_local_t *local = NULL;
+ afr_self_heal_t *sh = NULL;
local = frame->local;
-
- if (op_ret == -1)
- gf_log (this->name, GF_LOG_ERROR,
- "create entry %s failed, on child %d reason, %s",
- local->loc.path, child, strerror (op_errno));
- call_count = afr_frame_return (frame);
- if (call_count == 0)
- afr_sh_missing_entries_finish (frame, this);
+ sh = &local->self_heal;
+ if (op_ret < 0)
+ sh->op_failed = 1;
+ afr_sh_missing_entries_finish (frame, this);
return 0;
}
@@ -1127,26 +1158,11 @@ sh_missing_entries_create (call_frame_t *frame, xlator_t *this)
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
int type = 0;
- afr_private_t *priv = NULL;
- int enoent_count = 0;
- int i = 0;
struct iatt *buf = NULL;
struct iatt *postparent = NULL;
local = frame->local;
sh = &local->self_heal;
- priv = this->private;
-
- enoent_count = afr_errno_count (NULL, sh->child_errno,
- priv->child_count, ENOENT);
- if (enoent_count == 0) {
- gf_log (this->name, GF_LOG_INFO,
- "no missing files - %s. proceeding to metadata check",
- local->loc.path);
- /* proceed to next step - metadata self-heal */
- afr_sh_missing_entries_finish (frame, this);
- return 0;
- }
buf = &sh->buf[sh->source];
postparent = &sh->parentbufs[sh->source];
@@ -1160,17 +1176,9 @@ sh_missing_entries_create (call_frame_t *frame, xlator_t *this)
goto out;
}
- local->call_count = enoent_count;
- for (i = 0; i < priv->child_count; i++) {
- //If !child_up errno will be zero
- if (sh->child_errno[i] != ENOENT)
- continue;
- afr_sh_call_entry_impunge_recreate (frame, this, i,
+ afr_sh_missing_entry_call_impunge_recreate (frame, this,
buf, postparent,
afr_sh_create_entry_cbk);
- enoent_count--;
- }
- GF_ASSERT (enoent_count == 0);
out:
return 0;
}
@@ -2039,7 +2047,6 @@ afr_self_heal (call_frame_t *frame, xlator_t *this, inode_t *inode)
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
afr_private_t *priv = NULL;
- int i = 0;
int32_t op_errno = 0;
int ret = 0;
afr_self_heal_t *orig_sh = NULL;
@@ -2060,7 +2067,7 @@ afr_self_heal (call_frame_t *frame, xlator_t *this, inode_t *inode)
local->self_heal.do_data_self_heal,
local->self_heal.do_entry_self_heal);
- op_errno = ENOMEM;
+ op_errno = ENOMEM;
sh_frame = copy_frame (frame);
if (!sh_frame)
goto out;
@@ -2093,30 +2100,16 @@ afr_self_heal (call_frame_t *frame, xlator_t *this, inode_t *inode)
if (!sh->locked_nodes)
goto out;
- sh->pending_matrix = GF_CALLOC (sizeof (int32_t *), priv->child_count,
- gf_afr_mt_int32_t);
+ sh->pending_matrix = afr_matrix_create (priv->child_count,
+ priv->child_count);
if (!sh->pending_matrix)
goto out;
- for (i = 0; i < priv->child_count; i++) {
- sh->pending_matrix[i] = GF_CALLOC (sizeof (int32_t),
- priv->child_count,
- gf_afr_mt_int32_t);
- if (!sh->pending_matrix[i])
- goto out;
- }
-
- sh->delta_matrix = GF_CALLOC (sizeof (int32_t *), priv->child_count,
- gf_afr_mt_int32_t);
+ sh->delta_matrix = afr_matrix_create (priv->child_count,
+ priv->child_count);
if (!sh->delta_matrix)
goto out;
- for (i = 0; i < priv->child_count; i++) {
- sh->delta_matrix[i] = GF_CALLOC (sizeof (int32_t),
- priv->child_count,
- gf_afr_mt_int32_t);
- if (!sh->delta_matrix)
- goto out;
- }
+
sh->fresh_parent_dirs = afr_children_create (priv->child_count);
if (!sh->fresh_parent_dirs)
goto out;
@@ -2173,6 +2166,8 @@ afr_self_heal (call_frame_t *frame, xlator_t *this, inode_t *inode)
out:
if (op_errno) {
orig_sh->unwind (frame, this, -1, op_errno);
+ if (sh_frame)
+ AFR_STACK_DESTROY (sh_frame);
}
return 0;
}
diff --git a/xlators/cluster/afr/src/afr-self-heal-common.h b/xlators/cluster/afr/src/afr-self-heal-common.h
index b7a736a74..114c17777 100644
--- a/xlators/cluster/afr/src/afr-self-heal-common.h
+++ b/xlators/cluster/afr/src/afr-self-heal-common.h
@@ -100,8 +100,7 @@ afr_sh_entrylk (call_frame_t *frame, xlator_t *this, loc_t *loc,
char *base_name, afr_lock_cbk_t lock_cbk);
int
afr_sh_entry_impunge_create (call_frame_t *impunge_frame, xlator_t *this,
- int child_index, struct iatt *buf,
- struct iatt *postparent);
+ int child_index);
int
afr_sh_data_unlock (call_frame_t *frame, xlator_t *this,
afr_lock_cbk_t lock_cbk);
@@ -125,6 +124,5 @@ afr_build_child_loc (xlator_t *this, loc_t *child, loc_t *parent, char *name,
uuid_t gfid);
int
afr_impunge_frame_create (call_frame_t *frame, xlator_t *this,
- int active_source, int ret_child, mode_t entry_mode,
- call_frame_t **impunge_frame);
+ int active_source, call_frame_t **impunge_frame);
#endif /* __AFR_SELF_HEAL_COMMON_H__ */
diff --git a/xlators/cluster/afr/src/afr-self-heal-data.c b/xlators/cluster/afr/src/afr-self-heal-data.c
index c1c1d483e..765edd277 100644
--- a/xlators/cluster/afr/src/afr-self-heal-data.c
+++ b/xlators/cluster/afr/src/afr-self-heal-data.c
@@ -749,50 +749,6 @@ afr_sh_data_fix (call_frame_t *frame, xlator_t *this)
return 0;
}
-static void
-afr_destroy_pending_matrix (int32_t **pending_matrix, int32_t child_count)
-{
- int i = 0;
- GF_ASSERT (child_count > 0);
- if (pending_matrix) {
- for (i = 0; i < child_count; i++) {
- if (pending_matrix[i])
- GF_FREE (pending_matrix[i]);
- }
- GF_FREE (pending_matrix);
- }
-}
-
-static int32_t**
-afr_create_pending_matrix (int32_t child_count)
-{
- gf_boolean_t cleanup = _gf_false;
- int32_t **pending_matrix = NULL;
- int i = 0;
-
- GF_ASSERT (child_count > 0);
-
- pending_matrix = GF_CALLOC (sizeof (*pending_matrix), child_count,
- gf_afr_mt_int32_t);
- if (NULL == pending_matrix)
- goto out;
- for (i = 0; i < child_count; i++) {
- pending_matrix[i] = GF_CALLOC (sizeof (**pending_matrix),
- child_count,
- gf_afr_mt_int32_t);
- if (NULL == pending_matrix[i]) {
- cleanup = _gf_true;
- goto out;
- }
- }
-out:
- if (_gf_true == cleanup) {
- afr_destroy_pending_matrix (pending_matrix, child_count);
- pending_matrix = NULL;
- }
- return pending_matrix;
-}
-
int
afr_lookup_select_read_child_by_txn_type (xlator_t *this, afr_local_t *local,
dict_t **xattr,
@@ -813,7 +769,8 @@ afr_lookup_select_read_child_by_txn_type (xlator_t *this, afr_local_t *local,
bufs = local->cont.lookup.bufs;
success_children = local->cont.lookup.success_children;
- pending_matrix = afr_create_pending_matrix (priv->child_count);
+ pending_matrix = afr_matrix_create (priv->child_count,
+ priv->child_count);
if (NULL == pending_matrix)
goto out;
@@ -837,7 +794,7 @@ afr_lookup_select_read_child_by_txn_type (xlator_t *this, afr_local_t *local,
config_read_child,
sources);
out:
- afr_destroy_pending_matrix (pending_matrix, priv->child_count);
+ afr_matrix_cleanup (pending_matrix, priv->child_count);
gf_log (this->name, GF_LOG_DEBUG, "returning read_child: %d",
read_child);
return read_child;
diff --git a/xlators/cluster/afr/src/afr-self-heal-entry.c b/xlators/cluster/afr/src/afr-self-heal-entry.c
index ed1c51a21..570c7080f 100644
--- a/xlators/cluster/afr/src/afr-self-heal-entry.c
+++ b/xlators/cluster/afr/src/afr-self-heal-entry.c
@@ -317,8 +317,7 @@ int
afr_sh_entry_impunge_all (call_frame_t *frame, xlator_t *this);
int
-afr_sh_entry_impunge_subvol (call_frame_t *frame, xlator_t *this,
- int active_src);
+afr_sh_entry_impunge_subvol (call_frame_t *frame, xlator_t *this);
int
afr_sh_entry_expunge_all (call_frame_t *frame, xlator_t *this);
@@ -887,15 +886,19 @@ out:
int
afr_sh_entry_impunge_entry_done (call_frame_t *frame, xlator_t *this,
- int active_src, int32_t op_ret,
- int32_t op_errno)
+ int32_t op_ret, int32_t op_errno)
{
int call_count = 0;
+ afr_local_t *local = NULL;
+ afr_self_heal_t *sh = NULL;
+ local = frame->local;
+ sh = &local->self_heal;
+ if (op_ret < 0)
+ sh->entries_skipped = _gf_true;
call_count = afr_frame_return (frame);
-
if (call_count == 0)
- afr_sh_entry_impunge_subvol (frame, this, active_src);
+ afr_sh_entry_impunge_subvol (frame, this);
return 0;
}
@@ -909,15 +912,12 @@ afr_sh_entry_call_impunge_done (call_frame_t *impunge_frame, xlator_t *this,
afr_self_heal_t *sh = NULL;
afr_self_heal_t *impunge_sh = NULL;
call_frame_t *frame = NULL;
- int32_t impunge_ret_child = 0;
AFR_INIT_SH_FRAME_VALS (impunge_frame, impunge_local, impunge_sh,
frame, local, sh);
- impunge_ret_child = impunge_sh->impunge_ret_child;
AFR_STACK_DESTROY (impunge_frame);
- sh->impunge_done (frame, this, impunge_ret_child, op_ret,
- op_errno);
+ sh->impunge_done (frame, this, op_ret, op_errno);
}
int
@@ -936,7 +936,7 @@ afr_sh_entry_impunge_setattr_cbk (call_frame_t *impunge_frame, void *cookie,
child_index = (long) cookie;
if (op_ret == 0) {
- gf_log (this->name, GF_LOG_TRACE,
+ gf_log (this->name, GF_LOG_DEBUG,
"setattr done for %s on %s",
impunge_local->loc.path,
priv->children[child_index]->name);
@@ -948,19 +948,103 @@ afr_sh_entry_impunge_setattr_cbk (call_frame_t *impunge_frame, void *cookie,
strerror (op_errno));
}
- LOCK (&impunge_frame->lock);
- {
- call_count = --impunge_local->call_count;
+ call_count = afr_frame_return (impunge_frame);
+ if (call_count == 0) {
+ afr_sh_entry_call_impunge_done (impunge_frame, this,
+ 0, op_errno);
}
- UNLOCK (&impunge_frame->lock);
- if (call_count == 0)
- afr_sh_entry_call_impunge_done (impunge_frame, this,
- op_ret, op_errno);
+ return 0;
+}
+
+int
+afr_sh_entry_impunge_parent_setattr_cbk (call_frame_t *setattr_frame,
+ void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt *preop, struct iatt *postop)
+{
+ int call_count = 0;
+ afr_local_t *setattr_local = NULL;
+ setattr_local = setattr_frame->local;
+ if (op_ret != 0) {
+ gf_log (this->name, GF_LOG_INFO,
+ "setattr on parent directory (%s) failed: %s",
+ setattr_local->loc.path, strerror (op_errno));
+ }
+
+ call_count = afr_frame_return (setattr_frame);
+ if (call_count == 0)
+ AFR_STACK_DESTROY (setattr_frame);
return 0;
}
+int
+afr_sh_entry_impunge_setattr (call_frame_t *impunge_frame, xlator_t *this)
+{
+ afr_private_t *priv = NULL;
+ afr_local_t *impunge_local = NULL;
+ afr_local_t *setattr_local = NULL;
+ afr_self_heal_t *impunge_sh = NULL;
+ call_frame_t *setattr_frame = NULL;
+ int32_t valid = 0;
+ int32_t op_errno = 0;
+ int child_index = 0;
+ int call_count = 0;
+ int i = 0;
+
+ priv = this->private;
+ impunge_local = impunge_frame->local;
+ impunge_sh = &impunge_local->self_heal;
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "setting ownership of %s on %s to %d/%d",
+ impunge_local->loc.path,
+ priv->children[child_index]->name,
+ impunge_sh->entrybuf.ia_uid,
+ impunge_sh->entrybuf.ia_gid);
+
+ setattr_frame = copy_frame (impunge_frame);
+ if (!setattr_frame) {
+ op_errno = ENOMEM;
+ goto out;
+ }
+ ALLOC_OR_GOTO (setattr_frame->local, afr_local_t, out);
+ setattr_local = setattr_frame->local;
+ call_count = afr_errno_count (NULL, impunge_sh->child_errno,
+ priv->child_count, 0);
+ loc_copy (&setattr_local->loc, &impunge_sh->parent_loc);
+ impunge_local->call_count = call_count;
+ setattr_local->call_count = call_count;
+ for (i = 0; i < priv->child_count; i++) {
+ if (impunge_sh->child_errno[i])
+ continue;
+ valid = GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME;
+ STACK_WIND_COOKIE (setattr_frame,
+ afr_sh_entry_impunge_parent_setattr_cbk,
+ (void *) (long) i, priv->children[i],
+ priv->children[i]->fops->setattr,
+ &setattr_local->loc,
+ &impunge_sh->parentbuf, valid);
+
+ valid = GF_SET_ATTR_UID | GF_SET_ATTR_GID |
+ GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME;
+ STACK_WIND_COOKIE (impunge_frame,
+ afr_sh_entry_impunge_setattr_cbk,
+ (void *) (long) i, priv->children[i],
+ priv->children[i]->fops->setattr,
+ &impunge_local->loc,
+ &impunge_sh->entrybuf, valid);
+ call_count--;
+ }
+ GF_ASSERT (!call_count);
+ return 0;
+out:
+ if (setattr_frame)
+ AFR_STACK_DESTROY (setattr_frame);
+ afr_sh_entry_call_impunge_done (impunge_frame, this, 0, op_errno);
+ return 0;
+}
int
afr_sh_entry_impunge_xattrop_cbk (call_frame_t *impunge_frame, void *cookie,
@@ -971,8 +1055,6 @@ afr_sh_entry_impunge_xattrop_cbk (call_frame_t *impunge_frame, void *cookie,
afr_private_t *priv = NULL;
afr_local_t *impunge_local = NULL;
int child_index = 0;
- struct iatt stbuf = {0};
- int32_t valid = 0;
priv = this->private;
impunge_local = impunge_frame->local;
@@ -985,55 +1067,84 @@ afr_sh_entry_impunge_xattrop_cbk (call_frame_t *impunge_frame, void *cookie,
impunge_local->loc.path,
priv->children[child_index]->name,
strerror (op_errno));
+ goto out;
}
- gf_log (this->name, GF_LOG_TRACE,
- "setting ownership of %s on %s to %d/%d",
- impunge_local->loc.path,
- priv->children[child_index]->name,
- impunge_local->cont.lookup.buf.ia_uid,
- impunge_local->cont.lookup.buf.ia_gid);
-
- stbuf.ia_atime = impunge_local->cont.lookup.buf.ia_atime;
- stbuf.ia_atime_nsec = impunge_local->cont.lookup.buf.ia_atime_nsec;
- stbuf.ia_mtime = impunge_local->cont.lookup.buf.ia_mtime;
- stbuf.ia_mtime_nsec = impunge_local->cont.lookup.buf.ia_mtime_nsec;
-
- stbuf.ia_uid = impunge_local->cont.lookup.buf.ia_uid;
- stbuf.ia_gid = impunge_local->cont.lookup.buf.ia_gid;
-
- valid = GF_SET_ATTR_UID | GF_SET_ATTR_GID |
- GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME;
-
- STACK_WIND_COOKIE (impunge_frame, afr_sh_entry_impunge_setattr_cbk,
- (void *) (long) child_index,
- priv->children[child_index],
- priv->children[child_index]->fops->setattr,
- &impunge_local->loc,
- &stbuf, valid);
+ afr_sh_entry_impunge_setattr (impunge_frame, this);
+ return 0;
+out:
+ afr_sh_entry_call_impunge_done (impunge_frame, this,
+ -1, op_errno);
return 0;
}
+void
+afr_sh_prepare_new_entry_pending_matrix (int32_t **pending,
+ int *child_errno,
+ struct iatt *buf,
+ unsigned int child_count)
+{
+ int midx = 0;
+ int idx = 0;
+ int i = 0;
+
+ midx = afr_index_for_transaction_type (AFR_METADATA_TRANSACTION);
+ if (IA_ISDIR (buf->ia_type))
+ idx = afr_index_for_transaction_type (AFR_ENTRY_TRANSACTION);
+ else if (IA_ISREG (buf->ia_type))
+ idx = afr_index_for_transaction_type (AFR_DATA_TRANSACTION);
+ else
+ idx = -1;
+ for (i = 0; i < child_count; i++) {
+ if (child_errno[i])
+ continue;
+ pending[i][midx] = hton32 (1);
+ if (idx == -1)
+ continue;
+ pending[i][idx] = hton32 (1);
+ }
+}
int
-afr_sh_entry_impunge_parent_setattr_cbk (call_frame_t *setattr_frame,
- void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno,
- struct iatt *preop, struct iatt *postop)
+afr_sh_entry_impunge_perform_xattrop (call_frame_t *impunge_frame,
+ xlator_t *this)
{
- loc_t *parent_loc = cookie;
+ int active_src = 0;
+ dict_t *xattr = NULL;
+ afr_private_t *priv = NULL;
+ afr_local_t *impunge_local = NULL;
+ afr_self_heal_t *impunge_sh = NULL;
+ int32_t op_errno = 0;
- if (op_ret != 0) {
- gf_log (this->name, GF_LOG_INFO,
- "setattr on parent directory (%s) failed: %s",
- parent_loc->path, strerror (op_errno));
+ priv = this->private;
+ impunge_local = impunge_frame->local;
+ impunge_sh = &impunge_local->self_heal;
+ active_src = impunge_sh->active_source;
+
+ afr_sh_prepare_new_entry_pending_matrix (impunge_local->pending,
+ impunge_sh->child_errno,
+ &impunge_sh->entrybuf,
+ priv->child_count);
+ xattr = dict_new ();
+ if (!xattr) {
+ op_errno = ENOMEM;
+ goto out;
}
- loc_wipe (parent_loc);
+ afr_set_pending_dict (priv, xattr, impunge_local->pending);
- GF_FREE (parent_loc);
+ STACK_WIND_COOKIE (impunge_frame, afr_sh_entry_impunge_xattrop_cbk,
+ (void *) (long) active_src,
+ priv->children[active_src],
+ priv->children[active_src]->fops->xattrop,
+ &impunge_local->loc, GF_XATTROP_ADD_ARRAY, xattr);
- AFR_STACK_DESTROY (setattr_frame);
+ if (xattr)
+ dict_unref (xattr);
+ return 0;
+out:
+ afr_sh_entry_call_impunge_done (impunge_frame, this,
+ -1, op_errno);
return 0;
}
@@ -1049,115 +1160,37 @@ afr_sh_entry_impunge_newfile_cbk (call_frame_t *impunge_frame, void *cookie,
afr_private_t *priv = NULL;
afr_local_t *impunge_local = NULL;
afr_self_heal_t *impunge_sh = NULL;
- int active_src = 0;
int child_index = 0;
- int32_t *pending_array = NULL;
- dict_t *xattr = NULL;
- int ret = 0;
- int idx = 0;
- call_frame_t *setattr_frame = NULL;
- int32_t valid = 0;
- loc_t *parent_loc = NULL;
- struct iatt parentbuf = {0,};
priv = this->private;
impunge_local = impunge_frame->local;
impunge_sh = &impunge_local->self_heal;
- active_src = impunge_sh->active_source;
child_index = (long) cookie;
if (op_ret == -1) {
- ret = -1;
+ impunge_sh->child_errno[child_index] = op_errno;
gf_log (this->name, GF_LOG_ERROR,
"creation of %s on %s failed (%s)",
impunge_local->loc.path,
priv->children[child_index]->name,
strerror (op_errno));
- goto out;
- }
-
- inode->ia_type = stbuf->ia_type;
-
- xattr = dict_new ();
- if (!xattr) {
- ret = -1;
- goto out;
- }
-
- pending_array = (int32_t*) GF_CALLOC (3, sizeof (*pending_array),
- gf_afr_mt_int32_t);
-
- if (!pending_array) {
- ret = -1;
- goto out;
- }
-
- /* Pending data xattrs shouldn't be set for special files
- */
- idx = afr_index_for_transaction_type (AFR_METADATA_TRANSACTION);
- pending_array[idx] = hton32 (1);
- if (IA_ISDIR (stbuf->ia_type))
- idx = afr_index_for_transaction_type (AFR_ENTRY_TRANSACTION);
- else if (IA_ISREG (stbuf->ia_type))
- idx = afr_index_for_transaction_type (AFR_DATA_TRANSACTION);
- else
- goto cont;
- pending_array[idx] = hton32 (1);
-
-cont:
- ret = dict_set_dynptr (xattr, priv->pending_key[child_index],
- pending_array,
- 3 * sizeof (*pending_array));
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "Unable to set dict value.");
} else {
- pending_array = NULL;
+ impunge_sh->child_errno[child_index] = 0;
}
- valid = GF_SET_ATTR_ATIME | GF_SET_ATTR_MTIME;
- parentbuf = impunge_sh->parentbuf;
- setattr_frame = copy_frame (impunge_frame);
-
- parent_loc = GF_CALLOC (1, sizeof (*parent_loc),
- gf_afr_mt_loc_t);
- if (!parent_loc) {
- ret = -1;
- goto out;
- }
- afr_build_parent_loc (parent_loc, &impunge_local->loc);
-
- STACK_WIND_COOKIE (impunge_frame, afr_sh_entry_impunge_xattrop_cbk,
- (void *) (long) child_index,
- priv->children[active_src],
- priv->children[active_src]->fops->xattrop,
- &impunge_local->loc, GF_XATTROP_ADD_ARRAY, xattr);
-
- STACK_WIND_COOKIE (setattr_frame, afr_sh_entry_impunge_parent_setattr_cbk,
- (void *) (long) parent_loc,
- priv->children[child_index],
- priv->children[child_index]->fops->setattr,
- parent_loc, &parentbuf, valid);
-out:
- if (xattr)
- dict_unref (xattr);
-
- if (ret) {
- if (pending_array)
- GF_FREE (pending_array);
-
- LOCK (&impunge_frame->lock);
- {
- call_count = --impunge_local->call_count;
- }
- UNLOCK (&impunge_frame->lock);
-
- if (call_count == 0)
+ call_count = afr_frame_return (impunge_frame);
+ if (call_count == 0) {
+ if (!afr_errno_count (NULL, impunge_sh->child_errno,
+ priv->child_count, 0)) {
+ // new_file creation failed every where
afr_sh_entry_call_impunge_done (impunge_frame, this,
-1, op_errno);
+ goto out;
+ }
+ afr_sh_entry_impunge_perform_xattrop (impunge_frame, this);
}
-
+out:
return 0;
}
@@ -1538,8 +1571,7 @@ afr_sh_entry_impunge_readlink (call_frame_t *impunge_frame, xlator_t *this,
int
afr_sh_entry_impunge_create (call_frame_t *impunge_frame, xlator_t *this,
- int child_index, struct iatt *buf,
- struct iatt *postparent)
+ int child_index)
{
afr_local_t *impunge_local = NULL;
afr_self_heal_t *impunge_sh = NULL;
@@ -1547,14 +1579,15 @@ afr_sh_entry_impunge_create (call_frame_t *impunge_frame, xlator_t *this,
ia_type_t type = IA_INVAL;
int ret = 0;
int active_src = 0;
+ struct iatt *buf = NULL;
impunge_local = impunge_frame->local;
impunge_sh = &impunge_local->self_heal;
- impunge_sh->parentbuf = *postparent;
active_src = impunge_sh->active_source;
- impunge_local->cont.lookup.buf = *buf;
- afr_update_loc_gfids (&impunge_local->loc, buf, postparent);
+ afr_update_loc_gfids (&impunge_local->loc, &impunge_sh->entrybuf,
+ &impunge_sh->parentbuf);
+ buf = &impunge_sh->entrybuf;
type = buf->ia_type;
switch (type) {
@@ -1587,24 +1620,17 @@ afr_sh_entry_impunge_create (call_frame_t *impunge_frame, xlator_t *this,
}
gf_boolean_t
-afr_sh_need_recreate (afr_self_heal_t *impunge_sh, int *sources,
- unsigned int child, unsigned int child_count)
+afr_sh_need_recreate (afr_self_heal_t *impunge_sh, unsigned int child,
+ unsigned int child_count)
{
- int32_t *success_children = NULL;
gf_boolean_t recreate = _gf_false;
- GF_ASSERT (impunge_sh->impunging_entry_mode);
GF_ASSERT (impunge_sh->child_errno);
- GF_ASSERT (sources);
- success_children = impunge_sh->success_children;
- if (child == impunge_sh->active_source) {
- GF_ASSERT (afr_is_child_present (success_children,
- child_count, child));
+ if (child == impunge_sh->active_source)
goto out;
- }
- if (IA_ISLNK (impunge_sh->impunging_entry_mode)) {
+ if (IA_IFLNK == impunge_sh->entrybuf.ia_type) {
recreate = _gf_true;
goto out;
}
@@ -1623,7 +1649,7 @@ afr_sh_recreate_count (afr_self_heal_t *impunge_sh, int *sources,
int i = 0;
for (i = 0; i < child_count; i++) {
- if (afr_sh_need_recreate (impunge_sh, sources, i, child_count))
+ if (afr_sh_need_recreate (impunge_sh, i, child_count))
count++;
}
@@ -1640,8 +1666,6 @@ afr_sh_entry_call_impunge_recreate (call_frame_t *impunge_frame,
call_frame_t *frame = NULL;
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
- struct iatt *buf = NULL;
- struct iatt *postparent = NULL;
unsigned int recreate_count = 0;
int i = 0;
int active_src = 0;
@@ -1649,24 +1673,34 @@ afr_sh_entry_call_impunge_recreate (call_frame_t *impunge_frame,
priv = this->private;
AFR_INIT_SH_FRAME_VALS (impunge_frame, impunge_local, impunge_sh,
frame, local, sh);
- active_src = impunge_sh->active_source;
- buf = &impunge_sh->buf[active_src];
- postparent = &impunge_sh->parentbufs[active_src];
-
+ active_src = impunge_sh->active_source;
+ impunge_sh->entrybuf = impunge_sh->buf[active_src];
+ impunge_sh->parentbuf = impunge_sh->parentbufs[active_src];
recreate_count = afr_sh_recreate_count (impunge_sh, sh->sources,
priv->child_count);
- GF_ASSERT (recreate_count);
+ if (!recreate_count) {
+ afr_sh_entry_call_impunge_done (impunge_frame, this, 0, 0);
+ goto out;
+ }
impunge_local->call_count = recreate_count;
for (i = 0; i < priv->child_count; i++) {
- if (afr_sh_need_recreate (impunge_sh, sh->sources, i,
- priv->child_count)) {
- (void)afr_sh_entry_impunge_create (impunge_frame, this,
- i, buf,
- postparent);
- recreate_count--;
+ if (!impunge_local->child_up[i]) {
+ impunge_sh->child_errno[i] = ENOTCONN;
+ continue;
}
+ if (!afr_sh_need_recreate (impunge_sh, i, priv->child_count)) {
+ impunge_sh->child_errno[i] = EEXIST;
+ continue;
+ }
+ }
+ for (i = 0; i < priv->child_count; i++) {
+ if (!afr_sh_need_recreate (impunge_sh, i, priv->child_count))
+ continue;
+ (void)afr_sh_entry_impunge_create (impunge_frame, this, i);
+ recreate_count--;
}
GF_ASSERT (!recreate_count);
+out:
return 0;
}
@@ -1680,7 +1714,6 @@ afr_sh_entry_common_lookup_done (call_frame_t *impunge_frame, xlator_t *this,
call_frame_t *frame = NULL;
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
- unsigned int recreate_count = 0;
unsigned int gfid_miss_count = 0;
unsigned int children_up_count = 0;
uuid_t gfid = {0};
@@ -1731,13 +1764,6 @@ afr_sh_entry_common_lookup_done (call_frame_t *impunge_frame, xlator_t *this,
AFR_LOOKUP_FAIL_CONFLICTS |
AFR_LOOKUP_FAIL_MISSING_GFIDS);
} else {
- recreate_count = afr_sh_recreate_count (impunge_sh, sh->sources,
- priv->child_count);
- if (!recreate_count) {
- op_ret = 0;
- op_errno = 0;
- goto done;
- }
afr_sh_entry_call_impunge_recreate (impunge_frame, this);
}
return;
@@ -1753,13 +1779,13 @@ afr_sh_entry_impunge_entry (call_frame_t *frame, xlator_t *this,
{
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
+ afr_self_heal_t *impunge_sh = NULL;
int ret = -1;
call_frame_t *impunge_frame = NULL;
afr_local_t *impunge_local = NULL;
int active_src = 0;
int op_errno = 0;
int op_ret = -1;
- mode_t entry_mode = 0;
local = frame->local;
sh = &local->self_heal;
@@ -1783,18 +1809,18 @@ afr_sh_entry_impunge_entry (call_frame_t *frame, xlator_t *this,
"inspecting existence of %s under %s",
entry->d_name, local->loc.path);
- entry_mode = st_mode_from_ia (entry->d_stat.ia_prot,
- entry->d_stat.ia_type);
- ret = afr_impunge_frame_create (frame, this, active_src, active_src,
- entry_mode, &impunge_frame);
+ ret = afr_impunge_frame_create (frame, this, active_src,
+ &impunge_frame);
if (ret) {
op_errno = -ret;
goto out;
}
impunge_local = impunge_frame->local;
+ impunge_sh = &impunge_local->self_heal;
ret = afr_build_child_loc (this, &impunge_local->loc, &local->loc,
entry->d_name, entry->d_stat.ia_gfid);
+ loc_copy (&impunge_sh->parent_loc, &local->loc);
if (ret != 0) {
op_errno = ENOMEM;
goto out;
@@ -1809,7 +1835,7 @@ out:
if (ret) {
if (impunge_frame)
AFR_STACK_DESTROY (impunge_frame);
- sh->impunge_done (frame, this, active_src, op_ret, op_errno);
+ sh->impunge_done (frame, this, op_ret, op_errno);
}
return 0;
@@ -1843,6 +1869,7 @@ afr_sh_entry_impunge_readdir_cbk (call_frame_t *frame, void *cookie,
local->loc.path,
priv->children[active_src]->name,
strerror (op_errno));
+ sh->op_failed = 1;
} else {
gf_log (this->name, GF_LOG_TRACE,
"readdir of %s on subvolume %s complete",
@@ -1859,7 +1886,7 @@ afr_sh_entry_impunge_readdir_cbk (call_frame_t *frame, void *cookie,
entry_count++;
}
- gf_log (this->name, GF_LOG_TRACE,
+ gf_log (this->name, GF_LOG_DEBUG,
"readdir'ed %d entries from %s",
entry_count, priv->children[active_src]->name);
@@ -1875,16 +1902,19 @@ afr_sh_entry_impunge_readdir_cbk (call_frame_t *frame, void *cookie,
int
-afr_sh_entry_impunge_subvol (call_frame_t *frame, xlator_t *this,
- int active_src)
+afr_sh_entry_impunge_subvol (call_frame_t *frame, xlator_t *this)
{
afr_private_t *priv = NULL;
afr_local_t *local = NULL;
afr_self_heal_t *sh = NULL;
+ int32_t active_src = 0;
priv = this->private;
local = frame->local;
sh = &local->self_heal;
+ active_src = sh->active_source;
+ gf_log (this->name, GF_LOG_DEBUG, "%s: readdir from offset %zd",
+ local->loc.path, sh->offset);
STACK_WIND (frame, afr_sh_entry_impunge_readdir_cbk,
priv->children[active_src],
@@ -1927,7 +1957,7 @@ afr_sh_entry_impunge_all (call_frame_t *frame, xlator_t *this)
"impunging entries of %s on %s to other sinks",
local->loc.path, priv->children[active_src]->name);
- afr_sh_entry_impunge_subvol (frame, this, active_src);
+ afr_sh_entry_impunge_subvol (frame, this);
return 0;
}
diff --git a/xlators/cluster/afr/src/afr-transaction.c b/xlators/cluster/afr/src/afr-transaction.c
index 36d74aed8..36e2812f9 100644
--- a/xlators/cluster/afr/src/afr-transaction.c
+++ b/xlators/cluster/afr/src/afr-transaction.c
@@ -275,8 +275,7 @@ __fop_changelog_needed (call_frame_t *frame, xlator_t *this)
return op_ret;
}
-
-static int
+int
afr_set_pending_dict (afr_private_t *priv, dict_t *xattr, int32_t **pending)
{
int i = 0;
@@ -284,7 +283,8 @@ afr_set_pending_dict (afr_private_t *priv, dict_t *xattr, int32_t **pending)
for (i = 0; i < priv->child_count; i++) {
ret = dict_set_static_bin (xattr, priv->pending_key[i],
- pending[i], 3 * sizeof (int32_t));
+ pending[i],
+ AFR_NUM_CHANGE_LOGS * sizeof (int32_t));
/* 3 = data+metadata+entry */
if (ret < 0)
@@ -568,8 +568,7 @@ afr_changelog_post_op (call_frame_t *frame, xlator_t *this)
for (i = 0; i < priv->child_count; i++) {
if (!local->transaction.pre_op[i])
continue;
- ret = afr_set_pending_dict (priv, xattr[i],
- local->pending);
+ ret = afr_set_pending_dict (priv, xattr[i], local->pending);
if (ret < 0)
gf_log (this->name, GF_LOG_INFO,
@@ -665,8 +664,7 @@ afr_changelog_post_op (call_frame_t *frame, xlator_t *this)
value
*/
- ret = afr_set_pending_dict (priv, xattr[i],
- local->pending);
+ ret = afr_set_pending_dict (priv, xattr[i], local->pending);
if (ret < 0)
gf_log (this->name, GF_LOG_INFO,
@@ -816,8 +814,7 @@ afr_changelog_pre_op (call_frame_t *frame, xlator_t *this)
for (i = 0; i < priv->child_count; i++) {
if (!locked_nodes[i])
continue;
- ret = afr_set_pending_dict (priv, xattr[i],
- local->pending);
+ ret = afr_set_pending_dict (priv, xattr[i], local->pending);
if (ret < 0)
gf_log (this->name, GF_LOG_INFO,
@@ -918,8 +915,7 @@ afr_changelog_pre_op (call_frame_t *frame, xlator_t *this)
value
*/
- ret = afr_set_pending_dict (priv, xattr[i],
- local->pending);
+ ret = afr_set_pending_dict (priv, xattr[i], local->pending);
if (ret < 0)
gf_log (this->name, GF_LOG_INFO,
diff --git a/xlators/cluster/afr/src/afr-transaction.h b/xlators/cluster/afr/src/afr-transaction.h
index 10f274fec..f470f2697 100644
--- a/xlators/cluster/afr/src/afr-transaction.h
+++ b/xlators/cluster/afr/src/afr-transaction.h
@@ -32,4 +32,6 @@ afr_transaction (call_frame_t *frame, xlator_t *this, afr_transaction_type type)
afr_fd_ctx_t *
afr_fd_ctx_get (fd_t *fd, xlator_t *this);
+int
+afr_set_pending_dict (afr_private_t *priv, dict_t *xattr, int32_t **pending);
#endif /* __TRANSACTION_H__ */
diff --git a/xlators/cluster/afr/src/afr.h b/xlators/cluster/afr/src/afr.h
index de25a2d46..0c4bf5f63 100644
--- a/xlators/cluster/afr/src/afr.h
+++ b/xlators/cluster/afr/src/afr.h
@@ -43,8 +43,7 @@ typedef int (*afr_expunge_done_cbk_t) (call_frame_t *frame, xlator_t *this,
int32_t op_errno);
typedef int (*afr_impunge_done_cbk_t) (call_frame_t *frame, xlator_t *this,
- int child, int32_t op_error,
- int32_t op_errno);
+ int32_t op_error, int32_t op_errno);
typedef int (*afr_post_remove_call_t) (call_frame_t *frame, xlator_t *this);
typedef int (*afr_lock_cbk_t) (call_frame_t *frame, xlator_t *this);
@@ -192,7 +191,6 @@ typedef struct {
afr_expunge_done_cbk_t expunge_done;
afr_impunge_done_cbk_t impunge_done;
- int32_t impunge_ret_child;
/* array of xattr's, one for each child */
dict_t **xattr;
@@ -226,7 +224,6 @@ typedef struct {
unsigned char *locked_nodes;
int lock_count;
- mode_t impunging_entry_mode;
const char *linkname;
gf_boolean_t entries_skipped;
@@ -882,6 +879,7 @@ afr_launch_openfd_self_heal (call_frame_t *frame, xlator_t *this, fd_t *fd);
GF_FREE (__local); \
} while (0);
+#define AFR_NUM_CHANGE_LOGS 3 /*data + metadata + entry*/
/* allocate and return a string that is the basename of argument */
static inline char *
AFR_BASENAME (const char *str)
@@ -1013,6 +1011,11 @@ afr_child_fd_ctx_set (xlator_t *this, fd_t *fd, int32_t child,
gf_boolean_t
afr_have_quorum (char *logname, afr_private_t *priv);
+void
+afr_matrix_cleanup (int32_t **pending, unsigned int m);
+
+int32_t**
+afr_matrix_create (unsigned int m, unsigned int n);
/*
* Special value indicating we should use the "auto" quorum method instead of
* a fixed value (including zero to turn off quorum enforcement).