Diffstat
 io_uring/io_uring.c | 372 ++++++++++++++++++++++++++++++---------
 1 file changed, 285 insertions(+), 87 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b9640ad5069f..4a1e482747cc 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -125,6 +125,11 @@ enum {
IO_CHECK_CQ_DROPPED_BIT,
};
+enum {
+ IO_EVENTFD_OP_SIGNAL_BIT,
+ IO_EVENTFD_OP_FREE_BIT,
+};
+
struct io_defer_entry {
struct list_head list;
struct io_kiocb *req;
@@ -142,7 +147,7 @@ static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
static void io_dismantle_req(struct io_kiocb *req);
static void io_clean_op(struct io_kiocb *req);
static void io_queue_sqe(struct io_kiocb *req);
-
+static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
static struct kmem_cache *req_cachep;
@@ -171,6 +176,11 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}
+static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
+{
+ return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
+}
+
static bool io_match_linked(struct io_kiocb *head)
{
struct io_kiocb *req;
@@ -316,6 +326,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
init_llist_head(&ctx->rsrc_put_llist);
+ init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
INIT_WQ_LIST(&ctx->locked_free_list);
@@ -477,25 +488,28 @@ static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
}
}
-static void io_eventfd_signal(struct io_ring_ctx *ctx)
+
+static void io_eventfd_ops(struct rcu_head *rcu)
{
- struct io_ev_fd *ev_fd;
- bool skip;
+ struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
+ int ops = atomic_xchg(&ev_fd->ops, 0);
- spin_lock(&ctx->completion_lock);
- /*
- * Eventfd should only get triggered when at least one event has been
- * posted. Some applications rely on the eventfd notification count only
- * changing IFF a new CQE has been added to the CQ ring. There's no
- * depedency on 1:1 relationship between how many times this function is
- * called (and hence the eventfd count) and number of CQEs posted to the
- * CQ ring.
+ if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
+ eventfd_signal(ev_fd->cq_ev_fd, 1);
+
+ /* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
+ * ordering in a race but if references are 0 we know we have to free
+ * it regardless.
*/
- skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
- ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
- spin_unlock(&ctx->completion_lock);
- if (skip)
- return;
+ if (atomic_dec_and_test(&ev_fd->refs)) {
+ eventfd_ctx_put(ev_fd->cq_ev_fd);
+ kfree(ev_fd);
+ }
+}
+
+static void io_eventfd_signal(struct io_ring_ctx *ctx)
+{
+ struct io_ev_fd *ev_fd = NULL;
rcu_read_lock();
/*
@@ -513,13 +527,46 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
goto out;
if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
goto out;
+ if (ev_fd->eventfd_async && !io_wq_current_is_worker())
+ goto out;
- if (!ev_fd->eventfd_async || io_wq_current_is_worker())
+ if (likely(eventfd_signal_allowed())) {
eventfd_signal(ev_fd->cq_ev_fd, 1);
+ } else {
+ atomic_inc(&ev_fd->refs);
+ if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
+ call_rcu(&ev_fd->rcu, io_eventfd_ops);
+ else
+ atomic_dec(&ev_fd->refs);
+ }
+
out:
rcu_read_unlock();
}
+static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
+{
+ bool skip;
+
+ spin_lock(&ctx->completion_lock);
+
+ /*
+ * Eventfd should only get triggered when at least one event has been
+ * posted. Some applications rely on the eventfd notification count
+ * only changing IFF a new CQE has been added to the CQ ring. There's
+ * no dependency on a 1:1 relationship between how many times this
+ * function is called (and hence the eventfd count) and number of CQEs
+ * posted to the CQ ring.
+ */
+ skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
+ ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
+ spin_unlock(&ctx->completion_lock);
+ if (skip)
+ return;
+
+ io_eventfd_signal(ctx);
+}
+
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
if (ctx->off_timeout_used || ctx->drain_active) {
@@ -531,7 +578,7 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
spin_unlock(&ctx->completion_lock);
}
if (ctx->has_evfd)
- io_eventfd_signal(ctx);
+ io_eventfd_flush_signal(ctx);
}
static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
@@ -567,7 +614,7 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
io_cq_lock(ctx);
while (!list_empty(&ctx->cq_overflow_list)) {
- struct io_uring_cqe *cqe = io_get_cqe(ctx);
+ struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
struct io_overflow_cqe *ocqe;
if (!cqe && !force)
@@ -694,12 +741,19 @@ bool io_req_cqe_overflow(struct io_kiocb *req)
* control dependency is enough as we're using WRITE_ONCE to
* fill the cq entry
*/
-struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
+struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
{
struct io_rings *rings = ctx->rings;
unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
unsigned int free, queued, len;
+ /*
+ * Posting into the CQ when there are pending overflowed CQEs may break
+ * ordering guarantees, which will affect links, F_MORE users and more.
+ * Force overflow the completion.
+ */
+ if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
+ return NULL;
/* userspace may cheat modifying the tail, be safe and do min */
queued = min(__io_cqring_events(ctx), ctx->cq_entries);
@@ -823,8 +877,12 @@ inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
void io_req_complete_failed(struct io_kiocb *req, s32 res)
{
+ const struct io_op_def *def = &io_op_defs[req->opcode];
+
req_set_fail(req);
io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
+ if (def->fail)
+ def->fail(req);
io_req_complete_post(req);
}
@@ -1047,17 +1105,41 @@ void tctx_task_work(struct callback_head *cb)
trace_io_uring_task_work_run(tctx, count, loops);
}
-void io_req_task_work_add(struct io_kiocb *req)
+static void io_req_local_work_add(struct io_kiocb *req)
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
+ return;
+ /* need it for the following io_cqring_wake() */
+ smp_mb__after_atomic();
+
+ if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
+ io_move_task_work_from_local(ctx);
+ return;
+ }
+
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+
+ if (ctx->has_evfd)
+ io_eventfd_signal(ctx);
+ __io_cqring_wake(ctx);
+}
+
+static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
struct llist_node *node;
- bool running;
- running = !llist_add(&req->io_task_work.node, &tctx->task_list);
+ if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+ io_req_local_work_add(req);
+ return;
+ }
/* task_work already pending, we're done */
- if (running)
+ if (!llist_add(&req->io_task_work.node, &tctx->task_list))
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
@@ -1077,6 +1159,84 @@ void io_req_task_work_add(struct io_kiocb *req)
}
}
+void io_req_task_work_add(struct io_kiocb *req)
+{
+ __io_req_task_work_add(req, true);
+}
+
+static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
+{
+ struct llist_node *node;
+
+ node = llist_del_all(&ctx->work_llist);
+ while (node) {
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.node);
+
+ node = node->next;
+ __io_req_task_work_add(req, false);
+ }
+}
+
+int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
+{
+ struct llist_node *node;
+ struct llist_node fake;
+ struct llist_node *current_final = NULL;
+ int ret;
+ unsigned int loops = 1;
+
+ if (unlikely(ctx->submitter_task != current))
+ return -EEXIST;
+
+ node = io_llist_xchg(&ctx->work_llist, &fake);
+ ret = 0;
+again:
+ while (node != current_final) {
+ struct llist_node *next = node->next;
+ struct io_kiocb *req = container_of(node, struct io_kiocb,
+ io_task_work.node);
+ prefetch(container_of(next, struct io_kiocb, io_task_work.node));
+ req->io_task_work.func(req, locked);
+ ret++;
+ node = next;
+ }
+
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+
+ node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
+ if (node != &fake) {
+ loops++;
+ current_final = &fake;
+ node = io_llist_xchg(&ctx->work_llist, &fake);
+ goto again;
+ }
+
+ if (*locked)
+ io_submit_flush_completions(ctx);
+ trace_io_uring_local_work_run(ctx, ret, loops);
+ return ret;
+
+}
+
+int io_run_local_work(struct io_ring_ctx *ctx)
+{
+ bool locked;
+ int ret;
+
+ if (llist_empty(&ctx->work_llist))
+ return 0;
+
+ __set_current_state(TASK_RUNNING);
+ locked = mutex_trylock(&ctx->uring_lock);
+ ret = __io_run_local_work(ctx, &locked);
+ if (locked)
+ mutex_unlock(&ctx->uring_lock);
+
+ return ret;
+}
+
static void io_req_tw_post(struct io_kiocb *req, bool *locked)
{
io_req_complete_post(req);
@@ -1183,7 +1343,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
struct io_wq_work_node *node, *prev;
struct io_submit_state *state = &ctx->submit_state;
- spin_lock(&ctx->completion_lock);
+ io_cq_lock(ctx);
wq_list_for_each(node, prev, &state->compl_reqs) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
comp_list);
@@ -1254,6 +1414,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
int ret = 0;
unsigned long check_cq;
+ if (!io_allowed_run_tw(ctx))
+ return -EEXIST;
+
check_cq = READ_ONCE(ctx->check_cq);
if (unlikely(check_cq)) {
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
@@ -1284,13 +1447,18 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
* forever, while the workqueue is stuck trying to acquire the
* very same mutex.
*/
- if (wq_list_empty(&ctx->iopoll_list)) {
+ if (wq_list_empty(&ctx->iopoll_list) ||
+ io_task_work_pending(ctx)) {
u32 tail = ctx->cached_cq_tail;
- mutex_unlock(&ctx->uring_lock);
- io_run_task_work();
- mutex_lock(&ctx->uring_lock);
+ (void) io_run_local_work_locked(ctx);
+ if (task_work_pending(current) ||
+ wq_list_empty(&ctx->iopoll_list)) {
+ mutex_unlock(&ctx->uring_lock);
+ io_run_task_work();
+ mutex_lock(&ctx->uring_lock);
+ }
/* some requests don't go through iopoll_list */
if (tail != ctx->cached_cq_tail ||
wq_list_empty(&ctx->iopoll_list))
@@ -1377,7 +1545,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
static bool io_bdev_nowait(struct block_device *bdev)
{
- return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
+ return !bdev || bdev_nowait(bdev);
}
/*
@@ -1423,8 +1591,6 @@ unsigned int io_file_get_flags(struct file *file)
res |= FFS_ISREG;
if (__io_file_supports_nowait(file, mode))
res |= FFS_NOWAIT;
- if (io_file_need_scm(file))
- res |= FFS_SCM;
return res;
}
@@ -1696,7 +1862,6 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
/* mask in overlapping REQ_F and FFS bits */
req->flags |= (file_ptr << REQ_F_SUPPORT_NOWAIT_BIT);
io_req_set_rsrc_node(req, ctx, 0);
- WARN_ON_ONCE(file && !test_bit(fd, ctx->file_table.bitmap));
out:
io_ring_submit_unlock(ctx, issue_flags);
return file;
@@ -1732,10 +1897,6 @@ static void io_queue_async(struct io_kiocb *req, int ret)
io_req_task_queue(req);
break;
case IO_APOLL_ABORTED:
- /*
- * Queued up for async execution, worker will release
- * submit reference when the iocb is actually submitted.
- */
io_kbuf_recycle(req, 0);
io_queue_iowq(req, NULL);
break;
@@ -2149,10 +2310,17 @@ struct io_wait_queue {
unsigned nr_timeouts;
};
+static inline bool io_has_work(struct io_ring_ctx *ctx)
+{
+ return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
+ ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+ !llist_empty(&ctx->work_llist));
+}
+
static inline bool io_should_wake(struct io_wait_queue *iowq)
{
struct io_ring_ctx *ctx = iowq->ctx;
- int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
+ int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
/*
* Wake up if we have enough events, or if a timeout occurred since we
@@ -2167,20 +2335,20 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
{
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
wq);
+ struct io_ring_ctx *ctx = iowq->ctx;
/*
* Cannot safely flush overflowed CQEs from here, ensure we wake up
* the task, and the next invocation will do it.
*/
- if (io_should_wake(iowq) ||
- test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &iowq->ctx->check_cq))
+ if (io_should_wake(iowq) || io_has_work(ctx))
return autoremove_wake_function(curr, mode, wake_flags, key);
return -1;
}
-int io_run_task_work_sig(void)
+int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
- if (io_run_task_work())
+ if (io_run_task_work_ctx(ctx) > 0)
return 1;
if (task_sigpending(current))
return -EINTR;
@@ -2196,7 +2364,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
unsigned long check_cq;
/* make sure we run task_work before checking for signals */
- ret = io_run_task_work_sig();
+ ret = io_run_task_work_sig(ctx);
if (ret || io_should_wake(iowq))
return ret;
@@ -2226,13 +2394,20 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
ktime_t timeout = KTIME_MAX;
int ret;
+ if (!io_allowed_run_tw(ctx))
+ return -EEXIST;
+
do {
+ /* always run at least 1 task work to process local work */
+ ret = io_run_task_work_ctx(ctx);
+ if (ret < 0)
+ return ret;
io_cqring_overflow_flush(ctx);
- if (io_cqring_events(ctx) >= min_events)
+
+ /* if user messes with these they will just get an early return */
+ if (__io_cqring_events_user(ctx) >= min_events)
return 0;
- if (!io_run_task_work())
- break;
- } while (1);
+ } while (ret > 0);
if (sig) {
#ifdef CONFIG_COMPAT
@@ -2366,17 +2541,11 @@ static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
ev_fd->eventfd_async = eventfd_async;
ctx->has_evfd = true;
rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
+ atomic_set(&ev_fd->refs, 1);
+ atomic_set(&ev_fd->ops, 0);
return 0;
}
-static void io_eventfd_put(struct rcu_head *rcu)
-{
- struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
-
- eventfd_ctx_put(ev_fd->cq_ev_fd);
- kfree(ev_fd);
-}
-
static int io_eventfd_unregister(struct io_ring_ctx *ctx)
{
struct io_ev_fd *ev_fd;
@@ -2386,7 +2555,8 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
if (ev_fd) {
ctx->has_evfd = false;
rcu_assign_pointer(ctx->io_ev_fd, NULL);
- call_rcu(&ev_fd->rcu, io_eventfd_put);
+ if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_FREE_BIT), &ev_fd->ops))
+ call_rcu(&ev_fd->rcu, io_eventfd_ops);
return 0;
}
@@ -2395,18 +2565,14 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
static void io_req_caches_free(struct io_ring_ctx *ctx)
{
- struct io_submit_state *state = &ctx->submit_state;
int nr = 0;
mutex_lock(&ctx->uring_lock);
- io_flush_cached_locked_reqs(ctx, state);
+ io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
while (!io_req_cache_empty(ctx)) {
- struct io_wq_work_node *node;
- struct io_kiocb *req;
+ struct io_kiocb *req = io_alloc_req(ctx);
- node = wq_stack_extract(&state->free_list);
- req = container_of(node, struct io_kiocb, comp_list);
kmem_cache_free(req_cachep, req);
nr++;
}
@@ -2418,12 +2584,6 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
io_sq_thread_finish(ctx);
-
- if (ctx->mm_account) {
- mmdrop(ctx->mm_account);
- ctx->mm_account = NULL;
- }
-
io_rsrc_refs_drop(ctx);
/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
io_wait_rsrc_data(ctx->buf_data);
@@ -2464,8 +2624,11 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
}
#endif
WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
- WARN_ON_ONCE(ctx->notif_slots || ctx->nr_notif_slots);
+ if (ctx->mm_account) {
+ mmdrop(ctx->mm_account);
+ ctx->mm_account = NULL;
+ }
io_mem_free(ctx->rings);
io_mem_free(ctx->sq_sqes);
@@ -2509,8 +2672,8 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
* Users may get EPOLLIN meanwhile seeing nothing in cqring, this
* pushs them to do the flush.
*/
- if (io_cqring_events(ctx) ||
- test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+
+ if (io_cqring_events(ctx) || io_has_work(ctx))
mask |= EPOLLIN | EPOLLRDNORM;
return mask;
@@ -2573,6 +2736,9 @@ static __cold void io_ring_exit_work(struct work_struct *work)
* as nobody else will be looking for them.
*/
do {
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ io_move_task_work_from_local(ctx);
+
while (io_uring_try_cancel_requests(ctx, NULL, true))
cond_resched();
@@ -2643,12 +2809,12 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_poll_remove_all(ctx, NULL, true);
mutex_unlock(&ctx->uring_lock);
- /* failed during ring init, it couldn't have issued any requests */
- if (ctx->rings) {
+ /*
+ * If we failed setting up the ctx, we might not have any rings
+ * and therefore did not submit any requests
+ */
+ if (ctx->rings)
io_kill_timeouts(ctx, NULL, true);
- /* if we failed setting up the ctx, we might not have any rings */
- io_iopoll_try_reap_events(ctx);
- }
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
/*
@@ -2767,13 +2933,15 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
}
}
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ ret |= io_run_local_work(ctx) > 0;
ret |= io_cancel_defer_files(ctx, task, cancel_all);
mutex_lock(&ctx->uring_lock);
ret |= io_poll_remove_all(ctx, task, cancel_all);
mutex_unlock(&ctx->uring_lock);
ret |= io_kill_timeouts(ctx, task, cancel_all);
if (task)
- ret |= io_run_task_work();
+ ret |= io_run_task_work() > 0;
return ret;
}
@@ -2989,8 +3157,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
struct fd f;
long ret;
- io_run_task_work();
-
if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
IORING_ENTER_REGISTERED_RING)))
@@ -3056,12 +3222,22 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
mutex_unlock(&ctx->uring_lock);
goto out;
}
- if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
- goto iopoll_locked;
+ if (flags & IORING_ENTER_GETEVENTS) {
+ if (ctx->syscall_iopoll)
+ goto iopoll_locked;
+ /*
+ * Ignore errors, we'll soon call io_cqring_wait() and
+ * it should handle ownership problems if any.
+ */
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ (void)io_run_local_work_locked(ctx);
+ }
mutex_unlock(&ctx->uring_lock);
}
+
if (flags & IORING_ENTER_GETEVENTS) {
int ret2;
+
if (ctx->syscall_iopoll) {
/*
* We disallow the app entering submit/complete with
@@ -3180,7 +3356,7 @@ static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file)
if (fd < 0)
return fd;
- ret = __io_uring_add_tctx_node(ctx, false);
+ ret = __io_uring_add_tctx_node(ctx);
if (ret) {
put_unused_fd(fd);
return ret;
@@ -3290,18 +3466,30 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
if (ctx->flags & IORING_SETUP_SQPOLL) {
/* IPI related flags don't make sense with SQPOLL */
if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
- IORING_SETUP_TASKRUN_FLAG))
+ IORING_SETUP_TASKRUN_FLAG |
+ IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
ctx->notify_method = TWA_SIGNAL_NO_IPI;
} else {
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
+ if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
+ !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
goto err;
ctx->notify_method = TWA_SIGNAL;
}
/*
+ * For DEFER_TASKRUN we require the completion task to be the same as the
+ * submission task. This implies that there is only one submitter, so enforce
+ * that.
+ */
+ if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
+ !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
+ goto err;
+ }
+
+ /*
* This is just grabbed for accounting purposes. When a process exits,
* the mm is exited and dropped before the files, hence we need to hang
* on to this mm purely for the purposes of being able to unaccount
@@ -3354,6 +3542,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
goto err;
}
+ if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
+ && !(ctx->flags & IORING_SETUP_R_DISABLED))
+ ctx->submitter_task = get_task_struct(current);
+
file = io_uring_get_file(ctx);
if (IS_ERR(file)) {
ret = PTR_ERR(file);
@@ -3401,7 +3593,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
- IORING_SETUP_SINGLE_ISSUER))
+ IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
return -EINVAL;
return io_uring_create(entries, &p, params);
@@ -3545,6 +3737,9 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
if (!(ctx->flags & IORING_SETUP_R_DISABLED))
return -EBADFD;
+ if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
+ ctx->submitter_task = get_task_struct(current);
+
if (ctx->restrictions.registered)
ctx->restricted = 1;
@@ -3696,6 +3891,9 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
return -ENXIO;
+ if (ctx->submitter_task && ctx->submitter_task != current)
+ return -EEXIST;
+
if (ctx->restricted) {
if (opcode >= IORING_REGISTER_LAST)
return -EINVAL;
@@ -3864,7 +4062,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
ctx = f.file->private_data;
- io_run_task_work();
+ io_run_task_work_ctx(ctx);
mutex_lock(&ctx->uring_lock);
ret = __io_uring_register(ctx, opcode, arg, nr_args);