From 587bf0e6cb59b41dcaa1f8b4017647787eab8e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Wed, 25 Dec 2024 23:37:37 +0100 Subject: [PATCH] first working mana_auto_replay ManaCountingSink counts match the mvme analysis counts for the single event is690-9Li_3H_run094 run. ~280 MB/s in release mode. --- CMakeLists.txt | 2 +- src/CMakeLists.txt | 11 ++ src/internal/mana_analysis.h | 285 +++++++++++++++++++++++++-- src/internal/mana_api.h | 65 +++---- src/internal/mana_arena.h | 11 +- src/internal/mana_lib.hpp | 81 +++++++- src/mana_plugin_c_test.c | 36 ++++ src/tools/CMakeLists.txt | 2 +- src/tools/mana_auto_replay.cc | 353 ++++------------------------------ 9 files changed, 468 insertions(+), 378 deletions(-) create mode 100644 src/mana_plugin_c_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 3fc2a6a..edd2354 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -project(mesytec-mnode LANGUAGES CXX) +project(mesytec-mnode LANGUAGES C CXX) set(MESYTEC_MNODE_MAIN_PROJECT OFF) if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6602053..f1e0c46 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -23,3 +23,14 @@ if (MNODE_BUILD_TESTS) add_mnode_gtest(mana) target_link_libraries(test_mana PRIVATE mana) endif() + + +add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c) + +find_package(ROOT COMPONENTS Hist) +if (ROOT_FOUND) + message("-- Using ROOT installation from ${ROOT_USE_FILE}") + include(${ROOT_USE_FILE}) + add_library(mana-plugin-root-histogram SHARED mana_plugin_root_histogram.cc) + target_link_libraries(mana-plugin-root-histogram PRIVATE mana ${ROOT_LIBRARIES}) +endif() diff --git a/src/internal/mana_analysis.h b/src/internal/mana_analysis.h index 5972b2c..06c3bcc 100644 --- a/src/internal/mana_analysis.h +++ b/src/internal/mana_analysis.h @@ -13,39 +13,217 @@ struct BitFilterExtractor mvlc::util::DataFilter filter; mvlc::util::CacheEntry fAddress; mvlc::util::CacheEntry fValue; - mana_offset_array_t *dest; +}; + +struct ModuleInfo +{ + std::string name; + nlohmann::json jModule; + std::vector outputDescriptors; + std::vector extractors; + std::vector> outputSpans; +}; + +struct EventInfo +{ + std::string name; + std::vector modules; + // concatentation of all module output arrays + std::vector outputArrays; + // total size in bytes for this event counted from the address of the first output array + size_t sizeBytes = 0; +}; + +struct RunInfo +{ + std::string name; + std::vector events; }; struct ModuleDataStage { mana::Arena arena; + mvlc::CrateConfig crateConfig; + std::vector> moduleInfo; + RunInfo runInfo; nlohmann::json runDescriptor; - // eventindex -> list of pointers to output offset arrays - std::vector> eventArrayPointers; - // eventIndex, moduleIndex -> list of filters attached to the module. - std::vector>> dataSources; - mana_sink_t sink = {}; + ManaPlugin *sink = nullptr; void *sinkContext = nullptr; }; -inline ModuleDataStage make_module_data_stage(const std::string &runName, +// returns: eventIndex -> moduleIndex -> module info json +// currently only matches the module name against the module type name from the database. +// TODO: implement match by stack group meta info +inline std::vector> +auto_match_modules(const mvlc::CrateConfig &crateConfig, const nlohmann::json &moduleDb) +{ + std::vector> result; + + for (const auto &event: crateConfig.stacks) + { + std::vector eventJModules; + + for (const auto &module_: event.getGroups()) + { + nlohmann::json jModule; + + for (const auto &jModule_: moduleDb["modules"]) + { + if (module_.name.find(jModule_["type_name"]) != std::string::npos) + { + jModule = jModule_; + jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name); + for (auto &jSource: jModule["data_sources"]) + jSource["name"] = + fmt::format("{}.{}.{}", event.getName(), module_.name, jSource["name"]); + break; + } + } + + eventJModules.emplace_back(jModule); + } + result.emplace_back(eventJModules); + } + + return result; +} + +// Does not allocate output memory, only creates and fills the structure. +RunInfo make_run_info(const std::string &name, const mvlc::CrateConfig &crateConfig, + const std::vector> &jModuleInfo) +{ + RunInfo result; + result.name = name; + + for (size_t ei = 0; ei < jModuleInfo.size(); ++ei) + { + EventInfo eventInfo; + eventInfo.name = crateConfig.stacks.at(ei).getName(); + + for (size_t mi = 0; mi < jModuleInfo[ei].size(); ++mi) + { + ModuleInfo moduleInfo; + moduleInfo.name = crateConfig.stacks.at(ei).getGroup(mi).name; + moduleInfo.jModule = jModuleInfo[ei][mi]; + + if (moduleInfo.jModule.contains("data_sources")) + { + for (const auto &jSource: moduleInfo.jModule["data_sources"]) + { + moduleInfo.outputDescriptors.emplace_back( + make_array_descriptor(jSource["name"], jSource["filter"])); + + BitFilterExtractor extractor; + extractor.filter = mvlc::util::make_filter(jSource["filter"]); + extractor.fAddress = mvlc::util::make_cache_entry(extractor.filter, 'A'); + extractor.fValue = mvlc::util::make_cache_entry(extractor.filter, 'D'); + moduleInfo.extractors.emplace_back(extractor); + } + } + + eventInfo.modules.emplace_back(moduleInfo); + } + + result.events.emplace_back(eventInfo); + } + + return result; +} + +// Allocates output memory in the arena, fills in module and event output array pointers and updates +// event sizes. +// Arena layout: +// for each event: mana_offset_array_t[outputCount][array00][array01]...[array0N] +inline void allocate_outputs(mana::Arena &arena, EventInfo &eventInfo) +{ + const size_t outputCount = std::accumulate( + std::begin(eventInfo.modules), std::end(eventInfo.modules), static_cast(0u), + [](size_t sum, const ModuleInfo &mi) { return sum + mi.extractors.size(); }); + + eventInfo.outputArrays.clear(); + auto arrayPtr = arena.push_t(outputCount); + + for (size_t mi = 0; mi < eventInfo.modules.size(); ++mi) + { + auto &moduleInfo = eventInfo.modules.at(mi); + moduleInfo.outputSpans.clear(); + + for (const auto &extractor: moduleInfo.extractors) + { + size_t size = 1u << extractor.fAddress.extractBits; + auto ptr = push_typed_offset_array(arena, *arrayPtr, size); + moduleInfo.outputSpans.push_back({ptr, size}); + eventInfo.outputArrays.push_back(arrayPtr++); + } + } + + eventInfo.sizeBytes = arena.cur_end() - reinterpret_cast(eventInfo.outputArrays.front()); +} + +inline void allocate_outputs(mana::Arena &arena, RunInfo &runInfo) +{ + for (size_t ei = 0; ei < runInfo.events.size(); ++ei) + { + allocate_outputs(arena, runInfo.events.at(ei)); + } +} + +inline nlohmann::json make_run_descriptor(const RunInfo &runInfo) +{ + auto jOutputs = nlohmann::json::array(); + + for (const auto &eventInfo: runInfo.events) + { + auto jEventOutputs = nlohmann::json::array(); + for (const auto &moduleInfo: eventInfo.modules) + { + for (const auto &output: moduleInfo.outputDescriptors) + jEventOutputs.push_back(output); + } + nlohmann::json jEvent; + jEvent["name"] = eventInfo.name; + jEvent["outputs"] = jEventOutputs; + jEvent["size_bytes"] = eventInfo.sizeBytes; + jOutputs.emplace_back(jEvent); + } + + nlohmann::json result; + result["name"] = runInfo.name; + result["events"] = jOutputs; + + return result; +} + +inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena, const mvlc::CrateConfig &crateConfig, - nlohmann::json moduleDb, mana_sink_t sink, + nlohmann::json moduleDb, ManaPlugin *sink, void *sinkContext) { ModuleDataStage result; + result.arena = std::move(arena); + result.crateConfig = crateConfig; + result.sink = sink; + result.sinkContext = sinkContext; + result.moduleInfo = auto_match_modules(crateConfig, moduleDb); + result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo); + allocate_outputs(result.arena, result.runInfo); + result.runDescriptor = make_run_descriptor(result.runInfo); + + spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2)); + spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena)); + return result; } inline void module_data_stage_begin_run(ModuleDataStage &ctx) { - ctx.sink.begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); + ctx.sink->begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); } inline void module_data_stage_end_run(ModuleDataStage &ctx) { - ctx.sink.end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); + ctx.sink->end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); } inline void @@ -53,11 +231,9 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { - auto extract_module_data = - [](const mvlc::readout_parser::DataBlock &data, BitFilterExtractor &ex) + auto extract_module_data = [](const mvlc::readout_parser::DataBlock &data, + BitFilterExtractor &ex, mvlc::util::span dest) { - auto dest = get_span(*ex.dest); - std::fill(std::begin(dest), std::end(dest), mnode::make_quiet_nan()); for (const u32 *word = data.data, *end = data.data + data.size; word < end; ++word) @@ -72,21 +248,90 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex, } }; - auto &eventSources = ctx.dataSources.at(eventIndex); + auto &eventInfo = ctx.runInfo.events.at(eventIndex); for (unsigned mi = 0; mi < moduleCount; ++mi) { auto &moduleData = moduleDataList[mi]; - auto dataBlock = moduleData.hasDynamic ? dynamic_span(moduleData) : prefix_span(moduleData); + auto inputData = moduleData.hasDynamic ? dynamic_span(moduleData) : prefix_span(moduleData); + auto &extractors = eventInfo.modules.at(mi).extractors; + auto &spans = eventInfo.modules.at(mi).outputSpans; + const size_t outputCount = extractors.size(); - for (auto &source: eventSources.at(mi)) - extract_module_data(dataBlock, source); + for (size_t oi = 0; oi < outputCount; ++oi) + extract_module_data(inputData, extractors.at(oi), spans.at(oi)); } - // auto arrays = ctx.eventArrayPointers.at(eventIndex); - // ctx.sink.process_event(ctx.sinkContext, eventIndex, + ctx.sink->process_event(ctx.sinkContext, eventIndex, eventInfo.outputArrays.front(), + eventInfo.outputArrays.size(), eventInfo.sizeBytes); } +inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size) +{ + ctx.sink->process_system_event(ctx.sinkContext, data, size); +} + +struct ManaCountingSink: public ManaPlugin +{ + std::vector eventCounts; + std::vector> eventArrayHits; + size_t totalBytes = 0; + size_t systemEventCount = 0; + size_t systemEventBytes = 0; + void reset() + { + eventCounts.clear(); + eventArrayHits.clear(); + totalBytes = 0; + } + + MANA_DEFINE_PLUGIN_INIT(init) override { return nullptr; } + + MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override { (void)context; } + + MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override + { + (void)context; + auto jRun = nlohmann::json::parse(descriptor_json); + reset(); + eventCounts.resize(jRun["events"].size()); + eventArrayHits.resize(eventCounts.size()); + } + + MANA_DEFINE_PLUGIN_END_RUN(end_run) override + { + (void)context; + (void)descriptor_json; + } + + MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override + { + (void)context; + eventCounts.resize(std::max(eventCounts.size(), static_cast(eventIndex + 1))); + eventArrayHits.resize(eventCounts.size()); + eventArrayHits[eventIndex].resize(std::max(eventArrayHits[eventIndex].size(), arrayCount)); + + ++eventCounts[eventIndex]; + for (size_t ai = 0; ai < arrayCount; ++ai) + { + auto input = get_span(arrays[ai]); + bool hit = std::any_of(std::begin(input), std::end(input), + [](float v) { return !std::isnan(v); }); + if (hit) + ++eventArrayHits[eventIndex][ai]; + } + this->totalBytes += totalBytes; + } + + MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override + { + (void)context; + (void)data; + ++systemEventCount; + systemEventBytes += size * sizeof(u32); + } +}; + } // namespace mesytec::mnode::mana #endif /* B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 */ diff --git a/src/internal/mana_api.h b/src/internal/mana_api.h index ec79b70..bf3b9b9 100644 --- a/src/internal/mana_api.h +++ b/src/internal/mana_api.h @@ -6,8 +6,8 @@ #ifdef __cplusplus extern "C" -#endif { +#endif typedef enum { @@ -33,47 +33,44 @@ extern "C" size_t size_bytes; } mana_offset_array_t; -#define MANA_DEFINE_PLUGIN_INIT(name) \ - void *name() +#define MANA_DEFINE_PLUGIN_INIT(name) void *name() -#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) \ - void name(void *context) +#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context) -#define MANA_DEFINE_PLUGIN_BEGIN_RUN(name) \ - void name(void *context, const char *descriptor_json) +#define MANA_DEFINE_PLUGIN_BEGIN_RUN(name) void name(void *context, const char *descriptor_json) -#define MANA_DEFINE_PLUGIN_END_RUN(name) \ - void name(void *context, const char *descriptor_json) +#define MANA_DEFINE_PLUGIN_END_RUN(name) void name(void *context, const char *descriptor_json) -#define MANA_DEFINE_PLUGIN_EVENT_DATA(name) \ - void name(void *context, uint16_t eventIndex, const mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) +#define MANA_DEFINE_PLUGIN_EVENT_DATA(name) \ + void name(void *context, uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, \ + size_t totalBytes) -#define MANA_DEFINE_PLUGIN_SYSTEM_EVENT(name) \ - void name(void *context, const uint32_t *data, size_t size) +#define MANA_DEFINE_PLUGIN_SYSTEM_EVENT(name) \ + void name(void *context, const uint32_t *data, size_t size) -typedef MANA_DEFINE_PLUGIN_INIT(mana_init_t); -typedef MANA_DEFINE_PLUGIN_SHUTDOWN(mana_shutdown_t); -typedef MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_begin_run_t); -typedef MANA_DEFINE_PLUGIN_END_RUN(mana_end_run_t); -typedef MANA_DEFINE_PLUGIN_EVENT_DATA(mana_process_event_t); -typedef MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_process_system_event_t); + typedef MANA_DEFINE_PLUGIN_INIT(mana_init_t); + typedef MANA_DEFINE_PLUGIN_SHUTDOWN(mana_shutdown_t); + typedef MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_begin_run_t); + typedef MANA_DEFINE_PLUGIN_END_RUN(mana_end_run_t); + typedef MANA_DEFINE_PLUGIN_EVENT_DATA(mana_process_event_t); + typedef MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_process_system_event_t); -typedef struct -{ - mana_init_t *init; - mana_shutdown_t *shutdown; - mana_begin_run_t *begin_run; - mana_end_run_t *end_run; - mana_process_event_t *process_event; - mana_process_system_event_t *process_system_event; -} mana_sink_t; + typedef struct + { + mana_init_t *init; + mana_shutdown_t *shutdown; + mana_begin_run_t *begin_run; + mana_end_run_t *end_run; + mana_process_event_t *process_event; + mana_process_system_event_t *process_system_event; + } mana_plugin_t; - // event data serialization format: - // u32 event count - // u32 total size in bytes - // u16 eventIndex, u16 array descriptor count - // array descriptors - // data arrays +// plugins need to define this function with the name 'mana_get_plugin' +#define MANA_DEFINE_GET_PLUGIN(name) mana_plugin_t name() + typedef MANA_DEFINE_GET_PLUGIN(mana_get_plugin_t); + +#ifdef __cplusplus } +#endif #endif /* A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E */ diff --git a/src/internal/mana_arena.h b/src/internal/mana_arena.h index fe3972d..0c5ca18 100644 --- a/src/internal/mana_arena.h +++ b/src/internal/mana_arena.h @@ -5,6 +5,7 @@ #include #include #include +#include #include namespace mesytec::mnode::mana @@ -20,8 +21,8 @@ class Arena Arena() = default; ~Arena() = default; - Arena(Arena &&other); - Arena &operator=(Arena &&other); + Arena(Arena &&other) = default; + Arena &operator=(Arena &&other) = default; // pushes at least the required amount of memory onto the arena. // 'required' is rounded up to the nearest multiple of 'default_pad' to make @@ -127,6 +128,12 @@ class Arena size_t pad_waste_ = 0; }; +inline std::string arena_stats(const Arena &arena) +{ + return fmt::format("allocations={}, capacity={}, pad_waste={}, used_size={}", + arena.allocations(), arena.capacity(), arena.pad_waste(), arena.used()); +} + } // namespace mesytec::mnode::mana #endif /* F20CF38F_7327_4608_8307_6AE058041CD5 */ diff --git a/src/internal/mana_lib.hpp b/src/internal/mana_lib.hpp index 010317c..e73fef5 100644 --- a/src/internal/mana_lib.hpp +++ b/src/internal/mana_lib.hpp @@ -68,11 +68,34 @@ inline void set(mana_offset_ptr_t &ptr, char *p) { set(ptr, mana_sint8, p); } inline void set(mana_offset_ptr_t &ptr, float *p) { set(ptr, mana_float, p); } inline void set(mana_offset_ptr_t &ptr, double *p) { set(ptr, mana_double, p); } +template +T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) +{ + T *ptr = arena.push_t(size); + mana::set(dest.ptr, mana_custom, ptr); + dest.size_bytes = size * sizeof(T); + return ptr; +} + +template +T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) +{ + T *ptr = arena.push_t(size); + mana::set(dest.ptr, ptr); + dest.size_bytes = size * sizeof(T); + return ptr; +} + +template size_t element_count(mana_offset_array_t &array) +{ + return array.size_bytes / sizeof(T); +} + template mvlc::util::span get_span(mana_offset_array_t &array) { auto ptr = reinterpret_cast(get(array.ptr)); - auto size = array.size_bytes / sizeof(T); - return { ptr, size }; + auto size = element_count(array); + return {ptr, size}; } inline nlohmann::json make_array_descriptor(const std::string &name, mana_data_type_t data_type, @@ -102,6 +125,60 @@ inline nlohmann::json make_array_descriptor(const std::string &name, const std:: return make_array_descriptor(name, mana_float, size, bits); } +struct ManaPlugin +{ + virtual ~ManaPlugin() = default; + virtual MANA_DEFINE_PLUGIN_INIT(init) = 0; + virtual MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) = 0; + virtual MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) = 0; + virtual MANA_DEFINE_PLUGIN_END_RUN(end_run) = 0; + virtual MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) = 0; + virtual MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) = 0; +}; + +struct ManaCPlugin: public ManaPlugin +{ + mana_plugin_t plugin_; + void *context_ = nullptr; + + ManaCPlugin(mana_plugin_t plugin) + : plugin_(plugin) + { + } + + MANA_DEFINE_PLUGIN_INIT(init) override { return context_ = plugin_.init(); } + + MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override + { + (void)context; + plugin_.shutdown(context_); + } + + MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override + { + (void)context; + plugin_.begin_run(context_, descriptor_json); + } + + MANA_DEFINE_PLUGIN_END_RUN(end_run) override + { + (void)context; + plugin_.end_run(context_, descriptor_json); + } + + MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override + { + (void)context; + plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes); + } + + MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override + { + (void)context; + plugin_.process_system_event(context_, data, size); + } +}; + } // namespace mesytec::mnode::mana #endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */ diff --git a/src/mana_plugin_c_test.c b/src/mana_plugin_c_test.c new file mode 100644 index 0000000..a4c9642 --- /dev/null +++ b/src/mana_plugin_c_test.c @@ -0,0 +1,36 @@ +#include "internal/mana_api.h" +#include + +struct Context +{ +}; + +struct Context ctx_; + +MANA_DEFINE_PLUGIN_INIT(init) +{ + memset(&ctx_, 0, sizeof(ctx_)); + return &ctx_; +} + +MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) {} + +MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) {} + +MANA_DEFINE_PLUGIN_END_RUN(end_run) {} + +MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) {} + +MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) {} + +MANA_DEFINE_GET_PLUGIN(mana_get_plugin) +{ + mana_plugin_t plugin; + plugin.init = init; + plugin.shutdown = shutdown; + plugin.begin_run = begin_run; + plugin.end_run = end_run; + plugin.process_event = process_event; + plugin.process_system_event = process_system_event; + return plugin; +} diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 59d9421..7569022 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -22,7 +22,7 @@ add_mnode_dev_executable(mesy_nng_push_pull_main) add_mnode_dev_executable(mesy_nng_pub_producer) add_mnode_dev_executable(mesy_nng_sub_consumer) -find_package(Boost COMPONENTS filesystem system) +find_package(Boost CONFIG COMPONENTS filesystem system) if (Boost_FOUND) add_mnode_dev_executable(mana_auto_replay) target_link_libraries(mana_auto_replay PRIVATE nlohmann_json::nlohmann_json mnode::resources Boost::filesystem Boost::system) diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index f4727fa..d220191 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -40,6 +40,7 @@ #include #include "internal/mana_arena.h" #include "internal/mana_lib.hpp" +#include "internal/mana_analysis.h" CMRC_DECLARE(mnode::resources); @@ -132,313 +133,8 @@ std::optional make_parser_context(const mvlc::CrateConfig &crateC } } -MANA_DEFINE_PLUGIN_INIT(mana_default_init) { return nullptr; } -MANA_DEFINE_PLUGIN_SHUTDOWN(mana_default_shutdown) {} -MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_default_begin_run) {} -MANA_DEFINE_PLUGIN_END_RUN(mana_default_end_run) {} -MANA_DEFINE_PLUGIN_EVENT_DATA(mana_default_process_event) {} -MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_default_system_event) {} - -static mana_sink_t DefaultPlugin = {.init = mana_default_init, - .shutdown = mana_default_shutdown, - .begin_run = mana_default_begin_run, - .end_run = mana_default_end_run, - .process_event = mana_default_process_event}; - -struct DataSource -{ - mvlc::util::DataFilter filter; - mvlc::util::CacheEntry fAddress; - mvlc::util::CacheEntry fValue; - mana_offset_array_t *dest; -}; - -struct AnalysisContext -{ - std::unique_ptr arena; - nlohmann::json runDescriptor; - std::vector> eventArrays; - std::vector>> - dataSources; // eventIndex, moduleIndex, sourceIndex - mana_sink_t outputPlugin = DefaultPlugin; - void *outputPluginContext = nullptr; - - size_t runDescriptorBytes = 0; - size_t eventStoragesBytes = 0; -}; - -std::map -make_module_info_by_type(const nlohmann::json &jModuleDataSources) -{ - // module type name -> module info json - std::map moduleInfoByType; - for (const auto &mod_: jModuleDataSources["modules"]) - moduleInfoByType[mod_["type_name"]] = mod_; - return moduleInfoByType; -} - -std::vector> -match_modules(const mvlc::CrateConfig &crateConfig, - const std::map &moduleInfoByType) -{ - std::vector> moduleDataSources; - for (const auto &event: crateConfig.stacks) - { - std::vector eventModuleDataSources; - for (const auto &module_: event.getGroups()) - { - nlohmann::json jModule; - // match the meta data module type name against the concrete module name - for (const auto &[type_, info]: moduleInfoByType) - { - if (module_.name.find(type_) != std::string::npos) - { - spdlog::info("match: type={}, name={}", type_, module_.name); - jModule = info; - break; - } - } - - if (jModule.empty()) - { - spdlog::warn("No module info found for module name '{}'", module_.name); - } - - eventModuleDataSources.emplace_back(jModule); - } - moduleDataSources.emplace_back(eventModuleDataSources); - } - - return moduleDataSources; -} - -struct DataSourceInfo -{ - const std::string name; - const std::string filterString; - mvlc::util::FilterWithCaches filter; - - DataSourceInfo(const std::string &name_, const std::string &filterString_) - : name(name_) - , filterString(filterString_) - , filter(mvlc::util::make_filter_with_caches(filterString)) - { - } -}; - -std::vector> -make_event_ds_info(const mvlc::CrateConfig &crateConfig, - const std::vector> &moduleDataSources) -{ - std::vector> eventDsInfo; - - for (size_t eventIndex = 0; eventIndex < crateConfig.stacks.size(); ++eventIndex) - { - const auto &dataSources = moduleDataSources.at(eventIndex); - const auto &event = crateConfig.stacks.at(eventIndex); - const auto &readouts = event.getGroups(); - std::vector dsInfo; - - for (size_t moduleIndex = 0; moduleIndex < event.getGroups().size(); ++moduleIndex) - { - const auto &readout = readouts.at(moduleIndex); - const auto &ds = dataSources.at(moduleIndex); - - spdlog::info("readout.name={}, ds={}", readout.name, ds.dump()); - if (ds.contains("data_sources")) - { - for (const auto &filter: ds["data_sources"]) - { - auto name = - fmt::format("{}.{}.{}", event.getName(), readout.name, filter["name"]); - dsInfo.emplace_back(DataSourceInfo(name, filter["filter"])); - } - } - } - eventDsInfo.emplace_back(dsInfo); - } - - return eventDsInfo; -} - -template -T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) -{ - T *ptr = arena.push_t(size); - mana::set(dest.ptr, mana_custom, ptr); - dest.size_bytes = size * sizeof(T); - return ptr; -} - -template -T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) -{ - T *ptr = arena.push_t(size); - mana::set(dest.ptr, ptr); - dest.size_bytes = size * sizeof(T); - return ptr; -} - -template size_t element_count(mana_offset_array_t &array) -{ - return array.size_bytes / sizeof(T); -} - -template mvlc::util::span get_span(mana_offset_array_t &array) -{ - auto ptr = reinterpret_cast(mana::get(array.ptr)); - auto size = element_count(array); - return {ptr, size}; -} - -#if 0 -std::pair -make_run_descriptor(mana::Arena &arena, const std::string &runName, - const std::vector> &eventDsInfo) -{ - auto rd = arena.push_t(); - mana::set(rd->name, arena.push_cstr(runName)); - auto eds = push_offset_array(arena, rd->events, eventDsInfo.size()); - rd->event_count = eventDsInfo.size(); - - for (const auto &eventDataSources: eventDsInfo) - { - auto ed = eds++; - auto ads = push_offset_array(arena, ed->data_arrays, - eventDataSources.size()); - ed->data_array_count = eventDataSources.size(); - - for (const auto &ds: eventDataSources) - { - auto ad = ads++; - mana::set(ad->name, arena.push_cstr(ds.name)); - ad->data_type = mana_float; - ad->size = 1; - if (auto c = get_cache_entry(ds.filter, 'A'); c) - ad->size = 1u << c->extractBits; - if (auto c = get_cache_entry(ds.filter, 'D'); c) - ad->bits = c->extractBits; - } - } - - return {rd, arena.cur_end() - reinterpret_cast(rd)}; -} - -std::pair, size_t> -make_event_storages(mana::Arena &arena, const std::vector> &eventDsInfo) -{ - auto eds = arena.push_t(eventDsInfo.size()); - - for (size_t eventIndex = 0; eventIndex < eventDsInfo.size(); ++eventIndex) - { - auto &ed = eds[eventIndex]; - auto &dsInfo = eventDsInfo[eventIndex]; - auto das = push_offset_array(arena, ed.data_arrays, dsInfo.size()); - - for (const auto &ds: dsInfo) - { - auto da = das++; - size_t size = 1; - if (auto c = get_cache_entry(ds.filter, 'A'); c) - size = 1u << c->extractBits; - push_typed_offset_array(arena, *da, size); - } - } - - return {{eds, eventDsInfo.size()}, arena.cur_end() - reinterpret_cast(eds)}; -} - -void dump(mana_run_descriptor_t *rd) -{ - auto eds = get_span(rd->events); - - spdlog::info("mana_run_descriptor @ {}, name={}, event_count={}", fmt::ptr(rd), - mana::get(rd->name), eds.size()); - - for (size_t eventIndex = 0; eventIndex < rd->event_count; ++eventIndex) - { - auto &ed = eds[eventIndex]; - spdlog::info(" event[{}]:", eventIndex); - - auto ads = get_span(ed.data_arrays); - - for (auto &ad: ads) - { - spdlog::info(" array: name={}, data_type={}, size={}, bits={}", - mana::get(ad.name), static_cast(ad.data_type), ad.size, - ad.bits); - } - } -} -#endif - -std::optional make_mana(const std::string runName, - const mvlc::CrateConfig &crateConfig, - const nlohmann::json &jModuleDataSources) -{ - try - { - // module type name -> module info json - auto moduleInfoByType = make_module_info_by_type(jModuleDataSources); - - AnalysisContext ctx; - ctx.arena = std::make_unique(); - // auto arena = ctx.arena.get(); - - // eventIndex -> moduleIndex -> module info json - auto moduleDataSources = match_modules(crateConfig, moduleInfoByType); -#if 0 - auto eventDsInfo = make_event_ds_info(crateConfig, moduleDataSources); - - auto arena = ctx.arena.get(); - - auto [runDescriptor, runDescriptorSize] = make_run_descriptor(*arena, runName, eventDsInfo); - - spdlog::info( - "runDescriptor @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}", - fmt::ptr(runDescriptor), runDescriptorSize, arena->allocations(), arena->capacity(), - arena->pad_waste(), arena->used()); - - if (runDescriptor) - dump(runDescriptor); - - auto [eventStorages, eventStoragesSize] = make_event_storages(*arena, eventDsInfo); - - spdlog::info( - "eventStorages @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}", - fmt::ptr(&eventStorages.front()), eventStoragesSize, arena->allocations(), - arena->capacity(), arena->pad_waste(), arena->used()); - - for (size_t eventIndex = 0; eventIndex < eventStorages.size(); ++eventIndex) - { - auto &es = eventStorages[eventIndex]; - auto arrays = get_span(es.data_arrays); - spdlog::info("eventStorage: event={}, arrays={}", eventIndex, arrays.size()); - for (auto &array: arrays) - { - spdlog::info(" array @ {}, size_bytes={}, size={}", - fmt::ptr(mana::get(array.ptr)), array.size_bytes, - element_count(array)); - } - } - - ctx.runDescriptor = runDescriptor; - ctx.eventStorages = eventStorages; - ctx.runDescriptorBytes = runDescriptorSize; - ctx.eventStoragesBytes = eventStoragesSize; -#endif - - return ctx; - } - catch (const std::exception &e) - { - std::cerr << fmt::format("Error: {}\n", e.what()); - return {}; - } -} - size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, - ParserContext &parserContext, AnalysisContext &analysisContext) + ParserContext &parserContext, mana::ModuleDataStage &analysisContext) { listfileContext.readerHelper.destBuf().clear(); auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); @@ -473,23 +169,24 @@ int main(int argc, char *argv[]) if (!listfileContext) return 1; - auto analysisContext = make_mana(filename, listfileContext->crateConfig, jModuleDataSources); + mana::ManaCountingSink manaPlugin; + auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, + jModuleDataSources, &manaPlugin, nullptr); - if (!analysisContext) - return 1; - - auto event_data = [](void *ctx, int crateIndex, int eventIndex, + auto event_data = [](void *ctx_, int crateIndex, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { - // reinterpret_cast(ctx)->process_event_data(crateIndex, eventIndex, - // moduleDataList, - // moduleCount); + (void)crateIndex; + auto ctx = reinterpret_cast(ctx_); + mana::module_data_stage_process_module_data(*ctx, eventIndex, moduleDataList, moduleCount); }; - auto system_event = [](void *ctx, int crateIndex, const u32 *header, u32 size) + auto system_event = [](void *ctx_, int crateIndex, const u32 *header, u32 size) { - // reinterpret_cast(ctx)->process_system_event(crateIndex, header, size); + (void)crateIndex; + auto ctx = reinterpret_cast(ctx_); + mana::module_data_stage_process_system_event(*ctx, header, size); }; auto parserContext = @@ -498,6 +195,9 @@ int main(int argc, char *argv[]) if (!parserContext) return 1; + mana.sinkContext = mana.sink->init(); + mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str()); + size_t bufferNumber = 0; size_t totalBytesProcessed = 0; size_t bytesProcessed = 0; @@ -518,8 +218,7 @@ int main(int argc, char *argv[]) do { - bytesProcessed = - process_one_buffer(bufferNumber, *listfileContext, *parserContext, *analysisContext); + bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext, mana); totalBytesProcessed += bytesProcessed; ++bufferNumber; @@ -533,5 +232,23 @@ int main(int argc, char *argv[]) report(); + mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str()); + mana.sink->shutdown(mana.sinkContext); + + spdlog::info( + "ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, systemEventBytes={}", + fmt::join(manaPlugin.eventCounts, ", "), manaPlugin.totalBytes, manaPlugin.systemEventCount, + manaPlugin.systemEventBytes); + + for (size_t ei = 0; ei < manaPlugin.eventArrayHits.size(); ++ei) + { + spdlog::info("event[{}]: {} hits", ei, manaPlugin.eventCounts[ei]); + for (size_t ai = 0; ai < manaPlugin.eventArrayHits[ei].size(); ++ai) + { + auto name = mana.runDescriptor["events"][ei]["outputs"][ai]["name"]; + spdlog::info(" {}: {}", name, manaPlugin.eventArrayHits[ei][ai]); + } + } + return 0; }