refactor things, mostly renaming

This commit is contained in:
Florian Lüke 2024-12-29 18:58:19 +01:00
parent ee8305cd0c
commit 1d2e4952ad
3 changed files with 38 additions and 27 deletions

View file

@ -74,8 +74,8 @@ auto_match_modules(const mvlc::CrateConfig &crateConfig, const nlohmann::json &m
jModule = jModule_; jModule = jModule_;
jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name); jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name);
for (auto &jSource: jModule["data_sources"]) for (auto &jSource: jModule["data_sources"])
jSource["name"] = jSource["name"] = fmt::format("{}.{}.{}", event.getName(), module_.name,
fmt::format("{}.{}.{}", event.getName(), module_.name, jSource["name"].get<std::string>()); jSource["name"].get<std::string>());
break; break;
} }
} }
@ -196,15 +196,13 @@ inline nlohmann::json make_run_descriptor(const RunInfo &runInfo)
inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena, inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena,
const mvlc::CrateConfig &crateConfig, const mvlc::CrateConfig &crateConfig,
nlohmann::json moduleDb, IManaSink *sink, nlohmann::json moduleDb, IManaSink *sink)
void *sinkContext)
{ {
ModuleDataStage result; ModuleDataStage result;
result.arena = std::move(arena); result.arena = std::move(arena);
result.crateConfig = crateConfig; result.crateConfig = crateConfig;
result.sink = sink; result.sink = sink;
result.sinkContext = sinkContext;
result.moduleInfo = auto_match_modules(crateConfig, moduleDb); result.moduleInfo = auto_match_modules(crateConfig, moduleDb);
result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo); result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo);
allocate_outputs(result.arena, result.runInfo); allocate_outputs(result.arena, result.runInfo);

View file

@ -13,6 +13,7 @@ struct MessageHeader
{ {
enum MessageType enum MessageType
{ {
Invalid = 0,
BeginRun = 42, BeginRun = 42,
EndRun, EndRun,
EventData, EventData,

View file

@ -139,13 +139,14 @@ std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateC
} }
} }
struct ProcessingStrategy struct ReplayStrategy
{ {
virtual ~ProcessingStrategy() = default; virtual ~ReplayStrategy() = default;
virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0; virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;
}; };
struct SingleThreadedStrategy: public ProcessingStrategy // Reading buffers from file and parsing them is done in the same thread.
struct SingleThreadedStrategy: public ReplayStrategy
{ {
inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext) ParserContext &parserContext)
@ -204,7 +205,8 @@ struct SingleThreadedStrategy: public ProcessingStrategy
} }
}; };
struct NngPairStrategy: public ProcessingStrategy // An extra file reader thread is spawned. Parsing is done in the thread calling run()po
struct NngPairStrategy: public ReplayStrategy
{ {
struct SocketCounters struct SocketCounters
{ {
@ -373,12 +375,12 @@ struct NngPairStrategy: public ProcessingStrategy
void usage(const char *self) void usage(const char *self)
{ {
std::cout << fmt::format( std::cout << fmt::format(
"usage: {} [--plugin=<plugin.so>] [--processing-strategy=<name>] <zipfile>\n", self); "usage: {} [--plugin=<plugin.so>] [--replay-strategy=<name>] <zipfile>\n", self);
std::cout std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a " << " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n" "simple counting plugin.\n"
<< " --processing-strategy=<name> Use a specific processing strategy. " << " --replay-strategy=<name> Use a specific replay strategy. "
"Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n" "Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', " << " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n"; "'off'.\n";
} }
@ -388,7 +390,8 @@ int main(int argc, char *argv[])
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"}); argh::parser parser(
{"-h", "--help", "--plugin", "--replay-strategy", "--no-buffering ", "--log-level"});
parser.parse(argc, argv); parser.parse(argc, argv);
auto filename = parser[1]; auto filename = parser[1];
@ -459,26 +462,26 @@ int main(int argc, char *argv[])
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get()); auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
std::string strategyName = "nng-pair"; std::string strategyName = "multi-threaded";
if (parser("--processing-strategy")) if (parser("--replay-strategy"))
strategyName = parser("--processing-strategy").str(); strategyName = parser("--replay-strategy").str();
std::unique_ptr<ProcessingStrategy> processingStrategy; std::unique_ptr<ReplayStrategy> replayStrategy;
if (strategyName == "nng-pair") if (strategyName == "multi-threaded")
processingStrategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage"); replayStrategy = std::make_unique<NngPairStrategy>("inproc://mana_mvlc_parsing_stage");
else if (strategyName == "single-threaded") else if (strategyName == "single-threaded")
processingStrategy = std::make_unique<SingleThreadedStrategy>(); replayStrategy = std::make_unique<SingleThreadedStrategy>();
else else
{ {
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName); std::cerr << fmt::format("Error: unknown replay strategy '{}'\n", strategyName);
usage(argv[0]); usage(argv[0]);
return 1; return 1;
} }
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
jModuleDataSources, manaSink.get(), nullptr); jModuleDataSources, destSink.get());
auto event_data = [](void *ctx_, int crateIndex, int eventIndex, auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
const mvlc::readout_parser::ModuleData *moduleDataList, const mvlc::readout_parser::ModuleData *moduleDataList,
@ -505,13 +508,22 @@ int main(int argc, char *argv[])
// make the analysis instance available to the parser callbacks // make the analysis instance available to the parser callbacks
parserContext->parser.userContext = &mana; parserContext->parser.userContext = &mana;
mana.sink->init(0, nullptr); auto run_replay = [&]
mana.sink->begin_run(mana.runDescriptor.dump().c_str()); {
manaSink->init(0, nullptr);
manaSink->begin_run(mana.runDescriptor.dump().c_str());
processingStrategy->run(*listfileContext, *parserContext); replayStrategy->run(*listfileContext, *parserContext);
mana.sink->end_run(mana.runDescriptor.dump().c_str()); manaSink->end_run(mana.runDescriptor.dump().c_str());
mana.sink->shutdown(); manaSink->shutdown();
};
// if (parser["--no-buffering"])
{
mana.sink = manaSink.get();
run_replay();
}
auto perf = manaSink->perf(); auto perf = manaSink->perf();
{ {