Re: [BUG]locking/rwsem: only clean RWSEM_FLAG_HANDOFF when already set

From: Waiman Long
Date: Thu Nov 11 2021 - 16:01:24 EST



On 11/11/21 15:26, Peter Zijlstra wrote:
> On Thu, Nov 11, 2021 at 02:36:52PM -0500, Waiman Long wrote:
>
> > @@ -434,6 +430,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
> >                  if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
> >                      time_after(jiffies, waiter->timeout)) {
> >                          adjustment -= RWSEM_FLAG_HANDOFF;
> > +                        waiter->handoff_set = true;
> >                          lockevent_inc(rwsem_rlock_handoff);
> >                  }
>
> Do we really need this flag? Wouldn't it be the same as waiter-is-first
> AND sem-has-handoff ?
That is true. The only downside is that we have to read the count first in rwsem_out_nolock_clear_flags(). Since this is not a fast path, it should be OK to do that.
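For illustration, the derived check could look something like this (just a sketch; waiter_has_handoff() is a hypothetical helper name, and the atomic_long_read() is the extra count read mentioned above). The caller must hold sem->wait_lock so the first-waiter test is stable:

static inline bool waiter_has_handoff(struct rw_semaphore *sem,
                                      struct rwsem_waiter *waiter)
{
        /* Extra read of ->count; acceptable off the fast path. */
        long count = atomic_long_read(&sem->count);

        return rwsem_first_waiter(sem) == waiter &&
               (count & RWSEM_FLAG_HANDOFF);
}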
> >  static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> > +                                        struct rwsem_waiter *waiter)
> >  {
> >          long count, new;
> > +        bool first = rwsem_first_waiter(sem) == waiter;
>
> flip those lines for reverse xmas please
Sure, will do.
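That is, with the longer declaration line first:

        bool first = rwsem_first_waiter(sem) == waiter;
        long count, new;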

> >          lockdep_assert_held(&sem->wait_lock);
> > @@ -546,13 +541,14 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> >          do {
> >                  bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> >
> > -                if (has_handoff && wstate == WRITER_NOT_FIRST)
> > +                if (has_handoff && !first)
> >                          return false;
> >
> >                  new = count;
> >
> >                  if (count & RWSEM_LOCK_MASK) {
> > -                        if (has_handoff || (wstate != WRITER_HANDOFF))
> > +                        if (has_handoff || (!waiter->rt_task &&
> > +                                            !time_after(jiffies, waiter->timeout)))
>
> Does ->rt_task really help over rt_task(current) ? I suppose there's an
> argument for locality, but that should be pretty much it, no?
Waiting for the timeout may introduce too much latency for an RT task. That is the only reason I am doing it. I can take it out if you think it is not necessary.
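Dropping the cached field would just mean querying the scheduling class of current at the test site, e.g. (a sketch of that alternative, not the posted patch):

                if (count & RWSEM_LOCK_MASK) {
                        /*
                         * No cached waiter->rt_task; ask the scheduler
                         * about current directly each time around.
                         */
                        if (has_handoff || (!rt_task(current) &&
                                            !time_after(jiffies, waiter->timeout)))
                                return false;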

> >                                  return false;
> >
> >                          new |= RWSEM_FLAG_HANDOFF;
> > @@ -889,6 +888,24 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
> >  }
> >  #endif
> >
> > +/*
> > + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> > + */
> > +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> > +                                                struct rwsem_waiter *waiter)
> > +{
> > +        long flags = 0;
> > +
> > +        list_del(&waiter->list);
> > +        if (list_empty(&sem->wait_list))
> > +                flags = RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS;
> > +        else if (waiter->handoff_set)
> > +                flags = RWSEM_FLAG_HANDOFF;
> > +
> > +        if (flags)
> > +                atomic_long_andnot(flags, &sem->count);
> > +}
>
> Right, so I like sharing this between the two _slowpath functions, that
> makes sense.
>
> The difference between this and my approach is that I unconditionally
> clear HANDOFF when @waiter was the first. Because if it was set, it
> must've been ours, and if it wasn't set, clearing it doesn't really hurt
> much. This is an unlikely path, I don't think the potentially extra
> atomic is an issue here.
That is true; we shouldn't worry too much about performance in this unlikely path. Will make the change.
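The shared helper would then become something like this (a sketch of Peter's variant; note the first-waiter test has to happen before the list_del()):

static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
                                                struct rwsem_waiter *waiter)
{
        bool first = rwsem_first_waiter(sem) == waiter;
        long flags = 0;

        /*
         * If we were the first waiter, any handoff bit must have been
         * ours; clearing a bit that isn't set is harmless here.
         */
        if (first)
                flags = RWSEM_FLAG_HANDOFF;

        list_del(&waiter->list);
        if (list_empty(&sem->wait_list))
                flags |= RWSEM_FLAG_WAITERS | RWSEM_FLAG_HANDOFF;

        if (flags)
                atomic_long_andnot(flags, &sem->count);
}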

> > +
> >  /*
> >   * Wait for the read lock to be granted
> >   */
> > @@ -936,6 +953,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
> >          waiter.task = current;
> >          waiter.type = RWSEM_WAITING_FOR_READ;
> >          waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
> > +        waiter.handoff_set = false;
>
> Forgot to set rt_task

We don't use rt_task for readers; it is writer-only. I will document that.
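A one-line field comment would do, e.g. (a sketch; the surrounding fields are from the existing struct, the comment wording is illustrative):

struct rwsem_waiter {
        struct list_head list;
        struct task_struct *task;
        enum rwsem_waiter_type type;
        unsigned long timeout;
        bool rt_task;        /* writer-only: readers never set or test this */
};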


> >          raw_spin_lock_irq(&sem->wait_lock);
> >          if (list_empty(&sem->wait_list)) {
> > @@ -1038,16 +1051,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> >          waiter.task = current;
> >          waiter.type = RWSEM_WAITING_FOR_WRITE;
> >          waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
>
> Forgot to set handoff_set
Yes, I was aware of that.

> > +        waiter.rt_task = rt_task(current);
> >
> >          raw_spin_lock_irq(&sem->wait_lock);
>
> Again, I'm not convinced we need these variables.
I will take out handoff_set as suggested. I can also take out rt_task if you don't think we need to test it.

> > @@ -1083,13 +1093,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> >          /* wait until we successfully acquire the lock */
> >          set_current_state(state);
> >          for (;;) {
> > -                if (rwsem_try_write_lock(sem, wstate)) {
> > +                if (rwsem_try_write_lock(sem, &waiter)) {
> >                          /* rwsem_try_write_lock() implies ACQUIRE on success */
> >                          break;
> >                  }
> >
> >                  raw_spin_unlock_irq(&sem->wait_lock);
> >
> > +                if (signal_pending_state(state, current))
> > +                        goto out_nolock;
> > +
> >                  /*
> >                   * After setting the handoff bit and failing to acquire
> >                   * the lock, attempt to spin on owner to accelerate lock
> > @@ -1098,7 +1111,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
> >                   * In this case, we attempt to acquire the lock again
> >                   * without sleeping.
> >                   */
> > -                if (wstate == WRITER_HANDOFF) {
> > +                if (waiter.handoff_set) {
> >                          enum owner_state owner_state;
> >
> >                          preempt_disable();
>
> Does it matter much if we spin-wait for every first or only for handoff?
Only for handoff, as no other task will be spinning for the lock.

> Either way around, I think spin-wait ought to terminate on sigpending
> (same for mutex I suppose).

I am thinking about that too. Time for another followup patch, I think.
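For the record, the termination check might look roughly like this in rwsem_spin_on_owner()'s spin loop (a sketch only; signal_pending(current) is used because the waiter's sleep state isn't available there, and a real patch would probably want to plumb through signal_pending_state() instead):

                /*
                 * The loop already bails on need_resched() or the owner
                 * leaving the CPU; a pending signal could be added as one
                 * more reason to stop spinning, so a killed waiter reaches
                 * the out_nolock path promptly.
                 */
                if (need_resched() || signal_pending(current) ||
                    !owner_on_cpu(owner)) {
                        state = OWNER_NONSPINNABLE;
                        break;
                }
                cpu_relax();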

Cheers,
Longman