// mana - mnode analysis / mini analysis // usage: mana_auto_replay // // open the file // read preamble // create crateconfig from preamble data // // setup: // for each event in the config: // for each module: // check for module type in meta data // find module type in vme_module_data_sources.json // for each filter in the modules data_sources: // calculate array size (use float as the storage type for now) // reserve buffer space for the array // // replay: // for each event in the listfile: // for each module in the event: // locate the list of filters for the crate, event and module index triplet // for each filter: // match it against every word in the module data // if a word matches extract address and value // store value in the reserved array buffer // // -> extracted u32 data is stored in the buffer space for this event // what now? // - histogram // - make arrays available to python and test numpy // - crrate root histograms and accumulate // - create root trees or the new rntuples(?) // -> want plugins. similar to the mvme listfile_reader but on analysis data #include // mnode::resources #include #include #include #include #include #include #include "mana_replay_api.h" #include "internal/mana_arena.h" #include "tools/mana_lib.hpp" CMRC_DECLARE(mnode::resources); using namespace mesytec; using namespace mesytec::mnode; struct ListfileContext { std::unique_ptr zipReader; mvlc::listfile::ReadHandle *readHandle; mvlc::listfile::ListfileReaderHelper readerHelper; mvlc::CrateConfig crateConfig; ListfileContext() = default; ListfileContext(const ListfileContext &) = delete; ListfileContext &operator=(const ListfileContext &) = delete; ListfileContext(ListfileContext &&) = default; ListfileContext &operator=(ListfileContext &&) = default; }; std::optional make_listfile_context(const std::string &filename) { try { ListfileContext ctx; ctx.zipReader = std::make_unique(); ctx.zipReader->openArchive(filename); ctx.readHandle = ctx.zipReader->openEntry(ctx.zipReader->firstListfileEntryName()); ctx.readerHelper = mvlc::listfile::make_listfile_reader_helper(ctx.readHandle); auto configData = ctx.readerHelper.preamble.findCrateConfig(); if (!configData) { std::cerr << fmt::format("No MVLC crate config found in {}\n", filename); return {}; } ctx.crateConfig = mvlc::crate_config_from_yaml(configData->contentsToString()); std::cout << fmt::format("Found MVLC crate config in {}\n", filename); for (size_t i = 0; i < ctx.crateConfig.stacks.size(); ++i) { std::cout << fmt::format("event[{}] {}:\n", i, ctx.crateConfig.stacks[i].getName()); size_t mi = 0; for (const auto &module_: ctx.crateConfig.stacks[i].getGroups()) { auto meta = fmt::format("{}", fmt::join(module_.meta, ", ")); std::cout << fmt::format(" module[{}]: name={}, meta={{{}}}", mi++, module_.name, meta); if (!mvlc::produces_output(module_)) std::cout << ", (no output)"; std::cout << "\n"; } } return ctx; } catch (const std::exception &e) { std::cerr << fmt::format("Error: {}\n", e.what()); return {}; } } struct ParserContext { using Parser = mvlc::readout_parser::ReadoutParserState; using Counters = mvlc::readout_parser::ReadoutParserCounters; using Callbacks = mvlc::readout_parser::ReadoutParserCallbacks; Parser parser; Counters counters; Callbacks callbacks; mvlc::CrateConfig crateConfig; }; std::optional make_parser_context(const mvlc::CrateConfig &crateConfig, ParserContext::Callbacks callbacks) { try { ParserContext ctx{}; ctx.parser = mvlc::readout_parser::make_readout_parser(crateConfig.stacks); ctx.crateConfig = crateConfig; ctx.callbacks = callbacks; return ctx; } catch (const std::exception &e) { std::cerr << fmt::format("Error: {}\n", e.what()); return {}; } } MANA_DEFINE_PLUGIN_INIT(mana_default_init) { return nullptr; } MANA_DEFINE_PLUGIN_SHUTDOWN(mana_default_shutdown) {} MANA_DEFINE_PLUGIN_BEGIN_RUN(mana_default_begin_run) {} MANA_DEFINE_PLUGIN_END_RUN(mana_default_end_run) {} MANA_DEFINE_PLUGIN_EVENT_DATA(mana_default_process_event) {} MANA_DEFINE_PLUGIN_SYSTEM_EVENT(mana_default_system_event) {} static mana_plugin_t DefaultPlugin = {.init = mana_default_init, .shutdown = mana_default_shutdown, .begin_run = mana_default_begin_run, .end_run = mana_default_end_run, .process_event = mana_default_process_event}; struct DataSource { mvlc::util::DataFilter filter; mvlc::util::CacheEntry fAddress; mvlc::util::CacheEntry fValue; mvlc::util::span dest; }; struct AnalysisContext { std::unique_ptr arena; mana_run_descriptor_t *runDescriptor = nullptr; mvlc::util::span eventStorages; std::vector>> dataSources; // eventIndex, moduleIndex, sourceIndex mana_plugin_t outputPlugin = DefaultPlugin; void *outputPluginContext = nullptr; size_t runDescriptorBytes = 0; size_t eventStoragesBytes = 0; void process_module_data(const mvlc::readout_parser::DataBlock &data, DataSource &source) { std::fill(std::begin(source.dest), std::end(source.dest), mnode::make_quiet_nan()); for (const u32 *word = data.data, *end = data.data + data.size; word < end; ++word) { if (mvlc::util::matches(source.filter, *word)) { u32 address = mvlc::util::extract(source.fAddress, *word); u32 value = mvlc::util::extract(source.fValue, *word); assert(address < source.dest.size()); source.dest[address] = value; } } } void process_module_data(const mvlc::readout_parser::DataBlock &data, std::vector &sources) { for (auto &source: sources) { process_module_data(data, source); } } void process_module_data(const mvlc::readout_parser::ModuleData &moduleData, std::vector &sources) { mvlc::readout_parser::DataBlock data = moduleData.hasDynamic ? dynamic_span(moduleData) : prefix_span(moduleData); process_module_data(data, sources); } void process_event_data(int crateIndex, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { std::cout << fmt::format( "ReadoutEvent: ctx={}, crateId={}, eventIndex={}, moduleCount={}\n", fmt::ptr(this), crateIndex, eventIndex, moduleCount); auto &eventSources = dataSources.at(eventIndex); for (unsigned mi=0; mi make_module_info_by_type(const nlohmann::json &jModuleDataSources) { // module type name -> module info json std::map moduleInfoByType; for (const auto &mod_: jModuleDataSources["modules"]) moduleInfoByType[mod_["type_name"]] = mod_; return moduleInfoByType; } std::vector> match_modules(const mvlc::CrateConfig &crateConfig, const std::map &moduleInfoByType) { std::vector> moduleDataSources; for (const auto &event: crateConfig.stacks) { std::vector eventModuleDataSources; for (const auto &module_: event.getGroups()) { nlohmann::json jModule; // match the meta data module type name against the concrete module name for (const auto &[type_, info]: moduleInfoByType) { if (module_.name.find(type_) != std::string::npos) { spdlog::info("match: type={}, name={}", type_, module_.name); jModule = info; break; } } if (jModule.empty()) { spdlog::warn("No module info found for module name '{}'", module_.name); } eventModuleDataSources.emplace_back(jModule); } moduleDataSources.emplace_back(eventModuleDataSources); } return moduleDataSources; } struct DataSourceInfo { const std::string name; const std::string filterString; mvlc::util::FilterWithCaches filter; DataSourceInfo(const std::string &name_, const std::string &filterString_) : name(name_) , filterString(filterString_) , filter(mvlc::util::make_filter_with_caches(filterString)) { } }; std::vector> make_event_ds_info(const mvlc::CrateConfig &crateConfig, const std::vector> &moduleDataSources) { std::vector> eventDsInfo; for (size_t eventIndex = 0; eventIndex < crateConfig.stacks.size(); ++eventIndex) { const auto &dataSources = moduleDataSources.at(eventIndex); const auto &event = crateConfig.stacks.at(eventIndex); const auto &readouts = event.getGroups(); std::vector dsInfo; for (size_t moduleIndex = 0; moduleIndex < event.getGroups().size(); ++moduleIndex) { const auto &readout = readouts.at(moduleIndex); const auto &ds = dataSources.at(moduleIndex); spdlog::info("readout.name={}, ds={}", readout.name, ds.dump()); if (ds.contains("data_sources")) { for (const auto &filter: ds["data_sources"]) { auto name = fmt::format("{}.{}.{}", event.getName(), readout.name, filter["name"]); dsInfo.emplace_back(DataSourceInfo(name, filter["filter"])); } } } eventDsInfo.emplace_back(dsInfo); } return eventDsInfo; } template T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) { T *ptr = arena.push_t(size); mana::set(dest.ptr, mana_custom, ptr); dest.size_bytes = size * sizeof(T); return ptr; } template T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) { T *ptr = arena.push_t(size); mana::set(dest.ptr, ptr); dest.size_bytes = size * sizeof(T); return ptr; } template size_t element_count(mana_offset_array_t &array) { return array.size_bytes / sizeof(T); } template mvlc::util::span get_span(mana_offset_array_t &array) { auto ptr = reinterpret_cast(mana::get(array.ptr)); auto size = element_count(array); return {ptr, size}; } std::pair make_run_descriptor(mana::Arena &arena, const std::string &runName, const std::vector> &eventDsInfo) { auto rd = arena.push_t(); mana::set(rd->name, arena.push_cstr(runName)); auto eds = push_offset_array(arena, rd->events, eventDsInfo.size()); rd->event_count = eventDsInfo.size(); for (const auto &eventDataSources: eventDsInfo) { auto ed = eds++; auto ads = push_offset_array(arena, ed->data_arrays, eventDataSources.size()); ed->data_array_count = eventDataSources.size(); for (const auto &ds: eventDataSources) { auto ad = ads++; mana::set(ad->name, arena.push_cstr(ds.name)); ad->data_type = mana_float; ad->size = 1; if (auto c = get_cache_entry(ds.filter, 'A'); c) ad->size = 1u << c->extractBits; if (auto c = get_cache_entry(ds.filter, 'D'); c) ad->bits = c->extractBits; } } return {rd, arena.cur_end() - reinterpret_cast(rd)}; } std::pair, size_t> make_event_storages(mana::Arena &arena, const std::vector> &eventDsInfo) { auto eds = arena.push_t(eventDsInfo.size()); for (size_t eventIndex = 0; eventIndex < eventDsInfo.size(); ++eventIndex) { auto &ed = eds[eventIndex]; auto &dsInfo = eventDsInfo[eventIndex]; auto das = push_offset_array(arena, ed.data_arrays, dsInfo.size()); for (const auto &ds: dsInfo) { auto da = das++; size_t size = 1; if (auto c = get_cache_entry(ds.filter, 'A'); c) size = 1u << c->extractBits; push_typed_offset_array(arena, *da, size); } } return {{eds, eventDsInfo.size()}, arena.cur_end() - reinterpret_cast(eds)}; } void dump(mana_run_descriptor_t *rd) { auto eds = get_span(rd->events); spdlog::info("mana_run_descriptor @ {}, name={}, event_count={}", fmt::ptr(rd), mana::get(rd->name), eds.size()); for (size_t eventIndex = 0; eventIndex < rd->event_count; ++eventIndex) { auto &ed = eds[eventIndex]; spdlog::info(" event[{}]:", eventIndex); auto ads = get_span(ed.data_arrays); for (auto &ad: ads) { spdlog::info(" array: name={}, data_type={}, size={}, bits={}", mana::get(ad.name), static_cast(ad.data_type), ad.size, ad.bits); } } } std::optional make_mana(const std::string runName, const mvlc::CrateConfig &crateConfig, const nlohmann::json &jModuleDataSources) { try { // module type name -> module info json auto moduleInfoByType = make_module_info_by_type(jModuleDataSources); AnalysisContext ctx; ctx.arena = std::make_unique(); // auto arena = ctx.arena.get(); // eventIndex -> moduleIndex -> module info json auto moduleDataSources = match_modules(crateConfig, moduleInfoByType); auto eventDsInfo = make_event_ds_info(crateConfig, moduleDataSources); auto arena = ctx.arena.get(); auto [runDescriptor, runDescriptorSize] = make_run_descriptor(*arena, runName, eventDsInfo); spdlog::info( "runDescriptor @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}", fmt::ptr(runDescriptor), runDescriptorSize, arena->allocations(), arena->capacity(), arena->pad_waste(), arena->used()); if (runDescriptor) dump(runDescriptor); auto [eventStorages, eventStoragesSize] = make_event_storages(*arena, eventDsInfo); spdlog::info( "eventStorages @ {}, size={}, allocations={}, capacity={}, pad_waste={}, used_size={}", fmt::ptr(&eventStorages.front()), eventStoragesSize, arena->allocations(), arena->capacity(), arena->pad_waste(), arena->used()); for (size_t eventIndex = 0; eventIndex < eventStorages.size(); ++eventIndex) { auto &es = eventStorages[eventIndex]; auto arrays = get_span(es.data_arrays); spdlog::info("eventStorage: event={}, arrays={}", eventIndex, arrays.size()); for (auto &array: arrays) { spdlog::info(" array @ {}, size_bytes={}, size={}", fmt::ptr(mana::get(array.ptr)), array.size_bytes, element_count(array)); } } ctx.runDescriptor = runDescriptor; ctx.eventStorages = eventStorages; ctx.runDescriptorBytes = runDescriptorSize; ctx.eventStoragesBytes = eventStoragesSize; return ctx; } catch (const std::exception &e) { std::cerr << fmt::format("Error: {}\n", e.what()); return {}; } } size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, ParserContext &parserContext, AnalysisContext &analysisContext) { listfileContext.readerHelper.destBuf().clear(); auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); if (!buffer->used()) return 0; auto bufferView = buffer->viewU32(); parserContext.parser.userContext = &analysisContext; mvlc::readout_parser::parse_readout_buffer( listfileContext.readerHelper.bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters, bufferNumber, bufferView.data(), bufferView.size()); return buffer->used(); } int main(int argc, char *argv[]) { auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); if (argc < 2) { std::cout << fmt::format("Usage: {} \n", argv[0]); return 1; } std::string filename = argv[1]; auto listfileContext = make_listfile_context(filename); if (!listfileContext) return 1; auto analysisContext = make_mana(filename, listfileContext->crateConfig, jModuleDataSources); if (!analysisContext) return 1; auto event_data = [](void *ctx, int crateIndex, int eventIndex, const mvlc::readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { reinterpret_cast(ctx)->process_event_data(crateIndex, eventIndex, moduleDataList, moduleCount); }; auto system_event = [](void *ctx, int crateIndex, const u32 *header, u32 size) { reinterpret_cast(ctx)->process_system_event(crateIndex, header, size); }; auto parserContext = make_parser_context(listfileContext->crateConfig, {event_data, system_event}); if (!parserContext) return 1; size_t bufferNumber = 0; size_t totalBytesProcessed = 0; size_t bytesProcessed = 0; const std::chrono::milliseconds ReportInterval(500); mvlc::util::Stopwatch sw; auto report = [&] { [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) { auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); auto bytesPerSecond = totalBytesProcessed / s; auto MiBPerSecond = bytesPerSecond / (1u << 20); std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n", bufferNumber, totalBytesProcessed, s, MiBPerSecond); }(sw, bufferNumber, totalBytesProcessed); }; do { bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext, *analysisContext); totalBytesProcessed += bytesProcessed; ++bufferNumber; if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval) { report(); sw.interval(); } } while (bytesProcessed > 0); report(); return 0; }