aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2019-10-26 14:59:51 -0400
committerLinus Torvalds <torvalds@linux-foundation.org>2019-10-26 14:59:51 -0400
commitacf913b7fb89f4313806ceb317d3730067f84f20 (patch)
treeb46ecc7fa1a384d8ce31830cdba36f96c5e719a5 /fs
parentMerge tag 's390-5.4-5' of git://git.kernel.org/pub/scm/linux/kernel/git/s390/linux (diff)
parentnbd: verify socket is supported during setup (diff)
downloadlinux-dev-acf913b7fb89f4313806ceb317d3730067f84f20.tar.xz
linux-dev-acf913b7fb89f4313806ceb317d3730067f84f20.zip
Merge tag 'for-linus-2019-10-26' of git://git.kernel.dk/linux-block
Pull block and io_uring fixes from Jens Axboe: "A bit bigger than usual at this point in time, mostly due to some good bug hunting work by Pavel that resulted in three io_uring fixes from him and two from me. Anyway, this pull request contains: - Revert of the submit-and-wait optimization for io_uring, it can't always be done safely. It depends on commands always making progress on their own, which isn't necessarily the case outside of strict file IO. (me) - Series of two patches from me and three from Pavel, fixing issues with shared data and sequencing for io_uring. - Lastly, two timeout sequence fixes for io_uring (zhangyi) - Two nbd patches fixing races (Josef) - libahci regulator_get_optional() fix (Mark)" * tag 'for-linus-2019-10-26' of git://git.kernel.dk/linux-block: nbd: verify socket is supported during setup ata: libahci_platform: Fix regulator_get_optional() misuse nbd: handle racing with error'ed out commands nbd: protect cmd->status with cmd->lock io_uring: fix bad inflight accounting for SETUP_IOPOLL|SETUP_SQTHREAD io_uring: used cached copies of sq->dropped and cq->overflow io_uring: Fix race for sqes with userspace io_uring: Fix broken links with offloading io_uring: Fix corrupted user_data io_uring: correct timeout req sequence when inserting a new entry io_uring : correct timeout req sequence when waiting timeout io_uring: revert "io_uring: optimize submit_and_wait API"
Diffstat (limited to 'fs')
-rw-r--r--fs/io_uring.c207
1 files changed, 113 insertions, 94 deletions
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 67dbe0201e0d..a30c4f622cb3 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -197,6 +197,7 @@ struct io_ring_ctx {
unsigned sq_entries;
unsigned sq_mask;
unsigned sq_thread_idle;
+ unsigned cached_sq_dropped;
struct io_uring_sqe *sq_sqes;
struct list_head defer_list;
@@ -212,6 +213,7 @@ struct io_ring_ctx {
struct {
unsigned cached_cq_tail;
+ atomic_t cached_cq_overflow;
unsigned cq_entries;
unsigned cq_mask;
struct wait_queue_head cq_wait;
@@ -420,7 +422,8 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
static inline bool __io_sequence_defer(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
- return req->sequence != ctx->cached_cq_tail + ctx->rings->sq_dropped;
+ return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
+ + atomic_read(&ctx->cached_cq_overflow);
}
static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
@@ -567,9 +570,8 @@ static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
WRITE_ONCE(cqe->res, res);
WRITE_ONCE(cqe->flags, 0);
} else {
- unsigned overflow = READ_ONCE(ctx->rings->cq_overflow);
-
- WRITE_ONCE(ctx->rings->cq_overflow, overflow + 1);
+ WRITE_ONCE(ctx->rings->cq_overflow,
+ atomic_inc_return(&ctx->cached_cq_overflow));
}
}
@@ -735,6 +737,14 @@ static unsigned io_cqring_events(struct io_rings *rings)
return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
}
+static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
+{
+ struct io_rings *rings = ctx->rings;
+
+ /* make sure SQ entry isn't read before tail */
+ return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
+}
+
/*
* Find and free completed poll iocbs
*/
@@ -864,19 +874,11 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
mutex_unlock(&ctx->uring_lock);
}
-static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
- long min)
+static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
+ long min)
{
- int iters, ret = 0;
-
- /*
- * We disallow the app entering submit/complete with polling, but we
- * still need to lock the ring to prevent racing with polled issue
- * that got punted to a workqueue.
- */
- mutex_lock(&ctx->uring_lock);
+ int iters = 0, ret = 0;
- iters = 0;
do {
int tmin = 0;
@@ -912,6 +914,21 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
ret = 0;
} while (min && !*nr_events && !need_resched());
+ return ret;
+}
+
+static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
+ long min)
+{
+ int ret;
+
+ /*
+ * We disallow the app entering submit/complete with polling, but we
+ * still need to lock the ring to prevent racing with polled issue
+ * that got punted to a workqueue.
+ */
+ mutex_lock(&ctx->uring_lock);
+ ret = __io_iopoll_check(ctx, nr_events, min);
mutex_unlock(&ctx->uring_lock);
return ret;
}
@@ -1877,7 +1894,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
{
struct io_ring_ctx *ctx;
- struct io_kiocb *req;
+ struct io_kiocb *req, *prev;
unsigned long flags;
req = container_of(timer, struct io_kiocb, timeout.timer);
@@ -1885,6 +1902,15 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
atomic_inc(&ctx->cq_timeouts);
spin_lock_irqsave(&ctx->completion_lock, flags);
+ /*
+ * Adjust the reqs sequence before the current one because it
+ * will consume a slot in the cq_ring and the the cq_tail pointer
+ * will be increased, otherwise other timeout reqs may return in
+ * advance without waiting for enough wait_nr.
+ */
+ prev = req;
+ list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
+ prev->sequence++;
list_del(&req->list);
io_cqring_fill_event(ctx, req->user_data, -ETIME);
@@ -1903,6 +1929,7 @@ static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
struct io_ring_ctx *ctx = req->ctx;
struct list_head *entry;
struct timespec64 ts;
+ unsigned span = 0;
if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
@@ -1951,9 +1978,17 @@ static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (ctx->cached_sq_head < nxt_sq_head)
tmp += UINT_MAX;
- if (tmp >= tmp_nxt)
+ if (tmp > tmp_nxt)
break;
+
+ /*
+ * Sequence of reqs after the insert one and itself should
+ * be adjusted because each timeout req consumes a slot.
+ */
+ span++;
+ nxt->sequence++;
}
+ req->sequence -= span;
list_add(&req->list, entry);
spin_unlock_irq(&ctx->completion_lock);
@@ -2292,11 +2327,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
}
static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s, bool force_nonblock)
+ struct sqe_submit *s)
{
int ret;
- ret = __io_submit_sqe(ctx, req, s, force_nonblock);
+ ret = __io_submit_sqe(ctx, req, s, true);
/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
@@ -2343,7 +2378,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
}
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s, bool force_nonblock)
+ struct sqe_submit *s)
{
int ret;
@@ -2356,18 +2391,17 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
return 0;
}
- return __io_queue_sqe(ctx, req, s, force_nonblock);
+ return __io_queue_sqe(ctx, req, s);
}
static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s, struct io_kiocb *shadow,
- bool force_nonblock)
+ struct sqe_submit *s, struct io_kiocb *shadow)
{
int ret;
int need_submit = false;
if (!shadow)
- return io_queue_sqe(ctx, req, s, force_nonblock);
+ return io_queue_sqe(ctx, req, s);
/*
* Mark the first IO in link list as DRAIN, let all the following
@@ -2396,7 +2430,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
spin_unlock_irq(&ctx->completion_lock);
if (need_submit)
- return __io_queue_sqe(ctx, req, s, force_nonblock);
+ return __io_queue_sqe(ctx, req, s);
return 0;
}
@@ -2404,8 +2438,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
- struct io_submit_state *state, struct io_kiocb **link,
- bool force_nonblock)
+ struct io_submit_state *state, struct io_kiocb **link)
{
struct io_uring_sqe *sqe_copy;
struct io_kiocb *req;
@@ -2432,6 +2465,8 @@ err:
return;
}
+ req->user_data = s->sqe->user_data;
+
/*
* If we already have a head request, queue this one for async
* submittal once the head completes. If we don't have a head but
@@ -2458,7 +2493,7 @@ err:
INIT_LIST_HEAD(&req->link_list);
*link = req;
} else {
- io_queue_sqe(ctx, req, s, force_nonblock);
+ io_queue_sqe(ctx, req, s);
}
}
@@ -2538,12 +2573,13 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
/* drop invalid entries */
ctx->cached_sq_head++;
- rings->sq_dropped++;
+ ctx->cached_sq_dropped++;
+ WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
return false;
}
-static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
- unsigned int nr, bool has_user, bool mm_fault)
+static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
+ bool has_user, bool mm_fault)
{
struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL;
@@ -2557,19 +2593,23 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
}
for (i = 0; i < nr; i++) {
+ struct sqe_submit s;
+
+ if (!io_get_sqring(ctx, &s))
+ break;
+
/*
* If previous wasn't linked and we have a linked command,
* that's the end of the chain. Submit the previous link.
*/
if (!prev_was_link && link) {
- io_queue_link_head(ctx, link, &link->submit, shadow_req,
- true);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req);
link = NULL;
shadow_req = NULL;
}
- prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
+ prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
- if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+ if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
if (!shadow_req) {
shadow_req = io_get_req(ctx, NULL);
if (unlikely(!shadow_req))
@@ -2577,24 +2617,24 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
refcount_dec(&shadow_req->refs);
}
- shadow_req->sequence = sqes[i].sequence;
+ shadow_req->sequence = s.sequence;
}
out:
if (unlikely(mm_fault)) {
- io_cqring_add_event(ctx, sqes[i].sqe->user_data,
+ io_cqring_add_event(ctx, s.sqe->user_data,
-EFAULT);
} else {
- sqes[i].has_user = has_user;
- sqes[i].needs_lock = true;
- sqes[i].needs_fixed_file = true;
- io_submit_sqe(ctx, &sqes[i], statep, &link, true);
+ s.has_user = has_user;
+ s.needs_lock = true;
+ s.needs_fixed_file = true;
+ io_submit_sqe(ctx, &s, statep, &link);
submitted++;
}
}
if (link)
- io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req);
if (statep)
io_submit_state_end(&state);
@@ -2603,7 +2643,6 @@ out:
static int io_sq_thread(void *data)
{
- struct sqe_submit sqes[IO_IOPOLL_BATCH];
struct io_ring_ctx *ctx = data;
struct mm_struct *cur_mm = NULL;
mm_segment_t old_fs;
@@ -2618,14 +2657,27 @@ static int io_sq_thread(void *data)
timeout = inflight = 0;
while (!kthread_should_park()) {
- bool all_fixed, mm_fault = false;
- int i;
+ bool mm_fault = false;
+ unsigned int to_submit;
if (inflight) {
unsigned nr_events = 0;
if (ctx->flags & IORING_SETUP_IOPOLL) {
- io_iopoll_check(ctx, &nr_events, 0);
+ /*
+ * inflight is the count of the maximum possible
+ * entries we submitted, but it can be smaller
+ * if we dropped some of them. If we don't have
+ * poll entries available, then we know that we
+ * have nothing left to poll for. Reset the
+ * inflight count to zero in that case.
+ */
+ mutex_lock(&ctx->uring_lock);
+ if (!list_empty(&ctx->poll_list))
+ __io_iopoll_check(ctx, &nr_events, 0);
+ else
+ inflight = 0;
+ mutex_unlock(&ctx->uring_lock);
} else {
/*
* Normal IO, just pretend everything completed.
@@ -2639,7 +2691,8 @@ static int io_sq_thread(void *data)
timeout = jiffies + ctx->sq_thread_idle;
}
- if (!io_get_sqring(ctx, &sqes[0])) {
+ to_submit = io_sqring_entries(ctx);
+ if (!to_submit) {
/*
* We're polling. If we're within the defined idle
* period, then let us spin without work before going
@@ -2670,7 +2723,8 @@ static int io_sq_thread(void *data)
/* make sure to read SQ tail after writing flags */
smp_mb();
- if (!io_get_sqring(ctx, &sqes[0])) {
+ to_submit = io_sqring_entries(ctx);
+ if (!to_submit) {
if (kthread_should_park()) {
finish_wait(&ctx->sqo_wait, &wait);
break;
@@ -2688,19 +2742,8 @@ static int io_sq_thread(void *data)
ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
}
- i = 0;
- all_fixed = true;
- do {
- if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
- all_fixed = false;
-
- i++;
- if (i == ARRAY_SIZE(sqes))
- break;
- } while (io_get_sqring(ctx, &sqes[i]));
-
/* Unless all new commands are FIXED regions, grab mm */
- if (!all_fixed && !cur_mm) {
+ if (!cur_mm) {
mm_fault = !mmget_not_zero(ctx->sqo_mm);
if (!mm_fault) {
use_mm(ctx->sqo_mm);
@@ -2708,8 +2751,9 @@ static int io_sq_thread(void *data)
}
}
- inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
- mm_fault);
+ to_submit = min(to_submit, ctx->sq_entries);
+ inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
+ mm_fault);
/* Commit SQ ring head once we've consumed all SQEs */
io_commit_sqring(ctx);
@@ -2726,8 +2770,7 @@ static int io_sq_thread(void *data)
return 0;
}
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
- bool block_for_last)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
{
struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL;
@@ -2741,7 +2784,6 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
}
for (i = 0; i < to_submit; i++) {
- bool force_nonblock = true;
struct sqe_submit s;
if (!io_get_sqring(ctx, &s))
@@ -2752,8 +2794,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
* that's the end of the chain. Submit the previous link.
*/
if (!prev_was_link && link) {
- io_queue_link_head(ctx, link, &link->submit, shadow_req,
- force_nonblock);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req);
link = NULL;
shadow_req = NULL;
}
@@ -2775,27 +2816,16 @@ out:
s.needs_lock = false;
s.needs_fixed_file = false;
submit++;
-
- /*
- * The caller will block for events after submit, submit the
- * last IO non-blocking. This is either the only IO it's
- * submitting, or it already submitted the previous ones. This
- * improves performance by avoiding an async punt that we don't
- * need to do.
- */
- if (block_for_last && submit == to_submit)
- force_nonblock = false;
-
- io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
+ io_submit_sqe(ctx, &s, statep, &link);
}
- io_commit_sqring(ctx);
if (link)
- io_queue_link_head(ctx, link, &link->submit, shadow_req,
- !block_for_last);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req);
if (statep)
io_submit_state_end(statep);
+ io_commit_sqring(ctx);
+
return submit;
}
@@ -3636,21 +3666,10 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
wake_up(&ctx->sqo_wait);
submitted = to_submit;
} else if (to_submit) {
- bool block_for_last = false;
-
to_submit = min(to_submit, ctx->sq_entries);
- /*
- * Allow last submission to block in a series, IFF the caller
- * asked to wait for events and we don't currently have
- * enough. This potentially avoids an async punt.
- */
- if (to_submit == min_complete &&
- io_cqring_events(ctx->rings) < min_complete)
- block_for_last = true;
-
mutex_lock(&ctx->uring_lock);
- submitted = io_ring_submit(ctx, to_submit, block_for_last);
+ submitted = io_ring_submit(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
}
if (flags & IORING_ENTER_GETEVENTS) {