1120 lines
30 KiB
C++
1120 lines
30 KiB
C++
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
|
|
|
|
#include <doctest.h>
|
|
#include <taskflow/taskflow.hpp>
|
|
#include <taskflow/algorithm/for_each.hpp>
|
|
#include <taskflow/algorithm/transform.hpp>
|
|
#include <taskflow/algorithm/reduce.hpp>
|
|
#include <taskflow/algorithm/scan.hpp>
|
|
#include <taskflow/algorithm/sort.hpp>
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// embarrassing parallelism
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void embarrassing_parallelism(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
std::atomic<int> counter(0);
|
|
|
|
int N = 100000;
|
|
|
|
for (int i = 0; i < N/2; ++i) {
|
|
executor.silent_dependent_async(
|
|
tf::TaskParams{std::to_string(i)}, [&](){
|
|
counter.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
);
|
|
}
|
|
|
|
for (int i = N/2; i < N; ++i) {
|
|
executor.dependent_async(
|
|
tf::DefaultTaskParams{}, [&](){
|
|
counter.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
);
|
|
}
|
|
|
|
executor.wait_for_all();
|
|
|
|
int cnt = counter.load(std::memory_order_relaxed);
|
|
|
|
REQUIRE(cnt == N);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.EmbarrassingParallelism.1thread" * doctest::timeout(300)) {
|
|
embarrassing_parallelism(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.EmbarrassingParallelism.2threads" * doctest::timeout(300)) {
|
|
embarrassing_parallelism(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.EmbarrassingParallelism.4threads" * doctest::timeout(300)) {
|
|
embarrassing_parallelism(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.EmbarrassingParallelism.8threads" * doctest::timeout(300)) {
|
|
embarrassing_parallelism(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.EmbarrassingParallelism.16threads" * doctest::timeout(300)) {
|
|
embarrassing_parallelism(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Linear Chain
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void silent_dependent_async_linear_chain(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
int N = 100000;
|
|
std::vector<tf::CachelineAligned<int>> results(N);
|
|
std::vector<tf::AsyncTask> tasks;
|
|
|
|
for (int i = 0; i < N; ++i) {
|
|
if (i == 0) {
|
|
auto t = executor.silent_dependent_async(
|
|
[&results, i](){
|
|
results[i].data = i+1;
|
|
}
|
|
);
|
|
tasks.push_back(t);
|
|
}
|
|
else {
|
|
auto t = executor.silent_dependent_async(
|
|
[&results, i](){
|
|
results[i].data = results[i-1].data + i;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
tasks.clear();
|
|
tasks.push_back(t);
|
|
}
|
|
}
|
|
|
|
executor.wait_for_all();
|
|
|
|
REQUIRE(results[0].data == 1);
|
|
|
|
for (int i = 1; i < N; ++i) {
|
|
REQUIRE(results[i].data == results[i-1].data + i);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.LinearChain.1thread" * doctest::timeout(300)) {
|
|
silent_dependent_async_linear_chain(1);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.LinearChain.2threads" * doctest::timeout(300)) {
|
|
silent_dependent_async_linear_chain(2);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.LinearChain.4threads" * doctest::timeout(300)) {
|
|
silent_dependent_async_linear_chain(4);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.LinearChain.8threads" * doctest::timeout(300)) {
|
|
silent_dependent_async_linear_chain(8);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.LinearChain.16threads" * doctest::timeout(300)) {
|
|
silent_dependent_async_linear_chain(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Simple Graph
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// task dependence :
|
|
//
|
|
// |--> 1 |--> 4
|
|
// 0 ----> 2 -----> 5
|
|
// |--> 3 |--> 6
|
|
//
|
|
void simple_graph(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
size_t count = 7;
|
|
|
|
std::vector<int> results;
|
|
std::vector<tf::AsyncTask> tasks;
|
|
|
|
for (int id = 0; id < 100; ++id) {
|
|
|
|
results.resize(count);
|
|
|
|
auto t0 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[0] = 100 + id;
|
|
}
|
|
);
|
|
|
|
tasks.push_back(t0);
|
|
|
|
auto t1 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[1] = results[0] * 6 + id;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
|
|
auto t2 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[2] = results[0] - 200 + id;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
|
|
auto t3 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[3] = results[0] / 9 + id;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
|
|
auto t4 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[4] = results[2] + 66 + id;
|
|
}, t2
|
|
);
|
|
|
|
auto t5 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[5] = results[2] - 999 + id;
|
|
}, t2
|
|
);
|
|
|
|
auto t6 = executor.silent_dependent_async(
|
|
[&](){
|
|
results[6] = results[2] * 9 / 13 + id;
|
|
}, t2
|
|
);
|
|
|
|
executor.wait_for_all();
|
|
|
|
for (size_t i = 0; i < count; ++i) {
|
|
switch (i) {
|
|
case 0:
|
|
REQUIRE(results[i] == 100 + id);
|
|
break;
|
|
|
|
case 1:
|
|
REQUIRE(results[i] == results[0] * 6 + id);
|
|
break;
|
|
|
|
case 2:
|
|
REQUIRE(results[i] == results[0] - 200 + id);
|
|
break;
|
|
|
|
case 3:
|
|
REQUIRE(results[i] == results[0] / 9 + id);
|
|
break;
|
|
|
|
case 4:
|
|
REQUIRE(results[i] == results[2] + 66 + id);
|
|
break;
|
|
|
|
case 5:
|
|
REQUIRE(results[i] == results[2] - 999 + id);
|
|
break;
|
|
|
|
case 6:
|
|
REQUIRE(results[i] == results[2] * 9 / 13 + id);
|
|
break;
|
|
}
|
|
}
|
|
|
|
results.clear();
|
|
tasks.clear();
|
|
}
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.SimpleGraph.1thread" * doctest::timeout(300)) {
|
|
simple_graph(1);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.SimpleGraph.2threads" * doctest::timeout(300)) {
|
|
simple_graph(2);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.SimpleGraph.4threads" * doctest::timeout(300)) {
|
|
simple_graph(4);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.SimpleGraph.8threads" * doctest::timeout(300)) {
|
|
simple_graph(8);
|
|
}
|
|
|
|
TEST_CASE("SilentDependentAsync.SimpleGraph.16threads" * doctest::timeout(300)) {
|
|
simple_graph(16);
|
|
}
|
|
|
|
// task dependence :
|
|
// ----------------------------
|
|
// | |--> 3 --| |
|
|
// | | --> 7 --->|
|
|
// 0 ---| |--> 4 --| |
|
|
// v ^ v
|
|
// --> 2 --| ---------------------> 9
|
|
// ^ v ^
|
|
// 1 ---| |--> 5 --| |
|
|
// | | --> 8 --->|
|
|
// | |--> 6 --| |
|
|
// -----------------------------
|
|
void simple_graph_2(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
size_t count = 10;
|
|
std::vector<tf::CachelineAligned<int>> results(count);
|
|
std::vector<tf::AsyncTask> tasks1;
|
|
std::vector<tf::AsyncTask> tasks2;
|
|
std::vector<tf::AsyncTask> tasks3;
|
|
std::vector<tf::AsyncTask> tasks4;
|
|
|
|
for (int id = 0; id < 100; ++id) {
|
|
|
|
results.resize(count);
|
|
|
|
auto t0 = executor.silent_dependent_async(
|
|
"t0", [&](){
|
|
results[0].data = 100 + id;
|
|
}
|
|
);
|
|
|
|
auto t1 = executor.silent_dependent_async(
|
|
"t1", [&](){
|
|
results[1].data = 6 * id;
|
|
}
|
|
);
|
|
|
|
auto t2 = executor.silent_dependent_async(
|
|
"t2", [&](){
|
|
results[2].data = results[0].data + results[1].data + id;
|
|
}, t0, t1
|
|
);
|
|
|
|
tasks1.push_back(t2);
|
|
|
|
auto [t3, fu3] = executor.dependent_async(
|
|
"t3", [&](){
|
|
results[3].data = results[2].data + id;
|
|
return results[3].data;
|
|
}, tasks1.begin(), tasks1.end()
|
|
);
|
|
|
|
auto t4 = executor.silent_dependent_async(
|
|
"t4", [&](){
|
|
results[4].data = results[2].data + id;
|
|
}, tasks1.begin(), tasks1.end()
|
|
);
|
|
|
|
auto [t5, fu5] = executor.dependent_async(
|
|
"t5", [&](){
|
|
results[5].data = results[2].data + id;
|
|
return results[5].data;
|
|
}, tasks1.begin(), tasks1.end()
|
|
);
|
|
|
|
auto t6 = executor.silent_dependent_async(
|
|
"t6", [&](){
|
|
results[6].data = results[2].data + id;
|
|
}, tasks1.begin(), tasks1.end()
|
|
);
|
|
|
|
tasks2.push_back(t3);
|
|
tasks2.push_back(t4);
|
|
tasks3.push_back(t5);
|
|
tasks3.push_back(t6);
|
|
|
|
auto [t7, fu7] = executor.dependent_async(
|
|
"t7", [&](){
|
|
results[7].data = results[3].data + results[4].data + id;
|
|
return results[7].data;
|
|
}, tasks2.begin(), tasks2.end()
|
|
);
|
|
|
|
auto t8 = executor.silent_dependent_async(
|
|
"t8", [&](){
|
|
results[8].data = results[5].data + results[6].data + id;
|
|
}, tasks3.begin(), tasks3.end()
|
|
);
|
|
|
|
tasks4.push_back(t0);
|
|
tasks4.push_back(t1);
|
|
tasks4.push_back(t2);
|
|
tasks4.push_back(t7);
|
|
tasks4.push_back(t8);
|
|
|
|
auto [t9, fu9] = executor.dependent_async(
|
|
"t9", [&](){
|
|
results[9].data = results[0].data + results[1].data +
|
|
results[2].data + results[7].data + results[8].data + id;
|
|
return results[9].data;
|
|
}, tasks4.begin(), tasks4.end()
|
|
);
|
|
|
|
|
|
REQUIRE(fu9.get() == results[9].data);
|
|
|
|
REQUIRE(fu3.wait_for(std::chrono::microseconds(1)) == std::future_status::ready);
|
|
REQUIRE(fu3.get() == results[3].data);
|
|
|
|
REQUIRE(fu5.wait_for(std::chrono::microseconds(1)) == std::future_status::ready);
|
|
REQUIRE(fu5.get() == results[5].data);
|
|
|
|
REQUIRE(fu7.wait_for(std::chrono::microseconds(1)) == std::future_status::ready);
|
|
REQUIRE(fu7.get() == results[7].data);
|
|
|
|
for (size_t i = 0; i < count; ++i) {
|
|
switch (i) {
|
|
case 0:
|
|
REQUIRE(results[i].data == 100 + id);
|
|
break;
|
|
|
|
case 1:
|
|
REQUIRE(results[i].data == 6 * id);
|
|
break;
|
|
|
|
case 2:
|
|
REQUIRE(results[i].data == results[0].data + results[1].data + id);
|
|
break;
|
|
|
|
case 3:
|
|
REQUIRE(results[i].data == results[2].data + id);
|
|
break;
|
|
|
|
case 4:
|
|
REQUIRE(results[i].data == results[2].data+ id);
|
|
break;
|
|
|
|
case 5:
|
|
REQUIRE(results[i].data == results[2].data + id);
|
|
break;
|
|
|
|
case 6:
|
|
REQUIRE(results[i].data == results[2].data + id);
|
|
break;
|
|
|
|
case 7:
|
|
REQUIRE(results[i].data == results[3].data + results[4].data + id);
|
|
break;
|
|
|
|
case 8:
|
|
REQUIRE(results[i].data == results[5].data + results[5].data + id);
|
|
break;
|
|
|
|
case 9:
|
|
REQUIRE(results[i].data == results[0].data + results[1].data +
|
|
results[2].data + results[7].data + results[8].data + id);
|
|
break;
|
|
}
|
|
}
|
|
|
|
results.clear();
|
|
tasks1.clear();
|
|
tasks2.clear();
|
|
tasks3.clear();
|
|
tasks4.clear();
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.SimpleGraph2.1thread" * doctest::timeout(300)) {
|
|
simple_graph_2(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.SimpleGraph2.2threads" * doctest::timeout(300)) {
|
|
simple_graph_2(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.SimpleGraph2.4threads" * doctest::timeout(300)) {
|
|
simple_graph_2(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.SimpleGraph2.8threads" * doctest::timeout(300)) {
|
|
simple_graph_2(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.SimpleGraph2.16threads" * doctest::timeout(300)) {
|
|
simple_graph_2(16);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------
|
|
// Complex Graph
|
|
// -------------------------------------------------------------------------------------
|
|
//
|
|
// task graph
|
|
// ---> 101 ----
|
|
// | . |
|
|
// ---> 1 ----> . ---> 10101 ---
|
|
// | . | . | . |
|
|
// | . ---> 200 ---- . |
|
|
// | . . . |
|
|
// 0 --> . . . ---> 10201
|
|
// | . . . |
|
|
// | . ---> 10001 -- . |
|
|
// | . | . | . |
|
|
// ---> 100 --> . ---> 10200 ---
|
|
// | . |
|
|
// ---> 10100 --
|
|
//
|
|
// level 0 : task 0 has 100 output edges pointing to task 1 to task 100
|
|
// level 1 : task 1 has 100 output edges pointing to task 101 to task 200
|
|
// task 2 has 100 output edges pointing to task 201 to task 300
|
|
// task 100 has 100 output edges pointing to task 10001 to task 10100
|
|
// level 2 : task 101 to task 200 has the same output edge pointing to task 10101
|
|
// task 201 to task 300 has the same output edge pointing to task 10102
|
|
// task 10001 to task 10100 has the same output edge pointing to from task 10200
|
|
// level 3 : task 10101 to task 10200 has the same output edge pointing to task 10201
|
|
|
|
auto make_complex_graph(tf::Executor& executor, int r) {
|
|
|
|
int count = 10202;
|
|
std::vector<tf::CachelineAligned<int>> results(count);
|
|
std::vector<tf::AsyncTask> tasks_level_1;
|
|
std::vector<tf::AsyncTask> tasks_level_2;
|
|
std::vector<tf::AsyncTask> tasks_level_3;
|
|
|
|
// define task 0
|
|
auto task0 = executor.silent_dependent_async(
|
|
"0", [&results, r](){
|
|
results[0].data = 100 + r;
|
|
}
|
|
);
|
|
|
|
// define task 1 to task 100
|
|
// and push them in the vector tasks_level_1
|
|
for (int i = 1; i <= 100; ++i) {
|
|
tasks_level_1.push_back(
|
|
executor.silent_dependent_async(
|
|
[&results, i, r](){
|
|
results[i].data = results[0].data + i + r;
|
|
},
|
|
task0
|
|
)
|
|
);
|
|
}
|
|
|
|
// define task 101 to task 10100
|
|
// and push them in the vector tasks_level_2
|
|
for (int i = 101; i <= 10100; ++i) {
|
|
tasks_level_2.push_back(
|
|
executor.silent_dependent_async(
|
|
[&results, i, r](){
|
|
results[i].data = results[(i-1)/100].data + i + r;
|
|
},
|
|
std::next(tasks_level_1.begin(), (i-1)/100-1),
|
|
std::next(tasks_level_1.begin(), (i-1)/100)
|
|
)
|
|
);
|
|
}
|
|
|
|
// define task 10101 to task 10200
|
|
// and push them in the vector tasks_level_3
|
|
for (int i = 10101; i <= 10200; ++i) {
|
|
tasks_level_3.push_back(
|
|
executor.silent_dependent_async(
|
|
[&results, i, r](){
|
|
int value = 0;
|
|
int beg = i-10101;
|
|
beg = (beg+1)*100+1;
|
|
for (int j = beg; j < beg+100; ++j) {
|
|
value += results[j].data;
|
|
}
|
|
results[i].data = value + i + r;
|
|
},
|
|
std::next(tasks_level_2.begin(),(i-10101)*100),
|
|
std::next(tasks_level_2.begin(), (i-10101)*100+100)
|
|
)
|
|
);
|
|
}
|
|
|
|
// define task 10201
|
|
executor.dependent_async(
|
|
"10201", [&results, r](){
|
|
int value = 0;
|
|
for (int i = 10101; i <= 10200; ++i) {
|
|
value += results[i].data;
|
|
}
|
|
results[10201].data = value + 10201 + r;
|
|
return results[10201].data;
|
|
},
|
|
tasks_level_3.begin(), tasks_level_3.end()
|
|
).second.get();
|
|
|
|
// verify the result
|
|
for (int i = 0; i < 10202; ++i) {
|
|
if (i == 0) {
|
|
REQUIRE(results[i].data == 100 + r);
|
|
}
|
|
|
|
else if (i >= 1 && i <= 100) {
|
|
REQUIRE(results[i].data == results[0].data + i + r);
|
|
}
|
|
|
|
else if (i >= 101 && i <= 10100) {
|
|
REQUIRE(results[i].data == results[(i-1)/100].data + i + r);
|
|
}
|
|
|
|
else if (i >= 10101 && i <= 10200) {
|
|
int value = 0;
|
|
int beg = i-10101;
|
|
beg = (beg+1)*100+1;
|
|
for (int j = beg; j < beg+100; ++j) {
|
|
value += results[j].data;
|
|
}
|
|
REQUIRE(results[i].data == value + i + r);
|
|
}
|
|
|
|
else if (i == 10201) {
|
|
int value = 0;
|
|
for (int j = 10101; j <= 10200; ++j) {
|
|
value += results[j].data;
|
|
}
|
|
REQUIRE(results[i].data == value + r + 10201);
|
|
}
|
|
}
|
|
}
|
|
|
|
void complex_graph(unsigned W) {
|
|
tf::Executor executor(W);
|
|
for (int r = 0; r < 10; ++r) {
|
|
make_complex_graph(executor, r);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraph.1thread" * doctest::timeout(300)) {
|
|
complex_graph(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraph.2threads" * doctest::timeout(300)) {
|
|
complex_graph(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraph.4threads" * doctest::timeout(300)) {
|
|
complex_graph(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraph.8threads" * doctest::timeout(300)) {
|
|
complex_graph(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraph.16threads" * doctest::timeout(300)) {
|
|
complex_graph(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Complex Worker From Worker
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// since make_complex_graph blocks so W must be at least one larger than R
|
|
void complex_graph_from_worker(unsigned W, int R) {
|
|
tf::Executor executor(W);
|
|
tf::Taskflow taskflow;
|
|
for(int r=0; r<R; r++) {
|
|
taskflow.emplace([&executor, r](){
|
|
make_complex_graph(executor, r);
|
|
});
|
|
}
|
|
executor.run(taskflow).wait();
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraphFromWorker.2threads" * doctest::timeout(300)) {
|
|
complex_graph_from_worker(2, 1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraphFromWorker.4threads" * doctest::timeout(300)) {
|
|
complex_graph_from_worker(4, 3);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraphFromWorker.8threads" * doctest::timeout(300)) {
|
|
complex_graph_from_worker(8, 7);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ComplexGraphFromWorker.16threads" * doctest::timeout(300)) {
|
|
complex_graph_from_worker(16, 10);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Binary Tree
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void binary_tree(unsigned W) {
|
|
|
|
size_t L = 16;
|
|
|
|
tf::Executor executor(W);
|
|
|
|
std::vector<int> data(1<<L, 0);
|
|
|
|
std::vector<tf::AsyncTask> tasks_p, tasks_c;
|
|
std::array<tf::AsyncTask, 1> dep;
|
|
size_t task_id = 1;
|
|
|
|
// iterate all other tasks level by level
|
|
for(size_t i=0; i<L; i++) {
|
|
for(size_t n=0; n < static_cast<size_t>(1<<i); n++) {
|
|
if(task_id == 1) {
|
|
tasks_c.push_back(
|
|
executor.silent_dependent_async(
|
|
[task_id, &data](){
|
|
data[task_id] = 1;
|
|
}
|
|
)
|
|
);
|
|
}
|
|
else {
|
|
dep[0] = tasks_p[n/2];
|
|
tasks_c.push_back(
|
|
executor.dependent_async(
|
|
[task_id, &data](){
|
|
data[task_id] = data[task_id/2] + 1;
|
|
},
|
|
dep.begin(), dep.end()
|
|
).first
|
|
);
|
|
}
|
|
task_id++;
|
|
}
|
|
tasks_p = std::move(tasks_c);
|
|
}
|
|
|
|
executor.wait_for_all();
|
|
|
|
task_id = 1;
|
|
for(size_t i=0; i<L; i++) {
|
|
for(size_t n=0; n<static_cast<size_t>(1<<i); n++) {
|
|
REQUIRE(data[task_id] == i + 1);
|
|
//printf("data[%zu]=%d\n", task_id, data[task_id]);
|
|
task_id++;
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.1thread" * doctest::timeout(300)) {
|
|
binary_tree(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.2threads" * doctest::timeout(300)) {
|
|
binary_tree(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.3threads" * doctest::timeout(300)) {
|
|
binary_tree(3);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.4threads" * doctest::timeout(300)) {
|
|
binary_tree(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.8threads" * doctest::timeout(300)) {
|
|
binary_tree(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.BinaryTree.16threads" * doctest::timeout(300)) {
|
|
binary_tree(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Linear Chain with Complete Dependencies (N choose 2)
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void complete_linear_chain(unsigned W) {
|
|
|
|
tf::Executor executor0(W);
|
|
tf::Executor executor1(W);
|
|
|
|
int N = 2000;
|
|
std::vector<tf::CachelineAligned<int>> results(2*N);
|
|
std::vector<tf::AsyncTask> tasks;
|
|
|
|
// executor 0
|
|
for (int i = 0; i < N; ++i) {
|
|
if (i == 0) {
|
|
auto t = executor0.silent_dependent_async(
|
|
[&results, i](){
|
|
results[i].data = i+1;
|
|
}
|
|
);
|
|
tasks.push_back(t);
|
|
}
|
|
else {
|
|
auto t = executor0.silent_dependent_async(
|
|
[&results, i](){
|
|
results[i].data = results[i-1].data + i;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
tasks.push_back(t);
|
|
}
|
|
}
|
|
|
|
executor0.wait_for_all();
|
|
|
|
REQUIRE(results[0].data == 1);
|
|
for (int i = 1; i < N; ++i) {
|
|
REQUIRE(results[i].data == results[i-1].data + i);
|
|
}
|
|
|
|
tasks.clear();
|
|
|
|
// executor 1
|
|
for (int i = 0; i < N; ++i) {
|
|
if (i == 0) {
|
|
auto t = executor1.silent_dependent_async(
|
|
[&results, i, N](){
|
|
results[i+N].data = results[i-1+N].data + i;
|
|
}
|
|
);
|
|
tasks.push_back(t);
|
|
}
|
|
else {
|
|
auto t = executor1.silent_dependent_async(
|
|
[&results, i, N](){
|
|
results[i+N].data = results[i-1+N].data + i;
|
|
}, tasks.begin(), tasks.end()
|
|
);
|
|
tasks.push_back(t);
|
|
}
|
|
}
|
|
|
|
executor1.wait_for_all();
|
|
|
|
REQUIRE(results[0+N].data == results[0+N-1].data);
|
|
|
|
for (int i = 1; i < N; ++i) {
|
|
REQUIRE(results[i+N].data == results[i-1+N].data + i);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.CompleteLinearChain.1thread" * doctest::timeout(300)) {
|
|
complete_linear_chain(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.CompleteLinearChain.2threads" * doctest::timeout(300)) {
|
|
complete_linear_chain(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.CompleteLinearChain.4threads" * doctest::timeout(300)) {
|
|
complete_linear_chain(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.CompleteLinearChain.8threads" * doctest::timeout(300)) {
|
|
complete_linear_chain(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.CompleteLinearChain.16threads" * doctest::timeout(300)) {
|
|
complete_linear_chain(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Parallel Graph Construction
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// multiple workers to construct a pascal diagram simultaneously
|
|
// 0 1 2 3
|
|
// |/|/| /
|
|
// 4 5 6
|
|
// |/| /
|
|
// 7 8
|
|
// |/
|
|
// 9
|
|
|
|
void parallel_graph_construction(unsigned W) {
|
|
|
|
tf::Taskflow taskflow;
|
|
tf::Executor executor(W);
|
|
|
|
int L = 500;
|
|
int id = 0;
|
|
std::vector<tf::Task> tasks((1+L)*L/2);
|
|
std::vector<int> data((1+L)*L/2, -1);
|
|
|
|
std::vector<tf::AsyncTask> async_tasks((1+L)*L/2);
|
|
|
|
for(int l=L; l>=1; l--) {
|
|
for(int i=0; i<l; i++) {
|
|
|
|
int pr = id - l;
|
|
int pl = id - l - 1;
|
|
|
|
tasks[id] = taskflow.emplace([&, pr, pl, id](){
|
|
|
|
if(pl >= 0 && pr >= 0) {
|
|
REQUIRE(async_tasks[pl].empty() == false);
|
|
REQUIRE(async_tasks[pr].empty() == false);
|
|
async_tasks[id] = executor.silent_dependent_async([&, pr, pl, id](){
|
|
REQUIRE(data[pr] == pr);
|
|
REQUIRE(data[pl] == pl);
|
|
data[id] = id;
|
|
}, async_tasks[pl], async_tasks[pr]);
|
|
}
|
|
else {
|
|
async_tasks[id] = executor.silent_dependent_async([&, id](){
|
|
data[id] = id;
|
|
});
|
|
}
|
|
|
|
}).name(std::to_string(id));
|
|
|
|
if(pr >= 0) {
|
|
tasks[id].succeed(tasks[pr]);
|
|
}
|
|
|
|
if(pl >= 0) {
|
|
tasks[id].succeed(tasks[pl]);
|
|
}
|
|
++id;
|
|
}
|
|
}
|
|
|
|
executor.run(taskflow);
|
|
executor.wait_for_all();
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ParallelGraphConstruction.1thread" * doctest::timeout(300)) {
|
|
parallel_graph_construction(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ParallelGraphConstruction.2threads" * doctest::timeout(300)) {
|
|
parallel_graph_construction(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ParallelGraphConstruction.4threads" * doctest::timeout(300)) {
|
|
parallel_graph_construction(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ParallelGraphConstruction.8threads" * doctest::timeout(300)) {
|
|
parallel_graph_construction(8);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.ParallelGraphConstruction.16threads" * doctest::timeout(300)) {
|
|
parallel_graph_construction(16);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Iterative Fibonacci
|
|
// ----------------------------------------------------------------------------
|
|
std::vector<unsigned long long int> fibonacci{0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657,46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465,14930352,24157817,39088169,63245986,102334155,165580141,267914296,433494437,701408733,1134903170,1836311903,2971215073,4807526976,7778742049,12586269025,20365011074,32951280099,53316291173,86267571272,139583862445,225851433717,365435296162,591286729879,956722026041,1548008755920,2504730781961,4052739537881,6557470319842,10610209857723,17167680177565,27777890035288,44945570212853,72723460248141,117669030460994,190392490709135,308061521170129,498454011879264,806515533049393,1304969544928657,2111485077978050,3416454622906707,5527939700884757,8944394323791464,14472334024676221,23416728348467685,37889062373143906,61305790721611591,99194853094755497,160500643816367088,259695496911122585,420196140727489673,679891637638612258,1100087778366101931,1779979416004714189,2880067194370816120,4660046610375530309,7540113804746346429};
|
|
void iterative_fibonacci(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
std::vector<tf::AsyncTask> tasks;
|
|
|
|
unsigned long long int val_n_1 = 0, val_n_2 = 0;
|
|
|
|
for (int i = 0; i <= 92; ++i) {
|
|
if (i < 2) {
|
|
auto [t, fut] = executor.dependent_async([i](){ return i; });
|
|
tasks.push_back(t);
|
|
val_n_2 = val_n_1;
|
|
val_n_1 = fut.get();
|
|
}
|
|
else {
|
|
auto [t, fut] = executor.dependent_async([val_n_1, val_n_2](){
|
|
return val_n_2 + val_n_1;
|
|
}, tasks[i-1], tasks[i-2]);
|
|
|
|
tasks.push_back(t);
|
|
val_n_2 = val_n_1;
|
|
val_n_1 = fut.get();
|
|
}
|
|
REQUIRE(val_n_1 == fibonacci[i]);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.IterativeFibonacci.1thread" * doctest::timeout(300)) {
|
|
iterative_fibonacci(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.IterativeFibonacci.2threads" * doctest::timeout(300)) {
|
|
iterative_fibonacci(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.IterativeFibonacci.4threads" * doctest::timeout(300)) {
|
|
iterative_fibonacci(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.IterativeFibonacci.8threads" * doctest::timeout(300)) {
|
|
iterative_fibonacci(8);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Recursive Fibonacci
|
|
// ----------------------------------------------------------------------------
|
|
void recursive_fibonacci(unsigned W) {
|
|
|
|
tf::Executor executor(W);
|
|
|
|
std::function<int(int)> fib;
|
|
|
|
fib = [&](int N){
|
|
|
|
if (N < 2) {
|
|
return N;
|
|
}
|
|
|
|
std::future<int> fu1, fu2;
|
|
tf::AsyncTask t1, t2;
|
|
|
|
std::tie(t1, fu1) = executor.dependent_async(std::bind(fib, N-1));
|
|
std::tie(t2, fu2) = executor.dependent_async(std::bind(fib, N-2));
|
|
|
|
executor.corun_until([&](){ return t1.is_done() && t2.is_done(); });
|
|
|
|
return fu1.get() + fu2.get();
|
|
};
|
|
|
|
for (size_t i = 0; i <= 11; ++i) {
|
|
auto [tn, fun] = executor.dependent_async(std::bind(fib, i));
|
|
REQUIRE(fun.get() == fibonacci[i]);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.RecursiveFibonacci.1thread" * doctest::timeout(300)) {
|
|
recursive_fibonacci(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.RecursiveFibonacci.2threads" * doctest::timeout(300)) {
|
|
recursive_fibonacci(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.RecursiveFibonacci.4threads" * doctest::timeout(300)) {
|
|
recursive_fibonacci(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.RecursiveFibonacci.8threads" * doctest::timeout(300)) {
|
|
recursive_fibonacci(8);
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Mixed algorithms
|
|
// ----------------------------------------------------------------------------
|
|
|
|
void mixed_algorithms(unsigned W) {
|
|
|
|
size_t N = 65536;
|
|
|
|
tf::Executor executor(W);
|
|
|
|
int sum1{1}, sum2{1};
|
|
std::vector<int> data(N), data1(N), data2(N), data3(N), data4(N);
|
|
|
|
// initialize data to 10
|
|
tf::AsyncTask A = executor.silent_dependent_async(tf::make_for_each_task(
|
|
data.begin(), data.begin() + N/2, [](int& d){ d = 10; }
|
|
));
|
|
|
|
tf::AsyncTask B = executor.silent_dependent_async(tf::make_for_each_index_task(
|
|
N/2, N, size_t{1}, [&] (size_t i) { data[i] = 10; }
|
|
));
|
|
|
|
// data1[i] = [11, 11, 11, ...]
|
|
tf::AsyncTask T1 = executor.silent_dependent_async(tf::make_transform_task(
|
|
data.begin(), data.end(), data1.begin(), [](int& d) { return d+1; }
|
|
), A, B);
|
|
|
|
// data2[i] = [12, 12, 12, ...]
|
|
tf::AsyncTask T2 = executor.silent_dependent_async(tf::make_transform_task(
|
|
data.begin(), data.end(), data2.begin(), [](int& d) { return d+2; }
|
|
), A, B);
|
|
|
|
// data3[i] = [13, 13, 13, ...]
|
|
tf::AsyncTask T3 = executor.silent_dependent_async(tf::make_transform_task(
|
|
data.begin(), data.end(), data3.begin(), [](int& d) { return d+3; }
|
|
), A, B);
|
|
|
|
// data4[i] = [1, 1, 1, ...]
|
|
tf::AsyncTask T4 = executor.silent_dependent_async(tf::make_transform_task(
|
|
data1.begin(), data1.end(), data2.begin(), data4.begin(),
|
|
[](int a, int b){ return b - a; }
|
|
), T1, T2);
|
|
|
|
// sum1 = 1 + [-1-1-1-1...]
|
|
tf::AsyncTask T5 = executor.silent_dependent_async(tf::make_transform_reduce_task(
|
|
data4.begin(), data4.end(), sum1, std::plus<int>{}, [](int d){ return -d; }
|
|
), T4);
|
|
|
|
tf::AsyncTask T6 = executor.silent_dependent_async(tf::make_transform_reduce_task(
|
|
data4.begin(), data4.end(), data3.begin(), sum2, std::plus<int>{}, std::plus<int>{}
|
|
), T3, T4);
|
|
|
|
// inclusive scan over data1 [11, 22, 33, 44, ...]
|
|
tf::AsyncTask T7 = executor.silent_dependent_async(tf::make_inclusive_scan_task(
|
|
data1.begin(), data1.end(), data1.begin(), std::plus<int>{}
|
|
), T5, T6);
|
|
|
|
// exclusive scan over data2 [-1, 11, 23, 35, ...]
|
|
tf::AsyncTask T8 = executor.silent_dependent_async(tf::make_exclusive_scan_task(
|
|
data2.begin(), data2.end(), data2.begin(), -1, std::plus<int>{}
|
|
), T5, T6);
|
|
|
|
// transform inclusive scan over data3 [-13, -26, -39, ...]
|
|
tf::AsyncTask T9 = executor.silent_dependent_async(tf::make_transform_inclusive_scan_task(
|
|
data3.begin(), data3.end(), data3.begin(), std::plus<int>{},
|
|
[](int i){ return -i; }
|
|
), T5, T6);
|
|
|
|
// transform exclusive scan over data4 [7, 6, 5, 4, ...]
|
|
tf::AsyncTask T10 = executor.silent_dependent_async(tf::make_transform_exclusive_scan_task(
|
|
data4.begin(), data4.end(), data4.begin(), 7, std::plus<int>{},
|
|
[](int i){ return -i; }
|
|
), T5, T6);
|
|
|
|
// sort data4
|
|
tf::AsyncTask T11 = executor.silent_dependent_async(tf::make_sort_task(
|
|
data4.begin(), data4.end()
|
|
), T10);
|
|
|
|
executor.wait_for_all();
|
|
|
|
REQUIRE(sum1 == 1-N);
|
|
REQUIRE(sum2 == 1+N*14);
|
|
|
|
for(size_t i=0; i<N; i++) {
|
|
REQUIRE(data [i] == 10);
|
|
REQUIRE(data1[i] == (i+1)*11);
|
|
REQUIRE(data2[i] == i*12 - 1);
|
|
REQUIRE(data3[i] == (i+1)*-13);
|
|
REQUIRE(data4[N-i-1] == 7-i);
|
|
//printf(
|
|
// "data 0|1|2|3|4 [%2zu]=%5d|%5d|%5d|%5d|%5d\n",
|
|
// i, data[i], data1[i], data2[i], data3[i], data4[i]
|
|
//);
|
|
}
|
|
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.1thread" * doctest::timeout(300)) {
|
|
mixed_algorithms(1);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.2threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(2);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.3threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(3);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.4threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(4);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.5threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(5);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.6threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(6);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.7threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(7);
|
|
}
|
|
|
|
TEST_CASE("DependentAsync.MixedAlgorithms.8threads" * doctest::timeout(300)) {
|
|
mixed_algorithms(8);
|
|
}
|
|
|