mana: strategy

This commit is contained in:
Florian Lüke 2024-12-26 03:17:33 +01:00
parent 76b22c1434
commit 9bdd79e910

View file

@ -141,8 +141,8 @@ struct ProcessingStrategy
virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;
};
size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext)
inline size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext)
{
listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
@ -159,6 +159,46 @@ size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
return buffer->used();
}
struct SingleThreadedStrategy: public ProcessingStrategy
{
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
{
size_t bufferNumber = 0;
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed = process_one_buffer(bufferNumber, listfileContext, parserContext);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
while (bytesProcessed > 0);
report();
}
};
int main(int argc, char *argv[])
{
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
@ -232,43 +272,12 @@ int main(int argc, char *argv[])
return 1;
parserContext->parser.userContext = &mana;
std::unique_ptr<ProcessingStrategy> strategy = std::make_unique<SingleThreadedStrategy>();
mana.sinkContext = mana.sink->init();
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
size_t bufferNumber = 0;
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
while (bytesProcessed > 0);
report();
strategy->run(*listfileContext, *parserContext);
mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
mana.sink->shutdown(mana.sinkContext);