[RFC][PATCH 3/3] locking/qspinlock: Optimize for x86

From: Peter Zijlstra
Date: Wed Sep 26 2018 - 07:30:01 EST


On x86 we cannot do fetch_or with a single instruction and end up
using a cmpxchg loop, which reduces determinism. Replace the fetch_or
with a very tricky composite xchg8 + load.
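
For illustration only, here is a user-space sketch of what a
cmpxchg-loop based fetch_or looks like. It uses C11 atomics rather
than the kernel's atomic API, and the helper name
fetch_or_acquire_cmpxchg() is made up for this example. The point is
that the retry loop has no bound under contention, which is the loss
of determinism referred to above.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/*
 * Hypothetical helper (not kernel code): emulate fetch_or with a
 * compare-and-swap retry loop. Returns the value seen before the OR.
 */
static uint32_t fetch_or_acquire_cmpxchg(_Atomic uint32_t *v, uint32_t mask)
{
	uint32_t old;

	assert(v);
	old = atomic_load_explicit(v, memory_order_relaxed);

	/*
	 * On failure 'old' is refreshed with the current value and we
	 * retry; every concurrent update to *v can force another pass.
	 */
	while (!atomic_compare_exchange_weak_explicit(v, &old, old | mask,
						      memory_order_acquire,
						      memory_order_relaxed))
		;

	return old;
}
```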

The basic idea is that we use xchg8 to test-and-set the pending bit
(when it is a byte) and then a load to fetch the whole word. Using
two instructions of course opens a window we previously did not have.
In particular the ordering between pending and tail is of interest,
because that is where the split happens.
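
A rough user-space model of the composite operation, as a sketch
only: it assumes a little-endian layout with an 8-bit pending field,
uses GCC/Clang __atomic builtins in place of the kernel primitives,
and the names union qspinlock_model and set_pending_fetch_model() are
invented for this example. Mixed-size atomic access like this is
outside the C memory model, so this shows the shape of the idea and
nothing more.

```c
#include <assert.h>
#include <stdint.h>

#define Q_PENDING_OFFSET	8
#define Q_PENDING_MASK		(0xffu << Q_PENDING_OFFSET)

/* Model of the lock word: little-endian, pending is a whole byte. */
union qspinlock_model {
	uint32_t val;
	struct {
		uint8_t  locked;
		uint8_t  pending;
		uint16_t tail;
	};
};

/* Hypothetical model: returns the composed "previous" lock value. */
static uint32_t set_pending_fetch_model(union qspinlock_model *lock)
{
	uint32_t val;
	/* Step 1: byte-wide exchange to test-and-set the pending byte. */
	uint8_t old_pending = __atomic_exchange_n(&lock->pending, 1,
						  __ATOMIC_RELAXED);

	assert(old_pending <= 1);
	val = (uint32_t)old_pending << Q_PENDING_OFFSET;

	/* Full fence, standing in for the barrier between xchg and load. */
	__atomic_thread_fence(__ATOMIC_SEQ_CST);

	/* Step 2: load the whole word, keep everything but pending. */
	val |= __atomic_load_n(&lock->val, __ATOMIC_ACQUIRE) & ~Q_PENDING_MASK;

	return val;
}
```

Run single-threaded, the model returns the same value a fetch_or
would; the interesting cases are the concurrent interleavings
between the two steps.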

The claim is that if we order them, it all works out just fine. There
are two specific cases where the pending,tail state changes:

- when the 3rd lock(er) comes in and finds pending set, it'll queue
and set tail; since we set tail while pending is set, the ordering
of the split is not important (and not fundamentally different from
fetch_or). [*]

- when the last queued lock holder acquires the lock (uncontended),
we clear the tail and set the lock byte. By first setting the
pending bit this cmpxchg will fail and the later load must then
see the remaining tail.

Another interesting scenario is where there are only 2 threads:

    lock := (0,0,0)

    CPU 0                       CPU 1

    lock()                      lock()
      trylock(-> 0,0,1)           trylock() /* fail */
        return;                   xchg_relaxed(pending, 1) (-> 0,1,1)
                                  mb()
                                  val = smp_load_acquire(*lock);

Where, without the mb() the load would've been allowed to return 0 for
the locked byte.
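
To make the (tail,pending,locked) tuples used above concrete, here is
a small sketch that packs and unpacks them. It assumes the 8-bit
pending layout (locked in bits 0-7, pending in bits 8-15, tail in
bits 16-31) and treats tail as an opaque 16-bit value; struct
qs_tuple, qs_encode() and qs_decode() are names made up for this
example.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical view of the lock word as a (tail,pending,locked) tuple. */
struct qs_tuple {
	uint16_t tail;
	uint8_t  pending;
	uint8_t  locked;
};

/* Pack a tuple into a 32-bit lock word (8-bit pending layout assumed). */
static uint32_t qs_encode(uint16_t tail, uint8_t pending, uint8_t locked)
{
	assert(pending <= 1 && locked <= 1);
	return ((uint32_t)tail << 16) | ((uint32_t)pending << 8) | locked;
}

/* Unpack a 32-bit lock word into its tuple. */
static struct qs_tuple qs_decode(uint32_t val)
{
	struct qs_tuple t = {
		.tail    = (uint16_t)(val >> 16),
		.pending = (uint8_t)((val >> 8) & 0xff),
		.locked  = (uint8_t)(val & 0xff),
	};

	return t;
}
```

With this, the (0,1,1) state reached by CPU 1 corresponds to the word
0x00000101.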

[*] there is a fun scenario where the 3rd locker observes the pending
bit, but before it sets the tail, the 1st lock (owner) unlocks and the
2nd locker acquires the lock, leaving us with a temporary 0,0,1 state.
But this is no different between fetch_or and this new method.

Suggested-by: Will Deacon <will.deacon@xxxxxxx>
Signed-off-by: Peter Zijlstra (Intel) <peterz@xxxxxxxxxxxxx>
---
arch/x86/include/asm/qspinlock.h | 2 +
kernel/locking/qspinlock.c | 56 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 57 insertions(+), 1 deletion(-)

--- a/arch/x86/include/asm/qspinlock.h
+++ b/arch/x86/include/asm/qspinlock.h
@@ -9,6 +9,8 @@

#define _Q_PENDING_LOOPS (1 << 9)

+#define _Q_NO_FETCH_OR
+
#ifdef CONFIG_PARAVIRT_SPINLOCKS
extern void native_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val);
extern void __pv_init_lock_hash(void);
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -232,6 +232,60 @@ static __always_inline u32 xchg_tail(str
#endif /* _Q_PENDING_BITS == 8 */

/**
+ * set_pending_fetch_acquire - fetch the whole lock value and set pending
+ * @lock : Pointer to queued spinlock structure
+ * Return: The previous lock value
+ *
+ * *,*,* -> *,1,*
+ */
+static __always_inline u32 set_pending_fetch_acquire(struct qspinlock *lock)
+{
+#if defined(_Q_NO_FETCH_OR) && _Q_PENDING_BITS == 8
+ u32 val;
+
+ /*
+ * For the !LL/SC archs that do not have a native atomic_fetch_or
+ * but do have a native xchg (x86) we can do trickery and avoid the
+ * cmpxchg-loop based fetch-or to improve determinism.
+ *
+ * We first xchg the pending byte to set PENDING, and then issue a load
+ * for the rest of the word and mask out the pending byte to compute a
+ * 'full' value.
+ */
+ val = xchg_relaxed(&lock->pending, 1) << _Q_PENDING_OFFSET;
+ /*
+ * Ensures the tail load happens after the xchg().
+ *
+ *       lock   unlock    (a)
+ *   xchg ---------------.
+ *    (b)  lock   unlock  +----- fetch_or
+ *   load ---------------'
+ *       lock   unlock    (c)
+ *
+ * For both lock and unlock, (a) and (c) are the same as fetch_or(),
+ * since either are fully before or after. The only new case is (b),
+ * where a concurrent lock or unlock can interleave with the composite
+ * operation.
+ *
+ * In either case, it is the queueing case that is of interest, otherwise
+ * the whole operation is covered by the xchg() and the tail will be 0.
+ *
+ * For lock-(b); we only care if we set the PENDING bit or not. If we lost
+ * the PENDING race, we queue. Otherwise we'd observe the tail anyway.
+ *
+ * For unlock-(b); since we'll have set PENDING, the uncontended claim
+ * will fail and we'll observe a !0 tail.
+ */
+ smp_mb__after_atomic();
+ val |= atomic_read_acquire(&lock->val) & ~_Q_PENDING_MASK;
+
+ return val;
+#else
+ return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
+#endif
+}
+
+/**
* set_locked - Set the lock bit and own the lock
* @lock: Pointer to queued spinlock structure
*
@@ -328,7 +382,7 @@ void queued_spin_lock_slowpath(struct qs
*
* 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
*/
- val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
+ val = set_pending_fetch_acquire(lock);

/*
* If we observe contention, there was a concurrent lock.