Re: [PATCH v4 14/16] locking/rwsem: Guard against making count negative

From: Waiman Long
Date: Sun Apr 21 2019 - 17:18:44 EST


On 4/19/19 3:39 PM, Waiman Long wrote:
> On 04/19/2019 09:15 AM, Peter Zijlstra wrote:
>> On Fri, Apr 19, 2019 at 03:03:04PM +0200, Peter Zijlstra wrote:
>>> On Fri, Apr 19, 2019 at 02:02:07PM +0200, Peter Zijlstra wrote:
>>>> On Fri, Apr 19, 2019 at 12:26:47PM +0200, Peter Zijlstra wrote:
>>>>> I thought of a horrible horrible alternative:
>>>> Hurm, that's broken as heck. Let me try again.
>>> So I can't make that scheme work, it all ends up wanting to have
>>> cmpxchg().
>>>
>>> Do we have a performance comparison somewhere of xadd vs cmpxchg
>>> readers? I tried looking in the old threads, but I can't seem to locate
>>> it.
>>>
>>> We need new instructions :/ Or more clever than I can muster just now.
>> In particular, an (unsigned) saturation arithmetic variant of XADD would
>> be very nice to have at this point.
> I just want to be clear about my current scheme. There will be 16 bits
> allocated for the reader count. I use the MS bit for signaling that there
> are too many readers. So the fast path will fail and the readers will be
> put into the wait list. This effectively limits readers to 32k-1, but it
> doesn't mean the actual reader count cannot go over that. As long as the
> actual count is less than 64k, everything should still work perfectly.
> IOW, even though we have reached the limit of 32k, we need to pile on an
> additional 32k readers to really overflow the count and cause a problem.

How about the following chunks to disable preemption temporarily for the
increment-check-decrement sequence?

diff --git a/include/linux/preempt.h b/include/linux/preempt.h
index dd92b1a93919..4cc03ac66e13 100644
--- a/include/linux/preempt.h
+++ b/include/linux/preempt.h
@@ -250,6 +250,8 @@ do { \
 #define preempt_enable_notrace()               barrier()
 #define preemptible()                          0
 
+#define __preempt_disable_nop /* preempt_disable() is nop */
+
 #endif /* CONFIG_PREEMPT_COUNT */
 
 #ifdef MODULE
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 043fd29b7534..54029e6af17b 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -256,11 +256,64 @@ static inline struct task_struct *rwsem_get_owner(struct r
        return (struct task_struct *) (cowner
                ? cowner | (sowner & RWSEM_NONSPINNABLE) : sowner);
 }
+
+/*
+ * If __preempt_disable_nop is defined, calling preempt_disable() and
+ * preempt_enable() directly is the most efficient way. Otherwise, it may
+ * be more efficient to disable and enable interrupts instead for disabling
+ * preemption temporarily.
+ */
+#ifdef __preempt_disable_nop
+#define disable_preemption()   preempt_disable()
+#define enable_preemption()    preempt_enable()
+#else
+#define disable_preemption()   local_irq_disable()
+#define enable_preemption()    local_irq_enable()
+#endif
+
+/*
+ * When the owner task structure pointer is merged into count, fewer bits
+ * will be available for readers. Therefore, there is a very slight chance
+ * that the reader count may overflow. We try to prevent that from happening
+ * by checking for the MS bit of the count and failing the trylock attempt
+ * if this bit is set.
+ *
+ * With preemption enabled, there is a remote possibility that preemption
+ * can happen in the narrow timing window between incrementing and
+ * decrementing the reader count and the task is put to sleep for a
+ * considerable amount of time. If a sufficient number of such unfortunate
+ * sequences of events happen, we may still overflow the reader count.
+ * To avoid such a possibility, we have to disable preemption for the
+ * whole increment-check-decrement sequence.
+ *
+ * The function returns true if there are too many readers and the count
+ * has already been properly decremented so the reader must go directly
+ * into the wait list.
+ */
+static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
+{
+       bool wait = false;      /* Wait now flag */
+
+       disable_preemption();
+       *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
+       if (unlikely(*cnt < 0)) {
+               atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
+               wait = true;
+       }
+       enable_preemption();
+       return wait;
+}
 #else /* !CONFIG_RWSEM_OWNER_COUNT */
 static inline struct task_struct *rwsem_get_owner(struct rw_semaphore *sem)
 {
        return READ_ONCE(sem->owner);
 }
+
+static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt)
+{
+       *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, &sem->count);
+       return false;
+}
 #endif /* CONFIG_RWSEM_OWNER_COUNT */
 
 /*
@@ -981,32 +1034,18 @@ static inline void clear_wr_nonspinnable(struct rw_semaph
 * Wait for the read lock to be granted
 */
 static struct rw_semaphore __sched *
-rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, long count)
+rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, const bool wait)
 {
-       long adjustment = -RWSEM_READER_BIAS;
+       long count, adjustment = -RWSEM_READER_BIAS;
        bool wake = false;
        struct rwsem_waiter waiter;
        DEFINE_WAKE_Q(wake_q);
 
-       if (unlikely(count < 0)) {
+       if (unlikely(wait)) {
                /*
-                * The sign bit has been set meaning that too many active
-                * readers are present. We need to decrement reader count &
-                * enter wait queue immediately to avoid overflowing the
-                * reader count.
-                *
-                * As preemption is not disabled, there is a remote
-                * possibility that preemption can happen in the narrow
-                * timing window between incrementing and decrementing
-                * the reader count and the task is put to sleep for a
-                * considerable amount of time. If sufficient number
-                * of such unfortunate sequence of events happen, we
-                * may still overflow the reader count. It is extremely
-                * unlikey, though. If this is a concern, we should consider
-                * disable preemption during this timing window to make
-                * sure that such unfortunate event will not happen.
+                * The reader count has already been decremented and the
+                * reader should go directly into the wait list now.
                 */
-               atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
                adjustment = 0;
                goto queue;
        }
@@ -1358,11 +1397,12 @@ static struct rw_semaphore *rwsem_downgrade_wake(struct
 */
 inline void __down_read(struct rw_semaphore *sem)
 {
-       long tmp = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
-                                                &sem->count);
+       long tmp;
+       bool wait;
 
+       wait = rwsem_read_trylock(sem, &tmp);
        if (unlikely(tmp & RWSEM_READ_FAILED_MASK)) {
-               rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, tmp);
+               rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, wait);
                DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
        } else {
                rwsem_set_reader_owned(sem);
@@ -1371,11 +1411,12 @@ inline void __down_read(struct rw_semaphore *sem)
 
 static inline int __down_read_killable(struct rw_semaphore *sem)
 {
-       long tmp = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS,
-                                                &sem->count);
+       long tmp;
+       bool wait;
 
+       wait = rwsem_read_trylock(sem, &tmp);
        if (unlikely(tmp & RWSEM_READ_FAILED_MASK)) {
-               if (IS_ERR(rwsem_down_read_slowpath(sem, TASK_KILLABLE, tmp)))
+               if (IS_ERR(rwsem_down_read_slowpath(sem, TASK_KILLABLE, wait)))
                        return -EINTR;
                DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem);
        } else {