[PATCH] rcu_sync: simplify the state machine, introduce __rcu_sync_enter()

From: Oleg Nesterov
Date: Sat Jul 16 2016 - 13:10:00 EST


On 07/15, Paul E. McKenney wrote:
>
> On Fri, Jul 15, 2016 at 06:49:39PM +0200, Oleg Nesterov wrote:
> >
> > IOW, please ignore 2/2 which adds PERCPU_RWSEM_READER, the new version
> > just adds rcu_sync_sabotage() which should be renamed (and use GP_PASSED).
>
> OK, then move the checks out into the callers that would have used
> __rcu_sync_enter(). ;-)

Cough, now I can't understand which check do you mean ;) OK, let me
show the code, then we will hopefully understand each other.

----------------------------------------------------------------------
So, as you can see I have fooled you ;) I'll send the patch on top of
Peter's changes, this is the (UNTESTED) code with the patch applied.

Peter, Paul, could you review? Do you see any hole?

Why. Firstly, note that the state machine was greatly simplified, and
rsp->cb_state has gone, we have a single "state" variable, gp_state.
Note also the ->sync() op has gone (and actually ->wait() too, see the
"TODO" comment).

GP_IDLE - owned by __rcu_sync_enter() which can only move this state to

GP_ENTER - owned by rcu-callback which moves it to

GP_PASSED - owned by the last rcu_sync_exit() which moves it to

GP_EXIT - owned by rcu-callback which moves it back to GP_IDLE.

Yes, this is a bit simplified, we also have GP_REPLAY, but hopefully
clear.

And, there is another possible transition, GP_ENTER -> GP_IDLE, because
not it is possible to call __rcu_sync_enter() and rcu_sync_exit() in any
state (except obviously they should be balanced), and they do not block.

The only blocking call is __rcu_sync_wait() which actually waits for GP.
Obviously should only be called if gp_count != 0, iow after __rcu_sync_enter.


--------------------------------------------------------------------------
Now, cgroup_init() can simply call __rcu_sync_enter(&cgroup_threadgroup_rwsem)
and switch this sem into the slow mode. Compared to "sabotage" from Peter
this implies the unnecessary call_rcu_sched(), but I hope we can tolerate
this.

And we can even add a runtime knob to switch between "fast" and "slow
aka writer-biased" modes for cgroup_threadgroup_rwsem.

--------------------------------------------------------------------------
And I think __rcu_sync_enter() can have more users. Let's look at
freeze_super(). It calls percpu_down_write() 3 times, and waits for 3 GP's
sequentally.

Now we can add 3 __rcu_sync_enter's at the start and 3 rcu_sync_exit's at
the end (actually we can do better, just to simplify). And again, note
that rcu_sync_exit() will work correctly even if we (say) return -EBUSY,
so rcu_sync_wait and/or percpu_down_write() was not called in between,
and in this case we won't block waiting for GP.

What do you think?

Oleg.
---

// rcu_sync.h: ----------------------------------------------------------------

struct rcu_sync {
int gp_state;
int gp_count;
wait_queue_head_t gp_wait;

struct rcu_head cb_head;
enum rcu_sync_type gp_type;
};

// sync.c ---------------------------------------------------------------------
#include <linux/rcu_sync.h>
#include <linux/sched.h>

#ifdef CONFIG_PROVE_RCU
#define __INIT_HELD(func) .held = func,
#else
#define __INIT_HELD(func)
#endif

static const struct {
void (*call)(struct rcu_head *, void (*)(struct rcu_head *));
void (*wait)(void); // TODO: remove this, see the comment in dtor
#ifdef CONFIG_PROVE_RCU
int (*held)(void);
#endif
} gp_ops[] = {
[RCU_SYNC] = {
.call = call_rcu,
.wait = rcu_barrier,
__INIT_HELD(rcu_read_lock_held)
},
[RCU_SCHED_SYNC] = {
.call = call_rcu_sched,
.wait = rcu_barrier_sched,
__INIT_HELD(rcu_read_lock_sched_held)
},
[RCU_BH_SYNC] = {
.call = call_rcu_bh,
.wait = rcu_barrier_bh,
__INIT_HELD(rcu_read_lock_bh_held)
},
};

#define rss_lock gp_wait.lock

enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY };

static void rcu_sync_func(struct rcu_head *rcu);

static void rcu_sync_call(struct rcu_sync *rsp)
{
// TODO: THIS IS SUBOPTIMAL. We want to call it directly
// if rcu_blocking_is_gp() == T, but it has might_sleep().
gp_ops[rsp->gp_type].call(&rsp->cb_head, rcu_sync_func);
}

static void rcu_sync_func(struct rcu_head *rcu)
{
struct rcu_sync *rsp = container_of(rcu, struct rcu_sync, cb_head);
unsigned long flags;

BUG_ON(rsp->gp_state == GP_IDLE);
BUG_ON(rsp->gp_state == GP_PASSED);

spin_lock_irqsave(&rsp->rss_lock, flags);
if (rsp->gp_count) {
/*
* We're at least a GP after the first __rcu_sync_enter().
*/
rsp->gp_state = GP_PASSED;
} else if (rsp->gp_state == GP_REPLAY) {
/*
* A new rcu_sync_exit() has happened; requeue the callback
* to catch a later GP.
*/
rsp->gp_state = GP_EXIT;
rcu_sync_call(rsp);
} else {
/*
* We're at least a GP after the last rcu_sync_exit();
* eveybody will now have observed the write side critical
* section. Let 'em rip!.
*
* OR. ->gp_state can be still GP_ENTER if __rcu_sync_wait()
* wasn't called after __rcu_sync_enter(), abort.
*/
rsp->gp_state = GP_IDLE;
}
spin_unlock_irqrestore(&rsp->rss_lock, flags);
}

bool __rcu_sync_enter(struct rcu_sync *rsp)
{
int gp_count, gp_state;

spin_lock_irq(&rsp->rss_lock);
gp_count = rsp->gp_count++;
gp_state = rsp->gp_state;
if (gp_state == GP_IDLE) {
rsp->gp_state = GP_ENTER;
rcu_sync_call(rsp);
}
spin_unlock_irq(&rsp->rss_lock);

BUG_ON(gp_count != 0 && gp_state == GP_IDLE);
BUG_ON(gp_count == 0 && gp_state == GP_PASSED);

return gp_state < GP_PASSED;
}

void __rcu_sync_wait(struct rcu_sync *rsp)
{
BUG_ON(rsp->gp_state == GP_IDLE);
BUG_ON(rsp->gp_count == 0);

wait_event(rsp->gp_wait, rsp->gp_state >= GP_PASSED);
}

void rcu_sync_enter(struct rcu_sync *rsp)
{
if (__rcu_sync_enter(rsp))
__rcu_sync_wait(rsp);
}

void rcu_sync_exit(struct rcu_sync *rsp)
{
BUG_ON(rsp->gp_state == GP_IDLE);
BUG_ON(rsp->gp_count == 0);

spin_lock_irq(&rsp->rss_lock);
if (!--rsp->gp_count) {
if (rsp->gp_state == GP_PASSED) {
rsp->gp_state = GP_EXIT;
rcu_sync_call(rsp);
} else if (rsp->gp_state == GP_EXIT) {
rsp->gp_state = GP_REPLAY;
}
}
spin_unlock_irq(&rsp->rss_lock);
}

void rcu_sync_dtor(struct rcu_sync *rsp)
{
int gp_state;

BUG_ON(rsp->gp_count);
BUG_ON(rsp->gp_state == GP_PASSED);

spin_lock_irq(&rsp->rss_lock);
if (rsp->gp_state == GP_REPLAY)
rsp->gp_state = GP_EXIT;
gp_state = rsp->gp_state;
spin_unlock_irq(&rsp->rss_lock);

// TODO: add another wake_up_locked() into rcu_sync_func(),
// use wait_event + spin_lock_wait, remove gp_ops->wait().
if (gp_state != GP_IDLE) {
gp_ops[rsp->gp_type].wait();
BUG_ON(rsp->gp_state != GP_IDLE);
}
}