[RFC PATCH 02/18] percpu_ref: make ref stable after percpu_ref_switch_to_atomic_sync() returns

From: Qi Zheng
Date: Fri Apr 29 2022 - 09:36:36 EST


In the percpu_ref_call_confirm_rcu(), we call the wake_up_all()
before calling percpu_ref_put(), which will cause the value of
percpu_ref to be unstable when percpu_ref_switch_to_atomic_sync()
returns.

CPU0 CPU1

percpu_ref_switch_to_atomic_sync(&ref)
--> percpu_ref_switch_to_atomic(&ref)
--> percpu_ref_get(ref); /* put after confirmation */
call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu);

percpu_ref_switch_to_atomic_rcu
--> percpu_ref_call_confirm_rcu
--> data->confirm_switch = NULL;
wake_up_all(&percpu_ref_switch_waitq);

/* here waiting to wake up */
wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
(A)percpu_ref_put(ref);
/* The value of &ref is unstable! */
percpu_ref_is_zero(&ref)
(B)percpu_ref_put(ref);

As shown above, assuming that the counts on each cpu add up to 0 before
calling percpu_ref_switch_to_atomic_sync(), we expect that after switching
to atomic mode, percpu_ref_is_zero() can return true. But actually it will
return different values in the two cases of A and B, which is not what
we expected.

Now there are two users of percpu_ref_switch_to_atomic_sync() in the kernel:

i. mddev->writes_pending in the driver/md/md.c
ii. q->q_usage_counter in the block/blk-pm.c

And they are all used as shown above. In the worst case, percpu_ref_is_zero()
may not hold because of the case B every time. While this is unlikely to occur
in a production environment, it is a problem.

This patch moves percpu_ref_put() out of the rcu handler and call it after
wait_event(), which can makes ref stable after percpu_ref_switch_to_atomic_sync()
returns. Then in the example above, percpu_ref_is_zero() can see a steady 0 value,
which is what we would expect.

Signed-off-by: Qi Zheng <zhengqi.arch@xxxxxxxxxxxxx>
---
include/linux/percpu-refcount.h | 4 ++-
lib/percpu-refcount.c | 56 +++++++++++++++++++++++----------
2 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h
index d73a1c08c3e3..75844939a965 100644
--- a/include/linux/percpu-refcount.h
+++ b/include/linux/percpu-refcount.h
@@ -98,6 +98,7 @@ struct percpu_ref_data {
percpu_ref_func_t *confirm_switch;
bool force_atomic:1;
bool allow_reinit:1;
+ bool sync:1;
struct rcu_head rcu;
struct percpu_ref *ref;
};
@@ -123,7 +124,8 @@ int __must_check percpu_ref_init(struct percpu_ref *ref,
gfp_t gfp);
void percpu_ref_exit(struct percpu_ref *ref);
void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch);
+ percpu_ref_func_t *confirm_switch,
+ bool sync);
void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref);
void percpu_ref_switch_to_percpu(struct percpu_ref *ref);
void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c
index af9302141bcf..3a8906715e09 100644
--- a/lib/percpu-refcount.c
+++ b/lib/percpu-refcount.c
@@ -99,6 +99,7 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release,
data->release = release;
data->confirm_switch = NULL;
data->ref = ref;
+ data->sync = false;
ref->data = data;
return 0;
}
@@ -146,21 +147,33 @@ void percpu_ref_exit(struct percpu_ref *ref)
}
EXPORT_SYMBOL_GPL(percpu_ref_exit);

+static inline void percpu_ref_switch_to_atomic_post(struct percpu_ref *ref)
+{
+ struct percpu_ref_data *data = ref->data;
+
+ if (!data->allow_reinit)
+ __percpu_ref_exit(ref);
+
+ /* drop ref from percpu_ref_switch_to_atomic() */
+ percpu_ref_put(ref);
+}
+
static void percpu_ref_call_confirm_rcu(struct rcu_head *rcu)
{
struct percpu_ref_data *data = container_of(rcu,
struct percpu_ref_data, rcu);
struct percpu_ref *ref = data->ref;
+ bool need_put = true;
+
+ if (data->sync)
+ need_put = data->sync = false;

data->confirm_switch(ref);
data->confirm_switch = NULL;
wake_up_all(&percpu_ref_switch_waitq);

- if (!data->allow_reinit)
- __percpu_ref_exit(ref);
-
- /* drop ref from percpu_ref_switch_to_atomic() */
- percpu_ref_put(ref);
+ if (need_put)
+ percpu_ref_switch_to_atomic_post(ref);
}

static void percpu_ref_switch_to_atomic_rcu(struct rcu_head *rcu)
@@ -210,14 +223,19 @@ static void percpu_ref_noop_confirm_switch(struct percpu_ref *ref)
}

static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
if (ref->percpu_count_ptr & __PERCPU_REF_ATOMIC) {
if (confirm_switch)
confirm_switch(ref);
+ if (sync)
+ percpu_ref_get(ref);
return;
}

+ ref->data->sync = sync;
+
/* switching from percpu to atomic */
ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC;

@@ -232,13 +250,16 @@ static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref,
call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu);
}

-static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref)
+static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref, bool sync)
{
unsigned long __percpu *percpu_count = percpu_count_ptr(ref);
int cpu;

BUG_ON(!percpu_count);

+ if (sync)
+ percpu_ref_get(ref);
+
if (!(ref->percpu_count_ptr & __PERCPU_REF_ATOMIC))
return;

@@ -261,7 +282,8 @@ static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref)
}

static void __percpu_ref_switch_mode(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
struct percpu_ref_data *data = ref->data;

@@ -276,9 +298,9 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
percpu_ref_switch_lock);

if (data->force_atomic || percpu_ref_is_dying(ref))
- __percpu_ref_switch_to_atomic(ref, confirm_switch);
+ __percpu_ref_switch_to_atomic(ref, confirm_switch, sync);
else
- __percpu_ref_switch_to_percpu(ref);
+ __percpu_ref_switch_to_percpu(ref, sync);
}

/**
@@ -302,14 +324,15 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref,
* switching to atomic mode, this function can be called from any context.
*/
void percpu_ref_switch_to_atomic(struct percpu_ref *ref,
- percpu_ref_func_t *confirm_switch)
+ percpu_ref_func_t *confirm_switch,
+ bool sync)
{
unsigned long flags;

spin_lock_irqsave(&percpu_ref_switch_lock, flags);

ref->data->force_atomic = true;
- __percpu_ref_switch_mode(ref, confirm_switch);
+ __percpu_ref_switch_mode(ref, confirm_switch, sync);

spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
@@ -325,8 +348,9 @@ EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic);
*/
void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref)
{
- percpu_ref_switch_to_atomic(ref, NULL);
+ percpu_ref_switch_to_atomic(ref, NULL, true);
wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
+ percpu_ref_switch_to_atomic_post(ref);
}
EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync);

@@ -355,7 +379,7 @@ void percpu_ref_switch_to_percpu(struct percpu_ref *ref)
spin_lock_irqsave(&percpu_ref_switch_lock, flags);

ref->data->force_atomic = false;
- __percpu_ref_switch_mode(ref, NULL);
+ __percpu_ref_switch_mode(ref, NULL, false);

spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
@@ -390,7 +414,7 @@ void percpu_ref_kill_and_confirm(struct percpu_ref *ref,
ref->data->release);

ref->percpu_count_ptr |= __PERCPU_REF_DEAD;
- __percpu_ref_switch_mode(ref, confirm_kill);
+ __percpu_ref_switch_mode(ref, confirm_kill, false);
percpu_ref_put(ref);

spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
@@ -470,7 +494,7 @@ void percpu_ref_resurrect(struct percpu_ref *ref)

ref->percpu_count_ptr &= ~__PERCPU_REF_DEAD;
percpu_ref_get(ref);
- __percpu_ref_switch_mode(ref, NULL);
+ __percpu_ref_switch_mode(ref, NULL, false);

spin_unlock_irqrestore(&percpu_ref_switch_lock, flags);
}
--
2.20.1