diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 240 |
1 files changed, 152 insertions, 88 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 5147d2213b01..5cef075c0b37 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -16,6 +16,7 @@ #include <linux/slab.h> #include <linux/kthread.h> #include <linux/rculist_nulls.h> +#include <linux/fs_struct.h> #include "io-wq.h" @@ -56,8 +57,10 @@ struct io_worker { struct rcu_head rcu; struct mm_struct *mm; - const struct cred *creds; + const struct cred *cur_creds; + const struct cred *saved_creds; struct files_struct *restore_files; + struct fs_struct *restore_fs; }; #if BITS_PER_LONG == 64 @@ -109,10 +112,10 @@ struct io_wq { struct task_struct *manager; struct user_struct *user; - const struct cred *creds; - struct mm_struct *mm; refcount_t refs; struct completion done; + + refcount_t use_refs; }; static bool io_worker_get(struct io_worker *worker) @@ -135,9 +138,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) { bool dropped_lock = false; - if (worker->creds) { - revert_creds(worker->creds); - worker->creds = NULL; + if (worker->saved_creds) { + revert_creds(worker->saved_creds); + worker->cur_creds = worker->saved_creds = NULL; } if (current->files != worker->restore_files) { @@ -150,6 +153,9 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) task_unlock(current); } + if (current->fs != worker->restore_fs) + current->fs = worker->restore_fs; + /* * If we have an active mm, we need to drop the wq lock before unusing * it. If we do, return true and let the caller retry the idle loop. @@ -310,6 +316,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); worker->restore_files = current->files; + worker->restore_fs = current->fs; io_wqe_inc_running(wqe, worker); } @@ -396,6 +403,43 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) return NULL; } +static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work) +{ + if (worker->mm) { + unuse_mm(worker->mm); + mmput(worker->mm); + worker->mm = NULL; + } + if (!work->mm) { + set_fs(KERNEL_DS); + return; + } + if (mmget_not_zero(work->mm)) { + use_mm(work->mm); + if (!worker->mm) + set_fs(USER_DS); + worker->mm = work->mm; + /* hang on to this mm */ + work->mm = NULL; + return; + } + + /* failed grabbing mm, ensure work gets cancelled */ + work->flags |= IO_WQ_WORK_CANCEL; +} + +static void io_wq_switch_creds(struct io_worker *worker, + struct io_wq_work *work) +{ + const struct cred *old_creds = override_creds(work->creds); + + worker->cur_creds = work->creds; + if (worker->saved_creds) + put_cred(old_creds); /* creds set by previous switch */ + else + worker->saved_creds = old_creds; +} + static void io_worker_handle_work(struct io_worker *worker) __releases(wqe->lock) { @@ -438,30 +482,27 @@ next: if (work->flags & IO_WQ_WORK_CB) work->func(&work); - if ((work->flags & IO_WQ_WORK_NEEDS_FILES) && - current->files != work->files) { + if (work->files && current->files != work->files) { task_lock(current); current->files = work->files; task_unlock(current); } - if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm && - wq->mm) { - if (mmget_not_zero(wq->mm)) { - use_mm(wq->mm); - set_fs(USER_DS); - worker->mm = wq->mm; - } else { - work->flags |= IO_WQ_WORK_CANCEL; - } - } - if (!worker->creds) - worker->creds = override_creds(wq->creds); + if (work->fs && current->fs != work->fs) + current->fs = work->fs; + if (work->mm != worker->mm) + io_wq_switch_mm(worker, work); + if (worker->cur_creds != work->creds) + io_wq_switch_creds(worker, work); + /* + * OK to set IO_WQ_WORK_CANCEL even for uncancellable work, + * the worker function will do the right thing. + */ if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; if (worker->mm) work->flags |= IO_WQ_WORK_HAS_MM; - if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) { + if (wq->get_work) { put_work = work; wq->get_work(work); } @@ -494,42 +535,23 @@ next: } while (1); } -static inline void io_worker_spin_for_work(struct io_wqe *wqe) -{ - int i = 0; - - while (++i < 1000) { - if (io_wqe_run_queue(wqe)) - break; - if (need_resched()) - break; - cpu_relax(); - } -} - static int io_wqe_worker(void *data) { struct io_worker *worker = data; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; - bool did_work; io_worker_start(wqe, worker); - did_work = false; while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { set_current_state(TASK_INTERRUPTIBLE); loop: - if (did_work) - io_worker_spin_for_work(wqe); spin_lock_irq(&wqe->lock); if (io_wqe_run_queue(wqe)) { __set_current_state(TASK_RUNNING); io_worker_handle_work(worker); - did_work = true; goto loop; } - did_work = false; /* drops the lock on success, retry */ if (__io_worker_idle(wqe, worker)) { __release(&wqe->lock); @@ -658,11 +680,16 @@ static int io_wq_manager(void *data) /* create fixed workers */ refcount_set(&wq->refs, workers_to_create); for_each_node(node) { + if (!node_online(node)) + continue; if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND)) goto err; workers_to_create--; } + while (workers_to_create--) + refcount_dec(&wq->refs); + complete(&wq->done); while (!kthread_should_stop()) { @@ -670,6 +697,9 @@ static int io_wq_manager(void *data) struct io_wqe *wqe = wq->wqes[node]; bool fork_worker[2] = { false, false }; + if (!node_online(node)) + continue; + spin_lock_irq(&wqe->lock); if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) fork_worker[IO_WQ_ACCT_BOUND] = true; @@ -717,9 +747,21 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, return true; } +static void io_run_cancel(struct io_wq_work *work) +{ + do { + struct io_wq_work *old_work = work; + + work->flags |= IO_WQ_WORK_CANCEL; + work->func(&work); + work = (work == old_work) ? NULL : work; + } while (work); +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + int work_flags; unsigned long flags; /* @@ -729,17 +771,18 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) * It's close enough to not be an issue, fork() has the same delay. */ if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work); return; } + work_flags = work->flags; spin_lock_irqsave(&wqe->lock, flags); wq_list_add_tail(&work->list, &wqe->work_list); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); - if (!atomic_read(&acct->nr_running)) + if ((work_flags & IO_WQ_WORK_CONCURRENT) || + !atomic_read(&acct->nr_running)) io_wqe_wake_worker(wqe, acct); } @@ -785,7 +828,9 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, list_for_each_entry_rcu(worker, &wqe->all_list, all_list) { if (io_worker_get(worker)) { - ret = func(worker, data); + /* no task if node is/was offline */ + if (worker->task) + ret = func(worker, data); io_worker_release(worker); if (ret) break; @@ -828,6 +873,7 @@ static bool io_work_cancel(struct io_worker *worker, void *cancel_data) */ spin_lock_irqsave(&worker->lock, flags); if (worker->cur_work && + !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) && data->cancel(worker->cur_work, data->caller_data)) { send_sig(SIGINT, worker->task, 1); ret = true; @@ -864,8 +910,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work); return IO_WQ_CANCEL_OK; } @@ -892,17 +937,20 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, return ret; } +struct work_match { + bool (*fn)(struct io_wq_work *, void *data); + void *data; +}; + static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { - struct io_wq_work *work = data; + struct work_match *match = data; unsigned long flags; bool ret = false; - if (worker->cur_work != work) - return false; - spin_lock_irqsave(&worker->lock, flags); - if (worker->cur_work == work) { + if (match->fn(worker->cur_work, match->data) && + !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) { send_sig(SIGINT, worker->task, 1); ret = true; } @@ -912,15 +960,13 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) } static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, - struct io_wq_work *cwork) + struct work_match *match) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; unsigned long flags; bool found = false; - cwork->flags |= IO_WQ_WORK_CANCEL; - /* * First check pending list, if we're lucky we can just remove it * from there. CANCEL_OK means that the work is returned as-new, @@ -930,7 +976,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, wq_list_for_each(node, prev, &wqe->work_list) { work = container_of(node, struct io_wq_work, list); - if (work == cwork) { + if (match->fn(work, match->data)) { wq_node_del(&wqe->work_list, node, prev); found = true; break; @@ -939,8 +985,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, spin_unlock_irqrestore(&wqe->lock, flags); if (found) { - work->flags |= IO_WQ_WORK_CANCEL; - work->func(&work); + io_run_cancel(work); return IO_WQ_CANCEL_OK; } @@ -951,20 +996,31 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, * completion will run normally in this case. */ rcu_read_lock(); - found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork); + found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match); rcu_read_unlock(); return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND; } +static bool io_wq_work_match(struct io_wq_work *work, void *data) +{ + return work == data; +} + enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) { + struct work_match match = { + .fn = io_wq_work_match, + .data = cwork + }; enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; int node; + cwork->flags |= IO_WQ_WORK_CANCEL; + for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - ret = io_wqe_cancel_work(wqe, cwork); + ret = io_wqe_cancel_work(wqe, &match); if (ret != IO_WQ_CANCEL_NOTFOUND) break; } @@ -972,38 +1028,33 @@ enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) return ret; } -struct io_wq_flush_data { - struct io_wq_work work; - struct completion done; -}; - -static void io_wq_flush_func(struct io_wq_work **workptr) +static bool io_wq_pid_match(struct io_wq_work *work, void *data) { - struct io_wq_work *work = *workptr; - struct io_wq_flush_data *data; + pid_t pid = (pid_t) (unsigned long) data; - data = container_of(work, struct io_wq_flush_data, work); - complete(&data->done); + if (work) + return work->task_pid == pid; + return false; } -/* - * Doesn't wait for previously queued work to finish. When this completes, - * it just means that previously queued work was started. - */ -void io_wq_flush(struct io_wq *wq) +enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid) { - struct io_wq_flush_data data; + struct work_match match = { + .fn = io_wq_pid_match, + .data = (void *) (unsigned long) pid + }; + enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; int node; for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - init_completion(&data.done); - INIT_IO_WORK(&data.work, io_wq_flush_func); - data.work.flags |= IO_WQ_WORK_INTERNAL; - io_wqe_enqueue(wqe, &data.work); - wait_for_completion(&data.done); + ret = io_wqe_cancel_work(wqe, &match); + if (ret != IO_WQ_CANCEL_NOTFOUND) + break; } + + return ret; } struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) @@ -1026,16 +1077,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) /* caller must already hold a reference to this */ wq->user = data->user; - wq->creds = data->creds; for_each_node(node) { struct io_wqe *wqe; + int alloc_node = node; - wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node); + if (!node_online(alloc_node)) + alloc_node = NUMA_NO_NODE; + wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node); if (!wqe) goto err; wq->wqes[node] = wqe; - wqe->node = node; + wqe->node = alloc_node; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); if (wq->user) { @@ -1043,7 +1096,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) task_rlimit(current, RLIMIT_NPROC); } atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); - wqe->node = node; wqe->wq = wq; spin_lock_init(&wqe->lock); INIT_WQ_LIST(&wqe->work_list); @@ -1053,9 +1105,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) init_completion(&wq->done); - /* caller must have already done mmgrab() on this mm */ - wq->mm = data->mm; - wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager"); if (!IS_ERR(wq->manager)) { wake_up_process(wq->manager); @@ -1064,6 +1113,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ret = -ENOMEM; goto err; } + refcount_set(&wq->use_refs, 1); reinit_completion(&wq->done); return wq; } @@ -1078,13 +1128,21 @@ err: return ERR_PTR(ret); } +bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) +{ + if (data->get_work != wq->get_work || data->put_work != wq->put_work) + return false; + + return refcount_inc_not_zero(&wq->use_refs); +} + static bool io_wq_worker_wake(struct io_worker *worker, void *data) { wake_up_process(worker->task); return false; } -void io_wq_destroy(struct io_wq *wq) +static void __io_wq_destroy(struct io_wq *wq) { int node; @@ -1104,3 +1162,9 @@ void io_wq_destroy(struct io_wq *wq) kfree(wq->wqes); kfree(wq); } + +void io_wq_destroy(struct io_wq *wq) +{ + if (refcount_dec_and_test(&wq->use_refs)) + __io_wq_destroy(wq); +} |