summaryrefslogtreecommitdiff
path: root/io_uring/io_uring.c
diff options
context:
space:
mode:
Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--io_uring/io_uring.c150
1 files changed, 55 insertions, 95 deletions
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c326e2127dd4..8e6faa942a6f 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -95,12 +95,14 @@
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
+#include "msg_ring.h"
#include "memmap.h"
#include "timeout.h"
#include "poll.h"
#include "rw.h"
#include "alloc_cache.h"
+#include "eventfd.h"
#define IORING_MAX_ENTRIES 32768
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
@@ -314,6 +316,9 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
sizeof(struct io_async_rw));
ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct uring_cache));
+ spin_lock_init(&ctx->msg_lock);
+ ret |= io_alloc_cache_init(&ctx->msg_cache, IO_ALLOC_CACHE_MAX,
+ sizeof(struct io_kiocb));
ret |= io_futex_cache_init(ctx);
if (ret)
goto err;
@@ -350,6 +355,7 @@ err:
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
+ io_alloc_cache_free(&ctx->msg_cache, io_msg_cache_free);
io_futex_cache_free(ctx);
kfree(ctx->cancel_table.hbs);
kfree(ctx->cancel_table_locked.hbs);
@@ -461,9 +467,9 @@ static void io_prep_async_work(struct io_kiocb *req)
}
req->work.list.next = NULL;
- req->work.flags = 0;
+ atomic_set(&req->work.flags, 0);
if (req->flags & REQ_F_FORCE_ASYNC)
- req->work.flags |= IO_WQ_WORK_CONCURRENT;
+ atomic_or(IO_WQ_WORK_CONCURRENT, &req->work.flags);
if (req->file && !(req->flags & REQ_F_FIXED_FILE))
req->flags |= io_file_get_flags(req->file);
@@ -479,7 +485,7 @@ static void io_prep_async_work(struct io_kiocb *req)
io_wq_hash_work(&req->work, file_inode(req->file));
} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
if (def->unbound_nonreg_file)
- req->work.flags |= IO_WQ_WORK_UNBOUND;
+ atomic_or(IO_WQ_WORK_UNBOUND, &req->work.flags);
}
}
@@ -519,7 +525,7 @@ static void io_queue_iowq(struct io_kiocb *req)
* worker for it).
*/
if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
- req->work.flags |= IO_WQ_WORK_CANCEL;
+ atomic_or(IO_WQ_WORK_CANCEL, &req->work.flags);
trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
io_wq_enqueue(tctx->io_wq, &req->work);
@@ -541,84 +547,6 @@ static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
}
}
-void io_eventfd_ops(struct rcu_head *rcu)
-{
- struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
- int ops = atomic_xchg(&ev_fd->ops, 0);
-
- if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
- eventfd_signal_mask(ev_fd->cq_ev_fd, EPOLL_URING_WAKE);
-
- /* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
- * ordering in a race but if references are 0 we know we have to free
- * it regardless.
- */
- if (atomic_dec_and_test(&ev_fd->refs)) {
- eventfd_ctx_put(ev_fd->cq_ev_fd);
- kfree(ev_fd);
- }
-}
-
-static void io_eventfd_signal(struct io_ring_ctx *ctx)
-{
- struct io_ev_fd *ev_fd = NULL;
-
- rcu_read_lock();
- /*
- * rcu_dereference ctx->io_ev_fd once and use it for both for checking
- * and eventfd_signal
- */
- ev_fd = rcu_dereference(ctx->io_ev_fd);
-
- /*
- * Check again if ev_fd exists incase an io_eventfd_unregister call
- * completed between the NULL check of ctx->io_ev_fd at the start of
- * the function and rcu_read_lock.
- */
- if (unlikely(!ev_fd))
- goto out;
- if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
- goto out;
- if (ev_fd->eventfd_async && !io_wq_current_is_worker())
- goto out;
-
- if (likely(eventfd_signal_allowed())) {
- eventfd_signal_mask(ev_fd->cq_ev_fd, EPOLL_URING_WAKE);
- } else {
- atomic_inc(&ev_fd->refs);
- if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
- call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
- else
- atomic_dec(&ev_fd->refs);
- }
-
-out:
- rcu_read_unlock();
-}
-
-static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
-{
- bool skip;
-
- spin_lock(&ctx->completion_lock);
-
- /*
- * Eventfd should only get triggered when at least one event has been
- * posted. Some applications rely on the eventfd notification count
- * only changing IFF a new CQE has been added to the CQ ring. There's
- * no depedency on 1:1 relationship between how many times this
- * function is called (and hence the eventfd count) and number of CQEs
- * posted to the CQ ring.
- */
- skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
- ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
- spin_unlock(&ctx->completion_lock);
- if (skip)
- return;
-
- io_eventfd_signal(ctx);
-}
-
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
if (ctx->poll_activated)
@@ -878,20 +806,43 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
return false;
}
-bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
+static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+ u32 cflags)
{
bool filled;
- io_cq_lock(ctx);
filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
if (!filled)
filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+ return filled;
+}
+
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
+{
+ bool filled;
+
+ io_cq_lock(ctx);
+ filled = __io_post_aux_cqe(ctx, user_data, res, cflags);
io_cq_unlock_post(ctx);
return filled;
}
/*
+ * Must be called from inline task_work so we now a flush will happen later,
+ * and obviously with ctx->uring_lock held (tw always has that).
+ */
+void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
+{
+ if (!io_fill_cqe_aux(ctx, user_data, res, cflags)) {
+ spin_lock(&ctx->completion_lock);
+ io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+ spin_unlock(&ctx->completion_lock);
+ }
+ ctx->submit_state.cq_flush = true;
+}
+
+/*
* A helper for multishot requests posting additional CQEs.
* Should only be used from a task_work including IO_URING_F_MULTISHOT.
*/
@@ -1175,9 +1126,10 @@ void tctx_task_work(struct callback_head *cb)
WARN_ON_ONCE(ret);
}
-static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
+static inline void io_req_local_work_add(struct io_kiocb *req,
+ struct io_ring_ctx *ctx,
+ unsigned flags)
{
- struct io_ring_ctx *ctx = req->ctx;
unsigned nr_wait, nr_tw, nr_tw_prev;
struct llist_node *head;
@@ -1191,6 +1143,8 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
flags &= ~IOU_F_TWQ_LAZY_WAKE;
+ guard(rcu)();
+
head = READ_ONCE(ctx->work_llist.first);
do {
nr_tw_prev = 0;
@@ -1272,13 +1226,18 @@ static void io_req_normal_work_add(struct io_kiocb *req)
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
- if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
- rcu_read_lock();
- io_req_local_work_add(req, flags);
- rcu_read_unlock();
- } else {
+ if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
+ io_req_local_work_add(req, req->ctx, flags);
+ else
io_req_normal_work_add(req);
- }
+}
+
+void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
+ unsigned flags)
+{
+ if (WARN_ON_ONCE(!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
+ return;
+ io_req_local_work_add(req, ctx, flags);
}
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
@@ -1467,7 +1426,7 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx)
}
__io_cq_unlock_post(ctx);
- if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
+ if (!wq_list_empty(&state->compl_reqs)) {
io_free_batch_list(ctx, state->compl_reqs.first);
INIT_WQ_LIST(&state->compl_reqs);
}
@@ -1813,14 +1772,14 @@ void io_wq_submit_work(struct io_wq_work *work)
io_arm_ltimeout(req);
/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
- if (work->flags & IO_WQ_WORK_CANCEL) {
+ if (atomic_read(&work->flags) & IO_WQ_WORK_CANCEL) {
fail:
io_req_task_queue_fail(req, err);
return;
}
if (!io_assign_file(req, def, issue_flags)) {
err = -EBADF;
- work->flags |= IO_WQ_WORK_CANCEL;
+ atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
goto fail;
}
@@ -2649,6 +2608,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
io_alloc_cache_free(&ctx->uring_cache, kfree);
+ io_alloc_cache_free(&ctx->msg_cache, io_msg_cache_free);
io_futex_cache_free(ctx);
io_destroy_buffers(ctx);
mutex_unlock(&ctx->uring_lock);