// reference: // - gomp: https://github.com/gcc-mirror/gcc/blob/master/libgomp/iter.c // - komp: https://github.com/llvm-mirror/openmp/blob/master/runtime/src/kmp_dispatch.cpp #pragma once #include "../executor.hpp" namespace tf { // ---------------------------------------------------------------------------- // default parallel for // ---------------------------------------------------------------------------- // Function: for_each template Task FlowBuilder::for_each(B&& beg, E&& end, C&& c) { //return for_each_guided( // std::forward(beg), std::forward(end), std::forward(c), 1 //); using I = stateful_iterator_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), c=std::forward(c)] (Subflow& sf) mutable { // fetch the stateful values I beg = b; I end = e; if(beg == end) { return; } size_t chunk_size = 1; size_t W = sf._executor.num_workers(); size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::for_each(beg, end, c); return; } if(N < W) { W = N; } std::atomic next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // fine-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; std::advance(beg, s0-z); for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_acquire, std::memory_order_relaxed)) { std::advance(beg, s0-z); for(size_t x = s0; x< e0; x++) { c(*beg++); } z = e0; s0 = next.load(std::memory_order_relaxed); } } } //}).name("pfg_"s + std::to_string(w)); }); } sf.join(); }); return task; } // Function: for_each_index template Task FlowBuilder::for_each_index(B&& beg, E&& end, S&& inc, C&& c){ //return for_each_index_guided( // std::forward(beg), // std::forward(end), // std::forward(inc), // std::forward(c), // 1 //); using I = stateful_index_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), a=std::forward(inc), c=std::forward(c)] (Subflow& sf) mutable { // fetch the iterator values I beg = b; I end = e; I inc = a; if(is_range_invalid(beg, end, inc)) { TF_THROW("invalid range [", beg, ", ", end, ") with step size ", inc); } size_t chunk_size = 1; size_t W = sf._executor.num_workers(); size_t N = distance(beg, end, inc); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { for(size_t x=0; x next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // find-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; auto s = static_cast(s0) * inc + beg; for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_acquire, std::memory_order_relaxed)) { auto s = static_cast(s0) * inc + beg; for(size_t x=s0; x Task FlowBuilder::for_each_guided(B&& beg, E&& end, C&& c, H&& chunk_size){ using I = stateful_iterator_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { // fetch the stateful values I beg = b; I end = e; if(beg == end) { return; } size_t chunk_size = (h == 0) ? 1 : h; size_t W = sf._executor.num_workers(); size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::for_each(beg, end, c); return; } if(N < W) { W = N; } std::atomic next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // fine-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; std::advance(beg, s0-z); for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_acquire, std::memory_order_relaxed)) { std::advance(beg, s0-z); for(size_t x = s0; x< e0; x++) { c(*beg++); } z = e0; s0 = next.load(std::memory_order_relaxed); } } } //}).name("pfg_"s + std::to_string(w)); }); } sf.join(); }); return task; } // Function: for_each_index_guided template Task FlowBuilder::for_each_index_guided( B&& beg, E&& end, S&& inc, C&& c, H&& chunk_size ){ using I = stateful_index_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), a=std::forward(inc), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { // fetch the iterator values I beg = b; I end = e; I inc = a; if(is_range_invalid(beg, end, inc)) { TF_THROW("invalid range [", beg, ", ", end, ") with step size ", inc); } size_t chunk_size = (h == 0) ? 1 : h; size_t W = sf._executor.num_workers(); size_t N = distance(beg, end, inc); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { for(size_t x=0; x next(0); for(size_t w=0; w(W); size_t s0 = next.load(std::memory_order_relaxed); while(s0 < N) { size_t r = N - s0; // find-grained if(r < p1) { while(1) { s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; auto s = static_cast(s0) * inc + beg; for(size_t x=s0; x(p2 * r); if(q < chunk_size) { q = chunk_size; } size_t e0 = (q <= r) ? s0 + q : N; if(next.compare_exchange_strong(s0, e0, std::memory_order_acquire, std::memory_order_relaxed)) { auto s = static_cast(s0) * inc + beg; for(size_t x=s0; x Task FlowBuilder::for_each_factoring(B&& beg, E&& end, C&& c){ using I = stateful_iterator_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), c=std::forward(c)] (Subflow& sf) mutable { // fetch the iterator values I beg = b; I end = e; if(beg == end) { return; } size_t W = sf._executor.num_workers(); size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= 1) { std::for_each(beg, end, c); return; } if(N < W) { W = N; } std::atomic batch(0); std::atomic next(0); for(size_t w=0; w((N >> b0) / (double)W); if(ck == 0) { ck = 1; } size_t s0 = next.fetch_add(ck, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (ck <= (N - s0)) ? s0 + ck : N; std::advance(beg, s0-z); for(size_t x=s0; x Task FlowBuilder::for_each_factoring( B&& beg, E&& end, S&& inc, C&& c ){ using I = stateful_index_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), i=std::forward(inc), c=std::forward(c)] (Subflow& sf) mutable { // fetch the iterator values I beg = b; I end = e; I inc = i; if(is_range_invalid(beg, end, inc)) { TF_THROW("invalid range [", beg, ", ", end, ") with step size ", inc); } size_t W = sf._executor.num_workers(); size_t N = distance(beg, end, inc); // only myself - no need to spawn another graph if(W <= 1 || N <= 1) { for(size_t x=0; x batch(0); std::atomic next(0); for(size_t w=0; w((N >> b0) / (double)(W)); if(ck == 0) { ck = 1; } size_t s0 = next.fetch_add(ck, std::memory_order_relaxed); if(s0 >= N) { return; } size_t e0 = (ck <= (N - s0)) ? s0 + ck : N; auto s = static_cast(s0) * inc + beg; for(size_t x=s0; x Task FlowBuilder::for_each_dynamic( B&& beg, E&& end, C&& c, H&& chunk_size ) { using I = stateful_iterator_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { I beg = b; I end = e; if(beg == end) { return; } size_t chunk_size = (h == 0) ? 1 : h; size_t W = sf._executor.num_workers(); size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::for_each(beg, end, c); return; } if(N < W) { W = N; } std::atomic next(0); for(size_t w=0; w= N) { break; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; std::advance(beg, s0-z); for(size_t x=s0; x Task FlowBuilder::for_each_index_dynamic( B&& beg, E&& end, S&& inc, C&& c, H&& chunk_size ){ using I = stateful_index_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), a=std::forward(inc), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { I beg = b; I end = e; I inc = a; if(is_range_invalid(beg, end, inc)) { TF_THROW("invalid range [", beg, ", ", end, ") with step size ", inc); } size_t chunk_size = (h == 0) ? 1 : h; size_t W = sf._executor.num_workers(); size_t N = distance(beg, end, inc); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { for(size_t x=0; x next(0); for(size_t w=0; w= N) { break; } size_t e0 = (chunk_size <= (N - s0)) ? s0 + chunk_size : N; I s = static_cast(s0) * inc + beg; for(size_t x=s0; x Task FlowBuilder::for_each_static( B&& beg, E&& end, C&& c, H&& chunk_size ){ using I = stateful_iterator_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { // fetch the iterator I beg = b; I end = e; if(beg == end) { return; } size_t chunk_size = h; const size_t W = sf._executor.num_workers(); const size_t N = std::distance(beg, end); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { std::for_each(beg, end, c); return; } std::atomic next(0); // even partition if(chunk_size == 0){ // zero-based start and end points const size_t q0 = N / W; const size_t t0 = N % W; for(size_t i=0; i= N) { break; } //sf.emplace([&next, beg, end, chunk_size, N, W, &c] () mutable { sf.silent_async([&next, beg, end, chunk_size, N, W, &c] () mutable { size_t trip = W*chunk_size; size_t s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); std::advance(beg, s0); while(1) { size_t items; I e = beg; for(items=0; items= N) { break; } std::advance(beg, trip); } //}).name("pfs_"s + std::to_string(i)); }); } } sf.join(); }); return task; } // Function: for_each_index_static // static scheduling with chunk size template Task FlowBuilder::for_each_index_static( B&& beg, E&& end, S&& inc, C&& c, H&& chunk_size ){ using I = stateful_index_t; using namespace std::string_literals; Task task = emplace( [b=std::forward(beg), e=std::forward(end), a=std::forward(inc), c=std::forward(c), h=std::forward(chunk_size)] (Subflow& sf) mutable { // fetch the indices I beg = b; I end = e; I inc = a; if(is_range_invalid(beg, end, inc)) { TF_THROW("invalid range [", beg, ", ", end, ") with step size ", inc); } size_t chunk_size = h; const size_t W = sf._executor.num_workers(); const size_t N = distance(beg, end, inc); // only myself - no need to spawn another graph if(W <= 1 || N <= chunk_size) { for(size_t x=0; x next(0); if(chunk_size == 0) { // zero-based start and end points const size_t q0 = N / W; const size_t t0 = N % W; for(size_t i=0; i(s0) * inc + beg; for(size_t x=0; x= N) { break; } //sf.emplace([&next, beg, inc, chunk_size, N, W, &c] () mutable { sf.silent_async([&next, beg, inc, chunk_size, N, W, &c] () mutable { size_t trip = W * chunk_size; size_t s0 = next.fetch_add(chunk_size, std::memory_order_relaxed); while(1) { size_t e0 = s0 + chunk_size; if(e0 > N) { e0 = N; } I s = static_cast(s0) * inc + beg; for(size_t x=s0; x= N) { break; } } //}).name("pfs_"s + std::to_string(i)); }); } } sf.join(); }); return task; }*/ } // end of namespace tf -----------------------------------------------------