[RFC PATCH v1 08/37] perf workqueue: add method to execute work on specific CPU

From: Riccardo Mancini
Date: Sat Aug 21 2021 - 05:20:19 EST


This patch adds the ability to schedule a work item on a specific CPU.
There are two cases (see the usage sketch below):
- a thread has been pinned to the requested CPU through the new functions
workqueue_set_affinity_cpu and workqueue_set_affinities_cpu: the work is
queued directly on that thread
- no thread is pinned to the requested CPU: in this case, the affinity of
the executing thread is set to that CPU before the work runs and cleared
afterwards
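
For illustration, here is a minimal usage sketch (not part of this patch;
error handling is omitted, dummy_work_fn and the CPU number are placeholders,
and it assumes the create_workqueue, init_work, flush_workqueue and
destroy_workqueue API introduced earlier in this series, plus
perf_cpu_map__new(NULL) to obtain the online CPUs):

  static void dummy_work_fn(struct work_struct *work __maybe_unused)
  {
  	/* the executing thread is bound to the CPU passed to queue_work_on() */
  }

  struct work_struct work;
  struct perf_cpu_map *cpus = perf_cpu_map__new(NULL);	/* online CPUs */
  struct workqueue_struct *wq = create_workqueue(perf_cpu_map__nr(cpus));

  /* case 1: pin one worker per CPU; the work is queued to the pinned worker */
  workqueue_set_affinities_cpu(wq, cpus);

  init_work(&work);
  work.func = dummy_work_fn;
  queue_work_on(2, wq, &work);	/* executed by the worker pinned to CPU 2 */

  /*
   * case 2: without the workqueue_set_affinities_cpu() call above, the same
   * queue_work_on() wraps the work so that the executing thread sets its
   * affinity to CPU 2 before calling work.func and restores it afterwards.
   */

  flush_workqueue(wq);
  destroy_workqueue(wq);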

Signed-off-by: Riccardo Mancini <rickyman7@xxxxxxxxx>
---
tools/perf/util/workqueue/workqueue.c | 133 +++++++++++++++++++++++++-
tools/perf/util/workqueue/workqueue.h | 12 +++
2 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/tools/perf/util/workqueue/workqueue.c b/tools/perf/util/workqueue/workqueue.c
index 61f1b6c41deba031..650170a6a11f56bd 100644
--- a/tools/perf/util/workqueue/workqueue.c
+++ b/tools/perf/util/workqueue/workqueue.c
@@ -10,9 +10,12 @@
#include <linux/string.h>
#include <linux/zalloc.h>
#include <linux/kernel.h>
+#include <linux/bitmap.h>
#include "debug.h"
#include <internal/lib.h>
#include "workqueue.h"
+#include <perf/cpumap.h>
+#include "util/affinity.h"

struct workqueue_struct *global_wq;

@@ -43,6 +46,10 @@ struct workqueue_struct {
struct worker **workers; /* array of all workers */
struct worker *next_worker; /* next worker to choose (round robin) */
int first_stopped_worker; /* next worker to start if needed */
+ struct {
+ int *map; /* maps cpu to thread idx */
+ int size; /* size of the map array */
+ } cpu_to_tidx_map;
};

static const char * const workqueue_errno_str[] = {
@@ -429,6 +436,7 @@ static void worker_thread(int tidx, struct task_struct *task)
struct workqueue_struct *create_workqueue(int nr_threads)
{
int ret, err = 0;
+ int nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
struct workqueue_struct *wq = zalloc(sizeof(struct workqueue_struct));

if (!wq) {
@@ -449,10 +457,18 @@ struct workqueue_struct *create_workqueue(int nr_threads)
goto out_delete_pool;
}

+ wq->cpu_to_tidx_map.size = nr_cpus;
+ wq->cpu_to_tidx_map.map = calloc(nr_cpus, sizeof(*wq->cpu_to_tidx_map.map));
+ if (!wq->cpu_to_tidx_map.map) {
+ err = -ENOMEM;
+ goto out_free_workers;
+ }
+ memset(wq->cpu_to_tidx_map.map, -1, nr_cpus * sizeof(*wq->cpu_to_tidx_map.map));
+
ret = pthread_mutex_init(&wq->lock, NULL);
if (ret) {
err = -ret;
- goto out_free_workers;
+ goto out_free_cpu_to_idx_map;
}

ret = pthread_cond_init(&wq->idle_cond, NULL);
@@ -494,6 +510,8 @@ struct workqueue_struct *create_workqueue(int nr_threads)
pthread_mutex_destroy(&wq->lock);
+out_free_cpu_to_idx_map:
+ free(wq->cpu_to_tidx_map.map);
out_free_workers:
free(wq->workers);
out_delete_pool:
threadpool__delete(wq->pool);
out_free_wq:
@@ -552,6 +570,7 @@ int destroy_workqueue(struct workqueue_struct *wq)
wq->msg_pipe[1] = -1;

zfree(&wq->workers);
+ zfree(&wq->cpu_to_tidx_map.map);
free(wq);
return err;
}
@@ -779,6 +798,118 @@ int workqueue_set_affinity(struct workqueue_struct *wq, int tidx,
return wq->pool_errno ? -WORKQUEUE_ERROR__POOLAFFINITY : 0;
}

+/**
+ * workqueue_set_affinity_cpu - set affinity of thread @tidx in @wq->pool to @cpu
+ *
+ * If cpu is -1, then affinity is set to all online processors.
+ */
+int workqueue_set_affinity_cpu(struct workqueue_struct *wq, int tidx, int cpu)
+{
+ struct mmap_cpu_mask affinity;
+ int i, err;
+
+ if (cpu >= 0)
+ affinity.nbits = cpu+1;
+ else
+ affinity.nbits = wq->cpu_to_tidx_map.size;
+
+ affinity.bits = bitmap_alloc(affinity.nbits);
+ if (!affinity.bits) {
+ pr_debug2("Failed allocation of bitmapset\n");
+ return -ENOMEM;
+ }
+
+ if (cpu >= 0)
+ test_and_set_bit(cpu, affinity.bits);
+ else
+ bitmap_fill(affinity.bits, affinity.nbits);
+
+ err = workqueue_set_affinity(wq, tidx, &affinity);
+ if (err)
+ goto out;
+
+ /* remove any previous mapping of this thread to a CPU */
+ for (i = 0; i < wq->cpu_to_tidx_map.size; i++) {
+ if (wq->cpu_to_tidx_map.map[i] == tidx)
+ wq->cpu_to_tidx_map.map[i] = -1;
+ }
+
+ if (cpu >= 0)
+ wq->cpu_to_tidx_map.map[cpu] = tidx;
+
+out:
+ bitmap_free(affinity.bits);
+ return err;
+}
+
+/**
+ * workqueue_set_affinities_cpu - set single-CPU affinities for the threads in @wq->pool
+ */
+int workqueue_set_affinities_cpu(struct workqueue_struct *wq,
+ struct perf_cpu_map *cpus)
+{
+ int cpu, idx, err;
+
+ if (perf_cpu_map__nr(cpus) > threadpool__size(wq->pool))
+ return -EINVAL;
+
+
+ perf_cpu_map__for_each_cpu(cpu, idx, cpus) {
+ err = workqueue_set_affinity_cpu(wq, idx, cpu);
+ if (err)
+ return err;
+ }
+
+ return 0;
+}
+
+struct cpu_bound_work {
+ struct work_struct work;
+ int cpu;
+ struct work_struct *original_work;
+};
+
+static void set_affinity_and_execute(struct work_struct *work)
+{
+ struct cpu_bound_work *cpu_bound_work = container_of(work, struct cpu_bound_work, work);
+ struct affinity affinity;
+
+ if (affinity__setup(&affinity) < 0)
+ goto out;
+
+ affinity__set(&affinity, cpu_bound_work->cpu);
+ cpu_bound_work->original_work->func(cpu_bound_work->original_work);
+ affinity__cleanup(&affinity);
+
+out:
+ free(cpu_bound_work);
+}
+
+/**
+ * queue_work_on - queue @work for execution on @cpu
+ *
+ * The work is assigned to the worker pinned to @cpu, if any.
+ * Otherwise, affinity is set before running the work and unset after.
+ */
+int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
+{
+ struct cpu_bound_work *cpu_bound_work;
+ int tidx = wq->cpu_to_tidx_map.map[cpu];
+
+ if (tidx >= 0)
+ return queue_work_on_worker(tidx, wq, work);
+
+ cpu_bound_work = malloc(sizeof(*cpu_bound_work));
+ if (!cpu_bound_work)
+ return -ENOMEM;
+
+ init_work(&cpu_bound_work->work);
+ cpu_bound_work->work.func = set_affinity_and_execute;
+ cpu_bound_work->cpu = cpu;
+ cpu_bound_work->original_work = work;
+ return queue_work(wq, &cpu_bound_work->work);
+}
+
/**
* init_work - initialize the @work struct
*/
diff --git a/tools/perf/util/workqueue/workqueue.h b/tools/perf/util/workqueue/workqueue.h
index dc6baee138b22ab2..a91a37e367b62d02 100644
--- a/tools/perf/util/workqueue/workqueue.h
+++ b/tools/perf/util/workqueue/workqueue.h
@@ -25,6 +25,7 @@ extern int workqueue_nr_threads(struct workqueue_struct *wq);

extern int queue_work(struct workqueue_struct *wq, struct work_struct *work);
extern int queue_work_on_worker(int tidx, struct workqueue_struct *wq, struct work_struct *work);
+extern int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work);

extern int flush_workqueue(struct workqueue_struct *wq);

@@ -32,6 +33,9 @@ extern int workqueue_set_affinities(struct workqueue_struct *wq,
struct mmap_cpu_mask *affinities);
extern int workqueue_set_affinity(struct workqueue_struct *wq, int tidx,
struct mmap_cpu_mask *affinity);
+extern int workqueue_set_affinity_cpu(struct workqueue_struct *wq, int tidx, int cpu);
+extern int workqueue_set_affinities_cpu(struct workqueue_struct *wq,
+ struct perf_cpu_map *cpus);

extern void init_work(struct work_struct *work);

@@ -82,6 +86,14 @@ static inline int schedule_work_on_worker(int tidx, struct work_struct *work)
return queue_work_on_worker(tidx, global_wq, work);
}

+/**
+ * schedule_work_on - queue @work to be executed on @cpu by global_wq
+ */
+static inline int schedule_work_on(int cpu, struct work_struct *work)
+{
+ return queue_work_on(cpu, global_wq, work);
+}
+
/**
* flush_scheduled_work - ensure that any scheduled work in global_wq has run to completion
*/
--
2.31.1