[PATCH V2 net-next 2/2] net: qrtr: Add support for processing DEL_PROC type control message

From: Sricharan Ramabadhran
Date: Wed Sep 20 2023 - 01:34:07 EST


For certain rproc's like modem, when it goes down and endpoint gets
un-registered, DEL_PROC control message gets forwarded to other
remote nodes. So remote nodes should listen on the message,
wakeup all local waiters waiting for tx_resume notifications
(which will never come) and also forward the message to all
local qrtr sockets like QMI etc. Adding the support here.

Introduced a new rx worker here, because endpoint_post can get called in
atomic contexts, but processing of DEL_PROC needs to acquire node
qrtr_tx mutex.

Signed-off-by: Sricharan Ramabadhran <quic_srichara@xxxxxxxxxxx>
---
[v2] Fixed a sparse warning.

include/uapi/linux/qrtr.h | 1 +
net/qrtr/af_qrtr.c | 65 +++++++++++++++++++++++++++++++++++++++
2 files changed, 66 insertions(+)

diff --git a/include/uapi/linux/qrtr.h b/include/uapi/linux/qrtr.h
index f7e2fb3d752b..1c920159d610 100644
--- a/include/uapi/linux/qrtr.h
+++ b/include/uapi/linux/qrtr.h
@@ -26,6 +26,7 @@ enum qrtr_pkt_type {
QRTR_TYPE_PING = 9,
QRTR_TYPE_NEW_LOOKUP = 10,
QRTR_TYPE_DEL_LOOKUP = 11,
+ QRTR_TYPE_DEL_PROC = 13,
};

struct qrtr_ctrl_pkt {
diff --git a/net/qrtr/af_qrtr.c b/net/qrtr/af_qrtr.c
index e5cf4245c3dc..016b946f308b 100644
--- a/net/qrtr/af_qrtr.c
+++ b/net/qrtr/af_qrtr.c
@@ -3,6 +3,7 @@
* Copyright (c) 2015, Sony Mobile Communications Inc.
* Copyright (c) 2013, The Linux Foundation. All rights reserved.
*/
+#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/netlink.h>
#include <linux/qrtr.h>
@@ -122,6 +123,9 @@ static DEFINE_XARRAY_ALLOC(qrtr_ports);
* @qrtr_tx_lock: lock for qrtr_tx_flow inserts
* @rx_queue: receive queue
* @item: list item for broadcast list
+ * @kworker: worker thread for recv work
+ * @task: task to run the worker thread
+ * @read_data: scheduled work for recv work
*/
struct qrtr_node {
struct mutex ep_lock;
@@ -134,6 +138,9 @@ struct qrtr_node {

struct sk_buff_head rx_queue;
struct list_head item;
+ struct kthread_worker kworker;
+ struct task_struct *task;
+ struct kthread_work read_data;
};

/**
@@ -186,6 +193,9 @@ static void __qrtr_node_release(struct kref *kref)
list_del(&node->item);
mutex_unlock(&qrtr_node_lock);

+ kthread_flush_worker(&node->kworker);
+ kthread_stop(node->task);
+
skb_queue_purge(&node->rx_queue);

/* Free tx flow counters */
@@ -526,6 +536,9 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)

if (cb->type == QRTR_TYPE_RESUME_TX) {
qrtr_tx_resume(node, skb);
+ } else if (cb->type == QRTR_TYPE_DEL_PROC) {
+ skb_queue_tail(&node->rx_queue, skb);
+ kthread_queue_work(&node->kworker, &node->read_data);
} else {
ipc = qrtr_port_lookup(cb->dst_port);
if (!ipc)
@@ -574,6 +587,50 @@ static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt,
return skb;
}

+/* Handle DEL_PROC control message */
+static void qrtr_node_rx_work(struct kthread_work *work)
+{
+ struct qrtr_node *node = container_of(work, struct qrtr_node,
+ read_data);
+ struct radix_tree_iter iter;
+ struct qrtr_tx_flow *flow;
+ struct qrtr_ctrl_pkt *pkt;
+ struct qrtr_sock *ipc;
+ struct sk_buff *skb;
+ void __rcu **slot;
+
+ while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
+ struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
+
+ ipc = qrtr_port_lookup(cb->dst_port);
+ if (!ipc) {
+ kfree_skb(skb);
+ continue;
+ }
+
+ if (cb->type == QRTR_TYPE_DEL_PROC) {
+ /* Free tx flow counters */
+ mutex_lock(&node->qrtr_tx_lock);
+ radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+ flow = rcu_dereference_raw(*slot);
+ wake_up_interruptible_all(&flow->resume_tx);
+ }
+ mutex_unlock(&node->qrtr_tx_lock);
+
+ /* Translate DEL_PROC to BYE for local enqueue */
+ cb->type = QRTR_TYPE_BYE;
+ pkt = (struct qrtr_ctrl_pkt *)skb->data;
+ memset(pkt, 0, sizeof(*pkt));
+ pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE);
+
+ if (sock_queue_rcv_skb(&ipc->sk, skb))
+ kfree_skb(skb);
+
+ qrtr_port_put(ipc);
+ }
+ }
+}
+
/**
* qrtr_endpoint_register() - register a new endpoint
* @ep: endpoint to register
@@ -599,6 +656,14 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
node->nid = QRTR_EP_NID_AUTO;
node->ep = ep;

+ kthread_init_work(&node->read_data, qrtr_node_rx_work);
+ kthread_init_worker(&node->kworker);
+ node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx");
+ if (IS_ERR(node->task)) {
+ kfree(node);
+ return -ENOMEM;
+ }
+
INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
mutex_init(&node->qrtr_tx_lock);

--
2.34.1