Re: [PATCH] rpmsg: char: Use preallocated SKBs.

From: Arnaud POULIQUEN
Date: Tue Dec 06 2022 - 07:55:26 EST


Hello Piotr

On 12/6/22 09:50, Piotr Wojtaszczyk wrote:
> On a message reception copy the message to a SKB taken from preallocated
> pool instead of allocating a new SKB each time.
> During high rpmsg traffic this reduces consumed CPU time noticeably.

Do you have any metrics to share?

>
> Signed-off-by: Piotr Wojtaszczyk <piotr.wojtaszczyk@xxxxxxxxxxx>
> ---
> drivers/rpmsg/rpmsg_char.c | 58 ++++++++++++++++++++++++++++----
> drivers/rpmsg/rpmsg_internal.h | 21 ++++++++++++
> drivers/rpmsg/virtio_rpmsg_bus.c | 21 ------------
> 3 files changed, 72 insertions(+), 28 deletions(-)
>
> diff --git a/drivers/rpmsg/rpmsg_char.c b/drivers/rpmsg/rpmsg_char.c
> index ac50ed757765..76546ba72cdc 100644
> --- a/drivers/rpmsg/rpmsg_char.c
> +++ b/drivers/rpmsg/rpmsg_char.c
> @@ -75,9 +75,44 @@ struct rpmsg_eptdev {
>
> spinlock_t queue_lock;
> struct sk_buff_head queue;
> + struct sk_buff_head skb_pool;
> wait_queue_head_t readq;
> };
>
> +static inline
> +struct sk_buff *rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev)
> +{
> + struct sk_buff *skb;
> +
> + skb = skb_dequeue(&eptdev->skb_pool);
> + if (!skb)
> + skb = alloc_skb(MAX_RPMSG_BUF_SIZE, GFP_ATOMIC);

The "get_mtu" endpoint op should be used here.
But in any case this only works for the virtio backend, which defines the
get_mtu op (as it defines MAX_RPMSG_BUF_SIZE), and not for other backends
such as glink.
Your proposal needs to remain compatible with the legacy behavior.

Here is a proposal:

/*
 * Get an skb to hold an incoming message of @len bytes.
 *
 * If the backend implements the get_mtu endpoint op, take an skb from
 * eptdev->skb_pool, or allocate an MTU-sized one when the pool is empty.
 * Backends without get_mtu keep the legacy behavior: a fresh skb of @len
 * bytes is allocated for each message.
 *
 * Returns the skb, or NULL if the allocation failed.
 *
 * NOTE(review): this dereferences eptdev->ept, so it presumably must only
 * be called once the endpoint has been created - to be confirmed.
 */
static struct sk_buff *rpmsg_eptdev_get_skb(struct rpmsg_eptdev *eptdev, int len)
{
	struct sk_buff *skb;

	/* Legacy path: no MTU available, size the skb on the message itself */
	if (!eptdev->ept->ops->get_mtu)
		return alloc_skb(len, GFP_ATOMIC);

	skb = skb_dequeue(&eptdev->skb_pool);
	if (!skb)
		skb = alloc_skb(eptdev->ept->ops->get_mtu(eptdev->ept),
				GFP_ATOMIC);

	return skb;
}

> + return skb;
> +}
> +
> +static inline
> +void rpmsg_eptdev_put_skb(struct rpmsg_eptdev *eptdev, struct sk_buff *skb)
> +{
> + /* Recycle the skb */
> + skb->tail = 0;
> + skb->len = 0;
> + skb_queue_head(&eptdev->skb_pool, skb);
> +}
> +
> +static void rpmsg_eptdev_free_all_skb(struct rpmsg_eptdev *eptdev)
> +{
> + struct sk_buff *skb;
> +
> + while (!skb_queue_empty(&eptdev->queue)) {
> + skb = skb_dequeue(&eptdev->queue);
> + kfree_skb(skb);
> + }
> + while (!skb_queue_empty(&eptdev->skb_pool)) {
> + skb = skb_dequeue(&eptdev->skb_pool);
> + kfree_skb(skb);
> + }
> +}
> +
> static int rpmsg_eptdev_destroy(struct device *dev, void *data)
> {
> struct rpmsg_eptdev *eptdev = dev_to_eptdev(dev);
> @@ -104,7 +139,7 @@ static int rpmsg_ept_cb(struct rpmsg_device *rpdev, void *buf, int len,
> struct rpmsg_eptdev *eptdev = priv;
> struct sk_buff *skb;
>
> - skb = alloc_skb(len, GFP_ATOMIC);
> + skb = rpmsg_eptdev_get_skb(eptdev);
> if (!skb)
> return -ENOMEM;
>
> @@ -126,6 +161,18 @@ static int rpmsg_eptdev_open(struct inode *inode, struct file *filp)
> struct rpmsg_endpoint *ept;
> struct rpmsg_device *rpdev = eptdev->rpdev;
> struct device *dev = &eptdev->dev;
> + struct sk_buff *skb;
> + int i;
> +
> + /* Preallocate 8 SKBs */
> + for (i = 0; i < 8; i++) {

Do you need to preallocate them?
At runtime, the driver will try to reuse SKBs from the skb_pool and, if none
is available, it will allocate a new one.
Dropping the preallocation would also help to solve the issue of using
MAX_RPMSG_BUF_SIZE.

Regards,
Arnaud
> + skb = rpmsg_eptdev_get_skb(eptdev);
> + if (!skb) {
> + rpmsg_eptdev_free_all_skb(eptdev);
> + return -ENOMEM;
> + }
> + rpmsg_eptdev_put_skb(eptdev, skb);
> + }
>
> get_device(dev);
>
> @@ -146,7 +193,6 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
> {
> struct rpmsg_eptdev *eptdev = cdev_to_eptdev(inode->i_cdev);
> struct device *dev = &eptdev->dev;
> - struct sk_buff *skb;
>
> /* Close the endpoint, if it's not already destroyed by the parent */
> mutex_lock(&eptdev->ept_lock);
> @@ -157,10 +203,7 @@ static int rpmsg_eptdev_release(struct inode *inode, struct file *filp)
> mutex_unlock(&eptdev->ept_lock);
>
> /* Discard all SKBs */
> - while (!skb_queue_empty(&eptdev->queue)) {
> - skb = skb_dequeue(&eptdev->queue);
> - kfree_skb(skb);
> - }
> + rpmsg_eptdev_free_all_skb(eptdev);
>
> put_device(dev);
>
> @@ -209,7 +252,7 @@ static ssize_t rpmsg_eptdev_read_iter(struct kiocb *iocb, struct iov_iter *to)
> if (copy_to_iter(skb->data, use, to) != use)
> use = -EFAULT;
>
> - kfree_skb(skb);
> + rpmsg_eptdev_put_skb(eptdev, skb);
>
> return use;
> }
> @@ -358,6 +401,7 @@ static int rpmsg_eptdev_create(struct rpmsg_ctrldev *ctrldev,
> mutex_init(&eptdev->ept_lock);
> spin_lock_init(&eptdev->queue_lock);
> skb_queue_head_init(&eptdev->queue);
> + skb_queue_head_init(&eptdev->skb_pool);
> init_waitqueue_head(&eptdev->readq);
>
> device_initialize(dev);
> diff --git a/drivers/rpmsg/rpmsg_internal.h b/drivers/rpmsg/rpmsg_internal.h
> index 3fc83cd50e98..5acaa54a277a 100644
> --- a/drivers/rpmsg/rpmsg_internal.h
> +++ b/drivers/rpmsg/rpmsg_internal.h
> @@ -15,6 +15,27 @@
> #include <linux/rpmsg.h>
> #include <linux/poll.h>
>
> +/*
> + * We're allocating buffers of 512 bytes each for communications. The
> + * number of buffers will be computed from the number of buffers supported
> + * by the vring, upto a maximum of 512 buffers (256 in each direction).
> + *
> + * Each buffer will have 16 bytes for the msg header and 496 bytes for
> + * the payload.
> + *
> + * This will utilize a maximum total space of 256KB for the buffers.
> + *
> + * We might also want to add support for user-provided buffers in time.
> + * This will allow bigger buffer size flexibility, and can also be used
> + * to achieve zero-copy messaging.
> + *
> + * Note that these numbers are purely a decision of this driver - we
> + * can change this without changing anything in the firmware of the remote
> + * processor.
> + */
> +#define MAX_RPMSG_NUM_BUFS (512)
> +#define MAX_RPMSG_BUF_SIZE (512)
> +
> #define to_rpmsg_device(d) container_of(d, struct rpmsg_device, dev)
> #define to_rpmsg_driver(d) container_of(d, struct rpmsg_driver, drv)
>
> diff --git a/drivers/rpmsg/virtio_rpmsg_bus.c b/drivers/rpmsg/virtio_rpmsg_bus.c
> index 3d9e442883e1..6552928a440d 100644
> --- a/drivers/rpmsg/virtio_rpmsg_bus.c
> +++ b/drivers/rpmsg/virtio_rpmsg_bus.c
> @@ -133,27 +133,6 @@ struct virtio_rpmsg_channel {
> #define to_virtio_rpmsg_channel(_rpdev) \
> container_of(_rpdev, struct virtio_rpmsg_channel, rpdev)
>
> -/*
> - * We're allocating buffers of 512 bytes each for communications. The
> - * number of buffers will be computed from the number of buffers supported
> - * by the vring, upto a maximum of 512 buffers (256 in each direction).
> - *
> - * Each buffer will have 16 bytes for the msg header and 496 bytes for
> - * the payload.
> - *
> - * This will utilize a maximum total space of 256KB for the buffers.
> - *
> - * We might also want to add support for user-provided buffers in time.
> - * This will allow bigger buffer size flexibility, and can also be used
> - * to achieve zero-copy messaging.
> - *
> - * Note that these numbers are purely a decision of this driver - we
> - * can change this without changing anything in the firmware of the remote
> - * processor.
> - */
> -#define MAX_RPMSG_NUM_BUFS (512)
> -#define MAX_RPMSG_BUF_SIZE (512)
> -
> /*
> * Local addresses are dynamically allocated on-demand.
> * We do not dynamically assign addresses from the low 1024 range,