[PATCH v5 1/2] watch_queue: refactor post_one_notification

From: Hongchen Zhang
Date: Thu Aug 10 2023 - 21:03:43 EST


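Refactor post_one_notification() so that the pipe can be locked with a
sleepable lock. The function now takes the watch rather than the queue,
pins it with a reference, counts itself in the new post_cnt field of
struct watch_queue, and drops the RCU read lock while it holds the pipe
lock. free_pipe_info() waits for post_cnt to reach zero after clearing
the watch queue.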

Signed-off-by: Hongchen Zhang <zhanghongchen@xxxxxxxxxxx>
---
fs/pipe.c | 5 +++-
include/linux/watch_queue.h | 14 ++++++++++-
kernel/watch_queue.c | 47 +++++++++++++++++++++++++++----------
3 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 2d88f73f585a..5c6b3daed938 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -834,8 +834,11 @@ void free_pipe_info(struct pipe_inode_info *pipe)
unsigned int i;

#ifdef CONFIG_WATCH_QUEUE
- if (pipe->watch_queue)
+ if (pipe->watch_queue) {
watch_queue_clear(pipe->watch_queue);
+ smp_cond_load_relaxed(&pipe->watch_queue->state,
+ (VAL & WATCH_QUEUE_POST_CNT_MASK) == 0);
+ }
#endif

(void) account_pipe_buffers(pipe->user, pipe->nr_accounted, 0);
diff --git a/include/linux/watch_queue.h b/include/linux/watch_queue.h
index fc6bba20273b..1db3eee2137a 100644
--- a/include/linux/watch_queue.h
+++ b/include/linux/watch_queue.h
@@ -35,6 +35,7 @@ struct watch_filter {
struct watch_type_filter filters[];
};

+#define WATCH_QUEUE_POST_CNT_MASK GENMASK(30, 0)
struct watch_queue {
struct rcu_head rcu;
struct watch_filter __rcu *filter;
@@ -46,7 +47,18 @@ struct watch_queue {
spinlock_t lock;
unsigned int nr_notes; /* Number of notes */
unsigned int nr_pages; /* Number of pages in notes[] */
- bool defunct; /* T when queues closed */
+ union {
+ struct {
+#ifdef __LITTLE_ENDIAN
+ u32 post_cnt:31; /* How many threads are posting notification */
+ u32 defunct:1; /* T when queues closed */
+#else
+ u32 defunct:1; /* T when queues closed */
+ u32 post_cnt:31; /* How many threads are posting notification */
+#endif
+ };
+ u32 state;
+ };
};

/*
diff --git a/kernel/watch_queue.c b/kernel/watch_queue.c
index e91cb4c2833f..bd14f054ffb8 100644
--- a/kernel/watch_queue.c
+++ b/kernel/watch_queue.c
@@ -33,6 +33,8 @@ MODULE_AUTHOR("Red Hat, Inc.");
#define WATCH_QUEUE_NOTE_SIZE 128
#define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)

+static void put_watch(struct watch *watch);
+
/*
* This must be called under the RCU read-lock, which makes
* sure that the wqueue still exists. It can then take the lock,
@@ -88,24 +90,40 @@ static const struct pipe_buf_operations watch_queue_pipe_buf_ops = {
};

/*
- * Post a notification to a watch queue.
- *
- * Must be called with the RCU lock for reading, and the
- * watch_queue lock held, which guarantees that the pipe
- * hasn't been released.
+ * Post a notification to a watch queue. Must be called with the RCU read lock held.
*/
-static bool post_one_notification(struct watch_queue *wqueue,
+static bool post_one_notification(struct watch *watch,
struct watch_notification *n)
{
void *p;
- struct pipe_inode_info *pipe = wqueue->pipe;
+ struct watch_queue *wqueue;
+ struct pipe_inode_info *pipe;
struct pipe_buffer *buf;
struct page *page;
unsigned int head, tail, mask, note, offset, len;
bool done = false;
+ u32 state;
+
+ if (!kref_get_unless_zero(&watch->usage))
+ return false;
+ wqueue = rcu_dereference(watch->queue);
+
+ pipe = wqueue->pipe;

- if (!pipe)
+ if (!pipe) {
+ put_watch(watch);
return false;
+ }
+
+ do {
+ if (wqueue->defunct) {
+ put_watch(watch);
+ return false;
+ }
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state + 1) != state);
+
+ rcu_read_unlock();

spin_lock_irq(&pipe->rd_wait.lock);

@@ -145,6 +163,12 @@ static bool post_one_notification(struct watch_queue *wqueue,

out:
spin_unlock_irq(&pipe->rd_wait.lock);
+ do {
+ state = wqueue->state;
+ } while (cmpxchg(&wqueue->state, state, state - 1) != state);
+
+ rcu_read_lock();
+ put_watch(watch);
if (done)
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
return done;
@@ -224,10 +248,7 @@ void __post_watch_notification(struct watch_list *wlist,
if (security_post_notification(watch->cred, cred, n) < 0)
continue;

- if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, n);
- unlock_wqueue(wqueue);
- }
+ post_one_notification(watch, n);
}

rcu_read_unlock();
@@ -560,8 +581,8 @@ int remove_watch_from_object(struct watch_list *wlist, struct watch_queue *wq,

wqueue = rcu_dereference(watch->queue);

+ post_one_notification(watch, &n.watch);
if (lock_wqueue(wqueue)) {
- post_one_notification(wqueue, &n.watch);

if (!hlist_unhashed(&watch->queue_node)) {
hlist_del_init_rcu(&watch->queue_node);

base-commit: 6995e2de6891c724bfeb2db33d7b87775f913ad1
--
2.33.0