Re: [PATCH v2] locking/rwsem: add acquire barrier to read_slowpath exit when queue is empty

From: Peter Zijlstra
Date: Thu Jul 18 2019 - 06:58:28 EST


On Thu, Jul 18, 2019 at 10:26:41AM +0100, Will Deacon wrote:

> /*
> * We need to ensure ACQUIRE semantics when reading sem->count so that
> * we pair with the RELEASE store performed by an unlocking/downgrading
> * writer.
> *
> * P0 (writer)                     P1 (reader)
> *
> * down_write(sem);
> * <write shared data>
> * downgrade_write(sem);
> * -> fetch_add_release(&sem->count)
> *
> *                                 down_read_slowpath(sem);
> *                                 -> atomic_read(&sem->count)
> *                                    <ctrl dep>
> *                                    smp_acquire__after_ctrl_dep()
> *                                 <read shared data>
> */

So I'm thinking all this is excessive; the simple rule is: lock acquire
should imply ACQUIRE; we all know why.
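
FWIW, the pairing in that comment is just the classic message-passing
shape. Reduced to a stand-alone userspace analogue with C11 atomics
standing in for the kernel primitives (count, shared and the thread
bodies are made-up stand-ins, not kernel code), it looks like this:

#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

static atomic_long count;	/* plays the role of sem->count */
static long shared;		/* the data written under the write lock */

static int writer(void *arg)	/* P0: write lock already held */
{
	shared = 42;					/* <write shared data> */
	/* downgrade_write(): RELEASE add on the count */
	atomic_fetch_add_explicit(&count, 1, memory_order_release);
	return 0;
}

static int reader(void *arg)	/* P1: down_read() slowpath */
{
	/* spin until the count says readers may enter */
	while (!atomic_load_explicit(&count, memory_order_relaxed))
		;
	/* analogue of <ctrl dep> + smp_acquire__after_ctrl_dep() */
	atomic_thread_fence(memory_order_acquire);
	printf("%ld\n", shared);	/* must print 42 */
	return 0;
}

int main(void)
{
	thrd_t t0, t1;

	thrd_create(&t0, writer, NULL);
	thrd_create(&t1, reader, NULL);
	thrd_join(t0, NULL);
	thrd_join(t1, NULL);
	return 0;
}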

> In writing this, I also noticed that we don't have any explicit ordering
> at the end of the reader slowpath when we wait on the queue but get woken
> immediately:
>
> 	if (!waiter.task)
> 		break;
>
> Am I missing something?

Ha!, I ran into the very same one. I keep confusing myself, but I think
you're right and that needs to be smp_load_acquire() to match the
smp_store_release() in rwsem_mark_wake().

(the actual race there is _tiny_ due to the smp_mb() right before it,
but I cannot convince myself that is indeed sufficient)
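
Stripped of the rwsem details, the handoff that wants the acquire is
roughly the following stand-alone C11 sketch (struct waiter, grant_count
and the thread bodies are invented stand-ins, not the kernel code): the
waker publishes the wakeup with a release store only after its
bookkeeping, and the sleeper has to consume it with an acquire load,
otherwise nothing orders its later accesses against the waker's stores.

#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

/* Invented stand-ins for the rwsem fields, illustration only. */
struct waiter {
	void *_Atomic task;		/* non-NULL while still waiting */
};

static atomic_long grant_count;		/* plays the role of sem->count */
static struct waiter w;

/* Waker side, cf. rwsem_mark_wake(): adjust the count, then publish. */
static int waker(void *arg)
{
	atomic_fetch_add_explicit(&grant_count, 1, memory_order_relaxed);
	/* RELEASE: the count update above is visible once task reads as NULL. */
	atomic_store_explicit(&w.task, NULL, memory_order_release);
	return 0;
}

/* Sleeper side, cf. the wait loop in rwsem_down_read_slowpath(). */
static int sleeper(void *arg)
{
	/* ACQUIRE pairs with the release store in waker(). */
	while (atomic_load_explicit(&w.task, memory_order_acquire))
		;	/* schedule() in the real code */
	/* Everything the waker did before publishing is visible here. */
	printf("count=%ld\n",
	       atomic_load_explicit(&grant_count, memory_order_relaxed));
	return 0;
}

int main(void)
{
	thrd_t a, b;

	atomic_init(&w.task, (void *)1);	/* placeholder for "current" */
	thrd_create(&a, sleeper, NULL);
	thrd_create(&b, waker, NULL);
	thrd_join(a, NULL);
	thrd_join(b, NULL);
	return 0;
}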

The signal_pending_state() case is also fun, but I think wait_lock there
is sufficient (even under RCpc).
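
That wait_lock argument is just the usual lock-ordering guarantee,
roughly like this stand-alone sketch (lock, flag and payload are
invented names, not the rwsem fields): once the second thread takes the
lock after the first has released it and sees the flag set, it also
sees everything the first thread stored before unlocking.

#include <stdio.h>
#include <threads.h>

static mtx_t lock;	/* plays the role of sem->wait_lock */
static int flag;	/* plays the role of waiter.task being cleared */
static long payload;	/* plays the role of the sem->count update */

static int waker(void *arg)
{
	mtx_lock(&lock);
	payload = 42;	/* e.g. the count adjustment */
	flag = 1;	/* e.g. clearing waiter.task */
	mtx_unlock(&lock);
	return 0;
}

static int sleeper(void *arg)
{
	int done = 0;

	while (!done) {
		mtx_lock(&lock);
		done = flag;
		mtx_unlock(&lock);
	}
	/* Seeing flag == 1 under the lock implies seeing payload == 42. */
	printf("%ld\n", payload);
	return 0;
}

int main(void)
{
	thrd_t a, b;

	mtx_init(&lock, mtx_plain);
	thrd_create(&a, sleeper, NULL);
	thrd_create(&b, waker, NULL);
	thrd_join(a, NULL);
	thrd_join(b, NULL);
	mtx_destroy(&lock);
	return 0;
}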

I've ended up with this..

---
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 37524a47f002..9eb630904a17 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -1000,6 +1000,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 	atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
 	adjustment = 0;
 	if (rwsem_optimistic_spin(sem, false)) {
+		/* rwsem_optimistic_spin() implies ACQUIRE through rwsem_*trylock() */
 		/*
 		 * Wake up other readers in the wait list if the front
 		 * waiter is a reader.
@@ -1014,6 +1015,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 		}
 		return sem;
 	} else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {
+		/* rwsem_reader_phase_trylock() implies ACQUIRE */
 		return sem;
 	}
 
@@ -1032,6 +1034,8 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 	 */
 	if (adjustment && !(atomic_long_read(&sem->count) &
 	    (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
+		/* Provide lock ACQUIRE */
+		smp_acquire__after_ctrl_dep();
 		raw_spin_unlock_irq(&sem->wait_lock);
 		rwsem_set_reader_owned(sem);
 		lockevent_inc(rwsem_rlock_fast);
@@ -1065,15 +1069,25 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 	wake_up_q(&wake_q);
 
 	/* wait to be given the lock */
-	while (true) {
+	for (;;) {
 		set_current_state(state);
-		if (!waiter.task)
+		if (!smp_load_acquire(&waiter.task)) {
+			/*
+			 * Matches rwsem_mark_wake()'s smp_store_release() and ensures
+			 * we're ordered against its sem->count operations.
+			 */
 			break;
+		}
 		if (signal_pending_state(state, current)) {
 			raw_spin_lock_irq(&sem->wait_lock);
 			if (waiter.task)
 				goto out_nolock;
 			raw_spin_unlock_irq(&sem->wait_lock);
+			/*
+			 * Ordered by sem->wait_lock against rwsem_mark_wake(), if we
+			 * see its waiter.task store, we must also see its sem->count
+			 * changes.
+			 */
 			break;
 		}
 		schedule();
@@ -1083,6 +1097,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int state)
 	__set_current_state(TASK_RUNNING);
 	lockevent_inc(rwsem_rlock);
 	return sem;
+
 out_nolock:
 	list_del(&waiter.list);
 	if (list_empty(&sem->wait_list)) {