#ifndef B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 #define B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 #include "mana_arena.h" #include "mana_lib.hpp" #include namespace mesytec::mnode::mana { struct BitFilterExtractor { mvlc::util::DataFilter filter; mvlc::util::CacheEntry fAddress; mvlc::util::CacheEntry fValue; }; struct ModuleInfo { std::string name; nlohmann::json jModule; std::vector outputDescriptors; std::vector extractors; std::vector> outputSpans; }; struct EventInfo { std::string name; std::vector modules; // concatentation of all module output arrays std::vector outputArrays; // total size in bytes for this event counted from the address of the first output array size_t sizeBytes = 0; }; struct RunInfo { std::string name; std::vector events; }; struct ModuleDataStage { mana::Arena arena; mvlc::CrateConfig crateConfig; std::vector> moduleInfo; RunInfo runInfo; nlohmann::json runDescriptor; IManaSink *sink = nullptr; void *sinkContext = nullptr; }; // returns: eventIndex -> moduleIndex -> module info json // currently only matches the module name against the module type name from the database. // TODO: implement match by stack group meta info inline std::vector> auto_match_modules(const mvlc::CrateConfig &crateConfig, const nlohmann::json &moduleDb) { std::vector> result; for (const auto &event: crateConfig.stacks) { std::vector eventJModules; for (const auto &module_: event.getGroups()) { nlohmann::json jModule; for (const auto &jModule_: moduleDb["modules"]) { if (module_.name.find(jModule_["type_name"]) != std::string::npos) { jModule = jModule_; jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name); for (auto &jSource: jModule["data_sources"]) jSource["name"] = fmt::format("{}.{}.{}", event.getName(), module_.name, jSource["name"]); break; } } eventJModules.emplace_back(jModule); } result.emplace_back(eventJModules); } return result; } // Does not allocate output memory, only creates and fills the structure. 
// Builds the RunInfo hierarchy (events -> modules -> extractors) from the crate
// config and the per-event, per-module json in jModuleInfo. Modules whose json
// has no "data_sources" key get no descriptors and no extractors.
// Declared inline like the other functions in this header, so including the
// header from several translation units does not produce duplicate definitions.
inline RunInfo make_run_info(const std::string &name, const mvlc::CrateConfig &crateConfig,
                             const std::vector<std::vector<nlohmann::json>> &jModuleInfo)
{
    RunInfo result;
    result.name = name;
    result.events.reserve(jModuleInfo.size());

    for (size_t ei = 0; ei < jModuleInfo.size(); ++ei)
    {
        const auto &stack = crateConfig.stacks.at(ei);
        const auto &jEventModules = jModuleInfo[ei];

        EventInfo eventInfo;
        eventInfo.name = stack.getName();
        eventInfo.modules.reserve(jEventModules.size());

        for (size_t mi = 0; mi < jEventModules.size(); ++mi)
        {
            ModuleInfo moduleInfo;
            moduleInfo.name = stack.getGroup(mi).name;
            moduleInfo.jModule = jEventModules[mi];

            if (moduleInfo.jModule.contains("data_sources"))
            {
                for (const auto &jSource: moduleInfo.jModule["data_sources"])
                {
                    moduleInfo.outputDescriptors.emplace_back(
                        make_array_descriptor(jSource["name"], jSource["filter"]));

                    // 'A' selects the address (array index) bits of the filter,
                    // 'D' the data (value) bits.
                    BitFilterExtractor extractor;
                    extractor.filter = mvlc::util::make_filter(jSource["filter"]);
                    extractor.fAddress = mvlc::util::make_cache_entry(extractor.filter, 'A');
                    extractor.fValue = mvlc::util::make_cache_entry(extractor.filter, 'D');
                    moduleInfo.extractors.emplace_back(extractor);
                }
            }

            eventInfo.modules.emplace_back(std::move(moduleInfo));
        }

        result.events.emplace_back(std::move(eventInfo));
    }

    return result;
}

// Allocates output memory in the arena, fills in module and event output array pointers and updates
// event sizes.
// Arena layout: // for each event: mana_offset_array_t[outputCount][array00][array01]...[array0N] inline void allocate_outputs(mana::Arena &arena, EventInfo &eventInfo) { const size_t outputCount = std::accumulate( std::begin(eventInfo.modules), std::end(eventInfo.modules), static_cast(0u), [](size_t sum, const ModuleInfo &mi) { return sum + mi.extractors.size(); }); eventInfo.outputArrays.clear(); auto arrayPtr = arena.push_t(outputCount); for (size_t mi = 0; mi < eventInfo.modules.size(); ++mi) { auto &moduleInfo = eventInfo.modules.at(mi); moduleInfo.outputSpans.clear(); for (const auto &extractor: moduleInfo.extractors) { size_t size = 1u << extractor.fAddress.extractBits; auto ptr = push_typed_offset_array(arena, *arrayPtr, size); moduleInfo.outputSpans.push_back({ptr, size}); eventInfo.outputArrays.push_back(arrayPtr++); } } eventInfo.sizeBytes = arena.cur_end() - reinterpret_cast(eventInfo.outputArrays.front()); } inline void allocate_outputs(mana::Arena &arena, RunInfo &runInfo) { for (size_t ei = 0; ei < runInfo.events.size(); ++ei) { allocate_outputs(arena, runInfo.events.at(ei)); } } inline nlohmann::json make_run_descriptor(const RunInfo &runInfo) { auto jOutputs = nlohmann::json::array(); for (const auto &eventInfo: runInfo.events) { auto jEventOutputs = nlohmann::json::array(); for (const auto &moduleInfo: eventInfo.modules) { for (const auto &output: moduleInfo.outputDescriptors) jEventOutputs.push_back(output); } nlohmann::json jEvent; jEvent["name"] = eventInfo.name; jEvent["outputs"] = jEventOutputs; jEvent["size_bytes"] = eventInfo.sizeBytes; jOutputs.emplace_back(jEvent); } nlohmann::json result; result["name"] = runInfo.name; result["events"] = jOutputs; return result; } inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena, const mvlc::CrateConfig &crateConfig, nlohmann::json moduleDb, IManaSink *sink, void *sinkContext) { ModuleDataStage result; result.arena = std::move(arena); result.crateConfig = 
crateConfig; result.sink = sink; result.sinkContext = sinkContext; result.moduleInfo = auto_match_modules(crateConfig, moduleDb); result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo); allocate_outputs(result.arena, result.runInfo); result.runDescriptor = make_run_descriptor(result.runInfo); spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2)); spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena)); return result; } inline void module_data_stage_begin_run(ModuleDataStage &ctx) { ctx.sink->begin_run(ctx.runDescriptor.dump().c_str()); } inline void module_data_stage_end_run(ModuleDataStage &ctx) { ctx.sink->end_run(ctx.runDescriptor.dump().c_str()); } inline void module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { auto extract_module_data = [](const mvlc::readout_parser::DataBlock &data, BitFilterExtractor &ex, mvlc::util::span dest) { std::fill(std::begin(dest), std::end(dest), mnode::make_quiet_nan()); for (const u32 *word = data.data, *end = data.data + data.size; word < end; ++word) { if (mvlc::util::matches(ex.filter, *word)) { u32 address = mvlc::util::extract(ex.fAddress, *word); u32 value = mvlc::util::extract(ex.fValue, *word); assert(address < dest.size()); dest[address] = value; } } }; auto &eventInfo = ctx.runInfo.events.at(eventIndex); for (unsigned mi = 0; mi < moduleCount; ++mi) { auto &moduleData = moduleDataList[mi]; auto inputData = moduleData.hasDynamic ? 
dynamic_span(moduleData) : prefix_span(moduleData); auto &extractors = eventInfo.modules.at(mi).extractors; auto &spans = eventInfo.modules.at(mi).outputSpans; const size_t outputCount = extractors.size(); for (size_t oi = 0; oi < outputCount; ++oi) extract_module_data(inputData, extractors.at(oi), spans.at(oi)); } ctx.sink->process_event(eventIndex, eventInfo.outputArrays.front(), eventInfo.outputArrays.size(), eventInfo.sizeBytes); } inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size) { ctx.sink->process_system_event(data, size); } struct ManaCountingSink: public IManaSink { std::vector eventCounts; std::vector>> eventArrayIndexHits; size_t totalBytes = 0; size_t systemEventCount = 0; size_t systemEventBytes = 0; void reset() { eventCounts.clear(); eventArrayIndexHits.clear(); totalBytes = 0; } void init(int, const char **) override {} void shutdown() override {} void begin_run(const char *descriptor_json) override { auto jRun = nlohmann::json::parse(descriptor_json); reset(); eventCounts.resize(jRun["events"].size()); // TODO: resize nested vectors eventArrayIndexHits.resize(eventCounts.size()); } void end_run(const char *descriptor_json) override { auto jRun = nlohmann::json::parse(descriptor_json); spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, " "systemEventBytes={}", fmt::join(eventCounts, ", "), totalBytes, systemEventCount, systemEventBytes); for (size_t ei = 0; ei < eventArrayIndexHits.size(); ++ei) { spdlog::info("event[{}]: {} hits", ei, eventCounts[ei]); for (size_t ai = 0; ai < eventArrayIndexHits[ei].size(); ++ai) { auto name = jRun["events"][ei]["outputs"][ai]["name"]; auto arrayHits = eventArrayIndexHits[ei][ai]; auto sumHits = std::accumulate(std::begin(arrayHits), std::end(arrayHits), static_cast(0u)); spdlog::info(" {}[{}]: [{}], sum={}", name, arrayHits.size(), fmt::join(arrayHits, ", "), sumHits); } } } void process_event(uint16_t eventIndex, mana_offset_array_t 
*arrays, size_t arrayCount, size_t totalBytes) override { eventCounts.resize(std::max(eventCounts.size(), static_cast(eventIndex + 1))); eventArrayIndexHits.resize(eventCounts.size()); eventArrayIndexHits[eventIndex].resize( std::max(eventArrayIndexHits[eventIndex].size(), arrayCount)); ++eventCounts[eventIndex]; for (size_t ai = 0; ai < arrayCount; ++ai) { auto input = get_span(arrays[ai]); eventArrayIndexHits[eventIndex][ai].resize(input.size()); for (size_t i = 0; i < input.size(); ++i) { if (!std::isnan(input[i])) { ++eventArrayIndexHits[eventIndex][ai][i]; } } } this->totalBytes += totalBytes; } void process_system_event(const uint32_t *data, size_t size) override { (void)data; ++systemEventCount; systemEventBytes += size * sizeof(u32); } }; } // namespace mesytec::mnode::mana #endif /* B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 */