Re: [PATCH v6 1/4] rcu: Make call_rcu() lazy to save power

From: Paul E. McKenney
Date: Sat Sep 24 2022 - 17:12:07 EST


On Sat, Sep 24, 2022 at 12:20:00PM -0400, Joel Fernandes wrote:
> Hi Paul,
> Let’s see whether my iPhone can handle replies ;-)

Now let's see if it can handle replies to replies! Are you using a
keyboard, or small-screen finger gestures?

> More below:
>
> > On Sep 23, 2022, at 5:44 PM, Paul E. McKenney <paulmck@xxxxxxxxxx> wrote:
> >
> > On Thu, Sep 22, 2022 at 10:01:01PM +0000, Joel Fernandes (Google) wrote:
> >> Implement timer-based RCU lazy callback batching. The batch is flushed
> >> whenever a certain amount of time has passed, or the batch on a
> >> particular CPU grows too big. Also memory pressure will flush it in a
> >> future patch.
> >>
> >> To handle several corner cases automagically (such as rcu_barrier() and
> >> hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
> >> length has the lazy CB length included in it. A separate lazy CB length
> >> counter is also introduced to keep track of the number of lazy CBs.
> >>
> >> v5->v6:
> >>
> >> [ Frederic Weisbec: Program the lazy timer only if WAKE_NOT, since other
> >> deferral levels wake much earlier so for those it is not needed. ]
> >>
> >> [ Frederic Weisbec: Use flush flags to keep bypass API code clean. ]
> >>
> >> [ Frederic Weisbec: Make rcu_barrier() wake up only if main list empty. ]
> >>
> >> [ Frederic Weisbec: Remove extra 'else if' branch in rcu_nocb_try_bypass(). ]
> >>
> >> [ Joel: Fix issue where I was not resetting lazy_len after moving it to rdp ]
> >>
> >> [ Paul/Thomas/Joel: Make call_rcu() default lazy so users don't mess up. ]
> >>
> >> Suggested-by: Paul McKenney <paulmck@xxxxxxxxxx>
> >> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> >
> > I am going to put these on a local branch for testing, but a few comments
> > and questions interspersed below.
> >
> > Thanx, Paul
>
> Ok I replied below, thank you!
>
> >> include/linux/rcupdate.h | 7 ++
> >> kernel/rcu/Kconfig | 8 ++
> >> kernel/rcu/rcu.h | 8 ++
> >> kernel/rcu/tiny.c | 2 +-
> >> kernel/rcu/tree.c | 133 ++++++++++++++++++----------
> >> kernel/rcu/tree.h | 17 +++-
> >> kernel/rcu/tree_exp.h | 2 +-
> >> kernel/rcu/tree_nocb.h | 184 ++++++++++++++++++++++++++++++++-------
> >> 8 files changed, 277 insertions(+), 84 deletions(-)
> >>
> >> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> >> index 08605ce7379d..40ae36904825 100644
> >> --- a/include/linux/rcupdate.h
> >> +++ b/include/linux/rcupdate.h
> >> @@ -108,6 +108,13 @@ static inline int rcu_preempt_depth(void)
> >>
> >> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
> >>
> >> +#ifdef CONFIG_RCU_LAZY
> >> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func);
> >> +#else
> >> +static inline void call_rcu_flush(struct rcu_head *head,
> >> + rcu_callback_t func) { call_rcu(head, func); }
> >> +#endif
> >> +
> >> /* Internal to kernel */
> >> void rcu_init(void);
> >> extern int rcu_scheduler_active;
> >> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
> >> index f53ad63b2bc6..edd632e68497 100644
> >> --- a/kernel/rcu/Kconfig
> >> +++ b/kernel/rcu/Kconfig
> >> @@ -314,4 +314,12 @@ config TASKS_TRACE_RCU_READ_MB
> >> Say N here if you hate read-side memory barriers.
> >> Take the default if you are unsure.
> >>
> >> +config RCU_LAZY
> >> + bool "RCU callback lazy invocation functionality"
> >> + depends on RCU_NOCB_CPU
> >> + default n
> >> + help
> >> + To save power, batch RCU callbacks and flush after delay, memory
> >> + pressure or callback list growing too big.
> >> +
> >> endmenu # "RCU Subsystem"
> >> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> >> index be5979da07f5..65704cbc9df7 100644
> >> --- a/kernel/rcu/rcu.h
> >> +++ b/kernel/rcu/rcu.h
> >> @@ -474,6 +474,14 @@ enum rcutorture_type {
> >> INVALID_RCU_FLAVOR
> >> };
> >>
> >> +#if defined(CONFIG_RCU_LAZY)
> >> +unsigned long rcu_lazy_get_jiffies_till_flush(void);
> >> +void rcu_lazy_set_jiffies_till_flush(unsigned long j);
> >> +#else
> >> +static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; }
> >> +static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { }
> >> +#endif
> >> +
> >> #if defined(CONFIG_TREE_RCU)
> >> void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
> >> unsigned long *gp_seq);
> >> diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c
> >> index a33a8d4942c3..810479cf17ba 100644
> >> --- a/kernel/rcu/tiny.c
> >> +++ b/kernel/rcu/tiny.c
> >> @@ -44,7 +44,7 @@ static struct rcu_ctrlblk rcu_ctrlblk = {
> >>
> >> void rcu_barrier(void)
> >> {
> >> - wait_rcu_gp(call_rcu);
> >> + wait_rcu_gp(call_rcu_flush);
> >> }
> >> EXPORT_SYMBOL(rcu_barrier);
> >>
> >> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> >> index 5ec97e3f7468..736d0d724207 100644
> >> --- a/kernel/rcu/tree.c
> >> +++ b/kernel/rcu/tree.c
> >> @@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
> >> raw_spin_unlock_rcu_node(rnp);
> >> }
> >>
> >> -/**
> >> - * call_rcu() - Queue an RCU callback for invocation after a grace period.
> >> - * @head: structure to be used for queueing the RCU updates.
> >> - * @func: actual callback function to be invoked after the grace period
> >> - *
> >> - * The callback function will be invoked some time after a full grace
> >> - * period elapses, in other words after all pre-existing RCU read-side
> >> - * critical sections have completed. However, the callback function
> >> - * might well execute concurrently with RCU read-side critical sections
> >> - * that started after call_rcu() was invoked.
> >> - *
> >> - * RCU read-side critical sections are delimited by rcu_read_lock()
> >> - * and rcu_read_unlock(), and may be nested. In addition, but only in
> >> - * v5.0 and later, regions of code across which interrupts, preemption,
> >> - * or softirqs have been disabled also serve as RCU read-side critical
> >> - * sections. This includes hardware interrupt handlers, softirq handlers,
> >> - * and NMI handlers.
> >> - *
> >> - * Note that all CPUs must agree that the grace period extended beyond
> >> - * all pre-existing RCU read-side critical section. On systems with more
> >> - * than one CPU, this means that when "func()" is invoked, each CPU is
> >> - * guaranteed to have executed a full memory barrier since the end of its
> >> - * last RCU read-side critical section whose beginning preceded the call
> >> - * to call_rcu(). It also means that each CPU executing an RCU read-side
> >> - * critical section that continues beyond the start of "func()" must have
> >> - * executed a memory barrier after the call_rcu() but before the beginning
> >> - * of that RCU read-side critical section. Note that these guarantees
> >> - * include CPUs that are offline, idle, or executing in user mode, as
> >> - * well as CPUs that are executing in the kernel.
> >> - *
> >> - * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
> >> - * resulting RCU callback function "func()", then both CPU A and CPU B are
> >> - * guaranteed to execute a full memory barrier during the time interval
> >> - * between the call to call_rcu() and the invocation of "func()" -- even
> >> - * if CPU A and CPU B are the same CPU (but again only if the system has
> >> - * more than one CPU).
> >> - *
> >> - * Implementation of these memory-ordering guarantees is described here:
> >> - * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
> >> - */
> >> -void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >> +static void
> >> +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
> >> {
> >> static atomic_t doublefrees;
> >> unsigned long flags;
> >> @@ -2809,7 +2770,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >> }
> >>
> >> check_cb_ovld(rdp);
> >> - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
> >> + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
> >> return; // Enqueued onto ->nocb_bypass, so just leave.
> >> // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
> >> rcu_segcblist_enqueue(&rdp->cblist, head);
> >> @@ -2831,8 +2792,84 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >> local_irq_restore(flags);
> >> }
> >> }
> >> -EXPORT_SYMBOL_GPL(call_rcu);
> >>
> >> +#ifdef CONFIG_RCU_LAZY
> >> +/**
> >> + * call_rcu_flush() - Queue RCU callback for invocation after grace period, and
> >> + * flush all lazy callbacks (including the new one) to the main ->cblist while
> >> + * doing so.
> >> + *
> >> + * @head: structure to be used for queueing the RCU updates.
> >> + * @func: actual callback function to be invoked after the grace period
> >> + *
> >> + * The callback function will be invoked some time after a full grace
> >> + * period elapses, in other words after all pre-existing RCU read-side
> >> + * critical sections have completed.
> >> + *
> >> + * Use this API instead of call_rcu() if you don't mind the callback being
> >> + * invoked after very long periods of time on systems without memory pressure
> >> + * and on systems which are lightly loaded or mostly idle.
> >
> > This comment is backwards, right? Shouldn't it say something like "Use
> > this API instead of call_rcu() if you don't mind burning extra power..."?
>
> Yes sorry my mistake :-(. It’s a stale comment from the rework and I’ll update it.

Very good, thank you!

> >> + *
> >> + * Other than the extra delay in callbacks being invoked, this function is
> >> + * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
> >> + * details about memory ordering and other functionality.
> >> + */
> >> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func)
> >> +{
> >> + return __call_rcu_common(head, func, false);
> >> +}
> >> +EXPORT_SYMBOL_GPL(call_rcu_flush);
> >> +#endif
> >> +
> >> +/**
> >> + * call_rcu() - Queue an RCU callback for invocation after a grace period.
> >> + * By default the callbacks are 'lazy' and are kept hidden from the main
> >> + * ->cblist to prevent starting of grace periods too soon.
> >> + * If you desire grace periods to start very soon, use call_rcu_flush().
> >> + *
> >> + * @head: structure to be used for queueing the RCU updates.
> >> + * @func: actual callback function to be invoked after the grace period
> >> + *
> >> + * The callback function will be invoked some time after a full grace
> >> + * period elapses, in other words after all pre-existing RCU read-side
> >> + * critical sections have completed. However, the callback function
> >> + * might well execute concurrently with RCU read-side critical sections
> >> + * that started after call_rcu() was invoked.
> >> + *
> >> + * RCU read-side critical sections are delimited by rcu_read_lock()
> >> + * and rcu_read_unlock(), and may be nested. In addition, but only in
> >> + * v5.0 and later, regions of code across which interrupts, preemption,
> >> + * or softirqs have been disabled also serve as RCU read-side critical
> >> + * sections. This includes hardware interrupt handlers, softirq handlers,
> >> + * and NMI handlers.
> >> + *
> >> + * Note that all CPUs must agree that the grace period extended beyond
> >> + * all pre-existing RCU read-side critical section. On systems with more
> >> + * than one CPU, this means that when "func()" is invoked, each CPU is
> >> + * guaranteed to have executed a full memory barrier since the end of its
> >> + * last RCU read-side critical section whose beginning preceded the call
> >> + * to call_rcu(). It also means that each CPU executing an RCU read-side
> >> + * critical section that continues beyond the start of "func()" must have
> >> + * executed a memory barrier after the call_rcu() but before the beginning
> >> + * of that RCU read-side critical section. Note that these guarantees
> >> + * include CPUs that are offline, idle, or executing in user mode, as
> >> + * well as CPUs that are executing in the kernel.
> >> + *
> >> + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
> >> + * resulting RCU callback function "func()", then both CPU A and CPU B are
> >> + * guaranteed to execute a full memory barrier during the time interval
> >> + * between the call to call_rcu() and the invocation of "func()" -- even
> >> + * if CPU A and CPU B are the same CPU (but again only if the system has
> >> + * more than one CPU).
> >> + *
> >> + * Implementation of these memory-ordering guarantees is described here:
> >> + * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
> >> + */
> >> +void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >> +{
> >> + return __call_rcu_common(head, func, true);
> >> +}
> >> +EXPORT_SYMBOL_GPL(call_rcu);
> >>
> >> /* Maximum number of jiffies to wait before draining a batch. */
> >> #define KFREE_DRAIN_JIFFIES (5 * HZ)
> >> @@ -3507,7 +3544,7 @@ void synchronize_rcu(void)
> >> if (rcu_gp_is_expedited())
> >> synchronize_rcu_expedited();
> >> else
> >> - wait_rcu_gp(call_rcu);
> >> + wait_rcu_gp(call_rcu_flush);
> >> return;
> >> }
> >>
> >> @@ -3902,7 +3939,11 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
> >> rdp->barrier_head.func = rcu_barrier_callback;
> >> debug_rcu_head_queue(&rdp->barrier_head);
> >> rcu_nocb_lock(rdp);
> >> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> >> + /*
> >> + * Flush the bypass list, but also wake up the GP thread as otherwise
> >> + * bypass/lazy CBs maynot be noticed, and can cause real long delays!
> >> + */
> >> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_WAKE));
> >> if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
> >> atomic_inc(&rcu_state.barrier_cpu_count);
> >> } else {
> >> @@ -4323,7 +4364,7 @@ void rcutree_migrate_callbacks(int cpu)
> >> my_rdp = this_cpu_ptr(&rcu_data);
> >> my_rnp = my_rdp->mynode;
> >> rcu_nocb_lock(my_rdp); /* irqs already disabled. */
> >> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
> >> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, FLUSH_BP_NONE));
> >> raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
> >> /* Leverage recent GPs and set GP for new callbacks. */
> >> needwake = rcu_advance_cbs(my_rnp, rdp) ||
> >> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> >> index d4a97e40ea9c..361c41d642c7 100644
> >> --- a/kernel/rcu/tree.h
> >> +++ b/kernel/rcu/tree.h
> >> @@ -263,14 +263,16 @@ struct rcu_data {
> >> unsigned long last_fqs_resched; /* Time of last rcu_resched(). */
> >> unsigned long last_sched_clock; /* Jiffies of last rcu_sched_clock_irq(). */
> >>
> >> + long lazy_len; /* Length of buffered lazy callbacks. */
> >
> > Do we ever actually care about the length as opposed to whether or not all
> > the bypass callbacks are lazy? If not, a "some_nonlazy" boolean would be
> > initialed to zero and ORed with the non-laziness of the added callback.
> > Or, if there was a test anyway, simply set to 1 in the presence of a
> > non-lazy callback. And as now, gets zeroed when the bypass is flushed.
>
> We had discussed this before, and my point was
> we could use it for tracing and future extension
> as well. If it’s ok with you, I prefer to keep it this
> way than having to come back add the length later.
>
> On a minor point as well, if you really want it this
> way, I am afraid of changing this now since I tested
> this way for last several iterations and it’s easy to
> add a regression. So I prefer to keep it this way and
> then I can add more patches later on top if that’s
> Ok with you.

No, you are right, the debug information might be helpful.

But hey, at least I am consistent!

> > This might shorten a few lines of code.
> >
> >> int cpu;
> >> };
> >>
> >> /* Values for nocb_defer_wakeup field in struct rcu_data. */
> >> #define RCU_NOCB_WAKE_NOT 0
> >> #define RCU_NOCB_WAKE_BYPASS 1
> >> -#define RCU_NOCB_WAKE 2
> >> -#define RCU_NOCB_WAKE_FORCE 3
> >> +#define RCU_NOCB_WAKE_LAZY 2
> >> +#define RCU_NOCB_WAKE 3
> >> +#define RCU_NOCB_WAKE_FORCE 4
> >>
> >> #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
> >> /* For jiffies_till_first_fqs and */
> >> @@ -439,10 +441,17 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp);
> >> static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> >> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> >> static void rcu_init_one_nocb(struct rcu_node *rnp);
> >> +
> >> +#define FLUSH_BP_NONE 0
> >> +/* Is the CB being enqueued after the flush, a lazy CB? */
> >> +#define FLUSH_BP_LAZY BIT(0)
> >> +/* Wake up nocb-GP thread after flush? */
> >> +#define FLUSH_BP_WAKE BIT(1)
> >> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - unsigned long j);
> >> + unsigned long j, unsigned long flush_flags);
> >> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - bool *was_alldone, unsigned long flags);
> >> + bool *was_alldone, unsigned long flags,
> >> + bool lazy);
> >> static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
> >> unsigned long flags);
> >> static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> >> diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
> >> index 18e9b4cd78ef..5cac05600798 100644
> >> --- a/kernel/rcu/tree_exp.h
> >> +++ b/kernel/rcu/tree_exp.h
> >> @@ -937,7 +937,7 @@ void synchronize_rcu_expedited(void)
> >>
> >> /* If expedited grace periods are prohibited, fall back to normal. */
> >> if (rcu_gp_is_normal()) {
> >> - wait_rcu_gp(call_rcu);
> >> + wait_rcu_gp(call_rcu_flush);
> >> return;
> >> }
> >>
> >> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> >> index f77a6d7e1356..661c685aba3f 100644
> >> --- a/kernel/rcu/tree_nocb.h
> >> +++ b/kernel/rcu/tree_nocb.h
> >> @@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
> >> return __wake_nocb_gp(rdp_gp, rdp, force, flags);
> >> }
> >>
> >> +/*
> >> + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
> >> + * can elapse before lazy callbacks are flushed. Lazy callbacks
> >> + * could be flushed much earlier for a number of other reasons
> >> + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
> >> + * left unsubmitted to RCU after those many jiffies.
> >> + */
> >> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> >> +static unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES;
> >> +
> >> +#ifdef CONFIG_RCU_LAZY
> >> +// To be called only from test code.
> >> +void rcu_lazy_set_jiffies_till_flush(unsigned long jif)
> >> +{
> >> + jiffies_till_flush = jif;
> >> +}
> >> +EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush);
> >> +
> >> +unsigned long rcu_lazy_get_jiffies_till_flush(void)
> >> +{
> >> + return jiffies_till_flush;
> >> +}
> >> +EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush);
> >> +#endif
> >> +
> >> /*
> >> * Arrange to wake the GP kthread for this NOCB group at some future
> >> * time when it is safe to do so.
> >> @@ -269,10 +294,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> >> raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
> >>
> >> /*
> >> - * Bypass wakeup overrides previous deferments. In case
> >> - * of callback storm, no need to wake up too early.
> >> + * Bypass wakeup overrides previous deferments. In case of
> >> + * callback storm, no need to wake up too early.
> >> */
> >> - if (waketype == RCU_NOCB_WAKE_BYPASS) {
> >> + if (waketype == RCU_NOCB_WAKE_LAZY
> >> + && READ_ONCE(rdp->nocb_defer_wakeup) == RCU_NOCB_WAKE_NOT) {
> >
> > Please leave the "&&" on the previous line and line up the "READ_ONCE("
> > with the "waketype". That makes it easier to tell the condition from
> > the following code.
>
> Will do!
>
> >
> >> + mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush);
> >> + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> >> + } else if (waketype == RCU_NOCB_WAKE_BYPASS) {
> >> mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
> >> WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> >> } else {
> >> @@ -293,12 +322,16 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> >> * proves to be initially empty, just return false because the no-CB GP
> >> * kthread may need to be awakened in this case.
> >> *
> >> + * Return true if there was something to be flushed and it succeeded, otherwise
> >> + * false.
> >> + *
> >> * Note that this function always returns true if rhp is NULL.
> >> */
> >> static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - unsigned long j)
> >> + unsigned long j, unsigned long flush_flags)
> >> {
> >> struct rcu_cblist rcl;
> >> + bool lazy = flush_flags & FLUSH_BP_LAZY;
> >>
> >> WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
> >> rcu_lockdep_assert_cblist_protected(rdp);
> >> @@ -310,7 +343,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
> >> if (rhp)
> >> rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> >> - rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> >> +
> >> + /*
> >> + * If the new CB requested was a lazy one, queue it onto the main
> >> + * ->cblist so we can take advantage of a sooner grade period.
> >
> > "take advantage of a grace period that will happen regardless."?
>
> Sure will update.
>
> >
> >> + */
> >> + if (lazy && rhp) {
> >> + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL);
> >> + rcu_cblist_enqueue(&rcl, rhp);
> >
> > Would it makes sense to enqueue rhp onto ->nocb_bypass first, NULL out
> > rhp, then let the rcu_cblist_flush_enqueue() be common code? Or did this
> > function grow a later use of rhp that I missed?
>
> No that could be done, but it prefer to keep it this
> way because rhp is a function parameter and I
> prefer not to modify those since it could add a
> bug in future where rhp passed by user is now
> NULL for some reason, half way through the
> function.

I agree that changing a function parameter is bad practice.

So the question becomes whether introducing a local would outweigh
consolidating this code. Could you please at least give it a shot?

> >> + WRITE_ONCE(rdp->lazy_len, 0);
> >> + } else {
> >> + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> >> + WRITE_ONCE(rdp->lazy_len, 0);
> >
> > This WRITE_ONCE() can be dropped out of the "if" statement, correct?
>
> Yes will update.

Thank you!

> > If so, this could be an "if" statement with two statements in its "then"
> > clause, no "else" clause, and two statements following the "if" statement.
>
> I don’t think we can get rid of the else part but I’ll see what it looks like.

In the function header, s/rhp/rhp_in/, then:

struct rcu_head *rhp = rhp_in;

And then:

if (lazy && rhp) {
rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
rhp = NULL;
}
rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
WRITE_ONCE(rdp->lazy_len, 0);

Or did I mess something up?

> >> + }
> >> +
> >> rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
> >> WRITE_ONCE(rdp->nocb_bypass_first, j);
> >> rcu_nocb_bypass_unlock(rdp);
> >> @@ -326,13 +372,33 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> * Note that this function always returns true if rhp is NULL.
> >> */
> >> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - unsigned long j)
> >> + unsigned long j, unsigned long flush_flags)
> >> {
> >> + bool ret;
> >> + bool was_alldone = false;
> >> + bool bypass_all_lazy = false;
> >> + long bypass_ncbs;
> >
> > Alphabetical order by variable name, please. (Yes, I know that this is
> > strange, but what can I say?)
>
> Sure.

Thank you!

> >> +
> >> if (!rcu_rdp_is_offloaded(rdp))
> >> return true;
> >> rcu_lockdep_assert_cblist_protected(rdp);
> >> rcu_nocb_bypass_lock(rdp);
> >> - return rcu_nocb_do_flush_bypass(rdp, rhp, j);
> >> +
> >> + if (flush_flags & FLUSH_BP_WAKE) {
> >> + was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >> + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> + bypass_all_lazy = bypass_ncbs && (bypass_ncbs == rdp->lazy_len);
> >> + }
> >> +
> >> + ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, flush_flags);
> >> +
> >> + // Wake up the nocb GP thread if needed. GP thread could be sleeping
> >> + // while waiting for lazy timer to expire (otherwise rcu_barrier may
> >> + // end up waiting for the duration of the lazy timer).
> >> + if (flush_flags & FLUSH_BP_WAKE && was_alldone && bypass_all_lazy)
> >> + wake_nocb_gp(rdp, false);
> >> +
> >> + return ret;
> >> }
> >>
> >> /*
> >> @@ -345,7 +411,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> >> if (!rcu_rdp_is_offloaded(rdp) ||
> >> !rcu_nocb_bypass_trylock(rdp))
> >> return;
> >> - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
> >> + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
> >> }
> >>
> >> /*
> >> @@ -367,12 +433,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> >> * there is only one CPU in operation.
> >> */
> >> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - bool *was_alldone, unsigned long flags)
> >> + bool *was_alldone, unsigned long flags,
> >> + bool lazy)
> >> {
> >> unsigned long c;
> >> unsigned long cur_gp_seq;
> >> unsigned long j = jiffies;
> >> long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> + bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len));
> >>
> >> lockdep_assert_irqs_disabled();
> >>
> >> @@ -417,25 +485,30 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> // If there hasn't yet been all that many ->cblist enqueues
> >> // this jiffy, tell the caller to enqueue onto ->cblist. But flush
> >> // ->nocb_bypass first.
> >> - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> >> + // Lazy CBs throttle this back and do immediate bypass queuing.
> >> + if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) {
> >> rcu_nocb_lock(rdp);
> >> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >> if (*was_alldone)
> >> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> >> TPS("FirstQ"));
> >> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> >> +
> >> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
> >> WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> >> return false; // Caller must enqueue the callback.
> >> }
> >>
> >> // If ->nocb_bypass has been used too long or is too full,
> >> // flush ->nocb_bypass to ->cblist.
> >> - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> >> + if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> >> + (ncbs && bypass_is_lazy &&
> >> + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) ||
> >> ncbs >= qhimark) {
> >> rcu_nocb_lock(rdp);
> >> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >>
> >> - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
> >> + if (!rcu_nocb_flush_bypass(rdp, rhp, j,
> >> + lazy ? FLUSH_BP_LAZY : FLUSH_BP_NONE)) {
> >> if (*was_alldone)
> >> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> >> TPS("FirstQ"));
> >> @@ -460,16 +533,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> // We need to use the bypass.
> >> rcu_nocb_wait_contended(rdp);
> >> rcu_nocb_bypass_lock(rdp);
> >> +
> >> ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> >> rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
> >> +
> >> + if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy)
> >
> > Won't !IS_ENABLED(CONFIG_RCU_LAZY) mean that lazy cannot be set?
> > Why do we need to check both? Or are you going for dead code? If so,
> > shouldn't there be IS_ENABLED(CONFIG_RCU_LAZY) checks above as well?
> >
> > Me, I am not convinced that the dead code would buy you much. In fact,
> > the compiler might well be propagating the constants on its own.
> >
> > Ah! The reason the compiler cannot figure this out is because you put
> > the switch into rcu.h. If you instead always export the call_rcu_flush()
> > definition, and check IS_ENABLED(CONFIG_RCU_LAZY) at the beginning of
> > call_rcu(), the compiler should have the information that it needs to
> > do this for you.
>
> Ah ok, I will try to do it this way.

Very good, thank you, for this and for the rest below.

Thanx, Paul

> >> + WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1);
> >> +
> >> if (!ncbs) {
> >> WRITE_ONCE(rdp->nocb_bypass_first, j);
> >> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
> >> }
> >> +
> >> rcu_nocb_bypass_unlock(rdp);
> >> smp_mb(); /* Order enqueue before wake. */
> >> - if (ncbs) {
> >> +
> >> + // A wake up of the grace period kthread or timer adjustment needs to
> >> + // be done only if:
> >> + // 1. Bypass list was fully empty before (this is the first bypass list entry).
> >> + // Or, both the below conditions are met:
> >> + // 1. Bypass list had only lazy CBs before.
> >> + // 2. The new CB is non-lazy.
> >> + if (ncbs && (!bypass_is_lazy || lazy)) {
> >> local_irq_restore(flags);
> >> } else {
> >> // No-CBs GP kthread might be indefinitely asleep, if so, wake.
> >> @@ -499,7 +585,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
> >> {
> >> unsigned long cur_gp_seq;
> >> unsigned long j;
> >> - long len;
> >> + long len, lazy_len, bypass_len;
> >> struct task_struct *t;
> >
> > Again, alphabetical please, strange though that might seem.
>
> Yes sure
>
> >
> >> // If we are being polled or there is no kthread, just leave.
> >> @@ -512,9 +598,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
> >> }
> >> // Need to actually to a wakeup.
> >> len = rcu_segcblist_n_cbs(&rdp->cblist);
> >> + bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> + lazy_len = READ_ONCE(rdp->lazy_len);
> >> if (was_alldone) {
> >> rdp->qlen_last_fqs_check = len;
> >> - if (!irqs_disabled_flags(flags)) {
> >> + // Only lazy CBs in bypass list
> >> + if (lazy_len && bypass_len == lazy_len) {
> >> + rcu_nocb_unlock_irqrestore(rdp, flags);
> >> + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
> >> + TPS("WakeLazy"));
> >> + } else if (!irqs_disabled_flags(flags)) {
> >> /* ... if queue was empty ... */
> >> rcu_nocb_unlock_irqrestore(rdp, flags);
> >> wake_nocb_gp(rdp, false);
> >> @@ -604,8 +697,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
> >> */
> >> static void nocb_gp_wait(struct rcu_data *my_rdp)
> >> {
> >> - bool bypass = false;
> >> - long bypass_ncbs;
> >> + bool bypass = false, lazy = false;
> >> + long bypass_ncbs, lazy_ncbs;
> >
> > And ditto.
>
> Ok
>
> >
> >> int __maybe_unused cpu = my_rdp->cpu;
> >> unsigned long cur_gp_seq;
> >> unsigned long flags;
> >> @@ -640,24 +733,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> >> * won't be ignored for long.
> >> */
> >> list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
> >> + bool flush_bypass = false;
> >> +
> >> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
> >> rcu_nocb_lock_irqsave(rdp, flags);
> >> lockdep_assert_held(&rdp->nocb_lock);
> >> bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> - if (bypass_ncbs &&
> >> + lazy_ncbs = READ_ONCE(rdp->lazy_len);
> >> +
> >> + if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) &&
> >> + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) ||
> >> + bypass_ncbs > 2 * qhimark)) {
> >> + flush_bypass = true;
> >> + } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) &&
> >> (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
> >> bypass_ncbs > 2 * qhimark)) {
> >> - // Bypass full or old, so flush it.
> >> - (void)rcu_nocb_try_flush_bypass(rdp, j);
> >> - bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> + flush_bypass = true;
> >> } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
> >> rcu_nocb_unlock_irqrestore(rdp, flags);
> >> continue; /* No callbacks here, try next. */
> >> }
> >> +
> >> + if (flush_bypass) {
> >> + // Bypass full or old, so flush it.
> >> + (void)rcu_nocb_try_flush_bypass(rdp, j);
> >> + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> >> + lazy_ncbs = READ_ONCE(rdp->lazy_len);
> >> + }
> >> +
> >> if (bypass_ncbs) {
> >> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> >> - TPS("Bypass"));
> >> - bypass = true;
> >> + bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
> >> + if (bypass_ncbs == lazy_ncbs)
> >> + lazy = true;
> >> + else
> >> + bypass = true;
> >> }
> >> rnp = rdp->mynode;
> >>
> >> @@ -705,12 +815,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
> >> my_rdp->nocb_gp_gp = needwait_gp;
> >> my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
> >>
> >> - if (bypass && !rcu_nocb_poll) {
> >> - // At least one child with non-empty ->nocb_bypass, so set
> >> - // timer in order to avoid stranding its callbacks.
> >> - wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
> >> - TPS("WakeBypassIsDeferred"));
> >> + // At least one child with non-empty ->nocb_bypass, so set
> >> + // timer in order to avoid stranding its callbacks.
> >> + if (!rcu_nocb_poll) {
> >> + // If bypass list only has lazy CBs. Add a deferred
> >> + // lazy wake up.
> >
> > One sentence rather than two.
>
> Ok
>
> >
> >> + if (lazy && !bypass) {
> >> + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
> >> + TPS("WakeLazyIsDeferred"));
> >> + // Otherwise add a deferred bypass wake up.
> >> + } else if (bypass) {
> >> + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
> >> + TPS("WakeBypassIsDeferred"));
> >> + }
> >> }
> >> +
> >> if (rcu_nocb_poll) {
> >> /* Polling, so trace if first poll in the series. */
> >> if (gotcbs)
> >> @@ -1036,7 +1155,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
> >> * return false, which means that future calls to rcu_nocb_try_bypass()
> >> * will refuse to put anything into the bypass.
> >> */
> >> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> >> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_NONE));
> >> /*
> >> * Start with invoking rcu_core() early. This way if the current thread
> >> * happens to preempt an ongoing call to rcu_core() in the middle,
> >> @@ -1278,6 +1397,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
> >> raw_spin_lock_init(&rdp->nocb_gp_lock);
> >> timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
> >> rcu_cblist_init(&rdp->nocb_bypass);
> >> + WRITE_ONCE(rdp->lazy_len, 0);
> >> mutex_init(&rdp->nocb_gp_kthread_mutex);
> >> }
> >>
> >> @@ -1559,13 +1679,13 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
> >> }
> >>
> >> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - unsigned long j)
> >> + unsigned long j, unsigned long flush_flags)
> >> {
> >> return true;
> >> }
> >>
> >> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >> - bool *was_alldone, unsigned long flags)
> >> + bool *was_alldone, unsigned long flags, bool lazy)
> >> {
> >> return false;
> >> }
> >> --
> >> 2.37.3.998.g577e59143f-goog
> >>