Re: Pending splice(file -> FIFO) excludes all other FIFO operations forever (was: ... always blocks read(FIFO), regardless of O_NONBLOCK on read side?)

From: Linus Torvalds
Date: Thu Jul 06 2023 - 17:57:11 EST


On Mon, 26 Jun 2023 at 09:14, Ahelenia Ziemiańska
<nabijaczleweli@xxxxxxxxxxxxxxxxxx> wrote:
>
> And even if that was a working work-around, the fundamental problem of
> ./spl>fifo excluding all other access to fifo is quite unfortunate too.

So I already sent an earlier broken version of this patch to Ahelenia
and Christian, but here's an actually slightly tested version with the
obvious bugs fixed.

The keyword here being "obvious". It's probably broken in many
non-obvious ways, but I'm sending it out in case anybody wants to play
around with it.

It boots for me. It even changes the behavior of programs that splice():
they used to hold the pipe lock across the IO, and no longer do.

But it might do unspeakable things to your pets, so caveat emptor. It
"feels" right to me. But it's a rather quick hack, and really needs
more eyes and more thought. I mention O_NDELAY in the (preliminary)
commit message, but there are probably other issues that need thinking
about.

Linus
From 3475e4d56cefab9f8b061dc824db4e314d076a59 Mon Sep 17 00:00:00 2001
From: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Date: Thu, 6 Jul 2023 11:23:07 -0700
Subject: [PATCH] splice: lock the pages and unlock the pipe during IO

This is already what we do for page cache pages, where we can add raw
pages that are not up-to-date yet to the pipe, and the readers call the
pipe buffer '->confirm()' function to wait for the data to be ready.

So just do the same for splice reading, allowing us to unlock the pipe
during the read. That then avoids problems with users that get blocked
on the pipe lock.

Now they get blocked on the pipe buffer ->confirm() instead.

TODO: teach 'O_NDELAY' and select/poll about this too.

Signed-off-by: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
---
fs/pipe.c | 18 +++---
fs/splice.c | 122 +++++++++++++++++++++++++++++-----------
include/linux/pagemap.h | 1 +
mm/filemap.c | 6 ++
4 files changed, 105 insertions(+), 42 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 2d88f73f585a..71942d240c98 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -284,10 +284,17 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)

if (!pipe_empty(head, tail)) {
struct pipe_buffer *buf = &pipe->bufs[tail & mask];
- size_t chars = buf->len;
- size_t written;
+ size_t chars, written;
int error;

+ error = pipe_buf_confirm(pipe, buf);
+ if (error) {
+ if (!ret)
+ ret = error;
+ break;
+ }
+
+ chars = buf->len;
if (chars > total_len) {
if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
if (ret == 0)
@@ -297,13 +304,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
chars = total_len;
}

- error = pipe_buf_confirm(pipe, buf);
- if (error) {
- if (!ret)
- ret = error;
- break;
- }
-
written = copy_page_to_iter(buf->page, buf->offset, chars, to);
if (unlikely(written < chars)) {
if (!ret)
diff --git a/fs/splice.c b/fs/splice.c
index 004eb1c4ce31..503f7eff41b6 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -300,6 +300,42 @@ void splice_shrink_spd(struct splice_pipe_desc *spd)
kfree(spd->partial);
}

+static void finalize_pipe_buf(struct pipe_buffer *buf, unsigned int chunk)
+{ /* Publish the final length of a reserved pipe buffer, then release its page lock. */
+ buf->len = chunk; /* may be smaller than what was reserved, or zero */
+ buf->ops = &default_pipe_buf_ops; /* buffer is no longer "busy" */
+ unlock_page(buf->page); /* done last: busy_pipe_buf_confirm() waits on this page's PG_locked */
+}
+
+static int busy_pipe_buf_confirm(struct pipe_inode_info *pipe,
+ struct pipe_buffer *buf)
+{ /* ->confirm() for busy buffers: wait until the buffer's page is unlocked; @pipe is not used. */
+ struct page *page = buf->page;
+
+ if (folio_wait_bit_interruptible(page_folio(page), PG_locked)) /* nonzero: the wait did not complete */
+ return -EINTR; /* NOTE(review): pipe_read() hands this error to its caller; confirm -EINTR (vs. -ERESTARTSYS) is intended */
+ return 0; /* page unlocked: buf->len may be read now */
+}
+
+/*
+ * These are the same as the default pipe buf operations,
+ * but the '.confirm()' function requires that any user
+ * wait for the page to unlock before use.
+ *
+ * I guess we could use the whole PG_uptodate logic too,
+ * and treat these as some kind of special page table pages.
+ *
+ * PIPE_BUF_FLAG_CAN_MERGE must obviously not be set when
+ * using these, and it's important that any pipe reader
+ * look at buf->len only _after_ confirming the buffer!
+ */
+const struct pipe_buf_operations busy_pipe_buf_ops = { /* NOTE(review): not static and no header declaration in this patch; presumably meant to be file-local, confirm */
+ .confirm = busy_pipe_buf_confirm, /* the only non-generic hook: waits for the page to unlock */
+ .release = generic_pipe_buf_release,
+ .try_steal = generic_pipe_buf_try_steal,
+ .get = generic_pipe_buf_get,
+};
+
/**
* copy_splice_read - Copy data from a file and splice the copy into a pipe
* @in: The file to read from
@@ -328,6 +364,7 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
struct bio_vec *bv;
struct kiocb kiocb;
struct page **pages;
+ struct pipe_buffer **bufs;
ssize_t ret;
size_t used, npages, chunk, remain, keep = 0;
int i;
@@ -339,11 +376,13 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
npages = DIV_ROUND_UP(len, PAGE_SIZE);

bv = kzalloc(array_size(npages, sizeof(bv[0])) +
- array_size(npages, sizeof(struct page *)), GFP_KERNEL);
+ array_size(npages, sizeof(struct page *)) +
+ array_size(npages, sizeof(struct pipe_buffer *)), GFP_KERNEL);
if (!bv)
return -ENOMEM;

pages = (struct page **)(bv + npages);
+ bufs = (struct pipe_buffer **)(pages + npages);
npages = alloc_pages_bulk_array(GFP_USER, npages, pages);
if (!npages) {
kfree(bv);
@@ -352,14 +391,34 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,

remain = len = min_t(size_t, len, npages * PAGE_SIZE);

+ /* Push them into the pipe and build up the bio_vec */
for (i = 0; i < npages; i++) {
+ struct pipe_buffer *buf = pipe_head_buf(pipe);
+ struct page *page = pages[i];
+
+ pipe->head++; /* reserve the slot while the pipe lock is still held */
+ lock_page(page); /* stays locked until finalize_pipe_buf() unlocks it after the read */
+ *buf = (struct pipe_buffer) {
+ .ops = &busy_pipe_buf_ops, /* readers must ->confirm() before using the buffer */
+ .page = page,
+ .offset = 0,
+ .len = min_t(size_t, PAGE_SIZE, remain), /* 'chunk' is not assigned yet at this point; compute this page's share directly */
+ };
+ bufs[i] = buf; /* remembered so the length can be fixed up after the IO */
+
chunk = min_t(size_t, PAGE_SIZE, remain);
- bv[i].bv_page = pages[i];
+ bv[i].bv_page = page;
bv[i].bv_offset = 0;
bv[i].bv_len = chunk;
remain -= chunk;
}

+ /*
+ * We have now reserved the space with locked pages,
+ * and can unlock the pipe during the IO.
+ */
+ pipe_unlock(pipe);
+
/* Do the I/O */
iov_iter_bvec(&to, ITER_DEST, bv, npages, len);
init_sync_kiocb(&kiocb, in);
@@ -378,26 +437,22 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
if (ret == -EFAULT)
ret = -EAGAIN;

- /* Free any pages that didn't get touched at all. */
- if (keep < npages)
- release_pages(pages + keep, npages - keep);
-
- /* Push the remaining pages into the pipe. */
+ /* Update the page state in the pipe */
remain = ret;
- for (i = 0; i < keep; i++) {
- struct pipe_buffer *buf = pipe_head_buf(pipe);
+ for (i = 0; i < npages; i++) {
+ struct pipe_buffer *buf = bufs[i];

chunk = min_t(size_t, remain, PAGE_SIZE);
- *buf = (struct pipe_buffer) {
- .ops = &default_pipe_buf_ops,
- .page = bv[i].bv_page,
- .offset = 0,
- .len = chunk,
- };
- pipe->head++;
remain -= chunk;
+
+ /*
+ * NOTE! The size might have changed, and chunk may be smaller
+ * or zero! A failed read (ret < 0) must publish zero bytes.
+ */
+ finalize_pipe_buf(buf, ret > 0 ? chunk : 0); /* on error 'remain' wrapped, so 'chunk' would wrongly be PAGE_SIZE */
}

+ pipe_lock(pipe);
kfree(bv);
return ret;
}
@@ -455,10 +510,6 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
while (!pipe_empty(head, tail)) {
struct pipe_buffer *buf = &pipe->bufs[tail & mask];

- sd->len = buf->len;
- if (sd->len > sd->total_len)
- sd->len = sd->total_len;
-
ret = pipe_buf_confirm(pipe, buf);
if (unlikely(ret)) {
if (ret == -ENODATA)
@@ -466,6 +517,10 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
return ret;
}

+ sd->len = buf->len;
+ if (sd->len > sd->total_len)
+ sd->len = sd->total_len;
+
ret = actor(pipe, buf, sd);
if (ret <= 0)
return ret;
@@ -715,12 +770,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
left = sd.total_len;
for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++) {
struct pipe_buffer *buf = &pipe->bufs[tail & mask];
- size_t this_len = buf->len;
-
- /* zero-length bvecs are not supported, skip them */
- if (!this_len)
- continue;
- this_len = min(this_len, left);
+ size_t this_len;

ret = pipe_buf_confirm(pipe, buf);
if (unlikely(ret)) {
@@ -729,6 +779,12 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
goto done;
}

+ /* zero-length bvecs are not supported, skip them */
+ this_len = buf->len;
+ if (!this_len)
+ continue;
+ this_len = min(this_len, left);
+
bvec_set_page(&array[n], buf->page, this_len,
buf->offset);
left -= this_len;
@@ -847,13 +903,6 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
struct pipe_buffer *buf = &pipe->bufs[tail & mask];
size_t seg;

- if (!buf->len) {
- tail++;
- continue;
- }
-
- seg = min_t(size_t, remain, buf->len);
-
ret = pipe_buf_confirm(pipe, buf);
if (unlikely(ret)) {
if (ret == -ENODATA)
@@ -861,6 +910,13 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
break;
}

+ if (!buf->len) {
+ tail++;
+ continue;
+ }
+
+ seg = min_t(size_t, remain, buf->len);
+
bvec_set_page(&bvec[bc++], buf->page, seg, buf->offset);
remain -= seg;
if (remain == 0 || bc >= ARRAY_SIZE(bvec))
diff --git a/include/linux/pagemap.h b/include/linux/pagemap.h
index 716953ee1ebd..cc51ea910a91 100644
--- a/include/linux/pagemap.h
+++ b/include/linux/pagemap.h
@@ -1018,6 +1018,7 @@ static inline bool folio_lock_or_retry(struct folio *folio,
*/
void folio_wait_bit(struct folio *folio, int bit_nr);
int folio_wait_bit_killable(struct folio *folio, int bit_nr);
+int folio_wait_bit_interruptible(struct folio *folio, int bit_nr);

/*
* Wait for a folio to be unlocked.
diff --git a/mm/filemap.c b/mm/filemap.c
index 9e44a49bbd74..e36e052bb991 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1450,6 +1450,12 @@ int folio_wait_bit_killable(struct folio *folio, int bit_nr)
}
EXPORT_SYMBOL(folio_wait_bit_killable);

+int folio_wait_bit_interruptible(struct folio *folio, int bit_nr)
+{ /* Wait on bit @bit_nr of @folio in TASK_INTERRUPTIBLE state. */
+ return folio_wait_bit_common(folio, bit_nr, TASK_INTERRUPTIBLE, SHARED); /* busy_pipe_buf_confirm() treats a nonzero result as an interrupted wait */
+}
+EXPORT_SYMBOL(folio_wait_bit_interruptible);
+
/**
* folio_put_wait_locked - Drop a reference and wait for it to be unlocked
* @folio: The folio to wait for.
--
2.41.0.203.ga4f2cd32bb.dirty