summaryrefslogtreecommitdiffstats
path: root/libglusterfs/src/syncop.c
diff options
context:
space:
mode:
authorXavi Hernandez <xhernandez@redhat.com>2020-04-30 11:19:01 +0200
committerRinku Kothiya <rkothiya@redhat.com>2020-05-30 05:17:12 +0000
commit67cd2b1ab5c9a8db637506b9a6be57fc79672c99 (patch)
tree87fef926b8766becdf5cd708ae580483ddf73927 /libglusterfs/src/syncop.c
parent49713a7ea1d30613a8810b85eca8ab35f6e9fa8a (diff)
syncop: improve scaling and implement more tools
The current scaling of the syncop thread pool is not working properly and can leave some tasks in the run queue more time than necessary when the maximum number of threads is not reached. This patch provides a better scaling condition to react faster to pending work. Condition variables and sleep in the context of a synctask have also been implemented. Their purpose is to replace regular condition variables and sleeps that block synctask threads and prevent other tasks to be executed. The new features have been applied to several places in glusterd. Change-Id: Ic50b7c73c104f9e41f08101a357d30b95efccfbf Fixes: #1116 Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r--libglusterfs/src/syncop.c306
1 files changed, 240 insertions, 66 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index 0de53c6f4cf..e20cf006493 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -154,10 +154,14 @@ out:
return ret;
}
+void *
+syncenv_processor(void *thdata);
+
static void
__run(struct synctask *task)
{
struct syncenv *env = NULL;
+ int32_t total, ret, i;
env = task->env;
@@ -173,7 +177,6 @@ __run(struct synctask *task)
env->runcount--;
break;
case SYNCTASK_WAIT:
- env->waitcount--;
break;
case SYNCTASK_DONE:
gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK,
@@ -187,8 +190,27 @@ __run(struct synctask *task)
}
list_add_tail(&task->all_tasks, &env->runq);
- env->runcount++;
task->state = SYNCTASK_RUN;
+
+ env->runcount++;
+
+ total = env->procs + env->runcount - env->procs_idle;
+ if (total > env->procmax) {
+ total = env->procmax;
+ }
+ if (total > env->procs) {
+ for (i = 0; i < env->procmax; i++) {
+ if (env->proc[i].env == NULL) {
+ env->proc[i].env = env;
+ ret = gf_thread_create(&env->proc[i].processor, NULL,
+ syncenv_processor, &env->proc[i],
+ "sproc%d", i);
+ if ((ret < 0) || (++env->procs >= total)) {
+ break;
+ }
+ }
+ }
+ }
}
static void
@@ -210,7 +232,6 @@ __wait(struct synctask *task)
gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_REWAITING_TASK,
"re-waiting already waiting "
"task");
- env->waitcount--;
break;
case SYNCTASK_DONE:
gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK,
@@ -223,12 +244,11 @@ __wait(struct synctask *task)
}
list_add_tail(&task->all_tasks, &env->waitq);
- env->waitcount++;
task->state = SYNCTASK_WAIT;
}
void
-synctask_yield(struct synctask *task)
+synctask_yield(struct synctask *task, struct timespec *delta)
{
xlator_t *oldTHIS = THIS;
@@ -237,6 +257,8 @@ synctask_yield(struct synctask *task)
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
#endif
+ task->delta = delta;
+
if (task->state != SYNCTASK_DONE) {
task->state = SYNCTASK_SUSPEND;
}
@@ -249,6 +271,35 @@ synctask_yield(struct synctask *task)
}
void
+synctask_sleep(int32_t secs)
+{
+ struct timespec delta;
+ struct synctask *task;
+
+ task = synctask_get();
+
+ if (task == NULL) {
+ sleep(secs);
+ } else {
+ delta.tv_sec = secs;
+ delta.tv_nsec = 0;
+
+ synctask_yield(task, &delta);
+ }
+}
+
+static void
+__synctask_wake(struct synctask *task)
+{
+ task->woken = 1;
+
+ if (task->slept)
+ __run(task);
+
+ pthread_cond_broadcast(&task->env->cond);
+}
+
+void
synctask_wake(struct synctask *task)
{
struct syncenv *env = NULL;
@@ -257,13 +308,18 @@ synctask_wake(struct synctask *task)
pthread_mutex_lock(&env->mutex);
{
- task->woken = 1;
+ if (task->timer != NULL) {
+ if (gf_timer_call_cancel(task->xl->ctx, task->timer) != 0) {
+ goto unlock;
+ }
- if (task->slept)
- __run(task);
+ task->timer = NULL;
+ task->synccond = NULL;
+ }
- pthread_cond_broadcast(&env->cond);
+ __synctask_wake(task);
}
+unlock:
pthread_mutex_unlock(&env->mutex);
}
@@ -282,7 +338,7 @@ synctask_wrap(void)
task->state = SYNCTASK_DONE;
- synctask_yield(task);
+ synctask_yield(task, NULL);
}
void
@@ -422,11 +478,6 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn,
}
synctask_wake(newtask);
- /*
- * Make sure someone's there to execute anything we just put on the
- * run queue.
- */
- syncenv_scale(env);
return newtask;
err:
@@ -520,8 +571,12 @@ syncenv_task(struct syncproc *proc)
goto unlock;
}
+ env->procs_idle++;
+
sleep_till.tv_sec = time(NULL) + SYNCPROC_IDLE_TIME;
ret = pthread_cond_timedwait(&env->cond, &env->mutex, &sleep_till);
+
+ env->procs_idle--;
}
task = list_entry(env->runq.next, struct synctask, all_tasks);
@@ -540,6 +595,34 @@ unlock:
return task;
}
+static void
+synctask_timer(void *data)
+{
+ struct synctask *task = data;
+ struct synccond *cond;
+
+ cond = task->synccond;
+ if (cond != NULL) {
+ pthread_mutex_lock(&cond->pmutex);
+
+ list_del_init(&task->waitq);
+ task->synccond = NULL;
+
+ pthread_mutex_unlock(&cond->pmutex);
+
+ task->ret = -ETIMEDOUT;
+ }
+
+ pthread_mutex_lock(&task->env->mutex);
+
+ gf_timer_call_cancel(task->xl->ctx, task->timer);
+ task->timer = NULL;
+
+ __synctask_wake(task);
+
+ pthread_mutex_unlock(&task->env->mutex);
+}
+
void
synctask_switchto(struct synctask *task)
{
@@ -572,7 +655,14 @@ synctask_switchto(struct synctask *task)
} else {
task->slept = 1;
__wait(task);
+
+ if (task->delta != NULL) {
+ task->timer = gf_timer_call_after(task->xl->ctx, *task->delta,
+ synctask_timer, task);
+ }
}
+
+ task->delta = NULL;
}
pthread_mutex_unlock(&env->mutex);
}
@@ -580,65 +670,18 @@ synctask_switchto(struct synctask *task)
void *
syncenv_processor(void *thdata)
{
- struct syncenv *env = NULL;
struct syncproc *proc = NULL;
struct synctask *task = NULL;
proc = thdata;
- env = proc->env;
-
- for (;;) {
- task = syncenv_task(proc);
- if (!task)
- break;
+ while ((task = syncenv_task(proc)) != NULL) {
synctask_switchto(task);
-
- syncenv_scale(env);
}
return NULL;
}
-void
-syncenv_scale(struct syncenv *env)
-{
- int diff = 0;
- int scale = 0;
- int i = 0;
- int ret = 0;
-
- pthread_mutex_lock(&env->mutex);
- {
- if (env->procs > env->runcount)
- goto unlock;
-
- scale = env->runcount;
- if (scale > env->procmax)
- scale = env->procmax;
- if (scale > env->procs)
- diff = scale - env->procs;
- while (diff) {
- diff--;
- for (; (i < env->procmax); i++) {
- if (env->proc[i].processor == 0)
- break;
- }
-
- env->proc[i].env = env;
- ret = gf_thread_create(&env->proc[i].processor, NULL,
- syncenv_processor, &env->proc[i],
- "sproc%03hx", env->procs & 0x3ff);
- if (ret)
- break;
- env->procs++;
- i++;
- }
- }
-unlock:
- pthread_mutex_unlock(&env->mutex);
-}
-
/* The syncenv threads are cleaned up in this routine.
*/
void
@@ -715,12 +758,13 @@ syncenv_new(size_t stacksize, int procmin, int procmax)
newenv->stacksize = stacksize;
newenv->procmin = procmin;
newenv->procmax = procmax;
+ newenv->procs_idle = 0;
for (i = 0; i < newenv->procmin; i++) {
newenv->proc[i].env = newenv;
ret = gf_thread_create(&newenv->proc[i].processor, NULL,
syncenv_processor, &newenv->proc[i], "sproc%d",
- newenv->procs);
+ i);
if (ret)
break;
newenv->procs++;
@@ -810,7 +854,7 @@ __synclock_lock(struct synclock *lock)
task->woken = 0;
list_add_tail(&task->waitq, &lock->waitq);
pthread_mutex_unlock(&lock->guard);
- synctask_yield(task);
+ synctask_yield(task, NULL);
/* task is removed from waitq in unlock,
* under lock->guard.*/
pthread_mutex_lock(&lock->guard);
@@ -963,6 +1007,136 @@ synclock_unlock(synclock_t *lock)
return ret;
}
+/* Condition variables */
+
+int32_t
+synccond_init(synccond_t *cond)
+{
+ int32_t ret;
+
+ INIT_LIST_HEAD(&cond->waitq);
+
+ ret = pthread_mutex_init(&cond->pmutex, NULL);
+ if (ret != 0) {
+ return -ret;
+ }
+
+ ret = pthread_cond_init(&cond->pcond, NULL);
+ if (ret != 0) {
+ pthread_mutex_destroy(&cond->pmutex);
+ }
+
+ return -ret;
+}
+
+void
+synccond_destroy(synccond_t *cond)
+{
+ pthread_cond_destroy(&cond->pcond);
+ pthread_mutex_destroy(&cond->pmutex);
+}
+
+int
+synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta)
+{
+ struct timespec now;
+ struct synctask *task = NULL;
+ int ret;
+
+ task = synctask_get();
+
+ if (task == NULL) {
+ if (delta != NULL) {
+ timespec_now_realtime(&now);
+ timespec_adjust_delta(&now, *delta);
+ }
+
+ pthread_mutex_lock(&cond->pmutex);
+
+ if (delta == NULL) {
+ ret = -pthread_cond_wait(&cond->pcond, &cond->pmutex);
+ } else {
+ ret = -pthread_cond_timedwait(&cond->pcond, &cond->pmutex, &now);
+ }
+ } else {
+ pthread_mutex_lock(&cond->pmutex);
+
+ list_add_tail(&task->waitq, &cond->waitq);
+ task->synccond = cond;
+
+ ret = synclock_unlock(lock);
+ if (ret == 0) {
+ pthread_mutex_unlock(&cond->pmutex);
+
+ synctask_yield(task, delta);
+
+ ret = synclock_lock(lock);
+ if (ret == 0) {
+ ret = task->ret;
+ }
+ task->ret = 0;
+
+ return ret;
+ }
+
+ list_del_init(&task->waitq);
+ }
+
+ pthread_mutex_unlock(&cond->pmutex);
+
+ return ret;
+}
+
+int
+synccond_wait(synccond_t *cond, synclock_t *lock)
+{
+ return synccond_timedwait(cond, lock, NULL);
+}
+
+void
+synccond_signal(synccond_t *cond)
+{
+ struct synctask *task;
+
+ pthread_mutex_lock(&cond->pmutex);
+
+ if (!list_empty(&cond->waitq)) {
+ task = list_first_entry(&cond->waitq, struct synctask, waitq);
+ list_del_init(&task->waitq);
+
+ pthread_mutex_unlock(&cond->pmutex);
+
+ synctask_wake(task);
+ } else {
+ pthread_cond_signal(&cond->pcond);
+
+ pthread_mutex_unlock(&cond->pmutex);
+ }
+}
+
+void
+synccond_broadcast(synccond_t *cond)
+{
+ struct list_head list;
+ struct synctask *task;
+
+ INIT_LIST_HEAD(&list);
+
+ pthread_mutex_lock(&cond->pmutex);
+
+ list_splice_init(&cond->waitq, &list);
+ pthread_cond_broadcast(&cond->pcond);
+
+ pthread_mutex_unlock(&cond->pmutex);
+
+ while (!list_empty(&list)) {
+ task = list_first_entry(&list, struct synctask, waitq);
+ list_del_init(&task->waitq);
+
+ synctask_wake(task);
+ }
+}
+
/* Barriers */
int
@@ -1032,7 +1206,7 @@ __syncbarrier_wait(struct syncbarrier *barrier, int waitfor)
/* called within a synctask */
list_add_tail(&task->waitq, &barrier->waitq);
pthread_mutex_unlock(&barrier->guard);
- synctask_yield(task);
+ synctask_yield(task, NULL);
pthread_mutex_lock(&barrier->guard);
} else {
/* called by a non-synctask */