diff options
Diffstat (limited to 'libglusterfs/src/throttle-tbf.c')
-rw-r--r-- | libglusterfs/src/throttle-tbf.c | 375 |
1 files changed, 187 insertions, 188 deletions
diff --git a/libglusterfs/src/throttle-tbf.c b/libglusterfs/src/throttle-tbf.c index a425166b681..9519defa37f 100644 --- a/libglusterfs/src/throttle-tbf.c +++ b/libglusterfs/src/throttle-tbf.c @@ -27,93 +27,93 @@ #include "throttle-tbf.h" typedef struct tbf_throttle { - char done; + char done; - pthread_mutex_t mutex; - pthread_cond_t cond; + pthread_mutex_t mutex; + pthread_cond_t cond; - unsigned long tokens; + unsigned long tokens; - struct list_head list; + struct list_head list; } tbf_throttle_t; static tbf_throttle_t * -tbf_init_throttle (unsigned long tokens_required) +tbf_init_throttle(unsigned long tokens_required) { - tbf_throttle_t *throttle = NULL; + tbf_throttle_t *throttle = NULL; - throttle = GF_CALLOC (1, sizeof (*throttle), - gf_common_mt_tbf_throttle_t); - if (!throttle) - return NULL; + throttle = GF_CALLOC(1, sizeof(*throttle), gf_common_mt_tbf_throttle_t); + if (!throttle) + return NULL; - throttle->done = 0; - throttle->tokens = tokens_required; - INIT_LIST_HEAD (&throttle->list); + throttle->done = 0; + throttle->tokens = tokens_required; + INIT_LIST_HEAD(&throttle->list); - (void) pthread_mutex_init (&throttle->mutex, NULL); - (void) pthread_cond_init (&throttle->cond, NULL); + (void)pthread_mutex_init(&throttle->mutex, NULL); + (void)pthread_cond_init(&throttle->cond, NULL); - return throttle; + return throttle; } void -_tbf_dispatch_queued (tbf_bucket_t *bucket) +_tbf_dispatch_queued(tbf_bucket_t *bucket) { - gf_boolean_t xcont = _gf_false; - tbf_throttle_t *tmp = NULL; - tbf_throttle_t *throttle = NULL; - - list_for_each_entry_safe (throttle, tmp, &bucket->queued, list) { - - pthread_mutex_lock (&throttle->mutex); - { - if (bucket->tokens < throttle->tokens) { - xcont = _gf_true; - goto unblock; - } - - /* this request can now be serviced */ - throttle->done = 1; - list_del_init (&throttle->list); - - bucket->tokens -= throttle->tokens; - pthread_cond_signal (&throttle->cond); - } - unblock: - pthread_mutex_unlock (&throttle->mutex); - if (xcont) - break; + gf_boolean_t xcont = _gf_false; + tbf_throttle_t *tmp = NULL; + tbf_throttle_t *throttle = NULL; + + list_for_each_entry_safe(throttle, tmp, &bucket->queued, list) + { + pthread_mutex_lock(&throttle->mutex); + { + if (bucket->tokens < throttle->tokens) { + xcont = _gf_true; + goto unblock; + } + + /* this request can now be serviced */ + throttle->done = 1; + list_del_init(&throttle->list); + + bucket->tokens -= throttle->tokens; + pthread_cond_signal(&throttle->cond); } + unblock: + pthread_mutex_unlock(&throttle->mutex); + if (xcont) + break; + } } -void *tbf_tokengenerator (void *arg) +void * +tbf_tokengenerator(void *arg) { - unsigned long tokenrate = 0; - unsigned long maxtokens = 0; - unsigned long token_gen_interval = 0; - tbf_bucket_t *bucket = arg; - - tokenrate = bucket->tokenrate; - maxtokens = bucket->maxtokens; - token_gen_interval = bucket->token_gen_interval; - - while (1) { - usleep (token_gen_interval); - - LOCK (&bucket->lock); - { - bucket->tokens += tokenrate; - if (bucket->tokens > maxtokens) - bucket->tokens = maxtokens; - - if (!list_empty (&bucket->queued)) - _tbf_dispatch_queued (bucket); - } - UNLOCK (&bucket->lock); + unsigned long tokenrate = 0; + unsigned long maxtokens = 0; + unsigned long token_gen_interval = 0; + tbf_bucket_t *bucket = arg; + + tokenrate = bucket->tokenrate; + maxtokens = bucket->maxtokens; + token_gen_interval = bucket->token_gen_interval; + + while (1) { + usleep(token_gen_interval); + + LOCK(&bucket->lock); + { + bucket->tokens += tokenrate; + if (bucket->tokens > maxtokens) + bucket->tokens = maxtokens; + + if (!list_empty(&bucket->queued)) + _tbf_dispatch_queued(bucket); } + UNLOCK(&bucket->lock); + } - return NULL; + return NULL; } /** @@ -122,170 +122,169 @@ void *tbf_tokengenerator (void *arg) * updated _after_ all the required variables are initialized. */ static int32_t -tbf_init_bucket (tbf_t *tbf, tbf_opspec_t *spec) +tbf_init_bucket(tbf_t *tbf, tbf_opspec_t *spec) { - int ret = 0; - tbf_bucket_t *curr = NULL; - tbf_bucket_t **bucket = NULL; + int ret = 0; + tbf_bucket_t *curr = NULL; + tbf_bucket_t **bucket = NULL; - GF_ASSERT (spec->op >= TBF_OP_MIN); - GF_ASSERT (spec->op <= TBF_OP_MAX); + GF_ASSERT(spec->op >= TBF_OP_MIN); + GF_ASSERT(spec->op <= TBF_OP_MAX); - /* no rate? no throttling. */ - if (!spec->rate) - return 0; + /* no rate? no throttling. */ + if (!spec->rate) + return 0; - bucket = tbf->bucket + spec->op; + bucket = tbf->bucket + spec->op; - curr = GF_CALLOC (1, sizeof (*curr), gf_common_mt_tbf_bucket_t); - if (!curr) - goto error_return; + curr = GF_CALLOC(1, sizeof(*curr), gf_common_mt_tbf_bucket_t); + if (!curr) + goto error_return; - LOCK_INIT (&curr->lock); - INIT_LIST_HEAD (&curr->queued); + LOCK_INIT(&curr->lock); + INIT_LIST_HEAD(&curr->queued); - curr->tokens = 0; - curr->tokenrate = spec->rate; - curr->maxtokens = spec->maxlimit; - curr->token_gen_interval = spec->token_gen_interval; + curr->tokens = 0; + curr->tokenrate = spec->rate; + curr->maxtokens = spec->maxlimit; + curr->token_gen_interval = spec->token_gen_interval; - ret = gf_thread_create (&curr->tokener, - NULL, tbf_tokengenerator, curr, "tbfclock"); - if (ret != 0) - goto freemem; + ret = gf_thread_create(&curr->tokener, NULL, tbf_tokengenerator, curr, + "tbfclock"); + if (ret != 0) + goto freemem; - *bucket = curr; - return 0; + *bucket = curr; + return 0; - freemem: - LOCK_DESTROY (&curr->lock); - GF_FREE (curr); - error_return: - return -1; +freemem: + LOCK_DESTROY(&curr->lock); + GF_FREE(curr); +error_return: + return -1; } -#define TBF_ALLOC_SIZE \ - (sizeof (tbf_t) + (TBF_OP_MAX * sizeof (tbf_bucket_t))) +#define TBF_ALLOC_SIZE (sizeof(tbf_t) + (TBF_OP_MAX * sizeof(tbf_bucket_t))) tbf_t * -tbf_init (tbf_opspec_t *tbfspec, unsigned int count) +tbf_init(tbf_opspec_t *tbfspec, unsigned int count) { - int32_t i = 0; - int32_t ret = 0; - tbf_t *tbf = NULL; - tbf_opspec_t *opspec = NULL; - - tbf = GF_CALLOC (1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t); - if (!tbf) - goto error_return; - - tbf->bucket = (tbf_bucket_t **) ((char *)tbf + sizeof (*tbf)); - for (i = 0; i < TBF_OP_MAX; i++) { - *(tbf->bucket + i) = NULL; - } + int32_t i = 0; + int32_t ret = 0; + tbf_t *tbf = NULL; + tbf_opspec_t *opspec = NULL; - for (i = 0; i < count; i++) { - opspec = tbfspec + i; + tbf = GF_CALLOC(1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t); + if (!tbf) + goto error_return; - ret = tbf_init_bucket (tbf, opspec); - if (ret) - break; - } + tbf->bucket = (tbf_bucket_t **)((char *)tbf + sizeof(*tbf)); + for (i = 0; i < TBF_OP_MAX; i++) { + *(tbf->bucket + i) = NULL; + } + for (i = 0; i < count; i++) { + opspec = tbfspec + i; + + ret = tbf_init_bucket(tbf, opspec); if (ret) - goto error_return; + break; + } - return tbf; + if (ret) + goto error_return; - error_return: - return NULL; + return tbf; + +error_return: + return NULL; } static void -tbf_mod_bucket (tbf_bucket_t *bucket, tbf_opspec_t *spec) +tbf_mod_bucket(tbf_bucket_t *bucket, tbf_opspec_t *spec) { - LOCK (&bucket->lock); - { - bucket->tokens = 0; - bucket->tokenrate = spec->rate; - bucket->maxtokens = spec->maxlimit; - } - UNLOCK (&bucket->lock); - - /* next token tick would unqueue pending operations */ + LOCK(&bucket->lock); + { + bucket->tokens = 0; + bucket->tokenrate = spec->rate; + bucket->maxtokens = spec->maxlimit; + } + UNLOCK(&bucket->lock); + + /* next token tick would unqueue pending operations */ } int -tbf_mod (tbf_t *tbf, tbf_opspec_t *tbfspec) +tbf_mod(tbf_t *tbf, tbf_opspec_t *tbfspec) { - int ret = 0; - tbf_bucket_t *bucket = NULL; - tbf_ops_t op = TBF_OP_MIN; + int ret = 0; + tbf_bucket_t *bucket = NULL; + tbf_ops_t op = TBF_OP_MIN; - if (!tbf || !tbfspec) - return -1; + if (!tbf || !tbfspec) + return -1; - op = tbfspec->op; + op = tbfspec->op; - GF_ASSERT (op >= TBF_OP_MIN); - GF_ASSERT (op <= TBF_OP_MAX); + GF_ASSERT(op >= TBF_OP_MIN); + GF_ASSERT(op <= TBF_OP_MAX); - bucket = *(tbf->bucket + op); - if (bucket) { - tbf_mod_bucket (bucket, tbfspec); - } else { - ret = tbf_init_bucket (tbf, tbfspec); - } + bucket = *(tbf->bucket + op); + if (bucket) { + tbf_mod_bucket(bucket, tbfspec); + } else { + ret = tbf_init_bucket(tbf, tbfspec); + } - return ret; + return ret; } void -tbf_throttle (tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested) +tbf_throttle(tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested) { - char waitq = 0; - tbf_bucket_t *bucket = NULL; - tbf_throttle_t *throttle = NULL; - - GF_ASSERT (op >= TBF_OP_MIN); - GF_ASSERT (op <= TBF_OP_MAX); - - bucket = *(tbf->bucket + op); - if (!bucket) - return; + char waitq = 0; + tbf_bucket_t *bucket = NULL; + tbf_throttle_t *throttle = NULL; + + GF_ASSERT(op >= TBF_OP_MIN); + GF_ASSERT(op <= TBF_OP_MAX); + + bucket = *(tbf->bucket + op); + if (!bucket) + return; + + LOCK(&bucket->lock); + { + /** + * if there are enough tokens in the bucket there is no need + * to throttle the request: therefore, consume the required + * number of tokens and continue. + */ + if (tokens_requested <= bucket->tokens) { + bucket->tokens -= tokens_requested; + } else { + throttle = tbf_init_throttle(tokens_requested); + if (!throttle) /* let it slip through for now.. */ + goto unblock; - LOCK (&bucket->lock); - { - /** - * if there are enough tokens in the bucket there is no need - * to throttle the request: therefore, consume the required - * number of tokens and continue. - */ - if (tokens_requested <= bucket->tokens) { - bucket->tokens -= tokens_requested; - } else { - throttle = tbf_init_throttle (tokens_requested); - if (!throttle) /* let it slip through for now.. */ - goto unblock; - - waitq = 1; - pthread_mutex_lock (&throttle->mutex); - list_add_tail (&throttle->list, &bucket->queued); - } + waitq = 1; + pthread_mutex_lock(&throttle->mutex); + list_add_tail(&throttle->list, &bucket->queued); } - unblock: - UNLOCK (&bucket->lock); + } +unblock: + UNLOCK(&bucket->lock); - if (waitq) { - while (!throttle->done) { - pthread_cond_wait (&throttle->cond, &throttle->mutex); - } + if (waitq) { + while (!throttle->done) { + pthread_cond_wait(&throttle->cond, &throttle->mutex); + } - pthread_mutex_unlock (&throttle->mutex); + pthread_mutex_unlock(&throttle->mutex); - pthread_mutex_destroy (&throttle->mutex); - pthread_cond_destroy (&throttle->cond); + pthread_mutex_destroy(&throttle->mutex); + pthread_cond_destroy(&throttle->cond); - GF_FREE (throttle); - } + GF_FREE(throttle); + } } |