Re: [PATCH tip/core/rcu 3/3] rcu: make hot-unplugged CPU relinquish its own RCU callbacks

From: Mathieu Desnoyers
Date: Tue Sep 29 2009 - 11:57:31 EST


* Paul E. McKenney (paulmck@xxxxxxxxxxxxxxxxxx) wrote:
> On Tue, Sep 29, 2009 at 09:50:17AM -0400, Mathieu Desnoyers wrote:
> > * Paul E. McKenney (paulmck@xxxxxxxxxxxxxxxxxx) wrote:
> > > From: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
> > >
> > > The current interaction between RCU and CPU hotplug requires that
> > > RCU block in CPU notifiers waiting for callbacks to drain. This can
> > > be greatly simplified by haing each CPU relinquish its own callbacks,
> >
> > "having"
>
> I plead jet lag.
>
> > > and for both _rcu_barrier() and CPU_DEAD notifiers to adopt all callbacks
> > > that were previously relinquished. This change also eliminates the
> > > possibility of certain types of hangs due to the previous practice of
> > > waiting for callbacks to be invoked from within CPU notifiers. If you
> > > don't every wait, you cannot hang.
> >
> > "ever"
>
> Twice. ;-)
>
> > This idea reminds me of a discussion we had at Plumbers, good! ;)
> >
> > Simplification of RCU vs CPU hotplug interaction will clearly be
> > welcome. How does it deal with many processors going offline at once
> > while the system is under large RCU callback queue loads?
>
> Right now, the "orphan" variables are protected by the rcu_state
> structure's ->onofflock spinlock. The code appends the CPU's callbacks
> to whatever is already on the orphan list, so it should work in this
> case. If multiple CPUs go offline, they will serialize appending
> their callbacks onto the orphan list.
>
> This single lock might well turn out to be a scalability bottleneck,
> but someone is going to have to demonstrate this to me before I would
> be willing to complicate the code to increase scalability.
>
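
(Just to check that I am reading the locking correctly, here is a quick
user-space sketch of what I understand the append path to do -- toy types
and a pthread mutex standing in for ->onofflock, not the real
rcu_state/rcu_data layout. Each dying CPU splices its whole list in O(1),
so concurrent offlinings only serialize for a few pointer updates each:)

#include <pthread.h>
#include <stddef.h>

struct cb {				/* stand-in for struct rcu_head */
	struct cb *next;
};

struct cpu_cbs {			/* stand-in for struct rcu_data */
	struct cb *list;		/* this CPU's callback list */
	struct cb **tail;		/* where the next callback links in */
	long qlen;
};

static pthread_mutex_t onofflock = PTHREAD_MUTEX_INITIALIZER;
static struct cb *orphan_cbs_list;
static struct cb **orphan_cbs_tail = &orphan_cbs_list;
static long orphan_qlen;

/* Dying CPU hands its whole list to the orphanage, order preserved. */
static void send_cbs_to_orphanage(struct cpu_cbs *c)
{
	if (c->list == NULL)
		return;
	pthread_mutex_lock(&onofflock);		/* the single ->onofflock */
	*orphan_cbs_tail = c->list;		/* O(1) splice at the tail */
	orphan_cbs_tail = c->tail;
	orphan_qlen += c->qlen;
	c->list = NULL;				/* dying CPU now owns nothing */
	c->tail = &c->list;
	c->qlen = 0;
	pthread_mutex_unlock(&onofflock);
}
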
> > I guess the rationale for making the CPU that is going offline wait for
> > callback execution is close to telling that CPU: "you asked for
> > this callback to be executed, well, _you_ do it before going to sleep.
> > And no, don't try to push this task on your little brothers."
>
> Not only that, but I am getting really paranoid about RCU and CPU
> hotplug waiting on each other. Getting some hangs that this patchset
> seems to fix.
>
> > The only problem I see here is that a CPU could really load up the
> > system and, in the worst-case scenario, never execute its own callbacks
> > and let the others do the work. Have you considered this issue?
>
> Hmmmm... The only way I can see for this to happen is for the CPU to
> generate a bunch of callbacks, be taken offline, be brought online,
> generate more callbacks, and so on. The thing to remember is that
> it typically takes longer than a grace period to take a CPU offline
> and bring it back online, so callbacks should not normally "pile up".
>

Yes, this is also what I think: if someone takes a CPU offline, he
should expect an increased load on the other CPUs for a short while.
Simplicity is definitely palatable in this area.
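
To put a number on that "increased load": here is a tiny stand-alone toy
program (simplified types, no locking, not the actual kernel code -- the
adopt_orphan_cbs() below only mimics what rcu_adopt_orphan_cbs() does in
the patch). The CPU that runs the CPU_DEAD notifier or _rcu_barrier()
simply inherits the whole orphaned queue in one splice:

#include <stdio.h>
#include <stddef.h>

struct cb { struct cb *next; };		/* stand-in for struct rcu_head */

struct cpu_cbs {			/* stand-in for struct rcu_data */
	struct cb *list;
	struct cb **tail;
	long qlen;
};

static struct cb *orphan_cbs_list;
static struct cb **orphan_cbs_tail = &orphan_cbs_list;
static long orphan_qlen;

/* Model a callback left behind by a CPU that has already gone offline. */
static void orphan_enqueue(struct cb *cb)
{
	cb->next = NULL;
	*orphan_cbs_tail = cb;
	orphan_cbs_tail = &cb->next;
	orphan_qlen++;
}

static void cpu_enqueue(struct cpu_cbs *c, struct cb *cb)
{
	cb->next = NULL;
	*c->tail = cb;
	c->tail = &cb->next;
	c->qlen++;
}

/* CPU_DEAD or _rcu_barrier(): the running CPU adopts all orphans at once. */
static void adopt_orphan_cbs(struct cpu_cbs *me)
{
	if (orphan_cbs_list == NULL)
		return;
	*me->tail = orphan_cbs_list;	/* orphans run after my own callbacks */
	me->tail = orphan_cbs_tail;
	me->qlen += orphan_qlen;
	orphan_cbs_list = NULL;
	orphan_cbs_tail = &orphan_cbs_list;
	orphan_qlen = 0;
}

int main(void)
{
	struct cpu_cbs survivor;
	struct cb cbs[5];
	int i;

	survivor.list = NULL;
	survivor.tail = &survivor.list;
	survivor.qlen = 0;

	for (i = 0; i < 4; i++)			/* 4 cbs orphaned by the dead CPU */
		orphan_enqueue(&cbs[i]);
	cpu_enqueue(&survivor, &cbs[4]);	/* survivor had one of its own */

	adopt_orphan_cbs(&survivor);
	printf("survivor qlen = %ld\n", survivor.qlen);	/* prints 5 */
	return 0;
}
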

Thanks,

Mathieu

> > (haha ! just seen the "orphan" variables _after_ writing this up) ;)
>
> ;-)
>
> Thanx, Paul
>
> > Thanks,
> >
> > Mathieu
> >
> > >
> > > Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
> > > ---
> > > kernel/rcutree.c | 151 ++++++++++++++++++++++++----------------------
> > > kernel/rcutree.h | 11 +++-
> > > kernel/rcutree_plugin.h | 34 +++++++++++
> > > kernel/rcutree_trace.c | 4 +-
> > > 4 files changed, 125 insertions(+), 75 deletions(-)
> > >
> > > diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> > > index 678b2e2..13b016b 100644
> > > --- a/kernel/rcutree.c
> > > +++ b/kernel/rcutree.c
> > > @@ -62,6 +62,9 @@
> > > .gpnum = -300, \
> > > .completed = -300, \
> > > .onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
> > > + .orphan_cbs_list = NULL, \
> > > + .orphan_cbs_tail = &name.orphan_cbs_list, \
> > > + .orphan_qlen = 0, \
> > > .fqslock = __SPIN_LOCK_UNLOCKED(&name.fqslock), \
> > > .n_force_qs = 0, \
> > > .n_force_qs_ngp = 0, \
> > > @@ -833,17 +836,63 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> > > #ifdef CONFIG_HOTPLUG_CPU
> > >
> > > /*
> > > + * Move a dying CPU's RCU callbacks to the ->orphan_cbs_list for the
> > > + * specified flavor of RCU. The callbacks will be adopted by the next
> > > + * _rcu_barrier() invocation or by the CPU_DEAD notifier, whichever
> > > + * comes first. Because this is invoked from the CPU_DYING notifier,
> > > + * irqs are already disabled.
> > > + */
> > > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > > +{
> > > + int i;
> > > + struct rcu_data *rdp = rsp->rda[smp_processor_id()];
> > > +
> > > + if (rdp->nxtlist == NULL)
> > > + return; /* irqs disabled, so comparison is stable. */
> > > + spin_lock(&rsp->onofflock); /* irqs already disabled. */
> > > + *rsp->orphan_cbs_tail = rdp->nxtlist;
> > > + rsp->orphan_cbs_tail = rdp->nxttail[RCU_NEXT_TAIL];
> > > + rdp->nxtlist = NULL;
> > > + for (i = 0; i < RCU_NEXT_SIZE; i++)
> > > + rdp->nxttail[i] = &rdp->nxtlist;
> > > + rsp->orphan_qlen += rdp->qlen;
> > > + rdp->qlen = 0;
> > > + spin_unlock(&rsp->onofflock); /* irqs remain disabled. */
> > > +}
> > > +
> > > +/*
> > > + * Adopt previously orphaned RCU callbacks.
> > > + */
> > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > > +{
> > > + unsigned long flags;
> > > + struct rcu_data *rdp;
> > > +
> > > + spin_lock_irqsave(&rsp->onofflock, flags);
> > > + rdp = rsp->rda[smp_processor_id()];
> > > + if (rsp->orphan_cbs_list == NULL) {
> > > + spin_unlock_irqrestore(&rsp->onofflock, flags);
> > > + return;
> > > + }
> > > + *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_list;
> > > + rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_cbs_tail;
> > > + rdp->qlen += rsp->orphan_qlen;
> > > + rsp->orphan_cbs_list = NULL;
> > > + rsp->orphan_cbs_tail = &rsp->orphan_cbs_list;
> > > + rsp->orphan_qlen = 0;
> > > + spin_unlock_irqrestore(&rsp->onofflock, flags);
> > > +}
> > > +
> > > +/*
> > > * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
> > > * and move all callbacks from the outgoing CPU to the current one.
> > > */
> > > static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> > > {
> > > - int i;
> > > unsigned long flags;
> > > long lastcomp;
> > > unsigned long mask;
> > > struct rcu_data *rdp = rsp->rda[cpu];
> > > - struct rcu_data *rdp_me;
> > > struct rcu_node *rnp;
> > >
> > > /* Exclude any attempts to start a new grace period. */
> > > @@ -866,32 +915,9 @@ static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp)
> > > } while (rnp != NULL);
> > > lastcomp = rsp->completed;
> > >
> > > - spin_unlock(&rsp->onofflock); /* irqs remain disabled. */
> > > + spin_unlock_irqrestore(&rsp->onofflock, flags);
> > >
> > > - /*
> > > - * Move callbacks from the outgoing CPU to the running CPU.
> > > - * Note that the outgoing CPU is now quiescent, so it is now
> > > - * (uncharacteristically) safe to access its rcu_data structure.
> > > - * Note also that we must carefully retain the order of the
> > > - * outgoing CPU's callbacks in order for rcu_barrier() to work
> > > - * correctly. Finally, note that we start all the callbacks
> > > - * afresh, even those that have passed through a grace period
> > > - * and are therefore ready to invoke. The theory is that hotplug
> > > - * events are rare, and that if they are frequent enough to
> > > - * indefinitely delay callbacks, you have far worse things to
> > > - * be worrying about.
> > > - */
> > > - if (rdp->nxtlist != NULL) {
> > > - rdp_me = rsp->rda[smp_processor_id()];
> > > - *rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> > > - rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
> > > - rdp->nxtlist = NULL;
> > > - for (i = 0; i < RCU_NEXT_SIZE; i++)
> > > - rdp->nxttail[i] = &rdp->nxtlist;
> > > - rdp_me->qlen += rdp->qlen;
> > > - rdp->qlen = 0;
> > > - }
> > > - local_irq_restore(flags);
> > > + rcu_adopt_orphan_cbs(rsp);
> > > }
> > >
> > > /*
> > > @@ -909,6 +935,14 @@ static void rcu_offline_cpu(int cpu)
> > >
> > > #else /* #ifdef CONFIG_HOTPLUG_CPU */
> > >
> > > +static void rcu_send_cbs_to_orphanage(struct rcu_state *rsp)
> > > +{
> > > +}
> > > +
> > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > > +{
> > > +}
> > > +
> > > static void rcu_offline_cpu(int cpu)
> > > {
> > > }
> > > @@ -1362,9 +1396,6 @@ static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > > static atomic_t rcu_barrier_cpu_count;
> > > static DEFINE_MUTEX(rcu_barrier_mutex);
> > > static struct completion rcu_barrier_completion;
> > > -static atomic_t rcu_migrate_type_count = ATOMIC_INIT(0);
> > > -static struct rcu_head rcu_migrate_head[3];
> > > -static DECLARE_WAIT_QUEUE_HEAD(rcu_migrate_wq);
> > >
> > > static void rcu_barrier_callback(struct rcu_head *notused)
> > > {
> > > @@ -1387,21 +1418,16 @@ static void rcu_barrier_func(void *type)
> > > call_rcu_func(head, rcu_barrier_callback);
> > > }
> > >
> > > -static inline void wait_migrated_callbacks(void)
> > > -{
> > > - wait_event(rcu_migrate_wq, !atomic_read(&rcu_migrate_type_count));
> > > - smp_mb(); /* In case we didn't sleep. */
> > > -}
> > > -
> > > /*
> > > * Orchestrate the specified type of RCU barrier, waiting for all
> > > * RCU callbacks of the specified type to complete.
> > > */
> > > -static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> > > +static void _rcu_barrier(struct rcu_state *rsp,
> > > + void (*call_rcu_func)(struct rcu_head *head,
> > > void (*func)(struct rcu_head *head)))
> > > {
> > > BUG_ON(in_interrupt());
> > > - /* Take cpucontrol mutex to protect against CPU hotplug */
> > > + /* Take mutex to serialize concurrent rcu_barrier() requests. */
> > > mutex_lock(&rcu_barrier_mutex);
> > > init_completion(&rcu_barrier_completion);
> > > /*
> > > @@ -1414,29 +1440,22 @@ static void _rcu_barrier(void (*call_rcu_func)(struct rcu_head *head,
> > > * early.
> > > */
> > > atomic_set(&rcu_barrier_cpu_count, 1);
> > > + preempt_disable(); /* stop CPU_DYING from filling orphan_cbs_list */
> > > + rcu_adopt_orphan_cbs(rsp);
> > > on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> > > + preempt_enable(); /* CPU_DYING can again fill orphan_cbs_list */
> > > if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > > complete(&rcu_barrier_completion);
> > > wait_for_completion(&rcu_barrier_completion);
> > > mutex_unlock(&rcu_barrier_mutex);
> > > - wait_migrated_callbacks();
> > > -}
> > > -
> > > -/**
> > > - * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > > - */
> > > -void rcu_barrier(void)
> > > -{
> > > - _rcu_barrier(call_rcu);
> > > }
> > > -EXPORT_SYMBOL_GPL(rcu_barrier);
> > >
> > > /**
> > > * rcu_barrier_bh - Wait until all in-flight call_rcu_bh() callbacks complete.
> > > */
> > > void rcu_barrier_bh(void)
> > > {
> > > - _rcu_barrier(call_rcu_bh);
> > > + _rcu_barrier(&rcu_bh_state, call_rcu_bh);
> > > }
> > > EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> > >
> > > @@ -1445,16 +1464,10 @@ EXPORT_SYMBOL_GPL(rcu_barrier_bh);
> > > */
> > > void rcu_barrier_sched(void)
> > > {
> > > - _rcu_barrier(call_rcu_sched);
> > > + _rcu_barrier(&rcu_sched_state, call_rcu_sched);
> > > }
> > > EXPORT_SYMBOL_GPL(rcu_barrier_sched);
> > >
> > > -static void rcu_migrate_callback(struct rcu_head *notused)
> > > -{
> > > - if (atomic_dec_and_test(&rcu_migrate_type_count))
> > > - wake_up(&rcu_migrate_wq);
> > > -}
> > > -
> > > /*
> > > * Do boot-time initialization of a CPU's per-CPU RCU data.
> > > */
> > > @@ -1551,27 +1564,21 @@ int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> > > case CPU_UP_PREPARE_FROZEN:
> > > rcu_online_cpu(cpu);
> > > break;
> > > - case CPU_DOWN_PREPARE:
> > > - case CPU_DOWN_PREPARE_FROZEN:
> > > - /* Don't need to wait until next removal operation. */
> > > - /* rcu_migrate_head is protected by cpu_add_remove_lock */
> > > - wait_migrated_callbacks();
> > > - break;
> > > case CPU_DYING:
> > > case CPU_DYING_FROZEN:
> > > /*
> > > - * preempt_disable() in on_each_cpu() prevents stop_machine(),
> > > + * preempt_disable() in _rcu_barrier() prevents stop_machine(),
> > > * so when "on_each_cpu(rcu_barrier_func, (void *)type, 1);"
> > > - * returns, all online cpus have queued rcu_barrier_func(),
> > > - * and the dead cpu(if it exist) queues rcu_migrate_callback()s.
> > > - *
> > > - * These callbacks ensure _rcu_barrier() waits for all
> > > - * RCU callbacks of the specified type to complete.
> > > + * returns, all online cpus have queued rcu_barrier_func().
> > > + * The dying CPU clears its cpu_online_mask bit and
> > > + * moves all of its RCU callbacks to ->orphan_cbs_list
> > > + * in the context of stop_machine(), so subsequent calls
> > > + * to _rcu_barrier() will adopt these callbacks and only
> > > + * then queue rcu_barrier_func() on all remaining CPUs.
> > > */
> > > - atomic_set(&rcu_migrate_type_count, 3);
> > > - call_rcu_bh(rcu_migrate_head, rcu_migrate_callback);
> > > - call_rcu_sched(rcu_migrate_head + 1, rcu_migrate_callback);
> > > - call_rcu(rcu_migrate_head + 2, rcu_migrate_callback);
> > > + rcu_send_cbs_to_orphanage(&rcu_bh_state);
> > > + rcu_send_cbs_to_orphanage(&rcu_sched_state);
> > > + rcu_preempt_send_cbs_to_orphanage();
> > > break;
> > > case CPU_DEAD:
> > > case CPU_DEAD_FROZEN:
> > > diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> > > index 676eecd..b40ac57 100644
> > > --- a/kernel/rcutree.h
> > > +++ b/kernel/rcutree.h
> > > @@ -244,7 +244,15 @@ struct rcu_state {
> > > /* End of fields guarded by root rcu_node's lock. */
> > >
> > > spinlock_t onofflock; /* exclude on/offline and */
> > > - /* starting new GP. */
> > > + /* starting new GP. Also */
> > > + /* protects the following */
> > > + /* orphan_cbs fields. */
> > > + struct rcu_head *orphan_cbs_list; /* list of rcu_head structs */
> > > + /* orphaned by all CPUs in */
> > > + /* a given leaf rcu_node */
> > > + /* going offline. */
> > > + struct rcu_head **orphan_cbs_tail; /* And tail pointer. */
> > > + long orphan_qlen; /* Number of orphaned cbs. */
> > > spinlock_t fqslock; /* Only one task forcing */
> > > /* quiescent states. */
> > > unsigned long jiffies_force_qs; /* Time at which to invoke */
> > > @@ -305,6 +313,7 @@ void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
> > > static int rcu_preempt_pending(int cpu);
> > > static int rcu_preempt_needs_cpu(int cpu);
> > > static void __cpuinit rcu_preempt_init_percpu_data(int cpu);
> > > +static void rcu_preempt_send_cbs_to_orphanage(void);
> > > static void __init __rcu_init_preempt(void);
> > >
> > > #endif /* #else #ifdef RCU_TREE_NONCORE */
> > > diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
> > > index d88dfd3..2fa3f39 100644
> > > --- a/kernel/rcutree_plugin.h
> > > +++ b/kernel/rcutree_plugin.h
> > > @@ -411,6 +411,15 @@ static int rcu_preempt_needs_cpu(int cpu)
> > > return !!per_cpu(rcu_preempt_data, cpu).nxtlist;
> > > }
> > >
> > > +/**
> > > + * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
> > > + */
> > > +void rcu_barrier(void)
> > > +{
> > > + _rcu_barrier(&rcu_preempt_state, call_rcu);
> > > +}
> > > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > > +
> > > /*
> > > * Initialize preemptable RCU's per-CPU data.
> > > */
> > > @@ -420,6 +429,14 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> > > }
> > >
> > > /*
> > > + * Move preemptable RCU's callbacks to ->orphan_cbs_list.
> > > + */
> > > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > > +{
> > > + rcu_send_cbs_to_orphanage(&rcu_preempt_state);
> > > +}
> > > +
> > > +/*
> > > * Initialize preemptable RCU's state structures.
> > > */
> > > static void __init __rcu_init_preempt(void)
> > > @@ -565,6 +582,16 @@ static int rcu_preempt_needs_cpu(int cpu)
> > > }
> > >
> > > /*
> > > + * Because preemptable RCU does not exist, rcu_barrier() is just
> > > + * another name for rcu_barrier_sched().
> > > + */
> > > +void rcu_barrier(void)
> > > +{
> > > + rcu_barrier_sched();
> > > +}
> > > +EXPORT_SYMBOL_GPL(rcu_barrier);
> > > +
> > > +/*
> > > * Because preemptable RCU does not exist, there is no per-CPU
> > > * data to initialize.
> > > */
> > > @@ -573,6 +600,13 @@ static void __cpuinit rcu_preempt_init_percpu_data(int cpu)
> > > }
> > >
> > > /*
> > > + * Because there is no preemptable RCU, there are no callbacks to move.
> > > + */
> > > +static void rcu_preempt_send_cbs_to_orphanage(void)
> > > +{
> > > +}
> > > +
> > > +/*
> > > * Because preemptable RCU does not exist, it need not be initialized.
> > > */
> > > static void __init __rcu_init_preempt(void)
> > > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> > > index f09af28..4b31c77 100644
> > > --- a/kernel/rcutree_trace.c
> > > +++ b/kernel/rcutree_trace.c
> > > @@ -159,13 +159,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
> > > struct rcu_node *rnp;
> > >
> > > seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x "
> > > - "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> > > + "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld\n",
> > > rsp->completed, rsp->gpnum, rsp->signaled,
> > > (long)(rsp->jiffies_force_qs - jiffies),
> > > (int)(jiffies & 0xffff),
> > > rsp->n_force_qs, rsp->n_force_qs_ngp,
> > > rsp->n_force_qs - rsp->n_force_qs_ngp,
> > > - rsp->n_force_qs_lh);
> > > + rsp->n_force_qs_lh, rsp->orphan_qlen);
> > > for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
> > > if (rnp->level != level) {
> > > seq_puts(m, "\n");
> > > --
> > > 1.5.2.5
> > >
> >
> > --
> > Mathieu Desnoyers
> > OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68

--
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68