Re: [PATCH] rcu-tasks: Make shrink down to a single callback queue safely

From: Paul E. McKenney
Date: Fri Dec 02 2022 - 14:57:30 EST


On Wed, Nov 30, 2022 at 01:12:53PM +0800, Zqiang wrote:
> Assume that the current RCU Tasks flavor is in per-CPU callback-queuing
> mode and that rcu_task_cb_adjust is true.
>
>         CPU0                                    CPU1
>
> rcu_tasks_need_gpcb()
>   ncbsnz == 0 and
>   ncbs < rcu_task_collapse_lim
>
>                                         invoke call_rcu_tasks_generic()
>                                         enqueue callback to CPU1
>                                         (CPU1 n_cbs not equal zero)
>
>   if (rcu_task_cb_adjust &&
>       ncbs <= rcu_task_collapse_lim)
>     if (rtp->percpu_enqueue_lim > 1)
>       rtp->percpu_enqueue_lim = 1;
>       rtp->percpu_dequeue_gpseq =
>         get_state_synchronize_rcu();
>
>
>   A full RCU grace period has passed

I don't see how this grace period can elapse. The rcu_tasks_need_gpcb()
function is invoked only from rcu_tasks_one_gp(), and while holding
->tasks_gp_mutex.

What am I missing here?

Thanx, Paul

>   if (rcu_task_cb_adjust && !ncbsnz &&
>       poll_state_synchronize_rcu(
>         rtp->percpu_dequeue_gpseq) == true
>     if (rtp->percpu_enqueue_lim <
>         rtp->percpu_dequeue_lim)
>       rtp->percpu_dequeue_lim = 1
>     for (cpu = rtp->percpu_dequeue_lim;
>          cpu < nr_cpu_ids; cpu++)
>       find CPU1 n_cbs is not zero
>       trigger warning
>
> The above scenario not only triggers the WARN_ON_ONCE(), but also sets the
> rcu_tasks structure's ->percpu_dequeue_lim to one while CPU1 still has
> callbacks, so CPU1's callbacks never get a chance to be invoked.
>
> This commit adds a per-CPU callback check (excluding CPU0) before setting the
> rcu_tasks structure's ->percpu_dequeue_lim to one. If any other CPU still has
> callbacks, ->percpu_dequeue_lim is left unchanged. It is set to one only once
> all CPUs other than CPU0 have no callbacks.
>
> Signed-off-by: Zqiang <qiang1.zhang@xxxxxxxxx>
> ---
> kernel/rcu/tasks.h | 13 ++++++++-----
> 1 file changed, 8 insertions(+), 5 deletions(-)
>
> diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
> index e4f7d08bde64..690af479074f 100644
> --- a/kernel/rcu/tasks.h
> +++ b/kernel/rcu/tasks.h
> @@ -433,14 +433,17 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
>  	    poll_state_synchronize_rcu(rtp->percpu_dequeue_gpseq)) {
>  		raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
>  		if (rtp->percpu_enqueue_lim < rtp->percpu_dequeue_lim) {
> +			for (cpu = rtp->percpu_enqueue_lim; cpu < nr_cpu_ids; cpu++) {
> +				struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
> +
> +				if(rcu_segcblist_n_cbs(&rtpcp->cblist)) {
> +					raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
> +					return needgpcb;
> +				}
> +			}
>  			WRITE_ONCE(rtp->percpu_dequeue_lim, 1);
>  			pr_info("Completing switch %s to CPU-0 callback queuing.\n", rtp->name);
>  		}
> -		for (cpu = rtp->percpu_dequeue_lim; cpu < nr_cpu_ids; cpu++) {
> -			struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
> -
> -			WARN_ON_ONCE(rcu_segcblist_n_cbs(&rtpcp->cblist));
> -		}
>  		raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
>  	}
>
> --
> 2.25.1
>
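
For readers following the quoted patch, here is a minimal user-space
sketch of the check it proposes: the dequeue limit is shrunk to one only
when no CPU at or above ->percpu_enqueue_lim still has callbacks queued.
This is an illustration only, not kernel code. The names model_rtp,
model_cbs_pending_from(), model_try_complete_shrink(), model_demo() and
MODEL_NR_CPUS are hypothetical, locking and the polled grace-period check
are left out, and it takes no position on whether the race described in
the commit log can actually happen.

```c
#include <assert.h>
#include <stdbool.h>

#define MODEL_NR_CPUS 4	/* hypothetical CPU count for this sketch */

/* Hypothetical stand-in for the fields of struct rcu_tasks used here. */
struct model_rtp {
	int percpu_enqueue_lim;
	int percpu_dequeue_lim;
	long n_cbs[MODEL_NR_CPUS];	/* per-CPU queued-callback counts */
};

/* Return true if any CPU numbered first_cpu or higher has callbacks. */
static bool model_cbs_pending_from(const struct model_rtp *rtp, int first_cpu)
{
	for (int cpu = first_cpu; cpu < MODEL_NR_CPUS; cpu++)
		if (rtp->n_cbs[cpu])
			return true;
	return false;
}

/*
 * Model of the patched completion step. The dequeue limit drops to one
 * only if the enqueue limit is already below it and no CPU at or above
 * the enqueue limit has callbacks. Returns true if the shrink completed.
 */
static bool model_try_complete_shrink(struct model_rtp *rtp)
{
	if (rtp->percpu_enqueue_lim >= rtp->percpu_dequeue_lim)
		return false;
	if (model_cbs_pending_from(rtp, rtp->percpu_enqueue_lim))
		return false;	/* callbacks would be stranded, so defer */
	rtp->percpu_dequeue_lim = 1;
	return true;
}

/* Walk through the scenario from the commit log on the model. */
static void model_demo(void)
{
	struct model_rtp rtp = {
		.percpu_enqueue_lim = 1,
		.percpu_dequeue_lim = MODEL_NR_CPUS,
		.n_cbs = { 0, 1, 0, 0 },	/* CPU1 still has a callback */
	};

	/* The shrink is deferred while CPU1 has a callback queued. */
	assert(!model_try_complete_shrink(&rtp));
	assert(rtp.percpu_dequeue_lim == MODEL_NR_CPUS);

	/* Once CPU1's queue drains, the shrink completes. */
	rtp.n_cbs[1] = 0;
	assert(model_try_complete_shrink(&rtp));
	assert(rtp.percpu_dequeue_lim == 1);
}
```

Calling model_demo() from a test program runs both cases. In the model,
callbacks left on CPU0 do not block the shrink, because the scan starts
at ->percpu_enqueue_lim, as the loop in the patch does.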