diff options
Diffstat (limited to 'libglusterfs/src/syncop.c')
-rw-r--r-- | libglusterfs/src/syncop.c | 154 |
1 files changed, 126 insertions, 28 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 87f398417..dbc52259f 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -80,24 +80,15 @@ __wait (struct synctask *task) void -synctask_waitfor (struct synctask *task, int waitfor) +synctask_yield (struct synctask *task) { - struct syncenv *env = NULL; xlator_t *oldTHIS = THIS; - env = task->env; - #if defined(__NetBSD__) && defined(_UC_TLSBASE) /* Preserve pthread private pointer through swapcontex() */ task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif - pthread_mutex_lock (&env->mutex); - { - task->waitfor = waitfor; - } - pthread_mutex_unlock (&env->mutex); - if (swapcontext (&task->ctx, &task->proc->sched) < 0) { gf_log ("syncop", GF_LOG_ERROR, "swapcontext failed (%s)", strerror (errno)); @@ -108,13 +99,6 @@ synctask_waitfor (struct synctask *task, int waitfor) void -synctask_yield (struct synctask *task) -{ - synctask_waitfor (task, 1); -} - - -void synctask_yawn (struct synctask *task) { struct syncenv *env = NULL; @@ -124,7 +108,6 @@ synctask_yawn (struct synctask *task) pthread_mutex_lock (&env->mutex); { task->woken = 0; - task->waitfor = 0; } pthread_mutex_unlock (&env->mutex); } @@ -139,9 +122,9 @@ synctask_wake (struct synctask *task) pthread_mutex_lock (&env->mutex); { - task->woken++; + task->woken = 1; - if (task->slept && task->woken >= task->waitfor) + if (task->slept) __run (task); pthread_cond_broadcast (&env->cond); @@ -352,7 +335,6 @@ syncenv_task (struct syncproc *proc) task->woken = 0; task->slept = 0; - task->waitfor = 0; task->proc = proc; } @@ -390,7 +372,7 @@ synctask_switchto (struct synctask *task) pthread_mutex_lock (&env->mutex); { - if (task->woken >= task->waitfor) { + if (task->woken) { __run (task); } else { task->slept = 1; @@ -547,12 +529,11 @@ __synclock_lock (struct synclock *lock) if (task) { /* called within a synctask */ list_add_tail (&task->waitq, &lock->waitq); - { - pthread_mutex_unlock (&lock->guard); - synctask_yield (task); - pthread_mutex_lock (&lock->guard); - } - list_del_init (&task->waitq); + pthread_mutex_unlock (&lock->guard); + synctask_yield (task); + /* task is removed from waitq in unlock, + * under lock->guard.*/ + pthread_mutex_lock (&lock->guard); } else { /* called by a non-synctask */ pthread_cond_wait (&lock->cond, &lock->guard); @@ -634,6 +615,7 @@ __synclock_unlock (synclock_t *lock) pthread_cond_signal (&lock->cond); if (!list_empty (&lock->waitq)) { task = list_entry (lock->waitq.next, struct synctask, waitq); + list_del_init (&task->waitq); synctask_wake (task); } @@ -655,6 +637,122 @@ synclock_unlock (synclock_t *lock) return ret; } +/* Barriers */ + +int +syncbarrier_init (struct syncbarrier *barrier) +{ + if (!barrier) { + errno = EINVAL; + return -1; + } + + pthread_cond_init (&barrier->cond, 0); + barrier->count = 0; + INIT_LIST_HEAD (&barrier->waitq); + + return pthread_mutex_init (&barrier->guard, 0); +} + + +int +syncbarrier_destroy (struct syncbarrier *barrier) +{ + if (!barrier) { + errno = EINVAL; + return -1; + } + + pthread_cond_destroy (&barrier->cond); + return pthread_mutex_destroy (&barrier->guard); +} + + +static int +__syncbarrier_wait (struct syncbarrier *barrier, int waitfor) +{ + struct synctask *task = NULL; + + if (!barrier) { + errno = EINVAL; + return -1; + } + + task = synctask_get (); + + while (barrier->count < waitfor) { + if (task) { + /* called within a synctask */ + list_add_tail (&task->waitq, &barrier->waitq); + { + pthread_mutex_unlock (&barrier->guard); + synctask_yield (task); + pthread_mutex_lock (&barrier->guard); + } + list_del_init (&task->waitq); + } else { + /* called by a non-synctask */ + pthread_cond_wait (&barrier->cond, &barrier->guard); + } + } + + barrier->count = 0; + + return 0; +} + + +int +syncbarrier_wait (struct syncbarrier *barrier, int waitfor) +{ + int ret = 0; + + pthread_mutex_lock (&barrier->guard); + { + ret = __syncbarrier_wait (barrier, waitfor); + } + pthread_mutex_unlock (&barrier->guard); + + return ret; +} + + +static int +__syncbarrier_wake (struct syncbarrier *barrier) +{ + struct synctask *task = NULL; + + if (!barrier) { + errno = EINVAL; + return -1; + } + + barrier->count++; + + pthread_cond_signal (&barrier->cond); + if (!list_empty (&barrier->waitq)) { + task = list_entry (barrier->waitq.next, struct synctask, waitq); + synctask_wake (task); + } + + return 0; +} + + +int +syncbarrier_wake (struct syncbarrier *barrier) +{ + int ret = 0; + + pthread_mutex_lock (&barrier->guard); + { + ret = __syncbarrier_wake (barrier); + } + pthread_mutex_unlock (&barrier->guard); + + return ret; +} + /* FOPS */ |