<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Learning from Examples &raquo; Text Processing Pipeline | Taskflow QuickStart</title>
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Source+Sans+Pro:400,400i,600,600i%7CSource+Code+Pro:400,400i,600" />
<link rel="stylesheet" href="m-dark+documentation.compiled.css" />
<link rel="icon" href="favicon.ico" type="image/vnd.microsoft.icon" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="theme-color" content="#22272e" />
</head>
<body>
<header><nav id="navigation">
<div class="m-container">
<div class="m-row">
<span id="m-navbar-brand" class="m-col-t-8 m-col-m-none m-left-m">
<a href="https://taskflow.github.io"><img src="taskflow_logo.png" alt="" />Taskflow</a> <span class="m-breadcrumb">|</span> <a href="index.html" class="m-thin">QuickStart</a>
</span>
<div class="m-col-t-4 m-hide-m m-text-right m-nopadr">
<a href="#search" class="m-doc-search-icon" title="Search" onclick="return showSearch()"><svg style="height: 0.9rem;" viewBox="0 0 16 16">
<path id="m-doc-search-icon-path" d="m6 0c-3.31 0-6 2.69-6 6 0 3.31 2.69 6 6 6 1.49 0 2.85-0.541 3.89-1.44-0.0164 0.338 0.147 0.759 0.5 1.15l3.22 3.79c0.552 0.614 1.45 0.665 2 0.115 0.55-0.55 0.499-1.45-0.115-2l-3.79-3.22c-0.392-0.353-0.812-0.515-1.15-0.5 0.895-1.05 1.44-2.41 1.44-3.89 0-3.31-2.69-6-6-6zm0 1.56a4.44 4.44 0 0 1 4.44 4.44 4.44 4.44 0 0 1-4.44 4.44 4.44 4.44 0 0 1-4.44-4.44 4.44 4.44 0 0 1 4.44-4.44z"/>
</svg></a>
<a id="m-navbar-show" href="#navigation" title="Show navigation"></a>
<a id="m-navbar-hide" href="#" title="Hide navigation"></a>
</div>
<div id="m-navbar-collapse" class="m-col-t-12 m-show-m m-col-m-none m-right-m">
<div class="m-row">
<ol class="m-col-t-6 m-col-m-none">
<li><a href="pages.html">Handbook</a></li>
<li><a href="namespaces.html">Namespaces</a></li>
</ol>
<ol class="m-col-t-6 m-col-m-none" start="3">
<li><a href="annotated.html">Classes</a></li>
<li><a href="files.html">Files</a></li>
<li class="m-show-m"><a href="#search" class="m-doc-search-icon" title="Search" onclick="return showSearch()"><svg style="height: 0.9rem;" viewBox="0 0 16 16">
<use href="#m-doc-search-icon-path" />
</svg></a></li>
</ol>
</div>
</div>
</div>
</div>
</nav></header>
<main><article>
<div class="m-container m-container-inflatable">
<div class="m-row">
<div class="m-col-l-10 m-push-l-1">
<h1>
<span class="m-breadcrumb"><a href="Examples.html">Learning from Examples</a> &raquo;</span>
Text Processing Pipeline
</h1>
<nav class="m-block m-default">
<h3>Contents</h3>
<ul>
<li><a href="#FormulateTheTextProcessingPipelineProblem">Formulate the Text Processing Pipeline Problem</a></li>
<li>
<a href="#CreateAParallelTextPipeline">Create a Text Processing Pipeline</a>
<ul>
<li><a href="#TextPipelineDefineTheDataBuffer">Define the Data Buffer</a></li>
<li><a href="#TextPipelineDefineThePipes">Define the Pipes</a></li>
<li><a href="#TextPipelineDefineTheTaskGraph">Define the Task Graph</a></li>
<li><a href="#TextPipelineSubmitTheTaskGraph">Submit the Task Graph</a></li>
</ul>
</li>
</ul>
</nav>
<p>We study a text processing pipeline that finds the most frequent character in each string from an input source. Parallelism takes the form of a three-stage pipeline that transforms each input string into a final character-count pair.</p><section id="FormulateTheTextProcessingPipelineProblem"><h2><a href="#FormulateTheTextProcessingPipelineProblem">Formulate the Text Processing Pipeline Problem</a></h2><p>Given an input vector of strings, we want to compute the most frequent character of each string using a series of transform operations. For example:</p><pre class="m-console"><span class="gp"># </span>input strings
<span class="go">abade</span>
<span class="go">ddddf</span>
<span class="go">eefge</span>
<span class="go">xyzzd</span>
<span class="go">ijjjj</span>
<span class="go">jiiii</span>
<span class="go">kkijk</span>
<span class="gp"># </span>output
<span class="go">a:2</span>
<span class="go">d:4</span>
<span class="go">e:3</span>
<span class="go">z:2</span>
<span class="go">j:4</span>
<span class="go">i:4</span>
<span class="go">k:3</span></pre><p>We decompose the algorithm into three stages:</p><ol><li>read a <code>std::string</code> from the input vector</li><li>generate a <code>std::unordered_map&lt;char, size_t&gt;</code> frequency map from the string</li><li>reduce the frequency map to a <code>std::pair&lt;char, size_t&gt;</code> that holds the most frequent character and its count</li></ol><p>The first and the third stages process inputs and generate results serially, while the second stage can run in parallel. The algorithm is a good fit for pipeline parallelism, as different stages can overlap in time across parallel lines.</p></section><section id="CreateAParallelTextPipeline"><h2><a href="#CreateAParallelTextPipeline">Create a Text Processing Pipeline</a></h2><p>We create a pipeline of three pipes (stages) and two parallel lines to solve the problem. The number of parallel lines is a tunable parameter. In most cases, we can simply use the value returned by <code><a href="http://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency.html" class="m-doc-external">std::<wbr />thread::<wbr />hardware_concurrency</a></code> as the line count. The first pipe reads an input string from the vector in order, the second pipe transforms the input string from the first pipe into a frequency map in parallel, and the third pipe reduces the frequency map to find the most frequent character. The overall implementation is shown below:</p><pre class="m-code"><span class="cp">#include</span><span class="w"> </span><span class="cpf">&lt;taskflow/taskflow.hpp&gt;</span>
<span class="cp">#include</span><span class="w"> </span><span class="cpf">&lt;taskflow/algorithm/pipeline.hpp&gt;</span>
<span class="c1">// Function: format the map</span>
<span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="w"> </span><span class="nf">format_map</span><span class="p">(</span><span class="k">const</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;&amp;</span><span class="w"> </span><span class="n">map</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">ostringstream</span><span class="w"> </span><span class="n">oss</span><span class="p">;</span>
<span class="w"> </span><span class="k">for</span><span class="p">(</span><span class="k">const</span><span class="w"> </span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="p">[</span><span class="n">i</span><span class="p">,</span><span class="w"> </span><span class="n">j</span><span class="p">]</span><span class="w"> </span><span class="o">:</span><span class="w"> </span><span class="n">map</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">oss</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="sc">&#39;:&#39;</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="n">j</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="sc">&#39; &#39;</span><span class="p">;</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">oss</span><span class="p">.</span><span class="n">str</span><span class="p">();</span>
<span class="p">}</span>
<span class="kt">int</span><span class="w"> </span><span class="nf">main</span><span class="p">()</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Taskflow</span><span class="w"> </span><span class="n">taskflow</span><span class="p">(</span><span class="s">&quot;text-filter pipeline&quot;</span><span class="p">);</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Executor</span><span class="w"> </span><span class="n">executor</span><span class="p">;</span>
<span class="w"> </span><span class="k">const</span><span class="w"> </span><span class="kt">size_t</span><span class="w"> </span><span class="n">num_lines</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">2</span><span class="p">;</span>
<span class="w"> </span><span class="c1">// input data </span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">vector</span><span class="o">&lt;</span><span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="o">&gt;</span><span class="w"> </span><span class="n">input</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="s">&quot;abade&quot;</span><span class="p">,</span><span class="w"> </span>
<span class="w"> </span><span class="s">&quot;ddddf&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="s">&quot;eefge&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="s">&quot;xyzzd&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="s">&quot;ijjjj&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="s">&quot;jiiii&quot;</span><span class="p">,</span>
<span class="w"> </span><span class="s">&quot;kkijk&quot;</span>
<span class="w"> </span><span class="p">};</span>
<span class="w"> </span><span class="c1">// custom data storage</span>
<span class="w"> </span><span class="k">using</span><span class="w"> </span><span class="n">data_type</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">variant</span><span class="o">&lt;</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="p">,</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">pair</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span>
<span class="w"> </span><span class="o">&gt;</span><span class="p">;</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">array</span><span class="o">&lt;</span><span class="n">data_type</span><span class="p">,</span><span class="w"> </span><span class="n">num_lines</span><span class="o">&gt;</span><span class="w"> </span><span class="n">mybuffer</span><span class="p">;</span>
<span class="w"> </span><span class="c1">// the pipeline consists of three pipes (serial-parallel-serial)</span>
<span class="w"> </span><span class="c1">// and up to two concurrent scheduling tokens</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeline</span><span class="w"> </span><span class="n">pl</span><span class="p">(</span><span class="n">num_lines</span><span class="p">,</span>
<span class="w"> </span><span class="c1">// first pipe processes the input data</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">SERIAL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="k">if</span><span class="p">(</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="n">input</span><span class="p">.</span><span class="n">size</span><span class="p">())</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">pf</span><span class="p">.</span><span class="n">stop</span><span class="p">();</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="k">else</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 1: input token = %s</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">input</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()].</span><span class="n">c_str</span><span class="p">());</span>
<span class="w"> </span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">input</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()];</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="p">}},</span>
<span class="w"> </span>
<span class="w"> </span><span class="c1">// second pipe counts the frequency of each character</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">PARALLEL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span><span class="w"> </span><span class="n">map</span><span class="p">;</span>
<span class="w"> </span><span class="k">for</span><span class="p">(</span><span class="k">auto</span><span class="w"> </span><span class="n">c</span><span class="w"> </span><span class="o">:</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">get</span><span class="o">&lt;</span><span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="o">&gt;</span><span class="p">(</span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]))</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">map</span><span class="p">[</span><span class="n">c</span><span class="p">]</span><span class="o">++</span><span class="p">;</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 2: map = %s</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">format_map</span><span class="p">(</span><span class="n">map</span><span class="p">).</span><span class="n">c_str</span><span class="p">());</span>
<span class="w"> </span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">map</span><span class="p">;</span>
<span class="w"> </span><span class="p">}},</span>
<span class="w"> </span>
<span class="w"> </span><span class="c1">// third pipe reduces the most frequent character</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">SERIAL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="n">mybuffer</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">map</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">get</span><span class="o">&lt;</span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;&gt;</span><span class="p">(</span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]);</span>
<span class="w"> </span><span class="k">auto</span><span class="w"> </span><span class="n">sol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">max_element</span><span class="p">(</span><span class="n">map</span><span class="p">.</span><span class="n">begin</span><span class="p">(),</span><span class="w"> </span><span class="n">map</span><span class="p">.</span><span class="n">end</span><span class="p">(),</span><span class="w"> </span><span class="p">[](</span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">a</span><span class="p">,</span><span class="w"> </span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">b</span><span class="p">){</span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">a</span><span class="p">.</span><span class="n">second</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">b</span><span class="p">.</span><span class="n">second</span><span class="p">;</span>
<span class="w"> </span><span class="p">});</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 3: %c:%zu</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">sol</span><span class="o">-&gt;</span><span class="n">first</span><span class="p">,</span><span class="w"> </span><span class="n">sol</span><span class="o">-&gt;</span><span class="n">second</span><span class="p">);</span>
<span class="w"> </span><span class="c1">// storing the last-stage result is not required; done here for demonstration only</span>
<span class="w"> </span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="o">*</span><span class="n">sol</span><span class="p">;</span><span class="w"> </span>
<span class="w"> </span><span class="p">}}</span>
<span class="w"> </span><span class="p">);</span>
<span class="w"> </span>
<span class="w"> </span><span class="c1">// build the pipeline graph using composition</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">init</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">emplace</span><span class="p">([](){</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">cout</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="s">&quot;ready</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">;</span><span class="w"> </span><span class="p">})</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;starting pipeline&quot;</span><span class="p">);</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">task</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">composed_of</span><span class="p">(</span><span class="n">pl</span><span class="p">)</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;pipeline&quot;</span><span class="p">);</span>
<span class="w"> </span><span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">stop</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">emplace</span><span class="p">([](){</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">cout</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="s">&quot;stopped</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">;</span><span class="w"> </span><span class="p">})</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;pipeline stopped&quot;</span><span class="p">);</span>
<span class="w"> </span><span class="c1">// create task dependency</span>
<span class="w"> </span><span class="n">init</span><span class="p">.</span><span class="n">precede</span><span class="p">(</span><span class="n">task</span><span class="p">);</span>
<span class="w"> </span><span class="n">task</span><span class="p">.</span><span class="n">precede</span><span class="p">(</span><span class="n">stop</span><span class="p">);</span>
<span class="w"> </span>
<span class="w"> </span><span class="c1">// dump the pipeline graph structure (with composition)</span>
<span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">dump</span><span class="p">(</span><span class="n">std</span><span class="o">::</span><span class="n">cout</span><span class="p">);</span>
<span class="w"> </span><span class="c1">// run the pipeline</span>
<span class="w"> </span><span class="n">executor</span><span class="p">.</span><span class="n">run</span><span class="p">(</span><span class="n">taskflow</span><span class="p">).</span><span class="n">wait</span><span class="p">();</span>
<span class="w"> </span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span>
<span class="p">}</span></pre><section id="TextPipelineDefineTheDataBuffer"><h3><a href="#TextPipelineDefineTheDataBuffer">Define the Data Buffer</a></h3><p>Taskflow does not provide any data abstraction for pipeline scheduling; instead, it gives users full control over data management in their applications. In this example, we create a one-dimensional buffer of a <a href="https://en.cppreference.com/w/cpp/utility/variant">std::<wbr />variant</a> data type, with one entry per line, to store the output of each pipe in uniform storage:</p><pre class="m-code"><span class="k">using</span><span class="w"> </span><span class="n">data_type</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">variant</span><span class="o">&lt;</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="p">,</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">pair</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span>
<span class="o">&gt;</span><span class="p">;</span>
<span class="n">std</span><span class="o">::</span><span class="n">array</span><span class="o">&lt;</span><span class="n">data_type</span><span class="p">,</span><span class="w"> </span><span class="n">num_lines</span><span class="o">&gt;</span><span class="w"> </span><span class="n">mybuffer</span><span class="p">;</span></pre><aside class="m-note m-info"><h4>Note</h4><p>A one-dimensional buffer is sufficient because Taskflow allows only one scheduling token per line at a time.</p></aside></section><section id="TextPipelineDefineThePipes"><h3><a href="#TextPipelineDefineThePipes">Define the Pipes</a></h3><p>The first pipe reads one string and puts it in the corresponding entry of the buffer, <code>mybuffer[pf.line()]</code>. Since we read the strings in order, we declare the pipe as a serial type:</p><pre class="m-code"><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">SERIAL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="k">if</span><span class="p">(</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="n">input</span><span class="p">.</span><span class="n">size</span><span class="p">())</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">pf</span><span class="p">.</span><span class="n">stop</span><span class="p">();</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="k">else</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">input</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()];</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 1: input token = %s</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">input</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">token</span><span class="p">()].</span><span class="n">c_str</span><span class="p">());</span>
<span class="w"> </span><span class="p">}</span>
<span class="p">}},</span></pre><p>The second pipe gets the input string from the previous pipe and transforms it into a frequency map that records the number of occurrences of each character in the string. As multiple transforms can operate simultaneously on different lines, we declare the pipe as a parallel type:</p><pre class="m-code"><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">PARALLEL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;</span><span class="w"> </span><span class="n">map</span><span class="p">;</span>
<span class="w"> </span><span class="k">for</span><span class="p">(</span><span class="k">auto</span><span class="w"> </span><span class="n">c</span><span class="w"> </span><span class="o">:</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">get</span><span class="o">&lt;</span><span class="n">std</span><span class="o">::</span><span class="n">string</span><span class="o">&gt;</span><span class="p">(</span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]))</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">map</span><span class="p">[</span><span class="n">c</span><span class="p">]</span><span class="o">++</span><span class="p">;</span>
<span class="w"> </span><span class="p">}</span>
<span class="w"> </span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">map</span><span class="p">;</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 2: map = %s</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">format_map</span><span class="p">(</span><span class="n">map</span><span class="p">).</span><span class="n">c_str</span><span class="p">());</span>
<span class="p">}}</span></pre><p>Similarly, the third pipe reads the frequency map produced by the previous pipe and reduces it to find the most frequent character. The result does not need to be stored in the buffer; it can go to any place defined by the application (e.g., an output file). As we want to output the result in the same order as the input, we declare the pipe as a serial type:</p><pre class="m-code"><span class="n">tf</span><span class="o">::</span><span class="n">Pipe</span><span class="p">{</span><span class="n">tf</span><span class="o">::</span><span class="n">PipeType</span><span class="o">::</span><span class="n">SERIAL</span><span class="p">,</span><span class="w"> </span><span class="p">[</span><span class="o">&amp;</span><span class="n">mybuffer</span><span class="p">](</span><span class="n">tf</span><span class="o">::</span><span class="n">Pipeflow</span><span class="o">&amp;</span><span class="w"> </span><span class="n">pf</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">map</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">get</span><span class="o">&lt;</span><span class="n">std</span><span class="o">::</span><span class="n">unordered_map</span><span class="o">&lt;</span><span class="kt">char</span><span class="p">,</span><span class="w"> </span><span class="kt">size_t</span><span class="o">&gt;&gt;</span><span class="p">(</span><span class="n">mybuffer</span><span class="p">[</span><span class="n">pf</span><span class="p">.</span><span class="n">line</span><span class="p">()]);</span>
<span class="w"> </span><span class="k">auto</span><span class="w"> </span><span class="n">sol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">max_element</span><span class="p">(</span><span class="n">map</span><span class="p">.</span><span class="n">begin</span><span class="p">(),</span><span class="w"> </span><span class="n">map</span><span class="p">.</span><span class="n">end</span><span class="p">(),</span><span class="w"> </span><span class="p">[](</span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">a</span><span class="p">,</span><span class="w"> </span><span class="k">auto</span><span class="o">&amp;</span><span class="w"> </span><span class="n">b</span><span class="p">){</span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">a</span><span class="p">.</span><span class="n">second</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">b</span><span class="p">.</span><span class="n">second</span><span class="p">;</span>
<span class="w"> </span><span class="p">});</span>
<span class="w"> </span><span class="n">printf</span><span class="p">(</span><span class="s">&quot;stage 3: %c:%zu</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">sol</span><span class="o">-&gt;</span><span class="n">first</span><span class="p">,</span><span class="w"> </span><span class="n">sol</span><span class="o">-&gt;</span><span class="n">second</span><span class="p">);</span>
<span class="p">}}</span></pre></section><section id="TextPipelineDefineTheTaskGraph"><h3><a href="#TextPipelineDefineTheTaskGraph">Define the Task Graph</a></h3><p>To build the taskflow graph for the pipeline, we create a module task out of the pipeline structure and connect it with two tasks that output messages before and after the pipeline:</p><pre class="m-code"><span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">init</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">emplace</span><span class="p">([](){</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">cout</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="s">&quot;ready</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">;</span><span class="w"> </span><span class="p">})</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;starting pipeline&quot;</span><span class="p">);</span>
<span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">task</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">composed_of</span><span class="p">(</span><span class="n">pl</span><span class="p">)</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;pipeline&quot;</span><span class="p">);</span>
<span class="n">tf</span><span class="o">::</span><span class="n">Task</span><span class="w"> </span><span class="n">stop</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">taskflow</span><span class="p">.</span><span class="n">emplace</span><span class="p">([](){</span><span class="w"> </span><span class="n">std</span><span class="o">::</span><span class="n">cout</span><span class="w"> </span><span class="o">&lt;&lt;</span><span class="w"> </span><span class="s">&quot;stopped</span><span class="se">\n</span><span class="s">&quot;</span><span class="p">;</span><span class="w"> </span><span class="p">})</span>
<span class="w"> </span><span class="p">.</span><span class="n">name</span><span class="p">(</span><span class="s">&quot;pipeline stopped&quot;</span><span class="p">);</span>
<span class="n">init</span><span class="p">.</span><span class="n">precede</span><span class="p">(</span><span class="n">task</span><span class="p">);</span>
<span class="n">task</span><span class="p">.</span><span class="n">precede</span><span class="p">(</span><span class="n">stop</span><span class="p">);</span></pre></section><section id="TextPipelineSubmitTheTaskGraph"><h3><a href="#TextPipelineSubmitTheTaskGraph">Submit the Task Graph</a></h3><p>Finally, we submit the taskflow to the executor and run it once:</p><pre class="m-code"><span class="n">executor</span><span class="p">.</span><span class="n">run</span><span class="p">(</span><span class="n">taskflow</span><span class="p">).</span><span class="n">wait</span><span class="p">();</span></pre><p>Because the second stage is a parallel pipe, the output lines of different tokens may interleave. One possible result is shown below:</p><pre class="m-console"><span class="go">ready</span>
<span class="go">stage 1: input token = abade</span>
<span class="go">stage 1: input token = ddddf</span>
<span class="go">stage 2: map = f:1 d:4 </span>
<span class="go">stage 2: map = e:1 d:1 a:2 b:1 </span>
<span class="go">stage 3: a:2</span>
<span class="go">stage 1: input token = eefge</span>
<span class="go">stage 2: map = g:1 e:3 f:1 </span>
<span class="go">stage 3: d:4</span>
<span class="go">stage 1: input token = xyzzd</span>
<span class="go">stage 3: e:3</span>
<span class="go">stage 1: input token = ijjjj</span>
<span class="go">stage 2: map = z:2 x:1 d:1 y:1 </span>
<span class="go">stage 3: z:2</span>
<span class="go">stage 1: input token = jiiii</span>
<span class="go">stage 2: map = j:4 i:1 </span>
<span class="go">stage 3: j:4</span>
<span class="go">stage 2: map = i:4 j:1 </span>
<span class="go">stage 1: input token = kkijk</span>
<span class="go">stage 3: i:4</span>
<span class="go">stage 2: map = j:1 k:3 i:1 </span>
<span class="go">stage 3: k:3</span>
<span class="go">stopped</span></pre><p>We can see seven outputs at the third stage that show the most frequent character for each of the seven strings in order (<code>a:2</code>, <code>d:4</code>, <code>e:3</code>, <code>z:2</code>, <code>j:4</code>, <code>i:4</code>, <code>k:3</code>). The taskflow graph of this pipeline workload is shown below:</p><div class="m-graph"><svg style="width: 30.800rem; height: 25.100rem;" viewBox="0.00 0.00 308.00 251.00">
<g transform="scale(1 1) rotate(0) translate(4 247)">
<title>Taskflow</title>
<g class="m-cluster">
<title>cluster_p0x7ffd7418c200</title>
<polygon points="8,-8 8,-235 142,-235 142,-8 8,-8"/>
<text text-anchor="middle" x="75" y="-223" font-family="Helvetica,sans-Serif" font-size="10.00">Text Processing Pipeline</text>
</g>
<g class="m-cluster">
<title>cluster_p0x7ffd7418c110</title>
<polygon points="150,-81 150,-235 292,-235 292,-81 150,-81"/>
<text text-anchor="middle" x="221" y="-223" font-family="Helvetica,sans-Serif" font-size="10.00">m1</text>
</g>
<g class="m-node m-flat">
<title>p0x7bc4000142e8</title>
<ellipse cx="75" cy="-190" rx="56.52" ry="18"/>
<text text-anchor="middle" x="75" y="-187.5" font-family="Helvetica,sans-Serif" font-size="10.00">starting pipeline</text>
</g>
<g class="m-node m-flat">
<title>p0x7bc4000143d0</title>
<polygon points="116,-125 38,-125 34,-121 34,-89 112,-89 116,-93 116,-125"/>
<polyline points="112,-121 34,-121 "/>
<polyline points="112,-121 112,-89 "/>
<polyline points="112,-121 116,-125 "/>
<text text-anchor="middle" x="75" y="-104.5" font-family="Helvetica,sans-Serif" font-size="10.00">pipeline [m1]</text>
</g>
<g class="m-edge">
<title>p0x7bc4000142e8&#45;&gt;p0x7bc4000143d0</title>
<path d="M75,-171.82C75,-161.19 75,-147.31 75,-135.2"/>
<polygon points="78.5,-135.15 75,-125.15 71.5,-135.15 78.5,-135.15"/>
</g>
<g class="m-node m-flat">
<title>p0x7bc4000144b8</title>
<ellipse cx="75" cy="-34" rx="57.88" ry="18"/>
<text text-anchor="middle" x="75" y="-31.5" font-family="Helvetica,sans-Serif" font-size="10.00">pipeline stopped</text>
</g>
<g class="m-edge">
<title>p0x7bc4000143d0&#45;&gt;p0x7bc4000144b8</title>
<path d="M75,-88.81C75,-80.79 75,-71.05 75,-62.07"/>
<polygon points="78.5,-62.03 75,-52.03 71.5,-62.03 78.5,-62.03"/>
</g>
<g class="m-node">
<title>p0x7bc400014030</title>
<polygon points="203,-208 169.9,-190 203,-172 236.1,-190 203,-208"/>
<text text-anchor="middle" x="203" y="-187.5" font-family="Helvetica,sans-Serif" font-size="10.00">cond</text>
</g>
<g class="m-node m-flat">
<title>p0x7bc400014118</title>
<polygon points="212,-125 158,-125 158,-121 154,-121 154,-117 158,-117 158,-97 154,-97 154,-93 158,-93 158,-89 212,-89 212,-125"/>
<polyline points="158,-121 162,-121 162,-117 158,-117 "/>
<polyline points="158,-97 162,-97 162,-93 158,-93 "/>
<text text-anchor="middle" x="185" y="-104.5" font-family="Helvetica,sans-Serif" font-size="10.00">rt&#45;0</text>
</g>
<g class="m-edge">
<title>p0x7bc400014030&#45;&gt;p0x7bc400014118</title>
<path stroke-dasharray="5,2" d="M199.61,-173.76C197.19,-162.84 193.85,-147.84 190.98,-134.93"/>
<polygon points="194.39,-134.13 188.81,-125.13 187.56,-135.65 194.39,-134.13"/>
<text text-anchor="middle" x="198.5" y="-146" font-family="Helvetica,sans-Serif" font-size="10.00">0</text>
</g>
<g class="m-node m-flat">
<title>p0x7bc400014200</title>
<polygon points="284,-125 230,-125 230,-121 226,-121 226,-117 230,-117 230,-97 226,-97 226,-93 230,-93 230,-89 284,-89 284,-125"/>
<polyline points="230,-121 234,-121 234,-117 230,-117 "/>
<polyline points="230,-97 234,-97 234,-93 230,-93 "/>
<text text-anchor="middle" x="257" y="-104.5" font-family="Helvetica,sans-Serif" font-size="10.00">rt&#45;1</text>
</g>
<g class="m-edge">
<title>p0x7bc400014030&#45;&gt;p0x7bc400014200</title>
<path stroke-dasharray="5,2" d="M211.44,-176.34C219.05,-164.92 230.49,-147.77 239.99,-133.51"/>
<polygon points="242.98,-135.34 245.61,-125.08 237.15,-131.46 242.98,-135.34"/>
<text text-anchor="middle" x="236.5" y="-146" font-family="Helvetica,sans-Serif" font-size="10.00">1</text>
</g>
</g>
</svg>
</div></section></section>
</div>
</div>
</div>
</article></main>
<div class="m-doc-search" id="search">
<a href="#!" onclick="return hideSearch()"></a>
<div class="m-container">
<div class="m-row">
<div class="m-col-m-8 m-push-m-2">
<div class="m-doc-search-header m-text m-small">
<div><span class="m-label m-default">Tab</span> / <span class="m-label m-default">T</span> to search, <span class="m-label m-default">Esc</span> to close</div>
<div id="search-symbolcount">&hellip;</div>
</div>
<div class="m-doc-search-content">
<form>
<input type="search" name="q" id="search-input" placeholder="Loading &hellip;" disabled="disabled" autofocus="autofocus" autocomplete="off" spellcheck="false" />
</form>
<noscript class="m-text m-danger m-text-center">Unlike everything else in the docs, the search functionality <em>requires</em> JavaScript.</noscript>
<div id="search-help" class="m-text m-dim m-text-center">
<p class="m-noindent">Search for symbols, directories, files, pages or
modules. You can omit any prefix from the symbol or file path; adding a
<code>:</code> or <code>/</code> suffix lists all members of given symbol or
directory.</p>
<p class="m-noindent">Use <span class="m-label m-dim">&darr;</span>
/ <span class="m-label m-dim">&uarr;</span> to navigate through the list,
<span class="m-label m-dim">Enter</span> to go.
<span class="m-label m-dim">Tab</span> autocompletes common prefix, you can
copy a link to the result using <span class="m-label m-dim">⌘</span>
<span class="m-label m-dim">L</span> while <span class="m-label m-dim">⌘</span>
<span class="m-label m-dim">M</span> produces a Markdown link.</p>
</div>
<div id="search-notfound" class="m-text m-warning m-text-center">Sorry, nothing was found.</div>
<ul id="search-results"></ul>
</div>
</div>
</div>
</div>
</div>
<script src="search-v2.js"></script>
<script src="searchdata-v2.js" async="async"></script>
<footer><nav>
<div class="m-container">
<div class="m-row">
<div class="m-col-l-10 m-push-l-1">
<p>Taskflow handbook is part of the <a href="https://taskflow.github.io">Taskflow project</a>, copyright © <a href="https://tsung-wei-huang.github.io/">Dr. Tsung-Wei Huang</a>, 2018&ndash;2024.<br />Generated by <a href="https://doxygen.org/">Doxygen</a> 1.9.1 and <a href="https://mcss.mosra.cz/">m.css</a>.</p>
</div>
</div>
</div>
</nav></footer>
</body>
</html>