diff options
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-volume-set.c | 5 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 217 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 7 | 
3 files changed, 213 insertions, 16 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c index 64fa7e77c1b..857c494e039 100644 --- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c +++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c @@ -1541,6 +1541,11 @@ struct volopt_map_entry glusterd_volopt_map[] = {            .voltype     = "performance/io-threads",            .op_version  = 2          }, +        { .key         = "performance.iot-watchdog-secs", +          .voltype     = "performance/io-threads", +          .option      = "watchdog-secs", +          .op_version  = 2 +        },          /* Other perf xlators' options */          { .key        = "performance.cache-size", diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 7f9dc5f82a8..97d2dea1b71 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -15,11 +15,13 @@  #include "dict.h"  #include "xlator.h"  #include "io-threads.h" +#include <signal.h>  #include <stdlib.h>  #include <sys/time.h>  #include <time.h>  #include "locking.h"  #include "io-threads-messages.h" +#include "timespec.h"  void *iot_worker (void *arg);  int iot_workers_scale (iot_conf_t *conf); @@ -114,6 +116,7 @@ __iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep)                  stub = list_entry (conf->reqs[i].next, call_stub_t, list);                  conf->ac_iot_count[i]++; +                conf->queue_marked[i] = _gf_false;                  *pri = i;                  break;          } @@ -935,6 +938,165 @@ iot_priv_dump (xlator_t *this)          return 0;  } +/* + * We use a decay model to keep track and make sure we're not spawning new + * threads too often.  Each increment adds a large value to a counter, and that + * counter keeps ticking back down to zero over a fairly long period.  
For + * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we + * have N=3 increments during that time.  Thus, our threshold is + * (N-1)*ONE_WEEK.  To see how it works, look at three examples. + * + *   (a) Two events close together, then one more almost a week later.  The + *   first two events push our counter to 2*ONE_WEEK plus a bit.  At the third + *   event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the + *   new event, exceeding our threshold. + * + *   (b) One event, then two more almost a week later.  At the time of the + *   second and third events, the counter is already non-zero, so when we add + *   2*ONE_WEEK we exceed again. + * + *   (c) Three events, spaced three days apart.  At the time of the second + *   event, we decay down to approximately ONE_WEEK*4/7 and then add another + *   ONE_WEEK.  At the third event, we decay again down to ONE_WEEK*8/7 and add + *   another ONE_WEEK, so boom. + * + * Note that in all three cases if that last event came a day later our counter + * would have decayed a bit more and we would *not* exceed our threshold.  It's + * not exactly the same as a precise "three in one week" limit, but it's very + * close and it allows the same kind of tweaking while requiring only constant + * space - no arrays of variable length N to allocate or maintain.  All we need + * (for each queue) is the value plus the time of the last update. + */ + +typedef struct { +        uint32_t        value; +        time_t          update_time; +} threshold_t; +/* + * Variables so that I can hack these for testing. + * TBD: make these tunable? 
+ */ +static uint32_t        THRESH_SECONDS  = 604800; +static uint32_t        THRESH_EVENTS   = 3; +static uint32_t        THRESH_LIMIT    = 1209600;       /* SECONDS * (EVENTS-1) */ + +static void +iot_apply_event (xlator_t *this, threshold_t *thresh) +{ +        struct timespec now; +        time_t          delta; + +        /* Refresh for manual testing/debugging.  It's cheap. */ +        THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1); + +        timespec_now (&now); + +        if (thresh->value && thresh->update_time) { +                delta = now.tv_sec - thresh->update_time; +                /* Be careful about underflow. */ +                if (thresh->value <= delta) { +                        thresh->value = 0; +                } else { +                        thresh->value -= delta; +                } +        } + +        thresh->value += THRESH_SECONDS; +        if (thresh->value >= THRESH_LIMIT) { +                gf_log (this->name, GF_LOG_EMERG, "watchdog firing too often"); +                /* +                 * The default action for SIGTRAP is to dump core, but the fact +                 * that it's distinct from other signals we use means that +                 * there are other possibilities as well (e.g. drop into gdb or +                 * invoke a special handler). 
+                 */ +                kill (getpid (), SIGTRAP); +        } + +        thresh->update_time = now.tv_sec; +} + +static void * +iot_watchdog (void *arg) +{ +        xlator_t        *this   = arg; +        iot_conf_t      *priv   = this->private; +        int             i; +        int             bad_times[IOT_PRI_MAX] = { 0, }; +        threshold_t     thresholds[IOT_PRI_MAX] = { { 0, } }; + +        for (;;) { +                sleep (max (priv->watchdog_secs/5, 1)); +                pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); +                pthread_mutex_lock (&priv->mutex); +                for (i = 0; i < IOT_PRI_MAX; ++i) { +                        if (priv->queue_marked[i]) { +                                if (++bad_times[i] >= 5) { +                                        gf_log (this->name, GF_LOG_WARNING, +                                                "queue %d stalled", i); +                                        iot_apply_event (this, &thresholds[i]); +                                        /* +                                         * We might not get here if the event +                                         * put us over our threshold. 
+                                         */ +                                        ++(priv->ac_iot_limit[i]); +                                        bad_times[i] = 0; +                                } +                        } else { +                                bad_times[i] = 0; +                        } +                        priv->queue_marked[i] = (priv->queue_sizes[i] > 0); +                } +                pthread_mutex_unlock (&priv->mutex); +                pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); +        } + +        /* NOTREACHED */ +        return NULL; +} + +static void +start_iot_watchdog (xlator_t *this) +{ +        iot_conf_t      *priv   = this->private; +        int             ret; + +        if (priv->watchdog_running) { +                return; +        } + +        ret = pthread_create (&priv->watchdog_thread, NULL, iot_watchdog, this); +        if (ret == 0) { +                priv->watchdog_running = _gf_true; +        } else { +                gf_log (this->name, GF_LOG_WARNING, +                        "pthread_create(iot_watchdog) failed"); +        } +} + +static void +stop_iot_watchdog (xlator_t *this) +{ +        iot_conf_t      *priv   = this->private; + +        if (!priv->watchdog_running) { +                return; +        } + +        if (pthread_cancel (priv->watchdog_thread) != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "pthread_cancel(iot_watchdog) failed"); +        } + +        if (pthread_join (priv->watchdog_thread, NULL) != 0) { +                gf_log (this->name, GF_LOG_WARNING, +                        "pthread_join(iot_watchdog) failed"); +        } + +        /* Failure probably means it's already dead. 
*/ +        priv->watchdog_running = _gf_false; +} +  int  reconfigure (xlator_t *this, dict_t *options)  { @@ -966,8 +1128,16 @@ reconfigure (xlator_t *this, dict_t *options)          GF_OPTION_RECONF ("enable-least-priority", conf->least_priority,                            options, bool, out); -	GF_OPTION_RECONF("least-rate-limit", conf->throttle.rate_limit, options, -			 int32, out); +	GF_OPTION_RECONF ("least-rate-limit", conf->throttle.rate_limit, +                          options, int32, out); + +        GF_OPTION_RECONF ("watchdog-secs", conf->watchdog_secs, options, +                          int32, out); +        if (conf->watchdog_secs > 0) { +                start_iot_watchdog (this); +        } else { +                stop_iot_watchdog (this); +        }  	ret = 0;  out: @@ -1043,8 +1213,9 @@ init (xlator_t *this)          GF_OPTION_INIT ("enable-least-priority", conf->least_priority,                          bool, out); -	GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32, -		       out); +	GF_OPTION_INIT ("least-rate-limit", conf->throttle.rate_limit, int32, +		        out); +          if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) {                  gf_msg (this->name, GF_LOG_ERROR, 0,                          IO_THREADS_MSG_INIT_FAILED, @@ -1068,6 +1239,13 @@ init (xlator_t *this)          }  	this->private = conf; + +        conf->watchdog_secs = 0; +        GF_OPTION_INIT ("watchdog-secs", conf->watchdog_secs, int32, out); +        if (conf->watchdog_secs > 0) { +                start_iot_watchdog (this); +        } +          ret = 0;  out:          if (ret) @@ -1120,6 +1298,8 @@ fini (xlator_t *this)          if (conf->mutex_inited)                  pthread_mutex_destroy (&conf->mutex); +        stop_iot_watchdog (this); +  	GF_FREE (conf);  	this->private = NULL; @@ -1243,20 +1423,27 @@ struct volume_options options[] = {            .default_value = "on",            .description = "Enable/Disable least 
priority"          }, -        {.key   = {"idle-time"}, -         .type  = GF_OPTION_TYPE_INT, -         .min   = 1, -         .max   = 0x7fffffff, -         .default_value = "120", +        { .key   = {"idle-time"}, +          .type  = GF_OPTION_TYPE_INT, +          .min   = 1, +          .max   = 0x7fffffff, +          .default_value = "120",          }, -	{.key	= {"least-rate-limit"}, -	 .type	= GF_OPTION_TYPE_INT, -	 .min	= 0, -         .max	= INT_MAX, -	 .default_value = "0", -	 .description = "Max number of least priority operations to handle " +	{ .key	= {"least-rate-limit"}, +	  .type	= GF_OPTION_TYPE_INT, +	  .min	= 0, +          .max	= INT_MAX, +	  .default_value = "0", +	  .description = "Max number of least priority operations to handle "  			"per-second"  	}, +        { .key   = {"watchdog-secs"}, +          .type  = GF_OPTION_TYPE_INT, +          .min   = 0, +          .default_value = 0, +          .description = "Number of seconds a queue must be stalled before " +                         "starting an 'emergency' thread." +        },  	{ .key  = {NULL},          },  }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 011d4a00f7f..4056eb5fe09 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -77,7 +77,12 @@ struct iot_conf {          gf_boolean_t         down; /*PARENT_DOWN event is notified*/          gf_boolean_t         mutex_inited;          gf_boolean_t         cond_inited; -	struct iot_least_throttle throttle; +	      struct              iot_least_throttle throttle; + +        int32_t             watchdog_secs; +        gf_boolean_t        watchdog_running; +        pthread_t           watchdog_thread; +        gf_boolean_t        queue_marked[IOT_PRI_MAX];  };  typedef struct iot_conf iot_conf_t;  | 
