Re: [PATCH v3] locking/rwsem: Make handoff bit handling more consistent

From: Waiman Long
Date: Thu Nov 11 2021 - 23:09:46 EST



On 11/11/21 17:33, Waiman Long wrote:
There are some inconsistency in the way that the handoff bit is being
handled in readers and writers.

Firstly, when a queue head writer set the handoff bit, it will clear it
when the writer is being killed or interrupted on its way out without
acquiring the lock. That is not the case for a queue head reader. The
handoff bit will simply be inherited by the next waiter.

Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
the waiter and handoff bits are cleared if the wait queue becomes empty.
For rwsem_down_write_slowpath(), however, the handoff bit is not checked
and cleared if the wait queue is empty. This can potentially make the
handoff bit set with empty wait queue.

To make the handoff bit handling more consistent and robust,
extract out the rwsem flags handling code into a common
rwsem_out_nolock_clear_flags() function and call it from both the
reader and writer's out_nolock paths. The common function will only
use atomic_long_andnot() to clear bits to avoid possible race condition.
It will also let the next waiter inherit the handoff bit if it had been
set.

This will elminate the handoff bit set with empty wait queue case as
well as the possible race condition that may screw up the count value.

While at it, simplify the trylock for loop in rwsem_down_write_slowpath()
to make it easier to read.

Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
Suggested-by: Peter Zijlstra <peterz@xxxxxxxxxxxxx>
Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/rwsem.c | 117 ++++++++++++++++-------------------------
1 file changed, 44 insertions(+), 73 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index c51387a43265..d587acd1142b 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,10 +104,11 @@
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
*
- * There are three places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers.
- * 2) rwsem_try_write_lock() for writers.
- * 3) Error path of rwsem_down_write_slowpath().
+ * There are four places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers -- set, clear
+ * 2) rwsem_try_write_lock() for writers -- set, clear
+ * 3) Error path of rwsem_down_write_slowpath() -- clear
+ * 4) Error path of rwsem_down_read_slowpath() -- clear
*
* For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff
@@ -334,6 +335,9 @@ struct rwsem_waiter {
struct task_struct *task;
enum rwsem_waiter_type type;
unsigned long timeout;
+
+ /* Writer only, not initialized in reader */
+ bool handoff_set;
};
#define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
@@ -344,12 +348,6 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};
-enum writer_wait_state {
- WRITER_NOT_FIRST, /* Writer is not first in wait list */
- WRITER_FIRST, /* Writer is first in wait list */
- WRITER_HANDOFF /* Writer is first & handoff needed */
-};
-
/*
* The typical HZ value is either 250 or 1000. So set the minimum waiting
* time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
@@ -531,13 +529,11 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
- *
- * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
- * bit is set or the lock is acquired with handoff bit cleared.
*/
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
- enum writer_wait_state wstate)
+ struct rwsem_waiter *waiter)
{
+ bool first = rwsem_first_waiter(sem) == waiter;
long count, new;
lockdep_assert_held(&sem->wait_lock);
@@ -546,13 +542,15 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
- if (has_handoff && wstate == WRITER_NOT_FIRST)
+ if (has_handoff && !first)
return false;
new = count;
if (count & RWSEM_LOCK_MASK) {
- if (has_handoff || (wstate != WRITER_HANDOFF))
+ waiter->handoff_set = true;

Sorry, the setting of the handoff_set flag isn't quite right here. Will send another fixed version out.

Cheers,
Longman