first working mana_auto_replay

ManaCountingSink counts match the mvme analysis counts for the single event
is690-9Li_3H_run094 run. ~280 MB/s in release mode.
This commit is contained in:
Florian Lüke 2024-12-25 23:37:37 +01:00
parent ce00821bbb
commit 587bf0e6cb
9 changed files with 468 additions and 378 deletions

View file

@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15) cmake_minimum_required(VERSION 3.15)
project(mesytec-mnode LANGUAGES CXX) project(mesytec-mnode LANGUAGES C CXX)
set(MESYTEC_MNODE_MAIN_PROJECT OFF) set(MESYTEC_MNODE_MAIN_PROJECT OFF)
if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR) if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR)

View file

@ -23,3 +23,14 @@ if (MNODE_BUILD_TESTS)
add_mnode_gtest(mana) add_mnode_gtest(mana)
target_link_libraries(test_mana PRIVATE mana) target_link_libraries(test_mana PRIVATE mana)
endif() endif()
add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c)
find_package(ROOT COMPONENTS Hist)
if (ROOT_FOUND)
message("-- Using ROOT installation from ${ROOT_USE_FILE}")
include(${ROOT_USE_FILE})
add_library(mana-plugin-root-histogram SHARED mana_plugin_root_histogram.cc)
target_link_libraries(mana-plugin-root-histogram PRIVATE mana ${ROOT_LIBRARIES})
endif()

View file

@ -13,39 +13,217 @@ struct BitFilterExtractor
mvlc::util::DataFilter filter; mvlc::util::DataFilter filter;
mvlc::util::CacheEntry fAddress; mvlc::util::CacheEntry fAddress;
mvlc::util::CacheEntry fValue; mvlc::util::CacheEntry fValue;
mana_offset_array_t *dest; };
struct ModuleInfo
{
std::string name;
nlohmann::json jModule;
std::vector<nlohmann::json> outputDescriptors;
std::vector<BitFilterExtractor> extractors;
std::vector<mvlc::util::span<float>> outputSpans;
};
struct EventInfo
{
std::string name;
std::vector<ModuleInfo> modules;
// concatentation of all module output arrays
std::vector<mana_offset_array_t *> outputArrays;
// total size in bytes for this event counted from the address of the first output array
size_t sizeBytes = 0;
};
struct RunInfo
{
std::string name;
std::vector<EventInfo> events;
}; };
struct ModuleDataStage struct ModuleDataStage
{ {
mana::Arena arena; mana::Arena arena;
mvlc::CrateConfig crateConfig;
std::vector<std::vector<nlohmann::json>> moduleInfo;
RunInfo runInfo;
nlohmann::json runDescriptor; nlohmann::json runDescriptor;
// eventindex -> list of pointers to output offset arrays ManaPlugin *sink = nullptr;
std::vector<std::vector<mana_offset_array_t *>> eventArrayPointers;
// eventIndex, moduleIndex -> list of filters attached to the module.
std::vector<std::vector<std::vector<BitFilterExtractor>>> dataSources;
mana_sink_t sink = {};
void *sinkContext = nullptr; void *sinkContext = nullptr;
}; };
inline ModuleDataStage make_module_data_stage(const std::string &runName, // returns: eventIndex -> moduleIndex -> module info json
// currently only matches the module name against the module type name from the database.
// TODO: implement match by stack group meta info
inline std::vector<std::vector<nlohmann::json>>
auto_match_modules(const mvlc::CrateConfig &crateConfig, const nlohmann::json &moduleDb)
{
std::vector<std::vector<nlohmann::json>> result;
for (const auto &event: crateConfig.stacks)
{
std::vector<nlohmann::json> eventJModules;
for (const auto &module_: event.getGroups())
{
nlohmann::json jModule;
for (const auto &jModule_: moduleDb["modules"])
{
if (module_.name.find(jModule_["type_name"]) != std::string::npos)
{
jModule = jModule_;
jModule["name"] = fmt::format("{}.{}", event.getName(), module_.name);
for (auto &jSource: jModule["data_sources"])
jSource["name"] =
fmt::format("{}.{}.{}", event.getName(), module_.name, jSource["name"]);
break;
}
}
eventJModules.emplace_back(jModule);
}
result.emplace_back(eventJModules);
}
return result;
}
// Does not allocate output memory, only creates and fills the structure.
RunInfo make_run_info(const std::string &name, const mvlc::CrateConfig &crateConfig,
const std::vector<std::vector<nlohmann::json>> &jModuleInfo)
{
RunInfo result;
result.name = name;
for (size_t ei = 0; ei < jModuleInfo.size(); ++ei)
{
EventInfo eventInfo;
eventInfo.name = crateConfig.stacks.at(ei).getName();
for (size_t mi = 0; mi < jModuleInfo[ei].size(); ++mi)
{
ModuleInfo moduleInfo;
moduleInfo.name = crateConfig.stacks.at(ei).getGroup(mi).name;
moduleInfo.jModule = jModuleInfo[ei][mi];
if (moduleInfo.jModule.contains("data_sources"))
{
for (const auto &jSource: moduleInfo.jModule["data_sources"])
{
moduleInfo.outputDescriptors.emplace_back(
make_array_descriptor(jSource["name"], jSource["filter"]));
BitFilterExtractor extractor;
extractor.filter = mvlc::util::make_filter(jSource["filter"]);
extractor.fAddress = mvlc::util::make_cache_entry(extractor.filter, 'A');
extractor.fValue = mvlc::util::make_cache_entry(extractor.filter, 'D');
moduleInfo.extractors.emplace_back(extractor);
}
}
eventInfo.modules.emplace_back(moduleInfo);
}
result.events.emplace_back(eventInfo);
}
return result;
}
// Allocates output memory in the arena, fills in module and event output array pointers and updates
// event sizes.
// Arena layout:
// for each event: mana_offset_array_t[outputCount]<pad>[array00]<pad>[array01]...[array0N]
inline void allocate_outputs(mana::Arena &arena, EventInfo &eventInfo)
{
const size_t outputCount = std::accumulate(
std::begin(eventInfo.modules), std::end(eventInfo.modules), static_cast<size_t>(0u),
[](size_t sum, const ModuleInfo &mi) { return sum + mi.extractors.size(); });
eventInfo.outputArrays.clear();
auto arrayPtr = arena.push_t<mana_offset_array_t>(outputCount);
for (size_t mi = 0; mi < eventInfo.modules.size(); ++mi)
{
auto &moduleInfo = eventInfo.modules.at(mi);
moduleInfo.outputSpans.clear();
for (const auto &extractor: moduleInfo.extractors)
{
size_t size = 1u << extractor.fAddress.extractBits;
auto ptr = push_typed_offset_array<float>(arena, *arrayPtr, size);
moduleInfo.outputSpans.push_back({ptr, size});
eventInfo.outputArrays.push_back(arrayPtr++);
}
}
eventInfo.sizeBytes = arena.cur_end() - reinterpret_cast<u8 *>(eventInfo.outputArrays.front());
}
inline void allocate_outputs(mana::Arena &arena, RunInfo &runInfo)
{
for (size_t ei = 0; ei < runInfo.events.size(); ++ei)
{
allocate_outputs(arena, runInfo.events.at(ei));
}
}
inline nlohmann::json make_run_descriptor(const RunInfo &runInfo)
{
auto jOutputs = nlohmann::json::array();
for (const auto &eventInfo: runInfo.events)
{
auto jEventOutputs = nlohmann::json::array();
for (const auto &moduleInfo: eventInfo.modules)
{
for (const auto &output: moduleInfo.outputDescriptors)
jEventOutputs.push_back(output);
}
nlohmann::json jEvent;
jEvent["name"] = eventInfo.name;
jEvent["outputs"] = jEventOutputs;
jEvent["size_bytes"] = eventInfo.sizeBytes;
jOutputs.emplace_back(jEvent);
}
nlohmann::json result;
result["name"] = runInfo.name;
result["events"] = jOutputs;
return result;
}
inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena,
const mvlc::CrateConfig &crateConfig, const mvlc::CrateConfig &crateConfig,
nlohmann::json moduleDb, mana_sink_t sink, nlohmann::json moduleDb, ManaPlugin *sink,
void *sinkContext) void *sinkContext)
{ {
ModuleDataStage result; ModuleDataStage result;
result.arena = std::move(arena);
result.crateConfig = crateConfig;
result.sink = sink;
result.sinkContext = sinkContext;
result.moduleInfo = auto_match_modules(crateConfig, moduleDb);
result.runInfo = make_run_info(runName, crateConfig, result.moduleInfo);
allocate_outputs(result.arena, result.runInfo);
result.runDescriptor = make_run_descriptor(result.runInfo);
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2));
spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena));
return result; return result;
} }
inline void module_data_stage_begin_run(ModuleDataStage &ctx) inline void module_data_stage_begin_run(ModuleDataStage &ctx)
{ {
ctx.sink.begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); ctx.sink->begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
} }
inline void module_data_stage_end_run(ModuleDataStage &ctx) inline void module_data_stage_end_run(ModuleDataStage &ctx)
{ {
ctx.sink.end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str()); ctx.sink->end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
} }
inline void inline void
@ -53,11 +231,9 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex,
const mvlc::readout_parser::ModuleData *moduleDataList, const mvlc::readout_parser::ModuleData *moduleDataList,
unsigned moduleCount) unsigned moduleCount)
{ {
auto extract_module_data = auto extract_module_data = [](const mvlc::readout_parser::DataBlock &data,
[](const mvlc::readout_parser::DataBlock &data, BitFilterExtractor &ex) BitFilterExtractor &ex, mvlc::util::span<float> dest)
{ {
auto dest = get_span<float>(*ex.dest);
std::fill(std::begin(dest), std::end(dest), mnode::make_quiet_nan()); std::fill(std::begin(dest), std::end(dest), mnode::make_quiet_nan());
for (const u32 *word = data.data, *end = data.data + data.size; word < end; ++word) for (const u32 *word = data.data, *end = data.data + data.size; word < end; ++word)
@ -72,21 +248,90 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex,
} }
}; };
auto &eventSources = ctx.dataSources.at(eventIndex); auto &eventInfo = ctx.runInfo.events.at(eventIndex);
for (unsigned mi = 0; mi < moduleCount; ++mi) for (unsigned mi = 0; mi < moduleCount; ++mi)
{ {
auto &moduleData = moduleDataList[mi]; auto &moduleData = moduleDataList[mi];
auto dataBlock = moduleData.hasDynamic ? dynamic_span(moduleData) : prefix_span(moduleData); auto inputData = moduleData.hasDynamic ? dynamic_span(moduleData) : prefix_span(moduleData);
auto &extractors = eventInfo.modules.at(mi).extractors;
auto &spans = eventInfo.modules.at(mi).outputSpans;
const size_t outputCount = extractors.size();
for (auto &source: eventSources.at(mi)) for (size_t oi = 0; oi < outputCount; ++oi)
extract_module_data(dataBlock, source); extract_module_data(inputData, extractors.at(oi), spans.at(oi));
} }
// auto arrays = ctx.eventArrayPointers.at(eventIndex); ctx.sink->process_event(ctx.sinkContext, eventIndex, eventInfo.outputArrays.front(),
// ctx.sink.process_event(ctx.sinkContext, eventIndex, eventInfo.outputArrays.size(), eventInfo.sizeBytes);
} }
inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size)
{
ctx.sink->process_system_event(ctx.sinkContext, data, size);
}
struct ManaCountingSink: public ManaPlugin
{
std::vector<size_t> eventCounts;
std::vector<std::vector<size_t>> eventArrayHits;
size_t totalBytes = 0;
size_t systemEventCount = 0;
size_t systemEventBytes = 0;
void reset()
{
eventCounts.clear();
eventArrayHits.clear();
totalBytes = 0;
}
MANA_DEFINE_PLUGIN_INIT(init) override { return nullptr; }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override { (void)context; }
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
{
(void)context;
auto jRun = nlohmann::json::parse(descriptor_json);
reset();
eventCounts.resize(jRun["events"].size());
eventArrayHits.resize(eventCounts.size());
}
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
{
(void)context;
(void)descriptor_json;
}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
{
(void)context;
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
eventArrayHits.resize(eventCounts.size());
eventArrayHits[eventIndex].resize(std::max(eventArrayHits[eventIndex].size(), arrayCount));
++eventCounts[eventIndex];
for (size_t ai = 0; ai < arrayCount; ++ai)
{
auto input = get_span<float>(arrays[ai]);
bool hit = std::any_of(std::begin(input), std::end(input),
[](float v) { return !std::isnan(v); });
if (hit)
++eventArrayHits[eventIndex][ai];
}
this->totalBytes += totalBytes;
}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
{
(void)context;
(void)data;
++systemEventCount;
systemEventBytes += size * sizeof(u32);
}
};
} // namespace mesytec::mnode::mana } // namespace mesytec::mnode::mana
#endif /* B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 */ #endif /* B63F110F_BB53_46E7_AA8E_FF6BE10CAB40 */

View file

@ -6,8 +6,8 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" extern "C"
#endif
{ {
#endif
typedef enum typedef enum
{ {
@ -33,47 +33,44 @@ extern "C"
size_t size_bytes; size_t size_bytes;
} mana_offset_array_t; } mana_offset_array_t;
#define MANA_DEFINE_PLUGIN_INIT(name) \ #define MANA_DEFINE_PLUGIN_INIT(name) void *name()
void *name()
#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) \ #define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context)
void name(void *context)
#define MANA_DEFINE_PLUGIN_BEGIN_RUN(name) \ #define MANA_DEFINE_PLUGIN_BEGIN_RUN(name) void name(void *context, const char *descriptor_json)
void name(void *context, const char *descriptor_json)
#define MANA_DEFINE_PLUGIN_END_RUN(name) \ #define MANA_DEFINE_PLUGIN_END_RUN(name) void name(void *context, const char *descriptor_json)
void name(void *context, const char *descriptor_json)
#define MANA_DEFINE_PLUGIN_EVENT_DATA(name) \ #define MANA_DEFINE_PLUGIN_EVENT_DATA(name) \
void name(void *context, uint16_t eventIndex, const mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) void name(void *context, uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, \
size_t totalBytes)
#define MANA_DEFINE_PLUGIN_SYSTEM_EVENT(name) \ #define MANA_DEFINE_PLUGIN_SYSTEM_EVENT(name) \
void name(void *context, const uint32_t *data, size_t size) void name(void *context, const uint32_t *data, size_t size)
typedef MANA_DEFINE_PLUGIN_INIT(mana_init_t); typedef MANA_DEFINE_PLUGIN_INIT(mana_init_t);
typedef MANA_DEFINE_PLUGIN_SHUTDOWN(mana_shutdown_t); typedef MANA_DEFINE_PLUGIN_SHUTDOWN(mana_shutdown_t);
typedef MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_begin_run_t); typedef MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_begin_run_t);
typedef MANA_DEFINE_PLUGIN_END_RUN(mana_end_run_t); typedef MANA_DEFINE_PLUGIN_END_RUN(mana_end_run_t);
typedef MANA_DEFINE_PLUGIN_EVENT_DATA(mana_process_event_t); typedef MANA_DEFINE_PLUGIN_EVENT_DATA(mana_process_event_t);
typedef MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_process_system_event_t); typedef MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_process_system_event_t);
typedef struct typedef struct
{ {
mana_init_t *init; mana_init_t *init;
mana_shutdown_t *shutdown; mana_shutdown_t *shutdown;
mana_begin_run_t *begin_run; mana_begin_run_t *begin_run;
mana_end_run_t *end_run; mana_end_run_t *end_run;
mana_process_event_t *process_event; mana_process_event_t *process_event;
mana_process_system_event_t *process_system_event; mana_process_system_event_t *process_system_event;
} mana_sink_t; } mana_plugin_t;
// event data serialization format: // plugins need to define this function with the name 'mana_get_plugin'
// u32 event count #define MANA_DEFINE_GET_PLUGIN(name) mana_plugin_t name()
// u32 total size in bytes typedef MANA_DEFINE_GET_PLUGIN(mana_get_plugin_t);
// u16 eventIndex, u16 array descriptor count
// array descriptors #ifdef __cplusplus
// data arrays
} }
#endif
#endif /* A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E */ #endif /* A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E */

View file

@ -5,6 +5,7 @@
#include <cstring> #include <cstring>
#include <list> #include <list>
#include <mesytec-mnode/mnode_math.h> #include <mesytec-mnode/mnode_math.h>
#include <mesytec-mvlc/util/fmt.h>
#include <numeric> #include <numeric>
namespace mesytec::mnode::mana namespace mesytec::mnode::mana
@ -20,8 +21,8 @@ class Arena
Arena() = default; Arena() = default;
~Arena() = default; ~Arena() = default;
Arena(Arena &&other); Arena(Arena &&other) = default;
Arena &operator=(Arena &&other); Arena &operator=(Arena &&other) = default;
// pushes at least the required amount of memory onto the arena. // pushes at least the required amount of memory onto the arena.
// 'required' is rounded up to the nearest multiple of 'default_pad' to make // 'required' is rounded up to the nearest multiple of 'default_pad' to make
@ -127,6 +128,12 @@ class Arena
size_t pad_waste_ = 0; size_t pad_waste_ = 0;
}; };
inline std::string arena_stats(const Arena &arena)
{
return fmt::format("allocations={}, capacity={}, pad_waste={}, used_size={}",
arena.allocations(), arena.capacity(), arena.pad_waste(), arena.used());
}
} // namespace mesytec::mnode::mana } // namespace mesytec::mnode::mana
#endif /* F20CF38F_7327_4608_8307_6AE058041CD5 */ #endif /* F20CF38F_7327_4608_8307_6AE058041CD5 */

View file

@ -68,11 +68,34 @@ inline void set(mana_offset_ptr_t &ptr, char *p) { set(ptr, mana_sint8, p); }
inline void set(mana_offset_ptr_t &ptr, float *p) { set(ptr, mana_float, p); } inline void set(mana_offset_ptr_t &ptr, float *p) { set(ptr, mana_float, p); }
inline void set(mana_offset_ptr_t &ptr, double *p) { set(ptr, mana_double, p); } inline void set(mana_offset_ptr_t &ptr, double *p) { set(ptr, mana_double, p); }
template <typename T>
T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
T *ptr = arena.push_t<T>(size);
mana::set(dest.ptr, mana_custom, ptr);
dest.size_bytes = size * sizeof(T);
return ptr;
}
template <typename T>
T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
T *ptr = arena.push_t<T>(size);
mana::set(dest.ptr, ptr);
dest.size_bytes = size * sizeof(T);
return ptr;
}
template <typename T> size_t element_count(mana_offset_array_t &array)
{
return array.size_bytes / sizeof(T);
}
template <typename T> mvlc::util::span<T> get_span(mana_offset_array_t &array) template <typename T> mvlc::util::span<T> get_span(mana_offset_array_t &array)
{ {
auto ptr = reinterpret_cast<T *>(get(array.ptr)); auto ptr = reinterpret_cast<T *>(get(array.ptr));
auto size = array.size_bytes / sizeof(T); auto size = element_count<T>(array);
return { ptr, size }; return {ptr, size};
} }
inline nlohmann::json make_array_descriptor(const std::string &name, mana_data_type_t data_type, inline nlohmann::json make_array_descriptor(const std::string &name, mana_data_type_t data_type,
@ -102,6 +125,60 @@ inline nlohmann::json make_array_descriptor(const std::string &name, const std::
return make_array_descriptor(name, mana_float, size, bits); return make_array_descriptor(name, mana_float, size, bits);
} }
struct ManaPlugin
{
virtual ~ManaPlugin() = default;
virtual MANA_DEFINE_PLUGIN_INIT(init) = 0;
virtual MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) = 0;
virtual MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) = 0;
virtual MANA_DEFINE_PLUGIN_END_RUN(end_run) = 0;
virtual MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) = 0;
virtual MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) = 0;
};
struct ManaCPlugin: public ManaPlugin
{
mana_plugin_t plugin_;
void *context_ = nullptr;
ManaCPlugin(mana_plugin_t plugin)
: plugin_(plugin)
{
}
MANA_DEFINE_PLUGIN_INIT(init) override { return context_ = plugin_.init(); }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override
{
(void)context;
plugin_.shutdown(context_);
}
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
{
(void)context;
plugin_.begin_run(context_, descriptor_json);
}
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
{
(void)context;
plugin_.end_run(context_, descriptor_json);
}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
{
(void)context;
plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes);
}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
{
(void)context;
plugin_.process_system_event(context_, data, size);
}
};
} // namespace mesytec::mnode::mana } // namespace mesytec::mnode::mana
#endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */ #endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */

36
src/mana_plugin_c_test.c Normal file
View file

@ -0,0 +1,36 @@
#include "internal/mana_api.h"
#include <string.h>
struct Context
{
};
struct Context ctx_;
MANA_DEFINE_PLUGIN_INIT(init)
{
memset(&ctx_, 0, sizeof(ctx_));
return &ctx_;
}
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) {}
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) {}
MANA_DEFINE_PLUGIN_END_RUN(end_run) {}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) {}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) {}
MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{
mana_plugin_t plugin;
plugin.init = init;
plugin.shutdown = shutdown;
plugin.begin_run = begin_run;
plugin.end_run = end_run;
plugin.process_event = process_event;
plugin.process_system_event = process_system_event;
return plugin;
}

View file

@ -22,7 +22,7 @@ add_mnode_dev_executable(mesy_nng_push_pull_main)
add_mnode_dev_executable(mesy_nng_pub_producer) add_mnode_dev_executable(mesy_nng_pub_producer)
add_mnode_dev_executable(mesy_nng_sub_consumer) add_mnode_dev_executable(mesy_nng_sub_consumer)
find_package(Boost COMPONENTS filesystem system) find_package(Boost CONFIG COMPONENTS filesystem system)
if (Boost_FOUND) if (Boost_FOUND)
add_mnode_dev_executable(mana_auto_replay) add_mnode_dev_executable(mana_auto_replay)
target_link_libraries(mana_auto_replay PRIVATE nlohmann_json::nlohmann_json mnode::resources Boost::filesystem Boost::system) target_link_libraries(mana_auto_replay PRIVATE nlohmann_json::nlohmann_json mnode::resources Boost::filesystem Boost::system)

View file

@ -40,6 +40,7 @@
#include <sstream> #include <sstream>
#include "internal/mana_arena.h" #include "internal/mana_arena.h"
#include "internal/mana_lib.hpp" #include "internal/mana_lib.hpp"
#include "internal/mana_analysis.h"
CMRC_DECLARE(mnode::resources); CMRC_DECLARE(mnode::resources);
@ -132,313 +133,8 @@ std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateC
} }
} }
MANA_DEFINE_PLUGIN_INIT(mana_default_init) { return nullptr; }
MANA_DEFINE_PLUGIN_SHUTDOWN(mana_default_shutdown) {}
MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_default_begin_run) {}
MANA_DEFINE_PLUGIN_END_RUN(mana_default_end_run) {}
MANA_DEFINE_PLUGIN_EVENT_DATA(mana_default_process_event) {}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_default_system_event) {}
static mana_sink_t DefaultPlugin = {.init = mana_default_init,
.shutdown = mana_default_shutdown,
.begin_run = mana_default_begin_run,
.end_run = mana_default_end_run,
.process_event = mana_default_process_event};
struct DataSource
{
mvlc::util::DataFilter filter;
mvlc::util::CacheEntry fAddress;
mvlc::util::CacheEntry fValue;
mana_offset_array_t *dest;
};
struct AnalysisContext
{
std::unique_ptr<mana::Arena> arena;
nlohmann::json runDescriptor;
std::vector<mvlc::util::span<mana_offset_array_t *>> eventArrays;
std::vector<std::vector<std::vector<DataSource>>>
dataSources; // eventIndex, moduleIndex, sourceIndex
mana_sink_t outputPlugin = DefaultPlugin;
void *outputPluginContext = nullptr;
size_t runDescriptorBytes = 0;
size_t eventStoragesBytes = 0;
};
std::map<std::string, nlohmann::json>
make_module_info_by_type(const nlohmann::json &jModuleDataSources)
{
// module type name -> module info json
std::map<std::string, nlohmann::json> moduleInfoByType;
for (const auto &mod_: jModuleDataSources["modules"])
moduleInfoByType[mod_["type_name"]] = mod_;
return moduleInfoByType;
}
std::vector<std::vector<nlohmann::json>>
match_modules(const mvlc::CrateConfig &crateConfig,
const std::map<std::string, nlohmann::json> &moduleInfoByType)
{
std::vector<std::vector<nlohmann::json>> moduleDataSources;
for (const auto &event: crateConfig.stacks)
{
std::vector<nlohmann::json> eventModuleDataSources;
for (const auto &module_: event.getGroups())
{
nlohmann::json jModule;
// match the meta data module type name against the concrete module name
for (const auto &[type_, info]: moduleInfoByType)
{
if (module_.name.find(type_) != std::string::npos)
{
spdlog::info("match: type={}, name={}", type_, module_.name);
jModule = info;
break;
}
}
if (jModule.empty())
{
spdlog::warn("No module info found for module name '{}'", module_.name);
}
eventModuleDataSources.emplace_back(jModule);
}
moduleDataSources.emplace_back(eventModuleDataSources);
}
return moduleDataSources;
}
struct DataSourceInfo
{
const std::string name;
const std::string filterString;
mvlc::util::FilterWithCaches filter;
DataSourceInfo(const std::string &name_, const std::string &filterString_)
: name(name_)
, filterString(filterString_)
, filter(mvlc::util::make_filter_with_caches(filterString))
{
}
};
std::vector<std::vector<DataSourceInfo>>
make_event_ds_info(const mvlc::CrateConfig &crateConfig,
const std::vector<std::vector<nlohmann::json>> &moduleDataSources)
{
std::vector<std::vector<DataSourceInfo>> eventDsInfo;
for (size_t eventIndex = 0; eventIndex < crateConfig.stacks.size(); ++eventIndex)
{
const auto &dataSources = moduleDataSources.at(eventIndex);
const auto &event = crateConfig.stacks.at(eventIndex);
const auto &readouts = event.getGroups();
std::vector<DataSourceInfo> dsInfo;
for (size_t moduleIndex = 0; moduleIndex < event.getGroups().size(); ++moduleIndex)
{
const auto &readout = readouts.at(moduleIndex);
const auto &ds = dataSources.at(moduleIndex);
spdlog::info("readout.name={}, ds={}", readout.name, ds.dump());
if (ds.contains("data_sources"))
{
for (const auto &filter: ds["data_sources"])
{
auto name =
fmt::format("{}.{}.{}", event.getName(), readout.name, filter["name"]);
dsInfo.emplace_back(DataSourceInfo(name, filter["filter"]));
}
}
}
eventDsInfo.emplace_back(dsInfo);
}
return eventDsInfo;
}
template <typename T>
T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
T *ptr = arena.push_t<T>(size);
mana::set(dest.ptr, mana_custom, ptr);
dest.size_bytes = size * sizeof(T);
return ptr;
}
template <typename T>
T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
T *ptr = arena.push_t<T>(size);
mana::set(dest.ptr, ptr);
dest.size_bytes = size * sizeof(T);
return ptr;
}
template <typename T> size_t element_count(mana_offset_array_t &array)
{
return array.size_bytes / sizeof(T);
}
template <typename T> mvlc::util::span<T> get_span(mana_offset_array_t &array)
{
auto ptr = reinterpret_cast<T *>(mana::get(array.ptr));
auto size = element_count<T>(array);
return {ptr, size};
}
#if 0
std::pair<mana_run_descriptor_t *, size_t>
make_run_descriptor(mana::Arena &arena, const std::string &runName,
const std::vector<std::vector<DataSourceInfo>> &eventDsInfo)
{
auto rd = arena.push_t<mana_run_descriptor_t>();
mana::set(rd->name, arena.push_cstr(runName));
auto eds = push_offset_array<mana_event_descriptor_t>(arena, rd->events, eventDsInfo.size());
rd->event_count = eventDsInfo.size();
for (const auto &eventDataSources: eventDsInfo)
{
auto ed = eds++;
auto ads = push_offset_array<mana_array_descriptor_t>(arena, ed->data_arrays,
eventDataSources.size());
ed->data_array_count = eventDataSources.size();
for (const auto &ds: eventDataSources)
{
auto ad = ads++;
mana::set(ad->name, arena.push_cstr(ds.name));
ad->data_type = mana_float;
ad->size = 1;
if (auto c = get_cache_entry(ds.filter, 'A'); c)
ad->size = 1u << c->extractBits;
if (auto c = get_cache_entry(ds.filter, 'D'); c)
ad->bits = c->extractBits;
}
}
return {rd, arena.cur_end() - reinterpret_cast<u8 *>(rd)};
}
std::pair<mvlc::util::span<mana_event_data_t>, size_t>
make_event_storages(mana::Arena &arena, const std::vector<std::vector<DataSourceInfo>> &eventDsInfo)
{
auto eds = arena.push_t<mana_event_data_t>(eventDsInfo.size());
for (size_t eventIndex = 0; eventIndex < eventDsInfo.size(); ++eventIndex)
{
auto &ed = eds[eventIndex];
auto &dsInfo = eventDsInfo[eventIndex];
auto das = push_offset_array<mana_offset_array_t>(arena, ed.data_arrays, dsInfo.size());
for (const auto &ds: dsInfo)
{
auto da = das++;
size_t size = 1;
if (auto c = get_cache_entry(ds.filter, 'A'); c)
size = 1u << c->extractBits;
push_typed_offset_array<float>(arena, *da, size);
}
}
return {{eds, eventDsInfo.size()}, arena.cur_end() - reinterpret_cast<u8 *>(eds)};
}
void dump(mana_run_descriptor_t *rd)
{
auto eds = get_span<mana_event_descriptor_t>(rd->events);
spdlog::info("mana_run_descriptor @ {}, name={}, event_count={}", fmt::ptr(rd),
mana::get<char>(rd->name), eds.size());
for (size_t eventIndex = 0; eventIndex < rd->event_count; ++eventIndex)
{
auto &ed = eds[eventIndex];
spdlog::info(" event[{}]:", eventIndex);
auto ads = get_span<mana_array_descriptor_t>(ed.data_arrays);
for (auto &ad: ads)
{
spdlog::info(" array: name={}, data_type={}, size={}, bits={}",
mana::get<char>(ad.name), static_cast<int>(ad.data_type), ad.size,
ad.bits);
}
}
}
#endif
std::optional<AnalysisContext> make_mana(const std::string runName,
const mvlc::CrateConfig &crateConfig,
const nlohmann::json &jModuleDataSources)
{
try
{
// module type name -> module info json
auto moduleInfoByType = make_module_info_by_type(jModuleDataSources);
AnalysisContext ctx;
ctx.arena = std::make_unique<mana::Arena>();
// auto arena = ctx.arena.get();
// eventIndex -> moduleIndex -> module info json
auto moduleDataSources = match_modules(crateConfig, moduleInfoByType);
#if 0
auto eventDsInfo = make_event_ds_info(crateConfig, moduleDataSources);
auto arena = ctx.arena.get();
auto [runDescriptor, runDescriptorSize] = make_run_descriptor(*arena, runName, eventDsInfo);
spdlog::info(
"runDescriptor @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}",
fmt::ptr(runDescriptor), runDescriptorSize, arena->allocations(), arena->capacity(),
arena->pad_waste(), arena->used());
if (runDescriptor)
dump(runDescriptor);
auto [eventStorages, eventStoragesSize] = make_event_storages(*arena, eventDsInfo);
spdlog::info(
"eventStorages @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}",
fmt::ptr(&eventStorages.front()), eventStoragesSize, arena->allocations(),
arena->capacity(), arena->pad_waste(), arena->used());
for (size_t eventIndex = 0; eventIndex < eventStorages.size(); ++eventIndex)
{
auto &es = eventStorages[eventIndex];
auto arrays = get_span<mana_offset_array_t>(es.data_arrays);
spdlog::info("eventStorage: event={}, arrays={}", eventIndex, arrays.size());
for (auto &array: arrays)
{
spdlog::info(" array @ {}, size_bytes={}, size={}",
fmt::ptr(mana::get<float>(array.ptr)), array.size_bytes,
element_count<float>(array));
}
}
ctx.runDescriptor = runDescriptor;
ctx.eventStorages = eventStorages;
ctx.runDescriptorBytes = runDescriptorSize;
ctx.eventStoragesBytes = eventStoragesSize;
#endif
return ctx;
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error: {}\n", e.what());
return {};
}
}
size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext, AnalysisContext &analysisContext) ParserContext &parserContext, mana::ModuleDataStage &analysisContext)
{ {
listfileContext.readerHelper.destBuf().clear(); listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
@ -473,23 +169,24 @@ int main(int argc, char *argv[])
if (!listfileContext) if (!listfileContext)
return 1; return 1;
auto analysisContext = make_mana(filename, listfileContext->crateConfig, jModuleDataSources); mana::ManaCountingSink manaPlugin;
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
jModuleDataSources, &manaPlugin, nullptr);
if (!analysisContext) auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
return 1;
auto event_data = [](void *ctx, int crateIndex, int eventIndex,
const mvlc::readout_parser::ModuleData *moduleDataList, const mvlc::readout_parser::ModuleData *moduleDataList,
unsigned moduleCount) unsigned moduleCount)
{ {
// reinterpret_cast<AnalysisContext *>(ctx)->process_event_data(crateIndex, eventIndex, (void)crateIndex;
// moduleDataList, auto ctx = reinterpret_cast<mana::ModuleDataStage *>(ctx_);
// moduleCount); mana::module_data_stage_process_module_data(*ctx, eventIndex, moduleDataList, moduleCount);
}; };
auto system_event = [](void *ctx, int crateIndex, const u32 *header, u32 size) auto system_event = [](void *ctx_, int crateIndex, const u32 *header, u32 size)
{ {
// reinterpret_cast<AnalysisContext *>(ctx)->process_system_event(crateIndex, header, size); (void)crateIndex;
auto ctx = reinterpret_cast<mana::ModuleDataStage *>(ctx_);
mana::module_data_stage_process_system_event(*ctx, header, size);
}; };
auto parserContext = auto parserContext =
@ -498,6 +195,9 @@ int main(int argc, char *argv[])
if (!parserContext) if (!parserContext)
return 1; return 1;
mana.sinkContext = mana.sink->init();
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
size_t bufferNumber = 0; size_t bufferNumber = 0;
size_t totalBytesProcessed = 0; size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0; size_t bytesProcessed = 0;
@ -518,8 +218,7 @@ int main(int argc, char *argv[])
do do
{ {
bytesProcessed = bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext, mana);
process_one_buffer(bufferNumber, *listfileContext, *parserContext, *analysisContext);
totalBytesProcessed += bytesProcessed; totalBytesProcessed += bytesProcessed;
++bufferNumber; ++bufferNumber;
@ -533,5 +232,23 @@ int main(int argc, char *argv[])
report(); report();
mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
mana.sink->shutdown(mana.sinkContext);
spdlog::info(
"ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, systemEventBytes={}",
fmt::join(manaPlugin.eventCounts, ", "), manaPlugin.totalBytes, manaPlugin.systemEventCount,
manaPlugin.systemEventBytes);
for (size_t ei = 0; ei < manaPlugin.eventArrayHits.size(); ++ei)
{
spdlog::info("event[{}]: {} hits", ei, manaPlugin.eventCounts[ei]);
for (size_t ai = 0; ai < manaPlugin.eventArrayHits[ei].size(); ++ai)
{
auto name = mana.runDescriptor["events"][ei]["outputs"][ai]["name"];
spdlog::info(" {}: {}", name, manaPlugin.eventArrayHits[ei][ai]);
}
}
return 0; return 0;
} }