[PATCH] rcu-tasks: Make shrinking down to a single callback queue safe

From: Zqiang
Date: Wed Nov 30 2022 - 00:07:18 EST


Assume that the current RCU Tasks flavor is in per-CPU callback-queuing
mode and that rcu_task_cb_adjust is true.

CPU0                                    CPU1

rcu_tasks_need_gpcb()
  ncbsnz == 0 and
  ncbs < rcu_task_collapse_lim

                                        invoke call_rcu_tasks_generic()
                                          enqueue callback to CPU1
                                          (CPU1 n_cbs not equal zero)

  if (rcu_task_cb_adjust &&
      ncbs <= rcu_task_collapse_lim)
    if (rtp->percpu_enqueue_lim > 1)
      rtp->percpu_enqueue_lim = 1;
      rtp->percpu_dequeue_gpseq =
        get_state_synchronize_rcu();


  A full RCU grace period has passed


  if (rcu_task_cb_adjust && !ncbsnz &&
      poll_state_synchronize_rcu(
        rtp->percpu_dequeue_gpseq) == true)
    if (rtp->percpu_enqueue_lim <
        rtp->percpu_dequeue_lim)
      rtp->percpu_dequeue_lim = 1
    for (cpu = rtp->percpu_dequeue_lim;
         cpu < nr_cpu_ids; cpu++)
      find CPU1 n_cbs is not zero
      trigger warning

The above scenario not only triggers WARN_ON_ONCE(), but also sets
rtp->percpu_dequeue_lim to one while CPU1 still has callbacks queued,
so CPU1's callbacks never get a chance to be invoked.

This commit therefore checks the callback queues of all CPUs other than
CPU0 before setting rtp->percpu_dequeue_lim to one. If any of those CPUs
still has callbacks queued, rtp->percpu_dequeue_lim is left unchanged. It
is set to one only once none of those CPUs has callbacks queued.

Signed-off-by: Zqiang <qiang1.zhang@xxxxxxxxx>
---
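Not part of the patch: the check described in the changelog can be
sketched as a small user-space model. This is only an illustration under
stated assumptions. struct model_rtp, MODEL_NR_CPUS, shrink_unchecked(),
shrink_checked() and replay_race() are hypothetical names, the per-CPU
callback lists are reduced to plain counters, and locking and the
grace-period poll are left out.

```c
#include <assert.h>
#include <stdbool.h>

#define MODEL_NR_CPUS 4	/* hypothetical CPU count for this model */

/* Simplified stand-in for struct rcu_tasks, not the kernel definition. */
struct model_rtp {
	int percpu_enqueue_lim;
	int percpu_dequeue_lim;
	long n_cbs[MODEL_NR_CPUS];	/* queued callbacks per CPU */
};

/*
 * Models the pre-patch logic: lower the dequeue limit unconditionally,
 * then count callbacks left on CPUs that will no longer be scanned.
 */
long shrink_unchecked(struct model_rtp *rtp)
{
	long stranded = 0;
	int cpu;

	if (rtp->percpu_enqueue_lim < rtp->percpu_dequeue_lim)
		rtp->percpu_dequeue_lim = 1;
	for (cpu = rtp->percpu_dequeue_lim; cpu < MODEL_NR_CPUS; cpu++)
		stranded += rtp->n_cbs[cpu];
	return stranded;
}

/*
 * Models the patched logic: lower the dequeue limit only when no CPU at
 * or above the enqueue limit still has callbacks. Returns true when the
 * switch to a single queue is complete.
 */
bool shrink_checked(struct model_rtp *rtp)
{
	int cpu;

	if (rtp->percpu_enqueue_lim >= rtp->percpu_dequeue_lim)
		return true;	/* already shrunk */
	for (cpu = rtp->percpu_enqueue_lim; cpu < MODEL_NR_CPUS; cpu++)
		if (rtp->n_cbs[cpu])
			return false;	/* retry on a later pass */
	rtp->percpu_dequeue_lim = 1;
	return true;
}

/* Replays the race: CPU1 holds one callback when the shrink is attempted. */
void replay_race(void)
{
	struct model_rtp rtp = {
		.percpu_enqueue_lim = 1,
		.percpu_dequeue_lim = MODEL_NR_CPUS,
	};

	rtp.n_cbs[1] = 1;		/* callback enqueued on CPU1 */
	assert(!shrink_checked(&rtp));	/* shrink is deferred */
	assert(rtp.percpu_dequeue_lim == MODEL_NR_CPUS);

	rtp.n_cbs[1] = 0;		/* CPU1's callback has been invoked */
	assert(shrink_checked(&rtp));	/* now the shrink completes */
	assert(rtp.percpu_dequeue_lim == 1);
}
```

In this model, shrink_unchecked() on the same starting state leaves one
callback on CPU1 outside the dequeue range. That is the stranded callback
the changelog describes.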
kernel/rcu/tasks.h | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index e4f7d08bde64..690af479074f 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -433,14 +433,17 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
 	    poll_state_synchronize_rcu(rtp->percpu_dequeue_gpseq)) {
 		raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
 		if (rtp->percpu_enqueue_lim < rtp->percpu_dequeue_lim) {
+			for (cpu = rtp->percpu_enqueue_lim; cpu < nr_cpu_ids; cpu++) {
+				struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
+
+				if (rcu_segcblist_n_cbs(&rtpcp->cblist)) {
+					raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
+					return needgpcb;
+				}
+			}
 			WRITE_ONCE(rtp->percpu_dequeue_lim, 1);
 			pr_info("Completing switch %s to CPU-0 callback queuing.\n", rtp->name);
 		}
-		for (cpu = rtp->percpu_dequeue_lim; cpu < nr_cpu_ids; cpu++) {
-			struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
-
-			WARN_ON_ONCE(rcu_segcblist_n_cbs(&rtpcp->cblist));
-		}
 		raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
 	}

--
2.25.1