[PATCH-tip v2 02/12] locking/rwsem: Implement lock handoff to prevent lock starvation

From: Waiman Long
Date: Fri Apr 05 2019 - 15:22:55 EST


Because of writer lock stealing, it is possible that a constant
stream of incoming writers will cause a waiting writer or reader to
wait indefinitely leading to lock starvation.

The mutex code has a lock handoff mechanism to prevent lock starvation.
This patch implements a similar lock handoff mechanism to disable
lock stealing and force lock handoff to the first waiter in the queue
after at least a 5ms waiting period. The waiting period is used to
avoid discouraging lock stealing too much to affect performance.

A rwsem microbenchmark was run for 5 seconds on a 8-socket 120-core
240-thread IvyBridge-EX system with a v5.1 based kernel and 240
write_lock threads with 5us sleep critical section.

Before the patch, the min/mean/max numbers of locking operations for the
locking threads were 1/2,433/320,955. After the patch, the figures became
891/1,807/3,174. It can be seen that the rwsem became much more fair,
though there was a drop of about 26% in the mean locking operations
done which was a tradeoff of having better fairness.

Signed-off-by: Waiman Long <longman@xxxxxxxxxx>
---
kernel/locking/lock_events_list.h | 2 +
kernel/locking/rwsem-xadd.c | 154 ++++++++++++++++++++++--------
kernel/locking/rwsem.h | 23 +++--
3 files changed, 134 insertions(+), 45 deletions(-)

diff --git a/kernel/locking/lock_events_list.h b/kernel/locking/lock_events_list.h
index ad7668cfc9da..29e5c52197fa 100644
--- a/kernel/locking/lock_events_list.h
+++ b/kernel/locking/lock_events_list.h
@@ -61,7 +61,9 @@ LOCK_EVENT(rwsem_opt_fail) /* # of failed opt-spinnings */
LOCK_EVENT(rwsem_rlock) /* # of read locks acquired */
LOCK_EVENT(rwsem_rlock_fast) /* # of fast read locks acquired */
LOCK_EVENT(rwsem_rlock_fail) /* # of failed read lock acquisitions */
+LOCK_EVENT(rwsem_rlock_handoff) /* # of read lock handoffs */
LOCK_EVENT(rwsem_rtrylock) /* # of read trylock calls */
LOCK_EVENT(rwsem_wlock) /* # of write locks acquired */
LOCK_EVENT(rwsem_wlock_fail) /* # of failed write lock acquisitions */
+LOCK_EVENT(rwsem_wlock_handoff) /* # of write lock handoffs */
LOCK_EVENT(rwsem_wtrylock) /* # of write trylock calls */
diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
index adab6477be51..58b3a64e6f2c 100644
--- a/kernel/locking/rwsem-xadd.c
+++ b/kernel/locking/rwsem-xadd.c
@@ -73,6 +73,7 @@ struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
enum rwsem_waiter_type type;
+ unsigned long timeout;
};

enum rwsem_wake_type {
@@ -81,6 +82,12 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};

+/*
+ * The minimum waiting time (5ms) in the wait queue before initiating the
+ * handoff protocol.
+ */
+#define RWSEM_WAIT_TIMEOUT ((HZ - 1)/200 + 1)
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -131,6 +138,15 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
adjustment = RWSEM_READER_BIAS;
oldcount = atomic_long_fetch_add(adjustment, &sem->count);
if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
+ /*
+ * Initiate handoff to reader, if applicable.
+ */
+ if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
+ time_after(jiffies, waiter->timeout)) {
+ adjustment -= RWSEM_FLAG_HANDOFF;
+ lockevent_inc(rwsem_rlock_handoff);
+ }
+
atomic_long_sub(adjustment, &sem->count);
return;
}
@@ -179,6 +195,12 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
adjustment -= RWSEM_FLAG_WAITERS;
}

+ /*
+ * Clear the handoff flag
+ */
+ if (woken && RWSEM_COUNT_HANDOFF(atomic_long_read(&sem->count)))
+ adjustment -= RWSEM_FLAG_HANDOFF;
+
if (adjustment)
atomic_long_add(adjustment, &sem->count);
}
@@ -188,15 +210,19 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
*/
-static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
+static inline bool
+rwsem_try_write_lock(long count, struct rw_semaphore *sem, bool first)
{
long new;

if (RWSEM_COUNT_LOCKED(count))
return false;

- new = count + RWSEM_WRITER_LOCKED -
- (list_is_singular(&sem->wait_list) ? RWSEM_FLAG_WAITERS : 0);
+ if (!first && RWSEM_COUNT_HANDOFF(count))
+ return false;
+
+ new = (count & ~RWSEM_FLAG_HANDOFF) + RWSEM_WRITER_LOCKED -
+ (list_is_singular(&sem->wait_list) ? RWSEM_FLAG_WAITERS : 0);

if (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new)) {
rwsem_set_owner(sem);
@@ -214,7 +240,7 @@ static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
{
long count = atomic_long_read(&sem->count);

- while (!RWSEM_COUNT_LOCKED(count)) {
+ while (!RWSEM_COUNT_LOCKED_OR_HANDOFF(count)) {
if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
count + RWSEM_WRITER_LOCKED)) {
rwsem_set_owner(sem);
@@ -367,6 +393,16 @@ static inline bool rwsem_has_spinner(struct rw_semaphore *sem)
}
#endif

+/*
+ * This is safe to be called without holding the wait_lock.
+ */
+static inline bool
+rwsem_waiter_is_first(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
+{
+ return list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
+ == waiter;
+}
+
/*
* Wait for the read lock to be granted
*/
@@ -379,16 +415,18 @@ __rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)

waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
+ waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list)) {
/*
* In case the wait queue is empty and the lock isn't owned
- * by a writer, this reader can exit the slowpath and return
- * immediately as its RWSEM_READER_BIAS has already been
- * set in the count.
+ * by a writer or has the handoff bit set, this reader can
+ * exit the slowpath and return immediately as its
+ * RWSEM_READER_BIAS has already been set in the count.
*/
- if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
+ if (!(atomic_long_read(&sem->count) &
+ (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
raw_spin_unlock_irq(&sem->wait_lock);
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_rlock_fast);
@@ -436,7 +474,8 @@ __rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)
out_nolock:
list_del(&waiter.list);
if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
+ atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
+ &sem->count);
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
@@ -464,7 +503,7 @@ static inline struct rw_semaphore *
__rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
{
long count;
- bool waiting = true; /* any queued threads before us */
+ int first; /* First one in queue (>1 if handoff set) */
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
@@ -479,56 +518,63 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
*/
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
+ waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;

raw_spin_lock_irq(&sem->wait_lock);

/* account for this before adding a new element to the list */
- if (list_empty(&sem->wait_list))
- waiting = false;
+ first = list_empty(&sem->wait_list);

list_add_tail(&waiter.list, &sem->wait_list);

/* we're now waiting on the lock */
- if (waiting) {
+ if (!first) {
count = atomic_long_read(&sem->count);

/*
- * If there were already threads queued before us and there are
- * no active writers and some readers, the lock must be read
- * owned; so we try to any read locks that were queued ahead
- * of us.
+ * If there were already threads queued before us and:
+ * 1) there are no no active locks, wake the front
+ * queued process(es) as the handoff bit might be set.
+ * 2) there are no active writers and some readers, the lock
+ * must be read owned; so we try to wake any read lock
+ * waiters that were queued ahead of us.
*/
- if (!(count & RWSEM_WRITER_MASK) &&
- (count & RWSEM_READER_MASK)) {
+ if (!RWSEM_COUNT_LOCKED(count))
+ __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
+ else if (!(count & RWSEM_WRITER_MASK) &&
+ (count & RWSEM_READER_MASK))
__rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
- /*
- * The wakeup is normally called _after_ the wait_lock
- * is released, but given that we are proactively waking
- * readers we can deal with the wake_q overhead as it is
- * similar to releasing and taking the wait_lock again
- * for attempting rwsem_try_write_lock().
- */
- wake_up_q(&wake_q);
+ else
+ goto wait;

- /*
- * Reinitialize wake_q after use.
- */
- wake_q_init(&wake_q);
- }
+ /*
+ * The wakeup is normally called _after_ the wait_lock
+ * is released, but given that we are proactively waking
+ * readers we can deal with the wake_q overhead as it is
+ * similar to releasing and taking the wait_lock again
+ * for attempting rwsem_try_write_lock().
+ */
+ wake_up_q(&wake_q);

+ /*
+ * Reinitialize wake_q after use.
+ */
+ wake_q_init(&wake_q);
} else {
count = atomic_long_add_return(RWSEM_FLAG_WAITERS, &sem->count);
}

+wait:
/* wait until we successfully acquire the lock */
set_current_state(state);
while (true) {
- if (rwsem_try_write_lock(count, sem))
+ if (rwsem_try_write_lock(count, sem, first))
break;
+
raw_spin_unlock_irq(&sem->wait_lock);

/* Block until there are no active lockers. */
- do {
+ for (;;) {
if (signal_pending_state(state, current))
goto out_nolock;

@@ -536,7 +582,31 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
lockevent_inc(rwsem_sleep_writer);
set_current_state(state);
count = atomic_long_read(&sem->count);
- } while (RWSEM_COUNT_LOCKED(count));
+
+ if (!first)
+ first = rwsem_waiter_is_first(sem, &waiter);
+
+ if (!RWSEM_COUNT_LOCKED(count))
+ break;
+
+ if (first && !RWSEM_COUNT_HANDOFF(count) &&
+ time_after(jiffies, waiter.timeout)) {
+ atomic_long_or(RWSEM_FLAG_HANDOFF, &sem->count);
+ first++;
+
+ /*
+ * Make sure the handoff bit is seen by
+ * others before proceeding.
+ */
+ smp_mb__after_atomic();
+ lockevent_inc(rwsem_wlock_handoff);
+ /*
+ * Do a trylock after setting the handoff
+ * flag to avoid missed wakeup.
+ */
+ break;
+ }
+ }

raw_spin_lock_irq(&sem->wait_lock);
}
@@ -551,6 +621,12 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
list_del(&waiter.list);
+ /*
+ * If handoff bit is set by this waiter, make sure that the clearing
+ * of the handoff bit is seen by all before proceeding.
+ */
+ if (unlikely(first > 1))
+ atomic_long_add_return(-RWSEM_FLAG_HANDOFF, &sem->count);
if (list_empty(&sem->wait_list))
atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
else
@@ -581,7 +657,7 @@ EXPORT_SYMBOL(rwsem_down_write_failed_killable);
* - up_read/up_write has decremented the active part of count if we come here
*/
__visible
-struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
+struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count)
{
unsigned long flags;
DEFINE_WAKE_Q(wake_q);
@@ -614,7 +690,9 @@ struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
smp_rmb();

/*
- * If a spinner is present, it is not necessary to do the wakeup.
+ * If a spinner is present and the handoff flag isn't set, it is
+ * not necessary to do the wakeup.
+ *
* Try to do wakeup only if the trylock succeeds to minimize
* spinlock contention which may introduce too much delay in the
* unlock operation.
@@ -633,7 +711,7 @@ struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
* rwsem_has_spinner() is true, it will guarantee at least one
* trylock attempt on the rwsem later on.
*/
- if (rwsem_has_spinner(sem)) {
+ if (rwsem_has_spinner(sem) && !RWSEM_COUNT_HANDOFF(count)) {
/*
* The smp_rmb() here is to make sure that the spinner
* state is consulted before reading the wait_lock.
diff --git a/kernel/locking/rwsem.h b/kernel/locking/rwsem.h
index a49ebce1b4ab..2befb5ab1181 100644
--- a/kernel/locking/rwsem.h
+++ b/kernel/locking/rwsem.h
@@ -45,7 +45,8 @@
*
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
- * Bits 2-7 - reserved
+ * Bit 2 - lock handoff bit
+ * Bits 3-7 - reserved
* Bits 8-X - 24-bit (32-bit) or 56-bit reader count
*
* atomic_long_fetch_add() is used to obtain reader lock, whereas
@@ -53,14 +54,20 @@
*/
#define RWSEM_WRITER_LOCKED (1UL << 0)
#define RWSEM_FLAG_WAITERS (1UL << 1)
+#define RWSEM_FLAG_HANDOFF (1UL << 2)
+
#define RWSEM_READER_SHIFT 8
#define RWSEM_READER_BIAS (1UL << RWSEM_READER_SHIFT)
#define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1))
#define RWSEM_WRITER_MASK RWSEM_WRITER_LOCKED
#define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
-#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS)
+#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
+ RWSEM_FLAG_HANDOFF)

#define RWSEM_COUNT_LOCKED(c) ((c) & RWSEM_LOCK_MASK)
+#define RWSEM_COUNT_HANDOFF(c) ((c) & RWSEM_FLAG_HANDOFF)
+#define RWSEM_COUNT_LOCKED_OR_HANDOFF(c) \
+ ((c) & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))

#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
/*
@@ -167,7 +174,7 @@ extern struct rw_semaphore *rwsem_down_read_failed(struct rw_semaphore *sem);
extern struct rw_semaphore *rwsem_down_read_failed_killable(struct rw_semaphore *sem);
extern struct rw_semaphore *rwsem_down_write_failed(struct rw_semaphore *sem);
extern struct rw_semaphore *rwsem_down_write_failed_killable(struct rw_semaphore *sem);
-extern struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem);
+extern struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count);
extern struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem);

/*
@@ -265,7 +272,7 @@ static inline void __up_read(struct rw_semaphore *sem)
tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS))
== RWSEM_FLAG_WAITERS))
- rwsem_wake(sem);
+ rwsem_wake(sem, tmp);
}

/*
@@ -273,11 +280,13 @@ static inline void __up_read(struct rw_semaphore *sem)
*/
static inline void __up_write(struct rw_semaphore *sem)
{
+ long tmp;
+
DEBUG_RWSEMS_WARN_ON(sem->owner != current, sem);
rwsem_clear_owner(sem);
- if (unlikely(atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED,
- &sem->count) & RWSEM_FLAG_WAITERS))
- rwsem_wake(sem);
+ tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);
+ if (unlikely(tmp & RWSEM_FLAG_WAITERS))
+ rwsem_wake(sem, tmp);
}

/*
--
2.18.1