diff options
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.c | 106 | ||||
-rw-r--r-- | xlators/performance/io-threads/src/io-threads.h | 10 |
2 files changed, 80 insertions, 36 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index a0d5d97df67..a04f664e816 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -31,6 +31,7 @@ #include <stdlib.h> #include <sys/time.h> #include <time.h> +#include "locking.h" typedef void *(*iot_worker_fn)(void*); @@ -62,6 +63,64 @@ iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc); void iot_destroy_request (iot_worker_t *worker, iot_request_t * req); +void +iot_notify_worker (iot_worker_t *worker) +{ +#ifndef HAVE_SPINLOCK + pthread_cond_broadcast (&worker->notifier); +#else + sem_post (&worker->notifier); +#endif + + return; +} + +int +iot_notify_wait (iot_worker_t *worker, int idletime) +{ + struct timeval tv; + struct timespec ts; + int waitres = 0; + + gettimeofday (&tv, NULL); + ts.tv_sec = tv.tv_sec + idletime; + /* Slightly skew the idle time for threads so that, we dont + * have all of them rushing to exit at the same time, if + * they've been idle. + */ + ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; + +#ifndef HAVE_SPINLOCK + waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock, + &ts); +#else + UNLOCK (&worker->qlock); + errno = 0; + waitres = sem_timedwait (&worker->notifier, &ts); + LOCK (&worker->qlock); + if (waitres < 0) + waitres = errno; +#endif + + return waitres; +} + +void +iot_notify_init (iot_worker_t *worker) +{ + if (worker == NULL) + return; + + LOCK_INIT (&worker->qlock); + +#ifndef HAVE_SPINLOCK + pthread_cond_init (&worker->notifier, NULL); +#else + sem_init (&worker->notifier, 0, 0); +#endif + + return; +} /* I know this function modularizes things a bit too much, * but it is easier on the eyes to read this than see all that locking, @@ -73,7 +132,7 @@ iot_request_queue_and_thread_fire (iot_worker_t *worker, iot_worker_fn workerfunc, iot_request_t *req) { int ret = -1; - pthread_mutex_lock (&worker->qlock); + LOCK (&worker->qlock); { if (iot_worker_active (worker)) { _iot_queue (worker, req); @@ -87,7 +146,7 @@ iot_request_queue_and_thread_fire (iot_worker_t *worker, } } unlock: - pthread_mutex_unlock (&worker->qlock); + UNLOCK (&worker->qlock); return ret; } @@ -2229,7 +2288,7 @@ _iot_queue (iot_worker_t *worker, iot_request_t *req) /* dq_cond */ worker->queue_size++; - pthread_cond_broadcast (&worker->dq_cond); + iot_notify_worker(worker); } @@ -2300,8 +2359,6 @@ iot_ordered_exit (int cond_waitres, iot_worker_t *worker) int iot_ordered_request_wait (iot_worker_t * worker) { - struct timeval tv; - struct timespec ts; int waitres = 0; int retstat = 0; @@ -2310,15 +2367,7 @@ iot_ordered_request_wait (iot_worker_t * worker) goto out; } - gettimeofday (&tv, NULL); - ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time; - /* Slightly skew the idle time for threads so that, we dont - * have all of them rushing to exit at the same time, if - * they've been idle. - */ - ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; - waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, - &ts); + waitres = iot_notify_wait (worker, worker->conf->o_idle_time); if (iot_ordered_exit (waitres, worker)) { retstat = -1; } @@ -2335,7 +2384,7 @@ iot_dequeue_ordered (iot_worker_t *worker) iot_request_t *req = NULL; int waitstat = 0; - pthread_mutex_lock (&worker->qlock); + LOCK (&worker->qlock); { while (!worker->queue_size) { waitstat = 0; @@ -2355,7 +2404,7 @@ iot_dequeue_ordered (iot_worker_t *worker) worker->queue_size--; } out: - pthread_mutex_unlock (&worker->qlock); + UNLOCK (&worker->qlock); iot_destroy_request (worker, req); return stub; @@ -2425,8 +2474,6 @@ iot_unordered_exit (int cond_waitres, iot_worker_t *worker) int iot_unordered_request_wait (iot_worker_t * worker) { - struct timeval tv; - struct timespec ts; int waitres = 0; int retstat = 0; @@ -2435,15 +2482,7 @@ iot_unordered_request_wait (iot_worker_t * worker) goto out; } - gettimeofday (&tv, NULL); - ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time; - /* Slightly skew the idle time for threads so that, we dont - * have all of them rushing to exit at the same time, if - * they've been idle. - */ - ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000; - waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock, - &ts); + waitres = iot_notify_wait (worker, worker->conf->u_idle_time); if (iot_unordered_exit (waitres, worker)) { retstat = -1; } @@ -2460,7 +2499,7 @@ iot_dequeue_unordered (iot_worker_t *worker) iot_request_t *req = NULL; int waitstat = 0; - pthread_mutex_lock (&worker->qlock); + LOCK (&worker->qlock); { while (!worker->queue_size) { waitstat = 0; @@ -2480,7 +2519,7 @@ iot_dequeue_unordered (iot_worker_t *worker) worker->queue_size--; } out: - pthread_mutex_unlock (&worker->qlock); + UNLOCK (&worker->qlock); iot_destroy_request (worker, req); return stub; @@ -2560,8 +2599,7 @@ allocate_worker (iot_conf_t * conf) INIT_LIST_HEAD (&wrk->rqlist); wrk->conf = conf; - pthread_cond_init (&wrk->dq_cond, NULL); - pthread_mutex_init (&wrk->qlock, NULL); + iot_notify_init (wrk); wrk->state = IOT_STATE_DEAD; out: @@ -2599,13 +2637,13 @@ out: void iot_stop_worker (iot_worker_t *worker) { - pthread_mutex_lock (&worker->qlock); + LOCK (&worker->qlock); { worker->state = IOT_STATE_EXIT_REQUEST; } - pthread_mutex_unlock (&worker->qlock); + UNLOCK (&worker->qlock); - pthread_cond_broadcast (&worker->dq_cond); + iot_notify_worker (worker); pthread_join (worker->thread, NULL); } diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h index c5ca760000a..ec17356942b 100644 --- a/xlators/performance/io-threads/src/io-threads.h +++ b/xlators/performance/io-threads/src/io-threads.h @@ -34,6 +34,8 @@ #include "common-utils.h" #include "list.h" #include <stdlib.h> +#include "locking.h" +#include <semaphore.h> #define min(a,b) ((a)<(b)?(a):(b)) #define max(a,b) ((a)>(b)?(a):(b)) @@ -88,9 +90,13 @@ typedef enum { struct iot_worker { struct list_head rqlist; /* List of requests assigned to me. */ struct iot_conf *conf; +#ifndef HAVE_SPINLOCK + pthread_cond_t notifier; +#else + sem_t notifier; +#endif int64_t q,dq; - pthread_cond_t dq_cond; - pthread_mutex_t qlock; + gf_lock_t qlock; int32_t queue_size; pthread_t thread; iot_state_t state; /* What state is the thread in. */ |