Re: [BUG]locking/rwsem: only clean RWSEM_FLAG_HANDOFF when already set

From: Waiman Long
Date: Thu Nov 11 2021 - 14:37:11 EST



On 11/11/21 14:20, Peter Zijlstra wrote:
> On Thu, Nov 11, 2021 at 02:14:48PM -0500, Waiman Long wrote:
>> As for the PHASE_CHANGE name, we have to be consistent in both rwsem and
>> mutex. Maybe a follow up patch if you think we should change the
>> terminology.
> Well, that's exactly the point, they do radically different things.
> Having the same name for two different things is confusing.
>
> Anyway, let me go read that patch you sent.

My understanding of handoff is that it disables optimistic spinning so that waiters in the wait queue get an opportunity to acquire the lock. The details of how that is done differ between mutex and rwsem, though.
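
To make that concrete, here is a rough userspace sketch of the idea, not the kernel code. It assumes a single lock word with a LOCKED bit and a HANDOFF bit; the names toy_lock, toy_try_steal() and toy_try_lock_first_waiter() are made up for illustration. A spinner that sees HANDOFF backs off, so only the queue-head waiter can take the lock once it is free.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical bit layout, for illustration only. */
#define TOY_LOCKED   0x1UL
#define TOY_HANDOFF  0x2UL

static_assert((TOY_LOCKED & TOY_HANDOFF) == 0, "flag bits must not overlap");

struct toy_lock {
	atomic_ulong word;
};

/* Optimistic spinner: may take a free lock only if no handoff is pending. */
static bool toy_try_steal(struct toy_lock *l)
{
	unsigned long old = atomic_load(&l->word);

	do {
		if (old & (TOY_LOCKED | TOY_HANDOFF))
			return false;
	} while (!atomic_compare_exchange_weak(&l->word, &old,
					       old | TOY_LOCKED));

	return true;
}

/* Queue-head waiter: takes a free lock and consumes the handoff bit. */
static bool toy_try_lock_first_waiter(struct toy_lock *l)
{
	unsigned long old = atomic_load(&l->word);

	do {
		if (old & TOY_LOCKED)
			return false;
	} while (!atomic_compare_exchange_weak(&l->word, &old,
					       (old & ~TOY_HANDOFF) | TOY_LOCKED));

	return true;
}
```

With the word set to TOY_HANDOFF only, toy_try_steal() fails while toy_try_lock_first_waiter() succeeds and leaves just TOY_LOCKED behind.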

BTW, I have decided that we should further simplify the trylock for loop in rwsem_down_write_slowpath() to make it easier to read. That is the only difference between the attached v2 patch and the previous one.
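
As a rough illustration of what the trylock path decides once the state lives in the waiter, the sketch below restates the branch structure of rwsem_try_write_lock() from the v2 patch as a pure function. It is not the kernel code: toy_try_write(), the TOY_* outcomes and the boolean inputs are hypothetical stand-ins for the count bits, waiter->rt_task and the time_after(jiffies, waiter->timeout) check, and the real function makes this decision inside a cmpxchg retry loop.

```c
#include <assert.h>
#include <stdbool.h>

enum toy_outcome {
	TOY_FAIL,         /* back off, nothing changes */
	TOY_SET_HANDOFF,  /* handoff requested, lock still not acquired */
	TOY_ACQUIRED,     /* lock taken, handoff bit cleared */
};

/* TOY_FAIL is the zero value so callers can treat it like "false". */
static_assert(TOY_FAIL == 0, "TOY_FAIL must be the zero outcome");

static enum toy_outcome toy_try_write(bool first, bool has_handoff,
				      bool locked, bool rt_task,
				      bool timed_out)
{
	/* A pending handoff reserves the lock for the queue head. */
	if (has_handoff && !first)
		return TOY_FAIL;

	if (locked) {
		/* Handoff already requested, or this waiter can keep waiting. */
		if (has_handoff || (!rt_task && !timed_out))
			return TOY_FAIL;
		return TOY_SET_HANDOFF;
	}

	/* Lock is free: take it and drop any handoff request. */
	return TOY_ACQUIRED;
}
```

The caller in the slowpath then only needs to check whether the waiter has set the handoff bit, instead of tracking a separate writer state.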

Cheers,
Longman
From 956fb4bf54e61735c0df1fedf97e59a1972efbca Mon Sep 17 00:00:00 2001
From: Waiman Long <longman@xxxxxxxxxx>
Date: Thu, 11 Nov 2021 14:27:35 -0500
Subject: [PATCH v2] locking/rwsem: Make handoff bit handling more consistent

There are some inconsistencies in the way that the handoff bit is
handled by readers and writers.

Firstly, when a queue head writer sets the handoff bit, it will clear it
if the writer is killed or interrupted on its way out without
acquiring the lock. That is not the case for a queue head reader. The
handoff bit will simply be inherited by the next waiter.

Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
the waiter and handoff bits are cleared if the wait queue becomes empty.
For rwsem_down_write_slowpath(), however, the handoff bit is not checked
and cleared if the wait queue is empty. This can potentially leave the
handoff bit set with an empty wait queue.

To make the handoff bit handling more consistent and robust, extract
the rwsem flags handling code into a common
rwsem_out_nolock_clear_flags() function and call it from both the
reader's and the writer's out_nolock paths. The common function uses
only atomic_long_andnot() to clear bits, to avoid a possible race
condition.

This will eliminate the case of the handoff bit being set with an empty
wait queue, as well as the possible race condition that may corrupt the
count value.
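
As a userspace illustration only (not part of this patch), the difference
between subtracting a flag and and-not clearing it can be sketched with C11
atomics. The bit values and the toy_* names are made up; atomic_fetch_and()
with an inverted mask stands in for the kernel's atomic_long_andnot().

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical flag values, for illustration only. */
#define TOY_FLAG_WAITERS  0x2L
#define TOY_FLAG_HANDOFF  0x4L

static_assert((TOY_FLAG_WAITERS & TOY_FLAG_HANDOFF) == 0,
	      "flag bits must not overlap");

/* Clear by subtraction: only correct if the bit is known to be set. */
static long toy_clear_by_sub(atomic_long *count, long flag)
{
	return atomic_fetch_sub(count, flag) - flag;
}

/* Clear by and-not: a no-op if the bit is already clear. */
static long toy_clear_by_andnot(atomic_long *count, long flags)
{
	return atomic_fetch_and(count, ~flags) & ~flags;
}
```

Starting from a count with only TOY_FLAG_WAITERS set, the and-not variant
leaves the count unchanged when asked to clear TOY_FLAG_HANDOFF, whereas the
subtraction borrows from the other bits and yields a different value.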

More state is stored in the rwsem_waiter structure, and all setting of
the writer handoff bit is pushed into rwsem_try_write_lock(). This
simplifies the trylock for loop in rwsem_down_write_slowpath().

Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
Suggested-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/rwsem.c | 121 ++++++++++++++++-------------------------
1 file changed, 48 insertions(+), 73 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index c51387a43265..a876a3acc383 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,10 +104,11 @@
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*
- * There are three places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers.
- * 2) rwsem_try_write_lock() for writers.
- * 3) Error path of rwsem_down_write_slowpath().
+ * There are four places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers -- set, clear
+ * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 3) Error path of rwsem_down_write_slowpath() -- clear
+ * 4) Error path of rwsem_down_read_slowpath() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff
@@ -334,6 +335,7 @@ struct rwsem_waiter {
struct task_struct *task;
enum rwsem_waiter_type type;
unsigned long timeout;
+ bool handoff_set, rt_task;
};
#define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
@@ -344,12 +346,6 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};

-enum writer_wait_state {
- WRITER_NOT_FIRST, /* Writer is not first in wait list */
- WRITER_FIRST, /* Writer is first in wait list */
- WRITER_HANDOFF /* Writer is first & handoff needed */
-};
-
/*
* The typical HZ value is either 250 or 1000. So set the minimum waiting
* time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
@@ -434,6 +430,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
time_after(jiffies, waiter->timeout)) {
adjustment -= RWSEM_FLAG_HANDOFF;
+ waiter->handoff_set = true;
lockevent_inc(rwsem_rlock_handoff);
}

@@ -531,14 +528,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
- *
- * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
- * bit is set or the lock is acquired with handoff bit cleared.
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- enum writer_wait_state wstate)
+ struct rwsem_waiter *waiter)
{
long count, new;
+ bool first = rwsem_first_waiter(sem) == waiter;

lockdep_assert_held(&sem->wait_lock);

@@ -546,13 +541,14 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

- if (has_handoff && wstate == WRITER_NOT_FIRST)
+ if (has_handoff && !first)
return false;

new = count;

if (count & RWSEM_LOCK_MASK) {
- if (has_handoff || (wstate != WRITER_HANDOFF))
+ if (has_handoff || (!waiter->rt_task &&
+ !time_after(jiffies, waiter->timeout)))
return false;

new |= RWSEM_FLAG_HANDOFF;
@@ -569,8 +565,11 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* We have either acquired the lock with handoff bit cleared or
* set the handoff bit.
*/
- if (new & RWSEM_FLAG_HANDOFF)
+ if (new & RWSEM_FLAG_HANDOFF) {
+ waiter->handoff_set = true;
+ lockevent_inc(rwsem_wlock_handoff);
return false;
+ }

rwsem_set_owner(sem);
return true;
@@ -889,6 +888,24 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
}
#endif

+/*
+ * Common code to handle rwsem flags in out_nolock path with wait_lock held.
+ */
+static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
+ struct rwsem_waiter *waiter)
+{
+ long flags = 0;
+
+ list_del(&waiter->list);
+ if (list_empty(&sem->wait_list))
+ flags = RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS;
+ else if (waiter->handoff_set)
+ flags = RWSEM_FLAG_HANDOFF;
+
+ if (flags)
+ atomic_long_andnot(flags, &sem->count);
+}
+
/*
* Wait for the read lock to be granted
*/
@@ -936,6 +953,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
+ waiter.handoff_set = false;

raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list)) {
@@ -1002,11 +1020,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
return sem;

out_nolock:
- list_del(&waiter.list);
- if (list_empty(&sem->wait_list)) {
- atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
- &sem->count);
- }
+ rwsem_out_nolock_clear_flags(sem, &waiter);
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
@@ -1020,7 +1034,6 @@ static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
long count;
- enum writer_wait_state wstate;
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
@@ -1038,16 +1051,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
+ waiter.rt_task = rt_task(current);

raw_spin_lock_irq(&sem->wait_lock);
-
- /* account for this before adding a new element to the list */
- wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
-
list_add_tail(&waiter.list, &sem->wait_list);

/* we're now waiting on the lock */
- if (wstate == WRITER_NOT_FIRST) {
+ if (rwsem_first_waiter(sem) != &waiter) {
count = atomic_long_read(&sem->count);

/*
@@ -1083,13 +1093,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
/* wait until we successfully acquire the lock */
set_current_state(state);
for (;;) {
- if (rwsem_try_write_lock(sem, wstate)) {
+ if (rwsem_try_write_lock(sem, &waiter)) {
/* rwsem_try_write_lock() implies ACQUIRE on success */
break;
}

raw_spin_unlock_irq(&sem->wait_lock);

+ if (signal_pending_state(state, current))
+ goto out_nolock;
+
/*
* After setting the handoff bit and failing to acquire
* the lock, attempt to spin on owner to accelerate lock
@@ -1098,7 +1111,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
* In this case, we attempt to acquire the lock again
* without sleeping.
*/
- if (wstate == WRITER_HANDOFF) {
+ if (waiter.handoff_set) {
enum owner_state owner_state;

preempt_disable();
@@ -1109,40 +1122,9 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
goto trylock_again;
}

- /* Block until there are no active lockers. */
- for (;;) {
- if (signal_pending_state(state, current))
- goto out_nolock;
-
- schedule();
- lockevent_inc(rwsem_sleep_writer);
- set_current_state(state);
- /*
- * If HANDOFF bit is set, unconditionally do
- * a trylock.
- */
- if (wstate == WRITER_HANDOFF)
- break;
-
- if ((wstate == WRITER_NOT_FIRST) &&
- (rwsem_first_waiter(sem) == &waiter))
- wstate = WRITER_FIRST;
-
- count = atomic_long_read(&sem->count);
- if (!(count & RWSEM_LOCK_MASK))
- break;
-
- /*
- * The setting of the handoff bit is deferred
- * until rwsem_try_write_lock() is called.
- */
- if ((wstate == WRITER_FIRST) && (rt_task(current) ||
- time_after(jiffies, waiter.timeout))) {
- wstate = WRITER_HANDOFF;
- lockevent_inc(rwsem_wlock_handoff);
- break;
- }
- }
+ schedule();
+ lockevent_inc(rwsem_sleep_writer);
+ set_current_state(state);
trylock_again:
raw_spin_lock_irq(&sem->wait_lock);
}
@@ -1156,19 +1138,12 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
out_nolock:
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
- list_del(&waiter.list);
-
- if (unlikely(wstate == WRITER_HANDOFF))
- atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
-
- if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
- else
+ rwsem_out_nolock_clear_flags(sem, &waiter);
+ if (!list_empty(&sem->wait_list))
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
lockevent_inc(rwsem_wlock_fail);
-
return ERR_PTR(-EINTR);
}

--
2.27.0