[PATCH net-next v2] libceph: Partially revert changes to support MSG_SPLICE_PAGES

From: David Howells
Date: Mon Jun 26 2023 - 17:06:14 EST


Fix the mishandling of MSG_DONTWAIT and also reinstate the per-page
checking of the source pages (which might have come from a DIO write by
userspace) by partially reverting the changes to support MSG_SPLICE_PAGES
and doing things a little differently. In messenger_v1:

(1) ceph_tcp_sendpage() is resurrected and the callers are reverted to use
that.

(2) The callers now pass MSG_MORE unconditionally. Previously, they were
passing in MSG_MORE|MSG_SENDPAGE_NOTLAST and then degrading that to
just MSG_MORE on the last call to ->sendpage().

(3) Make ceph_tcp_sendpage() a wrapper around sendmsg() rather than
sendpage(), setting MSG_SPLICE_PAGES if sendpage_ok() returns true on
the page.

In messenger_v2:

(4) Bring back do_try_sendpage() and make the callers use that.

(5) Make do_try_sendpage() use sendmsg() for both cases and set
MSG_SPLICE_PAGES if sendpage_ok() returns true on the page.

Fixes: 40a8c17aa770 ("ceph: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage")
Fixes: fa094ccae1e7 ("ceph: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage()")
Reported-by: Ilya Dryomov <idryomov@xxxxxxxxx>
Link: https://lore.kernel.org/r/CAOi1vP9vjLfk3W+AJFeexC93jqPaPUn2dD_4NrzxwoZTbYfOnw@xxxxxxxxxxxxxx/
Link: https://lore.kernel.org/r/CAOi1vP_Bn918j24S94MuGyn+Gxk212btw7yWeDrRcW1U8pc_BA@xxxxxxxxxxxxxx/
Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Ilya Dryomov <idryomov@xxxxxxxxx>
cc: Xiubo Li <xiubli@xxxxxxxxxx>
cc: Jeff Layton <jlayton@xxxxxxxxxx>
cc: "David S. Miller" <davem@xxxxxxxxxxxxx>
cc: Eric Dumazet <edumazet@xxxxxxxxxx>
cc: Jakub Kicinski <kuba@xxxxxxxxxx>
cc: Paolo Abeni <pabeni@xxxxxxxxxx>
cc: Jens Axboe <axboe@xxxxxxxxx>
cc: Matthew Wilcox <willy@xxxxxxxxxxxxx>
cc: ceph-devel@xxxxxxxxxxxxxxx
cc: netdev@xxxxxxxxxxxxxxx
Link: https://lore.kernel.org/r/3101881.1687801973@xxxxxxxxxxxxxxxxxxxxxx/ # v1
---
Notes:
ver #2)
- Removed mention of MSG_SENDPAGE_NOTLAST in comments.
- Changed some refs to sendpage to MSG_SPLICE_PAGES in comments.
- Init msg_iter in ceph_tcp_sendpage().
- Move setting of MSG_SPLICE_PAGES in do_try_sendpage() next to comment
and adjust how it is cleared.

net/ceph/messenger_v1.c | 58 ++++++++++++++++++++-----------
net/ceph/messenger_v2.c | 88 ++++++++++++++++++++++++++++++++++++++----------
2 files changed, 107 insertions(+), 39 deletions(-)

diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index 814579f27f04..51a6f28aa798 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -74,6 +74,39 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
return r;
}

+/* Send @size bytes of @page, starting at @offset, without blocking.
+ * @more: MSG_MORE or 0.  Returns bytes sent, 0 on -EAGAIN, or -errno.
+ */
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+ int offset, size_t size, int more)
+{
+ struct msghdr msg = {
+ .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
+ };
+ struct bio_vec bvec;
+ int ret;
+
+ /*
+ * MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
+ * we need to fall back to sendmsg if that's the case.
+ *
+ * Same goes for slab pages: skb_can_coalesce() allows
+ * coalescing neighboring slab objects into a single frag which
+ * triggers one of hardened usercopy checks.
+ */
+ if (sendpage_ok(page))
+ msg.msg_flags |= MSG_SPLICE_PAGES;
+
+ bvec_set_page(&bvec, page, size, offset);
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size); /* page is read from */
+
+ ret = sock_sendmsg(sock, &msg);
+ if (ret == -EAGAIN)
+ ret = 0; /* nothing sent right now; not reported as an error */
+
+ return ret;
+}
+
static void con_out_kvec_reset(struct ceph_connection *con)
{
BUG_ON(con->v1.out_skip);
@@ -450,10 +483,6 @@ static int write_partial_message_data(struct ceph_connection *con)
*/
crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
while (cursor->total_resid) {
- struct bio_vec bvec;
- struct msghdr msghdr = {
- .msg_flags = MSG_SPLICE_PAGES,
- };
struct page *page;
size_t page_offset;
size_t length;
@@ -465,13 +494,8 @@ static int write_partial_message_data(struct ceph_connection *con)
}

page = ceph_msg_data_next(cursor, &page_offset, &length);
- if (length != cursor->total_resid)
- msghdr.msg_flags |= MSG_MORE;
-
- bvec_set_page(&bvec, page, length, page_offset);
- iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, length);
-
- ret = sock_sendmsg(con->sock, &msghdr);
+ ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
+ MSG_MORE);
if (ret <= 0) {
if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
@@ -501,22 +525,14 @@ static int write_partial_message_data(struct ceph_connection *con)
*/
static int write_partial_skip(struct ceph_connection *con)
{
- struct bio_vec bvec;
- struct msghdr msghdr = {
- .msg_flags = MSG_SPLICE_PAGES | MSG_MORE,
- };
int ret;

dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
while (con->v1.out_skip > 0) {
size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);

- if (size == con->v1.out_skip)
- msghdr.msg_flags &= ~MSG_MORE;
- bvec_set_page(&bvec, ZERO_PAGE(0), size, 0);
- iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size);
-
- ret = sock_sendmsg(con->sock, &msghdr);
+ ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
+ MSG_MORE);
if (ret <= 0)
goto out;
con->v1.out_skip -= ret;
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index 87ac97073e75..1a888b86a494 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -117,38 +117,90 @@ static int ceph_tcp_recv(struct ceph_connection *con)
return ret;
}

+static int do_sendmsg(struct socket *sock, struct iov_iter *it)
+{
+ struct msghdr msg = { .msg_flags = CEPH_MSG_FLAGS };
+ int ret;
+
+ msg.msg_iter = *it; /* working copy handed to sock_sendmsg() */
+ while (iov_iter_count(it)) { /* until the caller's iterator is drained */
+ ret = sock_sendmsg(sock, &msg);
+ if (ret <= 0) {
+ if (ret == -EAGAIN)
+ ret = 0; /* reported as 0 rather than as an error */
+ return ret;
+ }
+
+ iov_iter_advance(it, ret); /* *it itself is advanced by hand */
+ }
+
+ WARN_ON(msg_data_left(&msg)); /* msg's copy is expected to be empty too */
+ return 1; /* everything in *it was sent */
+}
+
+static int do_try_sendpage(struct socket *sock, struct iov_iter *it)
+{
+ struct msghdr msg = { .msg_flags = CEPH_MSG_FLAGS };
+ struct bio_vec bv;
+ int ret;
+
+ if (WARN_ON(!iov_iter_is_bvec(it))) /* only bvec iterators are handled */
+ return -EINVAL;
+
+ while (iov_iter_count(it)) {
+ /* Like iov_iter_iovec(), for ITER_BVEC: rest of the current segment */
+ bvec_set_page(&bv, it->bvec->bv_page,
+ min(iov_iter_count(it),
+ it->bvec->bv_len - it->iov_offset),
+ it->bvec->bv_offset + it->iov_offset);
+
+ /*
+ * MSG_SPLICE_PAGES cannot properly handle pages with
+ * page_count == 0, we need to fall back to plain sendmsg
+ * (flag cleared) if that's the case.
+ *
+ * Same goes for slab pages: skb_can_coalesce() allows
+ * coalescing neighboring slab objects into a single frag
+ * which triggers one of hardened usercopy checks.
+ */
+ if (sendpage_ok(bv.bv_page))
+ msg.msg_flags |= MSG_SPLICE_PAGES;
+ else
+ msg.msg_flags &= ~MSG_SPLICE_PAGES; /* msg is reused; drop a stale flag */
+
+ iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bv, 1, bv.bv_len); /* this segment only */
+ ret = sock_sendmsg(sock, &msg);
+ if (ret <= 0) {
+ if (ret == -EAGAIN)
+ ret = 0; /* reported as 0 rather than as an error */
+ return ret;
+ }
+
+ iov_iter_advance(it, ret); /* a short send is resumed mid-segment next pass */
+ }
+
+ return 1; /* everything in *it was sent */
+}
+
/*
* Write as much as possible. The socket is expected to be corked,
* so we don't bother with MSG_MORE here.
*
* Return:
- * >0 - done, nothing (else) to write
+ * 1 - done, nothing (else) to write
* 0 - socket is full, need to wait
* <0 - error
*/
static int ceph_tcp_send(struct ceph_connection *con)
{
- struct msghdr msg = {
- .msg_iter = con->v2.out_iter,
- .msg_flags = CEPH_MSG_FLAGS,
- };
int ret;

- if (WARN_ON(!iov_iter_is_bvec(&con->v2.out_iter)))
- return -EINVAL;
-
- if (con->v2.out_iter_sendpage)
- msg.msg_flags |= MSG_SPLICE_PAGES;
-
dout("%s con %p have %zu try_sendpage %d\n", __func__, con,
iov_iter_count(&con->v2.out_iter), con->v2.out_iter_sendpage);
-
- ret = sock_sendmsg(con->sock, &msg);
- if (ret > 0)
- iov_iter_advance(&con->v2.out_iter, ret);
- else if (ret == -EAGAIN)
- ret = 0;
-
+ if (con->v2.out_iter_sendpage)
+ ret = do_try_sendpage(con->sock, &con->v2.out_iter);
+ else
+ ret = do_sendmsg(con->sock, &con->v2.out_iter);
dout("%s con %p ret %d left %zu\n", __func__, con, ret,
iov_iter_count(&con->v2.out_iter));
return ret;