From a327553965dede92587e6ccbe7df98dba36edcea Mon Sep 17 00:00:00 2001
From: Omar Sandoval
Date: Wed, 9 May 2018 17:16:31 -0700
Subject: sbitmap: fix missed wakeups caused by sbitmap_queue_get_shallow()

The sbitmap queue wake batch is calculated such that once allocations
start blocking, all of the bits which are already allocated must be
enough to fulfill the batch counters of all of the waitqueues. However,
the shallow allocation depth can break this invariant, since we block
before our full depth is being utilized. Add
sbitmap_queue_min_shallow_depth(), which saves the minimum shallow
depth the sbq will use, and update sbq_calc_wake_batch() to take it
into account.

Acked-by: Paolo Valente
Signed-off-by: Omar Sandoval
Signed-off-by: Jens Axboe
---
 lib/sbitmap.c | 49 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 40 insertions(+), 9 deletions(-)

(limited to 'lib')

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index e6a9c06ec70c..d21473b42465 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -270,18 +270,33 @@ void sbitmap_bitmap_show(struct sbitmap *sb, struct seq_file *m)
 }
 EXPORT_SYMBOL_GPL(sbitmap_bitmap_show);
 
-static unsigned int sbq_calc_wake_batch(unsigned int depth)
+static unsigned int sbq_calc_wake_batch(struct sbitmap_queue *sbq,
+                                        unsigned int depth)
 {
         unsigned int wake_batch;
+        unsigned int shallow_depth;
 
         /*
          * For each batch, we wake up one queue. We need to make sure that our
-         * batch size is small enough that the full depth of the bitmap is
-         * enough to wake up all of the queues.
+         * batch size is small enough that the full depth of the bitmap,
+         * potentially limited by a shallow depth, is enough to wake up all of
+         * the queues.
+         *
+         * Each full word of the bitmap has bits_per_word bits, and there might
+         * be a partial word. There are depth / bits_per_word full words and
+         * depth % bits_per_word bits left over. In bitwise arithmetic:
+         *
+         * bits_per_word = 1 << shift
+         * depth / bits_per_word = depth >> shift
+         * depth % bits_per_word = depth & ((1 << shift) - 1)
+         *
+         * Each word can be limited to sbq->min_shallow_depth bits.
          */
-        wake_batch = SBQ_WAKE_BATCH;
-        if (wake_batch > depth / SBQ_WAIT_QUEUES)
-                wake_batch = max(1U, depth / SBQ_WAIT_QUEUES);
+        shallow_depth = min(1U << sbq->sb.shift, sbq->min_shallow_depth);
+        depth = ((depth >> sbq->sb.shift) * shallow_depth +
+                 min(depth & ((1U << sbq->sb.shift) - 1), shallow_depth));
+        wake_batch = clamp_t(unsigned int, depth / SBQ_WAIT_QUEUES, 1,
+                             SBQ_WAKE_BATCH);
 
         return wake_batch;
 }
 
@@ -307,7 +322,8 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
                         *per_cpu_ptr(sbq->alloc_hint, i) = prandom_u32() % depth;
         }
 
-        sbq->wake_batch = sbq_calc_wake_batch(depth);
+        sbq->min_shallow_depth = UINT_MAX;
+        sbq->wake_batch = sbq_calc_wake_batch(sbq, depth);
         atomic_set(&sbq->wake_index, 0);
 
         sbq->ws = kzalloc_node(SBQ_WAIT_QUEUES * sizeof(*sbq->ws), flags, node);
@@ -327,9 +343,10 @@ int sbitmap_queue_init_node(struct sbitmap_queue *sbq, unsigned int depth,
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_init_node);
 
-void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
+static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
+                                            unsigned int depth)
 {
-        unsigned int wake_batch = sbq_calc_wake_batch(depth);
+        unsigned int wake_batch = sbq_calc_wake_batch(sbq, depth);
         int i;
 
         if (sbq->wake_batch != wake_batch) {
@@ -342,6 +359,11 @@ void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
                 for (i = 0; i < SBQ_WAIT_QUEUES; i++)
                         atomic_set(&sbq->ws[i].wait_cnt, 1);
         }
+}
+
+void sbitmap_queue_resize(struct sbitmap_queue *sbq, unsigned int depth)
+{
+        sbitmap_queue_update_wake_batch(sbq, depth);
         sbitmap_resize(&sbq->sb, depth);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_resize);
@@ -403,6 +425,14 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
 }
 EXPORT_SYMBOL_GPL(__sbitmap_queue_get_shallow);
 
+void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
+                                     unsigned int min_shallow_depth)
+{
+        sbq->min_shallow_depth = min_shallow_depth;
+        sbitmap_queue_update_wake_batch(sbq, sbq->sb.depth);
+}
+EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
+
 static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 {
         int i, wake_index;
@@ -528,5 +558,6 @@ void sbitmap_queue_show(struct sbitmap_queue *sbq, struct seq_file *m)
         seq_puts(m, "}\n");
 
         seq_printf(m, "round_robin=%d\n", sbq->round_robin);
+        seq_printf(m, "min_shallow_depth=%u\n", sbq->min_shallow_depth);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_show);
-- 
cgit v1.2.3-58-ga151

From 61445b56d031bc12feafb477848cf4ef9a725fc9 Mon Sep 17 00:00:00 2001
From: Omar Sandoval
Date: Wed, 9 May 2018 17:29:24 -0700
Subject: sbitmap: warn if using smaller shallow depth than was setup

Make sure the user passed the right value to
sbitmap_queue_min_shallow_depth().

Acked-by: Paolo Valente
Signed-off-by: Omar Sandoval
Signed-off-by: Jens Axboe
---
 lib/sbitmap.c | 2 ++
 1 file changed, 2 insertions(+)

(limited to 'lib')

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index d21473b42465..8f0950fbaa5c 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -402,6 +402,8 @@ int __sbitmap_queue_get_shallow(struct sbitmap_queue *sbq,
         unsigned int hint, depth;
         int nr;
 
+        WARN_ON_ONCE(shallow_depth < sbq->min_shallow_depth);
+
         hint = this_cpu_read(*sbq->alloc_hint);
         depth = READ_ONCE(sbq->sb.depth);
         if (unlikely(hint >= depth)) {
-- 
cgit v1.2.3-58-ga151

From c854ab5773be1c1a0d3cef0c3a3261f2c48ab7f8 Mon Sep 17 00:00:00 2001
From: Jens Axboe
Date: Mon, 14 May 2018 12:17:31 -0600
Subject: sbitmap: fix race in wait batch accounting

If we have multiple callers of sbq_wake_up(), we can end up in a
situation where the wait_cnt will continually go more and more
negative. Consider the case where our wake batch is 1, hence wait_cnt
will start out as 1.

wait_cnt == 1

CPU0                            CPU1
atomic_dec_return(), cnt == 0
                                atomic_dec_return(), cnt == -1
                                cmpxchg(-1, 0) (succeeds)
                                [wait_cnt now 0]
cmpxchg(0, 1) (fails)

This ends up with wait_cnt being 0, so we'll wake up immediately the
next time. Going through the same loop as above again, we'll end up
with wait_cnt at -1.

For the case where we have a larger wake batch, the only difference is
that the starting point will be higher. We'll still end up with
continually smaller batch wakeups, which defeats the purpose of the
rolling wakeups.

Always reset the wait_cnt to the batch value. Then it doesn't matter
who wins the race. But ensure that whoever does win the race is the one
that increments the ws index and wakes up our batch count; the loser
gets to call __sbq_wake_up() again to account its wakeups towards the
next active wait state index.

Fixes: 6c0ca7ae292a ("sbitmap: fix wakeup hang after sbq resize")
Reviewed-by: Omar Sandoval
Signed-off-by: Jens Axboe
---
 lib/sbitmap.c | 35 +++++++++++++++++++++++++----------
 1 file changed, 25 insertions(+), 10 deletions(-)

(limited to 'lib')

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index 8f0950fbaa5c..e6d7d610778d 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -457,7 +457,7 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
         return NULL;
 }
 
-static void sbq_wake_up(struct sbitmap_queue *sbq)
+static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 {
         struct sbq_wait_state *ws;
         unsigned int wake_batch;
@@ -474,28 +474,43 @@ static void sbq_wake_up(struct sbitmap_queue *sbq)
 
         ws = sbq_wake_ptr(sbq);
         if (!ws)
-                return;
+                return false;
 
         wait_cnt = atomic_dec_return(&ws->wait_cnt);
         if (wait_cnt <= 0) {
+                int ret;
+
                 wake_batch = READ_ONCE(sbq->wake_batch);
+
                 /*
                  * Pairs with the memory barrier in sbitmap_queue_resize() to
                  * ensure that we see the batch size update before the wait
                  * count is reset.
                  */
                 smp_mb__before_atomic();
+
                 /*
-                 * If there are concurrent callers to sbq_wake_up(), the last
-                 * one to decrement the wait count below zero will bump it back
-                 * up. If there is a concurrent resize, the count reset will
-                 * either cause the cmpxchg to fail or overwrite after the
-                 * cmpxchg.
+                 * For concurrent callers of this, the one that failed the
+                 * atomic_cmpxchg() race should call this function again
+                 * to wakeup a new batch on a different 'ws'.
                  */
-                atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wait_cnt + wake_batch);
-                sbq_index_atomic_inc(&sbq->wake_index);
-                wake_up_nr(&ws->wait, wake_batch);
+                ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
+                if (ret == wait_cnt) {
+                        sbq_index_atomic_inc(&sbq->wake_index);
+                        wake_up_nr(&ws->wait, wake_batch);
+                        return false;
+                }
+
+                return true;
         }
+
+        return false;
+}
+
+static void sbq_wake_up(struct sbitmap_queue *sbq)
+{
+        while (__sbq_wake_up(sbq))
+                ;
 }
 
 void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
-- 
cgit v1.2.3-58-ga151

From e6fc46498784e799d3eb95d83079180e413c4e7d Mon Sep 17 00:00:00 2001
From: Ming Lei
Date: Thu, 24 May 2018 11:00:39 -0600
Subject: blk-mq: avoid starving tag allocation after allocating process migrates

When the allocation process is scheduled back and the mapped hw queue is
changed, fake one extra wake up on the previous queue to compensate for
the missed wake up, so other allocations on the previous queue won't be
starved.

This patch fixes one request allocation hang issue, which can be
triggered easily in case of very low nr_requests.

The race is as follows:

1) 2 hw queues, nr_requests is 2, and wake_batch is one

2) there are 3 waiters on hw queue 0

3) two in-flight requests in hw queue 0 are completed, and only two of
   the 3 waiters are woken up because of wake_batch, but both of those
   waiters can be scheduled to another CPU and end up switching to hw
   queue 1

4) then the 3rd waiter will wait forever, since no in-flight request is
   left in hw queue 0 any more

5) this patch fixes it by faking a wakeup when the waiter is scheduled
   to another hw queue

Cc:
Reviewed-by: Omar Sandoval
Signed-off-by: Ming Lei

Modified commit message to make it clearer, and make it apply on top of
the 4.18 branch.

Signed-off-by: Jens Axboe
---
 block/blk-mq-tag.c      | 12 ++++++++++++
 include/linux/sbitmap.h |  7 +++++++
 lib/sbitmap.c           | 29 +++++++++++++++--------------
 3 files changed, 34 insertions(+), 14 deletions(-)

(limited to 'lib')

diff --git a/block/blk-mq-tag.c b/block/blk-mq-tag.c
index 336dde07b230..a4e58fc28a06 100644
--- a/block/blk-mq-tag.c
+++ b/block/blk-mq-tag.c
@@ -134,6 +134,8 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
         ws = bt_wait_ptr(bt, data->hctx);
         drop_ctx = data->ctx == NULL;
         do {
+                struct sbitmap_queue *bt_prev;
+
                 /*
                  * We're out of tags on this hardware queue, kick any
                  * pending IO submits before going to sleep waiting for
@@ -159,6 +161,7 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
                 if (data->ctx)
                         blk_mq_put_ctx(data->ctx);
 
+                bt_prev = bt;
                 io_schedule();
 
                 data->ctx = blk_mq_get_ctx(data->q);
@@ -170,6 +173,15 @@ unsigned int blk_mq_get_tag(struct blk_mq_alloc_data *data)
                         bt = &tags->bitmap_tags;
 
                 finish_wait(&ws->wait, &wait);
+
+                /*
+                 * If destination hw queue is changed, fake wake up on
+                 * previous queue for compensating the wake up miss, so
+                 * other allocations on previous queue won't be starved.
+                 */
+                if (bt != bt_prev)
+                        sbitmap_queue_wake_up(bt_prev);
+
                 ws = bt_wait_ptr(bt, data->hctx);
         } while (1);
 
diff --git a/include/linux/sbitmap.h b/include/linux/sbitmap.h
index 0c4a9c242dd7..e6539536dea9 100644
--- a/include/linux/sbitmap.h
+++ b/include/linux/sbitmap.h
@@ -512,6 +512,13 @@ static inline struct sbq_wait_state *sbq_wait_ptr(struct sbitmap_queue *sbq,
  */
 void sbitmap_queue_wake_all(struct sbitmap_queue *sbq);
 
+/**
+ * sbitmap_queue_wake_up() - Wake up some of the waiters in one waitqueue
+ * on a &struct sbitmap_queue.
+ * @sbq: Bitmap queue to wake up.
+ */
+void sbitmap_queue_wake_up(struct sbitmap_queue *sbq);
+
 /**
  * sbitmap_queue_show() - Dump &struct sbitmap_queue information to a &struct
  * seq_file.
diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index e6d7d610778d..6fdc6267f4a8 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -352,8 +352,9 @@ static void sbitmap_queue_update_wake_batch(struct sbitmap_queue *sbq,
         if (sbq->wake_batch != wake_batch) {
                 WRITE_ONCE(sbq->wake_batch, wake_batch);
                 /*
-                 * Pairs with the memory barrier in sbq_wake_up() to ensure that
-                 * the batch size is updated before the wait counts.
+                 * Pairs with the memory barrier in sbitmap_queue_wake_up()
+                 * to ensure that the batch size is updated before the wait
+                 * counts.
                  */
                 smp_mb__before_atomic();
                 for (i = 0; i < SBQ_WAIT_QUEUES; i++)
@@ -463,15 +464,6 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
         unsigned int wake_batch;
         int wait_cnt;
 
-        /*
-         * Pairs with the memory barrier in set_current_state() to ensure the
-         * proper ordering of clear_bit()/waitqueue_active() in the waker and
-         * test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
-         * waiter. See the comment on waitqueue_active(). This is __after_atomic
-         * because we just did clear_bit_unlock() in the caller.
-         */
-        smp_mb__after_atomic();
-
         ws = sbq_wake_ptr(sbq);
         if (!ws)
                 return false;
@@ -507,17 +499,26 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
         return false;
 }
 
-static void sbq_wake_up(struct sbitmap_queue *sbq)
+void sbitmap_queue_wake_up(struct sbitmap_queue *sbq)
 {
         while (__sbq_wake_up(sbq))
                 ;
 }
+EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
 
 void sbitmap_queue_clear(struct sbitmap_queue *sbq, unsigned int nr,
                          unsigned int cpu)
 {
         sbitmap_clear_bit_unlock(&sbq->sb, nr);
-        sbq_wake_up(sbq);
+        /*
+         * Pairs with the memory barrier in set_current_state() to ensure the
+         * proper ordering of clear_bit_unlock()/waitqueue_active() in the waker
+         * and test_and_set_bit_lock()/prepare_to_wait()/finish_wait() in the
+         * waiter. See the comment on waitqueue_active().
+         */
+        smp_mb__after_atomic();
+        sbitmap_queue_wake_up(sbq);
+
         if (likely(!sbq->round_robin && nr < sbq->sb.depth))
                 *per_cpu_ptr(sbq->alloc_hint, cpu) = nr;
 }
@@ -529,7 +530,7 @@ void sbitmap_queue_wake_all(struct sbitmap_queue *sbq)
 
         /*
          * Pairs with the memory barrier in set_current_state() like in
-         * sbq_wake_up().
+         * sbitmap_queue_wake_up().
          */
         smp_mb();
         wake_index = atomic_read(&sbq->wake_index);
-- 
cgit v1.2.3-58-ga151
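
Editor's note: the wake-batch arithmetic introduced by the first patch above can be checked in
isolation. Below is a minimal standalone userspace sketch of the sbq_calc_wake_batch()
calculation, not kernel code; it assumes SBQ_WAIT_QUEUES == 8 and SBQ_WAKE_BATCH == 8 (the
apparent defaults in include/linux/sbitmap.h around these patches, treated here as assumptions)
and shows how a small min_shallow_depth shrinks the wake batch so that the shallow-limited
bitmap can still satisfy every wait queue's batch counter.

/*
 * Standalone illustration of the wake-batch calculation; mirrors the
 * arithmetic from sbq_calc_wake_batch() in the first patch above.
 * The two constants are assumed defaults, not taken from this document.
 */
#include <limits.h>
#include <stdio.h>

#define SBQ_WAIT_QUEUES 8
#define SBQ_WAKE_BATCH  8

static unsigned int min_u(unsigned int a, unsigned int b)
{
        return a < b ? a : b;
}

/* depth: total bits; shift: log2(bits per word); min_shallow: smallest shallow depth used */
static unsigned int calc_wake_batch(unsigned int depth, unsigned int shift,
                                    unsigned int min_shallow)
{
        /* Each word contributes at most min(bits_per_word, min_shallow) usable bits. */
        unsigned int shallow_depth = min_u(1U << shift, min_shallow);
        unsigned int usable = (depth >> shift) * shallow_depth +
                              min_u(depth & ((1U << shift) - 1), shallow_depth);
        unsigned int batch = usable / SBQ_WAIT_QUEUES;

        /* Clamp to [1, SBQ_WAKE_BATCH], like clamp_t() in the patch. */
        if (batch < 1)
                batch = 1;
        if (batch > SBQ_WAKE_BATCH)
                batch = SBQ_WAKE_BATCH;
        return batch;
}

int main(void)
{
        /* 256 bits in 64-bit words (shift == 6), no shallow limit: batch clamps to 8. */
        printf("no shallow limit: %u\n", calc_wake_batch(256, 6, UINT_MAX));
        /* A shallow depth of 4 leaves only 16 usable bits, so the batch drops to 2. */
        printf("min_shallow = 4:  %u\n", calc_wake_batch(256, 6, 4));
        return 0;
}

With the shallow limit ignored, 256 usable bits divided over 8 wait queues would allow a batch
far above the cap, so the batch stays at 8; with min_shallow_depth == 4 only 16 bits are ever
usable, and the batch must shrink to 2 or the blocked wait queues could never all be woken,
which is exactly the missed-wakeup scenario the first patch fixes.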