diff options
| author | Kotresh HR <khiremat@redhat.com> | 2016-07-01 15:54:07 +0530 | 
|---|---|---|
| committer | Jeff Darcy <jdarcy@redhat.com> | 2016-07-19 06:14:13 -0700 | 
| commit | 2526f9d3ddc3e7f62828ac4289001b22638ae42a (patch) | |
| tree | 7bd7a766098f2ce8d08583647b85c778ccf8505c /libglusterfs/src/throttle-tbf.c | |
| parent | 74d2aaf51c7ff601e4394cad9f8e23092267af55 (diff) | |
features/bitrot: Move throttling code to libglusterfs
Backport of  http://review.gluster.org/14846
Since throttling is a separate feature by itself,
move throttling code to libglusterfs.
Change-Id: If9b99885ceb46e5b1865a4af18b2a2caecf59972
BUG: 1357514
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: http://review.gluster.org/14944
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
Diffstat (limited to 'libglusterfs/src/throttle-tbf.c')
| -rw-r--r-- | libglusterfs/src/throttle-tbf.c | 291 | 
1 files changed, 291 insertions, 0 deletions
diff --git a/libglusterfs/src/throttle-tbf.c b/libglusterfs/src/throttle-tbf.c new file mode 100644 index 00000000000..16630a243c2 --- /dev/null +++ b/libglusterfs/src/throttle-tbf.c @@ -0,0 +1,291 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +/** + * + * Basic token bucket implementation for rate limiting. As of now interfaces + * to throttle disk read request, directory entry scan and hash calculation + * are available. To throttle a particular request (operation), the call needs + * to be wrapped in-between throttling APIs, for e.g. + * + *  TBF_THROTTLE_BEGIN (...);  <-- induces "delays" if required + *  { + *      call (...); + *  } + *  TBF_THROTTLE_END (...);  <-- not used atm, maybe needed later + * + */ + +#include "mem-pool.h" +#include "throttle-tbf.h" + +typedef struct tbf_throttle { +        char done; + +        pthread_mutex_t mutex; +        pthread_cond_t  cond; + +        unsigned long tokens; + +        struct list_head list; +} tbf_throttle_t; + +static tbf_throttle_t * +tbf_init_throttle (unsigned long tokens_required) +{ +        tbf_throttle_t *throttle = NULL; + +        throttle = GF_CALLOC (1, sizeof (*throttle), +                              gf_common_mt_tbf_throttle_t); +        if (!throttle) +                return NULL; + +        throttle->done = 0; +        throttle->tokens = tokens_required; +        INIT_LIST_HEAD (&throttle->list); + +        (void) pthread_mutex_init (&throttle->mutex, NULL); +        (void) pthread_cond_init (&throttle->cond, NULL); + +        return throttle; +} + +void +_tbf_dispatch_queued (tbf_bucket_t *bucket) +{ +        gf_boolean_t xcont = _gf_false; +        tbf_throttle_t *tmp = NULL; +        tbf_throttle_t *throttle = NULL; + +        list_for_each_entry_safe (throttle, tmp, &bucket->queued, list) { + +                pthread_mutex_lock (&throttle->mutex); +                { +                        if (bucket->tokens < throttle->tokens) { +                                xcont = _gf_true; +                                goto unblock; +                        } + +                        /* this request can now be serviced */ +                        throttle->done = 1; +                        list_del_init (&throttle->list); + +                        bucket->tokens -= throttle->tokens; +                        pthread_cond_signal (&throttle->cond); +                } +        unblock: +                pthread_mutex_unlock (&throttle->mutex); +                if (xcont) +                        break; +        } +} + +void *tbf_tokengenerator (void *arg) +{ +        unsigned long tokenrate = 0; +        unsigned long maxtokens = 0; +        unsigned long token_gen_interval = 0; +        tbf_bucket_t *bucket = arg; + +        tokenrate = bucket->tokenrate; +        maxtokens = bucket->maxtokens; +        token_gen_interval = bucket->token_gen_interval; + +        while (1) { +                usleep (token_gen_interval); + +                LOCK (&bucket->lock); +                { +                        bucket->tokens += tokenrate; +                        if (bucket->tokens > maxtokens) +                                bucket->tokens = maxtokens; + +                        if (!list_empty (&bucket->queued)) +                                _tbf_dispatch_queued (bucket); +                } +                UNLOCK (&bucket->lock); +        } + +        return NULL; +} + +/** + * There is lazy synchronization between this routine (when invoked + * under tbf_mod() context) and tbf_throttle(). *bucket is + * updated _after_ all the required variables are initialized. + */ +static int32_t +tbf_init_bucket (tbf_t *tbf, tbf_opspec_t *spec) +{ +        int ret = 0; +        tbf_bucket_t *curr = NULL; +        tbf_bucket_t **bucket = NULL; + +        GF_ASSERT (spec->op >= TBF_OP_MIN); +        GF_ASSERT (spec->op <= TBF_OP_MAX); + +        /* no rate? no throttling. */ +        if (!spec->rate) +                return 0; + +        bucket = tbf->bucket + spec->op; + +        curr = GF_CALLOC (1, sizeof (*curr), gf_common_mt_tbf_bucket_t); +        if (!curr) +                goto error_return; + +        LOCK_INIT (&curr->lock); +        INIT_LIST_HEAD (&curr->queued); + +        curr->tokens = 0; +        curr->tokenrate = spec->rate; +        curr->maxtokens = spec->maxlimit; +        curr->token_gen_interval = spec->token_gen_interval; + +        ret = gf_thread_create (&curr->tokener, +                                NULL, tbf_tokengenerator, curr); +        if (ret != 0) +                goto freemem; + +        *bucket = curr; +        return 0; + + freemem: +        LOCK_DESTROY (&curr->lock); +        GF_FREE (curr); + error_return: +        return -1; +} + +#define TBF_ALLOC_SIZE                                               \ +        (sizeof (tbf_t) + (TBF_OP_MAX * sizeof (tbf_bucket_t))) + +tbf_t * +tbf_init (tbf_opspec_t *tbfspec, unsigned int count) +{ +        int32_t i = 0; +        int32_t ret = 0; +        tbf_t *tbf = NULL; +        tbf_opspec_t *opspec = NULL; + +        tbf = GF_CALLOC (1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t); +        if (!tbf) +                goto error_return; + +        tbf->bucket = (tbf_bucket_t **) ((char *)tbf + sizeof (*tbf)); +        for (i = 0; i < TBF_OP_MAX; i++) { +                *(tbf->bucket + i) = NULL; +        } + +        for (i = 0; i < count; i++) { +                opspec = tbfspec + i; + +                ret = tbf_init_bucket (tbf, opspec); +                if (ret) +                        break; +        } + +        if (ret) +                goto error_return; + +        return tbf; + + error_return: +        return NULL; +} + +static void +tbf_mod_bucket (tbf_bucket_t *bucket, tbf_opspec_t *spec) +{ +        LOCK (&bucket->lock); +        { +                bucket->tokens = 0; +                bucket->tokenrate = spec->rate; +                bucket->maxtokens = spec->maxlimit; +        } +        UNLOCK (&bucket->lock); + +        /* next token tick would unqueue pending operations */ +} + +int +tbf_mod (tbf_t *tbf, tbf_opspec_t *tbfspec) +{ +        int              ret    = 0; +        tbf_bucket_t *bucket = NULL; +        tbf_ops_t     op     = TBF_OP_MIN; + +        if (!tbf || !tbfspec) +                return -1; + +        op = tbfspec->op; + +        GF_ASSERT (op >= TBF_OP_MIN); +        GF_ASSERT (op <= TBF_OP_MAX); + +        bucket = *(tbf->bucket + op); +        if (bucket) { +                tbf_mod_bucket (bucket, tbfspec); +        } else { +                ret = tbf_init_bucket (tbf, tbfspec); +        } + +        return ret; +} + +void +tbf_throttle (tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested) +{ +        char waitq = 0; +        tbf_bucket_t *bucket = NULL; +        tbf_throttle_t *throttle = NULL; + +        GF_ASSERT (op >= TBF_OP_MIN); +        GF_ASSERT (op <= TBF_OP_MAX); + +        bucket = *(tbf->bucket + op); +        if (!bucket) +                return; + +        LOCK (&bucket->lock); +        { +                /** +                 * if there are enough tokens in the bucket there is no need +                 * to throttle the request: therefore, consume the required +                 * number of tokens and continue. +                 */ +                if (tokens_requested <= bucket->tokens) { +                        bucket->tokens -= tokens_requested; +                } else { +                        throttle = tbf_init_throttle (tokens_requested); +                        if (!throttle) /* let it slip through for now.. */ +                                goto unblock; + +                        waitq = 1; +                        pthread_mutex_lock (&throttle->mutex); +                        list_add_tail (&throttle->list, &bucket->queued); +                } +        } + unblock: +        UNLOCK (&bucket->lock); + +        if (waitq) { +                while (!throttle->done) { +                        pthread_cond_wait (&throttle->cond, &throttle->mutex); +                } + +                pthread_mutex_unlock (&throttle->mutex); + +                pthread_mutex_destroy (&throttle->mutex); +                pthread_cond_destroy (&throttle->cond); + +                GF_FREE (throttle); +        } +}  | 
