summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c3031
1 files changed, 1193 insertions, 1838 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index e31bd4bba20..3d24cc97f4b 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -1,2235 +1,1590 @@
/*
- Copyright (c) 2006-2009 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
-#ifndef _CONFIG_H
-#define _CONFIG_H
-#include "config.h"
-#endif
-
-#include "call-stub.h"
-#include "glusterfs.h"
-#include "logging.h"
-#include "dict.h"
-#include "xlator.h"
+#include <glusterfs/call-stub.h>
+#include <glusterfs/defaults.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/dict.h>
+#include <glusterfs/xlator.h>
#include "io-threads.h"
+#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
-#include "locking.h"
-
-void *iot_worker (void *arg);
-int iot_workers_scale (iot_conf_t *conf);
-int __iot_workers_scale (iot_conf_t *conf);
-
-
-call_stub_t *
-__iot_dequeue (iot_conf_t *conf)
-{
- call_stub_t *stub = NULL;
-
- if (list_empty (&conf->req))
- return NULL;
-
- stub = list_entry (conf->req.next, call_stub_t, list);
- list_del_init (&stub->list);
- conf->queue_size--;
+#include <glusterfs/locking.h>
+#include "io-threads-messages.h"
+#include <glusterfs/timespec.h>
- return stub;
+void *
+iot_worker(void *arg);
+int
+iot_workers_scale(iot_conf_t *conf);
+int
+__iot_workers_scale(iot_conf_t *conf);
+struct volume_options options[];
+
+#define IOT_FOP(name, frame, this, args...) \
+ do { \
+ call_stub_t *__stub = NULL; \
+ int __ret = -1; \
+ \
+ __stub = fop_##name##_stub(frame, default_##name##_resume, args); \
+ if (!__stub) { \
+ __ret = -ENOMEM; \
+ goto out; \
+ } \
+ \
+ __ret = iot_schedule(frame, this, __stub); \
+ \
+ out: \
+ if (__ret < 0) { \
+ default_##name##_failure_cbk(frame, -__ret); \
+ if (__stub != NULL) { \
+ call_stub_destroy(__stub); \
+ } \
+ } \
+ } while (0)
+
+iot_client_ctx_t *
+iot_get_ctx(xlator_t *this, client_t *client)
+{
+ iot_client_ctx_t *ctx = NULL;
+ iot_client_ctx_t *setted_ctx = NULL;
+ int i;
+
+ if (client_ctx_get(client, this, (void **)&ctx) != 0) {
+ ctx = GF_MALLOC(GF_FOP_PRI_MAX * sizeof(*ctx), gf_iot_mt_client_ctx_t);
+ if (ctx) {
+ for (i = 0; i < GF_FOP_PRI_MAX; ++i) {
+ INIT_LIST_HEAD(&ctx[i].clients);
+ INIT_LIST_HEAD(&ctx[i].reqs);
+ }
+ setted_ctx = client_ctx_set(client, this, ctx);
+ if (ctx != setted_ctx) {
+ GF_FREE(ctx);
+ ctx = setted_ctx;
+ }
+ }
+ }
+
+ return ctx;
}
-
-void
-__iot_enqueue (iot_conf_t *conf, call_stub_t *stub)
+call_stub_t *
+__iot_dequeue(iot_conf_t *conf, int *pri)
{
- list_add_tail (&stub->list, &conf->req);
- conf->queue_size++;
+ call_stub_t *stub = NULL;
+ int i = 0;
+ iot_client_ctx_t *ctx;
- return;
-}
+ *pri = -1;
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ if (conf->ac_iot_count[i] >= conf->ac_iot_limit[i]) {
+ continue;
+ }
+ if (list_empty(&conf->clients[i])) {
+ continue;
+ }
-void *
-iot_worker (void *data)
-{
- iot_conf_t *conf = NULL;
- xlator_t *this = NULL;
- call_stub_t *stub = NULL;
- struct timespec sleep_till = {0, };
- int ret = 0;
- char timeout = 0;
- char bye = 0;
+ /* Get the first per-client queue for this priority. */
+ ctx = list_first_entry(&conf->clients[i], iot_client_ctx_t, clients);
+ if (!ctx) {
+ continue;
+ }
- conf = data;
- this = conf->this;
- THIS = this;
+ if (list_empty(&ctx->reqs)) {
+ continue;
+ }
- for (;;) {
- sleep_till.tv_sec = time (NULL) + conf->idle_time;
+ /* Get the first request on that queue. */
+ stub = list_first_entry(&ctx->reqs, call_stub_t, list);
+ list_del_init(&stub->list);
+ if (list_empty(&ctx->reqs)) {
+ list_del_init(&ctx->clients);
+ } else {
+ list_rotate_left(&conf->clients[i]);
+ }
- pthread_mutex_lock (&conf->mutex);
- {
- while (list_empty (&conf->req)) {
- conf->sleep_count++;
-
- ret = pthread_cond_timedwait (&conf->cond,
- &conf->mutex,
- &sleep_till);
- conf->sleep_count--;
-
- if (ret == ETIMEDOUT) {
- timeout = 1;
- break;
- }
- }
-
- if (timeout) {
- if (conf->curr_count > IOT_MIN_THREADS) {
- conf->curr_count--;
- bye = 1;
- gf_log (conf->this->name, GF_LOG_DEBUG,
- "timeout, terminated. conf->curr_count=%d",
- conf->curr_count);
- } else {
- timeout = 0;
- }
- }
-
- stub = __iot_dequeue (conf);
- }
- pthread_mutex_unlock (&conf->mutex);
+ conf->ac_iot_count[i]++;
+ conf->queue_marked[i] = _gf_false;
+ *pri = i;
+ break;
+ }
- if (stub) /* guard against spurious wakeups */
- call_resume (stub);
+ if (!stub)
+ return NULL;
- if (bye)
- break;
- }
+ conf->queue_size--;
+ conf->queue_sizes[*pri]--;
- return NULL;
+ return stub;
}
-
-int
-iot_schedule (iot_conf_t *conf, call_stub_t *stub)
+void
+__iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri)
{
- int ret = 0;
+ client_t *client = stub->frame->root->client;
+ iot_client_ctx_t *ctx;
- pthread_mutex_lock (&conf->mutex);
- {
- __iot_enqueue (conf, stub);
+ if (pri < 0 || pri >= GF_FOP_PRI_MAX)
+ pri = GF_FOP_PRI_MAX - 1;
- pthread_cond_signal (&conf->cond);
-
- ret = __iot_workers_scale (conf);
+ if (client) {
+ ctx = iot_get_ctx(THIS, client);
+ if (ctx) {
+ ctx = &ctx[pri];
}
- pthread_mutex_unlock (&conf->mutex);
-
- return ret;
-}
+ } else {
+ ctx = NULL;
+ }
+ if (!ctx) {
+ ctx = &conf->no_client[pri];
+ }
+ if (list_empty(&ctx->reqs)) {
+ list_add_tail(&ctx->clients, &conf->clients[pri]);
+ }
+ list_add_tail(&stub->list, &ctx->reqs);
-int
-iot_schedule_unordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)
-{
- return iot_schedule (conf, stub);
+ conf->queue_size++;
+ GF_ATOMIC_INC(conf->stub_cnt);
+ conf->queue_sizes[pri]++;
}
+void *
+iot_worker(void *data)
+{
+ iot_conf_t *conf = NULL;
+ xlator_t *this = NULL;
+ call_stub_t *stub = NULL;
+ struct timespec sleep_till = {
+ 0,
+ };
+ int ret = 0;
+ int pri = -1;
+ gf_boolean_t bye = _gf_false;
+
+ conf = data;
+ this = conf->this;
+ THIS = this;
+
+ for (;;) {
+ pthread_mutex_lock(&conf->mutex);
+ {
+ if (pri != -1) {
+ conf->ac_iot_count[pri]--;
+ pri = -1;
+ }
+ while (conf->queue_size == 0) {
+ if (conf->down) {
+ bye = _gf_true; /*Avoid sleep*/
+ break;
+ }
-int
-iot_schedule_ordered (iot_conf_t *conf, inode_t *inode, call_stub_t *stub)
-{
-
- return iot_schedule (conf, stub);
-}
+ clock_gettime(CLOCK_REALTIME_COARSE, &sleep_till);
+ sleep_till.tv_sec += conf->idle_time;
+ conf->sleep_count++;
+ ret = pthread_cond_timedwait(&conf->cond, &conf->mutex,
+ &sleep_till);
+ conf->sleep_count--;
-int
-iot_lookup_cbk (call_frame_t *frame, void * cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno,
- inode_t *inode, struct iatt *buf, dict_t *xattr,
- struct iatt *postparent)
-{
- STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno, inode, buf, xattr,
- postparent);
- return 0;
+ if (conf->down || ret == ETIMEDOUT) {
+ bye = _gf_true;
+ break;
+ }
+ }
+
+ if (bye) {
+ if (conf->down || conf->curr_count > IOT_MIN_THREADS) {
+ conf->curr_count--;
+ if (conf->curr_count == 0)
+ pthread_cond_broadcast(&conf->cond);
+ gf_msg_debug(conf->this->name, 0,
+ "terminated. "
+ "conf->curr_count=%d",
+ conf->curr_count);
+ } else {
+ bye = _gf_false;
+ }
+ }
+
+ if (!bye)
+ stub = __iot_dequeue(conf, &pri);
+ }
+ pthread_mutex_unlock(&conf->mutex);
+
+ if (stub) { /* guard against spurious wakeups */
+ if (stub->poison) {
+ gf_log(this->name, GF_LOG_INFO, "Dropping poisoned request %p.",
+ stub);
+ call_stub_destroy(stub);
+ } else {
+ call_resume(stub);
+ }
+ GF_ATOMIC_DEC(conf->stub_cnt);
+ }
+ stub = NULL;
+
+ if (bye)
+ break;
+ }
+
+ return NULL;
+}
+
+int
+do_iot_schedule(iot_conf_t *conf, call_stub_t *stub, int pri)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&conf->mutex);
+ {
+ __iot_enqueue(conf, stub, pri);
+
+ pthread_cond_signal(&conf->cond);
+
+ ret = __iot_workers_scale(conf);
+ }
+ pthread_mutex_unlock(&conf->mutex);
+
+ return ret;
+}
+
+char *
+iot_get_pri_meaning(gf_fop_pri_t pri)
+{
+ char *name = NULL;
+ switch (pri) {
+ case GF_FOP_PRI_HI:
+ name = "fast";
+ break;
+ case GF_FOP_PRI_NORMAL:
+ name = "normal";
+ break;
+ case GF_FOP_PRI_LO:
+ name = "slow";
+ break;
+ case GF_FOP_PRI_LEAST:
+ name = "least";
+ break;
+ case GF_FOP_PRI_MAX:
+ name = "invalid";
+ break;
+ case GF_FOP_PRI_UNSPEC:
+ name = "unspecified";
+ break;
+ }
+ return name;
+}
+
+int
+iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub)
+{
+ int ret = -1;
+ gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1;
+ iot_conf_t *conf = this->private;
+
+ if ((frame->root->pid < GF_CLIENT_PID_MAX) &&
+ (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) &&
+ conf->least_priority) {
+ pri = GF_FOP_PRI_LEAST;
+ goto out;
+ }
+
+ switch (stub->fop) {
+ case GF_FOP_OPEN:
+ case GF_FOP_STAT:
+ case GF_FOP_FSTAT:
+ case GF_FOP_LOOKUP:
+ case GF_FOP_ACCESS:
+ case GF_FOP_READLINK:
+ case GF_FOP_OPENDIR:
+ case GF_FOP_STATFS:
+ case GF_FOP_READDIR:
+ case GF_FOP_READDIRP:
+ case GF_FOP_GETACTIVELK:
+ case GF_FOP_SETACTIVELK:
+ case GF_FOP_ICREATE:
+ case GF_FOP_NAMELINK:
+ pri = GF_FOP_PRI_HI;
+ break;
+
+ case GF_FOP_CREATE:
+ case GF_FOP_FLUSH:
+ case GF_FOP_LK:
+ case GF_FOP_INODELK:
+ case GF_FOP_FINODELK:
+ case GF_FOP_ENTRYLK:
+ case GF_FOP_FENTRYLK:
+ case GF_FOP_LEASE:
+ case GF_FOP_UNLINK:
+ case GF_FOP_SETATTR:
+ case GF_FOP_FSETATTR:
+ case GF_FOP_MKNOD:
+ case GF_FOP_MKDIR:
+ case GF_FOP_RMDIR:
+ case GF_FOP_SYMLINK:
+ case GF_FOP_RENAME:
+ case GF_FOP_LINK:
+ case GF_FOP_SETXATTR:
+ case GF_FOP_GETXATTR:
+ case GF_FOP_FGETXATTR:
+ case GF_FOP_FSETXATTR:
+ case GF_FOP_REMOVEXATTR:
+ case GF_FOP_FREMOVEXATTR:
+ case GF_FOP_PUT:
+ pri = GF_FOP_PRI_NORMAL;
+ break;
+
+ case GF_FOP_READ:
+ case GF_FOP_WRITE:
+ case GF_FOP_FSYNC:
+ case GF_FOP_TRUNCATE:
+ case GF_FOP_FTRUNCATE:
+ case GF_FOP_FSYNCDIR:
+ case GF_FOP_XATTROP:
+ case GF_FOP_FXATTROP:
+ case GF_FOP_RCHECKSUM:
+ case GF_FOP_FALLOCATE:
+ case GF_FOP_DISCARD:
+ case GF_FOP_ZEROFILL:
+ case GF_FOP_SEEK:
+ pri = GF_FOP_PRI_LO;
+ break;
+
+ case GF_FOP_FORGET:
+ case GF_FOP_RELEASE:
+ case GF_FOP_RELEASEDIR:
+ case GF_FOP_GETSPEC:
+ break;
+ case GF_FOP_IPC:
+ default:
+ return -EINVAL;
+ }
+out:
+ gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop",
+ gf_fop_list[stub->fop], iot_get_pri_meaning(pri));
+ if (this->private)
+ ret = do_iot_schedule(this->private, stub, pri);
+ return ret;
}
-
int
-iot_lookup_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- dict_t *xattr_req)
+iot_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
- STACK_WIND (frame, iot_lookup_cbk,
- FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->lookup,
- loc, xattr_req);
- return 0;
+ IOT_FOP(lookup, frame, this, loc, xdata);
+ return 0;
}
-
int
-iot_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req)
+iot_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc, struct iatt *stbuf,
+ int32_t valid, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_lookup_stub (frame, iot_lookup_wrapper, loc, xattr_req);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create lookup stub (out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- STACK_UNWIND_STRICT (lookup, frame, -1, -ret, NULL, NULL, NULL,
- NULL);
- }
-
- return 0;
+ IOT_FOP(setattr, frame, this, loc, stbuf, valid, xdata);
+ return 0;
}
-
int
-iot_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno,
- struct iatt *preop, struct iatt *postop)
+iot_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iatt *stbuf,
+ int32_t valid, dict_t *xdata)
{
- STACK_UNWIND_STRICT (setattr, frame, op_ret, op_errno, preop, postop);
- return 0;
+ IOT_FOP(fsetattr, frame, this, fd, stbuf, valid, xdata);
+ return 0;
}
-
int
-iot_setattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- struct iatt *stbuf, int32_t valid)
+iot_access(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_setattr_cbk,
- FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->setattr,
- loc, stbuf, valid);
- return 0;
-}
-
-
-int
-iot_setattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
- struct iatt *stbuf, int32_t valid)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_setattr_stub (frame, iot_setattr_wrapper, loc, stbuf, valid);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "Cannot create setattr stub"
- "(Out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
-
-out:
- if (ret < 0) {
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
-
- STACK_UNWIND_STRICT (setattr, frame, -1, -ret, NULL, NULL);
- }
-
- return 0;
+ IOT_FOP(access, frame, this, loc, mask, xdata);
+ return 0;
}
-
int
-iot_fsetattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno,
- struct iatt *preop, struct iatt *postop)
+iot_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (fsetattr, frame, op_ret, op_errno, preop, postop);
- return 0;
+ IOT_FOP(readlink, frame, this, loc, size, xdata);
+ return 0;
}
-
int
-iot_fsetattr_wrapper (call_frame_t *frame, xlator_t *this,
- fd_t *fd, struct iatt *stbuf, int32_t valid)
+iot_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t rdev, mode_t umask, dict_t *xdata)
{
- STACK_WIND (frame, iot_fsetattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fsetattr, fd, stbuf, valid);
- return 0;
-}
-
-
-int
-iot_fsetattr (call_frame_t *frame, xlator_t *this, fd_t *fd,
- struct iatt *stbuf, int32_t valid)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fsetattr_stub (frame, iot_fsetattr_wrapper, fd, stbuf,
- valid);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create fsetattr stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fsetattr, frame, -1, -ret, NULL, NULL);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(mknod, frame, this, loc, mode, rdev, umask, xdata);
+ return 0;
}
-
int
-iot_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+iot_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, dict_t *xdata)
{
- STACK_UNWIND_STRICT (access, frame, op_ret, op_errno);
- return 0;
+ IOT_FOP(mkdir, frame, this, loc, mode, umask, xdata);
+ return 0;
}
-
int
-iot_access_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- int32_t mask)
+iot_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_access_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->access, loc, mask);
- return 0;
+ IOT_FOP(rmdir, frame, this, loc, flags, xdata);
+ return 0;
}
-
int
-iot_access (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t mask)
+iot_symlink(call_frame_t *frame, xlator_t *this, const char *linkname,
+ loc_t *loc, mode_t umask, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_access_stub (frame, iot_access_wrapper, loc, mask);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create access stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (access, frame, -1, -ret);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(symlink, frame, this, linkname, loc, umask, xdata);
+ return 0;
}
-
int
-iot_readlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, const char *path,
- struct iatt *stbuf)
+iot_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (readlink, frame, op_ret, op_errno, path, stbuf);
- return 0;
+ IOT_FOP(rename, frame, this, oldloc, newloc, xdata);
+ return 0;
}
-
int
-iot_readlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- size_t size)
+iot_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+ fd_t *fd, dict_t *xdata)
{
- STACK_WIND (frame, iot_readlink_cbk,
- FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->readlink,
- loc, size);
- return 0;
+ IOT_FOP(open, frame, this, loc, flags, fd, xdata);
+ return 0;
}
-
int
-iot_readlink (call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size)
+iot_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
+ mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_readlink_stub (frame, iot_readlink_wrapper, loc, size);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create readlink stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (readlink, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
+ IOT_FOP(create, frame, this, loc, flags, mode, umask, fd, xdata);
+ return 0;
}
-
int
-iot_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, inode_t *inode,
- struct iatt *buf, struct iatt *preparent,
- struct iatt *postparent)
+iot_put(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, uint32_t flags, struct iovec *vector, int32_t count,
+ off_t offset, struct iobref *iobref, dict_t *xattr, dict_t *xdata)
{
- STACK_UNWIND_STRICT (mknod, frame, op_ret, op_errno, inode, buf,
- preparent, postparent);
- return 0;
+ IOT_FOP(put, frame, this, loc, mode, umask, flags, vector, count, offset,
+ iobref, xattr, xdata);
+ return 0;
}
-
int
-iot_mknod_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
- dev_t rdev, dict_t *params)
+iot_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, uint32_t flags, dict_t *xdata)
{
- STACK_WIND (frame, iot_mknod_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->mknod, loc, mode, rdev, params);
- return 0;
-}
-
-
-int
-iot_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
- dev_t rdev, dict_t *params)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_mknod_stub (frame, iot_mknod_wrapper, loc, mode, rdev,
- params);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create mknod stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (mknod, frame, -1, -ret, NULL, NULL, NULL,
- NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(readv, frame, this, fd, size, offset, flags, xdata);
+ return 0;
}
-
int
-iot_mkdir_cbk (call_frame_t *frame, void * cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, inode_t *inode,
- struct iatt *buf, struct iatt *preparent,
- struct iatt *postparent)
+iot_flush(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
- STACK_UNWIND_STRICT (mkdir, frame, op_ret, op_errno, inode, buf,
- preparent, postparent);
- return 0;
+ IOT_FOP(flush, frame, this, fd, xdata);
+ return 0;
}
-
int
-iot_mkdir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
- dict_t *params)
+iot_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_mkdir_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->mkdir, loc, mode, params);
- return 0;
+ IOT_FOP(fsync, frame, this, fd, datasync, xdata);
+ return 0;
}
-
int
-iot_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
- dict_t *params)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_mkdir_stub (frame, iot_mkdir_wrapper, loc, mode, params);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create mkdir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (mkdir, frame, -1, -ret, NULL, NULL, NULL,
- NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
-
-
-int
-iot_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *preparent,
- struct iatt *postparent)
+iot_writev(call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector,
+ int32_t count, off_t offset, uint32_t flags, struct iobref *iobref,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (rmdir, frame, op_ret, op_errno, preparent,
- postparent);
- return 0;
+ IOT_FOP(writev, frame, this, fd, vector, count, offset, flags, iobref,
+ xdata);
+ return 0;
}
-
int
-iot_rmdir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)
+iot_lk(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd,
+ struct gf_flock *flock, dict_t *xdata)
{
- STACK_WIND (frame, iot_rmdir_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->rmdir, loc);
- return 0;
+ IOT_FOP(lk, frame, this, fd, cmd, flock, xdata);
+ return 0;
}
-
int
-iot_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc)
+iot_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_rmdir_stub (frame, iot_rmdir_wrapper, loc);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create rmdir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (rmdir, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(stat, frame, this, loc, xdata);
+ return 0;
}
-
int
-iot_symlink_cbk (call_frame_t *frame, void * cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, inode_t *inode,
- struct iatt *buf, struct iatt *preparent,
- struct iatt *postparent)
+iot_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
{
- STACK_UNWIND_STRICT (symlink, frame, op_ret, op_errno, inode, buf,
- preparent, postparent);
- return 0;
+ IOT_FOP(fstat, frame, this, fd, xdata);
+ return 0;
}
-
int
-iot_symlink_wrapper (call_frame_t *frame, xlator_t *this, const char *linkname,
- loc_t *loc, dict_t *params)
+iot_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_symlink_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->symlink, linkname, loc, params);
- return 0;
+ IOT_FOP(truncate, frame, this, loc, offset, xdata);
+ return 0;
}
-
int
-iot_symlink (call_frame_t *frame, xlator_t *this, const char *linkname,
- loc_t *loc, dict_t *params)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_symlink_stub (frame, iot_symlink_wrapper, linkname, loc,
- params);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create symlink stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (symlink, frame, -1, -ret, NULL, NULL, NULL,
- NULL);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
-}
-
-
-int
-iot_rename_cbk (call_frame_t *frame, void * cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *buf,
- struct iatt *preoldparent, struct iatt *postoldparent,
- struct iatt *prenewparent, struct iatt *postnewparent)
+iot_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent,
- postoldparent, prenewparent, postnewparent);
- return 0;
+ IOT_FOP(ftruncate, frame, this, fd, offset, xdata);
+ return 0;
}
-
int
-iot_rename_wrapper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
- loc_t *newloc)
+iot_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t xflag,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_rename_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->rename, oldloc, newloc);
- return 0;
+ IOT_FOP(unlink, frame, this, loc, xflag, xdata);
+ return 0;
}
-
int
-iot_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc)
+iot_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
+ dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_rename_stub (frame, iot_rename_wrapper, oldloc, newloc);
- if (!stub) {
- gf_log (this->name, GF_LOG_DEBUG, "cannot create rename stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- oldloc->inode, stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (rename, frame, -1, -ret, NULL, NULL, NULL,
- NULL, NULL);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
+ IOT_FOP(link, frame, this, oldloc, newloc, xdata);
+ return 0;
}
-
int
-iot_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
- int32_t op_errno, fd_t *fd)
+iot_opendir(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (open, frame, op_ret, op_errno, fd);
- return 0;
+ IOT_FOP(opendir, frame, this, loc, fd, xdata);
+ return 0;
}
-
int
-iot_open_wrapper (call_frame_t * frame, xlator_t * this, loc_t *loc,
- int32_t flags, fd_t * fd, int32_t wbflags)
+iot_fsyncdir(call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_open_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->open, loc, flags, fd, wbflags);
- return 0;
+ IOT_FOP(fsyncdir, frame, this, fd, datasync, xdata);
+ return 0;
}
-
int
-iot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
- fd_t *fd, int32_t wbflags)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd, wbflags);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create open call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (open, frame, -1, -ret, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
+iot_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
+{
+ IOT_FOP(statfs, frame, this, loc, xdata);
+ return 0;
}
-
int
-iot_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
- struct iatt *stbuf, struct iatt *preparent,
- struct iatt *postparent)
+iot_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
+ int32_t flags, dict_t *xdata)
{
- STACK_UNWIND_STRICT (create, frame, op_ret, op_errno, fd, inode, stbuf,
- preparent, postparent);
- return 0;
+ IOT_FOP(setxattr, frame, this, loc, dict, flags, xdata);
+ return 0;
}
-
int
-iot_create_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- int32_t flags, mode_t mode, fd_t *fd, dict_t *params)
+iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_create_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->create,
- loc, flags, mode, fd, params);
- return 0;
-}
+ iot_conf_t *conf = NULL;
+ dict_t *depths = NULL;
+ int i = 0;
+ int32_t op_ret = 0;
+ int32_t op_errno = 0;
+ conf = this->private;
-int
-iot_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
- mode_t mode, fd_t *fd, dict_t *params)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_create_stub (frame, iot_create_wrapper, loc, flags, mode,
- fd, params);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create \"create\" call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
+ if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
+ /*
+ * We explicitly do not want a reference count
+ * for this dict in this translator
+ */
+ depths = dict_new();
+ if (!depths) {
+ op_ret = -1;
+ op_errno = ENOMEM;
+ goto unwind_special_getxattr;
}
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (create, frame, -1, -ret, NULL, NULL, NULL,
- NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ if (dict_set_int32(depths, (char *)fop_pri_to_string(i),
+ conf->queue_sizes[i]) != 0) {
+ dict_unref(depths);
+ depths = NULL;
+ goto unwind_special_getxattr;
+ }
}
+ unwind_special_getxattr:
+ STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, depths, xdata);
+ if (depths)
+ dict_unref(depths);
return 0;
-}
-
+ }
-int
-iot_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iovec *vector,
- int32_t count, struct iatt *stbuf, struct iobref *iobref)
-{
- STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno, vector, count,
- stbuf, iobref);
-
- return 0;
+ IOT_FOP(getxattr, frame, this, loc, name, xdata);
+ return 0;
}
-
int
-iot_readv_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
- off_t offset)
+iot_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_readv_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->readv,
- fd, size, offset);
- return 0;
+ IOT_FOP(fgetxattr, frame, this, fd, name, xdata);
+ return 0;
}
-
int
-iot_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
- off_t offset)
+iot_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
+ int32_t flags, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_readv_stub (frame, iot_readv_wrapper, fd, size, offset);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create readv call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (readv, frame, -1, -ret, NULL, -1, NULL,
- NULL);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(fsetxattr, frame, this, fd, dict, flags, xdata);
+ return 0;
}
-
int
-iot_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+iot_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ const char *name, dict_t *xdata)
{
- STACK_UNWIND_STRICT (flush, frame, op_ret, op_errno);
- return 0;
+ IOT_FOP(removexattr, frame, this, loc, name, xdata);
+ return 0;
}
-
int
-iot_flush_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd)
+iot_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ const char *name, dict_t *xdata)
{
- STACK_WIND (frame, iot_flush_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->flush,
- fd);
- return 0;
+ IOT_FOP(fremovexattr, frame, this, fd, name, xdata);
+ return 0;
}
-
int
-iot_flush (call_frame_t *frame, xlator_t *this, fd_t *fd)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_flush_stub (frame, iot_flush_wrapper, fd);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create flush_cbk call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (flush, frame, -1, -ret);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
-
-
-int
-iot_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
- struct iatt *postbuf)
+iot_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, dict_t *xdata)
{
- STACK_UNWIND_STRICT (fsync, frame, op_ret, op_errno, prebuf, postbuf);
- return 0;
+ IOT_FOP(readdirp, frame, this, fd, size, offset, xdata);
+ return 0;
}
-
int
-iot_fsync_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- int32_t datasync)
+iot_readdir(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
+ off_t offset, dict_t *xdata)
{
- STACK_WIND (frame, iot_fsync_cbk,
- FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fsync,
- fd, datasync);
- return 0;
+ IOT_FOP(readdir, frame, this, fd, size, offset, xdata);
+ return 0;
}
-
int
-iot_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync)
+iot_inodelk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
+ int32_t cmd, struct gf_flock *lock, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fsync_stub (frame, iot_fsync_wrapper, fd, datasync);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fsync_cbk call stub"
- "(out of memory)");
- ret = -1;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fsync, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(inodelk, frame, this, volume, loc, cmd, lock, xdata);
+ return 0;
}
-
int
-iot_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
- struct iatt *postbuf)
+iot_finodelk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
+ int32_t cmd, struct gf_flock *lock, dict_t *xdata)
{
- STACK_UNWIND_STRICT (writev, frame, op_ret, op_errno, prebuf, postbuf);
- return 0;
+ IOT_FOP(finodelk, frame, this, volume, fd, cmd, lock, xdata);
+ return 0;
}
-
int
-iot_writev_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- struct iovec *vector, int32_t count,
- off_t offset, struct iobref *iobref)
+iot_entrylk(call_frame_t *frame, xlator_t *this, const char *volume, loc_t *loc,
+ const char *basename, entrylk_cmd cmd, entrylk_type type,
+ dict_t *xdata)
{
- STACK_WIND (frame, iot_writev_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->writev,
- fd, vector, count, offset, iobref);
- return 0;
+ IOT_FOP(entrylk, frame, this, volume, loc, basename, cmd, type, xdata);
+ return 0;
}
-
int
-iot_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
- struct iovec *vector, int32_t count, off_t offset,
- struct iobref *iobref)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_writev_stub (frame, iot_writev_wrapper,
- fd, vector, count, offset, iobref);
-
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create writev call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (writev, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
-}
-
-
-int32_t
-iot_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct flock *flock)
+iot_fentrylk(call_frame_t *frame, xlator_t *this, const char *volume, fd_t *fd,
+ const char *basename, entrylk_cmd cmd, entrylk_type type,
+ dict_t *xdata)
{
- STACK_UNWIND_STRICT (lk, frame, op_ret, op_errno, flock);
- return 0;
+ IOT_FOP(fentrylk, frame, this, volume, fd, basename, cmd, type, xdata);
+ return 0;
}
-
int
-iot_lk_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- int32_t cmd, struct flock *flock)
+iot_xattrop(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
- STACK_WIND (frame, iot_lk_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->lk,
- fd, cmd, flock);
- return 0;
+ IOT_FOP(xattrop, frame, this, loc, optype, xattr, xdata);
+ return 0;
}
-
int
-iot_lk (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd,
- struct flock *flock)
+iot_fxattrop(call_frame_t *frame, xlator_t *this, fd_t *fd,
+ gf_xattrop_flags_t optype, dict_t *xattr, dict_t *xdata)
{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_lk_stub (frame, iot_lk_wrapper, fd, cmd, flock);
-
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_lk call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (lk, frame, -1, -ret, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(fxattrop, frame, this, fd, optype, xattr, xdata);
+ return 0;
}
-
-int
-iot_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *buf)
+int32_t
+iot_rchecksum(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ int32_t len, dict_t *xdata)
{
- STACK_UNWIND_STRICT (stat, frame, op_ret, op_errno, buf);
- return 0;
+ IOT_FOP(rchecksum, frame, this, fd, offset, len, xdata);
+ return 0;
}
-
int
-iot_stat_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)
+iot_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
+ off_t offset, size_t len, dict_t *xdata)
{
- STACK_WIND (frame, iot_stat_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->stat,
- loc);
- return 0;
+ IOT_FOP(fallocate, frame, this, fd, mode, offset, len, xdata);
+ return 0;
}
-
int
-iot_stat (call_frame_t *frame, xlator_t *this, loc_t *loc)
-{
- call_stub_t *stub = NULL;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_stat_stub (frame, iot_stat_wrapper, loc);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_stat call stub"
- "(out of memory)");
- ret = -1;
- goto out;
- }
-
- fd = fd_lookup (loc->inode, frame->root->pid);
- /* File is not open, so we can send it through unordered pool.
- */
- if (fd == NULL)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
- }
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (stat, frame, -1, -ret, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
-
-
-int
-iot_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *buf)
-{
- STACK_UNWIND_STRICT (fstat, frame, op_ret, op_errno, buf);
- return 0;
-}
-
-
-int
-iot_fstat_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd)
+iot_discard(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ size_t len, dict_t *xdata)
{
- STACK_WIND (frame, iot_fstat_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->fstat,
- fd);
- return 0;
-}
-
-
-int
-iot_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fstat_stub (frame, iot_fstat_wrapper, fd);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_fstat call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fstat, frame, -1, -ret, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(discard, frame, this, fd, offset, len, xdata);
+ return 0;
}
-
int
-iot_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
- struct iatt *postbuf)
+iot_zerofill(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ off_t len, dict_t *xdata)
{
- STACK_UNWIND_STRICT (truncate, frame, op_ret, op_errno, prebuf,
- postbuf);
- return 0;
+ IOT_FOP(zerofill, frame, this, fd, offset, len, xdata);
+ return 0;
}
-
int
-iot_truncate_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- off_t offset)
+iot_seek(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
+ gf_seek_what_t what, dict_t *xdata)
{
- STACK_WIND (frame, iot_truncate_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->truncate,
- loc, offset);
- return 0;
+ IOT_FOP(seek, frame, this, fd, offset, what, xdata);
+ return 0;
}
-
int
-iot_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset)
-{
- call_stub_t *stub;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_truncate_stub (frame, iot_truncate_wrapper, loc, offset);
-
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_stat call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (fd == NULL)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
- }
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (truncate, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
-
- return 0;
-}
-
-
-int
-iot_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
- struct iatt *postbuf)
+iot_lease(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ struct gf_lease *lease, dict_t *xdata)
{
- STACK_UNWIND_STRICT (ftruncate, frame, op_ret, op_errno, prebuf,
- postbuf);
- return 0;
+ IOT_FOP(lease, frame, this, loc, lease, xdata);
+ return 0;
}
-
int
-iot_ftruncate_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- off_t offset)
+iot_getactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
{
- STACK_WIND (frame, iot_ftruncate_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->ftruncate,
- fd, offset);
- return 0;
-}
-
-
-int
-iot_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_ftruncate_stub (frame, iot_ftruncate_wrapper, fd, offset);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_ftruncate call stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (ftruncate, frame, -1, -ret, NULL, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ IOT_FOP(getactivelk, frame, this, loc, xdata);
+ return 0;
}
-
-
int
-iot_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *preparent,
- struct iatt *postparent)
+iot_setactivelk(call_frame_t *frame, xlator_t *this, loc_t *loc,
+ lock_migration_info_t *locklist, dict_t *xdata)
{
- STACK_UNWIND_STRICT (unlink, frame, op_ret, op_errno, preparent,
- postparent);
- return 0;
+ IOT_FOP(setactivelk, frame, this, loc, locklist, xdata);
+ return 0;
}
-
int
-iot_unlink_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)
+__iot_workers_scale(iot_conf_t *conf)
{
- STACK_WIND (frame, iot_unlink_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->unlink,
- loc);
- return 0;
-}
+ int scale = 0;
+ int diff = 0;
+ pthread_t thread;
+ int ret = 0;
+ int i = 0;
+ for (i = 0; i < GF_FOP_PRI_MAX; i++)
+ scale += min(conf->queue_sizes[i], conf->ac_iot_limit[i]);
-int
-iot_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
+ if (scale < IOT_MIN_THREADS)
+ scale = IOT_MIN_THREADS;
- stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot create fop_unlink call stub"
- "(out of memory)");
- ret = -1;
- goto out;
- }
+ if (scale > conf->max_count)
+ scale = conf->max_count;
- ret = iot_schedule_unordered((iot_conf_t *)this->private, loc->inode,
- stub);
+ if (conf->curr_count < scale) {
+ diff = scale - conf->curr_count;
+ }
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (unlink, frame, -1, -ret, NULL, NULL);
+ while (diff) {
+ diff--;
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
+ ret = gf_thread_create(&thread, &conf->w_attr, iot_worker, conf,
+ "iotwr%03hx", conf->curr_count & 0x3ff);
+ if (ret == 0) {
+ pthread_detach(thread);
+ conf->curr_count++;
+ gf_msg_debug(conf->this->name, 0,
+ "scaled threads to %d (queue_size=%d/%d)",
+ conf->curr_count, conf->queue_size, scale);
+ } else {
+ break;
}
+ }
- return 0;
+ return diff;
}
-
int
-iot_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, inode_t *inode,
- struct iatt *buf, struct iatt *preparent, struct iatt *postparent)
+iot_workers_scale(iot_conf_t *conf)
{
- STACK_UNWIND_STRICT (link, frame, op_ret, op_errno, inode, buf,
- preparent, postparent);
- return 0;
-}
+ int ret = -1;
+ if (conf == NULL) {
+ ret = -EINVAL;
+ goto out;
+ }
-int
-iot_link_wrapper (call_frame_t *frame, xlator_t *this, loc_t *old, loc_t *new)
-{
- STACK_WIND (frame, iot_link_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->link, old, new);
+ pthread_mutex_lock(&conf->mutex);
+ {
+ ret = __iot_workers_scale(conf);
+ }
+ pthread_mutex_unlock(&conf->mutex);
- return 0;
+out:
+ return ret;
}
-
int
-iot_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc)
+set_stack_size(iot_conf_t *conf)
{
- call_stub_t *stub = NULL;
- int ret = -1;
+ int err = 0;
+ size_t stacksize = IOT_THREAD_STACK_SIZE;
+ xlator_t *this = NULL;
- stub = fop_link_stub (frame, iot_link_wrapper, oldloc, newloc);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create link stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ this = THIS;
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- oldloc->inode, stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (link, frame, -1, -ret, NULL, NULL, NULL,
- NULL);
+ err = pthread_attr_init(&conf->w_attr);
+ if (err != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED,
+ NULL);
+ return err;
+ }
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
+ err = pthread_attr_setstacksize(&conf->w_attr, stacksize);
+ if (err == EINVAL) {
+ err = pthread_attr_getstacksize(&conf->w_attr, &stacksize);
+ if (!err) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ "size=%zd", stacksize, NULL);
+ } else {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ NULL);
+ err = 0;
}
- return 0;
-}
-
+ }
-int
-iot_opendir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, fd_t *fd)
-{
- STACK_UNWIND_STRICT (opendir, frame, op_ret, op_errno, fd);
- return 0;
+ conf->stack_size = stacksize;
+ return err;
}
-
-int
-iot_opendir_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
+int32_t
+mem_acct_init(xlator_t *this)
{
- STACK_WIND (frame, iot_opendir_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->opendir, loc, fd);
- return 0;
-}
+ int ret = -1;
+ if (!this)
+ return ret;
-int
-iot_opendir (call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_opendir_stub (frame, iot_opendir_wrapper, loc, fd);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create opendir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1);
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (opendir, frame, -1, -ret, NULL);
+ if (ret != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
+ NULL);
+ return ret;
+ }
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ return ret;
}
-
int
-iot_fsyncdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+iot_priv_dump(xlator_t *this)
{
- STACK_UNWIND_STRICT (fsyncdir, frame, op_ret, op_errno);
- return 0;
-}
-
+ iot_conf_t *conf = NULL;
+ char key_prefix[GF_DUMP_MAX_BUF_LEN];
+ char key[GF_DUMP_MAX_BUF_LEN];
+ int i = 0;
-int
-iot_fsyncdir_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- int datasync)
-{
- STACK_WIND (frame, iot_fsyncdir_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fsyncdir, fd, datasync);
+ if (!this)
return 0;
-}
-
-
-int
-iot_fsyncdir (call_frame_t *frame, xlator_t *this, fd_t *fd, int datasync)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fsyncdir_stub (frame, iot_fsyncdir_wrapper, fd, datasync);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create fsyncdir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fsyncdir, frame, -1, -ret);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
+ conf = this->private;
+ if (!conf)
return 0;
-}
+ snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s", this->type, this->name);
-int
-iot_statfs_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct statvfs *buf)
-{
- STACK_UNWIND_STRICT (statfs, frame, op_ret, op_errno, buf);
- return 0;
-}
+ gf_proc_dump_add_section("%s", key_prefix);
+ gf_proc_dump_write("maximum_threads_count", "%d", conf->max_count);
+ gf_proc_dump_write("current_threads_count", "%d", conf->curr_count);
+ gf_proc_dump_write("sleep_count", "%d", conf->sleep_count);
+ gf_proc_dump_write("idle_time", "%d", conf->idle_time);
+ gf_proc_dump_write("stack_size", "%zd", conf->stack_size);
+ gf_proc_dump_write("max_high_priority_threads", "%d",
+ conf->ac_iot_limit[GF_FOP_PRI_HI]);
+ gf_proc_dump_write("max_normal_priority_threads", "%d",
+ conf->ac_iot_limit[GF_FOP_PRI_NORMAL]);
+ gf_proc_dump_write("max_low_priority_threads", "%d",
+ conf->ac_iot_limit[GF_FOP_PRI_LO]);
+ gf_proc_dump_write("max_least_priority_threads", "%d",
+ conf->ac_iot_limit[GF_FOP_PRI_LEAST]);
+ gf_proc_dump_write("current_high_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_HI]);
+ gf_proc_dump_write("current_normal_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_NORMAL]);
+ gf_proc_dump_write("current_low_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_LO]);
+ gf_proc_dump_write("current_least_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_LEAST]);
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ if (!conf->queue_sizes[i])
+ continue;
+ snprintf(key, sizeof(key), "%s_priority_queue_length",
+ iot_get_pri_meaning(i));
+ gf_proc_dump_write(key, "%d", conf->queue_sizes[i]);
+ }
-int
-iot_statfs_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc)
-{
- STACK_WIND (frame, iot_statfs_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->statfs, loc);
- return 0;
+ return 0;
}
+/*
+ * We use a decay model to keep track and make sure we're not spawning new
+ * threads too often. Each increment adds a large value to a counter, and that
+ * counter keeps ticking back down to zero over a fairly long period. For
+ * example, let's use ONE_WEEK=604800 seconds, and we want to detect when we
+ * have N=3 increments during that time. Thus, our threshold is
+ * (N-1)*ONE_WEEK. To see how it works, look at three examples.
+ *
+ * (a) Two events close together, then one more almost a week later. The
+ * first two events push our counter to 2*ONE_WEEK plus a bit. At the third
+ * event, we decay down to ONE_WEEK plus a bit and then add ONE_WEEK for the
+ * new event, exceeding our threshold.
+ *
+ * (b) One event, then two more almost a week later. At the time of the
+ * second and third events, the counter is already non-zero, so when we add
+ * 2*ONE_WEEK we exceed again.
+ *
+ * (c) Three events, spaced three days apart. At the time of the second
+ * event, we decay down to approxitely ONE_WEEK*4/7 and then add another
+ * ONE_WEEK. At the third event, we decay again down to ONE_WEEK*8/7 and add
+ * another ONE_WEEK, so boom.
+ *
+ * Note that in all three cases if that last event came a day later our counter
+ * would have decayed a bit more and we would *not* exceed our threshold. It's
+ * not exactly the same as a precise "three in one week" limit, but it's very
+ * close and it allows the same kind of tweaking while requiring only constant
+ * space - no arrays of variable length N to allocate or maintain. All we need
+ * (for each queue) is the value plus the time of the last update.
+ */
-int
-iot_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_statfs_stub (frame, iot_statfs_wrapper, loc);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create statfs stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_unordered ((iot_conf_t *)this->private, loc->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (statfs, frame, -1, -ret, NULL);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
+typedef struct {
+ time_t update_time;
+ uint32_t value;
+} threshold_t;
+/*
+ * Variables so that I can hack these for testing.
+ * TBD: make these tunable?
+ */
+static uint32_t THRESH_SECONDS = 604800;
+static uint32_t THRESH_EVENTS = 3;
+static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */
+
+static void
+iot_apply_event(xlator_t *this, threshold_t *thresh)
+{
+ time_t delta, now = gf_time();
+
+ /* Refresh for manual testing/debugging. It's cheap. */
+ THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1);
+
+ if (thresh->value && thresh->update_time) {
+ delta = now - thresh->update_time;
+ /* Be careful about underflow. */
+ if (thresh->value <= delta) {
+ thresh->value = 0;
+ } else {
+ thresh->value -= delta;
+ }
+ }
+
+ thresh->value += THRESH_SECONDS;
+ if (thresh->value >= THRESH_LIMIT) {
+ gf_log(this->name, GF_LOG_EMERG, "watchdog firing too often");
+ /*
+ * The default action for SIGTRAP is to dump core, but the fact
+ * that it's distinct from other signals we use means that
+ * there are other possibilities as well (e.g. drop into gdb or
+ * invoke a special handler).
+ */
+ kill(getpid(), SIGTRAP);
+ }
+
+ thresh->update_time = now;
+}
+
+static void *
+iot_watchdog(void *arg)
+{
+ xlator_t *this = arg;
+ iot_conf_t *priv = this->private;
+ int i;
+ int bad_times[GF_FOP_PRI_MAX] = {
+ 0,
+ };
+ threshold_t thresholds[GF_FOP_PRI_MAX] = {{
+ 0,
+ }};
+
+ for (;;) {
+ sleep(max(priv->watchdog_secs / 5, 1));
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ pthread_mutex_lock(&priv->mutex);
+ for (i = 0; i < GF_FOP_PRI_MAX; ++i) {
+ if (priv->queue_marked[i]) {
+ if (++bad_times[i] >= 5) {
+ gf_log(this->name, GF_LOG_WARNING, "queue %d stalled", i);
+ iot_apply_event(this, &thresholds[i]);
+ /*
+ * We might not get here if the event
+ * put us over our threshold.
+ */
+ ++(priv->ac_iot_limit[i]);
+ bad_times[i] = 0;
}
+ } else {
+ bad_times[i] = 0;
+ }
+ priv->queue_marked[i] = (priv->queue_sizes[i] > 0);
}
- return 0;
-}
-
+ pthread_mutex_unlock(&priv->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ }
-int
-iot_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
-{
- STACK_UNWIND_STRICT (setxattr, frame, op_ret, op_errno);
- return 0;
+ /* NOTREACHED */
+ return NULL;
}
-
-int
-iot_setxattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- dict_t *dict, int32_t flags)
+static void
+start_iot_watchdog(xlator_t *this)
{
- STACK_WIND (frame, iot_setxattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->setxattr, loc, dict, flags);
- return 0;
-}
-
-
-int
-iot_setxattr (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
- int32_t flags)
-{
- call_stub_t *stub = NULL;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_setxattr_stub (frame, iot_setxattr_wrapper, loc, dict,
- flags);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create setxattr stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ iot_conf_t *priv = this->private;
+ int ret;
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (fd == NULL)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
- }
-
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (setxattr, frame, -1, -ret);
+ if (priv->watchdog_running) {
+ return;
+ }
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ ret = pthread_create(&priv->watchdog_thread, NULL, iot_watchdog, this);
+ if (ret == 0) {
+ priv->watchdog_running = _gf_true;
+ } else {
+ gf_log(this->name, GF_LOG_WARNING,
+ "pthread_create(iot_watchdog) failed");
+ }
}
-
-int
-iot_getxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *dict)
+static void
+stop_iot_watchdog(xlator_t *this)
{
- STACK_UNWIND_STRICT (getxattr, frame, op_ret, op_errno, dict);
- return 0;
-}
-
+ iot_conf_t *priv = this->private;
-int
-iot_getxattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- const char *name)
-{
- STACK_WIND (frame, iot_getxattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->getxattr, loc, name);
- return 0;
-}
-
-
-int
-iot_getxattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
- const char *name)
-{
- call_stub_t *stub = NULL;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_getxattr_stub (frame, iot_getxattr_wrapper, loc, name);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create getxattr stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ if (!priv->watchdog_running) {
+ return;
+ }
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (!fd)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
- }
+ if (pthread_cancel(priv->watchdog_thread) != 0) {
+ gf_log(this->name, GF_LOG_WARNING,
+ "pthread_cancel(iot_watchdog) failed");
+ }
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (getxattr, frame, -1, -ret, NULL);
+ if (pthread_join(priv->watchdog_thread, NULL) != 0) {
+ gf_log(this->name, GF_LOG_WARNING, "pthread_join(iot_watchdog) failed");
+ }
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ /* Failure probably means it's already dead. */
+ priv->watchdog_running = _gf_false;
}
-
int
-iot_fgetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *dict)
+reconfigure(xlator_t *this, dict_t *options)
{
- STACK_UNWIND_STRICT (fgetxattr, frame, op_ret, op_errno, dict);
- return 0;
-}
+ iot_conf_t *conf = NULL;
+ int ret = -1;
+ conf = this->private;
+ if (!conf)
+ goto out;
-int
-iot_fgetxattr_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- const char *name)
-{
- STACK_WIND (frame, iot_fgetxattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fgetxattr, fd, name);
- return 0;
-}
+ GF_OPTION_RECONF("thread-count", conf->max_count, options, int32, out);
+ GF_OPTION_RECONF("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI],
+ options, int32, out);
-int
-iot_fgetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd,
- const char *name)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fgetxattr_stub (frame, iot_fgetxattr_wrapper, fd, name);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create fgetxattr stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ GF_OPTION_RECONF("normal-prio-threads",
+ conf->ac_iot_limit[GF_FOP_PRI_NORMAL], options, int32,
+ out);
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fgetxattr, frame, -1, -ret, NULL);
+ GF_OPTION_RECONF("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO],
+ options, int32, out);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
+ GF_OPTION_RECONF("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST],
+ options, int32, out);
+ GF_OPTION_RECONF("enable-least-priority", conf->least_priority, options,
+ bool, out);
-int
-iot_fsetxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
-{
- STACK_UNWIND_STRICT (fsetxattr, frame, op_ret, op_errno);
- return 0;
-}
+ GF_OPTION_RECONF("cleanup-disconnected-reqs",
+ conf->cleanup_disconnected_reqs, options, bool, out);
+ GF_OPTION_RECONF("watchdog-secs", conf->watchdog_secs, options, int32, out);
-int
-iot_fsetxattr_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- dict_t *dict, int32_t flags)
-{
- STACK_WIND (frame, iot_fsetxattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fsetxattr, fd, dict, flags);
- return 0;
-}
+ GF_OPTION_RECONF("pass-through", this->pass_through, options, bool, out);
+ if (conf->watchdog_secs > 0) {
+ start_iot_watchdog(this);
+ } else {
+ stop_iot_watchdog(this);
+ }
-int
-iot_fsetxattr (call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
- int32_t flags)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fsetxattr_stub (frame, iot_fsetxattr_wrapper, fd, dict,
- flags);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create fsetxattr stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
+ ret = 0;
out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fsetxattr, frame, -1, -ret);
-
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ return ret;
}
-
int
-iot_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+init(xlator_t *this)
{
- STACK_UNWIND_STRICT (removexattr, frame, op_ret, op_errno);
- return 0;
-}
+ iot_conf_t *conf = NULL;
+ int ret = -1;
+ int i = 0;
+ if (!this->children || this->children->next) {
+ gf_smsg("io-threads", GF_LOG_ERROR, 0,
+ IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, NULL);
+ goto out;
+ }
-int
-iot_removexattr_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- const char *name)
-{
- STACK_WIND (frame, iot_removexattr_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->removexattr, loc, name);
- return 0;
-}
+ if (!this->parents) {
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED,
+ NULL);
+ }
+ conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t);
+ if (conf == NULL) {
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_OUT_OF_MEMORY,
+ NULL);
+ goto out;
+ }
-int
-iot_removexattr (call_frame_t *frame, xlator_t *this, loc_t *loc,
- const char *name)
-{
- call_stub_t *stub = NULL;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_removexattr_stub (frame, iot_removexattr_wrapper, loc,
- name);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR,"cannot get removexattr fop"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED,
+ "pthread_cond_init ret=%d", ret, NULL);
+ goto out;
+ }
+ conf->cond_inited = _gf_true;
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (!fd)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
- }
+ if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED,
+ "pthread_mutex_init ret=%d", ret, NULL);
+ goto out;
+ }
+ conf->mutex_inited = _gf_true;
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (removexattr, frame, -1, -ret);
+ ret = set_stack_size(conf);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
+ if (ret != 0)
+ goto out;
+ ret = -1;
-int
-iot_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, gf_dirent_t *entries)
-{
- STACK_UNWIND_STRICT (readdirp, frame, op_ret, op_errno, entries);
- return 0;
-}
+ GF_OPTION_INIT("thread-count", conf->max_count, int32, out);
+ GF_OPTION_INIT("high-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_HI],
+ int32, out);
-int
-iot_readdirp_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- size_t size, off_t offset)
-{
- STACK_WIND (frame, iot_readdirp_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->readdirp, fd, size, offset);
- return 0;
-}
+ GF_OPTION_INIT("normal-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_NORMAL],
+ int32, out);
+ GF_OPTION_INIT("low-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LO], int32,
+ out);
-int
-iot_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
- off_t offset)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_readdirp_stub (frame, iot_readdirp_wrapper, fd, size,
- offset);
- if (!stub) {
- gf_log (this->private, GF_LOG_ERROR,"cannot get readdir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
+ GF_OPTION_INIT("least-prio-threads", conf->ac_iot_limit[GF_FOP_PRI_LEAST],
+ int32, out);
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (readdirp, frame, -1, -ret, NULL);
+ GF_OPTION_INIT("idle-time", conf->idle_time, int32, out);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
+ GF_OPTION_INIT("enable-least-priority", conf->least_priority, bool, out);
+ GF_OPTION_INIT("cleanup-disconnected-reqs", conf->cleanup_disconnected_reqs,
+ bool, out);
-int
-iot_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, gf_dirent_t *entries)
-{
- STACK_UNWIND_STRICT (readdir, frame, op_ret, op_errno, entries);
- return 0;
-}
+ GF_OPTION_INIT("pass-through", this->pass_through, bool, out);
+ conf->this = this;
+ GF_ATOMIC_INIT(conf->stub_cnt, 0);
-int
-iot_readdir_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- size_t size, off_t offset)
-{
- STACK_WIND (frame, iot_readdir_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->readdir, fd, size, offset);
- return 0;
-}
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ INIT_LIST_HEAD(&conf->clients[i]);
+ INIT_LIST_HEAD(&conf->no_client[i].clients);
+ INIT_LIST_HEAD(&conf->no_client[i].reqs);
+ }
+ if (!this->pass_through) {
+ ret = iot_workers_scale(conf);
-int
-iot_readdir (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
- off_t offset)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_readdir_stub (frame, iot_readdir_wrapper, fd, size, offset);
- if (!stub) {
- gf_log (this->private, GF_LOG_ERROR,"cannot get readdir stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
+ if (ret == -1) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED, NULL);
+ goto out;
}
+ }
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
+ this->private = conf;
+
+ conf->watchdog_secs = 0;
+ GF_OPTION_INIT("watchdog-secs", conf->watchdog_secs, int32, out);
+ if (conf->watchdog_secs > 0) {
+ start_iot_watchdog(this);
+ }
+
+ ret = 0;
out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (readdir, frame, -1, -ret, NULL);
+ if (ret)
+ GF_FREE(conf);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ return ret;
}
-
-int
-iot_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xattr)
+static void
+iot_exit_threads(iot_conf_t *conf)
{
- STACK_UNWIND_STRICT (xattrop, frame, op_ret, op_errno, xattr);
- return 0;
+ pthread_mutex_lock(&conf->mutex);
+ {
+ conf->down = _gf_true;
+ /*Let all the threads know that xl is going down*/
+ pthread_cond_broadcast(&conf->cond);
+ while (conf->curr_count) /*Wait for threads to exit*/
+ pthread_cond_wait(&conf->cond, &conf->mutex);
+ }
+ pthread_mutex_unlock(&conf->mutex);
}
-
int
-iot_xattrop_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc,
- gf_xattrop_flags_t optype, dict_t *xattr)
+notify(xlator_t *this, int32_t event, void *data, ...)
{
- STACK_WIND (frame, iot_xattrop_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->xattrop, loc, optype, xattr);
- return 0;
-}
+ iot_conf_t *conf = this->private;
+ xlator_t *victim = data;
+ uint64_t stub_cnt = 0;
+ struct timespec sleep_till = {
+ 0,
+ };
+ if (GF_EVENT_PARENT_DOWN == event) {
+ if (victim->cleanup_starting) {
+ /* Wait for draining stub from queue before notify PARENT_DOWN */
+ stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
+ if (stub_cnt) {
+ timespec_now_realtime(&sleep_till);
+ sleep_till.tv_sec += 1;
+ pthread_mutex_lock(&conf->mutex);
+ {
+ while (stub_cnt) {
+ (void)pthread_cond_timedwait(&conf->cond, &conf->mutex,
+ &sleep_till);
+ stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
+ }
+ }
+ pthread_mutex_unlock(&conf->mutex);
+ }
-int
-iot_xattrop (call_frame_t *frame, xlator_t *this, loc_t *loc,
- gf_xattrop_flags_t optype, dict_t *xattr)
-{
- call_stub_t *stub = NULL;
- fd_t *fd = NULL;
- int ret = -1;
-
- stub = fop_xattrop_stub (frame, iot_xattrop_wrapper, loc, optype,
- xattr);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create xattrop stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
+ gf_log(this->name, GF_LOG_INFO,
+ "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
+ } else {
+ iot_exit_threads(conf);
}
+ }
- fd = fd_lookup (loc->inode, frame->root->pid);
- if (!fd)
- ret = iot_schedule_unordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- else {
- ret = iot_schedule_ordered ((iot_conf_t *)this->private,
- loc->inode, stub);
- fd_unref (fd);
+ if (GF_EVENT_CHILD_DOWN == event) {
+ if (victim->cleanup_starting) {
+ iot_exit_threads(conf);
+ gf_log(this->name, GF_LOG_INFO,
+ "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name);
}
+ }
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (xattrop, frame, -1, -ret, NULL);
+ default_notify(this, event, data);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
+ return 0;
}
-
-int
-iot_fxattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, dict_t *xattr)
-{
- STACK_UNWIND_STRICT (fxattrop, frame, op_ret, op_errno, xattr);
- return 0;
-}
-
-int
-iot_fxattrop_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd,
- gf_xattrop_flags_t optype, dict_t *xattr)
-{
- STACK_WIND (frame, iot_fxattrop_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->fxattrop, fd, optype, xattr);
- return 0;
-}
-
-int
-iot_fxattrop (call_frame_t *frame, xlator_t *this, fd_t *fd,
- gf_xattrop_flags_t optype, dict_t *xattr)
-{
- call_stub_t *stub = NULL;
- int ret = -1;
-
- stub = fop_fxattrop_stub (frame, iot_fxattrop_wrapper, fd, optype,
- xattr);
- if (!stub) {
- gf_log (this->name, GF_LOG_ERROR, "cannot create fxattrop stub"
- "(out of memory)");
- ret = -ENOMEM;
- goto out;
- }
-
- ret = iot_schedule_ordered ((iot_conf_t *)this->private, fd->inode,
- stub);
-out:
- if (ret < 0) {
- STACK_UNWIND_STRICT (fxattrop, frame, -1, -ret, NULL);
- if (stub != NULL) {
- call_stub_destroy (stub);
- }
- }
- return 0;
-}
-
-
-int
-__iot_workers_scale (iot_conf_t *conf)
+void
+fini(xlator_t *this)
{
- int log2 = 0;
- int scale = 0;
- int diff = 0;
- pthread_t thread;
- int ret = 0;
+ iot_conf_t *conf = this->private;
- log2 = log_base2 (conf->queue_size);
-
- scale = log2;
+ if (!conf)
+ return;
- if (log2 < IOT_MIN_THREADS)
- scale = IOT_MIN_THREADS;
+ if (conf->mutex_inited && conf->cond_inited)
+ iot_exit_threads(conf);
- if (log2 > conf->max_count)
- scale = conf->max_count;
+ if (conf->cond_inited)
+ pthread_cond_destroy(&conf->cond);
- if (conf->curr_count < scale) {
- diff = scale - conf->curr_count;
- }
+ if (conf->mutex_inited)
+ pthread_mutex_destroy(&conf->mutex);
- while (diff) {
- diff --;
+ stop_iot_watchdog(this);
- ret = pthread_create (&thread, &conf->w_attr, iot_worker, conf);
- if (ret == 0) {
- conf->curr_count++;
- gf_log (conf->this->name, GF_LOG_DEBUG,
- "scaled threads to %d (queue_size=%d/%d)",
- conf->curr_count, conf->queue_size, scale);
- } else {
- break;
- }
- }
+ GF_FREE(conf);
- return diff;
+ this->private = NULL;
+ return;
}
-
int
-iot_workers_scale (iot_conf_t *conf)
+iot_client_destroy(xlator_t *this, client_t *client)
{
- int ret = -1;
+ void *tmp = NULL;
- if (conf == NULL) {
- ret = -EINVAL;
- goto out;
- }
+ if (client_ctx_del(client, this, &tmp) == 0) {
+ GF_FREE(tmp);
+ }
- pthread_mutex_lock (&conf->mutex);
- {
- ret = __iot_workers_scale (conf);
- }
- pthread_mutex_unlock (&conf->mutex);
-
-out:
- return ret;
+ return 0;
}
-
-void
-set_stack_size (iot_conf_t *conf)
+static int
+iot_disconnect_cbk(xlator_t *this, client_t *client)
{
- int err = 0;
- size_t stacksize = IOT_THREAD_STACK_SIZE;
+ int i;
+ call_stub_t *curr;
+ call_stub_t *next;
+ iot_conf_t *conf = this->private;
+ iot_client_ctx_t *ctx;
- pthread_attr_init (&conf->w_attr);
- err = pthread_attr_setstacksize (&conf->w_attr, stacksize);
- if (err == EINVAL) {
- gf_log (conf->this->name, GF_LOG_WARNING,
- "Using default thread stack size");
- }
-}
-
-int32_t
-mem_acct_init (xlator_t *this)
-{
- int ret = -1;
-
- if (!this)
- return ret;
-
- ret = xlator_mem_acct_init (this, gf_iot_mt_end + 1);
-
- if (ret != 0) {
- gf_log (this->name, GF_LOG_ERROR, "Memory accounting init"
- "failed");
- return ret;
- }
-
- return ret;
-}
-
-int
-init (xlator_t *this)
-{
- iot_conf_t *conf = NULL;
- dict_t *options = this->options;
- int thread_count = IOT_DEFAULT_THREADS;
- int idle_time = IOT_DEFAULT_IDLE;
- int ret = -1;
-
- if (!this->children || this->children->next) {
- gf_log ("io-threads", GF_LOG_ERROR,
- "FATAL: iot not configured with exactly one child");
- goto out;
- }
-
- if (!this->parents) {
- gf_log (this->name, GF_LOG_WARNING,
- "dangling volume. check volfile ");
- }
-
- conf = (void *) GF_CALLOC (1, sizeof (*conf),
- gf_iot_mt_iot_conf_t);
- if (conf == NULL) {
- gf_log (this->name, GF_LOG_ERROR,
- "out of memory");
- goto out;
- }
-
- set_stack_size (conf);
-
- thread_count = IOT_DEFAULT_THREADS;
-
- if (dict_get (options, "thread-count")) {
- thread_count = data_to_int32 (dict_get (options,
- "thread-count"));
- if (thread_count < IOT_MIN_THREADS) {
- gf_log ("io-threads", GF_LOG_WARNING,
- "Number of threads opted is less then min"
- "threads allowed scaling it up to min");
- thread_count = IOT_MIN_THREADS;
- }
- if (thread_count > IOT_MAX_THREADS) {
- gf_log ("io-threads", GF_LOG_WARNING,
- "Number of threads opted is more then max"
- " threads allowed scaling it down to max");
- thread_count = IOT_MAX_THREADS;
- }
- }
- conf->max_count = thread_count;
+ if (!conf || !conf->cleanup_disconnected_reqs) {
+ goto out;
+ }
- if (dict_get (options, "idle-time")) {
- idle_time = data_to_int32 (dict_get (options,
- "idle-time"));
- if (idle_time < 0)
- idle_time = 1;
- }
- conf->idle_time = idle_time;
-
- conf->this = this;
-
- INIT_LIST_HEAD (&conf->req);
-
- ret = iot_workers_scale (conf);
-
- if (ret == -1) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot initialize worker threads, exiting init");
- GF_FREE (conf);
- goto out;
+ pthread_mutex_lock(&conf->mutex);
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ ctx = &conf->no_client[i];
+ list_for_each_entry_safe(curr, next, &ctx->reqs, list)
+ {
+ if (curr->frame->root->client != client) {
+ continue;
+ }
+ gf_log(this->name, GF_LOG_INFO,
+ "poisoning %s fop at %p for client %s",
+ gf_fop_list[curr->fop], curr, client->client_uid);
+ curr->poison = _gf_true;
}
+ }
+ pthread_mutex_unlock(&conf->mutex);
- this->private = conf;
- ret = 0;
out:
- return ret;
+ return 0;
}
+struct xlator_dumpops dumpops = {
+ .priv = iot_priv_dump,
+};
-void
-fini (xlator_t *this)
-{
- iot_conf_t *conf = this->private;
-
- GF_FREE (conf);
-
- this->private = NULL;
- return;
-}
-
-/*
- * O - Goes to ordered threadpool.
- * U - Goes to un-ordered threadpool.
- * V - Variable, depends on whether the file is open.
- * If it is, then goes to ordered, otherwise to
- * un-ordered.
- */
struct xlator_fops fops = {
- .open = iot_open, /* U */
- .create = iot_create, /* U */
- .readv = iot_readv, /* O */
- .writev = iot_writev, /* O */
- .flush = iot_flush, /* O */
- .fsync = iot_fsync, /* O */
- .lk = iot_lk, /* O */
- .stat = iot_stat, /* V */
- .fstat = iot_fstat, /* O */
- .truncate = iot_truncate, /* V */
- .ftruncate = iot_ftruncate, /* O */
- .unlink = iot_unlink, /* U */
- .lookup = iot_lookup, /* U */
- .setattr = iot_setattr, /* U */
- .fsetattr = iot_fsetattr, /* O */
- .access = iot_access, /* U */
- .readlink = iot_readlink, /* U */
- .mknod = iot_mknod, /* U */
- .mkdir = iot_mkdir, /* U */
- .rmdir = iot_rmdir, /* U */
- .symlink = iot_symlink, /* U */
- .rename = iot_rename, /* U */
- .link = iot_link, /* U */
- .opendir = iot_opendir, /* U */
- .fsyncdir = iot_fsyncdir, /* O */
- .statfs = iot_statfs, /* U */
- .setxattr = iot_setxattr, /* U */
- .getxattr = iot_getxattr, /* U */
- .fgetxattr = iot_fgetxattr, /* O */
- .fsetxattr = iot_fsetxattr, /* O */
- .removexattr = iot_removexattr, /* U */
- .readdir = iot_readdir, /* O */
- .readdirp = iot_readdirp, /* O */
- .xattrop = iot_xattrop, /* U */
- .fxattrop = iot_fxattrop, /* O */
+ .open = iot_open,
+ .create = iot_create,
+ .readv = iot_readv,
+ .writev = iot_writev,
+ .flush = iot_flush,
+ .fsync = iot_fsync,
+ .lk = iot_lk,
+ .stat = iot_stat,
+ .fstat = iot_fstat,
+ .truncate = iot_truncate,
+ .ftruncate = iot_ftruncate,
+ .unlink = iot_unlink,
+ .lookup = iot_lookup,
+ .setattr = iot_setattr,
+ .fsetattr = iot_fsetattr,
+ .access = iot_access,
+ .readlink = iot_readlink,
+ .mknod = iot_mknod,
+ .mkdir = iot_mkdir,
+ .rmdir = iot_rmdir,
+ .symlink = iot_symlink,
+ .rename = iot_rename,
+ .link = iot_link,
+ .opendir = iot_opendir,
+ .fsyncdir = iot_fsyncdir,
+ .statfs = iot_statfs,
+ .setxattr = iot_setxattr,
+ .getxattr = iot_getxattr,
+ .fgetxattr = iot_fgetxattr,
+ .fsetxattr = iot_fsetxattr,
+ .removexattr = iot_removexattr,
+ .fremovexattr = iot_fremovexattr,
+ .readdir = iot_readdir,
+ .readdirp = iot_readdirp,
+ .inodelk = iot_inodelk,
+ .finodelk = iot_finodelk,
+ .entrylk = iot_entrylk,
+ .fentrylk = iot_fentrylk,
+ .xattrop = iot_xattrop,
+ .fxattrop = iot_fxattrop,
+ .rchecksum = iot_rchecksum,
+ .fallocate = iot_fallocate,
+ .discard = iot_discard,
+ .zerofill = iot_zerofill,
+ .seek = iot_seek,
+ .lease = iot_lease,
+ .getactivelk = iot_getactivelk,
+ .setactivelk = iot_setactivelk,
+ .put = iot_put,
};
struct xlator_cbks cbks = {
+ .client_destroy = iot_client_destroy,
+ .client_disconnect = iot_disconnect_cbk,
};
struct volume_options options[] = {
- { .key = {"thread-count"},
- .type = GF_OPTION_TYPE_INT,
- .min = IOT_MIN_THREADS,
- .max = IOT_MAX_THREADS
- },
- {.key = {"idle-time"},
- .type = GF_OPTION_TYPE_INT,
- .min = 1,
- .max = 0x7fffffff,
- },
- { .key = {NULL},
- },
+ {.key = {"thread-count"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
+ .max = IOT_MAX_THREADS,
+ .default_value = "16",
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ /*.option = "thread-count"*/
+ .description = "Number of threads in IO threads translator which "
+ "perform concurrent IO operations"
+
+ },
+ {.key = {"high-prio-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
+ .max = IOT_MAX_THREADS,
+ .default_value = "16",
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Max number of threads in IO threads translator which "
+ "perform high priority IO operations at a given time"
+
+ },
+ {.key = {"normal-prio-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
+ .max = IOT_MAX_THREADS,
+ .default_value = "16",
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Max number of threads in IO threads translator which "
+ "perform normal priority IO operations at a given time"
+
+ },
+ {.key = {"low-prio-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
+ .max = IOT_MAX_THREADS,
+ .default_value = "16",
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Max number of threads in IO threads translator which "
+ "perform low priority IO operations at a given time"
+
+ },
+ {.key = {"least-prio-threads"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = IOT_MIN_THREADS,
+ .max = IOT_MAX_THREADS,
+ .default_value = "1",
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Max number of threads in IO threads translator which "
+ "perform least priority IO operations at a given time"},
+ {.key = {"enable-least-priority"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = SITE_H_ENABLE_LEAST_PRIORITY,
+ .op_version = {1},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Enable/Disable least priority"},
+ {
+ .key = {"idle-time"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 1,
+ .max = 0x7fffffff,
+ .default_value = "120",
+ },
+ {.key = {"watchdog-secs"},
+ .type = GF_OPTION_TYPE_INT,
+ .min = 0,
+ .default_value = 0,
+ .op_version = {GD_OP_VERSION_4_1_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
+ .tags = {"io-threads"},
+ .description = "Number of seconds a queue must be stalled before "
+ "starting an 'emergency' thread."},
+ {.key = {"cleanup-disconnected-reqs"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "off",
+ .op_version = {GD_OP_VERSION_4_1_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
+ .tags = {"io-threads"},
+ .description = "'Poison' queued requests when a client disconnects"},
+ {.key = {"pass-through"},
+ .type = GF_OPTION_TYPE_BOOL,
+ .default_value = "false",
+ .op_version = {GD_OP_VERSION_4_1_0},
+ .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC | OPT_FLAG_CLIENT_OPT,
+ .tags = {"io-threads"},
+ .description = "Enable/Disable io threads translator"},
+ {
+ .key = {NULL},
+ },
+};
+
+xlator_api_t xlator_api = {
+ .init = init,
+ .fini = fini,
+ .notify = notify,
+ .reconfigure = reconfigure,
+ .mem_acct_init = mem_acct_init,
+ .op_version = {1}, /* Present from the initial version */
+ .dumpops = &dumpops,
+ .fops = &fops,
+ .cbks = &cbks,
+ .options = options,
+ .identifier = "io-threads",
+ .category = GF_MAINTAINED,
};