aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--include/linux/io_uring_types.h12
-rw-r--r--io_uring/cancel.c42
-rw-r--r--io_uring/cancel.h8
-rw-r--r--io_uring/futex.c58
-rw-r--r--io_uring/io-wq.c230
-rw-r--r--io_uring/io-wq.h7
-rw-r--r--io_uring/io_uring.c169
-rw-r--r--io_uring/io_uring.h9
-rw-r--r--io_uring/kbuf.c172
-rw-r--r--io_uring/kbuf.h100
-rw-r--r--io_uring/msg_ring.c2
-rw-r--r--io_uring/net.c18
-rw-r--r--io_uring/notif.c4
-rw-r--r--io_uring/poll.c18
-rw-r--r--io_uring/poll.h4
-rw-r--r--io_uring/rsrc.h2
-rw-r--r--io_uring/rw.c4
-rw-r--r--io_uring/rw.h3
-rw-r--r--io_uring/splice.c3
-rw-r--r--io_uring/timeout.c16
-rw-r--r--io_uring/uring_cmd.c2
-rw-r--r--io_uring/waitid.c50
22 files changed, 440 insertions, 493 deletions
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 3def525a1da3..123e69368730 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -360,7 +360,6 @@ struct io_ring_ctx {
spinlock_t completion_lock;
- struct list_head io_buffers_comp;
struct list_head cq_overflow_list;
struct hlist_head waitid_list;
@@ -379,8 +378,6 @@ struct io_ring_ctx {
unsigned int file_alloc_start;
unsigned int file_alloc_end;
- struct list_head io_buffers_cache;
-
/* Keep this last, we don't need it for the fast path */
struct wait_queue_head poll_wq;
struct io_restriction restrictions;
@@ -439,8 +436,15 @@ struct io_ring_ctx {
struct io_mapped_region param_region;
};
+/*
+ * Token indicating function is called in task work context:
+ * ctx->uring_lock is held and any completions generated will be flushed.
+ * ONLY core io_uring.c should instantiate this struct.
+ */
struct io_tw_state {
};
+/* Alias to use in code that doesn't instantiate struct io_tw_state */
+typedef struct io_tw_state io_tw_token_t;
enum {
REQ_F_FIXED_FILE_BIT = IOSQE_FIXED_FILE_BIT,
@@ -566,7 +570,7 @@ enum {
REQ_F_HAS_METADATA = IO_REQ_FLAG(REQ_F_HAS_METADATA_BIT),
};
-typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
+typedef void (*io_req_tw_func_t)(struct io_kiocb *req, io_tw_token_t tw);
struct io_task_work {
struct llist_node node;
diff --git a/io_uring/cancel.c b/io_uring/cancel.c
index 484193567839..0870060bac7c 100644
--- a/io_uring/cancel.c
+++ b/io_uring/cancel.c
@@ -341,3 +341,45 @@ out:
fput(file);
return ret;
}
+
+bool io_cancel_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
+ struct hlist_head *list, bool cancel_all,
+ bool (*cancel)(struct io_kiocb *))
+{
+ struct hlist_node *tmp;
+ struct io_kiocb *req;
+ bool found = false;
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ hlist_for_each_entry_safe(req, tmp, list, hash_node) {
+ if (!io_match_task_safe(req, tctx, cancel_all))
+ continue;
+ hlist_del_init(&req->hash_node);
+ if (cancel(req))
+ found = true;
+ }
+
+ return found;
+}
+
+int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
+ unsigned int issue_flags, struct hlist_head *list,
+ bool (*cancel)(struct io_kiocb *))
+{
+ struct hlist_node *tmp;
+ struct io_kiocb *req;
+ int nr = 0;
+
+ io_ring_submit_lock(ctx, issue_flags);
+ hlist_for_each_entry_safe(req, tmp, list, hash_node) {
+ if (!io_cancel_req_match(req, cd))
+ continue;
+ if (cancel(req))
+ nr++;
+ if (!(cd->flags & IORING_ASYNC_CANCEL_ALL))
+ break;
+ }
+ io_ring_submit_unlock(ctx, issue_flags);
+ return nr ?: -ENOENT;
+}
diff --git a/io_uring/cancel.h b/io_uring/cancel.h
index bbfea2cd00ea..43e9bb74e9d1 100644
--- a/io_uring/cancel.h
+++ b/io_uring/cancel.h
@@ -24,6 +24,14 @@ int io_try_cancel(struct io_uring_task *tctx, struct io_cancel_data *cd,
int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg);
bool io_cancel_req_match(struct io_kiocb *req, struct io_cancel_data *cd);
+bool io_cancel_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
+ struct hlist_head *list, bool cancel_all,
+ bool (*cancel)(struct io_kiocb *));
+
+int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
+ unsigned int issue_flags, struct hlist_head *list,
+ bool (*cancel)(struct io_kiocb *));
+
static inline bool io_cancel_match_sequence(struct io_kiocb *req, int sequence)
{
if (req->cancel_seq_set && sequence == req->work.cancel_seq)
diff --git a/io_uring/futex.c b/io_uring/futex.c
index 43e2143255f5..b7581766406c 100644
--- a/io_uring/futex.c
+++ b/io_uring/futex.c
@@ -44,30 +44,30 @@ void io_futex_cache_free(struct io_ring_ctx *ctx)
io_alloc_cache_free(&ctx->futex_cache, kfree);
}
-static void __io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
+static void __io_futex_complete(struct io_kiocb *req, io_tw_token_t tw)
{
req->async_data = NULL;
hlist_del_init(&req->hash_node);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
}
-static void io_futex_complete(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_futex_complete(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_futex_data *ifd = req->async_data;
struct io_ring_ctx *ctx = req->ctx;
- io_tw_lock(ctx, ts);
+ io_tw_lock(ctx, tw);
if (!io_alloc_cache_put(&ctx->futex_cache, ifd))
kfree(ifd);
- __io_futex_complete(req, ts);
+ __io_futex_complete(req, tw);
}
-static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_futexv_complete(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_futex *iof = io_kiocb_to_cmd(req, struct io_futex);
struct futex_vector *futexv = req->async_data;
- io_tw_lock(req->ctx, ts);
+ io_tw_lock(req->ctx, tw);
if (!iof->futexv_unqueued) {
int res;
@@ -79,7 +79,7 @@ static void io_futexv_complete(struct io_kiocb *req, struct io_tw_state *ts)
kfree(req->async_data);
req->flags &= ~REQ_F_ASYNC_DATA;
- __io_futex_complete(req, ts);
+ __io_futex_complete(req, tw);
}
static bool io_futexv_claim(struct io_futex *iof)
@@ -90,7 +90,7 @@ static bool io_futexv_claim(struct io_futex *iof)
return true;
}
-static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+static bool __io_futex_cancel(struct io_kiocb *req)
{
/* futex wake already done or in progress */
if (req->opcode == IORING_OP_FUTEX_WAIT) {
@@ -116,49 +116,13 @@ static bool __io_futex_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
int io_futex_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
unsigned int issue_flags)
{
- struct hlist_node *tmp;
- struct io_kiocb *req;
- int nr = 0;
-
- if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED))
- return -ENOENT;
-
- io_ring_submit_lock(ctx, issue_flags);
- hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) {
- if (req->cqe.user_data != cd->data &&
- !(cd->flags & IORING_ASYNC_CANCEL_ANY))
- continue;
- if (__io_futex_cancel(ctx, req))
- nr++;
- if (!(cd->flags & IORING_ASYNC_CANCEL_ALL))
- break;
- }
- io_ring_submit_unlock(ctx, issue_flags);
-
- if (nr)
- return nr;
-
- return -ENOENT;
+ return io_cancel_remove(ctx, cd, issue_flags, &ctx->futex_list, __io_futex_cancel);
}
bool io_futex_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
bool cancel_all)
{
- struct hlist_node *tmp;
- struct io_kiocb *req;
- bool found = false;
-
- lockdep_assert_held(&ctx->uring_lock);
-
- hlist_for_each_entry_safe(req, tmp, &ctx->futex_list, hash_node) {
- if (!io_match_task_safe(req, tctx, cancel_all))
- continue;
- hlist_del_init(&req->hash_node);
- __io_futex_cancel(ctx, req);
- found = true;
- }
-
- return found;
+ return io_cancel_remove_all(ctx, tctx, &ctx->futex_list, cancel_all, __io_futex_cancel);
}
int io_futex_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index 5d0928f37471..f7d328feb722 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -30,7 +30,6 @@ enum {
IO_WORKER_F_UP = 0, /* up and active */
IO_WORKER_F_RUNNING = 1, /* account as running */
IO_WORKER_F_FREE = 2, /* worker on free list */
- IO_WORKER_F_BOUND = 3, /* is doing bounded work */
};
enum {
@@ -46,12 +45,12 @@ enum {
*/
struct io_worker {
refcount_t ref;
- int create_index;
unsigned long flags;
struct hlist_nulls_node nulls_node;
struct list_head all_list;
struct task_struct *task;
struct io_wq *wq;
+ struct io_wq_acct *acct;
struct io_wq_work *cur_work;
raw_spinlock_t lock;
@@ -77,10 +76,27 @@ struct io_worker {
#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
struct io_wq_acct {
+ /**
+ * Protects access to the worker lists.
+ */
+ raw_spinlock_t workers_lock;
+
unsigned nr_workers;
unsigned max_workers;
- int index;
atomic_t nr_running;
+
+ /**
+ * The list of free workers. Protected by #workers_lock
+ * (write) and RCU (read).
+ */
+ struct hlist_nulls_head free_list;
+
+ /**
+ * The list of all workers. Protected by #workers_lock
+ * (write) and RCU (read).
+ */
+ struct list_head all_list;
+
raw_spinlock_t lock;
struct io_wq_work_list work_list;
unsigned long flags;
@@ -112,12 +128,6 @@ struct io_wq {
struct io_wq_acct acct[IO_WQ_ACCT_NR];
- /* lock protects access to elements below */
- raw_spinlock_t lock;
-
- struct hlist_nulls_head free_list;
- struct list_head all_list;
-
struct wait_queue_entry wait;
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
@@ -135,7 +145,7 @@ struct io_cb_cancel_data {
bool cancel_all;
};
-static bool create_io_worker(struct io_wq *wq, int index);
+static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
struct io_wq_acct *acct,
@@ -160,14 +170,14 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
}
static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
- struct io_wq_work *work)
+ unsigned int work_flags)
{
- return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
+ return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND));
}
static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
- return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
+ return worker->acct;
}
static void io_worker_ref_put(struct io_wq *wq)
@@ -192,9 +202,9 @@ static void io_worker_cancel_cb(struct io_worker *worker)
struct io_wq *wq = worker->wq;
atomic_dec(&acct->nr_running);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
acct->nr_workers--;
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
@@ -213,6 +223,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data)
static void io_worker_exit(struct io_worker *worker)
{
struct io_wq *wq = worker->wq;
+ struct io_wq_acct *acct = io_wq_get_acct(worker);
while (1) {
struct callback_head *cb = task_work_cancel_match(wq->task,
@@ -226,11 +237,11 @@ static void io_worker_exit(struct io_worker *worker)
io_worker_release(worker);
wait_for_completion(&worker->ref_done);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
if (test_bit(IO_WORKER_F_FREE, &worker->flags))
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
io_wq_dec_running(worker);
/*
* this worker is a goner, clear ->worker_private to avoid any
@@ -269,8 +280,7 @@ static inline bool io_acct_run_queue(struct io_wq_acct *acct)
* Check head of free list for an available worker. If one isn't available,
* caller must create one.
*/
-static bool io_wq_activate_free_worker(struct io_wq *wq,
- struct io_wq_acct *acct)
+static bool io_acct_activate_free_worker(struct io_wq_acct *acct)
__must_hold(RCU)
{
struct hlist_nulls_node *n;
@@ -281,13 +291,9 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
* activate. If a given worker is on the free_list but in the process
* of exiting, keep trying.
*/
- hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
+ hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) {
if (!io_worker_get(worker))
continue;
- if (io_wq_get_acct(worker) != acct) {
- io_worker_release(worker);
- continue;
- }
/*
* If the worker is already running, it's either already
* starting work or finishing work. In either case, if it does
@@ -314,16 +320,16 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
if (acct->nr_workers >= acct->max_workers) {
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
return true;
}
acct->nr_workers++;
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wq->worker_refs);
- return create_io_worker(wq, acct->index);
+ return create_io_worker(wq, acct);
}
static void io_wq_inc_running(struct io_worker *worker)
@@ -343,16 +349,16 @@ static void create_worker_cb(struct callback_head *cb)
worker = container_of(cb, struct io_worker, create_work);
wq = worker->wq;
- acct = &wq->acct[worker->create_index];
- raw_spin_lock(&wq->lock);
+ acct = worker->acct;
+ raw_spin_lock(&acct->workers_lock);
if (acct->nr_workers < acct->max_workers) {
acct->nr_workers++;
do_create = true;
}
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
if (do_create) {
- create_io_worker(wq, worker->create_index);
+ create_io_worker(wq, acct);
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
@@ -384,7 +390,6 @@ static bool io_queue_worker_create(struct io_worker *worker,
atomic_inc(&wq->worker_refs);
init_task_work(&worker->create_work, func);
- worker->create_index = acct->index;
if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
/*
* EXIT may have been set after checking it above, check after
@@ -430,31 +435,36 @@ static void io_wq_dec_running(struct io_worker *worker)
* Worker will start processing some work. Move it to the busy list, if
* it's currently on the freelist
*/
-static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
+static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
clear_bit(IO_WORKER_F_FREE, &worker->flags);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
hlist_nulls_del_init_rcu(&worker->nulls_node);
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
}
}
/*
* No work, worker going to sleep. Move to freelist.
*/
-static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
- __must_hold(wq->lock)
+static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
+ __must_hold(acct->workers_lock)
{
if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
set_bit(IO_WORKER_F_FREE, &worker->flags);
- hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
+ hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
}
}
+static inline unsigned int __io_get_work_hash(unsigned int work_flags)
+{
+ return work_flags >> IO_WQ_HASH_SHIFT;
+}
+
static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
- return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
+ return __io_get_work_hash(atomic_read(&work->flags));
}
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
@@ -475,26 +485,27 @@ static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
}
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
- struct io_worker *worker)
+ struct io_wq *wq)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail;
unsigned int stall_hash = -1U;
- struct io_wq *wq = worker->wq;
wq_list_for_each(node, prev, &acct->work_list) {
+ unsigned int work_flags;
unsigned int hash;
work = container_of(node, struct io_wq_work, list);
/* not hashed, can run anytime */
- if (!io_wq_is_hashed(work)) {
+ work_flags = atomic_read(&work->flags);
+ if (!__io_wq_is_hashed(work_flags)) {
wq_list_del(&acct->work_list, node, prev);
return work;
}
- hash = io_get_work_hash(work);
+ hash = __io_get_work_hash(work_flags);
/* all items with this hash lie in [work, tail] */
tail = wq->hash_tail[hash];
@@ -564,7 +575,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
* can't make progress, any work completion or insertion will
* clear the stalled flag.
*/
- work = io_get_next_work(acct, worker);
+ work = io_get_next_work(acct, wq);
if (work) {
/*
* Make sure cancelation can find this, even before
@@ -583,7 +594,7 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
if (!work)
break;
- __io_worker_busy(wq, worker);
+ __io_worker_busy(acct, worker);
io_assign_current_work(worker, work);
__set_current_state(TASK_RUNNING);
@@ -591,12 +602,15 @@ static void io_worker_handle_work(struct io_wq_acct *acct,
/* handle a whole dependent link */
do {
struct io_wq_work *next_hashed, *linked;
- unsigned int hash = io_get_work_hash(work);
+ unsigned int work_flags = atomic_read(&work->flags);
+ unsigned int hash = __io_wq_is_hashed(work_flags)
+ ? __io_get_work_hash(work_flags)
+ : -1U;
next_hashed = wq_next_work(work);
if (do_kill &&
- (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
+ (work_flags & IO_WQ_WORK_UNBOUND))
atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
wq->do_work(work);
io_assign_current_work(worker, NULL);
@@ -654,20 +668,20 @@ static int io_wq_worker(void *data)
while (io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
/*
* Last sleep timed out. Exit if we're not the last worker,
* or if someone modified our affinity.
*/
if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
__set_current_state(TASK_RUNNING);
break;
}
last_timeout = false;
- __io_worker_idle(wq, worker);
- raw_spin_unlock(&wq->lock);
+ __io_worker_idle(acct, worker);
+ raw_spin_unlock(&acct->workers_lock);
if (io_run_task_work())
continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
@@ -728,18 +742,18 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
io_wq_dec_running(worker);
}
-static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
+static void io_init_new_worker(struct io_wq *wq, struct io_wq_acct *acct, struct io_worker *worker,
struct task_struct *tsk)
{
tsk->worker_private = worker;
worker->task = tsk;
set_cpus_allowed_ptr(tsk, wq->cpu_mask);
- raw_spin_lock(&wq->lock);
- hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
- list_add_tail_rcu(&worker->all_list, &wq->all_list);
+ raw_spin_lock(&acct->workers_lock);
+ hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
+ list_add_tail_rcu(&worker->all_list, &acct->all_list);
set_bit(IO_WORKER_F_FREE, &worker->flags);
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
wake_up_new_task(tsk);
}
@@ -775,20 +789,20 @@ static void create_worker_cont(struct callback_head *cb)
struct io_worker *worker;
struct task_struct *tsk;
struct io_wq *wq;
+ struct io_wq_acct *acct;
worker = container_of(cb, struct io_worker, create_work);
clear_bit_unlock(0, &worker->create_state);
wq = worker->wq;
+ acct = io_wq_get_acct(worker);
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- io_init_new_worker(wq, worker, tsk);
+ io_init_new_worker(wq, acct, worker, tsk);
io_worker_release(worker);
return;
} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
- struct io_wq_acct *acct = io_wq_get_acct(worker);
-
atomic_dec(&acct->nr_running);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
acct->nr_workers--;
if (!acct->nr_workers) {
struct io_cb_cancel_data match = {
@@ -796,11 +810,11 @@ static void create_worker_cont(struct callback_head *cb)
.cancel_all = true,
};
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
while (io_acct_cancel_pending_work(wq, acct, &match))
;
} else {
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
}
io_worker_ref_put(wq);
kfree(worker);
@@ -821,9 +835,8 @@ static void io_workqueue_create(struct work_struct *work)
kfree(worker);
}
-static bool create_io_worker(struct io_wq *wq, int index)
+static bool create_io_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
- struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
struct task_struct *tsk;
@@ -833,24 +846,22 @@ static bool create_io_worker(struct io_wq *wq, int index)
if (!worker) {
fail:
atomic_dec(&acct->nr_running);
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
acct->nr_workers--;
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
io_worker_ref_put(wq);
return false;
}
refcount_set(&worker->ref, 1);
worker->wq = wq;
+ worker->acct = acct;
raw_spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);
- if (index == IO_WQ_ACCT_BOUND)
- set_bit(IO_WORKER_F_BOUND, &worker->flags);
-
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
- io_init_new_worker(wq, worker, tsk);
+ io_init_new_worker(wq, acct, worker, tsk);
} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
kfree(worker);
goto fail;
@@ -866,14 +877,14 @@ fail:
* Iterate the passed in list and call the specific function for each
* worker that isn't exiting
*/
-static bool io_wq_for_each_worker(struct io_wq *wq,
- bool (*func)(struct io_worker *, void *),
- void *data)
+static bool io_acct_for_each_worker(struct io_wq_acct *acct,
+ bool (*func)(struct io_worker *, void *),
+ void *data)
{
struct io_worker *worker;
bool ret = false;
- list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
+ list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
if (io_worker_get(worker)) {
/* no task if node is/was offline */
if (worker->task)
@@ -887,6 +898,18 @@ static bool io_wq_for_each_worker(struct io_wq *wq,
return ret;
}
+static bool io_wq_for_each_worker(struct io_wq *wq,
+ bool (*func)(struct io_worker *, void *),
+ void *data)
+{
+ for (int i = 0; i < IO_WQ_ACCT_NR; i++) {
+ if (!io_acct_for_each_worker(&wq->acct[i], func, data))
+ return false;
+ }
+
+ return true;
+}
+
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
__set_notify_signal(worker->task);
@@ -903,19 +926,19 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
} while (work);
}
-static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
+static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct,
+ struct io_wq_work *work, unsigned int work_flags)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash;
struct io_wq_work *tail;
- if (!io_wq_is_hashed(work)) {
+ if (!__io_wq_is_hashed(work_flags)) {
append:
wq_list_add_tail(&work->list, &acct->work_list);
return;
}
- hash = io_get_work_hash(work);
+ hash = __io_get_work_hash(work_flags);
tail = wq->hash_tail[hash];
wq->hash_tail[hash] = work;
if (!tail)
@@ -931,8 +954,8 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int work_flags = atomic_read(&work->flags);
+ struct io_wq_acct *acct = io_work_get_acct(wq, work_flags);
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_item,
.data = work,
@@ -951,12 +974,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
}
raw_spin_lock(&acct->lock);
- io_wq_insert_work(wq, work);
+ io_wq_insert_work(wq, acct, work, work_flags);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
rcu_read_lock();
- do_create = !io_wq_activate_free_worker(wq, acct);
+ do_create = !io_acct_activate_free_worker(acct);
rcu_read_unlock();
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
@@ -967,12 +990,12 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
if (likely(did_create))
return;
- raw_spin_lock(&wq->lock);
+ raw_spin_lock(&acct->workers_lock);
if (acct->nr_workers) {
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
return;
}
- raw_spin_unlock(&wq->lock);
+ raw_spin_unlock(&acct->workers_lock);
/* fatal condition, failed to create the first worker */
io_acct_cancel_pending_work(wq, acct, &match);
@@ -1021,10 +1044,10 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
}
static inline void io_wq_remove_pending(struct io_wq *wq,
+ struct io_wq_acct *acct,
struct io_wq_work *work,
struct io_wq_work_node *prev)
{
- struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
@@ -1051,7 +1074,7 @@ static bool io_acct_cancel_pending_work(struct io_wq *wq,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
- io_wq_remove_pending(wq, work, prev);
+ io_wq_remove_pending(wq, acct, work, prev);
raw_spin_unlock(&acct->lock);
io_run_cancel(work, wq);
match->nr_pending++;
@@ -1079,11 +1102,22 @@ retry:
}
}
+static void io_acct_cancel_running_work(struct io_wq_acct *acct,
+ struct io_cb_cancel_data *match)
+{
+ raw_spin_lock(&acct->workers_lock);
+ io_acct_for_each_worker(acct, io_wq_worker_cancel, match);
+ raw_spin_unlock(&acct->workers_lock);
+}
+
static void io_wq_cancel_running_work(struct io_wq *wq,
struct io_cb_cancel_data *match)
{
rcu_read_lock();
- io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
+
+ for (int i = 0; i < IO_WQ_ACCT_NR; i++)
+ io_acct_cancel_running_work(&wq->acct[i], match);
+
rcu_read_unlock();
}
@@ -1106,16 +1140,14 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
* as an indication that we attempt to signal cancellation. The
* completion will run normally in this case.
*
- * Do both of these while holding the wq->lock, to ensure that
+ * Do both of these while holding the acct->workers_lock, to ensure that
* we'll find a work item regardless of state.
*/
io_wq_cancel_pending_work(wq, &match);
if (match.nr_pending && !match.cancel_all)
return IO_WQ_CANCEL_OK;
- raw_spin_lock(&wq->lock);
io_wq_cancel_running_work(wq, &match);
- raw_spin_unlock(&wq->lock);
if (match.nr_running && !match.cancel_all)
return IO_WQ_CANCEL_RUNNING;
@@ -1139,7 +1171,7 @@ static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
struct io_wq_acct *acct = &wq->acct[i];
if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
- io_wq_activate_free_worker(wq, acct);
+ io_acct_activate_free_worker(acct);
}
rcu_read_unlock();
return 1;
@@ -1177,16 +1209,16 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wq_acct *acct = &wq->acct[i];
- acct->index = i;
atomic_set(&acct->nr_running, 0);
+
+ raw_spin_lock_init(&acct->workers_lock);
+ INIT_HLIST_NULLS_HEAD(&acct->free_list, 0);
+ INIT_LIST_HEAD(&acct->all_list);
+
INIT_WQ_LIST(&acct->work_list);
raw_spin_lock_init(&acct->lock);
}
- raw_spin_lock_init(&wq->lock);
- INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
- INIT_LIST_HEAD(&wq->all_list);
-
wq->task = get_task_struct(data->task);
atomic_set(&wq->worker_refs, 1);
init_completion(&wq->worker_done);
@@ -1372,14 +1404,14 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
rcu_read_lock();
- raw_spin_lock(&wq->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wq->acct[i];
+ raw_spin_lock(&acct->workers_lock);
prev[i] = max_t(int, acct->max_workers, prev[i]);
if (new_count[i])
acct->max_workers = new_count[i];
+ raw_spin_unlock(&acct->workers_lock);
}
- raw_spin_unlock(&wq->lock);
rcu_read_unlock();
for (i = 0; i < IO_WQ_ACCT_NR; i++)
diff --git a/io_uring/io-wq.h b/io_uring/io-wq.h
index b3b004a7b625..d4fb2940e435 100644
--- a/io_uring/io-wq.h
+++ b/io_uring/io-wq.h
@@ -54,9 +54,14 @@ int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask);
int io_wq_max_workers(struct io_wq *wq, int *new_count);
bool io_wq_worker_stopped(void);
+static inline bool __io_wq_is_hashed(unsigned int work_flags)
+{
+ return work_flags & IO_WQ_WORK_HASHED;
+}
+
static inline bool io_wq_is_hashed(struct io_wq_work *work)
{
- return atomic_read(&work->flags) & IO_WQ_WORK_HASHED;
+ return __io_wq_is_hashed(atomic_read(&work->flags));
}
typedef bool (work_cancel_fn)(struct io_wq_work *, void *);
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index ceacf6230e34..bd1ab21ed539 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -110,11 +110,13 @@
#define SQE_VALID_FLAGS (SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
+#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
+
#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
REQ_F_ASYNC_DATA)
-#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
+#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | IO_REQ_LINK_FLAGS | \
REQ_F_REISSUE | IO_REQ_CLEAN_FLAGS)
#define IO_TCTX_REFS_CACHE_NR (1U << 10)
@@ -131,7 +133,6 @@ struct io_defer_entry {
/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
-#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
/*
* No waiters. It's larger than any valid value of the tw counter
@@ -254,7 +255,7 @@ static __cold void io_fallback_req_func(struct work_struct *work)
percpu_ref_get(&ctx->refs);
mutex_lock(&ctx->uring_lock);
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
- req->io_task_work.func(req, &ts);
+ req->io_task_work.func(req, ts);
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
percpu_ref_put(&ctx->refs);
@@ -282,6 +283,16 @@ static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
return 0;
}
+static void io_free_alloc_caches(struct io_ring_ctx *ctx)
+{
+ io_alloc_cache_free(&ctx->apoll_cache, kfree);
+ io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
+ io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
+ io_alloc_cache_free(&ctx->uring_cache, kfree);
+ io_alloc_cache_free(&ctx->msg_cache, kfree);
+ io_futex_cache_free(ctx);
+}
+
static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
struct io_ring_ctx *ctx;
@@ -313,7 +324,6 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_waitqueue_head(&ctx->sqo_sq_wait);
INIT_LIST_HEAD(&ctx->sqd_list);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
- INIT_LIST_HEAD(&ctx->io_buffers_cache);
ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
sizeof(struct async_poll), 0);
ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
@@ -338,7 +348,6 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
spin_lock_init(&ctx->completion_lock);
raw_spin_lock_init(&ctx->timeout_lock);
INIT_WQ_LIST(&ctx->iopoll_list);
- INIT_LIST_HEAD(&ctx->io_buffers_comp);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
@@ -360,12 +369,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
free_ref:
percpu_ref_exit(&ctx->refs);
err:
- io_alloc_cache_free(&ctx->apoll_cache, kfree);
- io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
- io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
- io_alloc_cache_free(&ctx->uring_cache, kfree);
- io_alloc_cache_free(&ctx->msg_cache, kfree);
- io_futex_cache_free(ctx);
+ io_free_alloc_caches(ctx);
kvfree(ctx->cancel_table.hbs);
xa_destroy(&ctx->io_bl_xa);
kfree(ctx);
@@ -393,11 +397,8 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq)
static void io_clean_op(struct io_kiocb *req)
{
- if (req->flags & REQ_F_BUFFER_SELECTED) {
- spin_lock(&req->ctx->completion_lock);
- io_kbuf_drop(req);
- spin_unlock(&req->ctx->completion_lock);
- }
+ if (unlikely(req->flags & REQ_F_BUFFER_SELECTED))
+ io_kbuf_drop_legacy(req);
if (req->flags & REQ_F_NEED_CLEANUP) {
const struct io_cold_def *def = &io_cold_defs[req->opcode];
@@ -542,7 +543,7 @@ static void io_queue_iowq(struct io_kiocb *req)
io_queue_linked_timeout(link);
}
-static void io_req_queue_iowq_tw(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_req_queue_iowq_tw(struct io_kiocb *req, io_tw_token_t tw)
{
io_queue_iowq(req);
}
@@ -899,7 +900,7 @@ static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
* Handle special CQ sync cases via task_work. DEFER_TASKRUN requires
* the submitter task context, IOPOLL protects with uring_lock.
*/
- if (ctx->task_complete || (ctx->flags & IORING_SETUP_IOPOLL)) {
+ if (ctx->lockless_cq) {
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req);
return;
@@ -1021,7 +1022,7 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
return nxt;
}
-static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
if (!ctx)
return;
@@ -1051,24 +1052,24 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
io_task_work.node);
if (req->ctx != ctx) {
- ctx_flush_and_put(ctx, &ts);
+ ctx_flush_and_put(ctx, ts);
ctx = req->ctx;
mutex_lock(&ctx->uring_lock);
percpu_ref_get(&ctx->refs);
}
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
- req, &ts);
+ req, ts);
node = next;
(*count)++;
if (unlikely(need_resched())) {
- ctx_flush_and_put(ctx, &ts);
+ ctx_flush_and_put(ctx, ts);
ctx = NULL;
cond_resched();
}
} while (node && *count < max_entries);
- ctx_flush_and_put(ctx, &ts);
+ ctx_flush_and_put(ctx, ts);
return node;
}
@@ -1157,7 +1158,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req,
* We don't know how many reuqests is there in the link and whether
* they can even be queued lazily, fall back to non-lazy.
*/
- if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
+ if (req->flags & IO_REQ_LINK_FLAGS)
flags &= ~IOU_F_TWQ_LAZY_WAKE;
guard(rcu)();
@@ -1276,7 +1277,7 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
}
static int __io_run_local_work_loop(struct llist_node **node,
- struct io_tw_state *ts,
+ io_tw_token_t tw,
int events)
{
int ret = 0;
@@ -1287,7 +1288,7 @@ static int __io_run_local_work_loop(struct llist_node **node,
io_task_work.node);
INDIRECT_CALL_2(req->io_task_work.func,
io_poll_task_func, io_req_rw_complete,
- req, ts);
+ req, tw);
*node = next;
if (++ret >= events)
break;
@@ -1296,7 +1297,7 @@ static int __io_run_local_work_loop(struct llist_node **node,
return ret;
}
-static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
+static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
int min_events, int max_events)
{
struct llist_node *node;
@@ -1309,7 +1310,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
min_events -= ret;
- ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, max_events);
+ ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
if (ctx->retry_llist.first)
goto retry_done;
@@ -1318,7 +1319,7 @@ again:
* running the pending items.
*/
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
- ret += __io_run_local_work_loop(&node, ts, max_events - ret);
+ ret += __io_run_local_work_loop(&node, tw, max_events - ret);
ctx->retry_llist.first = node;
loops++;
@@ -1340,7 +1341,7 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
if (!io_local_work_pending(ctx))
return 0;
- return __io_run_local_work(ctx, &ts, min_events,
+ return __io_run_local_work(ctx, ts, min_events,
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}
@@ -1351,20 +1352,20 @@ static int io_run_local_work(struct io_ring_ctx *ctx, int min_events,
int ret;
mutex_lock(&ctx->uring_lock);
- ret = __io_run_local_work(ctx, &ts, min_events, max_events);
+ ret = __io_run_local_work(ctx, ts, min_events, max_events);
mutex_unlock(&ctx->uring_lock);
return ret;
}
-static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_req_task_cancel(struct io_kiocb *req, io_tw_token_t tw)
{
- io_tw_lock(req->ctx, ts);
+ io_tw_lock(req->ctx, tw);
io_req_defer_failed(req, req->cqe.res);
}
-void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
+void io_req_task_submit(struct io_kiocb *req, io_tw_token_t tw)
{
- io_tw_lock(req->ctx, ts);
+ io_tw_lock(req->ctx, tw);
if (unlikely(io_should_terminate_tw()))
io_req_defer_failed(req, -EFAULT);
else if (req->flags & REQ_F_FORCE_ASYNC)
@@ -1582,7 +1583,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
return 0;
}
-void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
+void io_req_task_complete(struct io_kiocb *req, io_tw_token_t tw)
{
io_req_complete_defer(req);
}
@@ -1762,9 +1763,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
return ret;
}
-int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
+int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw)
{
- io_tw_lock(req->ctx, ts);
+ io_tw_lock(req->ctx, tw);
return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
IO_URING_F_COMPLETE_DEFER);
}
@@ -1996,9 +1997,8 @@ static inline bool io_check_restriction(struct io_ring_ctx *ctx,
return true;
}
-static void io_init_req_drain(struct io_kiocb *req)
+static void io_init_drain(struct io_ring_ctx *ctx)
{
- struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *head = ctx->submit_state.link.head;
ctx->drain_active = true;
@@ -2060,7 +2060,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
if (sqe_flags & IOSQE_IO_DRAIN) {
if (ctx->drain_disabled)
return io_init_fail_req(req, -EOPNOTSUPP);
- io_init_req_drain(req);
+ io_init_drain(ctx);
}
}
if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
@@ -2702,12 +2702,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_sqe_files_unregister(ctx);
io_cqring_overflow_kill(ctx);
io_eventfd_unregister(ctx);
- io_alloc_cache_free(&ctx->apoll_cache, kfree);
- io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
- io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
- io_alloc_cache_free(&ctx->uring_cache, kfree);
- io_alloc_cache_free(&ctx->msg_cache, kfree);
- io_futex_cache_free(ctx);
+ io_free_alloc_caches(ctx);
io_destroy_buffers(ctx);
io_free_region(ctx, &ctx->param_region);
mutex_unlock(&ctx->uring_lock);
@@ -3535,6 +3530,44 @@ static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
O_RDWR | O_CLOEXEC, NULL);
}
+static int io_uring_sanitise_params(struct io_uring_params *p)
+{
+ unsigned flags = p->flags;
+
+ /* There is no way to mmap rings without a real fd */
+ if ((flags & IORING_SETUP_REGISTERED_FD_ONLY) &&
+ !(flags & IORING_SETUP_NO_MMAP))
+ return -EINVAL;
+
+ if (flags & IORING_SETUP_SQPOLL) {
+ /* IPI related flags don't make sense with SQPOLL */
+ if (flags & (IORING_SETUP_COOP_TASKRUN |
+ IORING_SETUP_TASKRUN_FLAG |
+ IORING_SETUP_DEFER_TASKRUN))
+ return -EINVAL;
+ }
+
+ if (flags & IORING_SETUP_TASKRUN_FLAG) {
+ if (!(flags & (IORING_SETUP_COOP_TASKRUN |
+ IORING_SETUP_DEFER_TASKRUN)))
+ return -EINVAL;
+ }
+
+ /* HYBRID_IOPOLL only valid with IOPOLL */
+ if ((flags & IORING_SETUP_HYBRID_IOPOLL) && !(flags & IORING_SETUP_IOPOLL))
+ return -EINVAL;
+
+ /*
+ * For DEFER_TASKRUN we require the completion task to be the same as
+ * the submission task. This implies that there is only one submitter.
+ */
+ if ((flags & IORING_SETUP_DEFER_TASKRUN) &&
+ !(flags & IORING_SETUP_SINGLE_ISSUER))
+ return -EINVAL;
+
+ return 0;
+}
+
int io_uring_fill_params(unsigned entries, struct io_uring_params *p)
{
if (!entries)
@@ -3545,10 +3578,6 @@ int io_uring_fill_params(unsigned entries, struct io_uring_params *p)
entries = IORING_MAX_ENTRIES;
}
- if ((p->flags & IORING_SETUP_REGISTERED_FD_ONLY)
- && !(p->flags & IORING_SETUP_NO_MMAP))
- return -EINVAL;
-
/*
* Use twice as many entries for the CQ ring. It's possible for the
* application to drive a higher depth than the size of the SQ ring,
@@ -3610,6 +3639,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
struct file *file;
int ret;
+ ret = io_uring_sanitise_params(p);
+ if (ret)
+ return ret;
+
ret = io_uring_fill_params(entries, p);
if (unlikely(ret))
return ret;
@@ -3657,37 +3690,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
* For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
* COOP_TASKRUN is set, then IPIs are never needed by the app.
*/
- ret = -EINVAL;
- if (ctx->flags & IORING_SETUP_SQPOLL) {
- /* IPI related flags don't make sense with SQPOLL */
- if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
- IORING_SETUP_TASKRUN_FLAG |
- IORING_SETUP_DEFER_TASKRUN))
- goto err;
+ if (ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_COOP_TASKRUN))
ctx->notify_method = TWA_SIGNAL_NO_IPI;
- } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
- ctx->notify_method = TWA_SIGNAL_NO_IPI;
- } else {
- if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
- !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
- goto err;
+ else
ctx->notify_method = TWA_SIGNAL;
- }
-
- /* HYBRID_IOPOLL only valid with IOPOLL */
- if ((ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_HYBRID_IOPOLL)) ==
- IORING_SETUP_HYBRID_IOPOLL)
- goto err;
-
- /*
- * For DEFER_TASKRUN we require the completion task to be the same as the
- * submission task. This implies that there is only one submitter, so enforce
- * that.
- */
- if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
- !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
- goto err;
- }
/*
* This is just grabbed for accounting purposes. When a process exits,
@@ -3916,10 +3922,9 @@ static int __init io_uring_init(void)
req_cachep = kmem_cache_create("io_kiocb", sizeof(struct io_kiocb), &kmem_args,
SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT |
SLAB_TYPESAFE_BY_RCU);
- io_buf_cachep = KMEM_CACHE(io_buffer,
- SLAB_HWCACHE_ALIGN | SLAB_PANIC | SLAB_ACCOUNT);
iou_wq = alloc_workqueue("iou_exit", WQ_UNBOUND, 64);
+ BUG_ON(!iou_wq);
#ifdef CONFIG_SYSCTL
register_sysctl_init("kernel", kernel_io_uring_disabled_table);
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index ab619e63ef39..6c46d9cdd7aa 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -90,9 +90,9 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
unsigned flags);
bool io_alloc_async_data(struct io_kiocb *req);
void io_req_task_queue(struct io_kiocb *req);
-void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
+void io_req_task_complete(struct io_kiocb *req, io_tw_token_t tw);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
-void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
+void io_req_task_submit(struct io_kiocb *req, io_tw_token_t tw);
struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
void tctx_task_work(struct callback_head *cb);
@@ -104,7 +104,7 @@ int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
int start, int end);
void io_req_queue_iowq(struct io_kiocb *req);
-int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts);
+int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
void __io_submit_flush_completions(struct io_ring_ctx *ctx);
@@ -376,7 +376,7 @@ static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
return task_work_pending(current) || io_local_work_pending(ctx);
}
-static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
+static inline void io_tw_lock(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
lockdep_assert_held(&ctx->uring_lock);
}
@@ -418,7 +418,6 @@ static inline bool io_req_cache_empty(struct io_ring_ctx *ctx)
}
extern struct kmem_cache *req_cachep;
-extern struct kmem_cache *io_buf_cachep;
static inline struct io_kiocb *io_extract_req(struct io_ring_ctx *ctx)
{
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 8e72de7712ac..3478be6d02ab 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -20,7 +20,8 @@
/* BIDs are addressed by a 16-bit field in a CQE */
#define MAX_BIDS_PER_BGID (1 << 16)
-struct kmem_cache *io_buf_cachep;
+/* Mapped buffer ring, return io_uring_buf from head */
+#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)]
struct io_provide_buf {
struct file *file;
@@ -31,6 +32,34 @@ struct io_provide_buf {
__u16 bid;
};
+bool io_kbuf_commit(struct io_kiocb *req,
+ struct io_buffer_list *bl, int len, int nr)
+{
+ if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT)))
+ return true;
+
+ req->flags &= ~REQ_F_BUFFERS_COMMIT;
+
+ if (unlikely(len < 0))
+ return true;
+
+ if (bl->flags & IOBL_INC) {
+ struct io_uring_buf *buf;
+
+ buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask);
+ if (WARN_ON_ONCE(len > buf->len))
+ len = buf->len;
+ buf->len -= len;
+ if (buf->len) {
+ buf->addr += len;
+ return false;
+ }
+ }
+
+ bl->head += nr;
+ return true;
+}
+
static inline struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
unsigned int bgid)
{
@@ -52,6 +81,16 @@ static int io_buffer_add_list(struct io_ring_ctx *ctx,
return xa_err(xa_store(&ctx->io_bl_xa, bgid, bl, GFP_KERNEL));
}
+void io_kbuf_drop_legacy(struct io_kiocb *req)
+{
+ if (WARN_ON_ONCE(!(req->flags & REQ_F_BUFFER_SELECTED)))
+ return;
+ req->buf_index = req->kbuf->bgid;
+ req->flags &= ~REQ_F_BUFFER_SELECTED;
+ kfree(req->kbuf);
+ req->kbuf = NULL;
+}
+
bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -70,33 +109,6 @@ bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags)
return true;
}
-void __io_put_kbuf(struct io_kiocb *req, int len, unsigned issue_flags)
-{
- /*
- * We can add this buffer back to two lists:
- *
- * 1) The io_buffers_cache list. This one is protected by the
- * ctx->uring_lock. If we already hold this lock, add back to this
- * list as we can grab it from issue as well.
- * 2) The io_buffers_comp list. This one is protected by the
- * ctx->completion_lock.
- *
- * We migrate buffers from the comp_list to the issue cache list
- * when we need one.
- */
- if (issue_flags & IO_URING_F_UNLOCKED) {
- struct io_ring_ctx *ctx = req->ctx;
-
- spin_lock(&ctx->completion_lock);
- __io_put_kbuf_list(req, len, &ctx->io_buffers_comp);
- spin_unlock(&ctx->completion_lock);
- } else {
- lockdep_assert_held(&req->ctx->uring_lock);
-
- __io_put_kbuf_list(req, len, &req->ctx->io_buffers_cache);
- }
-}
-
static void __user *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
struct io_buffer_list *bl)
{
@@ -342,6 +354,35 @@ int io_buffers_peek(struct io_kiocb *req, struct buf_sel_arg *arg)
return io_provided_buffers_select(req, &arg->max_len, bl, arg->iovs);
}
+static inline bool __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr)
+{
+ struct io_buffer_list *bl = req->buf_list;
+ bool ret = true;
+
+ if (bl) {
+ ret = io_kbuf_commit(req, bl, len, nr);
+ req->buf_index = bl->bgid;
+ }
+ req->flags &= ~REQ_F_BUFFER_RING;
+ return ret;
+}
+
+unsigned int __io_put_kbufs(struct io_kiocb *req, int len, int nbufs)
+{
+ unsigned int ret;
+
+ ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
+
+ if (unlikely(!(req->flags & REQ_F_BUFFER_RING))) {
+ io_kbuf_drop_legacy(req);
+ return ret;
+ }
+
+ if (!__io_put_kbuf_ring(req, len, nbufs))
+ ret |= IORING_CQE_F_BUF_MORE;
+ return ret;
+}
+
static int __io_remove_buffers(struct io_ring_ctx *ctx,
struct io_buffer_list *bl, unsigned nbufs)
{
@@ -367,7 +408,9 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx,
struct io_buffer *nxt;
nxt = list_first_entry(&bl->buf_list, struct io_buffer, list);
- list_move(&nxt->list, &ctx->io_buffers_cache);
+ list_del(&nxt->list);
+ kfree(nxt);
+
if (++i == nbufs)
return i;
cond_resched();
@@ -385,8 +428,6 @@ static void io_put_bl(struct io_ring_ctx *ctx, struct io_buffer_list *bl)
void io_destroy_buffers(struct io_ring_ctx *ctx)
{
struct io_buffer_list *bl;
- struct list_head *item, *tmp;
- struct io_buffer *buf;
while (1) {
unsigned long index = 0;
@@ -400,19 +441,6 @@ void io_destroy_buffers(struct io_ring_ctx *ctx)
break;
io_put_bl(ctx, bl);
}
-
- /*
- * Move deferred locked entries to cache before pruning
- */
- spin_lock(&ctx->completion_lock);
- if (!list_empty(&ctx->io_buffers_comp))
- list_splice_init(&ctx->io_buffers_comp, &ctx->io_buffers_cache);
- spin_unlock(&ctx->completion_lock);
-
- list_for_each_safe(item, tmp, &ctx->io_buffers_cache) {
- buf = list_entry(item, struct io_buffer, list);
- kmem_cache_free(io_buf_cachep, buf);
- }
}
static void io_destroy_bl(struct io_ring_ctx *ctx, struct io_buffer_list *bl)
@@ -501,53 +529,6 @@ int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
return 0;
}
-#define IO_BUFFER_ALLOC_BATCH 64
-
-static int io_refill_buffer_cache(struct io_ring_ctx *ctx)
-{
- struct io_buffer *bufs[IO_BUFFER_ALLOC_BATCH];
- int allocated;
-
- /*
- * Completions that don't happen inline (eg not under uring_lock) will
- * add to ->io_buffers_comp. If we don't have any free buffers, check
- * the completion list and splice those entries first.
- */
- if (!list_empty_careful(&ctx->io_buffers_comp)) {
- spin_lock(&ctx->completion_lock);
- if (!list_empty(&ctx->io_buffers_comp)) {
- list_splice_init(&ctx->io_buffers_comp,
- &ctx->io_buffers_cache);
- spin_unlock(&ctx->completion_lock);
- return 0;
- }
- spin_unlock(&ctx->completion_lock);
- }
-
- /*
- * No free buffers and no completion entries either. Allocate a new
- * batch of buffer entries and add those to our freelist.
- */
-
- allocated = kmem_cache_alloc_bulk(io_buf_cachep, GFP_KERNEL_ACCOUNT,
- ARRAY_SIZE(bufs), (void **) bufs);
- if (unlikely(!allocated)) {
- /*
- * Bulk alloc is all-or-nothing. If we fail to get a batch,
- * retry single alloc to be on the safe side.
- */
- bufs[0] = kmem_cache_alloc(io_buf_cachep, GFP_KERNEL);
- if (!bufs[0])
- return -ENOMEM;
- allocated = 1;
- }
-
- while (allocated)
- list_add_tail(&bufs[--allocated]->list, &ctx->io_buffers_cache);
-
- return 0;
-}
-
static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
struct io_buffer_list *bl)
{
@@ -556,12 +537,11 @@ static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
int i, bid = pbuf->bid;
for (i = 0; i < pbuf->nbufs; i++) {
- if (list_empty(&ctx->io_buffers_cache) &&
- io_refill_buffer_cache(ctx))
+ buf = kmalloc(sizeof(*buf), GFP_KERNEL_ACCOUNT);
+ if (!buf)
break;
- buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer,
- list);
- list_move_tail(&buf->list, &bl->buf_list);
+
+ list_add_tail(&buf->list, &bl->buf_list);
buf->addr = addr;
buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT);
buf->bid = bid;
diff --git a/io_uring/kbuf.h b/io_uring/kbuf.h
index bd80c44c5af1..2ec0b983ce24 100644
--- a/io_uring/kbuf.h
+++ b/io_uring/kbuf.h
@@ -74,9 +74,12 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg);
int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg);
int io_register_pbuf_status(struct io_ring_ctx *ctx, void __user *arg);
-void __io_put_kbuf(struct io_kiocb *req, int len, unsigned issue_flags);
-
bool io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags);
+void io_kbuf_drop_legacy(struct io_kiocb *req);
+
+unsigned int __io_put_kbufs(struct io_kiocb *req, int len, int nbufs);
+bool io_kbuf_commit(struct io_kiocb *req,
+ struct io_buffer_list *bl, int len, int nr);
struct io_mapped_region *io_pbuf_get_region(struct io_ring_ctx *ctx,
unsigned int bgid);
@@ -116,100 +119,19 @@ static inline bool io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
return false;
}
-/* Mapped buffer ring, return io_uring_buf from head */
-#define io_ring_head_to_buf(br, head, mask) &(br)->bufs[(head) & (mask)]
-
-static inline bool io_kbuf_commit(struct io_kiocb *req,
- struct io_buffer_list *bl, int len, int nr)
-{
- if (unlikely(!(req->flags & REQ_F_BUFFERS_COMMIT)))
- return true;
-
- req->flags &= ~REQ_F_BUFFERS_COMMIT;
-
- if (unlikely(len < 0))
- return true;
-
- if (bl->flags & IOBL_INC) {
- struct io_uring_buf *buf;
-
- buf = io_ring_head_to_buf(bl->buf_ring, bl->head, bl->mask);
- if (WARN_ON_ONCE(len > buf->len))
- len = buf->len;
- buf->len -= len;
- if (buf->len) {
- buf->addr += len;
- return false;
- }
- }
-
- bl->head += nr;
- return true;
-}
-
-static inline bool __io_put_kbuf_ring(struct io_kiocb *req, int len, int nr)
-{
- struct io_buffer_list *bl = req->buf_list;
- bool ret = true;
-
- if (bl) {
- ret = io_kbuf_commit(req, bl, len, nr);
- req->buf_index = bl->bgid;
- }
- req->flags &= ~REQ_F_BUFFER_RING;
- return ret;
-}
-
-static inline void __io_put_kbuf_list(struct io_kiocb *req, int len,
- struct list_head *list)
-{
- if (req->flags & REQ_F_BUFFER_RING) {
- __io_put_kbuf_ring(req, len, 1);
- } else {
- req->buf_index = req->kbuf->bgid;
- list_add(&req->kbuf->list, list);
- req->flags &= ~REQ_F_BUFFER_SELECTED;
- }
-}
-
-static inline void io_kbuf_drop(struct io_kiocb *req)
-{
- lockdep_assert_held(&req->ctx->completion_lock);
-
- if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
- return;
-
- /* len == 0 is fine here, non-ring will always drop all of it */
- __io_put_kbuf_list(req, 0, &req->ctx->io_buffers_comp);
-}
-
-static inline unsigned int __io_put_kbufs(struct io_kiocb *req, int len,
- int nbufs, unsigned issue_flags)
-{
- unsigned int ret;
-
- if (!(req->flags & (REQ_F_BUFFER_RING | REQ_F_BUFFER_SELECTED)))
- return 0;
-
- ret = IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
- if (req->flags & REQ_F_BUFFER_RING) {
- if (!__io_put_kbuf_ring(req, len, nbufs))
- ret |= IORING_CQE_F_BUF_MORE;
- } else {
- __io_put_kbuf(req, len, issue_flags);
- }
- return ret;
-}
-
static inline unsigned int io_put_kbuf(struct io_kiocb *req, int len,
unsigned issue_flags)
{
- return __io_put_kbufs(req, len, 1, issue_flags);
+ if (!(req->flags & (REQ_F_BUFFER_RING | REQ_F_BUFFER_SELECTED)))
+ return 0;
+ return __io_put_kbufs(req, len, 1);
}
static inline unsigned int io_put_kbufs(struct io_kiocb *req, int len,
int nbufs, unsigned issue_flags)
{
- return __io_put_kbufs(req, len, nbufs, issue_flags);
+ if (!(req->flags & (REQ_F_BUFFER_RING | REQ_F_BUFFER_SELECTED)))
+ return 0;
+ return __io_put_kbufs(req, len, nbufs);
}
#endif
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 7e6f68e911f1..0bbcbbcdebfd 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -71,7 +71,7 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
return target_ctx->task_complete;
}
-static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_msg_tw_complete(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_ring_ctx *ctx = req->ctx;
diff --git a/io_uring/net.c b/io_uring/net.c
index 17852a6616ff..10344b3a6d89 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -76,6 +76,7 @@ struct io_sr_msg {
/* initialised and used only by !msg send variants */
u16 buf_group;
u16 buf_index;
+ bool retry;
void __user *msg_control;
/* used only for send zerocopy */
struct io_kiocb *notif;
@@ -187,6 +188,7 @@ static inline void io_mshot_prep_retry(struct io_kiocb *req,
req->flags &= ~REQ_F_BL_EMPTY;
sr->done_io = 0;
+ sr->retry = false;
sr->len = 0; /* get from the provided buffer */
req->buf_index = sr->buf_group;
}
@@ -402,6 +404,7 @@ int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
sr->done_io = 0;
+ sr->retry = false;
if (req->opcode != IORING_OP_SEND) {
if (sqe->addr2 || sqe->file_index)
@@ -785,6 +788,7 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
struct io_sr_msg *sr = io_kiocb_to_cmd(req, struct io_sr_msg);
sr->done_io = 0;
+ sr->retry = false;
if (unlikely(sqe->file_index || sqe->addr2))
return -EINVAL;
@@ -833,6 +837,9 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return io_recvmsg_prep_setup(req);
}
+/* bits to clear in old and inherit in new cflags on bundle retry */
+#define CQE_F_MASK (IORING_CQE_F_SOCK_NONEMPTY|IORING_CQE_F_MORE)
+
/*
* Finishes io_recv and io_recvmsg.
*
@@ -852,9 +859,19 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
if (sr->flags & IORING_RECVSEND_BUNDLE) {
cflags |= io_put_kbufs(req, *ret, io_bundle_nbufs(kmsg, *ret),
issue_flags);
+ if (sr->retry)
+ cflags = req->cqe.flags | (cflags & CQE_F_MASK);
/* bundle with no more immediate buffers, we're done */
if (req->flags & REQ_F_BL_EMPTY)
goto finish;
+ /* if more is available, retry and append to this one */
+ if (!sr->retry && kmsg->msg.msg_inq > 0 && *ret > 0) {
+ req->cqe.flags = cflags & ~CQE_F_MASK;
+ sr->len = kmsg->msg.msg_inq;
+ sr->done_io += *ret;
+ sr->retry = true;
+ return false;
+ }
} else {
cflags |= io_put_kbuf(req, *ret, issue_flags);
}
@@ -1233,6 +1250,7 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
struct io_kiocb *notif;
zc->done_io = 0;
+ zc->retry = false;
req->flags |= REQ_F_POLL_NO_LAZY;
if (unlikely(READ_ONCE(sqe->__pad2[0]) || READ_ONCE(sqe->addr3)))
diff --git a/io_uring/notif.c b/io_uring/notif.c
index ee3a33510b3c..7bd92538dccb 100644
--- a/io_uring/notif.c
+++ b/io_uring/notif.c
@@ -11,7 +11,7 @@
static const struct ubuf_info_ops io_ubuf_ops;
-static void io_notif_tw_complete(struct io_kiocb *notif, struct io_tw_state *ts)
+static void io_notif_tw_complete(struct io_kiocb *notif, io_tw_token_t tw)
{
struct io_notif_data *nd = io_notif_to_data(notif);
@@ -29,7 +29,7 @@ static void io_notif_tw_complete(struct io_kiocb *notif, struct io_tw_state *ts)
}
nd = nd->next;
- io_req_task_complete(notif, ts);
+ io_req_task_complete(notif, tw);
} while (nd);
}
diff --git a/io_uring/poll.c b/io_uring/poll.c
index bb1c0cd4f809..176854882ba6 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -220,7 +220,7 @@ static inline void io_poll_execute(struct io_kiocb *req, int res)
* req->cqe.res. IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot
* poll and that the result is stored in req->cqe.
*/
-static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
+static int io_poll_check_events(struct io_kiocb *req, io_tw_token_t tw)
{
int v;
@@ -288,7 +288,7 @@ static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
return IOU_POLL_REMOVE_POLL_USE_RES;
}
} else {
- int ret = io_poll_issue(req, ts);
+ int ret = io_poll_issue(req, tw);
if (ret == IOU_STOP_MULTISHOT)
return IOU_POLL_REMOVE_POLL_USE_RES;
else if (ret == IOU_REQUEUE)
@@ -311,11 +311,11 @@ static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
return IOU_POLL_NO_ACTION;
}
-void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts)
+void io_poll_task_func(struct io_kiocb *req, io_tw_token_t tw)
{
int ret;
- ret = io_poll_check_events(req, ts);
+ ret = io_poll_check_events(req, tw);
if (ret == IOU_POLL_NO_ACTION) {
io_kbuf_recycle(req, 0);
return;
@@ -335,7 +335,7 @@ void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts)
poll = io_kiocb_to_cmd(req, struct io_poll);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
} else if (ret == IOU_POLL_REISSUE) {
- io_req_task_submit(req, ts);
+ io_req_task_submit(req, tw);
return;
} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
req->cqe.res = ret;
@@ -343,14 +343,14 @@ void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts)
}
io_req_set_res(req, req->cqe.res, 0);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
} else {
- io_tw_lock(req->ctx, ts);
+ io_tw_lock(req->ctx, tw);
if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE)
- io_req_task_submit(req, ts);
+ io_req_task_submit(req, tw);
else
io_req_defer_failed(req, ret);
}
diff --git a/io_uring/poll.h b/io_uring/poll.h
index 04ede93113dc..27e2db2ed4ae 100644
--- a/io_uring/poll.h
+++ b/io_uring/poll.h
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: GPL-2.0
+#include <linux/io_uring_types.h>
+
#define IO_POLL_ALLOC_CACHE_MAX 32
enum {
@@ -43,4 +45,4 @@ int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags);
bool io_poll_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
bool cancel_all);
-void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts);
+void io_poll_task_func(struct io_kiocb *req, io_tw_token_t tw);
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 190f7ee45de9..a6d883c62b22 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -83,7 +83,7 @@ static inline struct io_rsrc_node *io_rsrc_node_lookup(struct io_rsrc_data *data
static inline void io_put_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
{
lockdep_assert_held(&ctx->uring_lock);
- if (node && !--node->refs)
+ if (!--node->refs)
io_free_rsrc_node(ctx, node);
}
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 7aa1e4c9f64a..16f12f94943f 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -511,7 +511,7 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
return res;
}
-void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
+void io_req_rw_complete(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
struct kiocb *kiocb = &rw->kiocb;
@@ -528,7 +528,7 @@ void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
req->cqe.flags |= io_put_kbuf(req, req->cqe.res, 0);
io_req_rw_cleanup(req, 0);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
}
static void io_complete_rw(struct kiocb *kiocb, long res)
diff --git a/io_uring/rw.h b/io_uring/rw.h
index eaa59bd64870..a45e0c71b59d 100644
--- a/io_uring/rw.h
+++ b/io_uring/rw.h
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: GPL-2.0
+#include <linux/io_uring_types.h>
#include <linux/pagemap.h>
struct io_meta_state {
@@ -39,7 +40,7 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags);
int io_write(struct io_kiocb *req, unsigned int issue_flags);
void io_readv_writev_cleanup(struct io_kiocb *req);
void io_rw_fail(struct io_kiocb *req);
-void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts);
+void io_req_rw_complete(struct io_kiocb *req, io_tw_token_t tw);
int io_read_mshot_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_read_mshot(struct io_kiocb *req, unsigned int issue_flags);
void io_rw_cache_free(const void *entry);
diff --git a/io_uring/splice.c b/io_uring/splice.c
index 5b84f1630611..7b89bd84d486 100644
--- a/io_uring/splice.c
+++ b/io_uring/splice.c
@@ -51,7 +51,8 @@ void io_splice_cleanup(struct io_kiocb *req)
{
struct io_splice *sp = io_kiocb_to_cmd(req, struct io_splice);
- io_put_rsrc_node(req->ctx, sp->rsrc_node);
+ if (sp->rsrc_node)
+ io_put_rsrc_node(req->ctx, sp->rsrc_node);
}
static struct file *io_splice_get_file(struct io_kiocb *req,
diff --git a/io_uring/timeout.c b/io_uring/timeout.c
index 48fc8cf70784..fec6ec7beb62 100644
--- a/io_uring/timeout.c
+++ b/io_uring/timeout.c
@@ -65,7 +65,7 @@ static inline bool io_timeout_finish(struct io_timeout *timeout,
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer);
-static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_timeout_complete(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_timeout_data *data = req->async_data;
@@ -82,7 +82,7 @@ static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts)
}
}
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
}
static __cold bool io_flush_killed_timeouts(struct list_head *list, int err)
@@ -154,9 +154,9 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx)
io_flush_killed_timeouts(&list, 0);
}
-static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts)
+static void io_req_tw_fail_links(struct io_kiocb *link, io_tw_token_t tw)
{
- io_tw_lock(link->ctx, ts);
+ io_tw_lock(link->ctx, tw);
while (link) {
struct io_kiocb *nxt = link->link;
long res = -ECANCELED;
@@ -165,7 +165,7 @@ static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts)
res = link->cqe.res;
link->link = NULL;
io_req_set_res(link, res, 0);
- io_req_task_complete(link, ts);
+ io_req_task_complete(link, tw);
link = nxt;
}
}
@@ -312,7 +312,7 @@ int io_timeout_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
return 0;
}
-static void io_req_task_link_timeout(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_req_task_link_timeout(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_kiocb *prev = timeout->prev;
@@ -330,11 +330,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, struct io_tw_state *t
ret = -ECANCELED;
}
io_req_set_res(req, ret ?: -ETIME, 0);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
io_put_req(prev);
} else {
io_req_set_res(req, -ETIME, 0);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
}
}
diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c
index e6701b7aa147..8bdf2c9b3fef 100644
--- a/io_uring/uring_cmd.c
+++ b/io_uring/uring_cmd.c
@@ -102,7 +102,7 @@ void io_uring_cmd_mark_cancelable(struct io_uring_cmd *cmd,
}
EXPORT_SYMBOL_GPL(io_uring_cmd_mark_cancelable);
-static void io_uring_cmd_work(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_uring_cmd_work(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_uring_cmd *ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd);
unsigned int flags = IO_URING_F_COMPLETE_DEFER;
diff --git a/io_uring/waitid.c b/io_uring/waitid.c
index 15a7daf3ff4f..347b8f53efa7 100644
--- a/io_uring/waitid.c
+++ b/io_uring/waitid.c
@@ -16,7 +16,7 @@
#include "waitid.h"
#include "../kernel/exit.h"
-static void io_waitid_cb(struct io_kiocb *req, struct io_tw_state *ts);
+static void io_waitid_cb(struct io_kiocb *req, io_tw_token_t tw);
#define IO_WAITID_CANCEL_FLAG BIT(31)
#define IO_WAITID_REF_MASK GENMASK(30, 0)
@@ -132,7 +132,7 @@ static void io_waitid_complete(struct io_kiocb *req, int ret)
io_req_set_res(req, ret, 0);
}
-static bool __io_waitid_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
+static bool __io_waitid_cancel(struct io_kiocb *req)
{
struct io_waitid *iw = io_kiocb_to_cmd(req, struct io_waitid);
struct io_waitid_async *iwa = req->async_data;
@@ -158,49 +158,13 @@ static bool __io_waitid_cancel(struct io_ring_ctx *ctx, struct io_kiocb *req)
int io_waitid_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
unsigned int issue_flags)
{
- struct hlist_node *tmp;
- struct io_kiocb *req;
- int nr = 0;
-
- if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_FD_FIXED))
- return -ENOENT;
-
- io_ring_submit_lock(ctx, issue_flags);
- hlist_for_each_entry_safe(req, tmp, &ctx->waitid_list, hash_node) {
- if (req->cqe.user_data != cd->data &&
- !(cd->flags & IORING_ASYNC_CANCEL_ANY))
- continue;
- if (__io_waitid_cancel(ctx, req))
- nr++;
- if (!(cd->flags & IORING_ASYNC_CANCEL_ALL))
- break;
- }
- io_ring_submit_unlock(ctx, issue_flags);
-
- if (nr)
- return nr;
-
- return -ENOENT;
+ return io_cancel_remove(ctx, cd, issue_flags, &ctx->waitid_list, __io_waitid_cancel);
}
bool io_waitid_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
bool cancel_all)
{
- struct hlist_node *tmp;
- struct io_kiocb *req;
- bool found = false;
-
- lockdep_assert_held(&ctx->uring_lock);
-
- hlist_for_each_entry_safe(req, tmp, &ctx->waitid_list, hash_node) {
- if (!io_match_task_safe(req, tctx, cancel_all))
- continue;
- hlist_del_init(&req->hash_node);
- __io_waitid_cancel(ctx, req);
- found = true;
- }
-
- return found;
+ return io_cancel_remove_all(ctx, tctx, &ctx->waitid_list, cancel_all, __io_waitid_cancel);
}
static inline bool io_waitid_drop_issue_ref(struct io_kiocb *req)
@@ -221,13 +185,13 @@ static inline bool io_waitid_drop_issue_ref(struct io_kiocb *req)
return true;
}
-static void io_waitid_cb(struct io_kiocb *req, struct io_tw_state *ts)
+static void io_waitid_cb(struct io_kiocb *req, io_tw_token_t tw)
{
struct io_waitid_async *iwa = req->async_data;
struct io_ring_ctx *ctx = req->ctx;
int ret;
- io_tw_lock(ctx, ts);
+ io_tw_lock(ctx, tw);
ret = __do_wait(&iwa->wo);
@@ -257,7 +221,7 @@ static void io_waitid_cb(struct io_kiocb *req, struct io_tw_state *ts)
}
io_waitid_complete(req, ret);
- io_req_task_complete(req, ts);
+ io_req_task_complete(req, tw);
}
static int io_waitid_wait(struct wait_queue_entry *wait, unsigned mode,