mesytec-mnode/external/taskflow-3.8.0/examples/text_pipeline.cpp
2025-01-04 01:25:05 +01:00

125 lines
3.1 KiB
C++

// This program demonstrates how to create a pipeline scheduling framework
// that finds, for each input string, the most frequent character and its count.
//
// The pipeline has the following structure:
//
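// o -> o -> o
// |         |
// v         v
// o -> o -> o
// |         |
// v         v
// o -> o -> o
// |         |
// v         v
// o -> o -> o    (string -> unordered_map<char, size_t> -> pair<char, size_t>)
//
// Vertical arrows mark the serial first and third pipes, which handle tokens
// strictly in order; the middle pipe is parallel.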
//
// Input:
// abade
// ddddf
// eefge
// xyzzd
// ijjjj
// jiiii
// kkijk
//
// Output (the stage-3 result printed for each input string):
// a:2
// d:4
// e:3
// z:2
// j:4
// i:4
// k:3
#include <algorithm>
#include <array>
#include <cstdio>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
// Function: format_map
// Renders a character-frequency map as a sequence of "<char>:<count> " items
// (every item, including the last, is followed by one space), in the map's
// own iteration order. An empty map yields an empty string.
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::string text;
  for(const auto& entry : map) {
    text += entry.first;
    text += ':';
    text += std::to_string(entry.second);
    text += ' ';
  }
  return text;
}
// Builds a three-pipe text-processing pipeline (serial -> parallel -> serial),
// wraps it in a taskflow between a "ready" and a "stopped" task, dumps the
// graph, and runs it to completion.
int main() {

  tf::Taskflow taskflow("text-processing pipeline");
  tf::Executor executor;

  // number of pipeline lines, i.e. at most this many tokens are in flight;
  // also the size of the per-line buffer below
  constexpr size_t num_lines = 2;

  // input data
  std::vector<std::string> input = {
    "abade",
    "ddddf",
    "eefge",
    "xyzzd",
    "ijjjj",
    "jiiii",
    "kkijk"
  };

  // custom data storage: one slot per line; each stage overwrites the slot
  // with its output (string -> unordered_map<char, size_t>)
  using data_type = std::variant<
    std::string, std::unordered_map<char, size_t>, std::pair<char, size_t>
  >;
  std::array<data_type, num_lines> buffer;

  // the pipeline consists of three pipes (serial-parallel-serial)
  // and up to two concurrent scheduling tokens
  tf::Pipeline pl(num_lines,

    // first pipe processes the input data: one input string per token,
    // stopping once every string has been issued
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == input.size()) {
        pf.stop();
      }
      else {
        buffer[pf.line()] = input[pf.token()];
        printf("stage 1: input token = %s\n", input[pf.token()].c_str());
      }
    }},

    // second pipe counts the frequency of each character
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
      std::unordered_map<char, size_t> map;
      for(auto c : std::get<std::string>(buffer[pf.line()])) {
        map[c]++;
      }
      printf("stage 2: map = %s\n", format_map(map).c_str());
      // move (not copy) the counts into this line's slot; `map` is not used
      // again after this point
      buffer[pf.line()] = std::move(map);
    }},

    // third pipe reduces the most frequent character
    tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
      const auto& map = std::get<std::unordered_map<char, size_t>>(buffer[pf.line()]);
      auto sol = std::max_element(map.begin(), map.end(),
        [](const auto& a, const auto& b) { return a.second < b.second; }
      );
      // an empty input string produces an empty map, in which case
      // max_element returns end(); dereferencing it would be undefined
      if(sol == map.end()) {
        printf("stage 3: (empty input)\n");
        return;
      }
      printf("stage 3: %c:%zu\n", sol->first, sol->second);
    }}
  );

  // build the pipeline graph using composition
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("starting pipeline");
  tf::Task task = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                          .name("pipeline stopped");

  // create task dependency
  init.precede(task);
  task.precede(stop);

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}