diff --git a/src/internal/mana_analysis.h b/src/internal/mana_analysis.h index 4f216e6..096ff98 100644 --- a/src/internal/mana_analysis.h +++ b/src/internal/mana_analysis.h @@ -74,8 +74,8 @@ auto_match_modules(const mvlc::CrateConfig &crateConfig, const nlohmann::json &m jModule = jModule_; jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name); for (auto &jSource: jModule["data_sources"]) - jSource["name"] = - fmt::format("{}.{}.{}", event.getName(), module_.name, jSource["name"].get()); + jSource["name"] = fmt::format("{}.{}.{}", event.getName(), module_.name, + jSource["name"].get()); break; } } @@ -196,15 +196,13 @@ inline nlohmann::json make_run_descriptor(const RunInfo &runInfo) inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena, const mvlc::CrateConfig &crateConfig, - nlohmann::json moduleDb, IManaSink *sink, - void *sinkContext) + nlohmann::json moduleDb, IManaSink *sink) { ModuleDataStage result; result.arena = std::move(arena); result.crateConfig = crateConfig; result.sink = sink; - result.sinkContext = sinkContext; result.moduleInfo = auto_match_modules(crateConfig, moduleDb); result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo); allocate_outputs(result.arena, result.runInfo); diff --git a/src/internal/mana_nng.hpp b/src/internal/mana_nng.hpp index 9d99630..e176228 100644 --- a/src/internal/mana_nng.hpp +++ b/src/internal/mana_nng.hpp @@ -13,6 +13,7 @@ struct MessageHeader { enum MessageType { + Invalid = 0, BeginRun = 42, EndRun, EventData, diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index e89d300..0c6962a 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -139,13 +139,14 @@ std::optional make_parser_context(const mvlc::CrateConfig &crateC } } -struct ProcessingStrategy +struct ReplayStrategy { - virtual ~ProcessingStrategy() = default; + virtual ~ReplayStrategy() = default; virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0; }; -struct SingleThreadedStrategy: public ProcessingStrategy +// Reading buffers from file and parsing them is done in the same thread. +struct SingleThreadedStrategy: public ReplayStrategy { inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, ParserContext &parserContext) @@ -204,7 +205,8 @@ struct SingleThreadedStrategy: public ProcessingStrategy } }; -struct NngPairStrategy: public ProcessingStrategy +// An extra file reader thread is spawned. Parsing is done in the thread calling run()po +struct NngPairStrategy: public ReplayStrategy { struct SocketCounters { @@ -373,12 +375,12 @@ struct NngPairStrategy: public ProcessingStrategy void usage(const char *self) { std::cout << fmt::format( - "usage: {} [--plugin=] [--processing-strategy=] \n", self); + "usage: {} [--plugin=] [--replay-strategy=] \n", self); std::cout << " --plugin= Load a plugin to process the data. Default is a " "simple counting plugin.\n" - << " --processing-strategy= Use a specific processing strategy. " - "Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n" + << " --replay-strategy= Use a specific replay strategy. " + "Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n" << " --log-level= One of 'trace', 'debug', 'info', 'warn', 'error', " "'off'.\n"; } @@ -388,7 +390,8 @@ int main(int argc, char *argv[]) auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); - argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"}); + argh::parser parser( + {"-h", "--help", "--plugin", "--replay-strategy", "--no-buffering ", "--log-level"}); parser.parse(argc, argv); auto filename = parser[1]; @@ -459,26 +462,26 @@ int main(int argc, char *argv[]) auto manaSink = std::make_unique(destSink.get()); - std::string strategyName = "nng-pair"; + std::string strategyName = "multi-threaded"; - if (parser("--processing-strategy")) - strategyName = parser("--processing-strategy").str(); + if (parser("--replay-strategy")) + strategyName = parser("--replay-strategy").str(); - std::unique_ptr processingStrategy; + std::unique_ptr replayStrategy; - if (strategyName == "nng-pair") - processingStrategy = std::make_unique("inproc://mana_module_data_stage"); + if (strategyName == "multi-threaded") + replayStrategy = std::make_unique("inproc://mana_mvlc_parsing_stage"); else if (strategyName == "single-threaded") - processingStrategy = std::make_unique(); + replayStrategy = std::make_unique(); else { - std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName); + std::cerr << fmt::format("Error: unknown replay strategy '{}'\n", strategyName); usage(argv[0]); return 1; } auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, - jModuleDataSources, manaSink.get(), nullptr); + jModuleDataSources, destSink.get()); auto event_data = [](void *ctx_, int crateIndex, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, @@ -505,13 +508,22 @@ int main(int argc, char *argv[]) // make the analysis instance available to the parser callbacks parserContext->parser.userContext = &mana; - mana.sink->init(0, nullptr); - mana.sink->begin_run(mana.runDescriptor.dump().c_str()); + auto run_replay = [&] + { + manaSink->init(0, nullptr); + manaSink->begin_run(mana.runDescriptor.dump().c_str()); - processingStrategy->run(*listfileContext, *parserContext); + replayStrategy->run(*listfileContext, *parserContext); - mana.sink->end_run(mana.runDescriptor.dump().c_str()); - mana.sink->shutdown(); + manaSink->end_run(mana.runDescriptor.dump().c_str()); + manaSink->shutdown(); + }; + + // if (parser["--no-buffering"]) + { + mana.sink = manaSink.get(); + run_replay(); + } auto perf = manaSink->perf(); {