diff options
Diffstat (limited to 'xlators/performance/io-threads/src/io-threads.c')
| -rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 158 |
1 files changed, 93 insertions, 65 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 80d42ded5eb..3d24cc97f4b 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -8,20 +8,20 @@ cases as published by the Free Software Foundation. */ -#include "call-stub.h" -#include "defaults.h" -#include "glusterfs.h" -#include "logging.h" -#include "dict.h" -#include "xlator.h" +#include <glusterfs/call-stub.h> +#include <glusterfs/defaults.h> +#include <glusterfs/glusterfs.h> +#include <glusterfs/logging.h> +#include <glusterfs/dict.h> +#include <glusterfs/xlator.h> #include "io-threads.h" #include <signal.h> #include <stdlib.h> #include <sys/time.h> #include <time.h> -#include "locking.h" +#include <glusterfs/locking.h> #include "io-threads-messages.h" -#include "timespec.h" +#include <glusterfs/timespec.h> void * iot_worker(void *arg); @@ -61,7 +61,7 @@ iot_get_ctx(xlator_t *this, client_t *client) int i; if (client_ctx_get(client, this, (void **)&ctx) != 0) { - ctx = GF_CALLOC(GF_FOP_PRI_MAX, sizeof(*ctx), gf_iot_mt_client_ctx_t); + ctx = GF_MALLOC(GF_FOP_PRI_MAX * sizeof(*ctx), gf_iot_mt_client_ctx_t); if (ctx) { for (i = 0; i < GF_FOP_PRI_MAX; ++i) { INIT_LIST_HEAD(&ctx[i].clients); @@ -275,7 +275,7 @@ iot_get_pri_meaning(gf_fop_pri_t pri) name = "slow"; break; case GF_FOP_PRI_LEAST: - name = "least priority"; + name = "least"; break; case GF_FOP_PRI_MAX: name = "invalid"; @@ -294,7 +294,9 @@ iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub) gf_fop_pri_t pri = GF_FOP_PRI_MAX - 1; iot_conf_t *conf = this->private; - if ((frame->root->pid < GF_CLIENT_PID_MAX) && conf->least_priority) { + if ((frame->root->pid < GF_CLIENT_PID_MAX) && + (frame->root->pid != GF_CLIENT_PID_NO_ROOT_SQUASH) && + conf->least_priority) { pri = GF_FOP_PRI_LEAST; goto out; } @@ -370,7 +372,7 @@ iot_schedule(call_frame_t *frame, xlator_t *this, call_stub_t *stub) return -EINVAL; } out: - gf_msg_debug(this->name, 0, "%s scheduled as %s fop", + gf_msg_debug(this->name, 0, "%s scheduled as %s priority fop", gf_fop_list[stub->fop], iot_get_pri_meaning(pri)); if (this->private) ret = do_iot_schedule(this->private, stub, pri); @@ -612,7 +614,7 @@ iot_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name, conf = this->private; - if (conf && name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) { + if (name && strcmp(name, IO_THREADS_QUEUE_SIZE_KEY) == 0) { /* * We explicitly do not want a reference count * for this dict in this translator @@ -813,9 +815,6 @@ __iot_workers_scale(iot_conf_t *conf) pthread_t thread; int ret = 0; int i = 0; - char thread_name[GF_THREAD_NAMEMAX] = { - 0, - }; for (i = 0; i < GF_FOP_PRI_MAX; i++) scale += min(conf->queue_sizes[i], conf->ac_iot_limit[i]); @@ -833,11 +832,10 @@ __iot_workers_scale(iot_conf_t *conf) while (diff) { diff--; - snprintf(thread_name, sizeof(thread_name), "iotwr%03hx", - (conf->curr_count & 0x3ff)); ret = gf_thread_create(&thread, &conf->w_attr, iot_worker, conf, - thread_name); + "iotwr%03hx", conf->curr_count & 0x3ff); if (ret == 0) { + pthread_detach(thread); conf->curr_count++; gf_msg_debug(conf->this->name, 0, "scaled threads to %d (queue_size=%d/%d)", @@ -881,8 +879,8 @@ set_stack_size(iot_conf_t *conf) err = pthread_attr_init(&conf->w_attr); if (err != 0) { - gf_msg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED, - "Thread attribute initialization failed"); + gf_smsg(this->name, GF_LOG_ERROR, err, IO_THREADS_MSG_INIT_FAILED, + NULL); return err; } @@ -890,11 +888,11 @@ set_stack_size(iot_conf_t *conf) if (err == EINVAL) { err = pthread_attr_getstacksize(&conf->w_attr, &stacksize); if (!err) { - gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, - "Using default thread stack size %zd", stacksize); + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + "size=%zd", stacksize, NULL); } else { - gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, - "Using default thread stack size"); + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_SIZE_NOT_SET, + NULL); err = 0; } } @@ -914,8 +912,8 @@ mem_acct_init(xlator_t *this) ret = xlator_mem_acct_init(this, gf_iot_mt_end + 1); if (ret != 0) { - gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY, - "Memory accounting init failed"); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY, + NULL); return ret; } @@ -927,6 +925,8 @@ iot_priv_dump(xlator_t *this) { iot_conf_t *conf = NULL; char key_prefix[GF_DUMP_MAX_BUF_LEN]; + char key[GF_DUMP_MAX_BUF_LEN]; + int i = 0; if (!this) return 0; @@ -944,14 +944,29 @@ iot_priv_dump(xlator_t *this) gf_proc_dump_write("sleep_count", "%d", conf->sleep_count); gf_proc_dump_write("idle_time", "%d", conf->idle_time); gf_proc_dump_write("stack_size", "%zd", conf->stack_size); - gf_proc_dump_write("high_priority_threads", "%d", + gf_proc_dump_write("max_high_priority_threads", "%d", conf->ac_iot_limit[GF_FOP_PRI_HI]); - gf_proc_dump_write("normal_priority_threads", "%d", + gf_proc_dump_write("max_normal_priority_threads", "%d", conf->ac_iot_limit[GF_FOP_PRI_NORMAL]); - gf_proc_dump_write("low_priority_threads", "%d", + gf_proc_dump_write("max_low_priority_threads", "%d", conf->ac_iot_limit[GF_FOP_PRI_LO]); - gf_proc_dump_write("least_priority_threads", "%d", + gf_proc_dump_write("max_least_priority_threads", "%d", conf->ac_iot_limit[GF_FOP_PRI_LEAST]); + gf_proc_dump_write("current_high_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_HI]); + gf_proc_dump_write("current_normal_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_NORMAL]); + gf_proc_dump_write("current_low_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LO]); + gf_proc_dump_write("current_least_priority_threads", "%d", + conf->ac_iot_count[GF_FOP_PRI_LEAST]); + for (i = 0; i < GF_FOP_PRI_MAX; i++) { + if (!conf->queue_sizes[i]) + continue; + snprintf(key, sizeof(key), "%s_priority_queue_length", + iot_get_pri_meaning(i)); + gf_proc_dump_write(key, "%d", conf->queue_sizes[i]); + } return 0; } @@ -987,8 +1002,8 @@ iot_priv_dump(xlator_t *this) */ typedef struct { - uint32_t value; time_t update_time; + uint32_t value; } threshold_t; /* * Variables so that I can hack these for testing. @@ -1001,16 +1016,13 @@ static uint32_t THRESH_LIMIT = 1209600; /* SECONDS * (EVENTS-1) */ static void iot_apply_event(xlator_t *this, threshold_t *thresh) { - struct timespec now; - time_t delta; + time_t delta, now = gf_time(); /* Refresh for manual testing/debugging. It's cheap. */ THRESH_LIMIT = THRESH_SECONDS * (THRESH_EVENTS - 1); - timespec_now(&now); - if (thresh->value && thresh->update_time) { - delta = now.tv_sec - thresh->update_time; + delta = now - thresh->update_time; /* Be careful about underflow. */ if (thresh->value <= delta) { thresh->value = 0; @@ -1031,7 +1043,7 @@ iot_apply_event(xlator_t *this, threshold_t *thresh) kill(getpid(), SIGTRAP); } - thresh->update_time = now.tv_sec; + thresh->update_time = now; } static void * @@ -1171,35 +1183,33 @@ init(xlator_t *this) int i = 0; if (!this->children || this->children->next) { - gf_msg("io-threads", GF_LOG_ERROR, 0, - IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, - "FATAL: iot not configured " - "with exactly one child"); + gf_smsg("io-threads", GF_LOG_ERROR, 0, + IO_THREADS_MSG_XLATOR_CHILD_MISCONFIGURED, NULL); goto out; } if (!this->parents) { - gf_msg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED, - "dangling volume. check volfile "); + gf_smsg(this->name, GF_LOG_WARNING, 0, IO_THREADS_MSG_VOL_MISCONFIGURED, + NULL); } conf = (void *)GF_CALLOC(1, sizeof(*conf), gf_iot_mt_iot_conf_t); if (conf == NULL) { - gf_msg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_NO_MEMORY, - "out of memory"); + gf_smsg(this->name, GF_LOG_ERROR, ENOMEM, IO_THREADS_MSG_OUT_OF_MEMORY, + NULL); goto out; } if ((ret = pthread_cond_init(&conf->cond, NULL)) != 0) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "pthread_cond_init failed (%d)", ret); + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_cond_init ret=%d", ret, NULL); goto out; } conf->cond_inited = _gf_true; if ((ret = pthread_mutex_init(&conf->mutex, NULL)) != 0) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "pthread_mutex_init failed (%d)", ret); + gf_smsg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_PTHREAD_INIT_FAILED, + "pthread_mutex_init ret=%d", ret, NULL); goto out; } conf->mutex_inited = _gf_true; @@ -1243,12 +1253,14 @@ init(xlator_t *this) INIT_LIST_HEAD(&conf->no_client[i].reqs); } - ret = iot_workers_scale(conf); + if (!this->pass_through) { + ret = iot_workers_scale(conf); - if (ret == -1) { - gf_msg(this->name, GF_LOG_ERROR, 0, IO_THREADS_MSG_INIT_FAILED, - "cannot initialize worker threads, exiting init"); - goto out; + if (ret == -1) { + gf_smsg(this->name, GF_LOG_ERROR, 0, + IO_THREADS_MSG_WORKER_THREAD_INIT_FAILED, NULL); + goto out; + } } this->private = conf; @@ -1293,20 +1305,21 @@ notify(xlator_t *this, int32_t event, void *data, ...) if (GF_EVENT_PARENT_DOWN == event) { if (victim->cleanup_starting) { - clock_gettime(CLOCK_REALTIME, &sleep_till); - sleep_till.tv_sec += 1; /* Wait for draining stub from queue before notify PARENT_DOWN */ stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); - - pthread_mutex_lock(&conf->mutex); - { - while (stub_cnt) { - (void)pthread_cond_timedwait(&conf->cond, &conf->mutex, - &sleep_till); - stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + if (stub_cnt) { + timespec_now_realtime(&sleep_till); + sleep_till.tv_sec += 1; + pthread_mutex_lock(&conf->mutex); + { + while (stub_cnt) { + (void)pthread_cond_timedwait(&conf->cond, &conf->mutex, + &sleep_till); + stub_cnt = GF_ATOMIC_GET(conf->stub_cnt); + } } + pthread_mutex_unlock(&conf->mutex); } - pthread_mutex_unlock(&conf->mutex); gf_log(this->name, GF_LOG_INFO, "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name); @@ -1560,3 +1573,18 @@ struct volume_options options[] = { .key = {NULL}, }, }; + +xlator_api_t xlator_api = { + .init = init, + .fini = fini, + .notify = notify, + .reconfigure = reconfigure, + .mem_acct_init = mem_acct_init, + .op_version = {1}, /* Present from the initial version */ + .dumpops = &dumpops, + .fops = &fops, + .cbks = &cbks, + .options = options, + .identifier = "io-threads", + .category = GF_MAINTAINED, +}; |
