Re: [PATCH v2 11/13] rwsem: implement support for write lock stealing on the fastpath

From: Peter Hurley
Date: Thu Mar 28 2013 - 12:17:50 EST



On Fri, 2013-03-15 at 03:54 -0700, Michel Lespinasse wrote:
> When we decide to wake up readers, we must first grant them as many
> read locks as necessary, and then actually wake up all these readers.
> But in order to know how many read shares to grant, we must first
> count the readers at the head of the queue. This might take a while
> if there are many readers, and we want to be protected against a
> writer stealing the lock while we're counting. To that end, we grant
> the first reader lock before counting how many more readers are queued.
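
(Just to make sure I'm following the ordering being described -- a rough
sketch, not the actual patch code; count_queued_readers()/wake_queued_readers()
are placeholders for the wait-list walk further down in __rwsem_do_wake:)

	/* Grant one read lock up front so a writer cannot steal the rwsem
	 * while we walk the wait list counting readers. */
	rwsem_atomic_update(RWSEM_ACTIVE_READ_BIAS, sem);
	/* (bail out or retry here if a writer beat us to the grant) */

	/* Count the readers queued at the head of the wait list. */
	woken = count_queued_readers(sem);

	/* Grant the remaining read locks in one adjustment, then wake. */
	rwsem_atomic_add((woken - 1) * RWSEM_ACTIVE_READ_BIAS, sem);
	wake_queued_readers(sem, woken);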
>
> We also require some adjustments to the wake_type semantics.
>
> RWSEM_WAKE_NO_ACTIVE used to mean that we had found the count to
> be RWSEM_WAITING_BIAS, in which case the rwsem was known to be free
> as nobody could steal it while we hold the wait_lock. This doesn't
> make sense once we implement fastpath write lock stealing, so we now
> use RWSEM_WAKE_ANY in that case.
>
> Similarly, when rwsem_down_write_failed found that a read lock was active,
> it would use RWSEM_WAKE_READ_OWNED which signalled that new readers could
> be woken without checking first that the rwsem was available. We can't do
> that anymore since the existing readers might release their read locks,
> and a writer could steal the lock before we wake up additional readers.
> So, we have to use a new RWSEM_WAKE_READERS value to indicate we only want
> to wake readers, but we don't currently hold any read lock.
>
> Signed-off-by: Michel Lespinasse <walken@xxxxxxxxxx>
>
> ---
> lib/rwsem.c | 63 ++++++++++++++++++++++++++++++-------------------------------
> 1 file changed, 31 insertions(+), 32 deletions(-)
>
> diff --git a/lib/rwsem.c b/lib/rwsem.c
> index 9a675fa9d78e..09bf03e7808c 100644
> --- a/lib/rwsem.c
> +++ b/lib/rwsem.c
> @@ -41,13 +41,11 @@ struct rwsem_waiter {
> enum rwsem_waiter_type type;
> };
>
> -/* Wake types for __rwsem_do_wake(). Note that RWSEM_WAKE_NO_ACTIVE and
> - * RWSEM_WAKE_READ_OWNED imply that the spinlock must have been kept held
> - * since the rwsem value was observed.
> - */
> -#define RWSEM_WAKE_ANY 0 /* Wake whatever's at head of wait list */
> -#define RWSEM_WAKE_NO_ACTIVE 1 /* rwsem was observed with no active thread */
> -#define RWSEM_WAKE_READ_OWNED 2 /* rwsem was observed to be read owned */
> +enum rwsem_wake_type {
> + RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */
> + RWSEM_WAKE_READERS, /* Wake readers only */
> + RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
> +};
>
> /*
> * handle the lock release when processes blocked on it that can now run
> @@ -60,16 +58,16 @@ struct rwsem_waiter {
> * - writers are only woken if downgrading is false
> */
> static struct rw_semaphore *
> -__rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> +__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
> {
> struct rwsem_waiter *waiter;
> struct task_struct *tsk;
> struct list_head *next;
> - signed long woken, loop, adjustment;
> + signed long oldcount, woken, loop, adjustment;
>
> waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
> if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> - if (wake_type != RWSEM_WAKE_READ_OWNED)
> + if (wake_type == RWSEM_WAKE_ANY)
> /* Wake writer at the front of the queue, but do not
> * grant it the lock yet as we want other writers
> * to be able to steal it. Readers, on the other hand,
> @@ -79,24 +77,24 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
> goto out;
> }
>
> - /* If we come here from up_xxxx(), another thread might have reached
> - * rwsem_down_failed_common() before we acquired the spinlock and
> - * woken up a waiter, making it now active. We prefer to check for
> - * this first in order to not spend too much time with the spinlock
> - * held if we're not going to be able to wake up readers in the end.
> - *
> - * Note that we do not need to update the rwsem count: any writer
> - * trying to acquire rwsem will run rwsem_down_write_failed() due
> - * to the waiting threads and block trying to acquire the spinlock.
> - *
> - * We use a dummy atomic update in order to acquire the cache line
> - * exclusively since we expect to succeed and run the final rwsem
> - * count adjustment pretty soon.
> + /* Writers might steal the lock before we grant it to the next reader.
> + * We prefer to do the first reader grant before counting readers
> + * so we can bail out early if a writer stole the lock.
> */
> - if (wake_type == RWSEM_WAKE_ANY &&
> - rwsem_atomic_update(0, sem) < RWSEM_WAITING_BIAS)
> - /* Someone grabbed the sem for write already */
> - goto out;
> + adjustment = 0;
> + if (wake_type != RWSEM_WAKE_READ_OWNED) {
> + adjustment = RWSEM_ACTIVE_READ_BIAS;
> + try_reader_grant:
> + oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
> + if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
> + /* A writer stole the lock. Undo our reader grant. */
> + if (rwsem_atomic_update(-adjustment, sem) &
> + RWSEM_ACTIVE_MASK)
> + goto out;
> + /* Last active locker left. Retry waking readers. */
> + goto try_reader_grant;
> + }

I'm not a big fan of goto loops (though I know they were in here
before). An equivalent solution, factoring the retry into a helper:

	if (!__rwsem_try_reader_grant(sem, adjustment))
		goto out;


static inline int __rwsem_try_reader_grant(struct rw_semaphore *sem, long adj)
{
	while (1) {
		long count = rwsem_atomic_update(adj, sem) - adj;
		if (likely(count >= RWSEM_WAITING_BIAS))
			break;

		/* A writer stole the lock. Undo our reader grant. */
		if (rwsem_atomic_update(-adj, sem) & RWSEM_ACTIVE_MASK)
			return 0;
		/* Last active locker left. Retry waking readers. */
	}
	return 1;
}
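
For reference, the call site in __rwsem_do_wake would then look something
like this (untested, just to show where the adjustment setup ends up):

	adjustment = 0;
	if (wake_type != RWSEM_WAKE_READ_OWNED) {
		adjustment = RWSEM_ACTIVE_READ_BIAS;
		if (!__rwsem_try_reader_grant(sem, adjustment))
			goto out;
	}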

Everything else here looks good.

> + }
>
> /* Grant an infinite number of read locks to the readers at the front
> * of the queue. Note we increment the 'active part' of the count by
> @@ -114,12 +112,13 @@ __rwsem_do_wake(struct rw_semaphore *sem, int wake_type)
>
> } while (waiter->type != RWSEM_WAITING_FOR_WRITE);
>
> - adjustment = woken * RWSEM_ACTIVE_READ_BIAS;
> + adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
> if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> /* hit end of list above */
> adjustment -= RWSEM_WAITING_BIAS;
>
> - rwsem_atomic_add(adjustment, sem);
> + if (adjustment)
> + rwsem_atomic_add(adjustment, sem);
>
> next = sem->wait_list.next;
> loop = woken;
> @@ -164,8 +163,8 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
> count = rwsem_atomic_update(adjustment, sem);
>
> /* If there are no active locks, wake the front queued process(es). */
> - if (count == RWSEM_WAITING_BIAS)
> - sem = __rwsem_do_wake(sem, RWSEM_WAKE_NO_ACTIVE);
> + if (!(count & RWSEM_ACTIVE_MASK))
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
>
> raw_spin_unlock_irq(&sem->wait_lock);
>
> @@ -209,7 +208,7 @@ struct rw_semaphore __sched *rwsem_down_write_failed(struct rw_semaphore *sem)
> * any read locks that were queued ahead of us. */
> if (count > RWSEM_WAITING_BIAS &&
> adjustment == -RWSEM_ACTIVE_WRITE_BIAS)
> - sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
> + sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);
>
> /* wait until we successfully acquire the lock */
> set_task_state(tsk, TASK_UNINTERRUPTIBLE);


