summaryrefslogtreecommitdiffstats
path: root/xlators/performance/io-threads
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/performance/io-threads')
-rw-r--r--xlators/performance/io-threads/src/io-threads.c106
-rw-r--r--xlators/performance/io-threads/src/io-threads.h10
2 files changed, 80 insertions, 36 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index a0d5d97df..a04f664e8 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -31,6 +31,7 @@
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
+#include "locking.h"
typedef void *(*iot_worker_fn)(void*);
@@ -62,6 +63,64 @@ iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc);
void
iot_destroy_request (iot_worker_t *worker, iot_request_t * req);
+void
+iot_notify_worker (iot_worker_t *worker)
+{
+#ifndef HAVE_SPINLOCK
+ pthread_cond_broadcast (&worker->notifier);
+#else
+ sem_post (&worker->notifier);
+#endif
+
+ return;
+}
+
+int
+iot_notify_wait (iot_worker_t *worker, int idletime)
+{
+ struct timeval tv;
+ struct timespec ts;
+ int waitres = 0;
+
+ gettimeofday (&tv, NULL);
+ ts.tv_sec = tv.tv_sec + idletime;
+ /* Slightly skew the idle time for threads so that, we dont
+ * have all of them rushing to exit at the same time, if
+ * they've been idle.
+ */
+ ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
+
+#ifndef HAVE_SPINLOCK
+ waitres = pthread_cond_timedwait (&worker->notifier, &worker->qlock,
+ &ts);
+#else
+ UNLOCK (&worker->qlock);
+ errno = 0;
+ waitres = sem_timedwait (&worker->notifier, &ts);
+ LOCK (&worker->qlock);
+ if (waitres < 0)
+ waitres = errno;
+#endif
+
+ return waitres;
+}
+
+void
+iot_notify_init (iot_worker_t *worker)
+{
+ if (worker == NULL)
+ return;
+
+ LOCK_INIT (&worker->qlock);
+
+#ifndef HAVE_SPINLOCK
+ pthread_cond_init (&worker->notifier, NULL);
+#else
+ sem_init (&worker->notifier, 0, 0);
+#endif
+
+ return;
+}
/* I know this function modularizes things a bit too much,
* but it is easier on the eyes to read this than see all that locking,
@@ -73,7 +132,7 @@ iot_request_queue_and_thread_fire (iot_worker_t *worker,
iot_worker_fn workerfunc, iot_request_t *req)
{
int ret = -1;
- pthread_mutex_lock (&worker->qlock);
+ LOCK (&worker->qlock);
{
if (iot_worker_active (worker)) {
_iot_queue (worker, req);
@@ -87,7 +146,7 @@ iot_request_queue_and_thread_fire (iot_worker_t *worker,
}
}
unlock:
- pthread_mutex_unlock (&worker->qlock);
+ UNLOCK (&worker->qlock);
return ret;
}
@@ -2229,7 +2288,7 @@ _iot_queue (iot_worker_t *worker, iot_request_t *req)
/* dq_cond */
worker->queue_size++;
- pthread_cond_broadcast (&worker->dq_cond);
+ iot_notify_worker(worker);
}
@@ -2300,8 +2359,6 @@ iot_ordered_exit (int cond_waitres, iot_worker_t *worker)
int
iot_ordered_request_wait (iot_worker_t * worker)
{
- struct timeval tv;
- struct timespec ts;
int waitres = 0;
int retstat = 0;
@@ -2310,15 +2367,7 @@ iot_ordered_request_wait (iot_worker_t * worker)
goto out;
}
- gettimeofday (&tv, NULL);
- ts.tv_sec = tv.tv_sec + worker->conf->o_idle_time;
- /* Slightly skew the idle time for threads so that, we dont
- * have all of them rushing to exit at the same time, if
- * they've been idle.
- */
- ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
- waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
- &ts);
+ waitres = iot_notify_wait (worker, worker->conf->o_idle_time);
if (iot_ordered_exit (waitres, worker)) {
retstat = -1;
}
@@ -2335,7 +2384,7 @@ iot_dequeue_ordered (iot_worker_t *worker)
iot_request_t *req = NULL;
int waitstat = 0;
- pthread_mutex_lock (&worker->qlock);
+ LOCK (&worker->qlock);
{
while (!worker->queue_size) {
waitstat = 0;
@@ -2355,7 +2404,7 @@ iot_dequeue_ordered (iot_worker_t *worker)
worker->queue_size--;
}
out:
- pthread_mutex_unlock (&worker->qlock);
+ UNLOCK (&worker->qlock);
iot_destroy_request (worker, req);
return stub;
@@ -2425,8 +2474,6 @@ iot_unordered_exit (int cond_waitres, iot_worker_t *worker)
int
iot_unordered_request_wait (iot_worker_t * worker)
{
- struct timeval tv;
- struct timespec ts;
int waitres = 0;
int retstat = 0;
@@ -2435,15 +2482,7 @@ iot_unordered_request_wait (iot_worker_t * worker)
goto out;
}
- gettimeofday (&tv, NULL);
- ts.tv_sec = tv.tv_sec + worker->conf->u_idle_time;
- /* Slightly skew the idle time for threads so that, we dont
- * have all of them rushing to exit at the same time, if
- * they've been idle.
- */
- ts.tv_nsec = skew_usec_idle_time (tv.tv_usec) * 1000;
- waitres = pthread_cond_timedwait (&worker->dq_cond, &worker->qlock,
- &ts);
+ waitres = iot_notify_wait (worker, worker->conf->u_idle_time);
if (iot_unordered_exit (waitres, worker)) {
retstat = -1;
}
@@ -2460,7 +2499,7 @@ iot_dequeue_unordered (iot_worker_t *worker)
iot_request_t *req = NULL;
int waitstat = 0;
- pthread_mutex_lock (&worker->qlock);
+ LOCK (&worker->qlock);
{
while (!worker->queue_size) {
waitstat = 0;
@@ -2480,7 +2519,7 @@ iot_dequeue_unordered (iot_worker_t *worker)
worker->queue_size--;
}
out:
- pthread_mutex_unlock (&worker->qlock);
+ UNLOCK (&worker->qlock);
iot_destroy_request (worker, req);
return stub;
@@ -2560,8 +2599,7 @@ allocate_worker (iot_conf_t * conf)
INIT_LIST_HEAD (&wrk->rqlist);
wrk->conf = conf;
- pthread_cond_init (&wrk->dq_cond, NULL);
- pthread_mutex_init (&wrk->qlock, NULL);
+ iot_notify_init (wrk);
wrk->state = IOT_STATE_DEAD;
out:
@@ -2599,13 +2637,13 @@ out:
void
iot_stop_worker (iot_worker_t *worker)
{
- pthread_mutex_lock (&worker->qlock);
+ LOCK (&worker->qlock);
{
worker->state = IOT_STATE_EXIT_REQUEST;
}
- pthread_mutex_unlock (&worker->qlock);
+ UNLOCK (&worker->qlock);
- pthread_cond_broadcast (&worker->dq_cond);
+ iot_notify_worker (worker);
pthread_join (worker->thread, NULL);
}
diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h
index c5ca76000..ec1735694 100644
--- a/xlators/performance/io-threads/src/io-threads.h
+++ b/xlators/performance/io-threads/src/io-threads.h
@@ -34,6 +34,8 @@
#include "common-utils.h"
#include "list.h"
#include <stdlib.h>
+#include "locking.h"
+#include <semaphore.h>
#define min(a,b) ((a)<(b)?(a):(b))
#define max(a,b) ((a)>(b)?(a):(b))
@@ -88,9 +90,13 @@ typedef enum {
struct iot_worker {
struct list_head rqlist; /* List of requests assigned to me. */
struct iot_conf *conf;
+#ifndef HAVE_SPINLOCK
+ pthread_cond_t notifier;
+#else
+ sem_t notifier;
+#endif
int64_t q,dq;
- pthread_cond_t dq_cond;
- pthread_mutex_t qlock;
+ gf_lock_t qlock;
int32_t queue_size;
pthread_t thread;
iot_state_t state; /* What state is the thread in. */