diff --git a/src/internal/mana_analysis.h b/src/internal/mana_analysis.h index d2dabf1..e2a2a09 100644 --- a/src/internal/mana_analysis.h +++ b/src/internal/mana_analysis.h @@ -210,7 +210,7 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana:: allocate_outputs(result.arena, result.runInfo); result.runDescriptor = make_run_descriptor(result.runInfo); - spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2)); + spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump()); spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena)); return result; diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index ad72179..f66e323 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -293,11 +293,11 @@ struct NngPairStrategy: public ProcessingStrategy [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) { auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); - auto bytesPerSecond = totalBytesProcessed / s; - auto MiBPerSecond = bytesPerSecond / (1u << 20); - std::cout << fmt::format( - "Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber, - totalBytesProcessed, s, MiBPerSecond); + auto MiB = totalBytesProcessed / (1024.0 * 1024); + auto MiB_s = MiB / s; + fmt::print("Processed {} mvlc data buffers, {:.2f} MiB. " + "elapsed={:.2f} s, rate={:.2f} MiB/s\n", + bufferNumber, MiB, s, MiB_s); }(sw, bufferNumber, totalBytesProcessed); }; @@ -377,7 +377,7 @@ void usage(const char *self) << " --plugin= Load a plugin to process the data. Default is a " "simple counting plugin.\n" << " --processing-strategy= Use a specific processing strategy. " - "Available: 'direct', 'nng-pair'. Default: 'direct'\n" + "Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n" << " --log-level= One of 'trace', 'debug', 'info', 'warn', 'error', " "'off'.\n"; } @@ -413,7 +413,7 @@ int main(int argc, char *argv[]) return 1; } - std::unique_ptr manaPlugin; + std::unique_ptr destSink; boost::dll::shared_library pluginHandle; if (parser("--plugin")) @@ -422,7 +422,7 @@ int main(int argc, char *argv[]) try { pluginHandle = boost::dll::shared_library(pluginFile); - manaPlugin = std::make_unique( + destSink = std::make_unique( pluginHandle.get("mana_get_sink_plugin")()); } catch (const std::exception &e) @@ -433,19 +433,22 @@ int main(int argc, char *argv[]) } else { - manaPlugin = std::make_unique(); + destSink = std::make_unique(); } - std::string strategyName = "direct"; + auto manaSink = std::make_unique(destSink.get()); + + std::string strategyName = "nng-pair"; + if (parser("--processing-strategy")) strategyName = parser("--processing-strategy").str(); - std::unique_ptr strategy; //= std::make_unique(); + std::unique_ptr processingStrategy; if (strategyName == "nng-pair") - strategy = std::make_unique("inproc://mana_module_data_stage"); - else if (strategyName == "direct") - strategy = std::make_unique(); + processingStrategy = std::make_unique("inproc://mana_module_data_stage"); + else if (strategyName == "single-threaded") + processingStrategy = std::make_unique(); else { std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName); @@ -454,7 +457,7 @@ int main(int argc, char *argv[]) } auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, - jModuleDataSources, manaPlugin.get(), nullptr); + jModuleDataSources, manaSink.get(), nullptr); auto event_data = [](void *ctx_, int crateIndex, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, @@ -484,10 +487,30 @@ int main(int argc, char *argv[]) mana.sink->init(0, nullptr); mana.sink->begin_run(mana.runDescriptor.dump().c_str()); - strategy->run(*listfileContext, *parserContext); + processingStrategy->run(*listfileContext, *parserContext); mana.sink->end_run(mana.runDescriptor.dump().c_str()); mana.sink->shutdown(); + auto perf = manaSink->perf(); + { + auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes), + static_cast(0)); + auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits), + static_cast(0)); + double elapsed_ms = + std::chrono::duration_cast(perf.t_endRun - perf.t_beginRun) + .count(); + double elapsed_s = elapsed_ms / 1000.0; + + double MiB = totalBytes / (1024.0 * 1024); + double MiB_s = MiB / elapsed_s; + double hit_s = totalHits / elapsed_s; + + fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, " + "event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n", + totalHits, MiB, elapsed_s, hit_s, MiB_s); + } + return 0; }