diff options
Diffstat (limited to 'io_uring/io-wq.c')
-rw-r--r-- | io_uring/io-wq.c | 39 |
1 files changed, 22 insertions, 17 deletions
diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index d1c47a9d9215..f1e7c670add8 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -23,6 +23,7 @@ #include "io_uring.h" #define WORKER_IDLE_TIMEOUT (5 * HZ) +#define WORKER_INIT_LIMIT 3 enum { IO_WORKER_F_UP = 0, /* up and active */ @@ -58,6 +59,7 @@ struct io_worker { unsigned long create_state; struct callback_head create_work; + int init_retries; union { struct rcu_head rcu; @@ -159,7 +161,7 @@ static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, struct io_wq_work *work) { - return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND)); + return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)); } static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker) @@ -451,7 +453,7 @@ static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) static inline unsigned int io_get_work_hash(struct io_wq_work *work) { - return work->flags >> IO_WQ_HASH_SHIFT; + return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT; } static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) @@ -592,8 +594,9 @@ static void io_worker_handle_work(struct io_wq_acct *acct, next_hashed = wq_next_work(work); - if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND)) - work->flags |= IO_WQ_WORK_CANCEL; + if (do_kill && + (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND)) + atomic_or(IO_WQ_WORK_CANCEL, &work->flags); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -744,7 +747,7 @@ static bool io_wq_work_match_all(struct io_wq_work *work, void *data) return true; } -static inline bool io_should_retry_thread(long err) +static inline bool io_should_retry_thread(struct io_worker *worker, long err) { /* * Prevent perpetual task_work retry, if the task (or its group) is @@ -752,6 +755,8 @@ static inline bool io_should_retry_thread(long err) */ if (fatal_signal_pending(current)) return false; + if (worker->init_retries++ >= WORKER_INIT_LIMIT) + return false; switch (err) { case -EAGAIN: @@ -778,7 +783,7 @@ static void create_worker_cont(struct callback_head *cb) io_init_new_worker(wq, worker, tsk); io_worker_release(worker); return; - } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { struct io_wq_acct *acct = io_wq_get_acct(worker); atomic_dec(&acct->nr_running); @@ -845,7 +850,7 @@ fail: tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE); if (!IS_ERR(tsk)) { io_init_new_worker(wq, worker, tsk); - } else if (!io_should_retry_thread(PTR_ERR(tsk))) { + } else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) { kfree(worker); goto fail; } else { @@ -891,7 +896,7 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) { do { - work->flags |= IO_WQ_WORK_CANCEL; + atomic_or(IO_WQ_WORK_CANCEL, &work->flags); wq->do_work(work); work = wq->free_work(work); } while (work); @@ -926,8 +931,12 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) { struct io_wq_acct *acct = io_work_get_acct(wq, work); - unsigned long work_flags = work->flags; - struct io_cb_cancel_data match; + unsigned int work_flags = atomic_read(&work->flags); + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_item, + .data = work, + .cancel_all = false, + }; bool do_create; /* @@ -935,7 +944,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) * been marked as one that should not get executed, cancel it here. */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || - (work->flags & IO_WQ_WORK_CANCEL)) { + (work_flags & IO_WQ_WORK_CANCEL)) { io_run_cancel(work, wq); return; } @@ -965,10 +974,6 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) raw_spin_unlock(&wq->lock); /* fatal condition, failed to create the first worker */ - match.fn = io_wq_work_match_item, - match.data = work, - match.cancel_all = false, - io_acct_cancel_pending_work(wq, acct, &match); } } @@ -982,7 +987,7 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) unsigned int bit; bit = hash_ptr(val, IO_WQ_HASH_ORDER); - work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); + atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags); } static bool __io_wq_worker_cancel(struct io_worker *worker, @@ -990,7 +995,7 @@ static bool __io_wq_worker_cancel(struct io_worker *worker, struct io_wq_work *work) { if (work && match->fn(work, match->data)) { - work->flags |= IO_WQ_WORK_CANCEL; + atomic_or(IO_WQ_WORK_CANCEL, &work->flags); __set_notify_signal(worker->task); return true; } |