diff options
Diffstat (limited to 'xlators/performance')
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 86 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 10 |
2 files changed, 94 insertions, 2 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 2a5cec7edc2..dbf1929e897 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -30,16 +30,69 @@ int __iot_workers_scale (iot_conf_t *conf); struct volume_options options[]; call_stub_t * -__iot_dequeue (iot_conf_t *conf, int *pri) +__iot_dequeue (iot_conf_t *conf, int *pri, struct timespec *sleep) { call_stub_t *stub = NULL; int i = 0; + struct timeval curtv = {0,}, difftv = {0,}; *pri = -1; + sleep->tv_sec = 0; + sleep->tv_nsec = 0; for (i = 0; i < IOT_PRI_MAX; i++) { if (list_empty (&conf->reqs[i]) || (conf->ac_iot_count[i] >= conf->ac_iot_limit[i])) continue; + + if (i == IOT_PRI_LEAST) { + pthread_mutex_lock(&conf->throttle.lock); + if (!conf->throttle.sample_time.tv_sec) { + /* initialize */ + gettimeofday(&conf->throttle.sample_time, NULL); + } else { + /* + * Maintain a running count of least priority + * operations that are handled over a particular + * time interval. The count is provided via + * state dump and is used as a measure against + * least priority op throttling. + */ + gettimeofday(&curtv, NULL); + timersub(&curtv, &conf->throttle.sample_time, + &difftv); + if (difftv.tv_sec >= IOT_LEAST_THROTTLE_DELAY) { + conf->throttle.cached_rate = + conf->throttle.sample_cnt; + conf->throttle.sample_cnt = 0; + conf->throttle.sample_time = curtv; + } + + /* + * If we're over the configured rate limit, + * provide an absolute time to the caller that + * represents the soonest we're allowed to + * return another least priority request. + */ + if (conf->throttle.rate_limit && + conf->throttle.sample_cnt >= + conf->throttle.rate_limit) { + struct timeval delay; + delay.tv_sec = IOT_LEAST_THROTTLE_DELAY; + delay.tv_usec = 0; + + timeradd(&conf->throttle.sample_time, + &delay, &curtv); + TIMEVAL_TO_TIMESPEC(&curtv, sleep); + + pthread_mutex_unlock( + &conf->throttle.lock); + break; + } + } + conf->throttle.sample_cnt++; + pthread_mutex_unlock(&conf->throttle.lock); + } + stub = list_entry (conf->reqs[i].next, call_stub_t, list); conf->ac_iot_count[i]++; *pri = i; @@ -83,6 +136,7 @@ iot_worker (void *data) int pri = -1; char timeout = 0; char bye = 0; + struct timespec sleep = {0,}; conf = data; this = conf->this; @@ -123,7 +177,13 @@ iot_worker (void *data) } } - stub = __iot_dequeue (conf, &pri); + stub = __iot_dequeue (conf, &pri, &sleep); + if (!stub && (sleep.tv_sec || sleep.tv_nsec)) { + pthread_cond_timedwait(&conf->cond, + &conf->mutex, &sleep); + pthread_mutex_unlock(&conf->mutex); + continue; + } } pthread_mutex_unlock (&conf->mutex); @@ -2485,6 +2545,10 @@ iot_priv_dump (xlator_t *this) gf_proc_dump_write("least_priority_threads", "%d", conf->ac_iot_limit[IOT_PRI_LEAST]); + gf_proc_dump_write("cached least rate", "%u", + conf->throttle.cached_rate); + gf_proc_dump_write("least rate limit", "%u", conf->throttle.rate_limit); + return 0; } @@ -2516,6 +2580,9 @@ reconfigure (xlator_t *this, dict_t *options) GF_OPTION_RECONF ("enable-least-priority", conf->least_priority, options, bool, out); + GF_OPTION_RECONF("least-rate-limit", conf->throttle.rate_limit, options, + int32, out); + ret = 0; out: return ret; @@ -2580,6 +2647,14 @@ init (xlator_t *this) GF_OPTION_INIT ("enable-least-priority", conf->least_priority, bool, out); + GF_OPTION_INIT("least-rate-limit", conf->throttle.rate_limit, int32, + out); + if ((ret = pthread_mutex_init(&conf->throttle.lock, NULL)) != 0) { + gf_log (this->name, GF_LOG_ERROR, + "pthread_mutex_init failed (%d)", ret); + goto out; + } + conf->this = this; for (i = 0; i < IOT_PRI_MAX; i++) { @@ -2722,6 +2797,13 @@ struct volume_options options[] = { .max = 0x7fffffff, .default_value = "120", }, + {.key = {"least-rate-limit"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .default_value = "0", + .description = "Max number of least priority operations to handle " + "per-second" + }, { .key = {NULL}, }, }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index a6b640884d0..1a9dee9ae2c 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -53,6 +53,14 @@ typedef enum { IOT_PRI_MAX, } iot_pri_t; +#define IOT_LEAST_THROTTLE_DELAY 1 /* sample interval in seconds */ +struct iot_least_throttle { + struct timeval sample_time; /* timestamp of current sample */ + uint32_t sample_cnt; /* sample count for active interval */ + uint32_t cached_rate; /* the most recently measured rate */ + int32_t rate_limit; /* user-specified rate limit */ + pthread_mutex_t lock; +}; struct iot_conf { pthread_mutex_t mutex; @@ -75,6 +83,8 @@ struct iot_conf { xlator_t *this; size_t stack_size; + + struct iot_least_throttle throttle; }; typedef struct iot_conf iot_conf_t; |