Re: [PATCH 11/12] rwsem: wake all readers when first waiter is a reader

From: Peter Hurley
Date: Mon Mar 11 2013 - 16:37:32 EST



On Wed, 2013-03-06 at 15:21 -0800, Michel Lespinasse wrote:
> + retry_reader_grants:
> + oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
> + if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
> + /* A writer stole the lock. Undo our reader grants. */
> + if (rwsem_atomic_update(-adjustment, sem) < RWSEM_WAITING_BIAS)
> + goto out;
> + /* The writer left. Retry waking readers. */
> + goto retry_reader_grants;
> + }

This can be reduced to a single looping cmpxchg in the grant reversal
path; if reversing the grant fails, the count can simply be re-tested
for grant success, rather than trying to atomically re-grant.
For example, with a helper function, rwsem_cmpxchg():

static inline int rwsem_cmpxchg(long *old, long new, struct rw_semaphore *sem)
{
	long tmp = *old;
	*old = cmpxchg(&sem->count, *old, new);
	return tmp == *old;
}

... then the above becomes ...

	count = rwsem_atomic_update(adjustment, sem);
	do {
		if (count - adjustment >= RWSEM_WAITING_BIAS)
			break;
		if (rwsem_cmpxchg(&count, count - adjustment, sem))
			goto out; /* or simply return sem */
	} while (1);

< wake up readers >

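As a rough user-space illustration of the loop above (not kernel code), the
same control flow can be modelled with C11 atomics. Everything named here is
an assumption made for the sketch: struct model_sem, model_cmpxchg(),
grant_readers() and the WAITING_BIAS value stand in for the kernel's
struct rw_semaphore, rwsem_cmpxchg(), the wake path and RWSEM_WAITING_BIAS.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Assumed 32-bit-style value; stands in for RWSEM_WAITING_BIAS. */
#define WAITING_BIAS (-0x10000L)

/* Hypothetical stand-in for struct rw_semaphore: only the count. */
struct model_sem {
	atomic_long count;
};

/*
 * Same contract as the rwsem_cmpxchg() helper: returns true on success;
 * on failure *old is refreshed with the value found in the count.
 */
static bool model_cmpxchg(long *old, long new, struct model_sem *sem)
{
	return atomic_compare_exchange_strong(&sem->count, old, new);
}

/* Returns true if the reader grants stand, false if they were reversed. */
static bool grant_readers(struct model_sem *sem, long adjustment)
{
	long count;

	assert(adjustment > 0);

	/* fetch_add returns the old value; add adjustment to get the new one. */
	count = atomic_fetch_add(&sem->count, adjustment) + adjustment;

	for (;;) {
		if (count - adjustment >= WAITING_BIAS)
			return true;	/* no writer holds the lock: grants stand */
		if (model_cmpxchg(&count, count - adjustment, sem))
			return false;	/* writer holds the lock: grants undone */
		/* The count changed under us; re-test the refreshed value. */
	}
}
```

With the count at WAITING_BIAS (waiters queued, nobody active),
grant_readers(&sem, 2) leaves the grants in place. With the count at
2 * WAITING_BIAS + 1 (a writer active plus waiters), the grants are reversed
and the count is restored.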

Also, this series and the original rwsem can mistakenly sleep reader(s)
when the lock is transitioned from writer-owned to waiting readers-owned
with no waiting writers. For example,


CPU 0                               | CPU 1
                                    |
                                    | down_write()

... CPU 1 has the write lock for the semaphore.
Meanwhile, 1 or more down_read(s) are attempted and fail;
these are put on the wait list. Then ...

down_read()                         | up_write()
  local = atomic_update(+read_bias) |
  local <= 0?                       |   local = atomic_update(-write_bias)
  if (true)                         |   local < 0?
    down_read_failed()              |   if (true)
                                    |     wake()
                                    |       grab wait_lock
      wait for wait_lock            |       wake all readers
                                    |       release wait_lock

... At this point, sem->count > 0 and the wait list is empty,
but down_read_failed() will sleep the reader.


In this case, CPU 0 has observed the sem count with the write lock (and
the other waiters) and so is detoured to down_read_failed(). But if
CPU 0 can't grab the wait_lock before the up_write() does (via
rwsem_wake()), then down_read_failed() will wake no one and sleep the
reader.

Unfortunately, this means readers and writers which observe the sem
count after the adjustment is committed by CPU 0 in down_read_failed()
will sleep as well, until the sem count returns to 0.
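
To make the count arithmetic in this scenario concrete, here is a
single-threaded replay of the interleaving. It is only a sketch under stated
assumptions: the bias values follow the 32-bit layout as I understand it,
exactly one reader is already queued when CPU 0 arrives, and the waker removes
the waiting bias when it empties the list. replay_race() and struct trace are
hypothetical names used only for this illustration.

```c
#include <assert.h>

/* Assumed bias layout; stands in for the RWSEM_* constants. */
#define ACTIVE_READ_BIAS	1L
#define WAITING_BIAS		(-0x10000L)
#define ACTIVE_WRITE_BIAS	(WAITING_BIAS + ACTIVE_READ_BIAS)

struct trace {
	long after_wake;	/* count once up_write()'s wake has run */
	long after_queue;	/* count once the tardy reader has queued */
};

static struct trace replay_race(void)
{
	struct trace t;
	long count = ACTIVE_WRITE_BIAS;	/* CPU 1: down_write() */

	/* An earlier reader fails its fast path and queues as first waiter. */
	count += ACTIVE_READ_BIAS;
	count += -ACTIVE_READ_BIAS + WAITING_BIAS;

	/* CPU 0: down_read() fast path observes a negative count and fails. */
	count += ACTIVE_READ_BIAS;
	assert(count < 0);

	/* CPU 1: up_write() drops the write bias and still sees waiters. */
	count -= ACTIVE_WRITE_BIAS;
	assert(count < 0);

	/* CPU 1: wake grants the queued reader and empties the wait list. */
	count += ACTIVE_READ_BIAS - WAITING_BIAS;
	t.after_wake = count;

	/* CPU 0: finally takes wait_lock, finds an empty list, queues itself. */
	count += -ACTIVE_READ_BIAS + WAITING_BIAS;
	t.after_queue = count;

	return t;
}
```

Under these assumptions the count is 2 after the wake (the woken reader plus
CPU 0's own tentative increment), so the read lock was effectively CPU 0's to
take. After CPU 0 queues itself the count is WAITING_BIAS + 1, which is
negative, so later fast-path attempts fail as well.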

I think the best solution would be to preserve the observed count when
down_read() fails and pass it to rwsem_down_read_failed() -- of course,
this is also the most disruptive approach as it changes the per-arch
interface (the attached patch does not include the arch changes). The
other alternative is to go through the __rwsem_do_wake() path.
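
A user-space sketch of the first approach, using C11 atomics rather than the
kernel primitives, might look like the following. The names struct model_sem
and reverse_or_early_out() and the ACTIVE_READ_BIAS value are assumptions for
illustration only; the locking around wait_lock and the task reference
handling are left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Assumed value; stands in for RWSEM_ACTIVE_READ_BIAS. */
#define ACTIVE_READ_BIAS 1L

/* Hypothetical stand-in for struct rw_semaphore: only the count. */
struct model_sem {
	atomic_long count;
};

/*
 * 'observed' is the count the failed fast path saw after adding its
 * read bias.  Returns true if the attempt was reversed and the caller
 * must queue and sleep; returns false if the count turned positive in
 * the meantime, in which case the caller already holds a read lock.
 */
static bool reverse_or_early_out(struct model_sem *sem, long observed)
{
	long count = observed;

	/* The fast path only fails on a non-positive count. */
	assert(observed <= 0);

	for (;;) {
		long reversed = count - ACTIVE_READ_BIAS;

		/* On failure, count is refreshed with the current value. */
		if (atomic_compare_exchange_strong(&sem->count, &count, reversed))
			return true;
		if (count > 0)
			return false;
	}
}
```

If the count has not moved since the fast path failed, the read bias is taken
back and the reader queues as before. If a waker has since made the count
positive, the reader's earlier increment already counts as a held read lock,
so it returns without sleeping.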

Regards,
Peter Hurley

--- >% ---
Subject: [PATCH] rwsem: Early-out tardy readers


Signed-off-by: Peter Hurley <peter@xxxxxxxxxxxxxxxxxx>
---
lib/rwsem.c | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/lib/rwsem.c b/lib/rwsem.c
index f9a5705..8eb2cdf 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -118,12 +118,11 @@ __rwsem_do_wake(struct rw_semaphore *sem, bool wakewrite)
/*
* wait for the read lock to be granted
*/
-struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
+struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem, long count)
{
signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;

/* set up my own style of waitqueue */
waiter.task = tsk;
@@ -131,6 +130,20 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
get_task_struct(tsk);

raw_spin_lock_irq(&sem->wait_lock);
+
+ /* Try to reverse the lock attempt, but if the count has changed
+ * so that reversing fails, check if there are no waiters,
+ * and early-out if so */
+ do {
+ if (rwsem_cmpxchg(&count, count + adjustment, sem))
+ break;
+ if (count > 0) {
+ raw_spin_unlock_irq(&sem->wait_lock);
+ put_task_struct(tsk);
+ return sem;
+ }
+ } while (1);
+
sem->wait_readers++;
if (list_empty(&sem->wait_list))
adjustment += RWSEM_WAITING_BIAS;
--
1.8.1.2



