mesytec-mnode/src/tools/mana_auto_replay.cc

538 lines
18 KiB
C++
Raw Normal View History

// mana - mnode analysis / mini analysis
// usage: mana_auto_replay <zipfile>
//
// open the file
// read preamble
// create crateconfig from preamble data
//
// setup:
// for each event in the config:
// for each module:
// check for module type in meta data
// find module type in vme_module_data_sources.json
// for each filter in the modules data_sources:
// calculate array size (use float as the storage type for now)
// reserve buffer space for the array
//
// replay:
// for each event in the listfile:
// for each module in the event:
// locate the list of filters for the crate, event and module index triplet
// for each filter:
// match it against every word in the module data
// if a word matches extract address and value
// store value in the reserved array buffer
//
// -> extracted u32 data is stored in the buffer space for this event
// what now?
// - histogram
// - make arrays available to python and test numpy
// - create root histograms and accumulate
// - create root trees or the new rntuples(?)
// -> want plugins. similar to the mvme listfile_reader but on analysis data
#include <cmrc/cmrc.hpp> // mnode::resources
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_math.h>
#include <nlohmann/json.hpp>
#include <list>
2024-12-25 03:19:29 +01:00
#include <sstream>
#include "internal/mana_arena.h"
2024-12-25 05:47:38 +01:00
#include "internal/mana_lib.hpp"
CMRC_DECLARE(mnode::resources);
using namespace mesytec;
using namespace mesytec::mnode;
// Bundles the objects needed to read buffers from a listfile stored inside a
// zip archive. Copying is disabled; the context can only be moved.
struct ListfileContext
{
    // Zip archive reader; created and opened by make_listfile_context().
    std::unique_ptr<mvlc::listfile::ZipReader> zipReader;

    // Handle returned by zipReader->openEntry(). Initialized to nullptr so that
    // a default-constructed context never carries an indeterminate pointer.
    // NOTE(review): presumably owned by zipReader - confirm against mvlc docs.
    mvlc::listfile::ReadHandle *readHandle = nullptr;

    // Helper created from readHandle; provides the preamble, the buffer format
    // and the destination buffer used while reading.
    mvlc::listfile::ListfileReaderHelper readerHelper;

    // Crate config parsed from the YAML found in the listfile preamble.
    mvlc::CrateConfig crateConfig;

    ListfileContext() = default;
    ListfileContext(const ListfileContext &) = delete;
    ListfileContext &operator=(const ListfileContext &) = delete;
    ListfileContext(ListfileContext &&) = default;
    ListfileContext &operator=(ListfileContext &&) = default;
};
std::optional<ListfileContext> make_listfile_context(const std::string &filename)
{
try
{
ListfileContext ctx;
ctx.zipReader = std::make_unique<mvlc::listfile::ZipReader>();
ctx.zipReader->openArchive(filename);
ctx.readHandle = ctx.zipReader->openEntry(ctx.zipReader->firstListfileEntryName());
ctx.readerHelper = mvlc::listfile::make_listfile_reader_helper(ctx.readHandle);
auto configData = ctx.readerHelper.preamble.findCrateConfig();
if (!configData)
{
std::cerr << fmt::format("No MVLC crate config found in {}\n", filename);
return {};
}
ctx.crateConfig = mvlc::crate_config_from_yaml(configData->contentsToString());
std::cout << fmt::format("Found MVLC crate config in {}\n", filename);
for (size_t i = 0; i < ctx.crateConfig.stacks.size(); ++i)
{
std::cout << fmt::format("event[{}] {}:\n", i, ctx.crateConfig.stacks[i].getName());
size_t mi = 0;
for (const auto &module_: ctx.crateConfig.stacks[i].getGroups())
{
auto meta = fmt::format("{}", fmt::join(module_.meta, ", "));
std::cout << fmt::format(" module[{}]: name={}, meta={{{}}}", mi++, module_.name,
meta);
if (!mvlc::produces_output(module_))
std::cout << ", (no output)";
std::cout << "\n";
}
}
return ctx;
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error: {}\n", e.what());
return {};
}
}
// Groups the mvlc readout parser state with its counters and callbacks and a
// copy of the crate config. Instances are filled in by make_parser_context().
struct ParserContext
{
// Short aliases for the mvlc readout parser types used below.
using Parser = mvlc::readout_parser::ReadoutParserState;
using Counters = mvlc::readout_parser::ReadoutParserCounters;
using Callbacks = mvlc::readout_parser::ReadoutParserCallbacks;
// Parser state; its userContext member is set in process_one_buffer().
Parser parser;
// Counters passed to parse_readout_buffer() for every buffer.
Counters counters;
// Callbacks passed to parse_readout_buffer() for every buffer.
Callbacks callbacks;
// Copy of the crate config passed to make_parser_context().
mvlc::CrateConfig crateConfig;
};
// Builds a ParserContext for the given crate config: creates the readout
// parser from crateConfig.stacks and stores copies of the config and of the
// callbacks.
//
// Returns an empty optional and writes the message to stderr if a
// std::exception is thrown while building the context.
std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateConfig,
                                                 ParserContext::Callbacks callbacks)
{
    try
    {
        ParserContext result{};
        result.crateConfig = crateConfig;
        result.callbacks = callbacks;
        result.parser = mvlc::readout_parser::make_readout_parser(crateConfig.stacks);
        return result;
    }
    catch (const std::exception &e)
    {
        std::cerr << fmt::format("Error: {}\n", e.what());
    }

    return {};
}
2024-12-25 03:19:29 +01:00
// Default plugin hooks, defined through the MANA_DEFINE_PLUGIN_* macros (the
// macro definitions are not part of this file). The init hook returns nullptr;
// all other hooks have empty bodies.
MANA_DEFINE_PLUGIN_INIT(mana_default_init) { return nullptr; }
MANA_DEFINE_PLUGIN_SHUTDOWN(mana_default_shutdown) {}
MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_default_begin_run) {}
MANA_DEFINE_PLUGIN_END_RUN(mana_default_end_run) {}
MANA_DEFINE_PLUGIN_EVENT_DATA(mana_default_process_event) {}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_default_system_event) {}
2024-12-25 05:47:38 +01:00
// Plugin table wired to the mana_default_* hooks; used as the initial value of
// AnalysisContext::outputPlugin.
// NOTE(review): mana_default_system_event is defined in this file but is not
// assigned to any member here - confirm whether mana_sink_t has a system event
// member that should be set as well.
static mana_sink_t DefaultPlugin = {.init = mana_default_init,
.shutdown = mana_default_shutdown,
.begin_run = mana_default_begin_run,
.end_run = mana_default_end_run,
.process_event = mana_default_process_event};
2024-12-25 03:19:29 +01:00
struct DataSource
{
mvlc::util::DataFilter filter;
mvlc::util::CacheEntry fAddress;
mvlc::util::CacheEntry fValue;
2024-12-25 05:47:38 +01:00
mana_offset_array_t *dest;
2024-12-25 03:19:29 +01:00
};
struct AnalysisContext
{
std::unique_ptr<mana::Arena> arena;
2024-12-25 05:47:38 +01:00
nlohmann::json runDescriptor;
std::vector<mvlc::util::span<mana_offset_array_t *>> eventArrays;
std::vector<std::vector<std::vector<DataSource>>>
dataSources; // eventIndex, moduleIndex, sourceIndex
mana_sink_t outputPlugin = DefaultPlugin;
2024-12-25 03:19:29 +01:00
void *outputPluginContext = nullptr;
size_t runDescriptorBytes = 0;
size_t eventStoragesBytes = 0;
};
2024-12-25 03:19:29 +01:00
// Builds a lookup table from module type name to the module info json.
//
// Reads every element of jModuleDataSources["modules"] and stores it under its
// "type_name" value. If a type name occurs more than once the last element
// wins.
//
// Uses the checked accessor at() instead of operator[]: on a const json value
// operator[] with a missing key is undefined behavior, whereas at() throws a
// json exception (derived from std::exception) that callers can catch.
std::map<std::string, nlohmann::json>
make_module_info_by_type(const nlohmann::json &jModuleDataSources)
{
    std::map<std::string, nlohmann::json> moduleInfoByType;

    for (const auto &jModule: jModuleDataSources.at("modules"))
        moduleInfoByType[jModule.at("type_name").get<std::string>()] = jModule;

    return moduleInfoByType;
}
// Finds the module info json for every module of every event in the crate
// config.
//
// A module matches a type if the type name is a substring of the module name;
// the first matching type in map iteration order is used. Modules without a
// match get a default-constructed (null) json and a warning is logged.
//
// Returns the infos indexed by [eventIndex][moduleIndex].
std::vector<std::vector<nlohmann::json>>
match_modules(const mvlc::CrateConfig &crateConfig,
              const std::map<std::string, nlohmann::json> &moduleInfoByType)
{
    // Returns the info of the first type whose name occurs in moduleName, or
    // a null json if there is none.
    const auto lookup = [&moduleInfoByType](const auto &moduleName)
    {
        nlohmann::json found;

        for (const auto &[typeName, typeInfo]: moduleInfoByType)
        {
            if (moduleName.find(typeName) == std::string::npos)
                continue;

            spdlog::info("match: type={}, name={}", typeName, moduleName);
            found = typeInfo;
            break;
        }

        return found;
    };

    std::vector<std::vector<nlohmann::json>> result;

    for (const auto &stack: crateConfig.stacks)
    {
        std::vector<nlohmann::json> perModule;

        for (const auto &group: stack.getGroups())
        {
            const nlohmann::json info = lookup(group.name);

            if (info.empty())
                spdlog::warn("No module info found for module name '{}'", group.name);

            perModule.emplace_back(info);
        }

        result.emplace_back(perModule);
    }

    return result;
}
// Describes a single data source: its name, the filter string it was created
// from and the filter (with caches) built from that string.
struct DataSourceInfo
{
    const std::string name;
    const std::string filterString;
    mvlc::util::FilterWithCaches filter;

    // Stores copies of both strings and builds the filter from the filter
    // string.
    DataSourceInfo(const std::string &name_, const std::string &filterString_)
        : name(name_)
        , filterString(filterString_)
        , filter(mvlc::util::make_filter_with_caches(filterString_))
    {
    }
};
// Creates the list of DataSourceInfo objects for every event in the crate
// config.
//
// moduleDataSources is indexed by [eventIndex][moduleIndex]; access is bounds
// checked via at(), so a size mismatch with the crate config throws
// std::out_of_range. For every entry of a module's "data_sources" array one
// DataSourceInfo is created, named from the event name, the module name and
// the entry's "name", using the entry's "filter" as the filter string.
// Modules without a "data_sources" key contribute nothing.
std::vector<std::vector<DataSourceInfo>>
make_event_ds_info(const mvlc::CrateConfig &crateConfig,
                   const std::vector<std::vector<nlohmann::json>> &moduleDataSources)
{
    std::vector<std::vector<DataSourceInfo>> result;
    const size_t eventCount = crateConfig.stacks.size();

    for (size_t ei = 0; ei != eventCount; ++ei)
    {
        const auto &eventModules = moduleDataSources.at(ei);
        const auto &stack = crateConfig.stacks.at(ei);
        const auto &groups = stack.getGroups();

        std::vector<DataSourceInfo> infos;

        for (size_t mi = 0; mi != groups.size(); ++mi)
        {
            const auto &group = groups.at(mi);
            const auto &jModule = eventModules.at(mi);

            spdlog::info("readout.name={}, ds={}", group.name, jModule.dump());

            if (!jModule.contains("data_sources"))
                continue;

            for (const auto &jSource: jModule["data_sources"])
            {
                auto fullName =
                    fmt::format("{}.{}.{}", stack.getName(), group.name, jSource["name"]);
                infos.emplace_back(DataSourceInfo(fullName, jSource["filter"]));
            }
        }

        result.emplace_back(infos);
    }

    return result;
}
// Pushes `size` elements of type T onto the arena and makes `dest` refer to
// them: dest.ptr is set via mana::set() with the mana_custom tag and
// dest.size_bytes is set to the size of the pushed storage in bytes.
// Returns a pointer to the first pushed element.
template <typename T>
T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
    T *const first = arena.push_t<T>(size);
    dest.size_bytes = size * sizeof(T);
    mana::set(dest.ptr, mana_custom, first);
    return first;
}
// Pushes `size` elements of type T onto the arena and makes `dest` refer to
// them: dest.ptr is set via the two-argument mana::set() overload and
// dest.size_bytes is set to the size of the pushed storage in bytes.
// Returns a pointer to the first pushed element.
template <typename T>
T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size)
{
    T *const first = arena.push_t<T>(size);
    dest.size_bytes = size * sizeof(T);
    mana::set(dest.ptr, first);
    return first;
}
// Returns how many elements of type T fit into the array's size_bytes
// (integer division, any remainder is dropped).
template <typename T>
size_t element_count(mana_offset_array_t &array)
{
    const auto byteCount = array.size_bytes;
    return byteCount / sizeof(T);
}
// Returns a span of T over the storage the offset array refers to. The element
// count is derived from the array's size_bytes.
template <typename T>
mvlc::util::span<T> get_span(mana_offset_array_t &array)
{
    const auto count = element_count<T>(array);
    T *const first = reinterpret_cast<T *>(mana::get(array.ptr));
    return {first, count};
}
2024-12-25 05:47:38 +01:00
#if 0
2024-12-25 03:19:29 +01:00
// Builds a mana_run_descriptor_t inside the arena (currently compiled out by
// the surrounding #if 0).
//
// The descriptor gets the run name, one event descriptor per entry of
// eventDsInfo and, per event, one array descriptor per data source.
// Returns the descriptor and the number of bytes between the descriptor and
// the arena's current end.
std::pair<mana_run_descriptor_t *, size_t>
make_run_descriptor(mana::Arena &arena, const std::string &runName,
const std::vector<std::vector<DataSourceInfo>> &eventDsInfo)
{
auto rd = arena.push_t<mana_run_descriptor_t>();
mana::set(rd->name, arena.push_cstr(runName));
auto eds = push_offset_array<mana_event_descriptor_t>(arena, rd->events, eventDsInfo.size());
rd->event_count = eventDsInfo.size();
for (const auto &eventDataSources: eventDsInfo)
{
// Advance through the event descriptors pushed above.
auto ed = eds++;
auto ads = push_offset_array<mana_array_descriptor_t>(arena, ed->data_arrays,
eventDataSources.size());
ed->data_array_count = eventDataSources.size();
for (const auto &ds: eventDataSources)
{
auto ad = ads++;
mana::set(ad->name, arena.push_cstr(ds.name));
ad->data_type = mana_float;
// Size is 1 unless the filter has an 'A' cache entry, in which case it is
// 2^extractBits of that entry.
ad->size = 1;
if (auto c = get_cache_entry(ds.filter, 'A'); c)
ad->size = 1u << c->extractBits;
// NOTE(review): bits is only written when a 'D' cache entry exists; whether
// the arena memory is zeroed otherwise is not visible here - confirm.
if (auto c = get_cache_entry(ds.filter, 'D'); c)
ad->bits = c->extractBits;
}
}
return {rd, arena.cur_end() - reinterpret_cast<u8 *>(rd)};
}
// Allocates the per-event storage inside the arena (currently compiled out by
// the surrounding #if 0).
//
// Pushes one mana_event_data_t per event and, per event, one offset array per
// data source, each backed by a float array.
// Returns a span over the event data structures and the number of bytes
// between the first of them and the arena's current end.
std::pair<mvlc::util::span<mana_event_data_t>, size_t>
make_event_storages(mana::Arena &arena, const std::vector<std::vector<DataSourceInfo>> &eventDsInfo)
{
auto eds = arena.push_t<mana_event_data_t>(eventDsInfo.size());
for (size_t eventIndex = 0; eventIndex < eventDsInfo.size(); ++eventIndex)
{
auto &ed = eds[eventIndex];
auto &dsInfo = eventDsInfo[eventIndex];
auto das = push_offset_array<mana_offset_array_t>(arena, ed.data_arrays, dsInfo.size());
for (const auto &ds: dsInfo)
{
auto da = das++;
// One element unless the filter has an 'A' cache entry, in which case the
// array holds 2^extractBits floats.
size_t size = 1;
if (auto c = get_cache_entry(ds.filter, 'A'); c)
size = 1u << c->extractBits;
push_typed_offset_array<float>(arena, *da, size);
}
}
return {{eds, eventDsInfo.size()}, arena.cur_end() - reinterpret_cast<u8 *>(eds)};
}
// Logs the contents of a run descriptor via spdlog::info (currently compiled
// out by the surrounding #if 0): the descriptor address, name and event count,
// followed by every array descriptor of every event.
void dump(mana_run_descriptor_t *rd)
{
auto eds = get_span<mana_event_descriptor_t>(rd->events);
spdlog::info("mana_run_descriptor @ {}, name={}, event_count={}", fmt::ptr(rd),
mana::get<char>(rd->name), eds.size());
// The loop bound is rd->event_count while the span size above is derived from
// the offset array's size_bytes.
for (size_t eventIndex = 0; eventIndex < rd->event_count; ++eventIndex)
{
auto &ed = eds[eventIndex];
spdlog::info(" event[{}]:", eventIndex);
auto ads = get_span<mana_array_descriptor_t>(ed.data_arrays);
for (auto &ad: ads)
{
spdlog::info(" array: name={}, data_type={}, size={}, bits={}",
mana::get<char>(ad.name), static_cast<int>(ad.data_type), ad.size,
ad.bits);
}
}
}
2024-12-25 05:47:38 +01:00
#endif
2024-12-25 03:19:29 +01:00
// Creates the analysis context for a run.
//
// Currently this builds the module type lookup table, creates the arena and
// matches the crate config modules against the known module types (which logs
// the matches). The code creating the run descriptor and the event storages is
// disabled via #if 0.
//
// runName and moduleDataSources are only used by the disabled code, so they
// are marked [[maybe_unused]] to keep the build free of unused warnings.
//
// Returns an empty optional and writes the message to stderr if a
// std::exception is thrown.
std::optional<AnalysisContext> make_mana([[maybe_unused]] const std::string runName,
                                         const mvlc::CrateConfig &crateConfig,
                                         const nlohmann::json &jModuleDataSources)
{
    try
    {
        // module type name -> module info json
        auto moduleInfoByType = make_module_info_by_type(jModuleDataSources);

        AnalysisContext ctx;
        ctx.arena = std::make_unique<mana::Arena>();
        // auto arena = ctx.arena.get();

        // eventIndex -> moduleIndex -> module info json
        [[maybe_unused]] auto moduleDataSources = match_modules(crateConfig, moduleInfoByType);

#if 0
        // NOTE(review): the assignments at the end of this section refer to
        // ctx.eventStorages and assign a descriptor pointer to
        // ctx.runDescriptor; they need to be adapted to the current
        // AnalysisContext before this code can be re-enabled.
        auto eventDsInfo = make_event_ds_info(crateConfig, moduleDataSources);
        auto arena = ctx.arena.get();

        auto [runDescriptor, runDescriptorSize] = make_run_descriptor(*arena, runName, eventDsInfo);

        spdlog::info(
            "runDescriptor @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}",
            fmt::ptr(runDescriptor), runDescriptorSize, arena->allocations(), arena->capacity(),
            arena->pad_waste(), arena->used());

        if (runDescriptor)
            dump(runDescriptor);

        auto [eventStorages, eventStoragesSize] = make_event_storages(*arena, eventDsInfo);

        spdlog::info(
            "eventStorages @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}",
            fmt::ptr(&eventStorages.front()), eventStoragesSize, arena->allocations(),
            arena->capacity(), arena->pad_waste(), arena->used());

        for (size_t eventIndex = 0; eventIndex < eventStorages.size(); ++eventIndex)
        {
            auto &es = eventStorages[eventIndex];
            auto arrays = get_span<mana_offset_array_t>(es.data_arrays);
            spdlog::info("eventStorage: event={}, arrays={}", eventIndex, arrays.size());
            for (auto &array: arrays)
            {
                spdlog::info(" array @ {}, size_bytes={}, size={}",
                             fmt::ptr(mana::get<float>(array.ptr)), array.size_bytes,
                             element_count<float>(array));
            }
        }

        ctx.runDescriptor = runDescriptor;
        ctx.eventStorages = eventStorages;
        ctx.runDescriptorBytes = runDescriptorSize;
        ctx.eventStoragesBytes = eventStoragesSize;
#endif

        return ctx;
    }
    catch (const std::exception &e)
    {
        std::cerr << fmt::format("Error: {}\n", e.what());
        return {};
    }
}
// Reads the next buffer from the listfile and feeds it to the readout parser.
//
// The reader's destination buffer is cleared before reading. If the buffer
// that was read is empty, 0 is returned without invoking the parser. Otherwise
// the parser's userContext is pointed at analysisContext, the buffer is parsed
// and the number of bytes used in the buffer is returned.
size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
                          ParserContext &parserContext, AnalysisContext &analysisContext)
{
    auto &helper = listfileContext.readerHelper;

    helper.destBuf().clear();
    auto buffer = mvlc::listfile::read_next_buffer(helper);
    const size_t bytesUsed = buffer->used();

    if (bytesUsed == 0)
        return 0;

    auto words = buffer->viewU32();

    parserContext.parser.userContext = &analysisContext;

    mvlc::readout_parser::parse_readout_buffer(helper.bufferFormat, parserContext.parser,
                                               parserContext.callbacks, parserContext.counters,
                                               bufferNumber, words.data(), words.size());

    return bytesUsed;
}
int main(int argc, char *argv[])
{
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
if (argc < 2)
{
std::cout << fmt::format("Usage: {} <listfile.zip>\n", argv[0]);
return 1;
}
std::string filename = argv[1];
auto listfileContext = make_listfile_context(filename);
if (!listfileContext)
return 1;
2024-12-25 03:19:29 +01:00
auto analysisContext = make_mana(filename, listfileContext->crateConfig, jModuleDataSources);
if (!analysisContext)
return 1;
auto event_data = [](void *ctx, int crateIndex, int eventIndex,
const mvlc::readout_parser::ModuleData *moduleDataList,
unsigned moduleCount)
{
2024-12-25 05:47:38 +01:00
// reinterpret_cast<AnalysisContext *>(ctx)->process_event_data(crateIndex, eventIndex,
// moduleDataList,
// moduleCount);
};
auto system_event = [](void *ctx, int crateIndex, const u32 *header, u32 size)
2024-12-25 05:47:38 +01:00
{
// reinterpret_cast<AnalysisContext *>(ctx)->process_system_event(crateIndex, header, size);
};
auto parserContext =
make_parser_context(listfileContext->crateConfig, {event_data, system_event});
if (!parserContext)
return 1;
size_t bufferNumber = 0;
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
2024-12-25 03:19:29 +01:00
auto report = [&]
{
2024-12-25 03:19:29 +01:00
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed =
process_one_buffer(bufferNumber, *listfileContext, *parserContext, *analysisContext);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
2024-12-25 03:19:29 +01:00
report();
sw.interval();
}
}
while (bytesProcessed > 0);
2024-12-25 03:19:29 +01:00
report();
return 0;
}