-rw-r--r--  xlators/performance/io-threads/src/io-threads.c | 82
1 file changed, 62 insertions(+), 20 deletions(-)
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 98621529923..9a387d93ac7 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -41,6 +41,7 @@ void iot_startup_workers (iot_worker_t **workers, int start_idx, int count,
void * iot_worker_unordered (void *arg);
void * iot_worker_ordered (void *arg);
void iot_startup_worker (iot_worker_t *worker, iot_worker_fn workerfunc);
+void iot_destroy_request (iot_request_t * req);
/* I know this function modularizes things a bit too much,
* but it is easier on the eyes to read this than see all that locking,
@@ -107,33 +108,56 @@ iot_schedule_unordered (iot_conf_t *conf,
iot_worker_unordered, req);
}
-/* Assumes inode lock is held. */
-int
-iot_ordered_request_balancer (iot_conf_t *conf,
- inode_t *inode)
+/* Only to be used with ordered requests.
+ */
+uint64_t
+iot_create_inode_worker_assoc (iot_conf_t * conf, inode_t * inode)
{
- int ctxret = 0;
long int rand = 0;
uint64_t idx = 0;
- ctxret = __inode_ctx_get (inode, conf->this, &idx);
- if (ctxret < 0) {
- rand = random ();
- /* If scaling is on, we can choose from any thread
- * that has been allocated upto, max_o_threads, but
- * with scaling off, we'll never have threads more
- * than min_o_threads.
- */
- if (iot_ordered_scaling_on (conf))
- idx = (rand % conf->max_o_threads);
- else
- idx = (rand % conf->min_o_threads);
- __inode_ctx_put (inode, conf->this, idx);
- }
+ rand = random ();
+ /* If scaling is on, we can choose from any thread
+ * that has been allocated, up to max_o_threads; but
+ * with scaling off, we'll never have more threads
+ * than min_o_threads.
+ */
+ if (iot_ordered_scaling_on (conf))
+ idx = (rand % conf->max_o_threads);
+ else
+ idx = (rand % conf->min_o_threads);
+
+ __inode_ctx_put (inode, conf->this, idx);
return idx;
}
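
For illustration, a minimal standalone sketch of the selection logic in
iot_create_inode_worker_assoc() above; the thread counts and the scaling
flag are hypothetical stand-ins, not values from this patch:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    int
    main (void)
    {
            int min_o_threads = 4;   /* hypothetical floor */
            int max_o_threads = 16;  /* hypothetical scaling ceiling */
            int scaling_on = 1;      /* pretend scaling is enabled */

            long int rand = random ();

            /* With scaling on, any thread slot up to max_o_threads is
             * a candidate; with scaling off, only the first
             * min_o_threads ever exist. */
            uint64_t idx = scaling_on ? (uint64_t)(rand % max_o_threads)
                                      : (uint64_t)(rand % min_o_threads);

            printf ("worker index = %" PRIu64 "\n", idx);
            return 0;
    }
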
+/* Assumes inode lock is held. */
+int
+iot_ordered_request_balancer (iot_conf_t *conf, inode_t *inode, uint64_t *idx)
+{
+ int ret = 0;
+
+ if (__inode_ctx_get (inode, conf->this, idx) < 0)
+ *idx = iot_create_inode_worker_assoc (conf, inode);
+ else {
+ /* Sanity check to ensure the idx received from the inode
+ * context is within bounds. We're a bit optimistic in
+ * assuming that an index within bounds has not been
+ * corrupted. idx is unsigned, so we don't check for
+ * less than 0.
+ */
+ if ((*idx >= (uint64_t)conf->max_o_threads)) {
+ gf_log (conf->this->name, GF_LOG_ERROR,
+ "inode context returned insane thread index %"
+ PRIu64, *idx);
+ ret = -1;
+ }
+ }
+
+ return ret;
+}
+
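
A caller-side sketch of the new contract, where the worker index travels
through an out-parameter and the return value signals success or failure;
balancer_sketch() is a simplified stand-in, not the patched function:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Stand-in with the same shape as the patched balancer: writes the
     * index through *idx, returns 0 on success, -1 on an insane index. */
    static int
    balancer_sketch (uint64_t *idx)
    {
            *idx = 3;  /* pretend the inode context held index 3 */
            return 0;
    }

    int
    main (void)
    {
            uint64_t idx = 0;

            if (balancer_sketch (&idx) < 0) {
                    /* mirrors the STACK_UNWIND error path above */
                    fprintf (stderr, "bad worker index\n");
                    return 1;
            }
            printf ("dispatch request to worker %" PRIu64 "\n", idx);
            return 0;
    }
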
void
iot_schedule_ordered (iot_conf_t *conf,
inode_t *inode,
@@ -142,6 +166,7 @@ iot_schedule_ordered (iot_conf_t *conf,
uint64_t idx = 0;
iot_worker_t *selected_worker = NULL;
iot_request_t * req = NULL;
+ int balstatus = 0;
if (inode == NULL) {
gf_log (conf->this->name, GF_LOG_ERROR,
@@ -151,7 +176,14 @@ iot_schedule_ordered (iot_conf_t *conf,
req = iot_init_request (stub);
LOCK (&inode->lock);
{
- idx = iot_ordered_request_balancer (conf, inode);
+ balstatus = iot_ordered_request_balancer (conf, inode, &idx);
+ if (balstatus < 0) {
+ gf_log (conf->this->name, GF_LOG_ERROR,
+ "Insane worker index. Unwinding stack");
+ STACK_UNWIND (stub->frame, -1, ECANCELED, NULL);
+ iot_destroy_request (req);
+ goto unlock_out;
+ }
/* inode lock once acquired, cannot be left here
* because other gluster main threads might be
* contending on it to append a request for this file.
@@ -162,6 +194,7 @@ iot_schedule_ordered (iot_conf_t *conf,
iot_request_queue_and_thread_fire (selected_worker,
iot_worker_ordered, req);
}
+unlock_out:
UNLOCK (&inode->lock);
}
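
The comment above carries the ordering argument: the inode lock must span
both worker selection and queueing, so two threads scheduling on the same
file cannot interleave between the two steps. A toy sketch of that shape
with hypothetical names (the real code dispatches through
iot_request_queue_and_thread_fire()):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t inode_lock = PTHREAD_MUTEX_INITIALIZER;

    static void
    schedule_ordered_sketch (int request_id)
    {
            pthread_mutex_lock (&inode_lock);
            /* Both steps happen under the same lock, so competing
             * threads cannot reorder their requests between them. */
            printf ("select worker for request %d\n", request_id);
            printf ("queue request %d\n", request_id);
            pthread_mutex_unlock (&inode_lock);
    }

    int
    main (void)
    {
            schedule_ordered_sketch (1);
            schedule_ordered_sketch (2);
            return 0;
    }
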
@@ -1444,6 +1477,15 @@ iot_init_request (call_stub_t *stub)
return req;
}
+void
+iot_destroy_request (iot_request_t * req)
+{
+ if (req == NULL)
+ return;
+
+ FREE (req);
+}
+
/* Must be called with worker lock held. */
int
iot_can_ordered_exit (iot_worker_t * worker)
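
The new iot_destroy_request() tolerates NULL in the spirit of free(NULL),
so error paths such as the one added in iot_schedule_ordered() can call it
unconditionally. A minimal sketch of the pattern with a hypothetical
stand-in type (the real code frees through the FREE() macro):

    #include <stdlib.h>

    typedef struct sketch_request { int dummy; } sketch_request_t;

    /* NULL-tolerant destructor, same shape as iot_destroy_request(). */
    static void
    sketch_destroy_request (sketch_request_t *req)
    {
            if (req == NULL)
                    return;  /* safe no-op, like free(NULL) */
            free (req);
    }

    int
    main (void)
    {
            sketch_request_t *req = calloc (1, sizeof (*req));
            sketch_destroy_request (req);   /* normal path */
            sketch_destroy_request (NULL);  /* error path: safe no-op */
            return 0;
    }
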