Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--	fs/io-wq.c	240
1 file changed, 152 insertions, 88 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 5147d2213b01..5cef075c0b37 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -16,6 +16,7 @@
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
+#include <linux/fs_struct.h>
#include "io-wq.h"
@@ -56,8 +57,10 @@ struct io_worker {
struct rcu_head rcu;
struct mm_struct *mm;
- const struct cred *creds;
+ const struct cred *cur_creds;
+ const struct cred *saved_creds;
struct files_struct *restore_files;
+ struct fs_struct *restore_fs;
};
#if BITS_PER_LONG == 64
@@ -109,10 +112,10 @@ struct io_wq {
struct task_struct *manager;
struct user_struct *user;
- const struct cred *creds;
- struct mm_struct *mm;
refcount_t refs;
struct completion done;
+
+ refcount_t use_refs;
};
static bool io_worker_get(struct io_worker *worker)
@@ -135,9 +138,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
bool dropped_lock = false;
- if (worker->creds) {
- revert_creds(worker->creds);
- worker->creds = NULL;
+ if (worker->saved_creds) {
+ revert_creds(worker->saved_creds);
+ worker->cur_creds = worker->saved_creds = NULL;
}
if (current->files != worker->restore_files) {
@@ -150,6 +153,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
task_unlock(current);
}
+ if (current->fs != worker->restore_fs)
+ current->fs = worker->restore_fs;
+
/*
* If we have an active mm, we need to drop the wq lock before unusing
* it. If we do, return true and let the caller retry the idle loop.
@@ -310,6 +316,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
worker->restore_files = current->files;
+ worker->restore_fs = current->fs;
io_wqe_inc_running(wqe, worker);
}
@@ -396,6 +403,43 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
return NULL;
}
+static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
+{
+ if (worker->mm) {
+ unuse_mm(worker->mm);
+ mmput(worker->mm);
+ worker->mm = NULL;
+ }
+ if (!work->mm) {
+ set_fs(KERNEL_DS);
+ return;
+ }
+ if (mmget_not_zero(work->mm)) {
+ use_mm(work->mm);
+ if (!worker->mm)
+ set_fs(USER_DS);
+ worker->mm = work->mm;
+ /* hang on to this mm */
+ work->mm = NULL;
+ return;
+ }
+
+ /* failed grabbing mm, ensure work gets cancelled */
+ work->flags |= IO_WQ_WORK_CANCEL;
+}
+
+static void io_wq_switch_creds(struct io_worker *worker,
+ struct io_wq_work *work)
+{
+ const struct cred *old_creds = override_creds(work->creds);
+
+ worker->cur_creds = work->creds;
+ if (worker->saved_creds)
+ put_cred(old_creds); /* creds set by previous switch */
+ else
+ worker->saved_creds = old_creds;
+}
+
static void io_worker_handle_work(struct io_worker *worker)
__releases(wqe->lock)
{
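The two helpers above replace the old static per-wq mm/creds with per-work state. The subtle part is the reference dance in io_wq_switch_creds(): override_creds() takes a reference on the new creds and hands back the old ones, still referenced. A minimal sketch of that pattern with hypothetical names, relying only on the cred API semantics:

    #include <linux/cred.h>

    /* Illustration, not part of the patch. The first switch stashes the
     * worker's original creds (reverted when the worker goes idle in
     * __io_worker_unuse()); every later switch merely drops the
     * reference left over from the previous override.
     */
    static void creds_switch_sketch(const struct cred **saved,
                                    const struct cred **cur,
                                    const struct cred *next)
    {
            const struct cred *old = override_creds(next);

            *cur = next;
            if (*saved)
                    put_cred(old);  /* ref pinned by the previous override */
            else
                    *saved = old;   /* worker's own creds, restored later */
    }
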
@@ -438,30 +482,27 @@ next:
if (work->flags & IO_WQ_WORK_CB)
work->func(&work);
- if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
- current->files != work->files) {
+ if (work->files && current->files != work->files) {
task_lock(current);
current->files = work->files;
task_unlock(current);
}
- if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
- wq->mm) {
- if (mmget_not_zero(wq->mm)) {
- use_mm(wq->mm);
- set_fs(USER_DS);
- worker->mm = wq->mm;
- } else {
- work->flags |= IO_WQ_WORK_CANCEL;
- }
- }
- if (!worker->creds)
- worker->creds = override_creds(wq->creds);
+ if (work->fs && current->fs != work->fs)
+ current->fs = work->fs;
+ if (work->mm != worker->mm)
+ io_wq_switch_mm(worker, work);
+ if (worker->cur_creds != work->creds)
+ io_wq_switch_creds(worker, work);
+ /*
+ * OK to set IO_WQ_WORK_CANCEL even for uncancellable work,
+ * the worker function will do the right thing.
+ */
if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
work->flags |= IO_WQ_WORK_CANCEL;
if (worker->mm)
work->flags |= IO_WQ_WORK_HAS_MM;
- if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
+ if (wq->get_work) {
put_work = work;
wq->get_work(work);
}
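The comment about IO_WQ_WORK_CANCEL relies on a convention for work handlers: a cancellable handler checks the flag and completes the request with an error instead of executing it, while IO_WQ_WORK_NO_CANCEL handlers are free to ignore it. A hedged sketch of such a handler; example_complete() and example_issue() are hypothetical stand-ins for the real request code:

    /* hypothetical helpers standing in for real request handling */
    static void example_complete(struct io_wq_work *work, int error);
    static void example_issue(struct io_wq_work *work);

    static void example_work_fn(struct io_wq_work **workptr)
    {
            struct io_wq_work *work = *workptr;

            if (work->flags & IO_WQ_WORK_CANCEL) {
                    /* don't run the op, just complete with an error */
                    example_complete(work, -ECANCELED);
                    return;
            }
            example_issue(work);
    }
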
@@ -494,42 +535,23 @@ next:
} while (1);
}
-static inline void io_worker_spin_for_work(struct io_wqe *wqe)
-{
- int i = 0;
-
- while (++i < 1000) {
- if (io_wqe_run_queue(wqe))
- break;
- if (need_resched())
- break;
- cpu_relax();
- }
-}
-
static int io_wqe_worker(void *data)
{
struct io_worker *worker = data;
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
- bool did_work;
io_worker_start(wqe, worker);
- did_work = false;
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
set_current_state(TASK_INTERRUPTIBLE);
loop:
- if (did_work)
- io_worker_spin_for_work(wqe);
spin_lock_irq(&wqe->lock);
if (io_wqe_run_queue(wqe)) {
__set_current_state(TASK_RUNNING);
io_worker_handle_work(worker);
- did_work = true;
goto loop;
}
- did_work = false;
/* drops the lock on success, retry */
if (__io_worker_idle(wqe, worker)) {
__release(&wqe->lock);
@@ -658,11 +680,16 @@ static int io_wq_manager(void *data)
/* create fixed workers */
refcount_set(&wq->refs, workers_to_create);
for_each_node(node) {
+ if (!node_online(node))
+ continue;
if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
goto err;
workers_to_create--;
}
+ while (workers_to_create--)
+ refcount_dec(&wq->refs);
+
complete(&wq->done);
while (!kthread_should_stop()) {
@@ -670,6 +697,9 @@ static int io_wq_manager(void *data)
struct io_wqe *wqe = wq->wqes[node];
bool fork_worker[2] = { false, false };
+ if (!node_online(node))
+ continue;
+
spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
@@ -717,9 +747,21 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
return true;
}
+static void io_run_cancel(struct io_wq_work *work)
+{
+ do {
+ struct io_wq_work *old_work = work;
+
+ work->flags |= IO_WQ_WORK_CANCEL;
+ work->func(&work);
+ work = (work == old_work) ? NULL : work;
+ } while (work);
+}
+
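io_run_cancel() loops because a handler may hand back a follow-up (e.g. linked) request through *workptr; comparing against old_work detects that and cancels the whole chain. A sketch of a handler that does so, reusing the hypothetical example_complete() above plus a hypothetical example_next_of():

    static void example_linked_fn(struct io_wq_work **workptr)
    {
            struct io_wq_work *work = *workptr;
            struct io_wq_work *next = example_next_of(work); /* hypothetical */

            example_complete(work, -ECANCELED);              /* hypothetical */
            if (next)
                    *workptr = next; /* io_run_cancel() sees work != old_work */
    }
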
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+ int work_flags;
unsigned long flags;
/*
@@ -729,17 +771,18 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
* It's close enough to not be an issue, fork() has the same delay.
*/
if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
- work->flags |= IO_WQ_WORK_CANCEL;
- work->func(&work);
+ io_run_cancel(work);
return;
}
+ work_flags = work->flags;
spin_lock_irqsave(&wqe->lock, flags);
wq_list_add_tail(&work->list, &wqe->work_list);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
spin_unlock_irqrestore(&wqe->lock, flags);
- if (!atomic_read(&acct->nr_running))
+ if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
+ !atomic_read(&acct->nr_running))
io_wqe_wake_worker(wqe, acct);
}
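Note that work_flags is sampled before the work is queued on purpose: once the lock drops, a worker may run and free the item, so dereferencing work afterwards would race. The new IO_WQ_WORK_CONCURRENT bit forces a wakeup even when a worker already appears busy. A sketch of how a caller might request that, assuming the public io_wq_enqueue() entry point:

    /* minimal sketch: ask for parallel execution of one work item */
    static void example_queue_concurrent(struct io_wq *wq,
                                         struct io_wq_work *work)
    {
            work->flags |= IO_WQ_WORK_CONCURRENT; /* always kick a worker */
            io_wq_enqueue(wq, work);
    }
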
@@ -785,7 +828,9 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
if (io_worker_get(worker)) {
- ret = func(worker, data);
+ /* no task if node is/was offline */
+ if (worker->task)
+ ret = func(worker, data);
io_worker_release(worker);
if (ret)
break;
@@ -828,6 +873,7 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
*/
spin_lock_irqsave(&worker->lock, flags);
if (worker->cur_work &&
+ !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
data->cancel(worker->cur_work, data->caller_data)) {
send_sig(SIGINT, worker->task, 1);
ret = true;
@@ -864,8 +910,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
spin_unlock_irqrestore(&wqe->lock, flags);
if (found) {
- work->flags |= IO_WQ_WORK_CANCEL;
- work->func(&work);
+ io_run_cancel(work);
return IO_WQ_CANCEL_OK;
}
@@ -892,17 +937,20 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
return ret;
}
+struct work_match {
+ bool (*fn)(struct io_wq_work *, void *data);
+ void *data;
+};
+
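work_match turns both cancel paths (by work pointer, by pid) into one predicate-driven walk over the pending list and the running workers. Any predicate fits; a hypothetical matcher that cancels every item bound to a given files_struct could look like this, using the work->files field the patch already relies on:

    /* hypothetical matcher, not part of the patch */
    static bool example_files_match(struct io_wq_work *work, void *data)
    {
            return work && work->files == data;
    }

    static enum io_wq_cancel example_cancel_files(struct io_wqe *wqe,
                                                  struct files_struct *files)
    {
            struct work_match match = {
                    .fn     = example_files_match,
                    .data   = files,
            };

            return io_wqe_cancel_work(wqe, &match);
    }
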
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
- struct io_wq_work *work = data;
+ struct work_match *match = data;
unsigned long flags;
bool ret = false;
- if (worker->cur_work != work)
- return false;
-
spin_lock_irqsave(&worker->lock, flags);
- if (worker->cur_work == work) {
+ if (match->fn(worker->cur_work, match->data) &&
+ !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
send_sig(SIGINT, worker->task, 1);
ret = true;
}
@@ -912,15 +960,13 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
}
static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
- struct io_wq_work *cwork)
+ struct work_match *match)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
unsigned long flags;
bool found = false;
- cwork->flags |= IO_WQ_WORK_CANCEL;
-
/*
* First check pending list, if we're lucky we can just remove it
* from there. CANCEL_OK means that the work is returned as-new,
@@ -930,7 +976,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list);
- if (work == cwork) {
+ if (match->fn(work, match->data)) {
wq_node_del(&wqe->work_list, node, prev);
found = true;
break;
@@ -939,8 +985,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
spin_unlock_irqrestore(&wqe->lock, flags);
if (found) {
- work->flags |= IO_WQ_WORK_CANCEL;
- work->func(&work);
+ io_run_cancel(work);
return IO_WQ_CANCEL_OK;
}
@@ -951,20 +996,31 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
* completion will run normally in this case.
*/
rcu_read_lock();
- found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
+ found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
rcu_read_unlock();
return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}
+static bool io_wq_work_match(struct io_wq_work *work, void *data)
+{
+ return work == data;
+}
+
enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
+ struct work_match match = {
+ .fn = io_wq_work_match,
+ .data = cwork
+ };
enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
int node;
+ cwork->flags |= IO_WQ_WORK_CANCEL;
+
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
- ret = io_wqe_cancel_work(wqe, cwork);
+ ret = io_wqe_cancel_work(wqe, &match);
if (ret != IO_WQ_CANCEL_NOTFOUND)
break;
}
@@ -972,38 +1028,33 @@ enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
return ret;
}
-struct io_wq_flush_data {
- struct io_wq_work work;
- struct completion done;
-};
-
-static void io_wq_flush_func(struct io_wq_work **workptr)
+static bool io_wq_pid_match(struct io_wq_work *work, void *data)
{
- struct io_wq_work *work = *workptr;
- struct io_wq_flush_data *data;
+ pid_t pid = (pid_t) (unsigned long) data;
- data = container_of(work, struct io_wq_flush_data, work);
- complete(&data->done);
+ if (work)
+ return work->task_pid == pid;
+ return false;
}
-/*
- * Doesn't wait for previously queued work to finish. When this completes,
- * it just means that previously queued work was started.
- */
-void io_wq_flush(struct io_wq *wq)
+enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
{
- struct io_wq_flush_data data;
+ struct work_match match = {
+ .fn = io_wq_pid_match,
+ .data = (void *) (unsigned long) pid
+ };
+ enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
int node;
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
- init_completion(&data.done);
- INIT_IO_WORK(&data.work, io_wq_flush_func);
- data.work.flags |= IO_WQ_WORK_INTERNAL;
- io_wqe_enqueue(wqe, &data.work);
- wait_for_completion(&data.done);
+ ret = io_wqe_cancel_work(wqe, &match);
+ if (ret != IO_WQ_CANCEL_NOTFOUND)
+ break;
}
+
+ return ret;
}
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
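The pid travels through the void *data slot with the usual cast-through-unsigned-long round trip, and io_wq_pid_match() unpacks it the same way. A usage sketch, assuming the submitting task recorded its pid in work->task_pid (e.g. via task_pid_vnr()) when the request was queued:

    /* hypothetical wrapper: cancel everything the current task queued */
    static enum io_wq_cancel example_cancel_mine(struct io_wq *wq)
    {
            return io_wq_cancel_pid(wq, task_pid_vnr(current));
    }
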
@@ -1026,16 +1077,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
/* caller must already hold a reference to this */
wq->user = data->user;
- wq->creds = data->creds;
for_each_node(node) {
struct io_wqe *wqe;
+ int alloc_node = node;
- wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
+ if (!node_online(alloc_node))
+ alloc_node = NUMA_NO_NODE;
+ wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
if (!wqe)
goto err;
wq->wqes[node] = wqe;
- wqe->node = node;
+ wqe->node = alloc_node;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
if (wq->user) {
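Every possible node still gets a wqe so per-node indexing stays valid, but an offline node has no local memory to allocate from; falling back to NUMA_NO_NODE lets the allocator pick any node. The same fallback pattern in isolation:

    #include <linux/nodemask.h>
    #include <linux/slab.h>

    /* sketch of the offline-node allocation fallback */
    static void *example_alloc_for_node(size_t size, int node)
    {
            if (!node_online(node))
                    node = NUMA_NO_NODE; /* no local memory, any node will do */
            return kzalloc_node(size, GFP_KERNEL, node);
    }
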
@@ -1043,7 +1096,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
task_rlimit(current, RLIMIT_NPROC);
}
atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
- wqe->node = node;
wqe->wq = wq;
spin_lock_init(&wqe->lock);
INIT_WQ_LIST(&wqe->work_list);
@@ -1053,9 +1105,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
init_completion(&wq->done);
- /* caller must have already done mmgrab() on this mm */
- wq->mm = data->mm;
-
wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
if (!IS_ERR(wq->manager)) {
wake_up_process(wq->manager);
@@ -1064,6 +1113,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
ret = -ENOMEM;
goto err;
}
+ refcount_set(&wq->use_refs, 1);
reinit_completion(&wq->done);
return wq;
}
@@ -1078,13 +1128,21 @@ err:
return ERR_PTR(ret);
}
+bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
+{
+ if (data->get_work != wq->get_work || data->put_work != wq->put_work)
+ return false;
+
+ return refcount_inc_not_zero(&wq->use_refs);
+}
+
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
wake_up_process(worker->task);
return false;
}
-void io_wq_destroy(struct io_wq *wq)
+static void __io_wq_destroy(struct io_wq *wq)
{
int node;
@@ -1104,3 +1162,9 @@ void io_wq_destroy(struct io_wq *wq)
kfree(wq->wqes);
kfree(wq);
}
+
+void io_wq_destroy(struct io_wq *wq)
+{
+ if (refcount_dec_and_test(&wq->use_refs))
+ __io_wq_destroy(wq);
+}
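
With use_refs, several users can share one workqueue: io_wq_get() succeeds only when the get_work/put_work hooks match, and each successful get must be balanced by an io_wq_destroy(); the final drop tears the wq down. A sharing sketch under those assumptions (the attach-or-create policy here is hypothetical, not part of this patch):

    static struct io_wq *example_attach_or_create(struct io_wq *existing,
                                                  unsigned bounded,
                                                  struct io_wq_data *data)
    {
            /* reuse only if both sides agree on the work hooks */
            if (existing && io_wq_get(existing, data))
                    return existing;

            return io_wq_create(bounded, data);
    }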