diff options
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r-- | io_uring/io_uring.c | 167 |
1 files changed, 110 insertions, 57 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 3436e0b83534..b521186efa5c 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req); static void io_queue_sqe(struct io_kiocb *req); static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); +static __cold void io_fallback_tw(struct io_uring_task *tctx); static struct kmem_cache *req_cachep; @@ -326,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->rsrc_ref_lock); INIT_LIST_HEAD(&ctx->rsrc_ref_list); INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); + init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); init_llist_head(&ctx->rsrc_put_llist); init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); @@ -582,13 +584,25 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx) io_eventfd_flush_signal(ctx); } +static inline void __io_cq_lock(struct io_ring_ctx *ctx) + __acquires(ctx->completion_lock) +{ + if (!ctx->task_complete) + spin_lock(&ctx->completion_lock); +} + +static inline void __io_cq_unlock(struct io_ring_ctx *ctx) +{ + if (!ctx->task_complete) + spin_unlock(&ctx->completion_lock); +} + /* keep it inlined for io_submit_flush_completions() */ -static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx) +static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { io_commit_cqring(ctx); - spin_unlock(&ctx->completion_lock); - + __io_cq_unlock(ctx); io_commit_cqring_flush(ctx); io_cqring_wake(ctx); } @@ -596,17 +610,37 @@ static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx) void io_cq_unlock_post(struct io_ring_ctx *ctx) __releases(ctx->completion_lock) { - io_cq_unlock_post_inline(ctx); + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + io_commit_cqring_flush(ctx); + io_cqring_wake(ctx); } /* Returns true if there are no backlogged entries after the flush */ -static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) +static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) +{ + struct io_overflow_cqe *ocqe; + LIST_HEAD(list); + + io_cq_lock(ctx); + list_splice_init(&ctx->cq_overflow_list, &list); + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + io_cq_unlock(ctx); + + while (!list_empty(&list)) { + ocqe = list_first_entry(&list, struct io_overflow_cqe, list); + list_del(&ocqe->list); + kfree(ocqe); + } +} + +/* Returns true if there are no backlogged entries after the flush */ +static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool all_flushed; size_t cqe_size = sizeof(struct io_uring_cqe); - if (!force && __io_cqring_events(ctx) == ctx->cq_entries) - return false; + if (__io_cqring_events(ctx) == ctx->cq_entries) + return; if (ctx->flags & IORING_SETUP_CQE32) cqe_size <<= 1; @@ -616,43 +650,32 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_overflow_cqe *ocqe; - if (!cqe && !force) + if (!cqe) break; ocqe = list_first_entry(&ctx->cq_overflow_list, struct io_overflow_cqe, list); - if (cqe) - memcpy(cqe, &ocqe->cqe, cqe_size); - else - io_account_cq_overflow(ctx); - + memcpy(cqe, &ocqe->cqe, cqe_size); list_del(&ocqe->list); kfree(ocqe); } - all_flushed = list_empty(&ctx->cq_overflow_list); - if (all_flushed) { + if (list_empty(&ctx->cq_overflow_list)) { clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); } - io_cq_unlock_post(ctx); - return all_flushed; } -static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool ret = true; - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { /* iopoll syncs against uring_lock, not completion_lock */ if (ctx->flags & IORING_SETUP_IOPOLL) mutex_lock(&ctx->uring_lock); - ret = __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); if (ctx->flags & IORING_SETUP_IOPOLL) mutex_unlock(&ctx->uring_lock); } - - return ret; } void __io_put_task(struct task_struct *task, int nr) @@ -777,12 +800,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow) return &rings->cqes[off]; } -static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, + u32 cflags) { struct io_uring_cqe *cqe; - lockdep_assert_held(&ctx->completion_lock); + if (!ctx->task_complete) + lockdep_assert_held(&ctx->completion_lock); ctx->cq_extra++; @@ -805,10 +829,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 } return true; } - - if (allow_overflow) - return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); - return false; } @@ -822,7 +842,17 @@ static void __io_flush_post_cqes(struct io_ring_ctx *ctx) for (i = 0; i < state->cqes_count; i++) { struct io_uring_cqe *cqe = &state->cqes[i]; - io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true); + if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + spin_unlock(&ctx->completion_lock); + } else { + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + } + } } state->cqes_count = 0; } @@ -833,7 +863,10 @@ static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u bool filled; io_cq_lock(ctx); - filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow); + filled = io_fill_cqe_aux(ctx, user_data, res, cflags); + if (!filled && allow_overflow) + filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); + io_cq_unlock_post(ctx); return filled; } @@ -857,10 +890,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 lockdep_assert_held(&ctx->uring_lock); if (ctx->submit_state.cqes_count == length) { - io_cq_lock(ctx); + __io_cq_lock(ctx); __io_flush_post_cqes(ctx); /* no need to flush - flush is deferred */ - spin_unlock(&ctx->completion_lock); + __io_cq_unlock_post(ctx); } /* For defered completions this is not as strict as it is otherwise, @@ -915,8 +948,11 @@ static void __io_req_complete_post(struct io_kiocb *req) void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - if (!(issue_flags & IO_URING_F_UNLOCKED) || - !(req->ctx->flags & IORING_SETUP_IOPOLL)) { + if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { + req->io_task_work.func = io_req_task_complete; + io_req_task_work_add(req); + } else if (!(issue_flags & IO_URING_F_UNLOCKED) || + !(req->ctx->flags & IORING_SETUP_IOPOLL)) { __io_req_complete_post(req); } else { struct io_ring_ctx *ctx = req->ctx; @@ -1139,10 +1175,17 @@ void tctx_task_work(struct callback_head *cb) struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); struct llist_node fake = {}; - struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake); + struct llist_node *node; unsigned int loops = 1; - unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL); + unsigned int count; + + if (unlikely(current->flags & PF_EXITING)) { + io_fallback_tw(tctx); + return; + } + node = io_llist_xchg(&tctx->task_list, &fake); + count = handle_tw_list(node, &ctx, &uring_locked, NULL); node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); while (node != &fake) { loops++; @@ -1385,7 +1428,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node, *prev; struct io_submit_state *state = &ctx->submit_state; - io_cq_lock(ctx); + __io_cq_lock(ctx); /* must come first to preserve CQE ordering in failure cases */ if (state->cqes_count) __io_flush_post_cqes(ctx); @@ -1393,10 +1436,18 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(ctx, req); + if (!(req->flags & REQ_F_CQE_SKIP) && + unlikely(!__io_fill_cqe_req(ctx, req))) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_req_cqe_overflow(req); + spin_unlock(&ctx->completion_lock); + } else { + io_req_cqe_overflow(req); + } + } } - io_cq_unlock_post_inline(ctx); + __io_cq_unlock_post(ctx); if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { io_free_batch_list(ctx, state->compl_reqs.first); @@ -1467,7 +1518,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) check_cq = READ_ONCE(ctx->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); /* * Similarly do not spin if we have not informed the user of any * dropped CQE. @@ -1799,7 +1850,7 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) return ret; /* If the op doesn't have a file, we're not polling for it */ - if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file) + if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue) io_iopoll_req_issued(req, issue_flags); return 0; @@ -1808,8 +1859,6 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) int io_poll_issue(struct io_kiocb *req, bool *locked) { io_tw_lock(req->ctx, locked); - if (unlikely(req->task->flags & PF_EXITING)) - return -EFAULT; return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| IO_URING_F_COMPLETE_DEFER); } @@ -1826,7 +1875,7 @@ void io_wq_submit_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); const struct io_op_def *def = &io_op_defs[req->opcode]; - unsigned int issue_flags = IO_URING_F_UNLOCKED; + unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; bool needs_poll = false; int ret = 0, err = -ECANCELED; @@ -2482,11 +2531,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, trace_io_uring_cqring_wait(ctx, min_events); do { - /* if we can't even flush overflow, don't wait for more */ - if (!io_cqring_overflow_flush(ctx)) { - ret = -EBUSY; - break; - } + io_cqring_overflow_flush(ctx); prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, TASK_INTERRUPTIBLE); ret = io_cqring_wait_schedule(ctx, &iowq, timeout); @@ -2637,8 +2682,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) __io_sqe_buffers_unregister(ctx); if (ctx->file_data) __io_sqe_files_unregister(ctx); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); + io_cqring_overflow_kill(ctx); io_eventfd_unregister(ctx); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); @@ -2781,6 +2825,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. */ do { + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { + mutex_lock(&ctx->uring_lock); + io_cqring_overflow_kill(ctx); + mutex_unlock(&ctx->uring_lock); + } + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) io_move_task_work_from_local(ctx); @@ -2846,8 +2896,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); xa_for_each(&ctx->personalities, index, creds) io_unregister_personality(ctx, index); if (ctx->rings) @@ -3489,6 +3537,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + !(ctx->flags & IORING_SETUP_IOPOLL) && + !(ctx->flags & IORING_SETUP_SQPOLL)) + ctx->task_complete = true; + /* * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user * space applications don't need to do io completion events |