Re: [RT PATCH v3] seqlock: serialize against writers

From: Gregory Haskins
Date: Tue Sep 02 2008 - 09:04:36 EST


Gregory Haskins wrote:
> Hi Guys,
>
> I realized the prologue was not sufficiently descriptive based on the
> feedback I received. Therefore, here is a V3 with a new description.
> The patch content itself is identical to v2.
>
/me grumbles.

Just realized I was in the wrong git branch when I updated this, so this
will not apply to 2.6.26.3-rt3. I will correct and send a v4. Sorry for
the noise.

-Greg


> -Greg
>
> ---------------------------------
>
> seqlock: serialize against writers
>
> There are currently several problems in -rt w.r.t. seqlock objects. RT
> moves mainline seqlock_t to "raw_seqlock_t", and creates a new seqlock_t
> object that is fully preemptible. Being preemptible is a great step
> towards deterministic behavior, but there are a few areas that need
> additional measures to protect new vulnerabilities created by the
> preemptible code. For the purposes of demonstration, consider three tasks
> of different priority: A, B, and C. A is the logically highest, and C is
> the lowest. A is trying to acquire a seqlock read critical section, while
> C is involved in write locks.
>
> Problem 1) If A spins in seqbegin due to writer contention retries, it may
> prevent C from running even if C currently holds the write lock. This
> is a deadlock.
>
> Problem 2) B may preempt C, preventing it from releasing the write
> critical section. In this case, A becomes inverted behind B.
>
> Problem 3) Lower priority tasks such as C may continually acquire the write
> section which subsequently causes A to continually retry and thus fail to
> make forward progress. Since C is lower priority it ideally should not
> cause delays in A. E.g. C should block if A is in a read-lock and C's
> priority is <= A's.
>
> This patch addresses Problems 1 & 2, and leaves 3 for a later time.
>
> This patch changes the internal seqlock_t implementation to substitute
> a rwlock for the basic spinlock previously used, and forces the readers
> to serialize with the writers under contention. Blocking on the read_lock
> simultaneously sleeps A (preventing problem 1), while boosting C to A's
> priority (preventing problem 2). Acquisitions without reader-to-writer
> contention, which are the predominant mode, remain free of atomic
> operations. Therefore the fast path should not be perturbed by this
> change.
>
> This fixes a real-world deadlock discovered under testing where all
> high priority readers were hogging the cpus and preventing a writer
> from releasing the lock (i.e. problem 1).
>
> Signed-off-by: Gregory Haskins <ghaskins@xxxxxxxxxx>
> ---
>
> include/linux/seqlock.h | 67 +++++++++++++++++++++++------------------------
> 1 files changed, 33 insertions(+), 34 deletions(-)
>
> diff --git a/include/linux/seqlock.h b/include/linux/seqlock.h
> index b172277..003d6e4 100644
> --- a/include/linux/seqlock.h
> +++ b/include/linux/seqlock.h
> @@ -3,9 +3,11 @@
> /*
> * Reader/writer consistent mechanism without starving writers. This type of
> * lock for data where the reader wants a consistent set of information
> - * and is willing to retry if the information changes. Readers never
> - * block but they may have to retry if a writer is in
> - * progress. Writers do not wait for readers.
> + * and is willing to retry if the information changes. Readers block
> + * on write contention (and where applicable, pi-boost the writer).
> + * Readers without contention on entry acquire the critical section
> + * without any atomic operations, but they may have to retry if a writer
> + * enters before the critical section ends. Writers do not wait for readers.
> *
> * This is not as cache friendly as brlock. Also, this will not work
> * for data that contains pointers, because any writer could
> @@ -24,6 +26,8 @@
> *
> * Based on x86_64 vsyscall gettimeofday
> * by Keith Owens and Andrea Arcangeli
> + *
> + * Priority inheritance and live-lock avoidance by Gregory Haskins
> */
>
> #include <linux/spinlock.h>
> @@ -31,7 +35,7 @@
>
> typedef struct {
> unsigned sequence;
> - spinlock_t lock;
> + rwlock_t lock;
> } __seqlock_t;
>
> typedef struct {
> @@ -57,7 +61,7 @@ typedef __raw_seqlock_t raw_seqlock_t;
> { 0, RAW_SPIN_LOCK_UNLOCKED(lockname) }
>
> #ifdef CONFIG_PREEMPT_RT
> -# define __SEQLOCK_UNLOCKED(lockname) { 0, __SPIN_LOCK_UNLOCKED(lockname) }
> +# define __SEQLOCK_UNLOCKED(lockname) { 0, __RW_LOCK_UNLOCKED(lockname) }
> #else
> # define __SEQLOCK_UNLOCKED(lockname) __RAW_SEQLOCK_UNLOCKED(lockname)
> #endif
> @@ -69,7 +73,7 @@ typedef __raw_seqlock_t raw_seqlock_t;
> do { *(x) = (raw_seqlock_t) __RAW_SEQLOCK_UNLOCKED(x); spin_lock_init(&(x)->lock); } while (0)
>
> #define seqlock_init(x) \
> - do { *(x) = (seqlock_t) __SEQLOCK_UNLOCKED(x); spin_lock_init(&(x)->lock); } while (0)
> + do { *(x) = (seqlock_t) __SEQLOCK_UNLOCKED(x); rwlock_init(&(x)->lock); } while (0)
>
> #define DEFINE_SEQLOCK(x) \
> seqlock_t x = __SEQLOCK_UNLOCKED(x)
> @@ -85,7 +89,7 @@ typedef __raw_seqlock_t raw_seqlock_t;
> */
> static inline void __write_seqlock(seqlock_t *sl)
> {
> - spin_lock(&sl->lock);
> + write_lock(&sl->lock);
> ++sl->sequence;
> smp_wmb();
> }
> @@ -94,12 +98,12 @@ static inline void __write_sequnlock(seqlock_t *sl)
> {
> smp_wmb();
> sl->sequence++;
> - spin_unlock(&sl->lock);
> + write_unlock(&sl->lock);
> }
>
> static inline int __write_tryseqlock(seqlock_t *sl)
> {
> - int ret = spin_trylock(&sl->lock);
> + int ret = write_trylock(&sl->lock);
>
> if (ret) {
> ++sl->sequence;
> @@ -109,18 +113,25 @@ static inline int __write_tryseqlock(seqlock_t *sl)
> }
>
> /* Start of read calculation -- fetch last complete writer token */
> -static __always_inline unsigned __read_seqbegin(const seqlock_t *sl)
> +static __always_inline unsigned __read_seqbegin(seqlock_t *sl)
> {
> unsigned ret;
>
> -repeat:
> ret = sl->sequence;
> smp_rmb();
> if (unlikely(ret & 1)) {
> - cpu_relax();
> - goto repeat;
> + /*
> + * Serialize with the writer which will ensure they are
> + * pi-boosted if necessary and prevent us from starving
> + * them.
> + */
> + read_lock(&sl->lock);
> + ret = sl->sequence;
> + read_unlock(&sl->lock);
> }
>
> + BUG_ON(ret & 1);
> +
> return ret;
> }
>
> @@ -132,20 +143,8 @@ repeat:
> */
> static inline int __read_seqretry(seqlock_t *sl, unsigned start)
> {
> - int ret;
> -
> smp_rmb();
> - ret = (sl->sequence != start);
> - /*
> - * If invalid then serialize with the writer, to make sure we
> - * are not livelocking it:
> - */
> - if (unlikely(ret)) {
> - unsigned long flags;
> - spin_lock_irqsave(&sl->lock, flags);
> - spin_unlock_irqrestore(&sl->lock, flags);
> - }
> - return ret;
> + return (sl->sequence != start);
> }
>
> static __always_inline void __write_seqlock_raw(raw_seqlock_t *sl)
> @@ -173,7 +172,7 @@ static __always_inline int __write_tryseqlock_raw(raw_seqlock_t *sl)
> return ret;
> }
>
> -static __always_inline unsigned __read_seqbegin_raw(const raw_seqlock_t *sl)
> +static __always_inline unsigned __read_seqbegin_raw(raw_seqlock_t *sl)
> {
> unsigned ret;
>
> @@ -188,7 +187,7 @@ repeat:
> return ret;
> }
>
> -static __always_inline int __read_seqretry_raw(const raw_seqlock_t *sl, unsigned start)
> +static __always_inline int __read_seqretry_raw(raw_seqlock_t *sl, unsigned start)
> {
> smp_rmb();
> return (sl->sequence != start);
> @@ -218,12 +217,12 @@ do { \
> __ret; \
> })
>
> -#define PICK_SEQOP_CONST_RET(op, lock) \
> +#define PICK_SEQOP_RET(op, lock) \
> ({ \
> unsigned long __ret; \
> \
> if (TYPE_EQUAL((lock), raw_seqlock_t)) \
> - __ret = op##_raw((const raw_seqlock_t *)(lock));\
> + __ret = op##_raw((raw_seqlock_t *)(lock));\
> else if (TYPE_EQUAL((lock), seqlock_t)) \
> __ret = op((seqlock_t *)(lock)); \
> else __ret = __bad_seqlock_type(); \
> @@ -231,12 +230,12 @@ do { \
> __ret; \
> })
>
> -#define PICK_SEQOP2_CONST_RET(op, lock, arg) \
> +#define PICK_SEQOP2_RET(op, lock, arg) \
> ({ \
> unsigned long __ret; \
> \
> if (TYPE_EQUAL((lock), raw_seqlock_t)) \
> - __ret = op##_raw((const raw_seqlock_t *)(lock), (arg)); \
> + __ret = op##_raw((raw_seqlock_t *)(lock), (arg)); \
> else if (TYPE_EQUAL((lock), seqlock_t)) \
> __ret = op((seqlock_t *)(lock), (arg)); \
> else __ret = __bad_seqlock_type(); \
> @@ -248,8 +247,8 @@ do { \
> #define write_seqlock(sl) PICK_SEQOP(__write_seqlock, sl)
> #define write_sequnlock(sl) PICK_SEQOP(__write_sequnlock, sl)
> #define write_tryseqlock(sl) PICK_SEQOP_RET(__write_tryseqlock, sl)
> -#define read_seqbegin(sl) PICK_SEQOP_CONST_RET(__read_seqbegin, sl)
> -#define read_seqretry(sl, iv) PICK_SEQOP2_CONST_RET(__read_seqretry, sl, iv)
> +#define read_seqbegin(sl) PICK_SEQOP_RET(__read_seqbegin, sl)
> +#define read_seqretry(sl, iv) PICK_SEQOP2_RET(__read_seqretry, sl, iv)
>
> /*
> * Version using sequence counter only.
>
>


Attachment: signature.asc
Description: OpenPGP digital signature