diff options
author | Varsha Rao <varao@redhat.com> | 2018-02-21 23:03:48 +0530 |
---|---|---|
committer | Amar Tumballi <amarts@redhat.com> | 2018-02-22 17:25:13 +0000 |
commit | cfbc524239b1d3bc417849e68379c81e83fd56d9 (patch) | |
tree | b1533cb9a6ceacb726e2ed38db1c7dc659bb946c /xlators/performance/io-threads/src/io-threads.c | |
parent | efbfd17dfc73c4e6055adeffc65be1697f094168 (diff) |
performance/io-threads: Add threads to priority based stagnant queues
> performance/io-threads: Add watchdog to cover up a possible thread leak
> Commit ID: 8b6804f75c
> https://review.gluster.org/#/c/18239/
> By Shreyas Siravara <sshreyas@fb.com>
This patch is required to forward port io-threads namespace patch.
Updates: #401
Change-Id: Id057c34a2abb9fc6dfb4afcd5c7bbbfe5693bbb8
Signed-off-by: Varsha Rao <varao@redhat.com>
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 200 |
1 files changed, 195 insertions, 5 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index fa067cc4d6a..601e04abad6 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -15,11 +15,13 @@ #include "dict.h" #include "xlator.h" #include "io-threads.h" +#include <signal.h> #include <stdlib.h> #include <sys/time.h> #include <time.h> #include "locking.h" #include "io-threads-messages.h" +#include "timespec.h" void *iot_worker (void *arg); int iot_workers_scale (iot_conf_t *conf); @@ -113,6 +115,7 @@ __iot_dequeue (iot_conf_t *conf, int *pri) } conf->ac_iot_count[i]++; + conf->queue_marked[i] = _gf_false; *pri = i; break; } @@ -953,6 +956,165 @@ iot_priv_dump (xlator_t *this) return 0; } +/* + * We use a decay model to keep track and make sure we're not spawning new + * threads too often. Each increment adds a large value to a counter, and that + * counter keeps ticking back down to zero over a fairly long period. For + * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we + * have N=3 increments during that time. Thus, our threshold is + * (N-1)*ONE_WEEK. To see how it works, look at three examples. + * + * (a) Two events close together, then one more almost a week later. The + * first two events push our counter to 2*ONE_WEEK plus a bit. At the third + * event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the + * new event, exceeding our threshold. + * + * (b) One event, then two more almost a week later. At the time of the + * second and third events, the counter is already non-zero, so when we add + * 2*ONE_WEEK we exceed again. + * + * (c) Three events, spaced three days apart. At the time of the second + * event, we decay down to approxitely ONE_WEEK*4/7 and then add another + * ONE_WEEK. At the third event, we decay again down to ONE_WEEK*8/7 and add + * another ONE_WEEK, so boom. + * + * Note that in all three cases if that last event came a day later our counter + * would have decayed a bit more and we would *not* exceed our threshold. It's + * not exactly the same as a precise "three in one week" limit, but it's very + * close and it allows the same kind of tweaking while requiring only constant + * space - no arrays of variable length N to allocate or maintain. All we need + * (for each queue) is the value plus the time of the last update. + */ + +typedef struct { + uint32_t value; + time_t update_time; +} threshold_t; +/* + * Variables so that I can hack these for testing. + * TBD: make these tunable? + */ +static uint32_t THRESH_SECONDS = 604800; +static uint32_t THRESH_EVENTS = 3; +static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */ + +static void +iot_apply_event (xlator_t *this, threshold_t *thresh) +{ + struct timespec now; + time_t delta; + + /* Refresh for manual testing/debugging. It's cheap. */ + THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1); + + timespec_now (&now); + + if (thresh->value && thresh->update_time) { + delta = now.tv_sec - thresh->update_time; + /* Be careful about underflow. */ + if (thresh->value <= delta) { + thresh->value = 0; + } else { + thresh->value -= delta; + } + } + + thresh->value += THRESH_SECONDS; + if (thresh->value >= THRESH_LIMIT) { + gf_log (this->name, GF_LOG_EMERG, "watchdog firing too often"); + /* + * The default action for SIGTRAP is to dump core, but the fact + * that it's distinct from other signals we use means that + * there are other possibilities as well (e.g. drop into gdb or + * invoke a special handler). + */ + kill (getpid (), SIGTRAP); + } + + thresh->update_time = now.tv_sec; +} + +static void * +iot_watchdog (void *arg) +{ + xlator_t *this = arg; + iot_conf_t *priv = this->private; + int i; + int bad_times[GF_FOP_PRI_MAX] = { 0, }; + threshold_t thresholds[GF_FOP_PRI_MAX] = { { 0, } }; + + for (;;) { + sleep (max (priv->watchdog_secs/5, 1)); + pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); + pthread_mutex_lock (&priv->mutex); + for (i = 0; i < GF_FOP_PRI_MAX; ++i) { + if (priv->queue_marked[i]) { + if (++bad_times[i] >= 5) { + gf_log (this->name, GF_LOG_WARNING, + "queue %d stalled", i); + iot_apply_event (this, &thresholds[i]); + /* + * We might not get here if the event + * put us over our threshold. + */ + ++(priv->ac_iot_limit[i]); + bad_times[i] = 0; + } + } else { + bad_times[i] = 0; + } + priv->queue_marked[i] = (priv->queue_sizes[i] > 0); + } + pthread_mutex_unlock (&priv->mutex); + pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); + } + + /* NOTREACHED */ + return NULL; +} + +static void +start_iot_watchdog (xlator_t *this) +{ + iot_conf_t *priv = this->private; + int ret; + + if (priv->watchdog_running) { + return; + } + + ret = pthread_create (&priv->watchdog_thread, NULL, iot_watchdog, this); + if (ret == 0) { + priv->watchdog_running = _gf_true; + } else { + gf_log (this->name, GF_LOG_WARNING, + "pthread_create(iot_watchdog) failed"); + } +} + +static void +stop_iot_watchdog (xlator_t *this) +{ + iot_conf_t *priv = this->private; + + if (!priv->watchdog_running) { + return; + } + + if (pthread_cancel (priv->watchdog_thread) != 0) { + gf_log (this->name, GF_LOG_WARNING, + "pthread_cancel(iot_watchdog) failed"); + } + + if (pthread_join (priv->watchdog_thread, NULL) != 0) { + gf_log (this->name, GF_LOG_WARNING, + "pthread_join(iot_watchdog) failed"); + } + + /* Failure probably means it's already dead. */ + priv->watchdog_running = _gf_false; +} + int reconfigure (xlator_t *this, dict_t *options) { @@ -980,9 +1142,18 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST], options, int32, out); + GF_OPTION_RECONF ("enable-least-priority", conf->least_priority, options, bool, out); + GF_OPTION_RECONF ("watchdog-secs", conf->watchdog_secs, options, + int32, out); + if (conf->watchdog_secs > 0) { + start_iot_watchdog (this); + } else { + stop_iot_watchdog (this); + } + ret = 0; out: return ret; @@ -1074,6 +1245,13 @@ init (xlator_t *this) } this->private = conf; + + conf->watchdog_secs = 0; + GF_OPTION_INIT ("watchdog-secs", conf->watchdog_secs, int32, out); + if (conf->watchdog_secs > 0) { + start_iot_watchdog (this); + } + ret = 0; out: if (ret) @@ -1127,6 +1305,8 @@ fini (xlator_t *this) if (conf->mutex_inited) pthread_mutex_destroy (&conf->mutex); + stop_iot_watchdog (this); + GF_FREE (conf); this->private = NULL; @@ -1274,11 +1454,21 @@ struct volume_options options[] = { .tags = {"io-threads"}, .description = "Enable/Disable least priority" }, - {.key = {"idle-time"}, - .type = GF_OPTION_TYPE_INT, - .min = 1, - .max = 0x7fffffff, - .default_value = "120", + { .key = {"idle-time"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 0x7fffffff, + .default_value = "120", + }, + { .key = {"watchdog-secs"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .default_value = 0, + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Number of seconds a queue must be stalled before " + "starting an 'emergency' thread." }, { .key = {NULL}, }, |