[PATCH v3] lib/dlock-list: Scale dlock_lists_empty()

From: Davidlohr Bueso
Date: Fri Nov 03 2017 - 10:24:14 EST


Instead of the current O(N) implementation, at the cost
of adding an atomic counter, we can convert the call to
an atomic_read(). The counter only accounts for empty to
non-empty transitions, and vice versa; it is therefore
only modified twice for each of the lists during the
lifetime of the dlock (while used).

In addition, to be able to unaccount a list_del(), we
add a dlist pointer to each head (rather than to each
node), thus minimizing the overall memory footprint.

Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx>
---

Changes from v2: Removed some bogus lines and an overoptimistic
changelog.

include/linux/dlock-list.h | 2 ++
lib/dlock-list.c | 52 +++++++++++++++++++++++++++++++++++++---------
2 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
index c00c7f92ada4..d176a2d00cd1 100644
--- a/include/linux/dlock-list.h
+++ b/include/linux/dlock-list.h
@@ -32,10 +32,12 @@
struct dlock_list_head {
struct list_head list;
spinlock_t lock;
+ struct dlock_list_heads *dlist;
} ____cacheline_aligned_in_smp;

struct dlock_list_heads {
struct dlock_list_head *heads;
+ atomic_t waiters;
};

/*
diff --git a/lib/dlock-list.c b/lib/dlock-list.c
index a4ddecc01b12..5814e42c5b81 100644
--- a/lib/dlock-list.c
+++ b/lib/dlock-list.c
@@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,

INIT_LIST_HEAD(&head->list);
head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
+ head->dlist = dlist;
lockdep_set_class(&head->lock, key);
}
+
+ atomic_set(&dlist->waiters, 0);
return 0;
}
EXPORT_SYMBOL(__alloc_dlock_list_heads);
@@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
{
kfree(dlist->heads);
dlist->heads = NULL;
+ atomic_set(&dlist->waiters, 0);
}
EXPORT_SYMBOL(free_dlock_list_heads);

/**
* dlock_lists_empty - Check if all the dlock lists are empty
* @dlist: Pointer to the dlock_list_heads structure
- * Return: true if list is empty, false otherwise.
*
- * This can be a pretty expensive function call. If this function is required
- * in a performance critical path, we may have to maintain a global count
- * of the list entries in the global dlock_list_heads structure instead.
+ * Return: true if all dlock lists are empty, false otherwise.
*/
bool dlock_lists_empty(struct dlock_list_heads *dlist)
{
- int idx;
-
/* Shouldn't be called before nr_dlock_lists is initialized */
WARN_ON_ONCE(!nr_dlock_lists);

- for (idx = 0; idx < nr_dlock_lists; idx++)
- if (!list_empty(&dlist->heads[idx].list))
- return false;
- return true;
+ /*
+ * Serialize dlist->waiters such that a 0->1 transition is not missed
+ * by another thread checking if any of the dlock lists are used.
+ *
+ * CPU0                            CPU1
+ * dlock_lists_add()               dlock_lists_empty()
+ *   [S] atomic_inc(waiters);
+ *       smp_mb__after_atomic();
+ *                                   smp_mb__before_atomic();
+ *                                   [L] atomic_read(waiters)
+ *       list_add()
+ *
+ */
+ smp_mb__before_atomic();
+ return !atomic_read(&dlist->waiters);
}
EXPORT_SYMBOL(dlock_lists_empty);

@@ -179,6 +189,16 @@ void dlock_lists_add(struct dlock_list_node *node,
struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];

/*
+ * Bump the waiters counter _before_ taking the head->lock
+ * such that we don't miss a thread adding itself to a list
+ * while spinning for the lock.
+ */
+ if (list_empty_careful(&head->list)) {
+ atomic_inc(&dlist->waiters);
+ smp_mb__after_atomic();
+ }
+
+ /*
* There is no need to disable preemption
*/
spin_lock(&head->lock);
@@ -212,6 +232,18 @@ void dlock_lists_del(struct dlock_list_node *node)
spin_lock(&head->lock);
if (likely(head == node->head)) {
list_del_init(&node->list);
+ /*
+ * We still hold the head->lock, so a normal list_empty()
+ * check will do.
+ */
+ if (list_empty(&head->list)) {
+ struct dlock_list_heads *dlist;
+ dlist = node->head->dlist;
+
+ atomic_dec(&dlist->waiters);
+ smp_mb__after_atomic();
+ }
+
node->head = NULL;
retry = false;
} else {
--
2.13.6