summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads/src/io-threads.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c158
1 files changed, 93 insertions, 65 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 80d42ded5eb..3d24cc97f4b 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -8,20 +8,20 @@
cases as published by the Free Software Foundation.
*/
-#include "call-stub.h"
-#include "defaults.h"
-#include "glusterfs.h"
-#include "logging.h"
-#include "dict.h"
-#include "xlator.h"
+#include <glusterfs/call-stub.h>
+#include <glusterfs/defaults.h>
+#include <glusterfs/glusterfs.h>
+#include <glusterfs/logging.h>
+#include <glusterfs/dict.h>
+#include <glusterfs/xlator.h>
#include "io-threads.h"
#include <signal.h>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
-#include "locking.h"
+#include <glusterfs/locking.h>
#include "io-threads-messages.h"
-#include "timespec.h"
+#include <glusterfs/timespec.h>
void *
iot_worker(void *arg);
@@ -61,7 +61,7 @@ iot_get_ctx(xlator_t *this, client_t *client)
int i;
if (client_ctx_get(client, this, (void **)&ctx) != 0) {
- ctx = GF_CALLOC(GF_FOP_PRI_MAX, sizeof(*ctx), gf_iot_mt_client_ctx_t);
+ ctx = GF_MALLOC(GF_FOP_PRI_MAX * sizeof(*ctx), gf_iot_mt_client_ctx_t);
if (ctx) {
for (i = 0; i < GF_FOP_PRI_MAX; ++i) {
INIT_LIST_HEAD(&ctx[i].clients);
@@ -275,7 +275,7 @@ iot_get_pri_meaning(gf_fop_pri_t pri)
name = "slow";
break;
case GF_FOP_PRI_LEAST:
- name = "least priority";
+ name = "least";
break;
case GF_FOP_PRI_MAX:
name = "invalid";
@@ -294,7 +294,9 @@ iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub)
gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1;
iot_conf_t *conf = this->private;
- if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) {
+ if ((frame->root->pid < GF_CLIENT_PID_MAX) &&
+ (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) &&
+ conf->least_priority) {
pri = GF_FOP_PRI_LEAST;
goto out;
}
@@ -370,7 +372,7 @@ iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub)
return -EINVAL;
}
out:
- gf_msg_debug(this->name, 0, "%s scheduled as %s fop",
+ gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop",
gf_fop_list[stub->fop], iot_get_pri_meaning(pri));
if (this->private)
ret = do_iot_schedule(this->private, stub, pri);
@@ -612,7 +614,7 @@ iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name,
conf = this->private;
- if (conf && name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
+ if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) {
/*
* We explicitly do not want a reference count
* for this dict in this translator
@@ -813,9 +815,6 @@ __iot_workers_scale(iot_conf_t *conf)
pthread_t thread;
int ret = 0;
int i = 0;
- char thread_name[GF_THREAD_NAMEMAX] = {
- 0,
- };
for (i = 0; i < GF_FOP_PRI_MAX; i++)
scale += min(conf->queue_sizes[i], conf->ac_iot_limit[i]);
@@ -833,11 +832,10 @@ __iot_workers_scale(iot_conf_t *conf)
while (diff) {
diff--;
- snprintf(thread_name, sizeof(thread_name), "iotwr%03hx",
- (conf->curr_count & 0x3ff));
ret = gf_thread_create(&thread, &conf->w_attr, iot_worker, conf,
- thread_name);
+ "iotwr%03hx", conf->curr_count & 0x3ff);
if (ret == 0) {
+ pthread_detach(thread);
conf->curr_count++;
gf_msg_debug(conf->this->name, 0,
"scaled threads to %d (queue_size=%d/%d)",
@@ -881,8 +879,8 @@ set_stack_size(iot_conf_t *conf)
err = pthread_attr_init(&conf->w_attr);
if (err != 0) {
- gf_msg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED,
- "Thread attribute initialization failed");
+ gf_smsg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED,
+ NULL);
return err;
}
@@ -890,11 +888,11 @@ set_stack_size(iot_conf_t *conf)
if (err == EINVAL) {
err = pthread_attr_getstacksize(&conf->w_attr, &stacksize);
if (!err) {
- gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
- "Using default thread stack size %zd", stacksize);
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ "size=%zd", stacksize, NULL);
} else {
- gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
- "Using default thread stack size");
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET,
+ NULL);
err = 0;
}
}
@@ -914,8 +912,8 @@ mem_acct_init(xlator_t *this)
ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1);
if (ret != 0) {
- gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
- "Memory accounting init failed");
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
+ NULL);
return ret;
}
@@ -927,6 +925,8 @@ iot_priv_dump(xlator_t *this)
{
iot_conf_t *conf = NULL;
char key_prefix[GF_DUMP_MAX_BUF_LEN];
+ char key[GF_DUMP_MAX_BUF_LEN];
+ int i = 0;
if (!this)
return 0;
@@ -944,14 +944,29 @@ iot_priv_dump(xlator_t *this)
gf_proc_dump_write("sleep_count", "%d", conf->sleep_count);
gf_proc_dump_write("idle_time", "%d", conf->idle_time);
gf_proc_dump_write("stack_size", "%zd", conf->stack_size);
- gf_proc_dump_write("high_priority_threads", "%d",
+ gf_proc_dump_write("max_high_priority_threads", "%d",
conf->ac_iot_limit[GF_FOP_PRI_HI]);
- gf_proc_dump_write("normal_priority_threads", "%d",
+ gf_proc_dump_write("max_normal_priority_threads", "%d",
conf->ac_iot_limit[GF_FOP_PRI_NORMAL]);
- gf_proc_dump_write("low_priority_threads", "%d",
+ gf_proc_dump_write("max_low_priority_threads", "%d",
conf->ac_iot_limit[GF_FOP_PRI_LO]);
- gf_proc_dump_write("least_priority_threads", "%d",
+ gf_proc_dump_write("max_least_priority_threads", "%d",
conf->ac_iot_limit[GF_FOP_PRI_LEAST]);
+ gf_proc_dump_write("current_high_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_HI]);
+ gf_proc_dump_write("current_normal_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_NORMAL]);
+ gf_proc_dump_write("current_low_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_LO]);
+ gf_proc_dump_write("current_least_priority_threads", "%d",
+ conf->ac_iot_count[GF_FOP_PRI_LEAST]);
+ for (i = 0; i < GF_FOP_PRI_MAX; i++) {
+ if (!conf->queue_sizes[i])
+ continue;
+ snprintf(key, sizeof(key), "%s_priority_queue_length",
+ iot_get_pri_meaning(i));
+ gf_proc_dump_write(key, "%d", conf->queue_sizes[i]);
+ }
return 0;
}
@@ -987,8 +1002,8 @@ iot_priv_dump(xlator_t *this)
*/
typedef struct {
- uint32_t value;
time_t update_time;
+ uint32_t value;
} threshold_t;
/*
* Variables so that I can hack these for testing.
@@ -1001,16 +1016,13 @@ static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */
static void
iot_apply_event(xlator_t *this, threshold_t *thresh)
{
- struct timespec now;
- time_t delta;
+ time_t delta, now = gf_time();
/* Refresh for manual testing/debugging. It's cheap. */
THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1);
- timespec_now(&now);
-
if (thresh->value && thresh->update_time) {
- delta = now.tv_sec - thresh->update_time;
+ delta = now - thresh->update_time;
/* Be careful about underflow. */
if (thresh->value <= delta) {
thresh->value = 0;
@@ -1031,7 +1043,7 @@ iot_apply_event(xlator_t *this, threshold_t *thresh)
kill(getpid(), SIGTRAP);
}
- thresh->update_time = now.tv_sec;
+ thresh->update_time = now;
}
static void *
@@ -1171,35 +1183,33 @@ init(xlator_t *this)
int i = 0;
if (!this->children || this->children->next) {
- gf_msg("io-threads", GF_LOG_ERROR, 0,
- IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED,
- "FATAL: iot not configured "
- "with exactly one child");
+ gf_smsg("io-threads", GF_LOG_ERROR, 0,
+ IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, NULL);
goto out;
}
if (!this->parents) {
- gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED,
- "dangling volume. check volfile ");
+ gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED,
+ NULL);
}
conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t);
if (conf == NULL) {
- gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY,
- "out of memory");
+ gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_OUT_OF_MEMORY,
+ NULL);
goto out;
}
if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) {
- gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
- "pthread_cond_init failed (%d)", ret);
+ gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED,
+ "pthread_cond_init ret=%d", ret, NULL);
goto out;
}
conf->cond_inited = _gf_true;
if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) {
- gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
- "pthread_mutex_init failed (%d)", ret);
+ gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED,
+ "pthread_mutex_init ret=%d", ret, NULL);
goto out;
}
conf->mutex_inited = _gf_true;
@@ -1243,12 +1253,14 @@ init(xlator_t *this)
INIT_LIST_HEAD(&conf->no_client[i].reqs);
}
- ret = iot_workers_scale(conf);
+ if (!this->pass_through) {
+ ret = iot_workers_scale(conf);
- if (ret == -1) {
- gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED,
- "cannot initialize worker threads, exiting init");
- goto out;
+ if (ret == -1) {
+ gf_smsg(this->name, GF_LOG_ERROR, 0,
+ IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED, NULL);
+ goto out;
+ }
}
this->private = conf;
@@ -1293,20 +1305,21 @@ notify(xlator_t *this, int32_t event, void *data, ...)
if (GF_EVENT_PARENT_DOWN == event) {
if (victim->cleanup_starting) {
- clock_gettime(CLOCK_REALTIME, &sleep_till);
- sleep_till.tv_sec += 1;
/* Wait for draining stub from queue before notify PARENT_DOWN */
stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
-
- pthread_mutex_lock(&conf->mutex);
- {
- while (stub_cnt) {
- (void)pthread_cond_timedwait(&conf->cond, &conf->mutex,
- &sleep_till);
- stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
+ if (stub_cnt) {
+ timespec_now_realtime(&sleep_till);
+ sleep_till.tv_sec += 1;
+ pthread_mutex_lock(&conf->mutex);
+ {
+ while (stub_cnt) {
+ (void)pthread_cond_timedwait(&conf->cond, &conf->mutex,
+ &sleep_till);
+ stub_cnt = GF_ATOMIC_GET(conf->stub_cnt);
+ }
}
+ pthread_mutex_unlock(&conf->mutex);
}
- pthread_mutex_unlock(&conf->mutex);
gf_log(this->name, GF_LOG_INFO,
"Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
@@ -1560,3 +1573,18 @@ struct volume_options options[] = {
.key = {NULL},
},
};
+
+xlator_api_t xlator_api = {
+ .init = init,
+ .fini = fini,
+ .notify = notify,
+ .reconfigure = reconfigure,
+ .mem_acct_init = mem_acct_init,
+ .op_version = {1}, /* Present from the initial version */
+ .dumpops = &dumpops,
+ .fops = &fops,
+ .cbks = &cbks,
+ .options = options,
+ .identifier = "io-threads",
+ .category = GF_MAINTAINED,
+};