Re: [PATCH 2/2 v2] sched/wait: Introduce lock breaker in wake_up_page_bit

From: Linus Torvalds
Date: Fri Aug 25 2017 - 18:51:27 EST


On Fri, Aug 25, 2017 at 3:19 PM, Tim Chen <tim.c.chen@xxxxxxxxxxxxxxx> wrote:
>
> Also I think patch 1 is still a good idea for a fail-safe mechanism
> in case there are other long wait lists.

Yeah, I don't hate patch #1.

But that other patch is just nasty.

> That said, I do think your suggested approach is cleaner. However, it
> is a much more substantial change. Let me take a look and see if I
> have any issues implementing it.

Actually, I tried it myself.

It was painful. But I actually have a TOTALLY UNTESTED set of two
patches that implements the idea.

And by "implements the idea" I mean "it kind of compiles, and it kind
of looks like it might work".

But by "kind of compiles" I mean that I didn't implement the nasty
add_page_wait_queue() thing that the cachefiles interface wants.
Honestly, I think the sanest way to do that is to just have a hashed
wait queue *just* for cachefiles.
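
Roughly along these lines - a sketch only, with made-up names, and
not part of the patches below - where the only cost to everybody
else is one more waitqueue_active() check in the wake-up path:

#include <linux/wait.h>
#include <linux/hash.h>
#include <linux/pagemap.h>

/*
 * Hypothetical: a tiny hashed wait queue that is used *only* by
 * add_page_wait_queue() callers (ie cachefiles).
 */
#define CACHEFILES_WAIT_TABLE_BITS 4
#define CACHEFILES_WAIT_TABLE_SIZE (1 << CACHEFILES_WAIT_TABLE_BITS)

static wait_queue_head_t cachefiles_page_wait_table[CACHEFILES_WAIT_TABLE_SIZE];

static wait_queue_head_t *cachefiles_page_waitqueue(struct page *page)
{
        return &cachefiles_page_wait_table[hash_ptr(page, CACHEFILES_WAIT_TABLE_BITS)];
}

/* Called once from pagecache_init() */
static void __init cachefiles_page_wait_init(void)
{
        int i;

        for (i = 0; i < CACHEFILES_WAIT_TABLE_SIZE; i++)
                init_waitqueue_head(&cachefiles_page_wait_table[i]);
}

void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
{
        wait_queue_head_t *q = cachefiles_page_waitqueue(page);
        unsigned long flags;

        spin_lock_irqsave(&q->lock, flags);
        __add_wait_queue(q, waiter);
        SetPageWaiters(page);
        spin_unlock_irqrestore(&q->lock, flags);
}

The bit-clearing paths (unlock_page() and end_page_writeback()) would
then also have to wake that table, but since nothing except cachefiles
ever puts anything on it, a waitqueue_active() check keeps that
essentially free for everybody else.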

And by "kind of looks like it might work" I really mean just that.
It's entirely untested. It's more of a "let's take that description of
mine and turn it into code". I really have not tested this AT ALL.

And it's subtle enough that I suspect it really is majorly buggy. It
uses the locking hash list code (hlist_bl_node) to keep the hash table
fairly small and hide the lock in the hash table itself.
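
(An hlist_bl_head is a single pointer per bucket, with the low bit of
that pointer doubling as a bit-spinlock through hlist_bl_lock() and
hlist_bl_unlock(), so even the 4096-entry table in patch #2 is only
32kB on 64-bit.)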

And then it plays horrible games with linked lists. Yeah, that may
have been a mistake, but I thought I should try to avoid the doubly
linked lists in that "struct page_wait_struct" because it's allocated
on the stack, and each list_head is 16 bytes on 64-bit architectures.

But that "let's save 24 bytes in the structure" made it much nastier
to remove entries, so it was probably a bad trade-off.
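
(The struct has three of those links, 'all', 'exclusive' and 'next',
so as plain pointers they take 24 bytes where three list_heads would
have taken 48.)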

But I'm attaching the two patches because I have no shame, in case
somebody is willing to look at my completely untested crap code.

I *really* want to emphasize that "untested crap".

This is meant to be example code of the *concept* rather than anything
that actually works.

So take it as that: example pseudo-code that happens to pass a
compiler, but is meant as an RFD rather than as actually working code.

The first patch just moves code around because I wanted to experiment
with the new code in a new file. That first patch is probably fine. It
shouldn't change any code, just move it.

The second patch is the "broken patch to illustrate the idea".

Linus
From b0bec1de8a7dba69b223dd30dd2a734401f335aa Mon Sep 17 00:00:00 2001
From: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Date: Fri, 25 Aug 2017 13:25:43 -0700
Subject: [PATCH 1/2] Split page bit waiting functions into their own file

It turns out that the page-bit waiting logic is very special indeed, and
will get even more so. Split it up into its own file to make this clear.

I'll rewrite the logic of the page wait queue completely, but this is
pure prep-work with no actual code changes, just code movement.

Signed-off-by: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
---
mm/Makefile | 2 +-
mm/filemap.c | 260 --------------------------------------------------
mm/page_wait_bit.c | 274 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 275 insertions(+), 261 deletions(-)
create mode 100644 mm/page_wait_bit.c

diff --git a/mm/Makefile b/mm/Makefile
index 411bd24d4a7c..8bc0194207a3 100644
--- a/mm/Makefile
+++ b/mm/Makefile
@@ -32,7 +32,7 @@ ifdef CONFIG_CROSS_MEMORY_ATTACH
mmu-$(CONFIG_MMU) += process_vm_access.o
endif

-obj-y := filemap.o mempool.o oom_kill.o \
+obj-y := filemap.o page_wait_bit.o mempool.o oom_kill.o \
maccess.o page_alloc.o page-writeback.o \
readahead.o swap.o truncate.o vmscan.o shmem.o \
util.o mmzone.o vmstat.o backing-dev.o \
diff --git a/mm/filemap.c b/mm/filemap.c
index a49702445ce0..f9b710e8f76e 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -856,195 +856,6 @@ struct page *__page_cache_alloc(gfp_t gfp)
EXPORT_SYMBOL(__page_cache_alloc);
#endif

-/*
- * In order to wait for pages to become available there must be
- * waitqueues associated with pages. By using a hash table of
- * waitqueues where the bucket discipline is to maintain all
- * waiters on the same queue and wake all when any of the pages
- * become available, and for the woken contexts to check to be
- * sure the appropriate page became available, this saves space
- * at a cost of "thundering herd" phenomena during rare hash
- * collisions.
- */
-#define PAGE_WAIT_TABLE_BITS 8
-#define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS)
-static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
-
-static wait_queue_head_t *page_waitqueue(struct page *page)
-{
- return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)];
-}
-
-void __init pagecache_init(void)
-{
- int i;
-
- for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++)
- init_waitqueue_head(&page_wait_table[i]);
-
- page_writeback_init();
-}
-
-struct wait_page_key {
- struct page *page;
- int bit_nr;
- int page_match;
-};
-
-struct wait_page_queue {
- struct page *page;
- int bit_nr;
- wait_queue_entry_t wait;
-};
-
-static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
-{
- struct wait_page_key *key = arg;
- struct wait_page_queue *wait_page
- = container_of(wait, struct wait_page_queue, wait);
-
- if (wait_page->page != key->page)
- return 0;
- key->page_match = 1;
-
- if (wait_page->bit_nr != key->bit_nr)
- return 0;
- if (test_bit(key->bit_nr, &key->page->flags))
- return 0;
-
- return autoremove_wake_function(wait, mode, sync, key);
-}
-
-static void wake_up_page_bit(struct page *page, int bit_nr)
-{
- wait_queue_head_t *q = page_waitqueue(page);
- struct wait_page_key key;
- unsigned long flags;
-
- key.page = page;
- key.bit_nr = bit_nr;
- key.page_match = 0;
-
- spin_lock_irqsave(&q->lock, flags);
- __wake_up_locked_key(q, TASK_NORMAL, &key);
- /*
- * It is possible for other pages to have collided on the waitqueue
- * hash, so in that case check for a page match. That prevents a long-
- * term waiter
- *
- * It is still possible to miss a case here, when we woke page waiters
- * and removed them from the waitqueue, but there are still other
- * page waiters.
- */
- if (!waitqueue_active(q) || !key.page_match) {
- ClearPageWaiters(page);
- /*
- * It's possible to miss clearing Waiters here, when we woke
- * our page waiters, but the hashed waitqueue has waiters for
- * other pages on it.
- *
- * That's okay, it's a rare case. The next waker will clear it.
- */
- }
- spin_unlock_irqrestore(&q->lock, flags);
-}
-
-static void wake_up_page(struct page *page, int bit)
-{
- if (!PageWaiters(page))
- return;
- wake_up_page_bit(page, bit);
-}
-
-static inline int wait_on_page_bit_common(wait_queue_head_t *q,
- struct page *page, int bit_nr, int state, bool lock)
-{
- struct wait_page_queue wait_page;
- wait_queue_entry_t *wait = &wait_page.wait;
- int ret = 0;
-
- init_wait(wait);
- wait->func = wake_page_function;
- wait_page.page = page;
- wait_page.bit_nr = bit_nr;
-
- for (;;) {
- spin_lock_irq(&q->lock);
-
- if (likely(list_empty(&wait->entry))) {
- if (lock)
- __add_wait_queue_entry_tail_exclusive(q, wait);
- else
- __add_wait_queue(q, wait);
- SetPageWaiters(page);
- }
-
- set_current_state(state);
-
- spin_unlock_irq(&q->lock);
-
- if (likely(test_bit(bit_nr, &page->flags))) {
- io_schedule();
- if (unlikely(signal_pending_state(state, current))) {
- ret = -EINTR;
- break;
- }
- }
-
- if (lock) {
- if (!test_and_set_bit_lock(bit_nr, &page->flags))
- break;
- } else {
- if (!test_bit(bit_nr, &page->flags))
- break;
- }
- }
-
- finish_wait(q, wait);
-
- /*
- * A signal could leave PageWaiters set. Clearing it here if
- * !waitqueue_active would be possible (by open-coding finish_wait),
- * but still fail to catch it in the case of wait hash collision. We
- * already can fail to clear wait hash collision cases, so don't
- * bother with signals either.
- */
-
- return ret;
-}
-
-void wait_on_page_bit(struct page *page, int bit_nr)
-{
- wait_queue_head_t *q = page_waitqueue(page);
- wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
-}
-EXPORT_SYMBOL(wait_on_page_bit);
-
-int wait_on_page_bit_killable(struct page *page, int bit_nr)
-{
- wait_queue_head_t *q = page_waitqueue(page);
- return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false);
-}
-
-/**
- * add_page_wait_queue - Add an arbitrary waiter to a page's wait queue
- * @page: Page defining the wait queue of interest
- * @waiter: Waiter to add to the queue
- *
- * Add an arbitrary @waiter to the wait queue for the nominated @page.
- */
-void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
-{
- wait_queue_head_t *q = page_waitqueue(page);
- unsigned long flags;
-
- spin_lock_irqsave(&q->lock, flags);
- __add_wait_queue(q, waiter);
- SetPageWaiters(page);
- spin_unlock_irqrestore(&q->lock, flags);
-}
-EXPORT_SYMBOL_GPL(add_page_wait_queue);
-
#ifndef clear_bit_unlock_is_negative_byte

/*
@@ -1068,57 +879,6 @@ static inline bool clear_bit_unlock_is_negative_byte(long nr, volatile void *mem

#endif

-/**
- * unlock_page - unlock a locked page
- * @page: the page
- *
- * Unlocks the page and wakes up sleepers in ___wait_on_page_locked().
- * Also wakes sleepers in wait_on_page_writeback() because the wakeup
- * mechanism between PageLocked pages and PageWriteback pages is shared.
- * But that's OK - sleepers in wait_on_page_writeback() just go back to sleep.
- *
- * Note that this depends on PG_waiters being the sign bit in the byte
- * that contains PG_locked - thus the BUILD_BUG_ON(). That allows us to
- * clear the PG_locked bit and test PG_waiters at the same time fairly
- * portably (architectures that do LL/SC can test any bit, while x86 can
- * test the sign bit).
- */
-void unlock_page(struct page *page)
-{
- BUILD_BUG_ON(PG_waiters != 7);
- page = compound_head(page);
- VM_BUG_ON_PAGE(!PageLocked(page), page);
- if (clear_bit_unlock_is_negative_byte(PG_locked, &page->flags))
- wake_up_page_bit(page, PG_locked);
-}
-EXPORT_SYMBOL(unlock_page);
-
-/**
- * end_page_writeback - end writeback against a page
- * @page: the page
- */
-void end_page_writeback(struct page *page)
-{
- /*
- * TestClearPageReclaim could be used here but it is an atomic
- * operation and overkill in this particular case. Failing to
- * shuffle a page marked for immediate reclaim is too mild to
- * justify taking an atomic operation penalty at the end of
- * ever page writeback.
- */
- if (PageReclaim(page)) {
- ClearPageReclaim(page);
- rotate_reclaimable_page(page);
- }
-
- if (!test_clear_page_writeback(page))
- BUG();
-
- smp_mb__after_atomic();
- wake_up_page(page, PG_writeback);
-}
-EXPORT_SYMBOL(end_page_writeback);
-
/*
* After completing I/O on a page, call this routine to update the page
* flags appropriately
@@ -1147,26 +907,6 @@ void page_endio(struct page *page, bool is_write, int err)
}
EXPORT_SYMBOL_GPL(page_endio);

-/**
- * __lock_page - get a lock on the page, assuming we need to sleep to get it
- * @__page: the page to lock
- */
-void __lock_page(struct page *__page)
-{
- struct page *page = compound_head(__page);
- wait_queue_head_t *q = page_waitqueue(page);
- wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
-}
-EXPORT_SYMBOL(__lock_page);
-
-int __lock_page_killable(struct page *__page)
-{
- struct page *page = compound_head(__page);
- wait_queue_head_t *q = page_waitqueue(page);
- return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true);
-}
-EXPORT_SYMBOL_GPL(__lock_page_killable);
-
/*
* Return values:
* 1 - page is locked; mmap_sem is still held.
diff --git a/mm/page_wait_bit.c b/mm/page_wait_bit.c
new file mode 100644
index 000000000000..7550b6d2715a
--- /dev/null
+++ b/mm/page_wait_bit.c
@@ -0,0 +1,274 @@
+/*
+ * linux/mm/page_wait_bit.c
+ *
+ * Copyright (C) 2017 Linus Torvalds
+ */
+
+#include <linux/swap.h>
+#include <linux/pagemap.h>
+#include <linux/wait.h>
+#include <linux/export.h>
+#include <linux/sched/signal.h>
+
+#include "internal.h"
+
+/*
+ * In order to wait for pages to become available there must be
+ * waitqueues associated with pages. By using a hash table of
+ * waitqueues where the bucket discipline is to maintain all
+ * waiters on the same queue and wake all when any of the pages
+ * become available, and for the woken contexts to check to be
+ * sure the appropriate page became available, this saves space
+ * at a cost of "thundering herd" phenomena during rare hash
+ * collisions.
+ */
+#define PAGE_WAIT_TABLE_BITS 8
+#define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS)
+static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
+
+static wait_queue_head_t *page_waitqueue(struct page *page)
+{
+ return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)];
+}
+
+void __init pagecache_init(void)
+{
+ int i;
+
+ for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++)
+ init_waitqueue_head(&page_wait_table[i]);
+
+ page_writeback_init();
+}
+
+struct wait_page_key {
+ struct page *page;
+ int bit_nr;
+ int page_match;
+};
+
+struct wait_page_queue {
+ struct page *page;
+ int bit_nr;
+ wait_queue_entry_t wait;
+};
+
+static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
+{
+ struct wait_page_key *key = arg;
+ struct wait_page_queue *wait_page
+ = container_of(wait, struct wait_page_queue, wait);
+
+ if (wait_page->page != key->page)
+ return 0;
+ key->page_match = 1;
+
+ if (wait_page->bit_nr != key->bit_nr)
+ return 0;
+ if (test_bit(key->bit_nr, &key->page->flags))
+ return 0;
+
+ return autoremove_wake_function(wait, mode, sync, key);
+}
+
+static void wake_up_page_bit(struct page *page, int bit_nr)
+{
+ wait_queue_head_t *q = page_waitqueue(page);
+ struct wait_page_key key;
+ unsigned long flags;
+
+ key.page = page;
+ key.bit_nr = bit_nr;
+ key.page_match = 0;
+
+ spin_lock_irqsave(&q->lock, flags);
+ __wake_up_locked_key(q, TASK_NORMAL, &key);
+ /*
+ * It is possible for other pages to have collided on the waitqueue
+ * hash, so in that case check for a page match. That prevents a long-
+ * term waiter
+ *
+ * It is still possible to miss a case here, when we woke page waiters
+ * and removed them from the waitqueue, but there are still other
+ * page waiters.
+ */
+ if (!waitqueue_active(q) || !key.page_match) {
+ ClearPageWaiters(page);
+ /*
+ * It's possible to miss clearing Waiters here, when we woke
+ * our page waiters, but the hashed waitqueue has waiters for
+ * other pages on it.
+ *
+ * That's okay, it's a rare case. The next waker will clear it.
+ */
+ }
+ spin_unlock_irqrestore(&q->lock, flags);
+}
+
+static void wake_up_page(struct page *page, int bit)
+{
+ if (!PageWaiters(page))
+ return;
+ wake_up_page_bit(page, bit);
+}
+
+static inline int wait_on_page_bit_common(wait_queue_head_t *q,
+ struct page *page, int bit_nr, int state, bool lock)
+{
+ struct wait_page_queue wait_page;
+ wait_queue_entry_t *wait = &wait_page.wait;
+ int ret = 0;
+
+ init_wait(wait);
+ wait->func = wake_page_function;
+ wait_page.page = page;
+ wait_page.bit_nr = bit_nr;
+
+ for (;;) {
+ spin_lock_irq(&q->lock);
+
+ if (likely(list_empty(&wait->entry))) {
+ if (lock)
+ __add_wait_queue_entry_tail_exclusive(q, wait);
+ else
+ __add_wait_queue(q, wait);
+ SetPageWaiters(page);
+ }
+
+ set_current_state(state);
+
+ spin_unlock_irq(&q->lock);
+
+ if (likely(test_bit(bit_nr, &page->flags))) {
+ io_schedule();
+ if (unlikely(signal_pending_state(state, current))) {
+ ret = -EINTR;
+ break;
+ }
+ }
+
+ if (lock) {
+ if (!test_and_set_bit_lock(bit_nr, &page->flags))
+ break;
+ } else {
+ if (!test_bit(bit_nr, &page->flags))
+ break;
+ }
+ }
+
+ finish_wait(q, wait);
+
+ /*
+ * A signal could leave PageWaiters set. Clearing it here if
+ * !waitqueue_active would be possible (by open-coding finish_wait),
+ * but still fail to catch it in the case of wait hash collision. We
+ * already can fail to clear wait hash collision cases, so don't
+ * bother with signals either.
+ */
+
+ return ret;
+}
+
+void wait_on_page_bit(struct page *page, int bit_nr)
+{
+ wait_queue_head_t *q = page_waitqueue(page);
+ wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
+}
+EXPORT_SYMBOL(wait_on_page_bit);
+
+int wait_on_page_bit_killable(struct page *page, int bit_nr)
+{
+ wait_queue_head_t *q = page_waitqueue(page);
+ return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false);
+}
+
+/**
+ * add_page_wait_queue - Add an arbitrary waiter to a page's wait queue
+ * @page: Page defining the wait queue of interest
+ * @waiter: Waiter to add to the queue
+ *
+ * Add an arbitrary @waiter to the wait queue for the nominated @page.
+ */
+void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
+{
+ wait_queue_head_t *q = page_waitqueue(page);
+ unsigned long flags;
+
+ spin_lock_irqsave(&q->lock, flags);
+ __add_wait_queue(q, waiter);
+ SetPageWaiters(page);
+ spin_unlock_irqrestore(&q->lock, flags);
+}
+EXPORT_SYMBOL_GPL(add_page_wait_queue);
+
+
+/**
+ * __lock_page - get a lock on the page, assuming we need to sleep to get it
+ * @__page: the page to lock
+ */
+void __lock_page(struct page *__page)
+{
+ struct page *page = compound_head(__page);
+ wait_queue_head_t *q = page_waitqueue(page);
+ wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
+}
+EXPORT_SYMBOL(__lock_page);
+
+int __lock_page_killable(struct page *__page)
+{
+ struct page *page = compound_head(__page);
+ wait_queue_head_t *q = page_waitqueue(page);
+ return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true);
+}
+EXPORT_SYMBOL_GPL(__lock_page_killable);
+
+/**
+ * unlock_page - unlock a locked page
+ * @page: the page
+ *
+ * Unlocks the page and wakes up sleepers in ___wait_on_page_locked().
+ * Also wakes sleepers in wait_on_page_writeback() because the wakeup
+ * mechanism between PageLocked pages and PageWriteback pages is shared.
+ * But that's OK - sleepers in wait_on_page_writeback() just go back to sleep.
+ *
+ * Note that this depends on PG_waiters being the sign bit in the byte
+ * that contains PG_locked - thus the BUILD_BUG_ON(). That allows us to
+ * clear the PG_locked bit and test PG_waiters at the same time fairly
+ * portably (architectures that do LL/SC can test any bit, while x86 can
+ * test the sign bit).
+ */
+void unlock_page(struct page *page)
+{
+ BUILD_BUG_ON(PG_waiters != 7);
+ page = compound_head(page);
+ VM_BUG_ON_PAGE(!PageLocked(page), page);
+ if (clear_bit_unlock_is_negative_byte(PG_locked, &page->flags))
+ wake_up_page_bit(page, PG_locked);
+}
+EXPORT_SYMBOL(unlock_page);
+
+/**
+ * end_page_writeback - end writeback against a page
+ * @page: the page
+ */
+void end_page_writeback(struct page *page)
+{
+ /*
+ * TestClearPageReclaim could be used here but it is an atomic
+ * operation and overkill in this particular case. Failing to
+ * shuffle a page marked for immediate reclaim is too mild to
+ * justify taking an atomic operation penalty at the end of
+ * ever page writeback.
+ */
+ if (PageReclaim(page)) {
+ ClearPageReclaim(page);
+ rotate_reclaimable_page(page);
+ }
+
+ if (!test_clear_page_writeback(page))
+ BUG();
+
+ smp_mb__after_atomic();
+ wake_up_page(page, PG_writeback);
+}
+EXPORT_SYMBOL(end_page_writeback);
--
2.14.0.rc1.2.g4c8247ec3

From 8cf9377c98d77b2a2de0dbc451e8ac283684a4e2 Mon Sep 17 00:00:00 2001
From: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Date: Fri, 25 Aug 2017 15:45:43 -0700
Subject: [PATCH 2/2] Re-implement the page bit-wait code

The page wait-queues have some horrible scaling issues, and they seem to
be hard to fix. And the way they use the regular wait-queues made that
really bad, with interrupts disabled for long times etc.

This tries to re-implement them with a totally different model. It's
known broken, and the add_page_wait_queue() thing that the cachefiles
code wants to use is not implemented at all (it probably will just need
to have a parallel set of wait-queues that are *only* used for that).

The code is untested and probably horribly buggy, but I'm hoping others
will take a look at this.

Not-signed-off-yet-by: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
---
mm/page_wait_bit.c | 309 ++++++++++++++++++++++++++++++++++++-----------------
1 file changed, 209 insertions(+), 100 deletions(-)

diff --git a/mm/page_wait_bit.c b/mm/page_wait_bit.c
index 7550b6d2715a..47fdb305ddae 100644
--- a/mm/page_wait_bit.c
+++ b/mm/page_wait_bit.c
@@ -9,9 +9,38 @@
#include <linux/wait.h>
#include <linux/export.h>
#include <linux/sched/signal.h>
+#include <linux/list_bl.h>

#include "internal.h"

+/*
+ * Each waiter on a page will register this
+ * 'page_wait_struct' as part of waiting.
+ *
+ * Note that for any particular page, only one of the
+ * structs will actually be visible at the head of the
+ * wait queue hash table at any particular time, but
+ * everybody has one, because as one waiter is woken
+ * up we will need to pick another head for waiters.
+ *
+ * NOTE! All the list operations are protected by the
+ * hlist_bl_lock on the hash table.
+ */
+struct page_wait_struct {
+ // This is the hash queue head entry
+ // only used once per { page, bit }
+ struct hlist_bl_node list;
+ struct page *page;
+ int bit;
+
+ struct page_wait_struct *all;
+ struct page_wait_struct *exclusive;
+
+ // This is the waiter list
+ struct page_wait_struct *next;
+ struct task_struct *wake;
+};
+
/*
* In order to wait for pages to become available there must be
* waitqueues associated with pages. By using a hash table of
@@ -22,11 +51,11 @@
* at a cost of "thundering herd" phenomena during rare hash
* collisions.
*/
-#define PAGE_WAIT_TABLE_BITS 8
+#define PAGE_WAIT_TABLE_BITS 12
#define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS)
-static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
+static struct hlist_bl_head page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;

-static wait_queue_head_t *page_waitqueue(struct page *page)
+static struct hlist_bl_head *page_waitqueue(struct page *page)
{
return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)];
}
@@ -36,73 +65,129 @@ void __init pagecache_init(void)
int i;

for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++)
- init_waitqueue_head(&page_wait_table[i]);
+ INIT_HLIST_BL_HEAD(&page_wait_table[i]);

page_writeback_init();
}

-struct wait_page_key {
- struct page *page;
- int bit_nr;
- int page_match;
-};
+/*
+ * We found a wait entry for the requested page and bit.
+ *
+ * We now need to create a wakeup list, which includes the
+ * first exclusive waiter (if any), and all the non-exclusive
+ * ones.
+ *
+ * If there is more than one exclusive waiter, we need to
+ * turn the next exclusive waiter into a wait entry, and
+ * add it back to the page wait list.
+ */
+static struct page_wait_struct *create_wake_up_list(struct page_wait_struct *entry, struct hlist_bl_head *head)
+{
+ struct page_wait_struct *all = entry->all;
+ struct page_wait_struct *exclusive = entry->exclusive;

-struct wait_page_queue {
- struct page *page;
- int bit_nr;
- wait_queue_entry_t wait;
-};
+ if (exclusive) {
+ struct page_wait_struct *remain = exclusive->next;

-static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
+ if (remain) {
+ remain->all = NULL;
+ remain->exclusive = remain;
+ hlist_bl_add_head(&remain->list, head);
+ }
+ exclusive->next = all;
+ all = exclusive;
+ }
+ return all;
+}
+
+static inline int remove_myself_from_one_list(struct page_wait_struct **p, struct page_wait_struct *entry)
+{
+ while (*p) {
+ struct page_wait_struct *n = *p;
+ if (n == entry) {
+ *p = n->next;
+ return 1;
+ }
+ p = &n->next;
+ }
+ return 0;
+}
+
+/*
+ * We got woken up, and we need to make sure there is no more
+ * access to us in the list.
+ */
+static void remove_myself_from(struct page_wait_struct *old, struct page_wait_struct *entry, struct hlist_bl_head *head)
{
- struct wait_page_key *key = arg;
- struct wait_page_queue *wait_page
- = container_of(wait, struct wait_page_queue, wait);
+ /* We can be on only one list */
+ if (!remove_myself_from_one_list(&old->all, entry))
+ remove_myself_from_one_list(&old->exclusive, entry);

- if (wait_page->page != key->page)
- return 0;
- key->page_match = 1;
+ hlist_bl_del_init(&entry->list);
+}

- if (wait_page->bit_nr != key->bit_nr)
- return 0;
- if (test_bit(key->bit_nr, &key->page->flags))
- return 0;

- return autoremove_wake_function(wait, mode, sync, key);
+/*
+ * Find and remove the matching page/bit entry from the (locked) bl list
+ *
+ * Return ERR_PTR(-ESRCH) if no matching page at all, NULL if page found
+ * but not with matching bit.
+ */
+static struct page_wait_struct *find_del_entry(struct page *page, int bit_nr, struct hlist_bl_head *head)
+{
+ struct page_wait_struct *entry;
+ struct page_wait_struct *ret = ERR_PTR(-ESRCH);
+ struct hlist_bl_node *node;
+
+ hlist_bl_for_each_entry(entry, node, head, list) {
+ if (entry->page != page)
+ continue;
+ ret = NULL;
+ if (entry->bit != bit_nr)
+ continue;
+ __hlist_bl_del(node);
+ INIT_HLIST_BL_NODE(node);
+ ret = entry;
+ break;
+ }
+ return ret;
}

static void wake_up_page_bit(struct page *page, int bit_nr)
{
- wait_queue_head_t *q = page_waitqueue(page);
- struct wait_page_key key;
+ struct hlist_bl_head *head = page_waitqueue(page);
+ struct page_wait_struct *wake;
unsigned long flags;

- key.page = page;
- key.bit_nr = bit_nr;
- key.page_match = 0;
+ local_save_flags(flags);
+ hlist_bl_lock(head);
+
+ wake = find_del_entry(page, bit_nr, head);
+ if (IS_ERR(wake)) {
+ ClearPageWaiters(page);
+ wake = NULL;
+ } else if (wake) {
+ wake = create_wake_up_list(wake, head);
+ }
+
+ hlist_bl_unlock(head);
+ local_irq_restore(flags);

- spin_lock_irqsave(&q->lock, flags);
- __wake_up_locked_key(q, TASK_NORMAL, &key);
/*
- * It is possible for other pages to have collided on the waitqueue
- * hash, so in that case check for a page match. That prevents a long-
- * term waiter
+ * Actually wake everybody up. Note that as we
+ * wake them up, we can't use the 'wake_list'
+ * entry any more, because it's on their stack.
*
- * It is still possible to miss a case here, when we woke page waiters
- * and removed them from the waitqueue, but there are still other
- * page waiters.
+ * We also clear the 'wake' field so that the
+ * target process can see if they got woken up
+ * by a page bit event.
*/
- if (!waitqueue_active(q) || !key.page_match) {
- ClearPageWaiters(page);
- /*
- * It's possible to miss clearing Waiters here, when we woke
- * our page waiters, but the hashed waitqueue has waiters for
- * other pages on it.
- *
- * That's okay, it's a rare case. The next waker will clear it.
- */
- }
- spin_unlock_irqrestore(&q->lock, flags);
+ while (wake) {
+ struct task_struct *p = wake->wake;
+ wake = wake->next;
+ smp_store_release(&wake->wake, NULL);
+ wake_up_process(p);
+ };
}

static void wake_up_page(struct page *page, int bit)
@@ -112,76 +197,101 @@ static void wake_up_page(struct page *page, int bit)
wake_up_page_bit(page, bit);
}

-static inline int wait_on_page_bit_common(wait_queue_head_t *q,
- struct page *page, int bit_nr, int state, bool lock)
+/*
+ * Wait for the specific page bit to clear.
+ */
+static void wait_once_on_page_bit(struct page *page, int bit_nr, int state, bool lock)
{
- struct wait_page_queue wait_page;
- wait_queue_entry_t *wait = &wait_page.wait;
- int ret = 0;
+ struct page_wait_struct entry, *old;
+ struct hlist_bl_head *head;
+ unsigned long flags;

- init_wait(wait);
- wait->func = wake_page_function;
- wait_page.page = page;
- wait_page.bit_nr = bit_nr;
+ INIT_HLIST_BL_NODE(&entry.list);
+ entry.page = page;
+ entry.bit = bit_nr;
+ entry.all = entry.exclusive = NULL;
+ entry.next = NULL;
+ entry.wake = current;
+
+ head = page_waitqueue(page);
+ local_save_flags(flags);
+ hlist_bl_lock(head);
+
+ old = find_del_entry(page, bit_nr, head);
+ if (IS_ERR(old))
+ old = NULL;
+ if (old) {
+ entry.all = old->all;
+ entry.exclusive = old->exclusive;
+ }

- for (;;) {
- spin_lock_irq(&q->lock);
-
- if (likely(list_empty(&wait->entry))) {
- if (lock)
- __add_wait_queue_entry_tail_exclusive(q, wait);
- else
- __add_wait_queue(q, wait);
- SetPageWaiters(page);
- }
+ if (lock) {
+ entry.next = entry.exclusive;
+ entry.exclusive = &entry;
+ } else {
+ entry.next = entry.all;
+ entry.all = &entry;
+ }

- set_current_state(state);
+ hlist_bl_add_head(&entry.list, head);
+ current->state = state;

- spin_unlock_irq(&q->lock);
+ hlist_bl_unlock(head);
+ local_irq_restore(flags);

- if (likely(test_bit(bit_nr, &page->flags))) {
- io_schedule();
- if (unlikely(signal_pending_state(state, current))) {
- ret = -EINTR;
- break;
- }
- }
+ if (likely(test_bit(bit_nr, &page->flags)))
+ io_schedule();
+
+ /*
+ * NOTE! If we were woken up by something else,
+ * we have to remove ourselves from the hash list.
+ *
+ * But in order to avoid extra locking overhead in
+ * the common case, we only do this if we can't
+ * already tell that we were woken up (and thus
+ * no longer on the lists).
+ */
+ if (smp_load_acquire(&entry.wake) != NULL) {
+ local_save_flags(flags);
+ hlist_bl_lock(head);
+
+ old = find_del_entry(page, bit_nr, head);
+ if (old && !IS_ERR(old))
+ remove_myself_from(old, &entry, head);

+ hlist_bl_unlock(head);
+ local_irq_restore(flags);
+ }
+}
+
+static inline int wait_on_page_bit_common(struct page *page, int bit_nr, int state, bool lock)
+{
+ for (;;) {
+ wait_once_on_page_bit(page, bit_nr, state, lock);
if (lock) {
if (!test_and_set_bit_lock(bit_nr, &page->flags))
- break;
+ return 0;
} else {
if (!test_bit(bit_nr, &page->flags))
- break;
+ return 0;
}
+ if (unlikely(signal_pending_state(state, current)))
+ return -EINTR;
}
-
- finish_wait(q, wait);
-
- /*
- * A signal could leave PageWaiters set. Clearing it here if
- * !waitqueue_active would be possible (by open-coding finish_wait),
- * but still fail to catch it in the case of wait hash collision. We
- * already can fail to clear wait hash collision cases, so don't
- * bother with signals either.
- */
-
- return ret;
}

void wait_on_page_bit(struct page *page, int bit_nr)
{
- wait_queue_head_t *q = page_waitqueue(page);
- wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
+ wait_on_page_bit_common(page, bit_nr, TASK_UNINTERRUPTIBLE, false);
}
EXPORT_SYMBOL(wait_on_page_bit);

int wait_on_page_bit_killable(struct page *page, int bit_nr)
{
- wait_queue_head_t *q = page_waitqueue(page);
- return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false);
+ return wait_on_page_bit_common(page, bit_nr, TASK_KILLABLE, false);
}

+#if 0
/**
* add_page_wait_queue - Add an arbitrary waiter to a page's wait queue
* @page: Page defining the wait queue of interest
@@ -200,6 +310,7 @@ void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL_GPL(add_page_wait_queue);
+#endif


/**
@@ -209,16 +320,14 @@ EXPORT_SYMBOL_GPL(add_page_wait_queue);
void __lock_page(struct page *__page)
{
struct page *page = compound_head(__page);
- wait_queue_head_t *q = page_waitqueue(page);
- wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
+ wait_on_page_bit_common(page, PG_locked, TASK_UNINTERRUPTIBLE, true);
}
EXPORT_SYMBOL(__lock_page);

int __lock_page_killable(struct page *__page)
{
struct page *page = compound_head(__page);
- wait_queue_head_t *q = page_waitqueue(page);
- return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true);
+ return wait_on_page_bit_common(page, PG_locked, TASK_KILLABLE, true);
}
EXPORT_SYMBOL_GPL(__lock_page_killable);

--
2.14.0.rc1.2.g4c8247ec3