Re: [RFC v2 6/7] virtio: in order support for virtio_ring

From: Guo Zhi
Date: Thu Aug 25 2022 - 23:18:37 EST




----- Original Message -----
> From: "jasowang" <jasowang@xxxxxxxxxx>
> To: "Guo Zhi" <qtxuning1999@xxxxxxxxxxx>, "eperezma" <eperezma@xxxxxxxxxx>, "sgarzare" <sgarzare@xxxxxxxxxx>, "Michael
> Tsirkin" <mst@xxxxxxxxxx>
> Cc: "netdev" <netdev@xxxxxxxxxxxxxxx>, "linux-kernel" <linux-kernel@xxxxxxxxxxxxxxx>, "kvm list" <kvm@xxxxxxxxxxxxxxx>,
> "virtualization" <virtualization@xxxxxxxxxxxxxxxxxxxxxxxxxx>
> Sent: Thursday, August 25, 2022 3:44:41 PM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring

> On 2022/8/17 21:57, Guo Zhi wrote:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <qtxuning1999@xxxxxxxxxxx>
>> ---
>> drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>> 1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>> /* DMA address and size information */
>> dma_addr_t queue_dma_addr;
>> size_t queue_size_in_bytes;
>> +
>> + /* In order feature batch begin here */
>
>
> We need to tweak the comment; it's not easy for me to understand its
> meaning here.
>
>
>> + u16 next_desc_begin;
>> } split;
>>
>> /* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>> }
>>
>> vring_unmap_one_split(vq, i);
>> - vq->split.desc_extra[i].next = vq->free_head;
>> - vq->free_head = head;
>> + /* In order feature use desc in order,
>> + * that means, the next desc will always be free
>> + */
>
>
> Maybe we should add something like "The descriptors are prepared in order".
>
>
>> + if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>> + vq->split.desc_extra[i].next = vq->free_head;
>> + vq->free_head = head;
>> + }
>>
>> /* Plus final descriptor */
>> vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> {
>> struct vring_virtqueue *vq = to_vvq(_vq);
>> void *ret;
>> - unsigned int i;
>> + unsigned int i, j;
>> u16 last_used;
>>
>> START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>> /* Only get used array entries after they have been exposed by host. */
>> virtio_rmb(vq->weak_barriers);
>>
>> - last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> - i = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].id);
>> - *len = virtio32_to_cpu(_vq->vdev,
>> - vq->split.vring.used->ring[last_used].len);
>> + if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> + /* Skip used ring and get used desc in order*/
>> + i = vq->split.next_desc_begin;
>> + j = i;
>> + /* Indirect only takes one descriptor in descriptor table */
>> + while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> + j = (j + 1) % vq->split.vring.num;
>
>
> Let's move the expensive mod outside the loop. Or, since this is the split
> ring, we can actually use a bitwise AND here, because the size is guaranteed
> to be a power of two? Another question: is it better to store the next_desc
> in e.g. desc_extra?
>
> And this seems very expensive if the device doesn't do the batching
> (which is not mandatory).
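
To make the AND suggestion concrete, here is a minimal user-space sketch, not code from the patch. The helper names ring_next() and ring_next_mod() are hypothetical, and the sketch assumes the ring size is a non-zero power of two, as stated above for the split ring.

```c
#include <stdint.h>

/*
 * Hypothetical helper, not part of virtio_ring.c: advance a ring index
 * by one slot, wrapping at the end of the ring.
 * Assumption: num is a non-zero power of two, so (num - 1) is a bit mask.
 */
static inline uint16_t ring_next(uint16_t idx, uint16_t num)
{
	return (uint16_t)((idx + 1) & (num - 1));
}

/* Same computation with the modulo operator, kept only for comparison. */
static inline uint16_t ring_next_mod(uint16_t idx, uint16_t num)
{
	return (uint16_t)((idx + 1) % num);
}
```

For any idx below num, both helpers return the same value when num is a power of two; the masked form simply avoids the division.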
>
>
>> + /* move to next */
>> + j = (j + 1) % vq->split.vring.num;
>> + /* Next buffer will use this descriptor in order */
>> + vq->split.next_desc_begin = j;
>> + if (!vq->indirect) {
>> + *len = vq->split.desc_extra[i].len;
>> + } else {
>> + struct vring_desc *indir_desc =
>> + vq->split.desc_state[i].indir_desc;
>> + u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> + if (indir_desc) {
>> + for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> + buffer_len += indir_desc[j].len;
>
>
> So I think we need to finalize this, then we can have much more stress
> on the cache:
>
> https://lkml.org/lkml/2021/10/26/1300
>
> It was reverted since it's too aggressive. We should instead:
>
> 1) do the validation only for modern devices
>
> 2) fail only when we enable the validation (e.g. via a module parameter).
>
> Thanks
>

Sorry for this obsolete implementation; we will not get the buffer's len like this (in a loop).
Actually, for buffers that are not skipped, we can get the length from the used ring directly. For skipped buffers,
I think we don't have to get the length, because the driver is not interested in the length of skipped (tx) buffers.
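
A rough user-space sketch of that idea follows; it is a toy model and not the next version of the patch. All names (struct used_elem, struct inorder_state, inorder_get_buf()) are hypothetical. It assumes the device wrote a single used element naming the last buffer of a batch, that descriptors are consumed in order, and that the ring size is a power of two.

```c
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for one used ring element (hypothetical, not the kernel type). */
struct used_elem {
	uint32_t id;	/* head descriptor of the buffer the device reported */
	uint32_t len;	/* length the device wrote for that buffer */
};

/* Hypothetical driver-side bookkeeping for in-order completion. */
struct inorder_state {
	uint16_t next_head;	/* head of the next buffer expected to complete */
	uint16_t num;		/* ring size, assumed to be a power of two */
};

/*
 * Consume the next in-order buffer. chain_len is the number of descriptors
 * that buffer occupies. If the buffer is the one named by the used element,
 * its length is taken from the used element; otherwise it was skipped by
 * the device and the length is reported as 0.
 * Returns the head id of the consumed buffer.
 */
static uint16_t inorder_get_buf(struct inorder_state *s,
				const struct used_elem *batch_last,
				uint16_t chain_len,
				uint32_t *len, bool *batch_done)
{
	uint16_t head = s->next_head;

	if (head == batch_last->id) {
		*len = batch_last->len;	/* not skipped: length from the used ring */
		*batch_done = true;
	} else {
		*len = 0;		/* skipped: length is not needed */
		*batch_done = false;
	}

	/* The next buffer starts right after this chain, wrapping with a mask. */
	s->next_head = (uint16_t)((head + chain_len) & (s->num - 1));
	return head;
}
```

With a used element of { .id = 2, .len = 100 } and three single-descriptor buffers at heads 0, 1 and 2, the first two calls report length 0 and the third reports 100 and marks the batch as done.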

>
>> + }
>> +
>> + *len = buffer_len;
>> + }
>> + } else {
>> + last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> + i = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].id);
>> + *len = virtio32_to_cpu(_vq->vdev,
>> + vq->split.vring.used->ring[last_used].len);
>> + }
>>
>> if (unlikely(i >= vq->split.vring.num)) {
>> BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>> vq->split.avail_flags_shadow = 0;
>> vq->split.avail_idx_shadow = 0;
>>
>> + vq->split.next_desc_begin = 0;
>> +
>> /* No callback? Tell other side not to bother us. */
>> if (!callback) {
>> vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;