diff options
Diffstat (limited to 'libglusterfs/src/syncop.c')
| -rw-r--r-- | libglusterfs/src/syncop.c | 572 |
1 files changed, 372 insertions, 200 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index edf978d797d..df20cec559f 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -8,8 +8,12 @@ cases as published by the Free Software Foundation. */ -#include "syncop.h" -#include "libglusterfs-messages.h" +#include "glusterfs/syncop.h" +#include "glusterfs/libglusterfs-messages.h" + +#ifdef HAVE_TSAN_API +#include <sanitizer/tsan_interface.h> +#endif int syncopctx_setfsuid(void *uid) @@ -26,28 +30,10 @@ syncopctx_setfsuid(void *uid) opctx = syncopctx_getctx(); - /* alloc for this thread the first time */ - if (!opctx) { - opctx = GF_CALLOC(1, sizeof(*opctx), gf_common_mt_syncopctx); - if (!opctx) { - ret = -1; - goto out; - } - - ret = syncopctx_setctx(opctx); - if (ret != 0) { - GF_FREE(opctx); - opctx = NULL; - goto out; - } - } + opctx->uid = *(uid_t *)uid; + opctx->valid |= SYNCOPCTX_UID; out: - if (opctx && uid) { - opctx->uid = *(uid_t *)uid; - opctx->valid |= SYNCOPCTX_UID; - } - return ret; } @@ -66,28 +52,10 @@ syncopctx_setfsgid(void *gid) opctx = syncopctx_getctx(); - /* alloc for this thread the first time */ - if (!opctx) { - opctx = GF_CALLOC(1, sizeof(*opctx), gf_common_mt_syncopctx); - if (!opctx) { - ret = -1; - goto out; - } - - ret = syncopctx_setctx(opctx); - if (ret != 0) { - GF_FREE(opctx); - opctx = NULL; - goto out; - } - } + opctx->gid = *(gid_t *)gid; + opctx->valid |= SYNCOPCTX_GID; out: - if (opctx && gid) { - opctx->gid = *(gid_t *)gid; - opctx->valid |= SYNCOPCTX_GID; - } - return ret; } @@ -107,43 +75,20 @@ syncopctx_setfsgroups(int count, const void *groups) opctx = syncopctx_getctx(); - /* alloc for this thread the first time */ - if (!opctx) { - opctx = GF_CALLOC(1, sizeof(*opctx), gf_common_mt_syncopctx); - if (!opctx) { - ret = -1; - goto out; - } - - ret = syncopctx_setctx(opctx); - if (ret != 0) { - GF_FREE(opctx); - opctx = NULL; - goto out; - } - } - /* resize internal groups as required */ if (count && opctx->grpsize < count) { if (opctx->groups) { - tmpgroups = GF_REALLOC(opctx->groups, (sizeof(gid_t) * count)); - /* NOTE: Not really required to zero the reallocation, - * as ngrps controls the validity of data, - * making a note irrespective */ - if (tmpgroups == NULL) { - opctx->grpsize = 0; - GF_FREE(opctx->groups); - opctx->groups = NULL; - ret = -1; - goto out; - } - } else { - tmpgroups = GF_CALLOC(count, sizeof(gid_t), gf_common_mt_syncopctx); - if (tmpgroups == NULL) { - opctx->grpsize = 0; - ret = -1; - goto out; - } + /* Group list will be updated later, so no need to keep current + * data and waste time copying it. It's better to free the current + * allocation and then allocate a fresh new memory block. */ + GF_FREE(opctx->groups); + opctx->groups = NULL; + opctx->grpsize = 0; + } + tmpgroups = GF_MALLOC(count * sizeof(gid_t), gf_common_mt_syncopctx); + if (tmpgroups == NULL) { + ret = -1; + goto out; } opctx->groups = tmpgroups; @@ -156,6 +101,13 @@ syncopctx_setfsgroups(int count, const void *groups) /* set/reset the ngrps, this is where reset of groups is handled */ opctx->ngrps = count; + + if ((opctx->valid & SYNCOPCTX_GROUPS) == 0) { + /* This is the first time we are storing groups into the TLS structure + * so we mark the current thread so that it will be properly cleaned + * up when the thread terminates. */ + gf_thread_needs_cleanup(); + } opctx->valid |= SYNCOPCTX_GROUPS; out: @@ -177,28 +129,10 @@ syncopctx_setfspid(void *pid) opctx = syncopctx_getctx(); - /* alloc for this thread the first time */ - if (!opctx) { - opctx = GF_CALLOC(1, sizeof(*opctx), gf_common_mt_syncopctx); - if (!opctx) { - ret = -1; - goto out; - } - - ret = syncopctx_setctx(opctx); - if (ret != 0) { - GF_FREE(opctx); - opctx = NULL; - goto out; - } - } + opctx->pid = *(pid_t *)pid; + opctx->valid |= SYNCOPCTX_PID; out: - if (opctx && pid) { - opctx->pid = *(pid_t *)pid; - opctx->valid |= SYNCOPCTX_PID; - } - return ret; } @@ -217,35 +151,21 @@ syncopctx_setfslkowner(gf_lkowner_t *lk_owner) opctx = syncopctx_getctx(); - /* alloc for this thread the first time */ - if (!opctx) { - opctx = GF_CALLOC(1, sizeof(*opctx), gf_common_mt_syncopctx); - if (!opctx) { - ret = -1; - goto out; - } - - ret = syncopctx_setctx(opctx); - if (ret != 0) { - GF_FREE(opctx); - opctx = NULL; - goto out; - } - } + opctx->lk_owner = *lk_owner; + opctx->valid |= SYNCOPCTX_LKOWNER; out: - if (opctx && lk_owner) { - opctx->lk_owner = *lk_owner; - opctx->valid |= SYNCOPCTX_LKOWNER; - } - return ret; } +void * +syncenv_processor(void *thdata); + static void __run(struct synctask *task) { struct syncenv *env = NULL; + int32_t total, ret, i; env = task->env; @@ -261,7 +181,6 @@ __run(struct synctask *task) env->runcount--; break; case SYNCTASK_WAIT: - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -275,8 +194,27 @@ __run(struct synctask *task) } list_add_tail(&task->all_tasks, &env->runq); - env->runcount++; task->state = SYNCTASK_RUN; + + env->runcount++; + + total = env->procs + env->runcount - env->procs_idle; + if (total > env->procmax) { + total = env->procmax; + } + if (total > env->procs) { + for (i = 0; i < env->procmax; i++) { + if (env->proc[i].env == NULL) { + env->proc[i].env = env; + ret = gf_thread_create(&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i], + "sproc%d", i); + if ((ret < 0) || (++env->procs >= total)) { + break; + } + } + } + } } static void @@ -298,7 +236,6 @@ __wait(struct synctask *task) gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_REWAITING_TASK, "re-waiting already waiting " "task"); - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -311,12 +248,11 @@ __wait(struct synctask *task) } list_add_tail(&task->all_tasks, &env->waitq); - env->waitcount++; task->state = SYNCTASK_WAIT; } void -synctask_yield(struct synctask *task) +synctask_yield(struct synctask *task, struct timespec *delta) { xlator_t *oldTHIS = THIS; @@ -325,10 +261,16 @@ synctask_yield(struct synctask *task) task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif + task->delta = delta; + if (task->state != SYNCTASK_DONE) { task->state = SYNCTASK_SUSPEND; - (void)gf_backtrace_save(task->btbuf); } + +#ifdef HAVE_TSAN_API + __tsan_switch_to_fiber(task->proc->tsan.fiber, 0); +#endif + if (swapcontext(&task->ctx, &task->proc->sched) < 0) { gf_msg("syncop", GF_LOG_ERROR, errno, LG_MSG_SWAPCONTEXT_FAILED, "swapcontext failed"); @@ -338,6 +280,35 @@ synctask_yield(struct synctask *task) } void +synctask_sleep(int32_t secs) +{ + struct timespec delta; + struct synctask *task; + + task = synctask_get(); + + if (task == NULL) { + sleep(secs); + } else { + delta.tv_sec = secs; + delta.tv_nsec = 0; + + synctask_yield(task, &delta); + } +} + +static void +__synctask_wake(struct synctask *task) +{ + task->woken = 1; + + if (task->slept) + __run(task); + + pthread_cond_broadcast(&task->env->cond); +} + +void synctask_wake(struct synctask *task) { struct syncenv *env = NULL; @@ -346,13 +317,18 @@ synctask_wake(struct synctask *task) pthread_mutex_lock(&env->mutex); { - task->woken = 1; + if (task->timer != NULL) { + if (gf_timer_call_cancel(task->xl->ctx, task->timer) != 0) { + goto unlock; + } - if (task->slept) - __run(task); + task->timer = NULL; + task->synccond = NULL; + } - pthread_cond_broadcast(&env->cond); + __synctask_wake(task); } +unlock: pthread_mutex_unlock(&env->mutex); } @@ -371,7 +347,7 @@ synctask_wrap(void) task->state = SYNCTASK_DONE; - synctask_yield(task); + synctask_yield(task, NULL); } void @@ -390,6 +366,10 @@ synctask_destroy(struct synctask *task) pthread_cond_destroy(&task->cond); } +#ifdef HAVE_TSAN_API + __tsan_destroy_fiber(task->tsan.fiber); +#endif + GF_FREE(task); } @@ -500,6 +480,13 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, makecontext(&newtask->ctx, (void (*)(void))synctask_wrap, 0); +#ifdef HAVE_TSAN_API + newtask->tsan.fiber = __tsan_create_fiber(0); + snprintf(newtask->tsan.name, TSAN_THREAD_NAMELEN, "<synctask of %s>", + this->name); + __tsan_set_fiber_name(newtask->tsan.fiber, newtask->tsan.name); +#endif + newtask->state = SYNCTASK_INIT; newtask->slept = 1; @@ -511,11 +498,6 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, } synctask_wake(newtask); - /* - * Make sure someone's there to execute anything we just put on the - * run queue. - */ - syncenv_scale(env); return newtask; err: @@ -609,8 +591,12 @@ syncenv_task(struct syncproc *proc) goto unlock; } - sleep_till.tv_sec = time(NULL) + SYNCPROC_IDLE_TIME; + env->procs_idle++; + + sleep_till.tv_sec = gf_time() + SYNCPROC_IDLE_TIME; ret = pthread_cond_timedwait(&env->cond, &env->mutex, &sleep_till); + + env->procs_idle--; } task = list_entry(env->runq.next, struct synctask, all_tasks); @@ -629,6 +615,34 @@ unlock: return task; } +static void +synctask_timer(void *data) +{ + struct synctask *task = data; + struct synccond *cond; + + cond = task->synccond; + if (cond != NULL) { + pthread_mutex_lock(&cond->pmutex); + + list_del_init(&task->waitq); + task->synccond = NULL; + + pthread_mutex_unlock(&cond->pmutex); + + task->ret = -ETIMEDOUT; + } + + pthread_mutex_lock(&task->env->mutex); + + gf_timer_call_cancel(task->xl->ctx, task->timer); + task->timer = NULL; + + __synctask_wake(task); + + pthread_mutex_unlock(&task->env->mutex); +} + void synctask_switchto(struct synctask *task) { @@ -644,6 +658,10 @@ synctask_switchto(struct synctask *task) task->ctx.uc_flags &= ~_UC_TLSBASE; #endif +#ifdef HAVE_TSAN_API + __tsan_switch_to_fiber(task->tsan.fiber, 0); +#endif + if (swapcontext(&task->proc->sched, &task->ctx) < 0) { gf_msg("syncop", GF_LOG_ERROR, errno, LG_MSG_SWAPCONTEXT_FAILED, "swapcontext failed"); @@ -661,7 +679,14 @@ synctask_switchto(struct synctask *task) } else { task->slept = 1; __wait(task); + + if (task->delta != NULL) { + task->timer = gf_timer_call_after(task->xl->ctx, *task->delta, + synctask_timer, task); + } } + + task->delta = NULL; } pthread_mutex_unlock(&env->mutex); } @@ -669,68 +694,27 @@ synctask_switchto(struct synctask *task) void * syncenv_processor(void *thdata) { - struct syncenv *env = NULL; struct syncproc *proc = NULL; struct synctask *task = NULL; proc = thdata; - env = proc->env; - for (;;) { - task = syncenv_task(proc); - if (!task) - break; +#ifdef HAVE_TSAN_API + proc->tsan.fiber = __tsan_create_fiber(0); + snprintf(proc->tsan.name, TSAN_THREAD_NAMELEN, "<sched of syncenv@%p>", + proc); + __tsan_set_fiber_name(proc->tsan.fiber, proc->tsan.name); +#endif + while ((task = syncenv_task(proc)) != NULL) { synctask_switchto(task); - - syncenv_scale(env); } - return NULL; -} - -void -syncenv_scale(struct syncenv *env) -{ - int diff = 0; - int scale = 0; - int i = 0; - int ret = 0; - char thread_name[GF_THREAD_NAMEMAX] = { - 0, - }; - - pthread_mutex_lock(&env->mutex); - { - if (env->procs > env->runcount) - goto unlock; - - scale = env->runcount; - if (scale > env->procmax) - scale = env->procmax; - if (scale > env->procs) - diff = scale - env->procs; - while (diff) { - diff--; - for (; (i < env->procmax); i++) { - if (env->proc[i].processor == 0) - break; - } +#ifdef HAVE_TSAN_API + __tsan_destroy_fiber(proc->tsan.fiber); +#endif - env->proc[i].env = env; - snprintf(thread_name, sizeof(thread_name), "sproc%03hx", - (env->procs & 0x3ff)); - ret = gf_thread_create(&env->proc[i].processor, NULL, - syncenv_processor, &env->proc[i], - thread_name); - if (ret) - break; - env->procs++; - i++; - } - } -unlock: - pthread_mutex_unlock(&env->mutex); + return NULL; } /* The syncenv threads are cleaned up in this routine. @@ -784,9 +768,6 @@ syncenv_new(size_t stacksize, int procmin, int procmax) struct syncenv *newenv = NULL; int ret = 0; int i = 0; - char thread_name[GF_THREAD_NAMEMAX] = { - 0, - }; if (!procmin || procmin < 0) procmin = SYNCENV_PROC_MIN; @@ -812,14 +793,13 @@ syncenv_new(size_t stacksize, int procmin, int procmax) newenv->stacksize = stacksize; newenv->procmin = procmin; newenv->procmax = procmax; + newenv->procs_idle = 0; for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; - snprintf(thread_name, sizeof(thread_name), "%s%d", "sproc", - (newenv->procs)); ret = gf_thread_create(&newenv->proc[i].processor, NULL, - syncenv_processor, &newenv->proc[i], - thread_name); + syncenv_processor, &newenv->proc[i], "sproc%d", + i); if (ret) break; newenv->procs++; @@ -909,7 +889,7 @@ __synclock_lock(struct synclock *lock) task->woken = 0; list_add_tail(&task->waitq, &lock->waitq); pthread_mutex_unlock(&lock->guard); - synctask_yield(task); + synctask_yield(task, NULL); /* task is removed from waitq in unlock, * under lock->guard.*/ pthread_mutex_lock(&lock->guard); @@ -1062,6 +1042,136 @@ synclock_unlock(synclock_t *lock) return ret; } +/* Condition variables */ + +int32_t +synccond_init(synccond_t *cond) +{ + int32_t ret; + + INIT_LIST_HEAD(&cond->waitq); + + ret = pthread_mutex_init(&cond->pmutex, NULL); + if (ret != 0) { + return -ret; + } + + ret = pthread_cond_init(&cond->pcond, NULL); + if (ret != 0) { + pthread_mutex_destroy(&cond->pmutex); + } + + return -ret; +} + +void +synccond_destroy(synccond_t *cond) +{ + pthread_cond_destroy(&cond->pcond); + pthread_mutex_destroy(&cond->pmutex); +} + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta) +{ + struct timespec now; + struct synctask *task = NULL; + int ret; + + task = synctask_get(); + + if (task == NULL) { + if (delta != NULL) { + timespec_now_realtime(&now); + timespec_adjust_delta(&now, *delta); + } + + pthread_mutex_lock(&cond->pmutex); + + if (delta == NULL) { + ret = -pthread_cond_wait(&cond->pcond, &cond->pmutex); + } else { + ret = -pthread_cond_timedwait(&cond->pcond, &cond->pmutex, &now); + } + } else { + pthread_mutex_lock(&cond->pmutex); + + list_add_tail(&task->waitq, &cond->waitq); + task->synccond = cond; + + ret = synclock_unlock(lock); + if (ret == 0) { + pthread_mutex_unlock(&cond->pmutex); + + synctask_yield(task, delta); + + ret = synclock_lock(lock); + if (ret == 0) { + ret = task->ret; + } + task->ret = 0; + + return ret; + } + + list_del_init(&task->waitq); + } + + pthread_mutex_unlock(&cond->pmutex); + + return ret; +} + +int +synccond_wait(synccond_t *cond, synclock_t *lock) +{ + return synccond_timedwait(cond, lock, NULL); +} + +void +synccond_signal(synccond_t *cond) +{ + struct synctask *task; + + pthread_mutex_lock(&cond->pmutex); + + if (!list_empty(&cond->waitq)) { + task = list_first_entry(&cond->waitq, struct synctask, waitq); + list_del_init(&task->waitq); + + pthread_mutex_unlock(&cond->pmutex); + + synctask_wake(task); + } else { + pthread_cond_signal(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + } +} + +void +synccond_broadcast(synccond_t *cond) +{ + struct list_head list; + struct synctask *task; + + INIT_LIST_HEAD(&list); + + pthread_mutex_lock(&cond->pmutex); + + list_splice_init(&cond->waitq, &list); + pthread_cond_broadcast(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + + while (!list_empty(&list)) { + task = list_first_entry(&list, struct synctask, waitq); + list_del_init(&task->waitq); + + synctask_wake(task); + } +} + /* Barriers */ int @@ -1131,7 +1241,7 @@ __syncbarrier_wait(struct syncbarrier *barrier, int waitfor) /* called within a synctask */ list_add_tail(&task->waitq, &barrier->waitq); pthread_mutex_unlock(&barrier->guard); - synctask_yield(task); + synctask_yield(task, NULL); pthread_mutex_lock(&barrier->guard); } else { /* called by a non-synctask */ @@ -2980,12 +3090,13 @@ syncop_seek(xlator_t *subvol, fd_t *fd, off_t offset, gf_seek_what_t what, SYNCOP(subvol, (&args), syncop_seek_cbk, subvol->fops->seek, fd, offset, what, xdata_in); - if (*off) - *off = args.offset; - - if (args.op_ret == -1) + if (args.op_ret < 0) { return -args.op_errno; - return args.op_ret; + } else { + if (off) + *off = args.offset; + return args.op_ret; + } } int @@ -3397,4 +3508,65 @@ syncop_namelink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, __wake(args); return 0; -}
\ No newline at end of file +} + +int +syncop_copy_file_range(xlator_t *subvol, fd_t *fd_in, off64_t off_in, + fd_t *fd_out, off64_t off_out, size_t len, + uint32_t flags, struct iatt *stbuf, + struct iatt *preiatt_dst, struct iatt *postiatt_dst, + dict_t *xdata_in, dict_t **xdata_out) +{ + struct syncargs args = { + 0, + }; + + SYNCOP(subvol, (&args), syncop_copy_file_range_cbk, + subvol->fops->copy_file_range, fd_in, off_in, fd_out, off_out, len, + flags, xdata_in); + + if (stbuf) { + *stbuf = args.iatt1; + } + if (preiatt_dst) { + *preiatt_dst = args.iatt2; + } + if (postiatt_dst) { + *postiatt_dst = args.iatt3; + } + + if (xdata_out) { + *xdata_out = args.xdata; + } else if (args.xdata) { + dict_unref(args.xdata); + } + + errno = args.op_errno; + return args.op_ret; +} + +int +syncop_copy_file_range_cbk(call_frame_t *frame, void *cookie, xlator_t *this, + int op_ret, int op_errno, struct iatt *stbuf, + struct iatt *prebuf_dst, struct iatt *postbuf_dst, + dict_t *xdata) +{ + struct syncargs *args = NULL; + + args = cookie; + + args->op_ret = op_ret; + args->op_errno = op_errno; + if (xdata) + args->xdata = dict_ref(xdata); + + if (op_ret >= 0) { + args->iatt1 = *stbuf; + args->iatt2 = *prebuf_dst; + args->iatt3 = *postbuf_dst; + } + + __wake(args); + + return 0; +} |
