mana auto replay: print sink perf stats at the end of the run
This commit is contained in:
parent
1698987311
commit
bbd52cbde0
2 changed files with 40 additions and 17 deletions
|
@ -210,7 +210,7 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::
|
|||
allocate_outputs(result.arena, result.runInfo);
|
||||
result.runDescriptor = make_run_descriptor(result.runInfo);
|
||||
|
||||
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2));
|
||||
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump());
|
||||
spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena));
|
||||
|
||||
return result;
|
||||
|
|
|
@ -293,11 +293,11 @@ struct NngPairStrategy: public ProcessingStrategy
|
|||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||
{
|
||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||
auto bytesPerSecond = totalBytesProcessed / s;
|
||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
||||
std::cout << fmt::format(
|
||||
"Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
|
||||
totalBytesProcessed, s, MiBPerSecond);
|
||||
auto MiB = totalBytesProcessed / (1024.0 * 1024);
|
||||
auto MiB_s = MiB / s;
|
||||
fmt::print("Processed {} mvlc data buffers, {:.2f} MiB. "
|
||||
"elapsed={:.2f} s, rate={:.2f} MiB/s\n",
|
||||
bufferNumber, MiB, s, MiB_s);
|
||||
}(sw, bufferNumber, totalBytesProcessed);
|
||||
};
|
||||
|
||||
|
@ -377,7 +377,7 @@ void usage(const char *self)
|
|||
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
|
||||
"simple counting plugin.\n"
|
||||
<< " --processing-strategy=<name> Use a specific processing strategy. "
|
||||
"Available: 'direct', 'nng-pair'. Default: 'direct'\n"
|
||||
"Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n"
|
||||
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
|
||||
"'off'.\n";
|
||||
}
|
||||
|
@ -413,7 +413,7 @@ int main(int argc, char *argv[])
|
|||
return 1;
|
||||
}
|
||||
|
||||
std::unique_ptr<mana::IManaSink> manaPlugin;
|
||||
std::unique_ptr<mana::IManaSink> destSink;
|
||||
boost::dll::shared_library pluginHandle;
|
||||
|
||||
if (parser("--plugin"))
|
||||
|
@ -422,7 +422,7 @@ int main(int argc, char *argv[])
|
|||
try
|
||||
{
|
||||
pluginHandle = boost::dll::shared_library(pluginFile);
|
||||
manaPlugin = std::make_unique<mana::ManaCSink>(
|
||||
destSink = std::make_unique<mana::ManaCSink>(
|
||||
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")());
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
|
@ -433,19 +433,22 @@ int main(int argc, char *argv[])
|
|||
}
|
||||
else
|
||||
{
|
||||
manaPlugin = std::make_unique<mana::ManaCountingSink>();
|
||||
destSink = std::make_unique<mana::ManaCountingSink>();
|
||||
}
|
||||
|
||||
std::string strategyName = "direct";
|
||||
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
|
||||
|
||||
std::string strategyName = "nng-pair";
|
||||
|
||||
if (parser("--processing-strategy"))
|
||||
strategyName = parser("--processing-strategy").str();
|
||||
|
||||
std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
|
||||
std::unique_ptr<ProcessingStrategy> processingStrategy;
|
||||
|
||||
if (strategyName == "nng-pair")
|
||||
strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
||||
else if (strategyName == "direct")
|
||||
strategy = std::make_unique<SingleThreadedStrategy>();
|
||||
processingStrategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
||||
else if (strategyName == "single-threaded")
|
||||
processingStrategy = std::make_unique<SingleThreadedStrategy>();
|
||||
else
|
||||
{
|
||||
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
|
||||
|
@ -454,7 +457,7 @@ int main(int argc, char *argv[])
|
|||
}
|
||||
|
||||
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
|
||||
jModuleDataSources, manaPlugin.get(), nullptr);
|
||||
jModuleDataSources, manaSink.get(), nullptr);
|
||||
|
||||
auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
|
||||
const mvlc::readout_parser::ModuleData *moduleDataList,
|
||||
|
@ -484,10 +487,30 @@ int main(int argc, char *argv[])
|
|||
mana.sink->init(0, nullptr);
|
||||
mana.sink->begin_run(mana.runDescriptor.dump().c_str());
|
||||
|
||||
strategy->run(*listfileContext, *parserContext);
|
||||
processingStrategy->run(*listfileContext, *parserContext);
|
||||
|
||||
mana.sink->end_run(mana.runDescriptor.dump().c_str());
|
||||
mana.sink->shutdown();
|
||||
|
||||
auto perf = manaSink->perf();
|
||||
{
|
||||
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
|
||||
static_cast<size_t>(0));
|
||||
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
|
||||
static_cast<size_t>(0));
|
||||
double elapsed_ms =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
|
||||
.count();
|
||||
double elapsed_s = elapsed_ms / 1000.0;
|
||||
|
||||
double MiB = totalBytes / (1024.0 * 1024);
|
||||
double MiB_s = MiB / elapsed_s;
|
||||
double hit_s = totalHits / elapsed_s;
|
||||
|
||||
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
|
||||
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
|
||||
totalHits, MiB, elapsed_s, hit_s, MiB_s);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue