summary refs log tree commit diff stats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r-- xlators/performance/io-threads/src/io-threads.c 200
1 files changed, 195 insertions, 5 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index fa067cc4d6a..601e04abad6 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -15,11 +15,13 @@
#include "dict.h"
#include "xlator.h"
#include "io-threads.h"
+#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include "locking.h"
#include "io-threads-messages.h"
+#include "timespec.h"
void *iot_worker (void *arg);
int iot_workers_scale (iot_conf_t *conf);
@@ -113,6 +115,7 @@ __iot_dequeue (iot_conf_t *conf, int *pri)
}
conf->ac_iot_count[i]++;
+ conf->queue_marked[i] = _gf_false;
*pri = i;
break;
}
@@ -953,6 +956,165 @@ iot_priv_dump (xlator_t *this)
return 0;
}
+/*
+ * We use a decay model to keep track and make sure we're not spawning new
+ * threads too often. Each increment adds a large value to a counter, and that
+ * counter keeps ticking back down to zero over a fairly long period. For
+ * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we
+ * have N=3 increments during that time. Thus, our threshold is
+ * (N-1)*ONE_WEEK. To see how it works, look at three examples.
+ *
+ * (a) Two events close together, then one more almost a week later. The
+ * first two events push our counter to 2*ONE_WEEK plus a bit. At the third
+ * event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the
+ * new event, exceeding our threshold.
+ *
+ * (b) One event, then two more almost a week later. At the time of the
+ * second and third events, the counter is already non-zero, so when we add
+ * 2*ONE_WEEK we exceed again.
+ *
+ * (c) Three events, spaced three days apart. At the time of the second
+ * event, we decay down to approximately ONE_WEEK*4/7 and then add another
+ * ONE_WEEK. At the third event, we decay again down to ONE_WEEK*8/7 and add
+ * another ONE_WEEK, so boom.
+ *
+ * Note that in all three cases if that last event came a day later our counter
+ * would have decayed a bit more and we would *not* exceed our threshold. It's
+ * not exactly the same as a precise "three in one week" limit, but it's very
+ * close and it allows the same kind of tweaking while requiring only constant
+ * space - no arrays of variable length N to allocate or maintain. All we need
+ * (for each queue) is the value plus the time of the last update.
+ */
+
+typedef struct {
+ uint32_t value; /* decaying event counter, in seconds; iot_apply_event adds THRESH_SECONDS per event */
+ time_t update_time; /* tv_sec recorded by the last iot_apply_event call; 0 before the first one */
+} threshold_t;
+/*
+ * Parameters of the decay model applied by iot_apply_event.
+ * Variables so that I can hack these for testing.
+ * TBD: make these tunable?
+ */
+static uint32_t THRESH_SECONDS = 604800; /* amount added to a counter per event (one week, in seconds) */
+static uint32_t THRESH_EVENTS = 3; /* N in the (N-1)*SECONDS limit formula */
+static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1); recomputed on every iot_apply_event call */
+/*
+ * Record one "queue stalled" event against a decaying counter.
+ *
+ * this:   xlator whose name is used as the log domain.
+ * thresh: counter state for one queue; both fields are updated in place.
+ *
+ * The counter is first decayed by the number of seconds elapsed since the
+ * previous event, then increased by THRESH_SECONDS. If the result reaches
+ * THRESH_LIMIT, an emergency message is logged and SIGTRAP is sent to this
+ * process. The event time is stored in thresh->update_time before returning.
+ */
+static void
+iot_apply_event (xlator_t *this, threshold_t *thresh)
+{
+ struct timespec now;
+ time_t delta;
+
+ /* Refresh for manual testing/debugging. It's cheap. */
+ THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1);
+
+ timespec_now (&now); /* only now.tv_sec is used below */
+
+ if (thresh->value && thresh->update_time) { /* decay only if there is a non-zero count and a previous event */
+ delta = now.tv_sec - thresh->update_time;
+ /*
+ * Be careful about underflow: value is unsigned, so clamp it at
+ * zero instead of letting the subtraction wrap around.
+ */
+ if (thresh->value <= delta) {
+ thresh->value = 0;
+ } else {
+ thresh->value -= delta;
+ }
+ }
+
+ thresh->value += THRESH_SECONDS; /* account for the event being reported now */
+ /*
+ * NOTE(review): ">=" fires as soon as the limit is reached, while the
+ * design comment for the decay model talks about exceeding it; confirm
+ * which one is intended.
+ */
+ if (thresh->value >= THRESH_LIMIT) {
+ gf_log (this->name, GF_LOG_EMERG, "watchdog firing too often");
+ /*
+ * The default action for SIGTRAP is to dump core, but the fact
+ * that it's distinct from other signals we use means that
+ * there are other possibilities as well (e.g. drop into gdb or
+ * invoke a special handler).
+ */
+ kill (getpid (), SIGTRAP);
+ }
+
+ thresh->update_time = now.tv_sec; /* the next call decays relative to this moment */
+}
+/*
+ * Watchdog thread body: periodically checks every priority queue for a stall.
+ *
+ * arg: the xlator_t whose private data (iot_conf_t) is being watched.
+ *
+ * Each pass sleeps max(watchdog_secs/5, 1) seconds and then, with priv->mutex
+ * held and cancellation disabled, looks at each queue. At the end of a pass a
+ * queue is marked if it is non-empty; __iot_dequeue clears the mark when it
+ * takes a request from that queue. A queue that is found still marked on five
+ * consecutive passes is logged as stalled, reported to iot_apply_event, and
+ * has its ac_iot_limit entry raised by one.
+ *
+ * The loop never exits on its own; stop_iot_watchdog ends the thread with
+ * pthread_cancel.
+ */
+static void *
+iot_watchdog (void *arg)
+{
+ xlator_t *this = arg;
+ iot_conf_t *priv = this->private;
+ int i;
+ int bad_times[GF_FOP_PRI_MAX] = { 0, }; /* consecutive passes on which each queue was still marked */
+ threshold_t thresholds[GF_FOP_PRI_MAX] = { { 0, } }; /* one decaying stall counter per queue */
+
+ for (;;) {
+ sleep (max (priv->watchdog_secs/5, 1)); /* cancellation is enabled only around this sleep */
+ pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); /* do not get cancelled while holding the mutex */
+ pthread_mutex_lock (&priv->mutex);
+ for (i = 0; i < GF_FOP_PRI_MAX; ++i) {
+ if (priv->queue_marked[i]) { /* mark set on the previous pass was never cleared */
+ if (++bad_times[i] >= 5) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "queue %d stalled", i);
+ iot_apply_event (this, &thresholds[i]);
+ /*
+ * We might not get here if the event
+ * put us over our threshold.
+ */
+ ++(priv->ac_iot_limit[i]); /* presumably allows one more worker for this priority; confirm against iot_workers_scale */
+ bad_times[i] = 0;
+ }
+ } else {
+ bad_times[i] = 0; /* mark was cleared since the last pass, so restart the count */
+ }
+ priv->queue_marked[i] = (priv->queue_sizes[i] > 0); /* re-arm: mark every queue that is non-empty now */
+ }
+ pthread_mutex_unlock (&priv->mutex);
+ pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
+ }
+
+ /* NOTREACHED */
+ return NULL;
+}
+/*
+ * Start the watchdog thread for this xlator unless it is already running.
+ *
+ * On success priv->watchdog_running is set. If pthread_create fails, a
+ * warning is logged and the flag stays unset, so a later call tries again.
+ */
+static void
+start_iot_watchdog (xlator_t *this)
+{
+ iot_conf_t *priv = this->private;
+ int ret;
+
+ if (priv->watchdog_running) { /* nothing to do; avoid creating a second thread */
+ return;
+ }
+
+ ret = pthread_create (&priv->watchdog_thread, NULL, iot_watchdog, this);
+ if (ret == 0) {
+ priv->watchdog_running = _gf_true;
+ } else {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_create(iot_watchdog) failed");
+ }
+}
+/*
+ * Stop the watchdog thread if it is running: cancel it, join it, and clear
+ * priv->watchdog_running. A failure of pthread_cancel or pthread_join is
+ * logged as a warning but does not prevent the flag from being cleared.
+ */
+static void
+stop_iot_watchdog (xlator_t *this)
+{
+ iot_conf_t *priv = this->private;
+
+ if (!priv->watchdog_running) { /* never started, or already stopped */
+ return;
+ }
+
+ if (pthread_cancel (priv->watchdog_thread) != 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_cancel(iot_watchdog) failed");
+ }
+
+ if (pthread_join (priv->watchdog_thread, NULL) != 0) { /* join is attempted even if the cancel failed */
+ gf_log (this->name, GF_LOG_WARNING,
+ "pthread_join(iot_watchdog) failed");
+ }
+
+ /* Failure probably means it's already dead. */
+ priv->watchdog_running = _gf_false;
+}
+
int
reconfigure (xlator_t *this, dict_t *options)
{
@@ -980,9 +1142,18 @@ reconfigure (xlator_t *this, dict_t *options)
GF_OPTION_RECONF ("least-prio-threads",
conf->ac_iot_limit[GF_FOP_PRI_LEAST], options, int32,
out);
+
GF_OPTION_RECONF ("enable-least-priority", conf->least_priority,
options, bool, out);
+ GF_OPTION_RECONF ("watchdog-secs", conf->watchdog_secs, options,
+ int32, out);
+ if (conf->watchdog_secs > 0) {
+ start_iot_watchdog (this);
+ } else {
+ stop_iot_watchdog (this);
+ }
+
ret = 0;
out:
return ret;
@@ -1074,6 +1245,13 @@ init (xlator_t *this)
}
this->private = conf;
+
+ conf->watchdog_secs = 0;
+ GF_OPTION_INIT ("watchdog-secs", conf->watchdog_secs, int32, out);
+ if (conf->watchdog_secs > 0) {
+ start_iot_watchdog (this);
+ }
+
ret = 0;
out:
if (ret)
@@ -1127,6 +1305,8 @@ fini (xlator_t *this)
if (conf->mutex_inited)
pthread_mutex_destroy (&conf->mutex);
+ stop_iot_watchdog (this);
+
GF_FREE (conf);
this->private = NULL;
@@ -1274,11 +1454,21 @@ struct volume_options options[] = {
.tags = {"io-threads"},
.description = "Enable/Disable least priority"
},
- {.key = {"idle-time"},
- .type = GF_OPTION_TYPE_INT,
- .min = 1,
- .max = 0x7fffffff,
- .default_value = "120",
+ { .key = {"idle-time"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 0x7fffffff,
+ .default_value = "120",
+ },
+ { .key = {"watchdog-secs"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 0,
+ .default_value = 0,
+ .op_version = {GD_OP_VERSION_4_1_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Number of seconds a queue must be stalled before "
+ "starting an 'emergency' thread."
},
{ .key = {NULL},
},