[RFC PATCH v0.1 9/9] selftests/umcg: add UMCG server/worker API selftest

From: Peter Oskolkov
Date: Thu May 20 2021 - 14:37:00 EST


Add UMCG server/worker API selftests. These are only basic
tests, they do not cover many important use cases/conditions.

More to come.

Signed-off-by: Peter Oskolkov <posk@xxxxxxxxxx>
---
tools/testing/selftests/umcg/.gitignore | 1 +
tools/testing/selftests/umcg/Makefile | 4 +-
tools/testing/selftests/umcg/umcg_test.c | 475 +++++++++++++++++++++++
3 files changed, 479 insertions(+), 1 deletion(-)
create mode 100644 tools/testing/selftests/umcg/umcg_test.c

diff --git a/tools/testing/selftests/umcg/.gitignore b/tools/testing/selftests/umcg/.gitignore
index 89cca24e5907..f488ec82882a 100644
--- a/tools/testing/selftests/umcg/.gitignore
+++ b/tools/testing/selftests/umcg/.gitignore
@@ -1,2 +1,3 @@
# SPDX-License-Identifier: GPL-2.0-only
umcg_core_test
+umcg_test
diff --git a/tools/testing/selftests/umcg/Makefile b/tools/testing/selftests/umcg/Makefile
index b151098e2ed1..916897d82e53 100644
--- a/tools/testing/selftests/umcg/Makefile
+++ b/tools/testing/selftests/umcg/Makefile
@@ -6,8 +6,10 @@ LIBUMCGDIR := $(TOOLSDIR)/lib/umcg
CFLAGS += -g -O0 -I$(LIBUMCGDIR) -I$(TOOLSDIR)/include/ -I../../../../usr/include/
LDLIBS += -lpthread -static

-TEST_GEN_PROGS := umcg_core_test
+TEST_GEN_PROGS := umcg_core_test umcg_test

include ../lib.mk

$(OUTPUT)/umcg_core_test: umcg_core_test.c $(LIBUMCGDIR)/libumcg.c
+
+$(OUTPUT)/umcg_test: umcg_test.c $(LIBUMCGDIR)/libumcg.c
diff --git a/tools/testing/selftests/umcg/umcg_test.c b/tools/testing/selftests/umcg/umcg_test.c
new file mode 100644
index 000000000000..2c01a61ec3f4
--- /dev/null
+++ b/tools/testing/selftests/umcg/umcg_test.c
@@ -0,0 +1,475 @@
+// SPDX-License-Identifier: GPL-2.0
+#define _GNU_SOURCE
+#include "libumcg.h"
+
+#include <pthread.h>
+#include <stdatomic.h>
+
+#include "../kselftest_harness.h"
+
+#define CHECK_CONFIG() \
+{ \
+ int ret = sys_umcg_api_version(1, 0); \
+ \
+ if (ret == -1 && errno == ENOSYS) \
+ SKIP(return, "CONFIG_UMCG not set"); \
+}
+
+struct worker_args {
+ umcg_t group; /* Which group the worker should join. */
+ umcg_tid utid; /* This worker's utid. */
+ void *(*thread_fn)(void *); /* Function to run. */
+ void *thread_arg;
+ intptr_t tag;
+};
+
+static void validate_state(umcg_tid utid, u32 expected, const char *ctx)
+{
+ u32 state = umcg_get_task_state(utid);
+
+ if (state == expected)
+ return;
+
+ fprintf(stderr, "BAD state for %ld: expected: %u; got: %u; ctx :%s\n",
+ utid, expected, state, ctx);
+ exit(1);
+}
+
+static void *worker_fn(void *arg)
+{
+ void *result;
+ umcg_tid utid;
+ struct worker_args *args = (struct worker_args *)arg;
+
+ validate_state(umcg_get_utid(), UMCG_TASK_NONE, "worker_fn start");
+
+ atomic_thread_fence(memory_order_acquire);
+ atomic_store_explicit(&args->utid, umcg_get_utid(),
+ memory_order_seq_cst);
+
+ utid = umcg_register_worker(args->group, args->tag);
+ if (args->utid != utid) {
+ fprintf(stderr, "umcg_register_worker failed.\n");
+ exit(1);
+ }
+ validate_state(umcg_get_utid(), UMCG_TASK_RUNNING, "worker_fn in");
+
+ /* Fence args->thread_arg */
+ atomic_thread_fence(memory_order_acquire);
+
+ result = args->thread_fn(args->thread_arg);
+ validate_state(umcg_get_utid(), UMCG_TASK_RUNNING, "worker_fn out");
+
+ if (umcg_unregister_task()) {
+ fprintf(stderr, "umcg_unregister_task failed.\n");
+ exit(1);
+ }
+ validate_state(umcg_get_utid(), UMCG_TASK_NONE, "worker_fn finish");
+
+ return result;
+}
+
+static void *simple_running_worker(void *arg)
+{
+ bool *checkpoint = (bool *)arg;
+
+ atomic_store_explicit(checkpoint, true, memory_order_relaxed);
+ return NULL;
+}
+
+TEST(umcg_poll_run_test) {
+ pthread_t worker;
+ bool checkpoint = false;
+ struct worker_args worker_args;
+
+ CHECK_CONFIG();
+
+ worker_args.utid = UMCG_NONE;
+ worker_args.group = umcg_create_group(0);
+ ASSERT_NE(UMCG_NONE, worker_args.group);
+
+ worker_args.thread_fn = &simple_running_worker;
+ worker_args.thread_arg = &checkpoint;
+ worker_args.tag = 0;
+
+ ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+ /* Wait for the worker to start. */
+ while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+ memory_order_relaxed))
+ ;
+
+ /*
+ * Make sure that the worker does not checkpoint until the server
+ * runs it.
+ */
+ usleep(1000);
+ ASSERT_FALSE(atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+ /*
+ * Run the worker until it exits. Need to loop because the worker
+ * may pagefault and wake the server.
+ */
+ do {
+ u32 state;
+
+ /* Poll the worker. */
+ ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+ validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wns poll");
+
+ umcg_tid utid = umcg_run_worker(worker_args.utid);
+ if (utid == UMCG_NONE) {
+ ASSERT_EQ(0, errno);
+ break;
+ }
+
+ ASSERT_EQ(utid, worker_args.utid);
+
+ state = umcg_get_task_state(utid);
+ ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+ } while (true);
+
+ ASSERT_TRUE(atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ /* Can't destroy group while this thread still belongs to it. */
+ ASSERT_NE(0, umcg_destroy_group(worker_args.group));
+ ASSERT_EQ(0, umcg_unregister_task());
+ ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+ ASSERT_EQ(0, pthread_join(worker, NULL));
+}
+
+static void *sleeping_worker(void *arg)
+{
+ int *checkpoint = (int *)arg;
+
+ atomic_store_explicit(checkpoint, 1, memory_order_relaxed);
+ usleep(2000);
+ atomic_store_explicit(checkpoint, 2, memory_order_relaxed);
+
+ return NULL;
+}
+
+TEST(umcg_sleep_test) {
+ pthread_t worker;
+ u32 state;
+ int checkpoint = 0;
+ struct worker_args worker_args;
+
+ CHECK_CONFIG();
+
+ worker_args.utid = UMCG_NONE;
+ worker_args.group = umcg_create_group(0);
+ ASSERT_NE(UMCG_NONE, worker_args.group);
+
+ worker_args.thread_fn = &sleeping_worker;
+ worker_args.thread_arg = &checkpoint;
+ worker_args.tag = 0;
+
+ ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+ /* Wait for the worker to start. */
+ while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+ memory_order_relaxed))
+ ;
+
+ /*
+ * Make sure that the worker does not checkpoint until the server
+ * runs it.
+ */
+ usleep(1000);
+ ASSERT_EQ(0, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ validate_state(umcg_get_utid(), UMCG_TASK_NONE, "sws prereg");
+
+ ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+ validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws postreg");
+
+ /*
+ * Run the worker until it checkpoints 1. Need to loop because
+ * the worker may pagefault and wake the server.
+ */
+ do {
+ ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+ validate_state(worker_args.utid, UMCG_TASK_RUNNABLE,
+ "sws poll");
+
+ umcg_tid utid = umcg_run_worker(worker_args.utid);
+ ASSERT_EQ(utid, worker_args.utid);
+ } while (1 != atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ state = umcg_get_task_state(worker_args.utid);
+ ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+ validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws mid");
+
+ /* The worker cannot reach checkpoint 2 without the server running it. */
+ usleep(2000);
+ ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ state = umcg_get_task_state(worker_args.utid);
+ ASSERT_TRUE(state == UMCG_TASK_BLOCKED || UMCG_TASK_UNBLOCKED);
+
+ /* Run the worker until it exits. */
+ do {
+ ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+ umcg_tid utid = umcg_run_worker(worker_args.utid);
+ if (utid == UMCG_NONE) {
+ ASSERT_EQ(0, errno);
+ break;
+ }
+
+ ASSERT_EQ(utid, worker_args.utid);
+ } while (true);
+
+ /* The final check and cleanup. */
+ ASSERT_EQ(2, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+ validate_state(umcg_get_utid(), UMCG_TASK_PROCESSING, "sws preunreg");
+ ASSERT_EQ(0, pthread_join(worker, NULL));
+ ASSERT_EQ(0, umcg_unregister_task());
+ validate_state(umcg_get_utid(), UMCG_TASK_NONE, "sws postunreg");
+ ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+}
+
+static void *waiting_worker(void *arg)
+{
+ int *checkpoint = (int *)arg;
+
+ atomic_store_explicit(checkpoint, 1, memory_order_relaxed);
+ if (umcg_wait(NULL)) {
+ fprintf(stderr, "umcg_wait() failed.\n");
+ exit(1);
+ }
+ atomic_store_explicit(checkpoint, 2, memory_order_relaxed);
+
+ return NULL;
+}
+
+TEST(umcg_wait_wake_test) {
+ pthread_t worker;
+ int checkpoint = 0;
+ struct worker_args worker_args;
+
+ CHECK_CONFIG();
+
+ worker_args.utid = UMCG_NONE;
+ worker_args.group = umcg_create_group(0);
+ ASSERT_NE(UMCG_NONE, worker_args.group);
+
+ worker_args.thread_fn = &waiting_worker;
+ worker_args.thread_arg = &checkpoint;
+ worker_args.tag = 0;
+
+ ASSERT_EQ(0, pthread_create(&worker, NULL, &worker_fn, &worker_args));
+
+ /* Wait for the worker to start. */
+ while (UMCG_NONE == atomic_load_explicit(&worker_args.utid,
+ memory_order_relaxed))
+ ;
+
+ /*
+ * Make sure that the worker does not checkpoint until the server
+ * runs it.
+ */
+ usleep(1000);
+ ASSERT_EQ(0, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ ASSERT_NE(0, umcg_register_server(worker_args.group, 0));
+
+ /*
+ * Run the worker until it checkpoints 1. Need to loop because
+ * the worker may pagefault and wake the server.
+ */
+ do {
+ ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+ ASSERT_EQ(worker_args.utid, umcg_run_worker(worker_args.utid));
+ } while (1 != atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wait_wake wait");
+
+ /* The worker cannot reach checkpoint 2 without the server waking it. */
+ usleep(2000);
+ ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+ validate_state(worker_args.utid, UMCG_TASK_RUNNABLE, "wait_wake wait");
+
+
+ ASSERT_EQ(0, umcg_wake(worker_args.utid));
+
+ /*
+ * umcg_wake() above marks the worker as RUNNING; it will become
+ * UNBLOCKED upon wakeup as it does not have a server. But this may
+ * be delayed.
+ */
+ while (umcg_get_task_state(worker_args.utid) != UMCG_TASK_UNBLOCKED)
+ ;
+
+ /* The worker cannot reach checkpoint 2 without the server running it. */
+ usleep(2000);
+ ASSERT_EQ(1, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+
+ /* Run the worker until it exits. */
+ do {
+ ASSERT_EQ(worker_args.utid, umcg_poll_worker());
+ umcg_tid utid = umcg_run_worker(worker_args.utid);
+ if (utid == UMCG_NONE) {
+ ASSERT_EQ(0, errno);
+ break;
+ }
+
+ ASSERT_EQ(utid, worker_args.utid);
+ } while (true);
+
+ /* The final check and cleanup. */
+ ASSERT_EQ(2, atomic_load_explicit(&checkpoint, memory_order_relaxed));
+ ASSERT_EQ(0, pthread_join(worker, NULL));
+ ASSERT_EQ(0, umcg_unregister_task());
+ ASSERT_EQ(0, umcg_destroy_group(worker_args.group));
+}
+
+static void *swapping_worker(void *arg)
+{
+ umcg_tid next;
+
+ atomic_thread_fence(memory_order_acquire);
+ next = (umcg_tid)arg;
+
+ if (next == UMCG_NONE) {
+ if (0 != umcg_wait(NULL)) {
+ fprintf(stderr, "swapping_worker: umcg_wait failed\n");
+ exit(1);
+ }
+ } else {
+ if (0 != umcg_swap(next, NULL)) {
+ fprintf(stderr, "swapping_worker: umcg_swap failed\n");
+ exit(1);
+ }
+ }
+
+ return NULL;
+}
+
+TEST(umcg_swap_test) {
+ const int n_workers = 10;
+ struct worker_args *worker_args;
+ int swap_chain_wakeups = 0;
+ umcg_tid utid = UMCG_NONE;
+ bool *workers_polled;
+ pthread_t *workers;
+ umcg_t group_id;
+ int idx;
+
+ CHECK_CONFIG();
+
+ group_id = umcg_create_group(0);
+ ASSERT_NE(UMCG_NONE, group_id);
+
+ workers = malloc(n_workers * sizeof(pthread_t));
+ worker_args = malloc(n_workers * sizeof(struct worker_args));
+ workers_polled = malloc(n_workers * sizeof(bool));
+ if (!workers || !worker_args || !workers_polled) {
+ fprintf(stderr, "malloc failed\n");
+ exit(1);
+ }
+
+ memset(worker_args, 0, n_workers * sizeof(struct worker_args));
+
+ /* Start workers. All will block in umcg_register_worker(). */
+ for (idx = 0; idx < n_workers; ++idx) {
+ workers_polled[idx] = false;
+
+ worker_args[idx].group = group_id;
+ worker_args[idx].thread_fn = &swapping_worker;
+ worker_args[idx].tag = idx;
+ atomic_thread_fence(memory_order_release);
+
+ ASSERT_EQ(0, pthread_create(&workers[idx], NULL, &worker_fn,
+ &worker_args[idx]));
+ }
+
+ /* Wait for all workers to update their utids. */
+ for (idx = 0; idx < n_workers; ++idx) {
+ uint64_t counter = 0;
+ while (UMCG_NONE == atomic_load_explicit(&worker_args[idx].utid,
+ memory_order_seq_cst)) {
+ ++counter;
+ if (!(counter % 1000000))
+ fprintf(stderr, "looping for utid: %d %lu\n",
+ idx, counter);
+ }
+ }
+
+ /* Update worker args. */
+ for (idx = 0; idx < (n_workers - 1); ++idx) {
+ worker_args[idx].thread_arg = (void *)worker_args[idx + 1].utid;
+ }
+ atomic_thread_fence(memory_order_release);
+
+ ASSERT_NE(0, umcg_register_server(group_id, 0));
+
+ /* Poll workers. */
+ for (idx = 0; idx < n_workers; ++idx) {
+ utid = umcg_poll_worker();
+
+ ASSERT_NE(UMCG_NONE, utid);
+ workers_polled[umcg_get_task_tag(utid)] = true;
+
+ validate_state(utid, UMCG_TASK_RUNNABLE, "swap poll");
+ }
+
+ /* Check that all workers have been polled. */
+ for (idx = 0; idx < n_workers; ++idx) {
+ ASSERT_TRUE(workers_polled[idx]);
+ }
+
+ /* Run the first worker; the swap chain will lead to the last worker. */
+ utid = worker_args[0].utid;
+ idx = 0;
+ do {
+ uint32_t state;
+
+ utid = umcg_run_worker(utid);
+ if (utid == worker_args[n_workers - 1].utid &&
+ umcg_get_task_state(utid) == UMCG_TASK_RUNNABLE)
+ break;
+
+ /* There can be an occasional mid-swap wakeup due to pagefault. */
+ ++swap_chain_wakeups;
+
+ /* Validate progression. */
+ ASSERT_GE(umcg_get_task_tag(utid), idx);
+ idx = umcg_get_task_tag(utid);
+
+ /* Validate state. */
+ state = umcg_get_task_state(utid);
+ ASSERT_TRUE(state == UMCG_TASK_BLOCKED ||
+ state == UMCG_TASK_UNBLOCKED);
+
+ ASSERT_EQ(utid, umcg_poll_worker());
+ } while (true);
+
+ ASSERT_LT(swap_chain_wakeups, 4);
+ if (swap_chain_wakeups)
+ fprintf(stderr, "WARNING: %d swap chain wakeups\n",
+ swap_chain_wakeups);
+
+ /* Finally run/release all workers. */
+ for (idx = 0; idx < n_workers; ++idx) {
+ utid = worker_args[idx].utid;
+ do {
+ utid = umcg_run_worker(utid);
+ if (utid) {
+ ASSERT_EQ(utid, worker_args[idx].utid);
+ ASSERT_EQ(utid, umcg_poll_worker());
+ }
+ } while (utid != UMCG_NONE);
+ }
+
+ /* Cleanup. */
+ for (idx = 0; idx < n_workers; ++idx)
+ ASSERT_EQ(0, pthread_join(workers[idx], NULL));
+ ASSERT_EQ(0, umcg_unregister_task());
+ ASSERT_EQ(0, umcg_destroy_group(group_id));
+}
+
+TEST_HARNESS_MAIN
--
2.31.1.818.g46aad6cb9e-goog