[RFC PATCH v1 28/37] perf evlist: add multithreading to evlist__open

From: Riccardo Mancini
Date: Sat Aug 21 2021 - 05:22:17 EST


This patch enables multithreading in evlist__open using the new
evsel__open_per_cpu_no_fallback function.

The multithreaded version tries to open everything in parallel. Once
the workers are done, it checks their results and, in case of error,
tries the fallback mechanisms already present in evsel__open_cpu, then
restarts the workers from where they left off.
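
In outline, each worker records the position at which it failed, the
main thread applies a fallback and reschedules it, and the worker then
resumes from that checkpoint. Below is a minimal, self-contained sketch
of this scheme using plain pthreads instead of perf's workqueue (all
names in it are illustrative, not taken from the perf tree); it builds
with cc -pthread:

	#include <pthread.h>
	#include <stdbool.h>
	#include <stdio.h>

	#define NWORKERS 4	/* one worker per CPU in the real patch */
	#define NITEMS   8	/* stand-ins for the evsel x thread opens */

	struct open_work {
		pthread_t tid;
		int id;		/* like the CPU a worker handles */
		int resume;	/* like the cpu/thread resume point */
		int err_item;	/* like cpu_err/evsel_err; -1 on success */
		bool fell_back;	/* a fallback was already applied */
	};

	/* stand-in for sys_perf_event_open(); item 5 needs one fallback */
	static bool try_open(struct open_work *w, int item)
	{
		if (item == 5 && !w->fell_back)
			return false;
		printf("worker %d opened item %d\n", w->id, item);
		return true;
	}

	static void *worker_fn(void *arg)
	{
		struct open_work *w = arg;
		int i;

		w->err_item = -1;
		for (i = w->resume; i < NITEMS; i++) {
			if (!try_open(w, i)) {
				w->err_item = i;	/* checkpoint */
				w->resume = i;		/* restart here */
				return NULL;
			}
		}
		w->resume = NITEMS;	/* done; a rerun is a no-op */
		return NULL;
	}

	int main(void)
	{
		struct open_work works[NWORKERS] = { 0 };
		bool retry;
		int i;

		for (i = 0; i < NWORKERS; i++)
			works[i].id = i;

		do {
			retry = false;
			for (i = 0; i < NWORKERS; i++)
				pthread_create(&works[i].tid, NULL,
					       worker_fn, &works[i]);
			/* like flush_scheduled_work() */
			for (i = 0; i < NWORKERS; i++)
				pthread_join(works[i].tid, NULL);

			for (i = 0; i < NWORKERS; i++) {
				if (works[i].err_item < 0)
					continue;
				/* apply a fallback, then rerun */
				works[i].fell_back = true;
				retry = true;
			}
		} while (retry);

		return 0;
	}

The real patch additionally distinguishes fatal errors (PEO_ERROR) from
recoverable ones (PEO_FALLBACK), and only raises the fd rlimit when no
worker made progress.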

Signed-off-by: Riccardo Mancini <rickyman7@xxxxxxxxx>
---
tools/perf/util/evlist.c | 189 +++++++++++++++++++++++++++++++++++++--
1 file changed, 183 insertions(+), 6 deletions(-)

diff --git a/tools/perf/util/evlist.c b/tools/perf/util/evlist.c
index f0a839107e8849bf..3472038d719ec7d4 100644
--- a/tools/perf/util/evlist.c
+++ b/tools/perf/util/evlist.c
@@ -19,6 +19,8 @@
#include "units.h"
#include "bpf_counter.h"
#include <internal/lib.h> // page_size
+#include <internal/cpumap.h>
+#include <internal/threadmap.h>
#include "affinity.h"
#include "../perf.h"
#include "asm/bug.h"
@@ -1403,11 +1405,184 @@ static int evlist__create_syswide_maps(struct evlist *evlist)
return err;
}

-int evlist__open(struct evlist *evlist)
+static int evlist__open_singlethreaded(struct evlist *evlist)
{
struct evsel *evsel;
int err;

+ evlist__for_each_entry(evlist, evsel) {
+ err = evsel__open(evsel, evsel->core.cpus, evsel->core.threads);
+ if (err < 0)
+ return err;
+ }
+
+ return 0;
+}
+
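+// Per-CPU worker state. Each union holds the resume point while work is
+// pending and the error position once a worker stops: a successful run
+// leaves evsel_err (and thus evsel_resume) NULL, so rescheduling that
+// worker is a no-op.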
+struct evlist_open_work {
+ struct work_struct work;
+ struct evlist *evlist;
+ int cpu;
+ union {
+ int cpu_resume;
+ int cpu_err;
+ };
+ union {
+ struct evsel *evsel_resume;
+ struct evsel *evsel_err;
+ };
+ struct evsel_open_result res; // last result; res.thread is also the resume point
+ bool progress; // did the worker make any progress?
+};
+
+static void evlist__open_multithreaded_func(struct work_struct *_work)
+{
+ struct evlist_open_work *work = container_of(_work, struct evlist_open_work, work);
+ struct evsel *evsel = work->evsel_resume;
+ int cpu_idx, thread_resume = work->res.thread;
+
+ work->res.peo_res.err = PEO_SUCCESS;
+ work->progress = false;
+
+ if (!evsel) // nothing to do
+ return;
+
+ work->evsel_err = NULL;
+
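+ // resume the iteration from the last checkpoint: skip CPU indices
+ // below cpu_resume, and threads below res.thread for the first evsel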
+ evlist__for_each_entry_from(work->evlist, evsel) {
+ cpu_idx = evsel__find_cpu(evsel, work->cpu);
+ if (cpu_idx < work->cpu_resume)
+ continue;
+
+ work->res = evsel__open_per_cpu_no_fallback(evsel,
+ evsel->core.cpus,
+ evsel->core.threads,
+ cpu_idx, thread_resume);
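+ // any advance of the thread index means at least one new event
+ // was opened during this run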
+ work->progress |= work->res.thread != thread_resume;
+ if (work->res.peo_res.err != PEO_SUCCESS) {
+ work->evsel_err = evsel;
+ work->cpu_err = cpu_idx;
+ break;
+ }
+
+ thread_resume = 0;
+ }
+}
+
+static int evlist__open_multithreaded(struct evlist *evlist)
+{
+ int cpu, cpuid, cpuidx, thread, err;
+ struct evlist_open_work *works;
+ char errbuf[WORKQUEUE_STRERR_BUFSIZE];
+ struct perf_event_open_result peo_res;
+ struct evsel *evsel;
+ struct perf_cpu_map *cpus;
+ struct perf_thread_map *threads;
+ enum rlimit_action set_rlimit = NO_CHANGE;
+ bool progress;
+
+ works = calloc(perf_cpu_map__nr(evlist->core.all_cpus), sizeof(*works));
+ if (!works)
+ return -ENOMEM;
+
+ perf_cpu_map__for_each_cpu(cpuid, cpuidx, evlist->core.all_cpus) {
+ init_work(&works[cpuidx].work);
+ works[cpuidx].work.func = evlist__open_multithreaded_func;
+ works[cpuidx].evlist = evlist;
+ works[cpuidx].cpu = cpuid;
+ works[cpuidx].evsel_resume = evlist__first(evlist);
+ }
+
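+ // (re)compute each evsel's attr; done again whenever a missing
+ // feature is detected and disabled below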
+reprepare:
+ evlist__for_each_entry(evlist, evsel) {
+ err = evsel__prepare_open(evsel, evsel->core.cpus,
+ evsel->core.threads);
+ if (err)
+ goto out;
+ }
+retry:
+ perf_cpu_map__for_each_cpu(cpuid, cpuidx, evlist->core.all_cpus) {
+ err = schedule_work_on(cpuid, &works[cpuidx].work);
+ if (err) {
+ workqueue_strerror(global_wq, err, errbuf, sizeof(errbuf));
+ pr_debug("schedule_work: %s\n", errbuf);
+ goto out;
+ }
+ }
+
+ err = flush_scheduled_work();
+ if (err) {
+ workqueue_strerror(global_wq, err, errbuf, sizeof(errbuf));
+ pr_debug("flush_scheduled_work: %s\n", errbuf);
+ goto out;
+ }
+
+ // check if any event was opened (progress = true)
+ progress = false;
+ perf_cpu_map__for_each_cpu(cpuid, cpuidx, evlist->core.all_cpus) {
+ if (works[cpuidx].progress) {
+ progress = true;
+ break;
+ }
+ }
+
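+ // check each worker's result and, on failure, try the same fallbacks
+ // evsel__open_cpu uses before rescheduling the workers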
+ perf_cpu_map__for_each_cpu(cpuid, cpuidx, evlist->core.all_cpus) {
+ peo_res = works[cpuidx].res.peo_res;
+
+ switch (peo_res.err) {
+ case PEO_SUCCESS:
+ continue;
+ case PEO_FALLBACK:
+ err = peo_res.rc;
+ break;
+ default:
+ case PEO_ERROR:
+ err = peo_res.rc;
+ goto out;
+ }
+
+ // fallback
+ evsel = works[cpuidx].evsel_err;
+ cpus = evsel->core.cpus;
+ cpu = works[cpuidx].cpu_err;
+ threads = evsel->core.threads;
+ thread = works[cpuidx].res.thread;
+
+ if (evsel__precise_ip_fallback(evsel))
+ goto retry;
+
+ if (evsel__ignore_missing_thread(evsel, cpus->nr, cpu,
+ threads, thread, err))
+ goto retry;
+
+ // increase rlimit only if no progress was made
+ if (progress)
+ set_rlimit = NO_CHANGE;
+ if (err == -EMFILE && evsel__increase_rlimit(&set_rlimit))
+ goto retry;
+
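+ // a missing feature can only explain an -EINVAL on the very first
+ // open of an evsel (cpu 0, thread 0)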
+ if (err != -EINVAL || cpu > 0 || thread > 0)
+ goto out;
+
+ if (evsel__detect_missing_features(evsel))
+ goto reprepare;
+
+ // no fallback worked, return the error
+ goto out;
+ }
+
+ err = 0;
+
+out:
+ free(works);
+
+ return err;
+}
+
+int evlist__open(struct evlist *evlist)
+{
+ int err;
+
/*
* Default: one fd per CPU, all threads, aka systemwide
* as sys_perf_event_open(cpu = -1, thread = -1) is EINVAL
@@ -1420,11 +1595,13 @@ int evlist__open(struct evlist *evlist)

evlist__update_id_pos(evlist);

- evlist__for_each_entry(evlist, evsel) {
- err = evsel__open(evsel, evsel->core.cpus, evsel->core.threads);
- if (err < 0)
- goto out_err;
- }
+ if (perf_singlethreaded)
+ err = evlist__open_singlethreaded(evlist);
+ else
+ err = evlist__open_multithreaded(evlist);
+
+ if (err)
+ goto out_err;

return 0;
out_err:
--
2.31.1