Re: [RFC PATCH 1/3] lib/list_batch: A simple list insertion/deletion batching facility

From: Peter Zijlstra
Date: Wed Jan 27 2016 - 11:35:09 EST


On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:
> +static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
> + struct list_head *head,
> + struct list_head *entry)
> +{
> + if (cmd == lb_cmd_add)
> + list_add(entry, head);
> + else if (cmd == lb_cmd_del)
> + list_del(entry);
> + else /* cmd == lb_cmd_del_init */
> + list_del_init(entry);

Maybe use switch(); GCC has fancy warnings for enums used with switch().

> +}

> +static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
> + struct list_batch *batch,
> + struct list_head *entry)
> +{
> + /*
> + * Fast path
> + */
> + if (spin_trylock(lock)) {
> + _list_batch_cmd(cmd, batch->list, entry);
> + spin_unlock(lock);

This is still quite a lot of code for an inline function.

> + return;
> + }
> + do_list_batch_slowpath(cmd, lock, batch, entry);
> +}



> +void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
> + struct list_batch *batch, struct list_head *entry)
> +{
> + struct list_batch_qnode node, *prev, *next, *nptr;
> + int loop;
> +
> + /*
> + * Put itself into the list_batch queue
> + */
> + node.next = NULL;
> + node.entry = entry;
> + node.cmd = cmd;
> + node.state = lb_state_waiting;
> +

Here we rely on the release barrier implied by xchg() to ensure the node
initialization is complete before the xchg() publishes the thing.

But do we also need the acquire part of this barrier? From what I could
tell, the primitive as a whole does not imply any ordering.

> + prev = xchg(&batch->tail, &node);
> +
> + if (prev) {
> + WRITE_ONCE(prev->next, &node);
> + while (READ_ONCE(node.state) == lb_state_waiting)
> + cpu_relax();
> + if (node.state == lb_state_done)
> + return;
> + WARN_ON(node.state != lb_state_batch);
> + }
> +
> + /*
> + * We are now the queue head, we shold now acquire the lock and
> + * process a batch of qnodes.
> + */
> + loop = LB_BATCH_SIZE;

Have you tried different sizes?

> + next = &node;
> + spin_lock(lock);
> +
> +do_list_again:
> + do {
> + nptr = next;
> + _list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
> + next = READ_ONCE(nptr->next);
> + /*
> + * As soon as the state is marked lb_state_done, we
> + * can no longer assume the content of *nptr as valid.
> + * So we have to hold off marking it done until we no
> + * longer need its content.
> + *
> + * The release barrier here is to make sure that we
> + * won't access its content after marking it done.
> + */
> + if (next)
> + smp_store_release(&nptr->state, lb_state_done);
> + } while (--loop && next);
> + if (!next) {
> + /*
> + * The queue tail should equal to nptr, so clear it to
> + * mark the queue as empty.
> + */
> + if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
> + /*
> + * Queue not empty, wait until the next pointer is
> + * initialized.
> + */
> + while (!(next = READ_ONCE(nptr->next)))
> + cpu_relax();
> + }
> + /* The above cmpxchg acts as a memory barrier */

for what? :-)

Also, if that cmpxchg() fails, it very much does _not_ act as one.

I suspect you want smp_store_release() for setting lb_state_done, just as
above, and then use cmpxchg_relaxed().

> + WRITE_ONCE(nptr->state, lb_state_done);
> + }
> + if (next) {
> + if (loop)
> + goto do_list_again; /* More qnodes to process */
> + /*
> + * Mark the next qnode as head to process the next batch
> + * of qnodes. The new queue head cannot proceed until we
> + * release the lock.
> + */
> + WRITE_ONCE(next->state, lb_state_batch);
> + }
> + spin_unlock(lock);
> +}
> +EXPORT_SYMBOL_GPL(do_list_batch_slowpath);