diff options
Diffstat (limited to 'xlators/performance/io-threads')
| -rw-r--r-- | xlators/performance/io-threads/src/Makefile.am | 10 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads-messages.h | 41 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 2254 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 120 | ||||
| -rw-r--r-- | xlators/performance/io-threads/src/iot-mem-types.h | 21 |
5 files changed, 1570 insertions, 876 deletions
diff --git a/xlators/performance/io-threads/src/Makefile.am b/xlators/performance/io-threads/src/Makefile.am index 38dea3eb7fc..7570cf41ed2 100644 --- a/xlators/performance/io-threads/src/Makefile.am +++ b/xlators/performance/io-threads/src/Makefile.am @@ -1,14 +1,16 @@ xlator_LTLIBRARIES = io-threads.la xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/performance -io_threads_la_LDFLAGS = -module -avoidversion +io_threads_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS) io_threads_la_SOURCES = io-threads.c io_threads_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la -noinst_HEADERS = io-threads.h +noinst_HEADERS = io-threads.h iot-mem-types.h io-threads-messages.h -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ - -I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) +AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \ + -I$(top_srcdir)/rpc/xdr/src -I$(top_builddir)/rpc/xdr/src + +AM_CFLAGS = -Wall $(GF_CFLAGS) CLEANFILES = diff --git a/xlators/performance/io-threads/src/io-threads-messages.h b/xlators/performance/io-threads/src/io-threads-messages.h new file mode 100644 index 00000000000..6229c353f96 --- /dev/null +++ b/xlators/performance/io-threads/src/io-threads-messages.h @@ -0,0 +1,41 @@ +/*Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef _IO_THREADS_MESSAGES_H_ +#define _IO_THREADS_MESSAGES_H_ + +#include <glusterfs/glfs-message-id.h> + +/* To add new message IDs, append new identifiers at the end of the list. + * + * Never remove a message ID. If it's not used anymore, you can rename it or + * leave it as it is, but not delete it. 
This is to prevent reutilization of + * IDs by other messages. + * + * The component name must match one of the entries defined in + * glfs-message-id.h. + */ + +GLFS_MSGID(IO_THREADS, IO_THREADS_MSG_INIT_FAILED, + IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, IO_THREADS_MSG_NO_MEMORY, + IO_THREADS_MSG_VOL_MISCONFIGURED, IO_THREADS_MSG_SIZE_NOT_SET, + IO_THREADS_MSG_OUT_OF_MEMORY, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED); + +#define IO_THREADS_MSG_INIT_FAILED_STR "Thread attribute initialization failed" +#define IO_THREADS_MSG_SIZE_NOT_SET_STR "Using default thread stack size" +#define IO_THREADS_MSG_NO_MEMORY_STR "Memory accounting init failed" +#define IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED_STR \ + "FATAL: iot not configured with exactly one child" +#define IO_THREADS_MSG_VOL_MISCONFIGURED_STR "dangling volume. check volfile" +#define IO_THREADS_MSG_OUT_OF_MEMORY_STR "out of memory" +#define IO_THREADS_MSG_PTHREAD_INIT_FAILED_STR "init failed" +#define IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED_STR \ + "cannot initialize worker threads, exiting init" +#endif /* _IO_THREADS_MESSAGES_H_ */ diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 9ace53ba520..3d24cc97f4b 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -1,976 +1,1590 @@ /* - Copyright (c) 2006-2009 Z RESEARCH, Inc. <http://www.zresearch.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. 
+ This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. +#include <glusterfs/call-stub.h> +#include <glusterfs/defaults.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/dict.h> +#include <glusterfs/xlator.h> +#include "io-threads.h" +#include <signal.h> +#include <stdlib.h> +#include <sys/time.h> +#include <time.h> +#include <glusterfs/locking.h> +#include "io-threads-messages.h" +#include <glusterfs/timespec.h> + +void * +iot_worker(void *arg); +int +iot_workers_scale(iot_conf_t *conf); +int +__iot_workers_scale(iot_conf_t *conf); +struct volume_options options[]; + +#define IOT_FOP(name, frame, this, args...) 
\ + do { \ + call_stub_t *__stub = NULL; \ + int __ret = -1; \ + \ + __stub = fop_##name##_stub(frame, default_##name##_resume, args); \ + if (!__stub) { \ + __ret = -ENOMEM; \ + goto out; \ + } \ + \ + __ret = iot_schedule(frame, this, __stub); \ + \ + out: \ + if (__ret < 0) { \ + default_##name##_failure_cbk(frame, -__ret); \ + if (__stub != NULL) { \ + call_stub_destroy(__stub); \ + } \ + } \ + } while (0) + +iot_client_ctx_t * +iot_get_ctx(xlator_t *this, client_t *client) +{ + iot_client_ctx_t *ctx = NULL; + iot_client_ctx_t *setted_ctx = NULL; + int i; + + if (client_ctx_get(client, this, (void **)&ctx) != 0) { + ctx = GF_MALLOC(GF_FOP_PRI_MAX * sizeof(*ctx), gf_iot_mt_client_ctx_t); + if (ctx) { + for (i = 0; i < GF_FOP_PRI_MAX; ++i) { + INIT_LIST_HEAD(&ctx[i].clients); + INIT_LIST_HEAD(&ctx[i].reqs); + } + setted_ctx = client_ctx_set(client, this, ctx); + if (ctx != setted_ctx) { + GF_FREE(ctx); + ctx = setted_ctx; + } + } + } - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. -*/ + return ctx; +} -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif +call_stub_t * +__iot_dequeue(iot_conf_t *conf, int *pri) +{ + call_stub_t *stub = NULL; + int i = 0; + iot_client_ctx_t *ctx; + + *pri = -1; + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) { + continue; + } -#include "call-stub.h" -#include "glusterfs.h" -#include "logging.h" -#include "dict.h" -#include "xlator.h" -#include "io-threads.h" + if (list_empty(&conf->clients[i])) { + continue; + } -static void -iot_queue (iot_worker_t *worker, - call_stub_t *stub); + /* Get the first per-client queue for this priority. 
*/ + ctx = list_first_entry(&conf->clients[i], iot_client_ctx_t, clients); + if (!ctx) { + continue; + } -static call_stub_t * -iot_dequeue (iot_worker_t *worker); + if (list_empty(&ctx->reqs)) { + continue; + } -static void -iot_schedule (iot_conf_t *conf, - inode_t *inode, - call_stub_t *stub) + /* Get the first request on that queue. */ + stub = list_first_entry(&ctx->reqs, call_stub_t, list); + list_del_init(&stub->list); + if (list_empty(&ctx->reqs)) { + list_del_init(&ctx->clients); + } else { + list_rotate_left(&conf->clients[i]); + } + + conf->ac_iot_count[i]++; + conf->queue_marked[i] = _gf_false; + *pri = i; + break; + } + + if (!stub) + return NULL; + + conf->queue_size--; + conf->queue_sizes[*pri]--; + + return stub; +} + +void +__iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri) { - int32_t idx = 0; - iot_worker_t *selected_worker = NULL; + client_t *client = stub->frame->root->client; + iot_client_ctx_t *ctx; - idx = (inode->ino % conf->thread_count); - selected_worker = conf->workers[idx]; + if (pri < 0 || pri >= GF_FOP_PRI_MAX) + pri = GF_FOP_PRI_MAX - 1; - iot_queue (selected_worker, stub); + if (client) { + ctx = iot_get_ctx(THIS, client); + if (ctx) { + ctx = &ctx[pri]; + } + } else { + ctx = NULL; + } + if (!ctx) { + ctx = &conf->no_client[pri]; + } + + if (list_empty(&ctx->reqs)) { + list_add_tail(&ctx->clients, &conf->clients[pri]); + } + list_add_tail(&stub->list, &ctx->reqs); + + conf->queue_size++; + GF_ATOMIC_INC(conf->stub_cnt); + conf->queue_sizes[pri]++; } -int32_t -iot_open_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - fd_t *fd) +void * +iot_worker(void *data) { - STACK_UNWIND (frame, op_ret, op_errno, fd); - return 0; + iot_conf_t *conf = NULL; + xlator_t *this = NULL; + call_stub_t *stub = NULL; + struct timespec sleep_till = { + 0, + }; + int ret = 0; + int pri = -1; + gf_boolean_t bye = _gf_false; + + conf = data; + this = conf->this; + THIS = this; + + for (;;) { + 
pthread_mutex_lock(&conf->mutex); + { + if (pri != -1) { + conf->ac_iot_count[pri]--; + pri = -1; + } + while (conf->queue_size == 0) { + if (conf->down) { + bye = _gf_true; /*Avoid sleep*/ + break; + } + + clock_gettime(CLOCK_REALTIME_COARSE, &sleep_till); + sleep_till.tv_sec += conf->idle_time; + + conf->sleep_count++; + ret = pthread_cond_timedwait(&conf->cond, &conf->mutex, + &sleep_till); + conf->sleep_count--; + + if (conf->down || ret == ETIMEDOUT) { + bye = _gf_true; + break; + } + } + + if (bye) { + if (conf->down || conf->curr_count > IOT_MIN_THREADS) { + conf->curr_count--; + if (conf->curr_count == 0) + pthread_cond_broadcast(&conf->cond); + gf_msg_debug(conf->this->name, 0, + "terminated. " + "conf->curr_count=%d", + conf->curr_count); + } else { + bye = _gf_false; + } + } + + if (!bye) + stub = __iot_dequeue(conf, &pri); + } + pthread_mutex_unlock(&conf->mutex); + + if (stub) { /* guard against spurious wakeups */ + if (stub->poison) { + gf_log(this->name, GF_LOG_INFO, "Dropping poisoned request %p.", + stub); + call_stub_destroy(stub); + } else { + call_resume(stub); + } + GF_ATOMIC_DEC(conf->stub_cnt); + } + stub = NULL; + + if (bye) + break; + } + + return NULL; } -static int32_t -iot_open_wrapper (call_frame_t * frame, - xlator_t * this, - loc_t *loc, - int32_t flags, - fd_t * fd) +int +do_iot_schedule(iot_conf_t *conf, call_stub_t *stub, int pri) { - STACK_WIND (frame, iot_open_cbk, FIRST_CHILD (this), - FIRST_CHILD (this)->fops->open, loc, flags, fd); - return 0; + int ret = 0; + + pthread_mutex_lock(&conf->mutex); + { + __iot_enqueue(conf, stub, pri); + + pthread_cond_signal(&conf->cond); + + ret = __iot_workers_scale(conf); + } + pthread_mutex_unlock(&conf->mutex); + + return ret; } -int32_t -iot_open (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags, - fd_t *fd) -{ - call_stub_t *stub = NULL; - - stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot get 
open call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); +char * +iot_get_pri_meaning(gf_fop_pri_t pri) +{ + char *name = NULL; + switch (pri) { + case GF_FOP_PRI_HI: + name = "fast"; + break; + case GF_FOP_PRI_NORMAL: + name = "normal"; + break; + case GF_FOP_PRI_LO: + name = "slow"; + break; + case GF_FOP_PRI_LEAST: + name = "least"; + break; + case GF_FOP_PRI_MAX: + name = "invalid"; + break; + case GF_FOP_PRI_UNSPEC: + name = "unspecified"; + break; + } + return name; +} - return 0; +int +iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub) +{ + int ret = -1; + gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1; + iot_conf_t *conf = this->private; + + if ((frame->root->pid < GF_CLIENT_PID_MAX) && + (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) && + conf->least_priority) { + pri = GF_FOP_PRI_LEAST; + goto out; + } + + switch (stub->fop) { + case GF_FOP_OPEN: + case GF_FOP_STAT: + case GF_FOP_FSTAT: + case GF_FOP_LOOKUP: + case GF_FOP_ACCESS: + case GF_FOP_READLINK: + case GF_FOP_OPENDIR: + case GF_FOP_STATFS: + case GF_FOP_READDIR: + case GF_FOP_READDIRP: + case GF_FOP_GETACTIVELK: + case GF_FOP_SETACTIVELK: + case GF_FOP_ICREATE: + case GF_FOP_NAMELINK: + pri = GF_FOP_PRI_HI; + break; + + case GF_FOP_CREATE: + case GF_FOP_FLUSH: + case GF_FOP_LK: + case GF_FOP_INODELK: + case GF_FOP_FINODELK: + case GF_FOP_ENTRYLK: + case GF_FOP_FENTRYLK: + case GF_FOP_LEASE: + case GF_FOP_UNLINK: + case GF_FOP_SETATTR: + case GF_FOP_FSETATTR: + case GF_FOP_MKNOD: + case GF_FOP_MKDIR: + case GF_FOP_RMDIR: + case GF_FOP_SYMLINK: + case GF_FOP_RENAME: + case GF_FOP_LINK: + case GF_FOP_SETXATTR: + case GF_FOP_GETXATTR: + case GF_FOP_FGETXATTR: + case GF_FOP_FSETXATTR: + case GF_FOP_REMOVEXATTR: + case GF_FOP_FREMOVEXATTR: + case GF_FOP_PUT: + pri = GF_FOP_PRI_NORMAL; + break; + + case GF_FOP_READ: + case GF_FOP_WRITE: + case GF_FOP_FSYNC: + case GF_FOP_TRUNCATE: + case GF_FOP_FTRUNCATE: + 
case GF_FOP_FSYNCDIR: + case GF_FOP_XATTROP: + case GF_FOP_FXATTROP: + case GF_FOP_RCHECKSUM: + case GF_FOP_FALLOCATE: + case GF_FOP_DISCARD: + case GF_FOP_ZEROFILL: + case GF_FOP_SEEK: + pri = GF_FOP_PRI_LO; + break; + + case GF_FOP_FORGET: + case GF_FOP_RELEASE: + case GF_FOP_RELEASEDIR: + case GF_FOP_GETSPEC: + break; + case GF_FOP_IPC: + default: + return -EINVAL; + } +out: + gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop", + gf_fop_list[stub->fop], iot_get_pri_meaning(pri)); + if (this->private) + ret = do_iot_schedule(this->private, stub, pri); + return ret; } +int +iot_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) +{ + IOT_FOP(lookup, frame, this, loc, xdata); + return 0; +} -int32_t -iot_create_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - fd_t *fd, - inode_t *inode, - struct stat *stbuf) +int +iot_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf, + int32_t valid, dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf); - return 0; + IOT_FOP(setattr, frame, this, loc, stbuf, valid, xdata); + return 0; } -int32_t -iot_create_wrapper (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags, - mode_t mode, - fd_t *fd) -{ - STACK_WIND (frame, - iot_create_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->create, - loc, - flags, - mode, - fd); - return 0; +int +iot_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf, + int32_t valid, dict_t *xdata) +{ + IOT_FOP(fsetattr, frame, this, fd, stbuf, valid, xdata); + return 0; } -int32_t -iot_create (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags, - mode_t mode, - fd_t *fd) -{ - call_stub_t *stub = NULL; - - stub = fop_create_stub (frame, iot_create_wrapper, loc, flags, mode, - fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot get create call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); - return 
0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - return 0; +int +iot_access(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask, + dict_t *xdata) +{ + IOT_FOP(access, frame, this, loc, mask, xdata); + return 0; } -int32_t -iot_readv_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct iovec *vector, - int32_t count, - struct stat *stbuf) +int +iot_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size, + dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); + IOT_FOP(readlink, frame, this, loc, size, xdata); + return 0; +} - return 0; +int +iot_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + dev_t rdev, mode_t umask, dict_t *xdata) +{ + IOT_FOP(mknod, frame, this, loc, mode, rdev, umask, xdata); + return 0; } -static int32_t -iot_readv_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - size_t size, - off_t offset) +int +iot_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + mode_t umask, dict_t *xdata) { - STACK_WIND (frame, - iot_readv_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->readv, - fd, - size, - offset); - return 0; + IOT_FOP(mkdir, frame, this, loc, mode, umask, xdata); + return 0; } -int32_t -iot_readv (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - size_t size, - off_t offset) -{ - call_stub_t *stub; - stub = fop_readv_stub (frame, - iot_readv_wrapper, - fd, - size, - offset); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, - "cannot get readv call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); - return 0; - } - - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - return 0; +int +iot_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags, + dict_t *xdata) +{ + IOT_FOP(rmdir, frame, this, loc, flags, xdata); + return 0; } -int32_t -iot_flush_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t 
op_errno) +int +iot_symlink(call_frame_t *frame, xlator_t *this, const char *linkname, + loc_t *loc, mode_t umask, dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno); - return 0; + IOT_FOP(symlink, frame, this, linkname, loc, umask, xdata); + return 0; } -static int32_t -iot_flush_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd) +int +iot_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, + dict_t *xdata) { - STACK_WIND (frame, - iot_flush_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->flush, - fd); - return 0; + IOT_FOP(rename, frame, this, oldloc, newloc, xdata); + return 0; } -int32_t -iot_flush (call_frame_t *frame, - xlator_t *this, - fd_t *fd) +int +iot_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + fd_t *fd, dict_t *xdata) { - call_stub_t *stub; - stub = fop_flush_stub (frame, - iot_flush_wrapper, - fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub"); - STACK_UNWIND (frame, -1, ENOMEM); - return 0; - } + IOT_FOP(open, frame, this, loc, flags, fd, xdata); + return 0; +} - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - return 0; +int +iot_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, + mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata) +{ + IOT_FOP(create, frame, this, loc, flags, mode, umask, fd, xdata); + return 0; } -int32_t -iot_fsync_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno) +int +iot_put(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, + mode_t umask, uint32_t flags, struct iovec *vector, int32_t count, + off_t offset, struct iobref *iobref, dict_t *xattr, dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno); - return 0; + IOT_FOP(put, frame, this, loc, mode, umask, flags, vector, count, offset, + iobref, xattr, xdata); + return 0; } -static int32_t -iot_fsync_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t 
datasync) +int +iot_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, uint32_t flags, dict_t *xdata) { - STACK_WIND (frame, - iot_fsync_cbk, - FIRST_CHILD (this), - FIRST_CHILD (this)->fops->fsync, - fd, - datasync); - return 0; + IOT_FOP(readv, frame, this, fd, size, offset, flags, xdata); + return 0; } -int32_t -iot_fsync (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t datasync) -{ - call_stub_t *stub; - stub = fop_fsync_stub (frame, - iot_fsync_wrapper, - fd, - datasync); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub"); - STACK_UNWIND (frame, -1, ENOMEM); - return 0; - } - - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - return 0; +int +iot_flush(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) +{ + IOT_FOP(flush, frame, this, fd, xdata); + return 0; } -int32_t -iot_writev_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *stbuf) -{ - STACK_UNWIND (frame, op_ret, op_errno, stbuf); - return 0; -} - -static int32_t -iot_writev_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iovec *vector, - int32_t count, - off_t offset) -{ - STACK_WIND (frame, - iot_writev_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->writev, - fd, - vector, - count, - offset); - return 0; +int +iot_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync, + dict_t *xdata) +{ + IOT_FOP(fsync, frame, this, fd, datasync, xdata); + return 0; } -int32_t -iot_writev (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - struct iovec *vector, - int32_t count, - off_t offset) +int +iot_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, + int32_t count, off_t offset, uint32_t flags, struct iobref *iobref, + dict_t *xdata) { - call_stub_t *stub; - stub = fop_writev_stub (frame, iot_writev_wrapper, - fd, vector, count, offset); + IOT_FOP(writev, frame, this, fd, vector, 
count, offset, flags, iobref, + xdata); + return 0; +} - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub"); - STACK_UNWIND (frame, -1, ENOMEM); - return 0; - } +int +iot_lk(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd, + struct gf_flock *flock, dict_t *xdata) +{ + IOT_FOP(lk, frame, this, fd, cmd, flock, xdata); + return 0; +} - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); +int +iot_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) +{ + IOT_FOP(stat, frame, this, loc, xdata); + return 0; +} - return 0; +int +iot_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata) +{ + IOT_FOP(fstat, frame, this, fd, xdata); + return 0; } +int +iot_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset, + dict_t *xdata) +{ + IOT_FOP(truncate, frame, this, loc, offset, xdata); + return 0; +} -int32_t -iot_lk_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct flock *flock) +int +iot_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno, flock); - return 0; + IOT_FOP(ftruncate, frame, this, fd, offset, xdata); + return 0; } +int +iot_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t xflag, + dict_t *xdata) +{ + IOT_FOP(unlink, frame, this, loc, xflag, xdata); + return 0; +} -static int32_t -iot_lk_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t cmd, - struct flock *flock) +int +iot_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, + dict_t *xdata) { - STACK_WIND (frame, - iot_lk_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->lk, - fd, - cmd, - flock); - return 0; + IOT_FOP(link, frame, this, oldloc, newloc, xdata); + return 0; } +int +iot_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd, + dict_t *xdata) +{ + IOT_FOP(opendir, frame, this, loc, fd, xdata); + return 0; 
+} -int32_t -iot_lk (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - int32_t cmd, - struct flock *flock) +int +iot_fsyncdir(call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync, + dict_t *xdata) { - call_stub_t *stub; - stub = fop_lk_stub (frame, iot_lk_wrapper, - fd, cmd, flock); + IOT_FOP(fsyncdir, frame, this, fd, datasync, xdata); + return 0; +} - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - return 0; -} - - -int32_t -iot_stat_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *buf) +int +iot_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) { - STACK_UNWIND (frame, op_ret, op_errno, buf); - return 0; -} - - -static int32_t -iot_stat_wrapper (call_frame_t *frame, - xlator_t *this, - loc_t *loc) + IOT_FOP(statfs, frame, this, loc, xdata); + return 0; +} + +int +iot_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict, + int32_t flags, dict_t *xdata) { - STACK_WIND (frame, - iot_stat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, - loc); - return 0; + IOT_FOP(setxattr, frame, this, loc, dict, flags, xdata); + return 0; } -int32_t -iot_stat (call_frame_t *frame, - xlator_t *this, - loc_t *loc) -{ - call_stub_t *stub; - fd_t *fd = NULL; +int +iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, + dict_t *xdata) +{ + iot_conf_t *conf = NULL; + dict_t *depths = NULL; + int i = 0; + int32_t op_ret = 0; + int32_t op_errno = 0; + + conf = this->private; + + if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) { + /* + * We explicitly do not want a reference count + * for this dict in this translator + */ + depths = dict_new(); + if (!depths) { + op_ret = -1; + op_errno = ENOMEM; + goto unwind_special_getxattr; + } + + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + 
if (dict_set_int32(depths, (char *)fop_pri_to_string(i), + conf->queue_sizes[i]) != 0) { + dict_unref(depths); + depths = NULL; + goto unwind_special_getxattr; + } + } + + unwind_special_getxattr: + STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, depths, xdata); + if (depths) + dict_unref(depths); + return 0; + } + + IOT_FOP(getxattr, frame, this, loc, name, xdata); + return 0; +} - fd = fd_lookup (loc->inode, frame->root->pid); - - if (fd == NULL) { - STACK_WIND(frame, - iot_stat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->stat, - loc); - return 0; - } - - fd_unref (fd); - - stub = fop_stat_stub (frame, - iot_stat_wrapper, - loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - - return 0; -} - - -int32_t -iot_fstat_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *buf) -{ - STACK_UNWIND (frame, op_ret, op_errno, buf); - return 0; -} - -static int32_t -iot_fstat_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd) -{ - STACK_WIND (frame, - iot_fstat_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->fstat, - fd); - return 0; -} - -int32_t -iot_fstat (call_frame_t *frame, - xlator_t *this, - fd_t *fd) -{ - call_stub_t *stub; - stub = fop_fstat_stub (frame, - iot_fstat_wrapper, - fd); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - - return 0; -} - -int32_t -iot_truncate_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *buf) -{ - STACK_UNWIND (frame, op_ret, op_errno, buf); - return 0; -} - -static int32_t -iot_truncate_wrapper (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - off_t offset) -{ 
- STACK_WIND (frame, - iot_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, - offset); - return 0; -} - -int32_t -iot_truncate (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - off_t offset) -{ - call_stub_t *stub; - fd_t *fd = NULL; - - fd = fd_lookup (loc->inode, frame->root->pid); - - if (fd == NULL) { - STACK_WIND(frame, - iot_truncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->truncate, - loc, - offset); - return 0; - } - - fd_unref (fd); - - stub = fop_truncate_stub (frame, - iot_truncate_wrapper, - loc, - offset); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - - return 0; -} - -int32_t -iot_ftruncate_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *buf) -{ - STACK_UNWIND (frame, op_ret, op_errno, buf); - return 0; -} - -static int32_t -iot_ftruncate_wrapper (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - off_t offset) -{ - STACK_WIND (frame, - iot_ftruncate_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->ftruncate, - fd, - offset); - return 0; -} - -int32_t -iot_ftruncate (call_frame_t *frame, - xlator_t *this, - fd_t *fd, - off_t offset) -{ - call_stub_t *stub; - stub = fop_ftruncate_stub (frame, - iot_ftruncate_wrapper, - fd, - offset); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, fd->inode, stub); - - return 0; -} - -int32_t -iot_utimens_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - struct stat *buf) -{ - STACK_UNWIND (frame, op_ret, op_errno, buf); - return 0; -} - -static int32_t -iot_utimens_wrapper (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - struct timespec tv[2]) 
-{ - STACK_WIND (frame, - iot_utimens_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->utimens, - loc, - tv); - - return 0; -} - -int32_t -iot_utimens (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - struct timespec tv[2]) -{ - call_stub_t *stub; - fd_t *fd = NULL; - - fd = fd_lookup (loc->inode, frame->root->pid); - - if (fd == NULL) { - STACK_WIND(frame, - iot_utimens_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->utimens, - loc, - tv); - return 0; - } - - fd_unref (fd); - - stub = fop_utimens_stub (frame, - iot_utimens_wrapper, - loc, - tv); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - - return 0; -} - - -int32_t -iot_checksum_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno, - uint8_t *file_checksum, - uint8_t *dir_checksum) -{ - STACK_UNWIND (frame, op_ret, op_errno, file_checksum, dir_checksum); - return 0; -} - -static int32_t -iot_checksum_wrapper (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags) -{ - STACK_WIND (frame, - iot_checksum_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->checksum, - loc, - flags); - - return 0; -} - -int32_t -iot_checksum (call_frame_t *frame, - xlator_t *this, - loc_t *loc, - int32_t flags) -{ - call_stub_t *stub = NULL; - - stub = fop_checksum_stub (frame, - iot_checksum_wrapper, - loc, - flags); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_checksum call stub"); - STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); - - return 0; -} - - -int32_t -iot_unlink_cbk (call_frame_t *frame, - void *cookie, - xlator_t *this, - int32_t op_ret, - int32_t op_errno) -{ - STACK_UNWIND (frame, op_ret, op_errno); - return 0; -} - -static int32_t -iot_unlink_wrapper (call_frame_t *frame, - xlator_t 
*this, - loc_t *loc) -{ - STACK_WIND (frame, - iot_unlink_cbk, - FIRST_CHILD(this), - FIRST_CHILD(this)->fops->unlink, - loc); - - return 0; -} - -int32_t -iot_unlink (call_frame_t *frame, - xlator_t *this, - loc_t *loc) +int +iot_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name, + dict_t *xdata) { - call_stub_t *stub = NULL; - stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc); - if (!stub) { - gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub"); - STACK_UNWIND (frame, -1, ENOMEM); - return 0; - } - iot_schedule ((iot_conf_t *)this->private, loc->inode, stub); + IOT_FOP(fgetxattr, frame, this, fd, name, xdata); + return 0; +} - return 0; +int +iot_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict, + int32_t flags, dict_t *xdata) +{ + IOT_FOP(fsetxattr, frame, this, fd, dict, flags, xdata); + return 0; } +int +iot_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc, + const char *name, dict_t *xdata) +{ + IOT_FOP(removexattr, frame, this, loc, name, xdata); + return 0; +} -static void -iot_queue (iot_worker_t *worker, - call_stub_t *stub) +int +iot_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd, + const char *name, dict_t *xdata) +{ + IOT_FOP(fremovexattr, frame, this, fd, name, xdata); + return 0; +} + +int +iot_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *xdata) { - iot_request_t *req = NULL; + IOT_FOP(readdirp, frame, this, fd, size, offset, xdata); + return 0; +} - req = CALLOC (1, sizeof (iot_request_t)); - ERR_ABORT (req); - req->stub = stub; +int +iot_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, + off_t offset, dict_t *xdata) +{ + IOT_FOP(readdir, frame, this, fd, size, offset, xdata); + return 0; +} - pthread_mutex_lock (&worker->qlock); - { - list_add_tail (&req->list, &worker->rqlist); +int +iot_inodelk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc, + int32_t cmd, struct 
gf_flock *lock, dict_t *xdata) +{ + IOT_FOP(inodelk, frame, this, volume, loc, cmd, lock, xdata); + return 0; +} - /* dq_cond */ - worker->queue_size++; - pthread_cond_broadcast (&worker->dq_cond); +int +iot_finodelk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd, + int32_t cmd, struct gf_flock *lock, dict_t *xdata) +{ + IOT_FOP(finodelk, frame, this, volume, fd, cmd, lock, xdata); + return 0; +} + +int +iot_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc, + const char *basename, entrylk_cmd cmd, entrylk_type type, + dict_t *xdata) +{ + IOT_FOP(entrylk, frame, this, volume, loc, basename, cmd, type, xdata); + return 0; +} + +int +iot_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd, + const char *basename, entrylk_cmd cmd, entrylk_type type, + dict_t *xdata) +{ + IOT_FOP(fentrylk, frame, this, volume, fd, basename, cmd, type, xdata); + return 0; +} + +int +iot_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc, + gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata) +{ + IOT_FOP(xattrop, frame, this, loc, optype, xattr, xdata); + return 0; +} + +int +iot_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd, + gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata) +{ + IOT_FOP(fxattrop, frame, this, fd, optype, xattr, xdata); + return 0; +} + +int32_t +iot_rchecksum(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + int32_t len, dict_t *xdata) +{ + IOT_FOP(rchecksum, frame, this, fd, offset, len, xdata); + return 0; +} + +int +iot_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, + off_t offset, size_t len, dict_t *xdata) +{ + IOT_FOP(fallocate, frame, this, fd, mode, offset, len, xdata); + return 0; +} + +int +iot_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + size_t len, dict_t *xdata) +{ + IOT_FOP(discard, frame, this, fd, offset, len, xdata); + return 0; +} + +int +iot_zerofill(call_frame_t *frame, xlator_t *this, fd_t 
*fd, off_t offset, + off_t len, dict_t *xdata) +{ + IOT_FOP(zerofill, frame, this, fd, offset, len, xdata); + return 0; +} + +int +iot_seek(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset, + gf_seek_what_t what, dict_t *xdata) +{ + IOT_FOP(seek, frame, this, fd, offset, what, xdata); + return 0; +} + +int +iot_lease(call_frame_t *frame, xlator_t *this, loc_t *loc, + struct gf_lease *lease, dict_t *xdata) +{ + IOT_FOP(lease, frame, this, loc, lease, xdata); + return 0; +} + +int +iot_getactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata) +{ + IOT_FOP(getactivelk, frame, this, loc, xdata); + return 0; +} + +int +iot_setactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, + lock_migration_info_t *locklist, dict_t *xdata) +{ + IOT_FOP(setactivelk, frame, this, loc, locklist, xdata); + return 0; +} + +int +__iot_workers_scale(iot_conf_t *conf) +{ + int scale = 0; + int diff = 0; + pthread_t thread; + int ret = 0; + int i = 0; + + for (i = 0; i < GF_FOP_PRI_MAX; i++) + scale += min(conf->queue_sizes[i], conf->ac_iot_limit[i]); + + if (scale < IOT_MIN_THREADS) + scale = IOT_MIN_THREADS; + + if (scale > conf->max_count) + scale = conf->max_count; + + if (conf->curr_count < scale) { + diff = scale - conf->curr_count; + } + + while (diff) { + diff--; + + ret = gf_thread_create(&thread, &conf->w_attr, iot_worker, conf, + "iotwr%03hx", conf->curr_count & 0x3ff); + if (ret == 0) { + pthread_detach(thread); + conf->curr_count++; + gf_msg_debug(conf->this->name, 0, + "scaled threads to %d (queue_size=%d/%d)", + conf->curr_count, conf->queue_size, scale); + } else { + break; } - pthread_mutex_unlock (&worker->qlock); + } + + return diff; } -static call_stub_t * -iot_dequeue (iot_worker_t *worker) +int +iot_workers_scale(iot_conf_t *conf) { - call_stub_t *stub = NULL; - iot_request_t *req = NULL; + int ret = -1; - pthread_mutex_lock (&worker->qlock); - { - while (!worker->queue_size) - pthread_cond_wait (&worker->dq_cond, &worker->qlock); + if 
(conf == NULL) { + ret = -EINVAL; + goto out; + } - list_for_each_entry (req, &worker->rqlist, list) - break; - list_del (&req->list); - stub = req->stub; + pthread_mutex_lock(&conf->mutex); + { + ret = __iot_workers_scale(conf); + } + pthread_mutex_unlock(&conf->mutex); - worker->queue_size--; +out: + return ret; +} + +int +set_stack_size(iot_conf_t *conf) +{ + int err = 0; + size_t stacksize = IOT_THREAD_STACK_SIZE; + xlator_t *this = NULL; + + this = THIS; + + err = pthread_attr_init(&conf->w_attr); + if (err != 0) { + gf_smsg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED, + NULL); + return err; + } + + err = pthread_attr_setstacksize(&conf->w_attr, stacksize); + if (err == EINVAL) { + err = pthread_attr_getstacksize(&conf->w_attr, &stacksize); + if (!err) { + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + "size=%zd", stacksize, NULL); + } else { + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + NULL); + err = 0; } - pthread_mutex_unlock (&worker->qlock); + } + + conf->stack_size = stacksize; + return err; +} + +int32_t +mem_acct_init(xlator_t *this) +{ + int ret = -1; + + if (!this) + return ret; + + ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1); + + if (ret != 0) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY, + NULL); + return ret; + } + + return ret; +} + +int +iot_priv_dump(xlator_t *this) +{ + iot_conf_t *conf = NULL; + char key_prefix[GF_DUMP_MAX_BUF_LEN]; + char key[GF_DUMP_MAX_BUF_LEN]; + int i = 0; + + if (!this) + return 0; + + conf = this->private; + if (!conf) + return 0; + + snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s", this->type, this->name); + + gf_proc_dump_add_section("%s", key_prefix); + + gf_proc_dump_write("maximum_threads_count", "%d", conf->max_count); + gf_proc_dump_write("current_threads_count", "%d", conf->curr_count); + gf_proc_dump_write("sleep_count", "%d", conf->sleep_count); + gf_proc_dump_write("idle_time", "%d", conf->idle_time); + 
gf_proc_dump_write("stack_size", "%zd", conf->stack_size); + gf_proc_dump_write("max_high_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_HI]); + gf_proc_dump_write("max_normal_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_NORMAL]); + gf_proc_dump_write("max_low_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_LO]); + gf_proc_dump_write("max_least_priority_threads", "%d", + conf->ac_iot_limit[GF_FOP_PRI_LEAST]); + gf_proc_dump_write("current_high_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_HI]); + gf_proc_dump_write("current_normal_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_NORMAL]); + gf_proc_dump_write("current_low_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LO]); + gf_proc_dump_write("current_least_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LEAST]); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (!conf->queue_sizes[i]) + continue; + snprintf(key, sizeof(key), "%s_priority_queue_length", + iot_get_pri_meaning(i)); + gf_proc_dump_write(key, "%d", conf->queue_sizes[i]); + } + + return 0; +} - FREE (req); +/* + * We use a decay model to keep track and make sure we're not spawning new + * threads too often. Each increment adds a large value to a counter, and that + * counter keeps ticking back down to zero over a fairly long period. For + * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we + * have N=3 increments during that time. Thus, our threshold is + * (N-1)*ONE_WEEK. To see how it works, look at three examples. + * + * (a) Two events close together, then one more almost a week later. The + * first two events push our counter to 2*ONE_WEEK plus a bit. At the third + * event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the + * new event, exceeding our threshold. + * + * (b) One event, then two more almost a week later. 
At the time of the + * second and third events, the counter is already non-zero, so when we add + * 2*ONE_WEEK we exceed again. + * + * (c) Three events, spaced three days apart. At the time of the second + * event, we decay down to approximately ONE_WEEK*4/7 and then add another + * ONE_WEEK. At the third event, we decay again down to ONE_WEEK*8/7 and add + * another ONE_WEEK, so boom. + * + * Note that in all three cases if that last event came a day later our counter + * would have decayed a bit more and we would *not* exceed our threshold. It's + * not exactly the same as a precise "three in one week" limit, but it's very + * close and it allows the same kind of tweaking while requiring only constant + * space - no arrays of variable length N to allocate or maintain. All we need + * (for each queue) is the value plus the time of the last update. + */ + +typedef struct { + time_t update_time; + uint32_t value; +} threshold_t; +/* + * Variables so that I can hack these for testing. + * TBD: make these tunable? + */ +static uint32_t THRESH_SECONDS = 604800; +static uint32_t THRESH_EVENTS = 3; +static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */ - return stub; +static void +iot_apply_event(xlator_t *this, threshold_t *thresh) +{ + time_t delta, now = gf_time(); + + /* Refresh for manual testing/debugging. It's cheap. */ + THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1); + + if (thresh->value && thresh->update_time) { + delta = now - thresh->update_time; + /* Be careful about underflow. */ + if (thresh->value <= delta) { + thresh->value = 0; + } else { + thresh->value -= delta; + } + } + + thresh->value += THRESH_SECONDS; + if (thresh->value >= THRESH_LIMIT) { + gf_log(this->name, GF_LOG_EMERG, "watchdog firing too often"); + /* + * The default action for SIGTRAP is to dump core, but the fact + * that it's distinct from other signals we use means that + * there are other possibilities as well (e.g. drop into gdb or + * invoke a special handler). 
+ */ + kill(getpid(), SIGTRAP); + } + + thresh->update_time = now; } static void * -iot_worker (void *arg) +iot_watchdog(void *arg) { - iot_worker_t *worker = arg; + xlator_t *this = arg; + iot_conf_t *priv = this->private; + int i; + int bad_times[GF_FOP_PRI_MAX] = { + 0, + }; + threshold_t thresholds[GF_FOP_PRI_MAX] = {{ + 0, + }}; + + for (;;) { + sleep(max(priv->watchdog_secs / 5, 1)); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + pthread_mutex_lock(&priv->mutex); + for (i = 0; i < GF_FOP_PRI_MAX; ++i) { + if (priv->queue_marked[i]) { + if (++bad_times[i] >= 5) { + gf_log(this->name, GF_LOG_WARNING, "queue %d stalled", i); + iot_apply_event(this, &thresholds[i]); + /* + * We might not get here if the event + * put us over our threshold. + */ + ++(priv->ac_iot_limit[i]); + bad_times[i] = 0; + } + } else { + bad_times[i] = 0; + } + priv->queue_marked[i] = (priv->queue_sizes[i] > 0); + } + pthread_mutex_unlock(&priv->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + } - while (1) { - call_stub_t *stub; + /* NOTREACHED */ + return NULL; +} - stub = iot_dequeue (worker); - call_resume (stub); - } +static void +start_iot_watchdog(xlator_t *this) +{ + iot_conf_t *priv = this->private; + int ret; + + if (priv->watchdog_running) { + return; + } + + ret = pthread_create(&priv->watchdog_thread, NULL, iot_watchdog, this); + if (ret == 0) { + priv->watchdog_running = _gf_true; + } else { + gf_log(this->name, GF_LOG_WARNING, + "pthread_create(iot_watchdog) failed"); + } } -static iot_worker_t ** -allocate_worker_array (int count) +static void +stop_iot_watchdog(xlator_t *this) { - iot_worker_t ** warr = NULL; + iot_conf_t *priv = this->private; + + if (!priv->watchdog_running) { + return; + } - warr = CALLOC (count, sizeof(iot_worker_t *)); - ERR_ABORT (warr); + if (pthread_cancel(priv->watchdog_thread) != 0) { + gf_log(this->name, GF_LOG_WARNING, + "pthread_cancel(iot_watchdog) failed"); + } - return warr; + if (pthread_join(priv->watchdog_thread, 
NULL) != 0) { + gf_log(this->name, GF_LOG_WARNING, "pthread_join(iot_watchdog) failed"); + } + + /* Failure probably means it's already dead. */ + priv->watchdog_running = _gf_false; } -static iot_worker_t * -allocate_worker (iot_conf_t * conf) +int +reconfigure(xlator_t *this, dict_t *options) { - iot_worker_t *wrk = NULL; + iot_conf_t *conf = NULL; + int ret = -1; + + conf = this->private; + if (!conf) + goto out; + + GF_OPTION_RECONF("thread-count", conf->max_count, options, int32, out); + + GF_OPTION_RECONF("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI], + options, int32, out); - wrk = CALLOC (1, sizeof (iot_worker_t)); - ERR_ABORT (wrk); + GF_OPTION_RECONF("normal-prio-threads", + conf->ac_iot_limit[GF_FOP_PRI_NORMAL], options, int32, + out); - INIT_LIST_HEAD (&wrk->rqlist); - wrk->conf = conf; - pthread_cond_init (&wrk->dq_cond, NULL); - pthread_mutex_init (&wrk->qlock, NULL); + GF_OPTION_RECONF("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], + options, int32, out); - return wrk; + GF_OPTION_RECONF("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST], + options, int32, out); + + GF_OPTION_RECONF("enable-least-priority", conf->least_priority, options, + bool, out); + + GF_OPTION_RECONF("cleanup-disconnected-reqs", + conf->cleanup_disconnected_reqs, options, bool, out); + + GF_OPTION_RECONF("watchdog-secs", conf->watchdog_secs, options, int32, out); + + GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out); + + if (conf->watchdog_secs > 0) { + start_iot_watchdog(this); + } else { + stop_iot_watchdog(this); + } + + ret = 0; +out: + return ret; } -static void -allocate_workers (iot_conf_t *conf, - int count, - int start_alloc_idx) +int +init(xlator_t *this) { - int i, end_count; + iot_conf_t *conf = NULL; + int ret = -1; + int i = 0; + + if (!this->children || this->children->next) { + gf_smsg("io-threads", GF_LOG_ERROR, 0, + IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, NULL); + goto out; + } + + if (!this->parents) { + 
gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED, + NULL); + } + + conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t); + if (conf == NULL) { + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_OUT_OF_MEMORY, + NULL); + goto out; + } + + if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_cond_init ret=%d", ret, NULL); + goto out; + } + conf->cond_inited = _gf_true; + + if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) { + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_mutex_init ret=%d", ret, NULL); + goto out; + } + conf->mutex_inited = _gf_true; + + ret = set_stack_size(conf); + + if (ret != 0) + goto out; + + ret = -1; + + GF_OPTION_INIT("thread-count", conf->max_count, int32, out); + + GF_OPTION_INIT("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI], + int32, out); + + GF_OPTION_INIT("normal-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_NORMAL], + int32, out); + + GF_OPTION_INIT("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], int32, + out); + + GF_OPTION_INIT("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST], + int32, out); + + GF_OPTION_INIT("idle-time", conf->idle_time, int32, out); - end_count = count + start_alloc_idx; - for (i = start_alloc_idx; i < end_count; i++) { - conf->workers[i] = allocate_worker (conf); - pthread_create (&conf->workers[i]->thread, NULL, iot_worker, - conf->workers[i]); + GF_OPTION_INIT("enable-least-priority", conf->least_priority, bool, out); + + GF_OPTION_INIT("cleanup-disconnected-reqs", conf->cleanup_disconnected_reqs, + bool, out); + + GF_OPTION_INIT("pass-through", this->pass_through, bool, out); + + conf->this = this; + GF_ATOMIC_INIT(conf->stub_cnt, 0); + + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + INIT_LIST_HEAD(&conf->clients[i]); + INIT_LIST_HEAD(&conf->no_client[i].clients); + INIT_LIST_HEAD(&conf->no_client[i].reqs); + } + + 
if (!this->pass_through) { + ret = iot_workers_scale(conf); + + if (ret == -1) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED, NULL); + goto out; } + } + + this->private = conf; + + conf->watchdog_secs = 0; + GF_OPTION_INIT("watchdog-secs", conf->watchdog_secs, int32, out); + if (conf->watchdog_secs > 0) { + start_iot_watchdog(this); + } + + ret = 0; +out: + if (ret) + GF_FREE(conf); + + return ret; } static void -workers_init (iot_conf_t *conf) +iot_exit_threads(iot_conf_t *conf) { - conf->workers = allocate_worker_array (conf->thread_count); - allocate_workers (conf, conf->thread_count, 0); + pthread_mutex_lock(&conf->mutex); + { + conf->down = _gf_true; + /*Let all the threads know that xl is going down*/ + pthread_cond_broadcast(&conf->cond); + while (conf->curr_count) /*Wait for threads to exit*/ + pthread_cond_wait(&conf->cond, &conf->mutex); + } + pthread_mutex_unlock(&conf->mutex); } +int +notify(xlator_t *this, int32_t event, void *data, ...) 
+{ + iot_conf_t *conf = this->private; + xlator_t *victim = data; + uint64_t stub_cnt = 0; + struct timespec sleep_till = { + 0, + }; + + if (GF_EVENT_PARENT_DOWN == event) { + if (victim->cleanup_starting) { + /* Wait for draining stub from queue before notify PARENT_DOWN */ + stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + if (stub_cnt) { + timespec_now_realtime(&sleep_till); + sleep_till.tv_sec += 1; + pthread_mutex_lock(&conf->mutex); + { + while (stub_cnt) { + (void)pthread_cond_timedwait(&conf->cond, &conf->mutex, + &sleep_till); + stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + } + } + pthread_mutex_unlock(&conf->mutex); + } + + gf_log(this->name, GF_LOG_INFO, + "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name); + } else { + iot_exit_threads(conf); + } + } + + if (GF_EVENT_CHILD_DOWN == event) { + if (victim->cleanup_starting) { + iot_exit_threads(conf); + gf_log(this->name, GF_LOG_INFO, + "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name); + } + } + + default_notify(this, event, data); + return 0; +} -int32_t -init (xlator_t *this) +void +fini(xlator_t *this) { - iot_conf_t *conf; - dict_t *options = this->options; + iot_conf_t *conf = this->private; - if (!this->children || this->children->next) { - gf_log ("io-threads", - GF_LOG_ERROR, - "FATAL: iot not configured with exactly one child"); - return -1; - } + if (!conf) + return; - if (!this->parents) { - gf_log (this->name, GF_LOG_WARNING, - "dangling volume. 
check volfile "); - } + if (conf->mutex_inited && conf->cond_inited) + iot_exit_threads(conf); - conf = (void *) CALLOC (1, sizeof (*conf)); - ERR_ABORT (conf); + if (conf->cond_inited) + pthread_cond_destroy(&conf->cond); - conf->thread_count = 1; + if (conf->mutex_inited) + pthread_mutex_destroy(&conf->mutex); - if (dict_get (options, "thread-count")) { - conf->thread_count = data_to_int32 (dict_get (options, - "thread-count")); - gf_log ("io-threads", - GF_LOG_DEBUG, - "Using conf->thread_count = %d", - conf->thread_count); - } + stop_iot_watchdog(this); - workers_init (conf); + GF_FREE(conf); - this->private = conf; - return 0; + this->private = NULL; + return; } -void -fini (xlator_t *this) +int +iot_client_destroy(xlator_t *this, client_t *client) { - iot_conf_t *conf = this->private; + void *tmp = NULL; - FREE (conf); + if (client_ctx_del(client, this, &tmp) == 0) { + GF_FREE(tmp); + } - this->private = NULL; - return; + return 0; } -struct xlator_fops fops = { - .open = iot_open, - .create = iot_create, - .readv = iot_readv, - .writev = iot_writev, - .flush = iot_flush, - .fsync = iot_fsync, - .lk = iot_lk, - .stat = iot_stat, - .fstat = iot_fstat, - .truncate = iot_truncate, - .ftruncate = iot_ftruncate, - .utimens = iot_utimens, - .checksum = iot_checksum, - .unlink = iot_unlink, +static int +iot_disconnect_cbk(xlator_t *this, client_t *client) +{ + int i; + call_stub_t *curr; + call_stub_t *next; + iot_conf_t *conf = this->private; + iot_client_ctx_t *ctx; + + if (!conf || !conf->cleanup_disconnected_reqs) { + goto out; + } + + pthread_mutex_lock(&conf->mutex); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + ctx = &conf->no_client[i]; + list_for_each_entry_safe(curr, next, &ctx->reqs, list) + { + if (curr->frame->root->client != client) { + continue; + } + gf_log(this->name, GF_LOG_INFO, + "poisoning %s fop at %p for client %s", + gf_fop_list[curr->fop], curr, client->client_uid); + curr->poison = _gf_true; + } + } + pthread_mutex_unlock(&conf->mutex); + 
+out: + return 0; +} + +struct xlator_dumpops dumpops = { + .priv = iot_priv_dump, }; -struct xlator_mops mops = { +struct xlator_fops fops = { + .open = iot_open, + .create = iot_create, + .readv = iot_readv, + .writev = iot_writev, + .flush = iot_flush, + .fsync = iot_fsync, + .lk = iot_lk, + .stat = iot_stat, + .fstat = iot_fstat, + .truncate = iot_truncate, + .ftruncate = iot_ftruncate, + .unlink = iot_unlink, + .lookup = iot_lookup, + .setattr = iot_setattr, + .fsetattr = iot_fsetattr, + .access = iot_access, + .readlink = iot_readlink, + .mknod = iot_mknod, + .mkdir = iot_mkdir, + .rmdir = iot_rmdir, + .symlink = iot_symlink, + .rename = iot_rename, + .link = iot_link, + .opendir = iot_opendir, + .fsyncdir = iot_fsyncdir, + .statfs = iot_statfs, + .setxattr = iot_setxattr, + .getxattr = iot_getxattr, + .fgetxattr = iot_fgetxattr, + .fsetxattr = iot_fsetxattr, + .removexattr = iot_removexattr, + .fremovexattr = iot_fremovexattr, + .readdir = iot_readdir, + .readdirp = iot_readdirp, + .inodelk = iot_inodelk, + .finodelk = iot_finodelk, + .entrylk = iot_entrylk, + .fentrylk = iot_fentrylk, + .xattrop = iot_xattrop, + .fxattrop = iot_fxattrop, + .rchecksum = iot_rchecksum, + .fallocate = iot_fallocate, + .discard = iot_discard, + .zerofill = iot_zerofill, + .seek = iot_seek, + .lease = iot_lease, + .getactivelk = iot_getactivelk, + .setactivelk = iot_setactivelk, + .put = iot_put, }; struct xlator_cbks cbks = { + .client_destroy = iot_client_destroy, + .client_disconnect = iot_disconnect_cbk, }; struct volume_options options[] = { - { .key = {"thread-count"}, - .type = GF_OPTION_TYPE_INT, - .min = 1, - .max = 32 - }, - { .key = {NULL} }, + {.key = {"thread-count"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + /*.option = "thread-count"*/ + .description = "Number of threads in IO threads translator which " + 
"perform concurrent IO operations" + + }, + {.key = {"high-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform high priority IO operations at a given time" + + }, + {.key = {"normal-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform normal priority IO operations at a given time" + + }, + {.key = {"low-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "16", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform low priority IO operations at a given time" + + }, + {.key = {"least-prio-threads"}, + .type = GF_OPTION_TYPE_INT, + .min = IOT_MIN_THREADS, + .max = IOT_MAX_THREADS, + .default_value = "1", + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Max number of threads in IO threads translator which " + "perform least priority IO operations at a given time"}, + {.key = {"enable-least-priority"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = SITE_H_ENABLE_LEAST_PRIORITY, + .op_version = {1}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Enable/Disable least priority"}, + { + .key = {"idle-time"}, + .type = GF_OPTION_TYPE_INT, + .min = 1, + .max = 0x7fffffff, + .default_value = "120", + }, + {.key = {"watchdog-secs"}, + .type = GF_OPTION_TYPE_INT, + .min = 0, + .default_value = 0, + 
.op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC, + .tags = {"io-threads"}, + .description = "Number of seconds a queue must be stalled before " + "starting an 'emergency' thread."}, + {.key = {"cleanup-disconnected-reqs"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-threads"}, + .description = "'Poison' queued requests when a client disconnects"}, + {.key = {"pass-through"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "false", + .op_version = {GD_OP_VERSION_4_1_0}, + .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT, + .tags = {"io-threads"}, + .description = "Enable/Disable io threads translator"}, + { + .key = {NULL}, + }, +}; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .dumpops = &dumpops, + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "io-threads", + .category = GF_MAINTAINED, }; diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index 7975529007a..f54d2f4912d 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -1,70 +1,86 @@ /* - Copyright (c) 2006-2009 Z RESEARCH, Inc. <http://www.zresearch.com> - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. + + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #ifndef __IOT_H #define __IOT_H -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif +#include <glusterfs/compat-errno.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/dict.h> +#include <glusterfs/xlator.h> +#include <glusterfs/common-utils.h> +#include <glusterfs/list.h> +#include <stdlib.h> +#include <glusterfs/locking.h> +#include "iot-mem-types.h" +#include <semaphore.h> +#include <glusterfs/statedump.h> +struct iot_conf; -#include "compat-errno.h" -#include "glusterfs.h" -#include "logging.h" -#include "dict.h" -#include "xlator.h" -#include "common-utils.h" -#include "list.h" +#define MAX_IDLE_SKEW 4 /* In secs */ +#define skew_sec_idle_time(sec) ((sec) + (random() % MAX_IDLE_SKEW)) +#define IOT_DEFAULT_IDLE 120 /* In secs. */ -#define min(a,b) ((a)<(b)?(a):(b)) -#define max(a,b) ((a)>(b)?(a):(b)) +#define IOT_MIN_THREADS 1 +#define IOT_DEFAULT_THREADS 16 +#define IOT_MAX_THREADS 64 -struct iot_conf; -struct iot_worker; -struct iot_request; - -struct iot_request { - struct list_head list; /* Attaches this request to the list of - requests. - */ - call_stub_t *stub; -}; +#define IOT_THREAD_STACK_SIZE ((size_t)(256 * 1024)) -struct iot_worker { - struct list_head rqlist; /* List of requests assigned to me. 
*/ - struct iot_conf *conf; - int64_t q,dq; - pthread_cond_t dq_cond; - pthread_mutex_t qlock; - int32_t queue_size; - pthread_t thread; -}; +typedef struct { + struct list_head clients; + struct list_head reqs; +} iot_client_ctx_t; struct iot_conf { - int32_t thread_count; - struct iot_worker ** workers; + pthread_mutex_t mutex; + pthread_cond_t cond; + + int32_t max_count; /* configured maximum */ + int32_t curr_count; /* actual number of threads running */ + int32_t sleep_count; + + int32_t idle_time; /* in seconds */ + + struct list_head clients[GF_FOP_PRI_MAX]; + /* + * It turns out that there are several ways a frame can get to us + * without having an associated client (server_first_lookup was the + * first one I hit). Instead of trying to update all such callers, + * we use this to queue them. + */ + iot_client_ctx_t no_client[GF_FOP_PRI_MAX]; + + int32_t ac_iot_limit[GF_FOP_PRI_MAX]; + int32_t ac_iot_count[GF_FOP_PRI_MAX]; + int queue_sizes[GF_FOP_PRI_MAX]; + int32_t queue_size; + gf_atomic_t stub_cnt; + pthread_attr_t w_attr; + gf_boolean_t least_priority; /*Enable/Disable least-priority */ + + xlator_t *this; + size_t stack_size; + gf_boolean_t down; /*PARENT_DOWN event is notified*/ + gf_boolean_t mutex_inited; + gf_boolean_t cond_inited; + + int32_t watchdog_secs; + gf_boolean_t watchdog_running; + pthread_t watchdog_thread; + gf_boolean_t queue_marked[GF_FOP_PRI_MAX]; + gf_boolean_t cleanup_disconnected_reqs; }; typedef struct iot_conf iot_conf_t; -typedef struct iot_worker iot_worker_t; -typedef struct iot_request iot_request_t; #endif /* __IOT_H */ diff --git a/xlators/performance/io-threads/src/iot-mem-types.h b/xlators/performance/io-threads/src/iot-mem-types.h new file mode 100644 index 00000000000..29565f34dd4 --- /dev/null +++ b/xlators/performance/io-threads/src/iot-mem-types.h @@ -0,0 +1,21 @@ +/* + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> + This file is part of GlusterFS. 
+ + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. +*/ + +#ifndef __IOT_MEM_TYPES_H__ +#define __IOT_MEM_TYPES_H__ + +#include <glusterfs/mem-types.h> + +enum gf_iot_mem_types_ { + gf_iot_mt_iot_conf_t = gf_common_mt_end + 1, + gf_iot_mt_client_ctx_t, + gf_iot_mt_end +}; +#endif |
