[PATCH net-next v2 19/21] rxrpc: Record probes after transmission and reduce number of time-gets

From: David Howells
Date: Mon Mar 04 2024 - 03:50:34 EST


Move the recording of a successfully transmitted DATA or ACK packet that
will provide RTT probing to after the transmission. With the I/O thread
model, this can be done because parsing of the responding ACK can no longer
race with the post-transmission code.

Move the various timeout-settings done after successfully transmitting a
DATA packet into rxrpc_tstamp_data_packets() and eliminate a number of
calls to get the current time.

As a consequence we no longer need to cancel a proposed RTT probe on
transmission failure.

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: "David S. Miller" <davem@xxxxxxxxxxxxx>
cc: Eric Dumazet <edumazet@xxxxxxxxxx>
cc: Jakub Kicinski <kuba@xxxxxxxxxx>
cc: Paolo Abeni <pabeni@xxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
cc: netdev@xxxxxxxxxxxxxxx
---
net/rxrpc/output.c | 105 +++++++++++++++++----------------------------
1 file changed, 40 insertions(+), 65 deletions(-)

diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index ec82193e5681..5ea9601efd05 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -63,7 +63,7 @@ static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
* Receiving a response to the ping will prevent the ->expect_rx_by timer from
* expiring.
*/
-static void rxrpc_set_keepalive(struct rxrpc_call *call)
+static void rxrpc_set_keepalive(struct rxrpc_call *call, ktime_t now)
{
ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo) / 6);

@@ -147,8 +147,8 @@ static void rxrpc_fill_out_ack(struct rxrpc_call *call,
/*
* Record the beginning of an RTT probe.
*/
-static int rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
- enum rxrpc_rtt_tx_trace why)
+static void rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
+ ktime_t now, enum rxrpc_rtt_tx_trace why)
{
unsigned long avail = call->rtt_avail;
int rtt_slot = 9;
@@ -161,30 +161,15 @@ static int rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
goto no_slot;

call->rtt_serial[rtt_slot] = serial;
- call->rtt_sent_at[rtt_slot] = ktime_get_real();
+ call->rtt_sent_at[rtt_slot] = now;
smp_wmb(); /* Write data before avail bit */
set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);

trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
- return rtt_slot;
+ return;

no_slot:
trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
- return -1;
-}
-
-/*
- * Cancel an RTT probe.
- */
-static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
- rxrpc_serial_t serial, int rtt_slot)
-{
- if (rtt_slot != -1) {
- clear_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
- smp_wmb(); /* Clear pending bit before setting slot */
- set_bit(rtt_slot, &call->rtt_avail);
- trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_cancel, rtt_slot, serial);
- }
}

/*
@@ -196,7 +181,8 @@ static void rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
struct rxrpc_connection *conn;
struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
struct msghdr msg;
- int ret, rtt_slot = -1;
+ ktime_t now;
+ int ret;

if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
return;
@@ -218,9 +204,6 @@ static void rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
ntohl(ack->serial), ack->reason, ack->nAcks,
txb->ack_rwind);

- if (ack->reason == RXRPC_ACK_PING)
- rtt_slot = rxrpc_begin_rtt_probe(call, txb->serial, rxrpc_rtt_tx_ping);
-
rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);

iov_iter_kvec(&msg.msg_iter, WRITE, txb->kvec, txb->nr_kvec, txb->len);
@@ -233,16 +216,14 @@ static void rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
} else {
trace_rxrpc_tx_packet(call->debug_id, whdr,
rxrpc_tx_point_call_ack);
+ now = ktime_get_real();
+ if (ack->reason == RXRPC_ACK_PING)
+ rxrpc_begin_rtt_probe(call, txb->serial, now, rxrpc_rtt_tx_ping);
if (txb->flags & RXRPC_REQUEST_ACK)
- call->peer->rtt_last_req = ktime_get_real();
+ call->peer->rtt_last_req = now;
+ rxrpc_set_keepalive(call, now);
}
rxrpc_tx_backoff(call, ret);
-
- if (!__rxrpc_call_is_complete(call)) {
- if (ret < 0)
- rxrpc_cancel_rtt_probe(call, txb->serial, rtt_slot);
- rxrpc_set_keepalive(call);
- }
}

/*
@@ -413,18 +394,36 @@ static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_tx
}

/*
- * Set the times on a packet before transmission
+ * Set timeouts after transmitting a packet.
*/
-static int rxrpc_tstamp_data_packets(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
+static void rxrpc_tstamp_data_packets(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
{
- ktime_t tstamp = ktime_get_real();
- int rtt_slot = -1;
+ ktime_t now = ktime_get_real();
+ bool ack_requested = txb->flags & RXRPC_REQUEST_ACK;

- txb->last_sent = tstamp;
- if (txb->flags & RXRPC_REQUEST_ACK)
- rtt_slot = rxrpc_begin_rtt_probe(call, txb->serial, rxrpc_rtt_tx_data);
+ call->tx_last_sent = now;
+ txb->last_sent = now;
+
+ if (ack_requested) {
+ rxrpc_begin_rtt_probe(call, txb->serial, now, rxrpc_rtt_tx_data);
+
+ call->peer->rtt_last_req = now;
+ if (call->peer->rtt_count > 1) {
+ ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);

- return rtt_slot;
+ call->ack_lost_at = ktime_add(now, delay);
+ trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
+ }
+ }
+
+ if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
+ ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
+
+ call->expect_rx_by = ktime_add(now, delay);
+ trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
+ }
+
+ rxrpc_set_keepalive(call, now);
}

/*
@@ -437,7 +436,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
enum rxrpc_tx_point frag;
struct msghdr msg;
size_t len;
- int ret, rtt_slot = -1;
+ int ret;

_enter("%x,{%d}", txb->seq, txb->len);

@@ -479,8 +478,6 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
}

retry:
- rtt_slot = rxrpc_tstamp_data_packets(call, txb);
-
/* send the packet by UDP
* - returns -EMSGSIZE if UDP would have to fragment the packet
* to go out of the interface
@@ -493,7 +490,6 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t

if (ret < 0) {
rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
- rxrpc_cancel_rtt_probe(call, txb->serial, rtt_slot);
trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
} else {
trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
@@ -508,28 +504,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t

done:
if (ret >= 0) {
- call->tx_last_sent = txb->last_sent;
- if (txb->flags & RXRPC_REQUEST_ACK) {
- call->peer->rtt_last_req = txb->last_sent;
- if (call->peer->rtt_count > 1) {
- ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
- ktime_t now = ktime_get_real();
-
- call->ack_lost_at = ktime_add(now, delay);
- trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
- }
- }
-
- if (txb->seq == 1 &&
- !test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER,
- &call->flags)) {
- ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
-
- call->expect_rx_by = ktime_add(ktime_get_real(), delay);
- trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
- }
-
- rxrpc_set_keepalive(call);
+ rxrpc_tstamp_data_packets(call, txb);
} else {
/* Cancel the call if the initial transmission fails,
* particularly if that's due to network routing issues that