summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/ec
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/ec')
-rw-r--r--xlators/cluster/ec/src/ec-common.h1
-rw-r--r--xlators/cluster/ec/src/ec-data.c2
-rw-r--r--xlators/cluster/ec/src/ec-data.h1
-rw-r--r--xlators/cluster/ec/src/ec-heal.c109
-rw-r--r--xlators/cluster/ec/src/ec.c2
-rw-r--r--xlators/cluster/ec/src/ec.h4
6 files changed, 114 insertions, 5 deletions
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h
index 3334a7bfe0e..4c7fe0c820b 100644
--- a/xlators/cluster/ec/src/ec-common.h
+++ b/xlators/cluster/ec/src/ec-common.h
@@ -112,5 +112,6 @@ void ec_resume_parent(ec_fop_data_t * fop, int32_t error);
void ec_manager(ec_fop_data_t * fop, int32_t error);
gf_boolean_t ec_is_recoverable_error (int32_t op_errno);
+void ec_handle_healers_done (ec_fop_data_t *fop);
#endif /* __EC_COMMON_H__ */
diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c
index 0632371bb6d..2a34f78999c 100644
--- a/xlators/cluster/ec/src/ec-data.c
+++ b/xlators/cluster/ec/src/ec-data.c
@@ -135,6 +135,7 @@ ec_fop_data_t * ec_fop_data_allocate(call_frame_t * frame, xlator_t * this,
}
INIT_LIST_HEAD(&fop->cbk_list);
+ INIT_LIST_HEAD(&fop->healer);
INIT_LIST_HEAD(&fop->answer_list);
INIT_LIST_HEAD(&fop->pending_list);
INIT_LIST_HEAD(&fop->locks[0].wait_list);
@@ -300,6 +301,7 @@ void ec_fop_data_release(ec_fop_data_t * fop)
ec = fop->xl->private;
ec_handle_last_pending_fop_completion (fop, &notify);
+ ec_handle_healers_done (fop);
mem_put(fop);
if (notify) {
ec_pending_fops_completed(ec);
diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h
index 670b3b88670..135ccdf5f53 100644
--- a/xlators/cluster/ec/src/ec-data.h
+++ b/xlators/cluster/ec/src/ec-data.h
@@ -213,6 +213,7 @@ struct _ec_fop_data
ec_cbk_t cbks;
void *data;
ec_heal_t *heal;
+ struct list_head healer;
uint64_t user_size;
uint32_t head;
diff --git a/xlators/cluster/ec/src/ec-heal.c b/xlators/cluster/ec/src/ec-heal.c
index b014df25b94..31685356db0 100644
--- a/xlators/cluster/ec/src/ec-heal.c
+++ b/xlators/cluster/ec/src/ec-heal.c
@@ -26,6 +26,9 @@
#include "syncop-utils.h"
#include "cluster-syncop.h"
+#define EC_MAX_BACKGROUND_HEALS 8
+#define EC_MAX_HEAL_WAITERS 128
+
#define alloca0(size) ({void *__ptr; __ptr = alloca(size); memset(__ptr, 0, size); __ptr; })
#define EC_COUNT(array, max) ({int __i; int __res = 0; for (__i = 0; __i < max; __i++) if (array[__i]) __res++; __res; })
#define EC_INTERSECT(dst, src1, src2, max) ({int __i; for (__i = 0; __i < max; __i++) dst[__i] = src1[__i] && src2[__i]; })
@@ -2318,6 +2321,106 @@ ec_heal_done (int ret, call_frame_t *heal, void *opaque)
return 0;
}
+ec_fop_data_t*
+__ec_dequeue_heals (ec_t *ec)
+{
+ ec_fop_data_t *fop = NULL;
+
+ if (list_empty (&ec->heal_waiting))
+ goto none;
+
+ if (ec->healers == EC_MAX_BACKGROUND_HEALS)
+ goto none;
+
+ GF_ASSERT (ec->healers < EC_MAX_BACKGROUND_HEALS);
+ fop = list_entry(ec->heal_waiting.next, ec_fop_data_t, healer);
+ ec->heal_waiters--;
+ list_del_init(&fop->healer);
+ list_add(&fop->healer, &ec->healing);
+ ec->healers++;
+ return fop;
+none:
+ gf_msg_debug (ec->xl->name, 0, "Num healers: %d, Num Waiters: %d",
+ ec->healers, ec->heal_waiters);
+ return NULL;
+}
+
+void
+ec_heal_fail (ec_t *ec, ec_fop_data_t *fop)
+{
+ if (fop->cbks.heal) {
+ fop->cbks.heal (fop->req_frame, NULL, ec->xl, -1, EIO, 0, 0,
+ 0, NULL);
+ }
+ if (fop)
+ ec_fop_data_release (fop);
+}
+
+void
+ec_launch_heal (ec_t *ec, ec_fop_data_t *fop)
+{
+ int ret = 0;
+
+ ret = synctask_new (ec->xl->ctx->env, ec_synctask_heal_wrap,
+ ec_heal_done, NULL, fop);
+ if (ret < 0) {
+ ec_heal_fail (ec, fop);
+ }
+}
+
+void
+ec_handle_healers_done (ec_fop_data_t *fop)
+{
+ ec_t *ec = fop->xl->private;
+ ec_fop_data_t *heal_fop = NULL;
+
+ if (list_empty (&fop->healer))
+ return;
+
+ LOCK (&ec->lock);
+ {
+ list_del_init (&fop->healer);
+ ec->healers--;
+ heal_fop = __ec_dequeue_heals (ec);
+ }
+ UNLOCK (&ec->lock);
+
+ if (heal_fop)
+ ec_launch_heal (ec, heal_fop);
+
+}
+
+void
+ec_heal_throttle (xlator_t *this, ec_fop_data_t *fop)
+{
+ gf_boolean_t can_heal = _gf_true;
+ ec_t *ec = this->private;
+
+ if (fop->req_frame == NULL) {
+
+ LOCK (&ec->lock);
+ {
+ if (ec->heal_waiters >= EC_MAX_HEAL_WAITERS) {
+ can_heal = _gf_false;
+ } else {
+ list_add_tail(&fop->healer, &ec->heal_waiting);
+ ec->heal_waiters++;
+ fop = __ec_dequeue_heals (ec);
+ }
+ }
+ UNLOCK (&ec->lock);
+ }
+
+ if (can_heal) {
+ if (fop)
+ ec_launch_heal (ec, fop);
+ } else {
+ gf_msg_debug (this->name, 0, "Max number of heals are pending, "
+ "background self-heal rejected");
+ ec_heal_fail (ec, fop);
+ }
+}
+
void
ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,
int32_t minimum, fop_heal_cbk_t func, void *data, loc_t *loc,
@@ -2325,7 +2428,6 @@ ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,
{
ec_cbk_t callback = { .heal = func };
ec_fop_data_t *fop = NULL;
- int ret = 0;
gf_msg_trace ("ec", 0, "EC(HEAL) %p", frame);
@@ -2353,10 +2455,7 @@ ec_heal (call_frame_t *frame, xlator_t *this, uintptr_t target,
if (xdata)
fop->xdata = dict_ref(xdata);
- ret = synctask_new (this->ctx->env, ec_synctask_heal_wrap,
- ec_heal_done, NULL, fop);
- if (ret < 0)
- goto fail;
+ ec_heal_throttle (this, fop);
return;
fail:
if (fop)
diff --git a/xlators/cluster/ec/src/ec.c b/xlators/cluster/ec/src/ec.c
index 64ab91bf9bd..dd51630ea79 100644
--- a/xlators/cluster/ec/src/ec.c
+++ b/xlators/cluster/ec/src/ec.c
@@ -543,6 +543,8 @@ init (xlator_t *this)
LOCK_INIT(&ec->lock);
INIT_LIST_HEAD(&ec->pending_fops);
+ INIT_LIST_HEAD(&ec->heal_waiting);
+ INIT_LIST_HEAD(&ec->healing);
ec->fop_pool = mem_pool_new(ec_fop_data_t, 1024);
ec->cbk_pool = mem_pool_new(ec_cbk_data_t, 4096);
diff --git a/xlators/cluster/ec/src/ec.h b/xlators/cluster/ec/src/ec.h
index fdedb89ec18..7f140204ece 100644
--- a/xlators/cluster/ec/src/ec.h
+++ b/xlators/cluster/ec/src/ec.h
@@ -28,6 +28,8 @@
struct _ec
{
xlator_t * xl;
+ int32_t healers;
+ int32_t heal_waiters;
int32_t nodes;
int32_t bits_for_nodes;
int32_t fragments;
@@ -46,6 +48,8 @@ struct _ec
gf_timer_t * timer;
gf_boolean_t shutdown;
struct list_head pending_fops;
+ struct list_head heal_waiting;
+ struct list_head healing;
struct mem_pool * fop_pool;
struct mem_pool * cbk_pool;
struct mem_pool * lock_pool;