[PATCH] workqueue: DRAFT: Implement atomic workqueue and convert dmcrypt to use it

From: Tejun Heo
Date: Fri Jan 26 2024 - 18:21:42 EST


---
drivers/md/dm-crypt.c | 36 +-----
include/linux/workqueue.h | 6 +
kernel/workqueue.c | 234 +++++++++++++++++++++++++++---------
kernel/workqueue_internal.h | 3 +
4 files changed, 186 insertions(+), 93 deletions(-)

diff --git a/drivers/md/dm-crypt.c b/drivers/md/dm-crypt.c
index 855b482cbff1..d375285db202 100644
--- a/drivers/md/dm-crypt.c
+++ b/drivers/md/dm-crypt.c
@@ -73,11 +73,8 @@ struct dm_crypt_io {
struct bio *base_bio;
u8 *integrity_metadata;
bool integrity_metadata_from_pool:1;
- bool in_tasklet:1;

struct work_struct work;
- struct tasklet_struct tasklet;
-
struct convert_context ctx;

atomic_t io_pending;
@@ -1762,7 +1759,6 @@ static void crypt_io_init(struct dm_crypt_io *io, struct crypt_config *cc,
io->ctx.r.req = NULL;
io->integrity_metadata = NULL;
io->integrity_metadata_from_pool = false;
- io->in_tasklet = false;
atomic_set(&io->io_pending, 0);
}

@@ -1771,13 +1767,6 @@ static void crypt_inc_pending(struct dm_crypt_io *io)
atomic_inc(&io->io_pending);
}

-static void kcryptd_io_bio_endio(struct work_struct *work)
-{
- struct dm_crypt_io *io = container_of(work, struct dm_crypt_io, work);
-
- bio_endio(io->base_bio);
-}
-
/*
* One of the bios was finished. Check for completion of
* the whole request and correctly clean up the buffer.
@@ -1800,21 +1789,6 @@ static void crypt_dec_pending(struct dm_crypt_io *io)
kfree(io->integrity_metadata);

base_bio->bi_status = error;
-
- /*
- * If we are running this function from our tasklet,
- * we can't call bio_endio() here, because it will call
- * clone_endio() from dm.c, which in turn will
- * free the current struct dm_crypt_io structure with
- * our tasklet. In this case we need to delay bio_endio()
- * execution to after the tasklet is done and dequeued.
- */
- if (io->in_tasklet) {
- INIT_WORK(&io->work, kcryptd_io_bio_endio);
- queue_work(cc->io_queue, &io->work);
- return;
- }
-
bio_endio(base_bio);
}

@@ -2246,11 +2220,6 @@ static void kcryptd_crypt(struct work_struct *work)
kcryptd_crypt_write_convert(io);
}

-static void kcryptd_crypt_tasklet(unsigned long work)
-{
- kcryptd_crypt((struct work_struct *)work);
-}
-
static void kcryptd_queue_crypt(struct dm_crypt_io *io)
{
struct crypt_config *cc = io->cc;
@@ -2263,9 +2232,8 @@ static void kcryptd_queue_crypt(struct dm_crypt_io *io)
* it is being executed with irqs disabled.
*/
if (in_hardirq() || irqs_disabled()) {
- io->in_tasklet = true;
- tasklet_init(&io->tasklet, kcryptd_crypt_tasklet, (unsigned long)&io->work);
- tasklet_schedule(&io->tasklet);
+ INIT_WORK(&io->work, kcryptd_crypt);
+ queue_work(system_atomic_wq, &io->work);
return;
}

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 232baea90a1d..1e4938b5b176 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -353,6 +353,7 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
* Documentation/core-api/workqueue.rst.
*/
enum wq_flags {
+ WQ_ATOMIC = 1 << 0, /* execute in softirq context */
WQ_UNBOUND = 1 << 1, /* not bound to any cpu */
WQ_FREEZABLE = 1 << 2, /* freeze during suspend */
WQ_MEM_RECLAIM = 1 << 3, /* may be used for memory reclaim */
@@ -392,6 +393,9 @@ enum wq_flags {
__WQ_ORDERED = 1 << 17, /* internal: workqueue is ordered */
__WQ_LEGACY = 1 << 18, /* internal: create*_workqueue() */
__WQ_ORDERED_EXPLICIT = 1 << 19, /* internal: alloc_ordered_workqueue() */
+
+ /* atomic wq only allows the following flags */
+ __WQ_ATOMIC_ALLOWS = WQ_ATOMIC | WQ_HIGHPRI,
};

enum wq_consts {
@@ -442,6 +446,8 @@ extern struct workqueue_struct *system_unbound_wq;
extern struct workqueue_struct *system_freezable_wq;
extern struct workqueue_struct *system_power_efficient_wq;
extern struct workqueue_struct *system_freezable_power_efficient_wq;
+extern struct workqueue_struct *system_atomic_wq;
+extern struct workqueue_struct *system_atomic_highpri_wq;

/**
* alloc_workqueue - allocate a workqueue
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 23740c9ed57a..2a8f21494676 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -73,7 +73,8 @@ enum worker_pool_flags {
* wq_pool_attach_mutex to avoid changing binding state while
* worker_attach_to_pool() is in progress.
*/
- POOL_MANAGER_ACTIVE = 1 << 0, /* being managed */
+ POOL_ATOMIC = 1 << 0, /* is an atomic pool */
+ POOL_MANAGER_ACTIVE = 1 << 1, /* being managed */
POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
};

@@ -115,6 +116,14 @@ enum wq_internal_consts {
WQ_NAME_LEN = 32,
};

+/*
+ * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
+ * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
+ * msecs_to_jiffies() can't be an initializer.
+ */
+#define ATOMIC_WORKER_JIFFIES msecs_to_jiffies(2)
+#define ATOMIC_WORKER_RESTARTS 10
+
/*
* Structure fields follow one of the following exclusion rules.
*
@@ -441,8 +450,13 @@ static bool wq_debug_force_rr_cpu = false;
#endif
module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);

+/* the atomic worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+ atomic_worker_pools);
+
/* the per-cpu worker pools */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+ cpu_worker_pools);

static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */

@@ -476,8 +490,13 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
EXPORT_SYMBOL_GPL(system_power_efficient_wq);
struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
+struct workqueue_struct *system_atomic_wq;
+EXPORT_SYMBOL_GPL(system_atomic_wq);
+struct workqueue_struct *system_atomic_highpri_wq;
+EXPORT_SYMBOL_GPL(system_atomic_highpri_wq);

static int worker_thread(void *__worker);
+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet);
static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
static void show_pwq(struct pool_workqueue *pwq);
static void show_one_worker_pool(struct worker_pool *pool);
@@ -496,6 +515,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
!lockdep_is_held(&wq_pool_mutex), \
"RCU, wq->mutex or wq_pool_mutex should be held")

+#define for_each_atomic_worker_pool(pool, cpu) \
+ for ((pool) = &per_cpu(atomic_worker_pools, cpu)[0]; \
+ (pool) < &per_cpu(atomic_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+ (pool)++)
+
#define for_each_cpu_worker_pool(pool, cpu) \
for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
(pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -1184,6 +1208,14 @@ static bool kick_pool(struct worker_pool *pool)
if (!need_more_worker(pool) || !worker)
return false;

+ if (pool->flags & POOL_ATOMIC) {
+ if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
+ tasklet_hi_schedule(&worker->atomic_tasklet);
+ else
+ tasklet_schedule(&worker->atomic_tasklet);
+ return true;
+ }
+
p = worker->task;

#ifdef CONFIG_SMP
@@ -1663,8 +1695,15 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
lockdep_assert_held(&pool->lock);

if (!nna) {
- /* per-cpu workqueue, pwq->nr_active is sufficient */
- obtained = pwq->nr_active < READ_ONCE(wq->max_active);
+ /*
+ * An atomic workqueue always has a single worker per-cpu and
+ * doesn't impose an additional max_active limit. For a per-cpu
+ * workqueue, checking pwq->nr_active is sufficient.
+ */
+ if (wq->flags & WQ_ATOMIC)
+ obtained = true;
+ else
+ obtained = pwq->nr_active < READ_ONCE(wq->max_active);
goto out;
}

@@ -2591,27 +2630,31 @@ static struct worker *create_worker(struct worker_pool *pool)

worker->id = id;

- if (pool->cpu >= 0)
- snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
- pool->attrs->nice < 0 ? "H" : "");
- else
- snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
- worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
- "kworker/%s", id_buf);
- if (IS_ERR(worker->task)) {
- if (PTR_ERR(worker->task) == -EINTR) {
- pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
- id_buf);
- } else {
- pr_err_once("workqueue: Failed to create a worker thread: %pe",
- worker->task);
+ if (pool->flags & POOL_ATOMIC) {
+ tasklet_setup(&worker->atomic_tasklet, atomic_worker_taskletfn);
+ } else {
+ if (pool->cpu >= 0)
+ snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
+ pool->attrs->nice < 0 ? "H" : "");
+ else
+ snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
+
+ worker->task = kthread_create_on_node(worker_thread, worker,
+ pool->node, "kworker/%s", id_buf);
+ if (IS_ERR(worker->task)) {
+ if (PTR_ERR(worker->task) == -EINTR) {
+ pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
+ id_buf);
+ } else {
+ pr_err_once("workqueue: Failed to create a worker thread: %pe",
+ worker->task);
+ }
+ goto fail;
}
- goto fail;
- }

- set_user_nice(worker->task, pool->attrs->nice);
- kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+ set_user_nice(worker->task, pool->attrs->nice);
+ kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+ }

/* successful, attach the worker to the pool */
worker_attach_to_pool(worker, pool);
@@ -2627,7 +2670,8 @@ static struct worker *create_worker(struct worker_pool *pool)
* check if not woken up soon. As kick_pool() is noop if @pool is empty,
* wake it up explicitly.
*/
- wake_up_process(worker->task);
+ if (worker->task)
+ wake_up_process(worker->task);

raw_spin_unlock_irq(&pool->lock);

@@ -3043,25 +3087,35 @@ __acquires(&pool->lock)
lock_map_release(&lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);

- if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
- rcu_preempt_depth() > 0)) {
- pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
- " last function: %ps\n",
- current->comm, preempt_count(), rcu_preempt_depth(),
- task_pid_nr(current), worker->current_func);
- debug_show_held_locks(current);
- dump_stack();
- }
+ if (worker->task) {
+ if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
+ rcu_preempt_depth() > 0)) {
+ pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
+ " last function: %ps\n",
+ current->comm, preempt_count(),
+ rcu_preempt_depth(), task_pid_nr(current),
+ worker->current_func);
+ debug_show_held_locks(current);
+ dump_stack();
+ }

- /*
- * The following prevents a kworker from hogging CPU on !PREEMPTION
- * kernels, where a requeueing work item waiting for something to
- * happen could deadlock with stop_machine as such work item could
- * indefinitely requeue itself while all other CPUs are trapped in
- * stop_machine. At the same time, report a quiescent RCU state so
- * the same condition doesn't freeze RCU.
- */
- cond_resched();
+ /*
+ * The following prevents a kworker from hogging CPU on
+ * !PREEMPTION kernels, where a requeueing work item waiting for
+ * something to happen could deadlock with stop_machine as such
+ * work item could indefinitely requeue itself while all other
+ * CPUs are trapped in stop_machine. At the same time, report a
+ * quiescent RCU state so the same condition doesn't freeze RCU.
+ */
+ if (worker->task)
+ cond_resched();
+ } else {
+ if (unlikely(lockdep_depth(current) > 0)) {
+ pr_err("BUG: atomic workqueue leaked lock: last function: %ps\n",
+ worker->current_func);
+ debug_show_held_locks(current);
+ }
+ }

raw_spin_lock_irq(&pool->lock);

@@ -3344,6 +3398,44 @@ static int rescuer_thread(void *__rescuer)
goto repeat;
}

+static void atomic_worker_taskletfn(struct tasklet_struct *tasklet)
+{
+ struct worker *worker =
+ container_of(tasklet, struct worker, atomic_tasklet);
+ struct worker_pool *pool = worker->pool;
+ int nr_restarts = ATOMIC_WORKER_RESTARTS;
+ unsigned long end = jiffies + ATOMIC_WORKER_JIFFIES;
+
+ raw_spin_lock_irq(&pool->lock);
+ worker_leave_idle(worker);
+
+ /*
+ * This function follows the structure of worker_thread(). See there for
+ * explanations of each step.
+ */
+ if (need_more_worker(pool))
+ goto done;
+
+ WARN_ON_ONCE(!list_empty(&worker->scheduled));
+ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
+
+ do {
+ struct work_struct *work =
+ list_first_entry(&pool->worklist,
+ struct work_struct, entry);
+
+ if (assign_work(work, worker, NULL))
+ process_scheduled_works(worker);
+ } while (--nr_restarts && time_before(jiffies, end) &&
+ keep_working(pool));
+
+ worker_set_flags(worker, WORKER_PREP);
+done:
+ worker_enter_idle(worker);
+ kick_pool(pool);
+ raw_spin_unlock_irq(&pool->lock);
+}
+
/**
* check_flush_dependency - check for flush dependency sanity
* @target_wq: workqueue being flushed
@@ -5149,6 +5241,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
size_t wq_size;
int name_len;

+ if (flags & WQ_ATOMIC) {
+ if (WARN_ON_ONCE(flags & ~__WQ_ATOMIC_ALLOWS))
+ return NULL;
+ if (WARN_ON_ONCE(max_active))
+ return NULL;
+ }
+
/*
* Unbound && max_active == 1 used to imply ordered, which is no longer
* the case on many machines due to per-pod pools. While
@@ -7094,6 +7193,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
}

+static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
+{
+ BUG_ON(init_worker_pool(pool));
+ pool->cpu = cpu;
+ cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+ cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
+ pool->attrs->nice = nice;
+ pool->attrs->affn_strict = true;
+ pool->node = cpu_to_node(cpu);
+
+ /* alloc pool ID */
+ mutex_lock(&wq_pool_mutex);
+ BUG_ON(worker_pool_assign_id(pool));
+ mutex_unlock(&wq_pool_mutex);
+}
+
/**
* workqueue_init_early - early init for workqueue subsystem
*
@@ -7149,25 +7264,19 @@ void __init workqueue_init_early(void)
pt->pod_node[0] = NUMA_NO_NODE;
pt->cpu_pod[0] = 0;

- /* initialize CPU pools */
+ /* initialize atomic and CPU pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;

i = 0;
- for_each_cpu_worker_pool(pool, cpu) {
- BUG_ON(init_worker_pool(pool));
- pool->cpu = cpu;
- cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
- cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
- pool->attrs->nice = std_nice[i++];
- pool->attrs->affn_strict = true;
- pool->node = cpu_to_node(cpu);
-
- /* alloc pool ID */
- mutex_lock(&wq_pool_mutex);
- BUG_ON(worker_pool_assign_id(pool));
- mutex_unlock(&wq_pool_mutex);
+ for_each_atomic_worker_pool(pool, cpu) {
+ init_cpu_worker_pool(pool, cpu, std_nice[i++]);
+ pool->flags |= POOL_ATOMIC;
}
+
+ i = 0;
+ for_each_cpu_worker_pool(pool, cpu)
+ init_cpu_worker_pool(pool, cpu, std_nice[i++]);
}

/* create default unbound and ordered wq attrs */
@@ -7200,10 +7309,14 @@ void __init workqueue_init_early(void)
system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
WQ_FREEZABLE | WQ_POWER_EFFICIENT,
0);
+ system_atomic_wq = alloc_workqueue("system_atomic_wq", WQ_ATOMIC, 0);
+ system_atomic_highpri_wq = alloc_workqueue("system_atomic_highpri_wq",
+ WQ_ATOMIC | WQ_HIGHPRI, 0);
BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
!system_unbound_wq || !system_freezable_wq ||
!system_power_efficient_wq ||
- !system_freezable_power_efficient_wq);
+ !system_freezable_power_efficient_wq ||
+ !system_atomic_wq || !system_atomic_highpri_wq);
}

static void __init wq_cpu_intensive_thresh_init(void)
@@ -7269,9 +7382,10 @@ void __init workqueue_init(void)
* up. Also, create a rescuer for workqueues that requested it.
*/
for_each_possible_cpu(cpu) {
- for_each_cpu_worker_pool(pool, cpu) {
+ for_each_atomic_worker_pool(pool, cpu)
+ pool->node = cpu_to_node(cpu);
+ for_each_cpu_worker_pool(pool, cpu)
pool->node = cpu_to_node(cpu);
- }
}

list_for_each_entry(wq, &workqueues, list) {
@@ -7284,6 +7398,8 @@ void __init workqueue_init(void)

/* create the initial workers */
for_each_online_cpu(cpu) {
+ for_each_atomic_worker_pool(pool, cpu)
+ BUG_ON(!create_worker(pool));
for_each_cpu_worker_pool(pool, cpu) {
pool->flags &= ~POOL_DISASSOCIATED;
BUG_ON(!create_worker(pool));
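
Per the alloc_workqueue() check added above, a user that wants a dedicated atomic workqueue rather than the shared system ones may only combine WQ_ATOMIC with WQ_HIGHPRI and must pass 0 for max_active. A hedged sketch of what that looks like (my_atomic_wq and my_init() are made-up names, not part of the patch):

#include <linux/errno.h>
#include <linux/init.h>
#include <linux/workqueue.h>

static struct workqueue_struct *my_atomic_wq;

static int __init my_init(void)
{
        /* WQ_HIGHPRI is the only other flag allowed; max_active must be 0 */
        my_atomic_wq = alloc_workqueue("my_atomic_wq", WQ_ATOMIC, 0);
        if (!my_atomic_wq)
                return -ENOMEM;

        return 0;
}

system_atomic_wq and system_atomic_highpri_wq in workqueue_init_early() above are allocated the same way.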
diff --git a/kernel/workqueue_internal.h b/kernel/workqueue_internal.h
index f6275944ada7..f65f204f38ea 100644
--- a/kernel/workqueue_internal.h
+++ b/kernel/workqueue_internal.h
@@ -10,6 +10,7 @@

#include <linux/workqueue.h>
#include <linux/kthread.h>
+#include <linux/interrupt.h>
#include <linux/preempt.h>

struct worker_pool;
@@ -42,6 +43,8 @@ struct worker {
struct list_head scheduled; /* L: scheduled works */

struct task_struct *task; /* I: worker task */
+ struct tasklet_struct atomic_tasklet; /* I: tasklet for atomic pool */
+
struct worker_pool *pool; /* A: the associated pool */
/* L: for rescuers */
struct list_head node; /* A: anchored at pool->workers */
--
2.43.0