[PATCH v2 2/5] locking/pvqspinlock: Fix missed PV wakeup problem

From: Waiman Long
Date: Tue May 31 2016 - 13:02:49 EST


Currently, calling pv_hash() and setting _Q_SLOW_VAL are done only
once for any pv_node, either in pv_kick_node() or in
pv_wait_head_or_lock(). Because of lock stealing, a pv_kick'ed node is
not guaranteed to get the lock before the spinning threshold expires,
in which case it has to call pv_wait() again. Since _Q_SLOW_VAL is not
set again before that second pv_wait(), the new lock holder won't see
_Q_SLOW_VAL and so won't wake up the sleeping vCPU.

This patch fixes this missed PV wakeup problem by allowing multiple
_Q_SLOW_VAL settings within pv_wait_head_or_lock() and matching each
successful setting of _Q_SLOW_VAL to a pv_hash() call.

Reported-by: Pan Xinhui <xinhui.pan@xxxxxxxxxxxxxxxxxx>
Signed-off-by: Waiman Long <Waiman.Long@xxxxxxx>
---
kernel/locking/qspinlock_paravirt.h | 48 ++++++++++++++++++++++------------
1 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
index 75cc207..3df975d 100644
--- a/kernel/locking/qspinlock_paravirt.h
+++ b/kernel/locking/qspinlock_paravirt.h
@@ -366,12 +366,16 @@ static void pv_kick_node(struct qspinlock *lock, struct mcs_spinlock *node)
/*
* Put the lock into the hash table and set the _Q_SLOW_VAL.
*
- * As this is the same vCPU that will check the _Q_SLOW_VAL value and
- * the hash table later on at unlock time, no atomic instruction is
- * needed.
+ * It is very unlikely that this will race with the _Q_SLOW_VAL setting
+ * in pv_wait_head_or_lock(). However, we use cmpxchg() here to be
+ * sure that we won't do a double pv_hash().
+ *
+ * As it is the lock holder, it won't race with
+ * __pv_queued_spin_unlock().
*/
- WRITE_ONCE(l->locked, _Q_SLOW_VAL);
- (void)pv_hash(lock, pn);
+ if (likely(cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_SLOW_VAL)
+ == _Q_LOCKED_VAL))
+ pv_hash(lock, pn);
}

/*
@@ -386,18 +390,10 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
{
struct pv_node *pn = (struct pv_node *)node;
struct __qspinlock *l = (void *)lock;
- struct qspinlock **lp = NULL;
int waitcnt = 0;
int loop;

/*
- * If pv_kick_node() already advanced our state, we don't need to
- * insert ourselves into the hash table anymore.
- */
- if (READ_ONCE(pn->state) == vcpu_hashed)
- lp = (struct qspinlock **)1;
-
- /*
* Tracking # of slowpath locking operations
*/
qstat_inc(qstat_pv_lock_slowpath, true);
@@ -419,11 +415,19 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
goto gotlock;
cpu_relax();
}
- clear_pending(lock);

+ /*
+ * Make sure the lock value check below is executed after
+ * all the previous loads.
+ */
+ smp_rmb();

- if (!lp) { /* ONCE */
- lp = pv_hash(lock, pn);
+ /*
+ * Set _Q_SLOW_VAL and hash the PV node, if necessary.
+ */
+ if (READ_ONCE(l->locked) != _Q_SLOW_VAL) {
+ struct qspinlock **lp = pv_hash(lock, pn);
+ u8 locked;

/*
* We must hash before setting _Q_SLOW_VAL, such that
@@ -436,7 +440,8 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
*
* Matches the smp_rmb() in __pv_queued_spin_unlock().
*/
- if (xchg(&l->locked, _Q_SLOW_VAL) == 0) {
+ locked = xchg(&l->locked, _Q_SLOW_VAL);
+ if (locked == 0) {
/*
* The lock was free and now we own the lock.
* Change the lock value back to _Q_LOCKED_VAL
@@ -444,9 +449,18 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
*/
WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
WRITE_ONCE(*lp, NULL);
+ clear_pending(lock);
goto gotlock;
+ } else if (unlikely(locked == _Q_SLOW_VAL)) {
+ /*
+ * Racing with pv_kick_node(), need to undo
+ * the pv_hash().
+ */
+ WRITE_ONCE(*lp, NULL);
}
}
+ clear_pending(lock); /* Enable lock stealing */
+
WRITE_ONCE(pn->state, vcpu_halted);
qstat_inc(qstat_pv_wait_head, true);
qstat_inc(qstat_pv_wait_again, waitcnt);
--
1.7.1