mesytec-mnode/external/taskflow-3.8.0/unittests/test_asyncs.cpp
2025-01-04 01:25:05 +01:00

485 lines
11 KiB
C++

#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest.h>
#include <taskflow/taskflow.hpp>
// --------------------------------------------------------
// Testcase: Async
// --------------------------------------------------------
void async(unsigned W) {
tf::Executor executor(W);
std::vector<std::future<int>> fus;
std::atomic<int> counter(0);
int N = 100000;
for(int i=0; i<N; ++i) {
if(auto r = i%3; r==0) {
fus.emplace_back(executor.async(std::to_string(i), [&](){
counter.fetch_add(1, std::memory_order_relaxed);
return -2;
}));
}else if(r == 1) {
fus.emplace_back(executor.async(tf::DefaultTaskParams{}, [&](){
counter.fetch_add(1, std::memory_order_relaxed);
return -2;
}));
}
else {
fus.emplace_back(executor.async(tf::TaskParams{std::to_string(i)}, [&](){
counter.fetch_add(1, std::memory_order_relaxed);
return -2;
}));
}
}
executor.wait_for_all();
REQUIRE(counter == N);
int c = 0;
for(auto& fu : fus) {
c += fu.get();
}
REQUIRE(-c == 2*N);
}
TEST_CASE("Async.1thread" * doctest::timeout(300)) {
async(1);
}
TEST_CASE("Async.2threads" * doctest::timeout(300)) {
async(2);
}
TEST_CASE("Async.4threads" * doctest::timeout(300)) {
async(4);
}
TEST_CASE("Async.8threads" * doctest::timeout(300)) {
async(8);
}
TEST_CASE("Async.16threads" * doctest::timeout(300)) {
async(16);
}
// --------------------------------------------------------
// Testcase: NestedAsync
// --------------------------------------------------------
void nested_async(unsigned W) {
tf::Executor executor(W);
std::vector<std::future<int>> fus;
std::atomic<int> counter(0);
int N = 100000;
for(int i=0; i<N; ++i) {
fus.emplace_back(executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
});
});
return -2;
}));
}
executor.wait_for_all();
REQUIRE(counter == 4*N);
int c = 0;
for(auto& fu : fus) {
c += fu.get();
}
REQUIRE(-c == 2*N);
}
TEST_CASE("NestedAsync.1thread" * doctest::timeout(300)) {
nested_async(1);
}
TEST_CASE("NestedAsync.2threads" * doctest::timeout(300)) {
nested_async(2);
}
TEST_CASE("NestedAsync.4threads" * doctest::timeout(300)) {
nested_async(4);
}
TEST_CASE("NestedAsync.8threads" * doctest::timeout(300)) {
nested_async(8);
}
TEST_CASE("NestedAsync.16threads" * doctest::timeout(300)) {
nested_async(16);
}
// --------------------------------------------------------
// Testcase MixedExecutorAsync
// --------------------------------------------------------
void mixed_executor_async(unsigned N) {
const size_t T = 1000;
std::vector<tf::Executor> executors(N);
std::atomic<size_t> counter(0);
auto check_wid = [&](unsigned e){
for(size_t i=0; i<N; i++) {
if(i == e) {
REQUIRE(executors[i].this_worker_id() != -1);
}
else {
REQUIRE(executors[i].this_worker_id() == -1);
}
}
};
for(size_t j=0; j<T; j++) {
for(size_t i=0; i<N; i++) {
executors[i].async([&, i, j](){
check_wid(i);
counter.fetch_add(1, std::memory_order_relaxed);
auto n = j % N;
executors[n].async([&, n](tf::Runtime&){
check_wid(n);
counter.fetch_add(1, std::memory_order_relaxed);
});
});
executors[i].silent_async([&, i, j](){
check_wid(i);
counter.fetch_add(1, std::memory_order_relaxed);
auto n = (j + 1) % N;
executors[n].silent_async([&, n](tf::Runtime){
check_wid(n);
counter.fetch_add(1, std::memory_order_relaxed);
});
});
}
}
while(counter.load() != 4000*N);
for(auto& executor : executors) {
executor.wait_for_all();
}
}
TEST_CASE("MixedAsync.1Executor" * doctest::timeout(300)) {
mixed_executor_async(1);
}
TEST_CASE("MixedAsync.2Executors" * doctest::timeout(300)) {
mixed_executor_async(2);
}
TEST_CASE("MixedAsync.4Executors" * doctest::timeout(300)) {
mixed_executor_async(4);
}
TEST_CASE("MixedAsync.5Executors" * doctest::timeout(300)) {
mixed_executor_async(5);
}
TEST_CASE("MixedAsync.6Executors" * doctest::timeout(300)) {
mixed_executor_async(6);
}
TEST_CASE("MixedAsync.7Executors" * doctest::timeout(300)) {
mixed_executor_async(7);
}
TEST_CASE("MixedAsync.8Executors" * doctest::timeout(300)) {
mixed_executor_async(8);
}
// --------------------------------------------------------
// Testcase: MixedAsync
// --------------------------------------------------------
void mixed_async(unsigned W) {
tf::Taskflow taskflow;
tf::Executor executor(W);
std::atomic<int> counter(0);
int N = 10000;
for(int i=0; i<N; i=i+1) {
tf::Task A, B, C, D;
std::tie(A, B, C, D) = taskflow.emplace(
[&] () {
executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
},
[&] () {
executor.async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
},
[&] () {
executor.silent_async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
},
[&] () {
executor.silent_async([&](){
counter.fetch_add(1, std::memory_order_relaxed);
});
}
);
A.precede(B, C);
D.succeed(B, C);
}
executor.run(taskflow);
executor.wait_for_all();
REQUIRE(counter == 4*N);
}
TEST_CASE("MixedAsync.1thread" * doctest::timeout(300)) {
mixed_async(1);
}
TEST_CASE("MixedAsync.2threads" * doctest::timeout(300)) {
mixed_async(2);
}
TEST_CASE("MixedAsync.4threads" * doctest::timeout(300)) {
mixed_async(4);
}
TEST_CASE("MixedAsync.8threads" * doctest::timeout(300)) {
mixed_async(8);
}
TEST_CASE("MixedAsync.16threads" * doctest::timeout(300)) {
mixed_async(16);
}
// --------------------------------------------------------
// Testcase: SubflowAsync
// --------------------------------------------------------
void subflow_async(size_t W) {
tf::Taskflow taskflow;
tf::Executor executor(W);
std::atomic<int> counter{0};
auto A = taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
auto B = taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
auto S1 = taskflow.emplace([&] (tf::Subflow& sf){
for(int i=0; i<1000; i++) {
sf.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
});
auto S2 = taskflow.emplace([&] (tf::Subflow& sf){
sf.emplace([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
for(int i=0; i<1000; i++) {
sf.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
});
taskflow.emplace([&] (tf::Subflow& sf){
sf.emplace([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
for(int i=0; i<1000; i++) {
sf.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
sf.join();
});
taskflow.emplace([&] (tf::Subflow& sf){
for(int i=0; i<1000; i++) {
sf.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
sf.join();
});
A.precede(S1, S2);
B.succeed(S1, S2);
executor.run(taskflow).wait();
REQUIRE(counter == 4005);
}
TEST_CASE("SubflowAsync.1thread") {
subflow_async(1);
}
TEST_CASE("SubflowAsync.3threads") {
subflow_async(3);
}
TEST_CASE("SubflowAsync.11threads") {
subflow_async(11);
}
// --------------------------------------------------------
// Testcase: NestedSubflowAsync
// --------------------------------------------------------
void nested_subflow_async(size_t W) {
tf::Taskflow taskflow;
tf::Executor executor(W);
std::atomic<int> counter{0};
taskflow.emplace([&](tf::Subflow& sf1){
for(int i=0; i<100; i++) {
sf1.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
sf1.emplace([&](tf::Subflow& sf2){
for(int i=0; i<100; i++) {
sf2.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
sf1.async(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
}
sf2.emplace([&](tf::Subflow& sf3){
for(int i=0; i<100; i++) {
sf3.silent_async(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
sf2.silent_async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
sf1.silent_async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
});
});
sf1.join();
REQUIRE(counter == 600);
});
executor.run(taskflow).wait();
REQUIRE(counter == 600);
}
TEST_CASE("NestedSubflowAsync.1thread") {
nested_subflow_async(1);
}
TEST_CASE("NestedSubflowAsync.3threads") {
nested_subflow_async(3);
}
TEST_CASE("NestedSubflowAsync.11threads") {
nested_subflow_async(11);
}
// --------------------------------------------------------
// Testcase: RuntimeAsync
// --------------------------------------------------------
void runtime_async(size_t W) {
tf::Taskflow taskflow;
tf::Executor executor(W);
std::atomic<int> counter{0};
auto A = taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
auto B = taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
taskflow.emplace(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
auto S1 = taskflow.emplace([&] (tf::Runtime& sf){
for(int i=0; i<1000; i++) {
sf.silent_async(
[&](){counter.fetch_add(1, std::memory_order_relaxed);}
);
}
sf.corun_all();
});
auto S2 = taskflow.emplace([&] (tf::Runtime& sf){
sf.silent_async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
for(int i=0; i<1000; i++) {
sf.silent_async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
sf.corun_all();
});
taskflow.emplace([&] (tf::Runtime& sf){
sf.silent_async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
for(int i=0; i<1000; i++) {
sf.async(
[&](){ counter.fetch_add(1, std::memory_order_relaxed); }
);
}
sf.corun_all();
});
taskflow.emplace([&] (tf::Runtime& sf){
for(int i=0; i<1000; i++) {
sf.async([&](){ counter.fetch_add(1, std::memory_order_relaxed); });
}
sf.corun_all();
});
A.precede(S1, S2);
B.succeed(S1, S2);
executor.run(taskflow).wait();
REQUIRE(counter == 4005);
}
TEST_CASE("RuntimeAsync.1thread") {
runtime_async(1);
}
TEST_CASE("RuntimeAsync.3threads") {
runtime_async(3);
}
TEST_CASE("RuntimeAsync.11threads") {
runtime_async(11);
}