diff options
Diffstat (limited to 'libglusterfs')
| -rw-r--r-- | libglusterfs/src/Makefile.am | 4 | ||||
| -rw-r--r-- | libglusterfs/src/mem-types.h | 4 | ||||
| -rw-r--r-- | libglusterfs/src/throttle-tbf.c | 291 | ||||
| -rw-r--r-- | libglusterfs/src/throttle-tbf.h | 74 | 
4 files changed, 371 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am index 63feb380e66..2ec0f34a670 100644 --- a/libglusterfs/src/Makefile.am +++ b/libglusterfs/src/Makefile.am @@ -32,7 +32,7 @@ libglusterfs_la_SOURCES = dict.c xlator.c logging.c \  	$(CONTRIBDIR)/libexecinfo/execinfo.c quota-common-utils.c rot-buffs.c \  	$(CONTRIBDIR)/timer-wheel/timer-wheel.c \  	$(CONTRIBDIR)/timer-wheel/find_last_bit.c tw.c default-args.c locking.c \ -	compound-fop-utils.c +	compound-fop-utils.c throttle-tbf.c  nodist_libglusterfs_la_SOURCES = y.tab.c graph.lex.c defaults.c  nodist_libglusterfs_la_HEADERS = y.tab.h glusterfs-fops.h @@ -51,7 +51,7 @@ libglusterfs_la_HEADERS = common-utils.h defaults.h default-args.h \  	glfs-message-id.h template-component-messages.h strfd.h \  	syncop-utils.h parse-utils.h libglusterfs-messages.h tw.h \  	ctr-messages.h lvm-defaults.h quota-common-utils.h rot-buffs.h \ -	compat-uuid.h upcall-utils.h +	compat-uuid.h upcall-utils.h throttle-tbf.h  libglusterfs_ladir = $(includedir)/glusterfs diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h index 958de8bc634..b2a1a6ba4c7 100644 --- a/libglusterfs/src/mem-types.h +++ b/libglusterfs/src/mem-types.h @@ -167,6 +167,10 @@ enum gf_common_mem_types_ {          gf_common_mt_tw_timer_list,          /*lock migration*/          gf_common_mt_lock_mig, +        /* throttle */ +        gf_common_mt_tbf_t, +        gf_common_mt_tbf_bucket_t, +        gf_common_mt_tbf_throttle_t,          gf_common_mt_end  };  #endif diff --git a/libglusterfs/src/throttle-tbf.c b/libglusterfs/src/throttle-tbf.c new file mode 100644 index 00000000000..16630a243c2 --- /dev/null +++ b/libglusterfs/src/throttle-tbf.c @@ -0,0 +1,291 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +/** + * + * Basic token bucket implementation for rate limiting. As of now interfaces + * to throttle disk read request, directory entry scan and hash calculation + * are available. To throttle a particular request (operation), the call needs + * to be wrapped in-between throttling APIs, for e.g. + * + *  TBF_THROTTLE_BEGIN (...);  <-- induces "delays" if required + *  { + *      call (...); + *  } + *  TBF_THROTTLE_END (...);  <-- not used atm, maybe needed later + * + */ + +#include "mem-pool.h" +#include "throttle-tbf.h" + +typedef struct tbf_throttle { +        char done; + +        pthread_mutex_t mutex; +        pthread_cond_t  cond; + +        unsigned long tokens; + +        struct list_head list; +} tbf_throttle_t; + +static tbf_throttle_t * +tbf_init_throttle (unsigned long tokens_required) +{ +        tbf_throttle_t *throttle = NULL; + +        throttle = GF_CALLOC (1, sizeof (*throttle), +                              gf_common_mt_tbf_throttle_t); +        if (!throttle) +                return NULL; + +        throttle->done = 0; +        throttle->tokens = tokens_required; +        INIT_LIST_HEAD (&throttle->list); + +        (void) pthread_mutex_init (&throttle->mutex, NULL); +        (void) pthread_cond_init (&throttle->cond, NULL); + +        return throttle; +} + +void +_tbf_dispatch_queued (tbf_bucket_t *bucket) +{ +        gf_boolean_t xcont = _gf_false; +        tbf_throttle_t *tmp = NULL; +        tbf_throttle_t *throttle = NULL; + +        list_for_each_entry_safe (throttle, tmp, &bucket->queued, list) { + +                pthread_mutex_lock (&throttle->mutex); +                { +                        if (bucket->tokens < throttle->tokens) { +                                xcont = _gf_true; +                                goto unblock; +                        } + +                        /* this request can now be serviced */ +                        throttle->done = 1; +                        list_del_init (&throttle->list); + +                        bucket->tokens -= throttle->tokens; +                        pthread_cond_signal (&throttle->cond); +                } +        unblock: +                pthread_mutex_unlock (&throttle->mutex); +                if (xcont) +                        break; +        } +} + +void *tbf_tokengenerator (void *arg) +{ +        unsigned long tokenrate = 0; +        unsigned long maxtokens = 0; +        unsigned long token_gen_interval = 0; +        tbf_bucket_t *bucket = arg; + +        tokenrate = bucket->tokenrate; +        maxtokens = bucket->maxtokens; +        token_gen_interval = bucket->token_gen_interval; + +        while (1) { +                usleep (token_gen_interval); + +                LOCK (&bucket->lock); +                { +                        bucket->tokens += tokenrate; +                        if (bucket->tokens > maxtokens) +                                bucket->tokens = maxtokens; + +                        if (!list_empty (&bucket->queued)) +                                _tbf_dispatch_queued (bucket); +                } +                UNLOCK (&bucket->lock); +        } + +        return NULL; +} + +/** + * There is lazy synchronization between this routine (when invoked + * under tbf_mod() context) and tbf_throttle(). *bucket is + * updated _after_ all the required variables are initialized. + */ +static int32_t +tbf_init_bucket (tbf_t *tbf, tbf_opspec_t *spec) +{ +        int ret = 0; +        tbf_bucket_t *curr = NULL; +        tbf_bucket_t **bucket = NULL; + +        GF_ASSERT (spec->op >= TBF_OP_MIN); +        GF_ASSERT (spec->op <= TBF_OP_MAX); + +        /* no rate? no throttling. */ +        if (!spec->rate) +                return 0; + +        bucket = tbf->bucket + spec->op; + +        curr = GF_CALLOC (1, sizeof (*curr), gf_common_mt_tbf_bucket_t); +        if (!curr) +                goto error_return; + +        LOCK_INIT (&curr->lock); +        INIT_LIST_HEAD (&curr->queued); + +        curr->tokens = 0; +        curr->tokenrate = spec->rate; +        curr->maxtokens = spec->maxlimit; +        curr->token_gen_interval = spec->token_gen_interval; + +        ret = gf_thread_create (&curr->tokener, +                                NULL, tbf_tokengenerator, curr); +        if (ret != 0) +                goto freemem; + +        *bucket = curr; +        return 0; + + freemem: +        LOCK_DESTROY (&curr->lock); +        GF_FREE (curr); + error_return: +        return -1; +} + +#define TBF_ALLOC_SIZE                                               \ +        (sizeof (tbf_t) + (TBF_OP_MAX * sizeof (tbf_bucket_t))) + +tbf_t * +tbf_init (tbf_opspec_t *tbfspec, unsigned int count) +{ +        int32_t i = 0; +        int32_t ret = 0; +        tbf_t *tbf = NULL; +        tbf_opspec_t *opspec = NULL; + +        tbf = GF_CALLOC (1, TBF_ALLOC_SIZE, gf_common_mt_tbf_t); +        if (!tbf) +                goto error_return; + +        tbf->bucket = (tbf_bucket_t **) ((char *)tbf + sizeof (*tbf)); +        for (i = 0; i < TBF_OP_MAX; i++) { +                *(tbf->bucket + i) = NULL; +        } + +        for (i = 0; i < count; i++) { +                opspec = tbfspec + i; + +                ret = tbf_init_bucket (tbf, opspec); +                if (ret) +                        break; +        } + +        if (ret) +                goto error_return; + +        return tbf; + + error_return: +        return NULL; +} + +static void +tbf_mod_bucket (tbf_bucket_t *bucket, tbf_opspec_t *spec) +{ +        LOCK (&bucket->lock); +        { +                bucket->tokens = 0; +                bucket->tokenrate = spec->rate; +                bucket->maxtokens = spec->maxlimit; +        } +        UNLOCK (&bucket->lock); + +        /* next token tick would unqueue pending operations */ +} + +int +tbf_mod (tbf_t *tbf, tbf_opspec_t *tbfspec) +{ +        int              ret    = 0; +        tbf_bucket_t *bucket = NULL; +        tbf_ops_t     op     = TBF_OP_MIN; + +        if (!tbf || !tbfspec) +                return -1; + +        op = tbfspec->op; + +        GF_ASSERT (op >= TBF_OP_MIN); +        GF_ASSERT (op <= TBF_OP_MAX); + +        bucket = *(tbf->bucket + op); +        if (bucket) { +                tbf_mod_bucket (bucket, tbfspec); +        } else { +                ret = tbf_init_bucket (tbf, tbfspec); +        } + +        return ret; +} + +void +tbf_throttle (tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested) +{ +        char waitq = 0; +        tbf_bucket_t *bucket = NULL; +        tbf_throttle_t *throttle = NULL; + +        GF_ASSERT (op >= TBF_OP_MIN); +        GF_ASSERT (op <= TBF_OP_MAX); + +        bucket = *(tbf->bucket + op); +        if (!bucket) +                return; + +        LOCK (&bucket->lock); +        { +                /** +                 * if there are enough tokens in the bucket there is no need +                 * to throttle the request: therefore, consume the required +                 * number of tokens and continue. +                 */ +                if (tokens_requested <= bucket->tokens) { +                        bucket->tokens -= tokens_requested; +                } else { +                        throttle = tbf_init_throttle (tokens_requested); +                        if (!throttle) /* let it slip through for now.. */ +                                goto unblock; + +                        waitq = 1; +                        pthread_mutex_lock (&throttle->mutex); +                        list_add_tail (&throttle->list, &bucket->queued); +                } +        } + unblock: +        UNLOCK (&bucket->lock); + +        if (waitq) { +                while (!throttle->done) { +                        pthread_cond_wait (&throttle->cond, &throttle->mutex); +                } + +                pthread_mutex_unlock (&throttle->mutex); + +                pthread_mutex_destroy (&throttle->mutex); +                pthread_cond_destroy (&throttle->cond); + +                GF_FREE (throttle); +        } +} diff --git a/libglusterfs/src/throttle-tbf.h b/libglusterfs/src/throttle-tbf.h new file mode 100644 index 00000000000..b6e04962ca4 --- /dev/null +++ b/libglusterfs/src/throttle-tbf.h @@ -0,0 +1,74 @@ +/* +   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> +   This file is part of GlusterFS. + +   This file is licensed to you under your choice of the GNU Lesser +   General Public License, version 3 or any later version (LGPLv3 or +   later), or the GNU General Public License, version 2 (GPLv2), in all +   cases as published by the Free Software Foundation. +*/ + +#include "list.h" +#include "xlator.h" +#include "locking.h" + +#ifndef THROTTLE_TBF_H__ +#define THROTTLE_TBF_H__ + +typedef enum tbf_ops { +        TBF_OP_MIN     = -1, +        TBF_OP_HASH    = 0,    /* checksum calculation  */ +        TBF_OP_READ    = 1,    /* inode read(s)         */ +        TBF_OP_READDIR = 2,    /* dentry read(s)        */ +        TBF_OP_MAX     = 3, +} tbf_ops_t; + +/** + * Operation rate specification + */ +typedef struct tbf_opspec { +        tbf_ops_t op; + +        unsigned long rate; + +        unsigned long maxlimit; + +        unsigned long token_gen_interval;/* Token generation interval in usec */ +} tbf_opspec_t; + +/** + * Token bucket for each operation type + */ +typedef struct tbf_bucket { +        gf_lock_t lock; + +        pthread_t tokener;         /* token generator thread          */ + +        unsigned long tokenrate;   /* token generation rate           */ + +        unsigned long tokens;      /* number of current tokens        */ + +        unsigned long maxtokens;   /* maximum token in the bucket     */ + +        struct list_head queued;   /* list of non-conformant requests */ + +        unsigned long token_gen_interval;/* Token generation interval in usec */ +} tbf_bucket_t; + +typedef struct tbf { +        tbf_bucket_t **bucket; +} tbf_t; + +tbf_t * +tbf_init (tbf_opspec_t *, unsigned int); + +int +tbf_mod (tbf_t *, tbf_opspec_t *); + +void +tbf_throttle (tbf_t *, tbf_ops_t, unsigned long); + +#define TBF_THROTTLE_BEGIN(tbf, op, tokens) (tbf_throttle (tbf, op, tokens)) +#define TBF_THROTTLE_END(tbf, op, tokens) + +#endif /** THROTTLE_TBF_H__ */  | 
