diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index 256ed05..49cfea2 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -141,8 +141,8 @@ struct ProcessingStrategy virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0; }; -size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, - ParserContext &parserContext) +inline size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, + ParserContext &parserContext) { listfileContext.readerHelper.destBuf().clear(); auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); @@ -159,6 +159,46 @@ size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, return buffer->used(); } +struct SingleThreadedStrategy: public ProcessingStrategy +{ + void run(ListfileContext &listfileContext, ParserContext &parserContext) override + { + size_t bufferNumber = 0; + size_t totalBytesProcessed = 0; + size_t bytesProcessed = 0; + const std::chrono::milliseconds ReportInterval(500); + mvlc::util::Stopwatch sw; + + auto report = [&] + { + [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) + { + auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); + auto bytesPerSecond = totalBytesProcessed / s; + auto MiBPerSecond = bytesPerSecond / (1u << 20); + std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n", + bufferNumber, totalBytesProcessed, s, MiBPerSecond); + }(sw, bufferNumber, totalBytesProcessed); + }; + + do + { + bytesProcessed = process_one_buffer(bufferNumber, listfileContext, parserContext); + totalBytesProcessed += bytesProcessed; + ++bufferNumber; + + if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval) + { + report(); + sw.interval(); + } + } + while (bytesProcessed > 0); + + report(); + } +}; + int main(int argc, char *argv[]) { auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); @@ -232,43 +272,12 @@ int main(int argc, char *argv[]) return 1; parserContext->parser.userContext = &mana; + std::unique_ptr strategy = std::make_unique(); mana.sinkContext = mana.sink->init(); mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str()); - size_t bufferNumber = 0; - size_t totalBytesProcessed = 0; - size_t bytesProcessed = 0; - const std::chrono::milliseconds ReportInterval(500); - mvlc::util::Stopwatch sw; - - auto report = [&] - { - [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) - { - auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); - auto bytesPerSecond = totalBytesProcessed / s; - auto MiBPerSecond = bytesPerSecond / (1u << 20); - std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n", - bufferNumber, totalBytesProcessed, s, MiBPerSecond); - }(sw, bufferNumber, totalBytesProcessed); - }; - - do - { - bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext); - totalBytesProcessed += bytesProcessed; - ++bufferNumber; - - if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval) - { - report(); - sw.interval(); - } - } - while (bytesProcessed > 0); - - report(); + strategy->run(*listfileContext, *parserContext); mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str()); mana.sink->shutdown(mana.sinkContext);