namespace tf {
/** @page DataParallelPipeline Data-parallel Pipeline
@tableofcontents
%Taskflow provides another variant, tf::DataPipeline, on top of
tf::Pipeline (see @ref TaskParallelPipeline) to help you
implement data-parallel pipeline algorithms while
leaving data management to %Taskflow.
We recommend finishing reading @ref TaskParallelPipeline
before learning tf::DataPipeline.
@section ParallelDataPipelineIncludeHeaderFile Include the Header
You need to include the header file, <tt>%taskflow/algorithm/data_pipeline.hpp</tt>,
for implementing data-parallel pipeline algorithms.
@code{.cpp}
#include <taskflow/algorithm/data_pipeline.hpp>
@endcode
@section CreateADataPipelineModuleTask Create a Data Pipeline Module Task
Similar to creating a task-parallel pipeline (tf::Pipeline),
there are three steps to create a data-parallel pipeline application:
1. Define the pipeline structure (e.g., pipe type, pipe callable, stopping rule, line count)
2. Define the data storage and layout, if needed for the application
3. Define the pipeline taskflow graph using composition
The following example creates a data-parallel pipeline that generates a total of
five dataflow tokens,
going from `void` to `int` at the first stage,
from `int` to `%std::string` at the second stage,
and from `%std::string` to `void` at the final stage.
Data storage between stages is automatically managed by tf::DataPipeline.
@code{.cpp}
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/data_pipeline.hpp>

int main() {

  // data flow => void -> int -> std::string -> void
  tf::Taskflow taskflow("pipeline");
  tf::Executor executor;

  const size_t num_lines = 4;

  // create a pipeline graph
  tf::DataPipeline pl(num_lines,
    tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) -> int {
      if(pf.token() == 5) {
        pf.stop();
        return 0;
      }
      else {
        printf("first pipe returns %zu\n", pf.token());
        return pf.token();
      }
    }),
    tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
      printf("second pipe returns a string of %d\n", input + 100);
      return std::to_string(input + 100);
    }),
    tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
      printf("third pipe receives the input string %s\n", input.c_str());
    })
  );

  // build the pipeline graph using composition
  taskflow.composed_of(pl).name("pipeline");

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
@endcode
The interface of tf::DataPipeline is very similar to tf::Pipeline, except that
the library transparently manages the dataflow between pipes.
To create a stage in a data-parallel pipeline,
you should always use the helper function tf::make_data_pipe:
@code{.cpp}
tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL,
  [](int& input) {
    return std::to_string(input + 100);
  }
);
@endcode
The helper function takes a pair of input and output types
in its template arguments.
Both types are always decayed to their base form using std::decay
(e.g., `const int&` becomes `int`) for storage purposes.
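For example, the following assertions hold and show how such references and
qualifiers are stripped:
@code{.cpp}
#include <string>
#include <type_traits>

// std::decay strips references and cv-qualifiers, so both template
// arguments below normalize to plain value types for storage
static_assert(std::is_same_v<std::decay_t<const int&>, int>);
static_assert(std::is_same_v<std::decay_t<std::string&>, std::string>);
@endcode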
In terms of function arguments,
the first argument specifies the direction of this data pipe,
which can be either tf::PipeType::SERIAL or tf::PipeType::PARALLEL,
and the second argument is a callable to invoke by the pipeline scheduler.
The callable must take the input data type as its first argument
and return a value of the output data type.
Additionally,
the callable can take a tf::Pipeflow reference in its second argument
which allows you to query the runtime information of a stage task,
such as its line number and token number.
@code{.cpp}
tf::make_data_pipe<int, std::string>(
  tf::PipeType::SERIAL,
  [](int& input, tf::Pipeflow& pf) {
    printf("token=%zu, line=%zu\n", pf.token(), pf.line());
    return std::to_string(input + 100);
  }
)
@endcode
@note
By default, tf::DataPipeline passes the data
by reference to your callable, which may take it by copy or by reference
depending on application needs.
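
For example, both callables below are valid for the same pipe; the first takes
the input by copy and the second by reference (a minimal sketch, assuming
`<iostream>` and `<string>` are included):
@code{.cpp}
// take the input by copy; modifying it does not affect the stored datum
tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL,
  [](std::string input) { std::cout << input << '\n'; }
);

// take the input by reference; no copy is made
tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL,
  [](std::string& input) { std::cout << input << '\n'; }
);
@endcode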
For the first pipe, the input type should always be `void` and
the callable must take a tf::Pipeflow reference as its argument.
In this example, we stop the pipeline after it has processed five tokens.
@code{.cpp}
tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [](tf::Pipeflow& pf) -> int {
  if(pf.token() == 5) {
    pf.stop();
    return 0;  // returns a dummy value
  }
  else {
    return pf.token();
  }
}),
@endcode
Similarly, the output type of the last pipe should be `void` as no more data
will go out of the final pipe.
@code{.cpp}
tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
  std::cout << input << std::endl;
})
@endcode
Finally, you need to compose the pipeline graph by creating a module task
(i.e., tf::Taskflow::composed_of).
@code{.cpp}
// build the pipeline graph using composition
taskflow.composed_of(pl).name("pipeline");
// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);
// run the pipeline
executor.run(taskflow).wait();
@endcode
@dotfile images/pipeline_basic_dependency_graph.dot
@section UnderstandInternalDataStorage Understand Internal Data Storage
By default, tf::DataPipeline uses @std_variant to store a type-safe union of
all input and output data types extracted from the given data pipes.
To avoid false sharing, each line keeps a variant that is aligned
with the cacheline size.
When invoking a pipe callable, the input data is acquired by reference
from the variant using @std_variant_get.
When returning from a pipe callable, the output data is stored back
into the variant using the assignment operator.
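To make this concrete, the following is a conceptual sketch, not %Taskflow's
actual implementation, of what one line's storage could look like for the
`void` -> `int` -> `%std::string` -> `void` pipeline above, assuming a
64-byte cacheline:
@code{.cpp}
#include <iostream>
#include <string>
#include <variant>

// conceptual per-line buffer (hypothetical, for illustration only):
// one cacheline-aligned variant holds whichever stage datum is
// currently flowing through the line; alignment avoids false sharing
struct alignas(64) Line {  // 64: assumed cacheline size
  std::variant<std::monostate, int, std::string> data;
};

int main() {
  Line line;
  line.data = 42;                         // first pipe stores its int output
  int& in = std::get<int>(line.data);     // second pipe acquires the input by reference
  line.data = std::to_string(in + 100);   // second pipe stores its string output
  std::cout << std::get<std::string>(line.data) << '\n';  // prints 142
  return 0;
}
@endcode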
@section DataParallelPipelineLearnMore Learn More about Taskflow Pipeline
Visit the following pages to learn more about pipelines:
1. @ref TaskParallelPipeline
2. @ref TaskParallelScalablePipeline
3. @ref TextProcessingPipeline
4. @ref GraphProcessingPipeline
5. @ref TaskflowProcessingPipeline
*/
}