[PATCH net-next 11/19] rxrpc: Offload the completion of service conn security to the I/O thread

From: David Howells
Date: Fri Dec 23 2022 - 07:03:53 EST


Offload the completion of the challenge/response cycle on a service
connection to the I/O thread.  Once the work queue has successfully
decrypted and verified the RESPONSE packet, the changing of the call
states is handed off to the I/O thread, which makes iteration over the
conn's channel list simpler.

Do this by marking the RESPONSE skbuff as RXRPC_SKB_MARK_SERVICE_CONN_SECURED
and putting it onto conn->local's receive queue for the I/O thread to
collect.  It is put on the front of the queue because the packet has
already been received.

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
---

include/trace/events/rxrpc.h | 2 ++
net/rxrpc/ar-internal.h | 1 +
net/rxrpc/conn_event.c | 46 +++++++++++++++++++++++++++++-------------
net/rxrpc/io_thread.c | 5 +++++
4 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index caeabd50e049..85671f4a77de 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -126,6 +126,7 @@
#define rxrpc_skb_traces \
EM(rxrpc_skb_eaten_by_unshare, "ETN unshare ") \
EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \
+ EM(rxrpc_skb_get_conn_secured, "GET conn-secd") \
EM(rxrpc_skb_get_conn_work, "GET conn-work") \
EM(rxrpc_skb_get_local_work, "GET locl-work") \
EM(rxrpc_skb_get_reject_work, "GET rej-work ") \
@@ -135,6 +136,7 @@
EM(rxrpc_skb_new_error_report, "NEW error-rpt") \
EM(rxrpc_skb_new_jumbo_subpacket, "NEW jumbo-sub") \
EM(rxrpc_skb_new_unshared, "NEW unshared ") \
+ EM(rxrpc_skb_put_conn_secured, "PUT conn-secd") \
EM(rxrpc_skb_put_conn_work, "PUT conn-work") \
EM(rxrpc_skb_put_error_report, "PUT error-rep") \
EM(rxrpc_skb_put_input, "PUT input ") \
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index bc3927883143..61f6d58a8cd4 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -38,6 +38,7 @@ struct rxrpc_txbuf;
enum rxrpc_skb_mark {
RXRPC_SKB_MARK_PACKET, /* Received packet */
RXRPC_SKB_MARK_ERROR, /* Error notification */
+ RXRPC_SKB_MARK_SERVICE_CONN_SECURED, /* Service connection response has been verified */
RXRPC_SKB_MARK_REJECT_BUSY, /* Reject with BUSY */
RXRPC_SKB_MARK_REJECT_ABORT, /* Reject with ABORT (code in skb->priority) */
};
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 71d4f38fc10f..ede2c7571bc1 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -248,7 +248,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
- int loop, ret;
+ int ret;

if (conn->state == RXRPC_CONN_ABORTED)
return -ECONNABORTED;
@@ -269,22 +269,21 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
if (ret < 0)
return ret;

- spin_lock(&conn->bundle->channel_lock);
spin_lock(&conn->state_lock);
+ if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING)
+ smp_store_release(&conn->state, RXRPC_CONN_SERVICE);
+ spin_unlock(&conn->state_lock);

- if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) {
- conn->state = RXRPC_CONN_SERVICE;
- spin_unlock(&conn->state_lock);
- for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
- rxrpc_call_is_secure(
- rcu_dereference_protected(
- conn->channels[loop].call,
- lockdep_is_held(&conn->bundle->channel_lock)));
- } else {
- spin_unlock(&conn->state_lock);
+ if (conn->state == RXRPC_CONN_SERVICE) {
+ /* Offload call state flipping to the I/O thread. As
+ * we've already received the packet, put it on the
+ * front of the queue.
+ */
+ skb->mark = RXRPC_SKB_MARK_SERVICE_CONN_SECURED;
+ rxrpc_get_skb(skb, rxrpc_skb_get_conn_secured);
+ skb_queue_head(&conn->local->rx_queue, skb);
+ rxrpc_wake_up_io_thread(conn->local);
}
-
- spin_unlock(&conn->bundle->channel_lock);
return 0;

default:
@@ -442,9 +441,28 @@ bool rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
*/
void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
{
+ unsigned int loop;
+
if (test_and_clear_bit(RXRPC_CONN_EV_ABORT_CALLS, &conn->events))
rxrpc_abort_calls(conn);

+ switch (skb->mark) {
+ case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
+ if (conn->state != RXRPC_CONN_SERVICE)
+ break;
+
+ spin_lock(&conn->bundle->channel_lock);
+
+ for (loop = 0; loop < RXRPC_MAXCALLS; loop++)
+ rxrpc_call_is_secure(
+ rcu_dereference_protected(
+ conn->channels[loop].call,
+ lockdep_is_held(&conn->bundle->channel_lock)));
+
+ spin_unlock(&conn->bundle->channel_lock);
+ break;
+ }
+
/* Process delayed ACKs whose time has come. */
if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
rxrpc_process_delayed_final_acks(conn, false);
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 3e13e700329a..e7db137f78e1 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -450,6 +450,7 @@ int rxrpc_io_thread(void *data)

/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
switch (skb->mark) {
case RXRPC_SKB_MARK_PACKET:
skb->priority = 0;
@@ -462,6 +463,10 @@ int rxrpc_io_thread(void *data)
rxrpc_input_error(local, skb);
rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
break;
+ case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
+ rxrpc_input_conn_event(sp->conn, skb);
+ rxrpc_put_connection(sp->conn, rxrpc_conn_put_poke);
+ rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured);
break;
default:
WARN_ON_ONCE(1);