Re: [PATCH] sbitmap: fix possible io hung due to lost wakeup

From: Jan Kara
Date: Wed Sep 07 2022 - 06:24:05 EST


On Tue 06-09-22 15:27:51, Keith Busch wrote:
> On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote:
> > wait_cnt = atomic_dec_return(&ws->wait_cnt);
> > - if (wait_cnt <= 0) {
> > - int ret;
> > + /*
> > + * For concurrent callers of this, callers should call this function
> > + * again to wakeup a new batch on a different 'ws'.
> > + */
> > + if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
> > + return true;
>
> If wait_cnt is '0', but the waitqueue_active happens to be false due to racing
> with add_wait_queue(), this returns true so the caller will retry.

Well, note that sbq_wake_ptr() called to obtain 'ws' did waitqueue_active()
check. So !waitqueue_active() should really happen only if waiter was woken
up by someone else or so. Not that it would matter much but I wanted to
point it out.

> The next atomic_dec will set the current waitstate wait_cnt < 0, which
> also forces an early return true. When does the wake up happen, or
> wait_cnt and wait_index get updated in that case?

I guess your concern could be rephrased as: Who's going to ever set
ws->wait_cnt to value > 0 if we ever exit with wait_cnt == 0 due to
!waitqueue_active() condition?

And that is a good question and I think that's a bug in this patch. I think
we need something like:

...
/*
* For concurrent callers of this, callers should call this function
* again to wakeup a new batch on a different 'ws'.
*/
if (wait_cnt < 0)
return true;
/*
* If we decremented queue without waiters, retry to avoid lost
* wakeups.
*/
if (wait_cnt > 0)
return !waitqueue_active(&ws->wait);

/*
* When wait_cnt == 0, we have to be particularly careful as we are
* responsible to reset wait_cnt regardless whether we've actually
* woken up anybody. But in case we didn't wakeup anybody, we still
* need to retry.
*/
ret = !waitqueue_active(&ws->wait);
wake_batch = READ_ONCE(sbq->wake_batch);
/*
* Wake up first in case that concurrent callers decrease wait_cnt
* while waitqueue is empty.
*/
wake_up_nr(&ws->wait, wake_batch);
...

return ret;

Does this fix your concern Keith?

Honza

>
>
> > - wake_batch = READ_ONCE(sbq->wake_batch);
> > + if (wait_cnt > 0)
> > + return false;
> >
> > - /*
> > - * Pairs with the memory barrier in sbitmap_queue_resize() to
> > - * ensure that we see the batch size update before the wait
> > - * count is reset.
> > - */
> > - smp_mb__before_atomic();
> > + wake_batch = READ_ONCE(sbq->wake_batch);
> >
> > - /*
> > - * For concurrent callers of this, the one that failed the
> > - * atomic_cmpxhcg() race should call this function again
> > - * to wakeup a new batch on a different 'ws'.
> > - */
> > - ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
> > - if (ret == wait_cnt) {
> > - sbq_index_atomic_inc(&sbq->wake_index);
> > - wake_up_nr(&ws->wait, wake_batch);
> > - return false;
> > - }
> > + /*
> > + * Wake up first in case that concurrent callers decrease wait_cnt
> > + * while waitqueue is empty.
> > + */
> > + wake_up_nr(&ws->wait, wake_batch);
> >
> > - return true;
> > - }
> > + /*
> > + * Pairs with the memory barrier in sbitmap_queue_resize() to
> > + * ensure that we see the batch size update before the wait
> > + * count is reset.
> > + *
> > + * Also pairs with the implicit barrier between decrementing wait_cnt
> > + * and checking for waitqueue_active() to make sure waitqueue_active()
> > + * sees result of the wakeup if atomic_dec_return() has seen the result
> > + * of atomic_set().
> > + */
> > + smp_mb__before_atomic();
> > +
> > + /*
> > + * Increase wake_index before updating wait_cnt, otherwise concurrent
> > + * callers can see valid wait_cnt in old waitqueue, which can cause
> > + * invalid wakeup on the old waitqueue.
> > + */
> > + sbq_index_atomic_inc(&sbq->wake_index);
> > + atomic_set(&ws->wait_cnt, wake_batch);
> >
> > return false;
> > }
> > --
> > 2.31.1
> >
--
Jan Kara <jack@xxxxxxxx>
SUSE Labs, CR