[PATCH net-next 07/19] rxrpc: Implement a mechanism to send an event notification to a connection

From: David Howells
Date: Fri Dec 23 2022 - 07:03:48 EST


Provide a means by which an event notification can be sent to a connection
such that the I/O thread can pick it up and handle it, rather than it being
handled in a separate workqueue.

This is then used to move the deferred final ACK of a call into the I/O
thread rather than a separate work queue as part of the drive to do all
transmission from the I/O thread.

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Marc Dionne <marc.dionne@xxxxxxxxxxxx>
cc: linux-afs@xxxxxxxxxxxxxxxxxxx
---

include/trace/events/rxrpc.h | 5 ++---
net/rxrpc/ar-internal.h | 5 +++++
net/rxrpc/conn_event.c | 14 ++++++++++----
net/rxrpc/conn_object.c | 20 +++++++++++++++++++-
net/rxrpc/io_thread.c | 18 +++++++++++++++++-
net/rxrpc/local_object.c | 1 +
6 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index eac513668e33..b969756f97fc 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -111,7 +111,7 @@
EM(rxrpc_conn_get_call_input, "GET inp-call") \
EM(rxrpc_conn_get_conn_input, "GET inp-conn") \
EM(rxrpc_conn_get_idle, "GET idle ") \
- EM(rxrpc_conn_get_poke, "GET poke ") \
+ EM(rxrpc_conn_get_poke_timer, "GET poke ") \
EM(rxrpc_conn_get_service_conn, "GET svc-conn") \
EM(rxrpc_conn_new_client, "NEW client ") \
EM(rxrpc_conn_new_service, "NEW service ") \
@@ -126,10 +126,9 @@
EM(rxrpc_conn_put_service_reaped, "PUT svc-reap") \
EM(rxrpc_conn_put_unbundle, "PUT unbundle") \
EM(rxrpc_conn_put_unidle, "PUT unidle ") \
+ EM(rxrpc_conn_put_work, "PUT work ") \
EM(rxrpc_conn_queue_challenge, "QUE chall ") \
- EM(rxrpc_conn_queue_retry_work, "QUE retry-wk") \
EM(rxrpc_conn_queue_rx_work, "QUE rx-work ") \
- EM(rxrpc_conn_queue_timer, "QUE timer ") \
EM(rxrpc_conn_see_new_service_conn, "SEE new-svc ") \
EM(rxrpc_conn_see_reap_service, "SEE reap-svc") \
E_(rxrpc_conn_see_work, "SEE work ")
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index b7023ae360e7..1a9822aa3592 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -202,6 +202,7 @@ struct rxrpc_host_header {
* - max 48 bytes (struct sk_buff::cb)
*/
struct rxrpc_skb_priv {
+ struct rxrpc_connection *conn; /* Connection referred to (poke packet) */
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 flags;
@@ -292,6 +293,7 @@ struct rxrpc_local {
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head rx_queue; /* Received packets */
+ struct list_head conn_attend_q; /* Conns requiring immediate attention */
struct list_head call_attend_q; /* Calls requiring immediate attention */
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
@@ -441,6 +443,7 @@ struct rxrpc_connection {
struct rxrpc_peer *peer; /* Remote endpoint */
struct rxrpc_net *rxnet; /* Network namespace to which call belongs */
struct key *key; /* Security details */
+ struct list_head attend_link; /* Link in local->conn_attend_q */

refcount_t ref;
atomic_t active; /* Active count for service conns */
@@ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *, struct sk_buff *,
void rxrpc_process_connection(struct work_struct *);
void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
+void rxrpc_input_conn_event(struct rxrpc_connection *, struct sk_buff *);

/*
* conn_object.c
@@ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
extern unsigned int rxrpc_connection_expiry;
extern unsigned int rxrpc_closed_conn_expiry;

+void rxrpc_poke_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
struct sockaddr_rxrpc *,
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index dfd29882126f..7a980a32344f 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn)
if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
rxrpc_secure_connection(conn);

- /* Process delayed ACKs whose time has come. */
- if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
- rxrpc_process_delayed_final_acks(conn, false);
-
/* go through the conn-level event packets, releasing the ref on this
* connection that each one has when we've finished with it */
while ((skb = skb_dequeue(&conn->rx_queue))) {
@@ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
return -EPROTO;
}
}
+
+/*
+ * Handle an event notification for a connection; @skb is not used here.
+ */
+void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
+{
+ /* Process delayed ACKs whose time has come (only if a final-ACK flag is set). */
+ if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
+ rxrpc_process_delayed_final_acks(conn, false);
+}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 354a5ecb34ee..649c7fc94658 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work);
static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
unsigned long reap_at);

+void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
+{ /* Queue conn on its local endpoint's conn_attend_q (at most once) and wake the I/O thread. */
+ struct rxrpc_local *local = conn->local;
+ bool busy;
+
+ if (WARN_ON_ONCE(!local))
+ return;
+
+ spin_lock_bh(&local->lock);
+ busy = !list_empty(&conn->attend_link); /* NOTE(review): relies on attend_link being list-initialised when conn is allocated - not visible here, confirm */
+ if (!busy) {
+ rxrpc_get_connection(conn, why); /* ref taken for the queue entry; traced with the caller's reason */
+ list_add_tail(&conn->attend_link, &local->conn_attend_q);
+ }
+ spin_unlock_bh(&local->lock);
+ rxrpc_wake_up_io_thread(local); /* called whether or not conn was newly queued */
+}
+
static void rxrpc_connection_timer(struct timer_list *timer)
{
struct rxrpc_connection *conn =
container_of(timer, struct rxrpc_connection, timer);

- rxrpc_queue_conn(conn, rxrpc_conn_queue_timer);
+ rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer);
}

/*
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 0e1a548d35f8..dcf1ade0abdd 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
*/
int rxrpc_io_thread(void *data)
{
+ struct rxrpc_connection *conn;
struct sk_buff_head rx_queue;
struct rxrpc_local *local = data;
struct rxrpc_call *call;
@@ -436,6 +437,19 @@ int rxrpc_io_thread(void *data)
for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop);

+ /* Deal with connections that want immediate attention. */
+ if ((conn = list_first_entry_or_null(&local->conn_attend_q,
+ struct rxrpc_connection,
+ attend_link))) {
+ spin_lock_bh(&local->lock);
+ list_del_init(&conn->attend_link);
+ spin_unlock_bh(&local->lock);
+
+ rxrpc_input_conn_event(conn, NULL);
+ rxrpc_put_connection(conn, rxrpc_conn_put_poke);
+ continue;
+ }
+
/* Deal with calls that want immediate attention. */
if ((call = list_first_entry_or_null(&local->call_attend_q,
struct rxrpc_call,
@@ -463,6 +477,7 @@ int rxrpc_io_thread(void *data)
rxrpc_input_error(local, skb);
rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
break;
+ break;
default:
WARN_ON_ONCE(1);
rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
@@ -481,7 +496,8 @@ int rxrpc_io_thread(void *data)
set_current_state(TASK_INTERRUPTIBLE);
should_stop = kthread_should_stop();
if (!skb_queue_empty(&local->rx_queue) ||
- !list_empty(&local->call_attend_q)) {
+ !list_empty(&local->call_attend_q) ||
+ !list_empty(&local->conn_attend_q)) {
__set_current_state(TASK_RUNNING);
continue;
}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index c0ac2fe07ec4..8ef6cd8defa4 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
init_rwsem(&local->defrag_sem);
init_completion(&local->io_thread_ready);
skb_queue_head_init(&local->rx_queue);
+ INIT_LIST_HEAD(&local->conn_attend_q);
INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);