[PATCH v2 3/9] [RFC] virtio_ring: Embed a wrap counter in opaque poll index value

From: Cristian Marussi
Date: Tue Feb 01 2022 - 12:16:26 EST


Exported API virtqueue_poll() can be used to support polling mode operation
on top of virtio layer if needed; currently the parameter last_used_idx is
the opaque value that needs to be passed to the virtqueue_poll() function
to check if there are new pending used buffers in the queue: such opaque
value would have been previously obtained by a call to the API function
virtqueue_enable_cb_prepare().

Since such opaque value is indeed containing simply a snapshot in time of
the internal last_used_index (roughly), it is possible that, if exactly
2**16 buffers are marked as used between two successive calls to
virtqueue_poll(), the caller is fooled into thinking that nothing is
pending (ABA problem).

Keep a full fledged internal wraps counter per virtqueue and embed it into
the upper 16bits of the returned opaque value, so that the above scenario
can be detected transparently by virtqueue_poll(): this way each single
possible last_used_idx value is really belonging to a different wrap.

Cc: "Michael S. Tsirkin" <mst@xxxxxxxxxx>
Cc: Igor Skalkin <igor.skalkin@xxxxxxxxxxxxxxx>
Cc: Peter Hilber <peter.hilber@xxxxxxxxxxxxxxx>
Cc: virtualization@xxxxxxxxxxxxxxxxxxxxxxxxxx
Signed-off-by: Cristian Marussi <cristian.marussi@xxxxxxx>
---
Still no perf data on this, I was wondering what exactly to measure in
term of perf metrics to evaluate the impact of the rolling vq->wraps
counter.
---
drivers/virtio/virtio_ring.c | 51 +++++++++++++++++++++++++++++++++---
1 file changed, 47 insertions(+), 4 deletions(-)

diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
index 00f64f2f8b72..613ec0503509 100644
--- a/drivers/virtio/virtio_ring.c
+++ b/drivers/virtio/virtio_ring.c
@@ -12,6 +12,8 @@
#include <linux/hrtimer.h>
#include <linux/dma-mapping.h>
#include <linux/spinlock.h>
+#include <linux/bits.h>
+#include <linux/bitfield.h>
#include <xen/xen.h>

static bool force_used_validation = false;
@@ -69,6 +71,17 @@ module_param(force_used_validation, bool, 0444);
#define LAST_ADD_TIME_INVALID(vq)
#endif

+#define VRING_IDX_MASK GENMASK(15, 0)
+#define VRING_GET_IDX(opaque) \
+ ((u16)FIELD_GET(VRING_IDX_MASK, (opaque)))
+
+#define VRING_WRAPS_MASK GENMASK(31, 16)
+#define VRING_GET_WRAPS(opaque) \
+ ((u16)FIELD_GET(VRING_WRAPS_MASK, (opaque)))
+
+#define VRING_BUILD_OPAQUE(idx, wraps) \
+ (FIELD_PREP(VRING_WRAPS_MASK, (wraps)) | ((idx) & VRING_IDX_MASK))
+
struct vring_desc_state_split {
void *data; /* Data for callback. */
struct vring_desc *indir_desc; /* Indirect descriptor, if any. */
@@ -117,6 +130,8 @@ struct vring_virtqueue {
/* Last used index we've seen. */
u16 last_used_idx;

+ u16 wraps;
+
/* Hint for event idx: already triggered no need to disable. */
bool event_triggered;

@@ -806,6 +821,8 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
ret = vq->split.desc_state[i].data;
detach_buf_split(vq, i, ctx);
vq->last_used_idx++;
+ if (unlikely(!vq->last_used_idx))
+ vq->wraps++;
/* If we expect an interrupt for the next entry, tell host
* by writing event index and flush out the write before
* the read in the next get_buf call. */
@@ -1508,6 +1525,7 @@ static void *virtqueue_get_buf_ctx_packed(struct virtqueue *_vq,
if (unlikely(vq->last_used_idx >= vq->packed.vring.num)) {
vq->last_used_idx -= vq->packed.vring.num;
vq->packed.used_wrap_counter ^= 1;
+ vq->wraps++;
}

/*
@@ -1744,6 +1762,7 @@ static struct virtqueue *vring_create_virtqueue_packed(
vq->weak_barriers = weak_barriers;
vq->broken = false;
vq->last_used_idx = 0;
+ vq->wraps = 0;
vq->event_triggered = false;
vq->num_added = 0;
vq->packed_ring = true;
@@ -2092,13 +2111,17 @@ EXPORT_SYMBOL_GPL(virtqueue_disable_cb);
*/
unsigned virtqueue_enable_cb_prepare(struct virtqueue *_vq)
{
+ unsigned int last_used_idx;
struct vring_virtqueue *vq = to_vvq(_vq);

if (vq->event_triggered)
vq->event_triggered = false;

- return vq->packed_ring ? virtqueue_enable_cb_prepare_packed(_vq) :
- virtqueue_enable_cb_prepare_split(_vq);
+ last_used_idx = vq->packed_ring ?
+ virtqueue_enable_cb_prepare_packed(_vq) :
+ virtqueue_enable_cb_prepare_split(_vq);
+
+ return VRING_BUILD_OPAQUE(last_used_idx, vq->wraps);
}
EXPORT_SYMBOL_GPL(virtqueue_enable_cb_prepare);

@@ -2107,6 +2130,21 @@ EXPORT_SYMBOL_GPL(virtqueue_enable_cb_prepare);
* @_vq: the struct virtqueue we're talking about.
* @last_used_idx: virtqueue state (from call to virtqueue_enable_cb_prepare).
*
+ * The provided last_used_idx, as returned by virtqueue_enable_cb_prepare(),
+ * is an opaque value representing the queue state and it is built as follows:
+ *
+ * ---------------------------------------------------------
+ * | vq->wraps | vq->last_used_idx |
+ * 31------------------------------------------------------0
+ *
+ * The MSB 16bits embedding the wraps counter for the underlying virtqueue
+ * is stripped out here before reaching into the lower layer helpers.
+ *
+ * This structure of the opaque value mitigates the scenario in which, when
+ * exactly 2**16 messages are marked as used between two successive calls to
+ * virtqueue_poll(), the caller is fooled into thinking nothing new has arrived
+ * since the pure last_used_idx is exactly the same.
+ *
* Returns "true" if there are pending used buffers in the queue.
*
* This does not need to be serialized.
@@ -2118,9 +2156,13 @@ bool virtqueue_poll(struct virtqueue *_vq, unsigned last_used_idx)
if (unlikely(vq->broken))
return false;

+ if (unlikely(vq->wraps != VRING_GET_WRAPS(last_used_idx)))
+ return true;
+
virtio_mb(vq->weak_barriers);
- return vq->packed_ring ? virtqueue_poll_packed(_vq, last_used_idx) :
- virtqueue_poll_split(_vq, last_used_idx);
+ return vq->packed_ring ?
+ virtqueue_poll_packed(_vq, VRING_GET_IDX(last_used_idx)) :
+ virtqueue_poll_split(_vq, VRING_GET_IDX(last_used_idx));
}
EXPORT_SYMBOL_GPL(virtqueue_poll);

@@ -2245,6 +2287,7 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
vq->weak_barriers = weak_barriers;
vq->broken = false;
vq->last_used_idx = 0;
+ vq->wraps = 0;
vq->event_triggered = false;
vq->num_added = 0;
vq->use_dma_api = vring_use_dma_api(vdev);
--
2.17.1