Diffstat (limited to 'fs/io-wq.c')
 fs/io-wq.c | 156 ++++++++++++++++++++++++----------------------------------
 1 file changed, 68 insertions(+), 88 deletions(-)
diff --git a/fs/io-wq.c b/fs/io-wq.c
index aa9656eb832e..8cba77a937a1 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -32,7 +32,7 @@ enum {
};
enum {
- IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
+ IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
};
/*
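The patch replaces the wqe-wide IO_WQE_FLAG_STALLED (a plain flags word only touched under wqe->lock) with a per-acct stalled bit driven by the atomic bitops, so one accounting class can stall on a hash collision without blocking the other. A minimal userspace sketch of the pattern, with C11 atomics standing in for the kernel's set_bit()/clear_bit()/test_bit() (all names here are illustrative, not from the patch):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define STALLED_BIT 0

/* stand-in for struct io_wqe_acct's new "unsigned long flags" field */
struct acct {
	atomic_ulong flags;
};

static void set_stalled(struct acct *a)
{
	atomic_fetch_or(&a->flags, 1UL << STALLED_BIT);	/* ~set_bit() */
}

static void clear_stalled(struct acct *a)
{
	atomic_fetch_and(&a->flags, ~(1UL << STALLED_BIT)); /* ~clear_bit() */
}

static bool stalled(struct acct *a)
{
	return atomic_load(&a->flags) & (1UL << STALLED_BIT); /* ~test_bit() */
}

int main(void)
{
	struct acct bound = { 0 }, unbound = { 0 };

	/* stalling the bound class leaves the unbound class runnable */
	set_stalled(&bound);
	printf("bound=%d unbound=%d\n", stalled(&bound), stalled(&unbound));
	clear_stalled(&bound);
	return 0;
}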
@@ -71,25 +71,24 @@ struct io_wqe_acct {
unsigned max_workers;
int index;
atomic_t nr_running;
+ struct io_wq_work_list work_list;
+ unsigned long flags;
};
enum {
IO_WQ_ACCT_BOUND,
IO_WQ_ACCT_UNBOUND,
+ IO_WQ_ACCT_NR,
};
/*
* Per-node worker thread pool
*/
struct io_wqe {
- struct {
- raw_spinlock_t lock;
- struct io_wq_work_list work_list;
- unsigned flags;
- } ____cacheline_aligned_in_smp;
+ raw_spinlock_t lock;
+ struct io_wqe_acct acct[2];
int node;
- struct io_wqe_acct acct[2];
struct hlist_nulls_head free_list;
struct list_head all_list;
@@ -195,11 +194,10 @@ static void io_worker_exit(struct io_worker *worker)
do_exit(0);
}
-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
- __must_hold(wqe->lock)
+static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
- if (!wq_list_empty(&wqe->work_list) &&
- !(wqe->flags & IO_WQE_FLAG_STALLED))
+ if (!wq_list_empty(&acct->work_list) &&
+ !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
return true;
return false;
}
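With both the work list and the stall state now per class, the readiness check needs only the acct. A hedged sketch of the same test in self-contained userspace C (a bare pointer list and a bool replace the kernel's wq_list_empty() and test_bit(); names are illustrative):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct work {				/* stand-in for io_wq_work */
	struct work *next;
};

struct acct {				/* stand-in for io_wqe_acct */
	struct work *work_list;		/* singly linked, NULL == empty */
	bool stalled;
};

/* mirror of io_acct_run_queue(): runnable iff the class has queued
 * work and is not stalled on a hash */
static bool acct_run_queue(const struct acct *acct)
{
	return acct->work_list != NULL && !acct->stalled;
}

int main(void)
{
	struct work w = { .next = NULL };
	struct acct acct = { .work_list = &w, .stalled = false };

	printf("%d\n", acct_run_queue(&acct));	/* 1: work, not stalled */
	acct.stalled = true;
	printf("%d\n", acct_run_queue(&acct));	/* 0: stalled on hash */
	return 0;
}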
@@ -208,7 +206,8 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
* Check head of free list for an available worker. If one isn't available,
* caller must create one.
*/
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
+ struct io_wqe_acct *acct)
__must_hold(RCU)
{
struct hlist_nulls_node *n;
@@ -222,6 +221,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
if (!io_worker_get(worker))
continue;
+ if (io_wqe_get_acct(worker) != acct) {
+ io_worker_release(worker);
+ continue;
+ }
if (wake_up_process(worker->task)) {
io_worker_release(worker);
return true;
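Since each worker is now permanently tied to one accounting class, waking an arbitrary free worker no longer suffices: it must match the acct of the incoming work, which is what the added filter does. A simplified sketch of that selection, using a plain array in place of the RCU nulls list (names are illustrative):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

enum acct_class { ACCT_BOUND, ACCT_UNBOUND };

struct worker {
	enum acct_class class;
	bool free;
};

/* mirrors the new io_wqe_activate_free_worker() filter: a free worker
 * may only be woken for work of its own accounting class */
static struct worker *find_free_worker(struct worker *w, int nr,
				       enum acct_class class)
{
	for (int i = 0; i < nr; i++) {
		if (!w[i].free || w[i].class != class)
			continue;	/* busy or wrong class: keep looking */
		return &w[i];
	}
	return NULL;			/* caller must create a worker */
}

int main(void)
{
	struct worker pool[] = {
		{ ACCT_BOUND, true },
		{ ACCT_UNBOUND, true },
	};

	printf("found unbound: %d\n",
	       find_free_worker(pool, 2, ACCT_UNBOUND) == &pool[1]);
	return 0;
}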
@@ -340,7 +343,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
if (!(worker->flags & IO_WORKER_F_UP))
return;
- if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+ if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
io_queue_worker_create(wqe, worker, acct);
@@ -355,29 +358,10 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
struct io_wq_work *work)
__must_hold(wqe->lock)
{
- bool worker_bound, work_bound;
-
- BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
-
if (worker->flags & IO_WORKER_F_FREE) {
worker->flags &= ~IO_WORKER_F_FREE;
hlist_nulls_del_init_rcu(&worker->nulls_node);
}
-
- /*
- * If worker is moving from bound to unbound (or vice versa), then
- * ensure we update the running accounting.
- */
- worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
- work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
- if (worker_bound != work_bound) {
- int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
- io_wqe_dec_running(worker);
- worker->flags ^= IO_WORKER_F_BOUND;
- wqe->acct[index].nr_workers--;
- wqe->acct[index ^ 1].nr_workers++;
- io_wqe_inc_running(worker);
- }
}
/*
@@ -416,44 +400,23 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
spin_unlock_irq(&wq->hash->wait.lock);
}
-/*
- * We can always run the work if the worker is currently the same type as
- * the work (eg both are bound, or both are unbound). If they are not the
- * same, only allow it if incrementing the worker count would be allowed.
- */
-static bool io_worker_can_run_work(struct io_worker *worker,
- struct io_wq_work *work)
-{
- struct io_wqe_acct *acct;
-
- if (!(worker->flags & IO_WORKER_F_BOUND) !=
- !(work->flags & IO_WQ_WORK_UNBOUND))
- return true;
-
- /* not the same type, check if we'd go over the limit */
- acct = io_work_get_acct(worker->wqe, work);
- return acct->nr_workers < acct->max_workers;
-}
-
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
struct io_worker *worker)
__must_hold(wqe->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail;
unsigned int stall_hash = -1U;
+ struct io_wqe *wqe = worker->wqe;
- wq_list_for_each(node, prev, &wqe->work_list) {
+ wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
work = container_of(node, struct io_wq_work, list);
- if (!io_worker_can_run_work(worker, work))
- break;
-
/* not hashed, can run anytime */
if (!io_wq_is_hashed(work)) {
- wq_list_del(&wqe->work_list, node, prev);
+ wq_list_del(&acct->work_list, node, prev);
return work;
}
@@ -464,7 +427,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
/* hashed, can run if not already running */
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL;
- wq_list_cut(&wqe->work_list, &tail->list, prev);
+ wq_list_cut(&acct->work_list, &tail->list, prev);
return work;
}
if (stall_hash == -1U)
@@ -478,7 +441,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
* Set this before dropping the lock to avoid racing with new
* work being added and clearing the stalled bit.
*/
- wqe->flags |= IO_WQE_FLAG_STALLED;
+ set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&wqe->lock);
io_wait_on_hash(wqe, stall_hash);
raw_spin_lock(&wqe->lock);
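The existing comment still carries the key ordering rule: the stall bit must be published while the queue lock is held, otherwise a racing enqueue could clear the bit first and the late set would then hide new work indefinitely. A hedged userspace sketch of that ordering, with a pthread mutex and a C11 atomic standing in for wqe->lock and the acct flags (names are illustrative):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool stalled;		/* the class's IO_ACCT_STALLED_BIT */
static int queued;			/* pending work, protected by lock */

/* consumer: if nothing is runnable, mark the class stalled *before*
 * dropping the lock, mirroring io_get_next_work() */
static bool try_get_work(void)
{
	pthread_mutex_lock(&lock);
	if (queued) {
		queued--;
		pthread_mutex_unlock(&lock);
		return true;
	}
	/* any enqueue racing with us takes the lock after this store,
	 * so its clear is serialized after our set and cannot be lost */
	atomic_store(&stalled, true);
	pthread_mutex_unlock(&lock);
	return false;
}

/* producer: insert work, then clear the stall, as io_wqe_enqueue()
 * does with clear_bit(IO_ACCT_STALLED_BIT, &acct->flags) */
static void enqueue_work(void)
{
	pthread_mutex_lock(&lock);
	queued++;
	atomic_store(&stalled, false);
	pthread_mutex_unlock(&lock);
}

int main(void)
{
	printf("got=%d stalled=%d\n", try_get_work(), atomic_load(&stalled));
	enqueue_work();
	printf("got=%d stalled=%d\n", try_get_work(), atomic_load(&stalled));
	return 0;
}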
@@ -515,6 +478,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker)
__releases(wqe->lock)
{
+ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
@@ -529,7 +493,7 @@ get_next:
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- work = io_get_next_work(wqe, worker);
+ work = io_get_next_work(acct, worker);
if (work)
__io_worker_busy(wqe, worker, work);
@@ -563,10 +527,10 @@ get_next:
if (hash != -1U && !next_hashed) {
clear_bit(hash, &wq->hash->map);
+ clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
raw_spin_lock(&wqe->lock);
- wqe->flags &= ~IO_WQE_FLAG_STALLED;
/* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
@@ -581,6 +545,7 @@ get_next:
static int io_wqe_worker(void *data)
{
struct io_worker *worker = data;
+ struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
char buf[TASK_COMM_LEN];
@@ -596,7 +561,7 @@ static int io_wqe_worker(void *data)
set_current_state(TASK_INTERRUPTIBLE);
loop:
raw_spin_lock(&wqe->lock);
- if (io_wqe_run_queue(wqe)) {
+ if (io_acct_run_queue(acct)) {
io_worker_handle_work(worker);
goto loop;
}
@@ -764,12 +729,13 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
+ struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash;
struct io_wq_work *tail;
if (!io_wq_is_hashed(work)) {
append:
- wq_list_add_tail(&work->list, &wqe->work_list);
+ wq_list_add_tail(&work->list, &acct->work_list);
return;
}
@@ -779,7 +745,7 @@ append:
if (!tail)
goto append;
- wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+ wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -800,10 +766,10 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
raw_spin_lock(&wqe->lock);
io_wqe_insert_work(wqe, work);
- wqe->flags &= ~IO_WQE_FLAG_STALLED;
+ clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
rcu_read_lock();
- do_create = !io_wqe_activate_free_worker(wqe);
+ do_create = !io_wqe_activate_free_worker(wqe, acct);
rcu_read_unlock();
raw_spin_unlock(&wqe->lock);
@@ -855,6 +821,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
struct io_wq_work *work,
struct io_wq_work_node *prev)
{
+ struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
@@ -866,7 +833,7 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
else
wqe->hash_tail[hash] = NULL;
}
- wq_list_del(&wqe->work_list, &work->list, prev);
+ wq_list_del(&acct->work_list, &work->list, prev);
}
@@ -874,22 +841,27 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
+ int i;
retry:
raw_spin_lock(&wqe->lock);
- wq_list_for_each(node, prev, &wqe->work_list) {
- work = container_of(node, struct io_wq_work, list);
- if (!match->fn(work, match->data))
- continue;
- io_wqe_remove_pending(wqe, work, prev);
- raw_spin_unlock(&wqe->lock);
- io_run_cancel(work, wqe);
- match->nr_pending++;
- if (!match->cancel_all)
- return;
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
- /* not safe to continue after unlock */
- goto retry;
+ wq_list_for_each(node, prev, &acct->work_list) {
+ work = container_of(node, struct io_wq_work, list);
+ if (!match->fn(work, match->data))
+ continue;
+ io_wqe_remove_pending(wqe, work, prev);
+ raw_spin_unlock(&wqe->lock);
+ io_run_cancel(work, wqe);
+ match->nr_pending++;
+ if (!match->cancel_all)
+ return;
+
+ /* not safe to continue after unlock */
+ goto retry;
+ }
}
raw_spin_unlock(&wqe->lock);
}
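Cancellation now scans one pending list per accounting class, and the restart-after-unlock rule applies to the whole scan, which is why the retry label sits above the class loop. A simplified userspace sketch of that shape, with a mutex and singly linked lists in place of wqe->lock and wq_list (names are illustrative):

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define ACCT_NR 2			/* bound + unbound */

struct work {
	struct work *next;
	int id;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct work *list[ACCT_NR];	/* one pending list per class */

/* cancel every matching item; mirrors the shape of the reworked
 * io_wqe_cancel_pending_work(): after dropping the lock to run the
 * cancellation, the lists may have changed, so restart the whole scan */
static int cancel_matching(bool (*match)(struct work *))
{
	int nr = 0;
retry:
	pthread_mutex_lock(&lock);
	for (int i = 0; i < ACCT_NR; i++) {
		for (struct work **pw = &list[i]; *pw; pw = &(*pw)->next) {
			struct work *w = *pw;

			if (!match(w))
				continue;
			*pw = w->next;		/* unlink under the lock */
			pthread_mutex_unlock(&lock);
			printf("cancelled %d\n", w->id);
			nr++;
			/* not safe to continue after unlock */
			goto retry;
		}
	}
	pthread_mutex_unlock(&lock);
	return nr;
}

static bool match_all(struct work *w) { (void)w; return true; }

int main(void)
{
	struct work a = { NULL, 0 }, b = { NULL, 1 };

	list[0] = &a;
	list[1] = &b;
	printf("cancelled %d items\n", cancel_matching(match_all));
	return 0;
}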
@@ -950,18 +922,24 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+ int i;
list_del_init(&wait->entry);
rcu_read_lock();
- io_wqe_activate_free_worker(wqe);
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wqe_acct *acct = &wqe->acct[i];
+
+ if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
+ io_wqe_activate_free_worker(wqe, acct);
+ }
rcu_read_unlock();
return 1;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
- int ret, node;
+ int ret, node, i;
struct io_wq *wq;
if (WARN_ON_ONCE(!data->free_work || !data->do_work))
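The hash-wait wakeup used to activate a free worker unconditionally; it now walks both classes and acts only on those whose stall bit was actually set, clearing the bit atomically so two concurrent wakers cannot claim the same stall. A small sketch with atomic_exchange() standing in for test_and_clear_bit() (names are illustrative):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define ACCT_NR 2

static atomic_bool acct_stalled[ACCT_NR];

/* mirrors the reworked io_wqe_hash_wake(): only classes that were
 * actually stalled get a worker activated, and the test-and-clear is
 * a single atomic operation */
static void hash_wake(void)
{
	for (int i = 0; i < ACCT_NR; i++) {
		/* atomic_exchange ~ test_and_clear_bit() */
		if (atomic_exchange(&acct_stalled[i], false))
			printf("activating a class-%d worker\n", i);
	}
}

int main(void)
{
	atomic_store(&acct_stalled[1], true);	/* only unbound stalled */
	hash_wake();				/* wakes class 1 only */
	hash_wake();				/* no-op: bit already cleared */
	return 0;
}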
@@ -996,18 +974,20 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
wq->wqes[node] = wqe;
wqe->node = alloc_node;
- wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
- wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
- atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
task_rlimit(current, RLIMIT_NPROC);
- atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
- wqe->wait.func = io_wqe_hash_wake;
INIT_LIST_HEAD(&wqe->wait.entry);
+ wqe->wait.func = io_wqe_hash_wake;
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
+ struct io_wqe_acct *acct = &wqe->acct[i];
+
+ acct->index = i;
+ atomic_set(&acct->nr_running, 0);
+ INIT_WQ_LIST(&acct->work_list);
+ }
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
- INIT_WQ_LIST(&wqe->work_list);
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
INIT_LIST_HEAD(&wqe->all_list);
}
@@ -1189,7 +1169,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
for_each_node(node) {
struct io_wqe_acct *acct;
- for (i = 0; i < 2; i++) {
+ for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wq->wqes[node]->acct[i];
prev = max_t(int, acct->max_workers, prev);
if (new_count[i])