mesytec-mnode/src/tools/mana_auto_replay.cc

494 lines
17 KiB
C++
Raw Normal View History

// mana - mnode analysis / mini analysis
// usage: mana_auto_replay <zipfile>
//
// open the file
// read preamble
// create crateconfig from preamble data
//
// setup:
// for each event in the config:
// for each module:
// check for module type in meta data
// find module type in vme_module_data_sources.json
// for each filter in the modules data_sources:
// calculate array size (use float as the storage type for now)
// reserve buffer space for the array
//
// replay:
// for each event in the listfile:
// for each module in the event:
// locate the list of filters for the crate, event and module index triplet
// for each filter:
// match it against every word in the module data
// if a word matches extract address and value
// store value in the reserved array buffer
//
// -> extracted u32 data is stored in the buffer space for this event
// what now?
// - histogram
// - make arrays available to python and test numpy
// - create root histograms and accumulate
// - create root trees or the new rntuples(?)
// -> want plugins. similar to the mvme listfile_reader but on analysis data
#include <atomic>
#include <boost/dll.hpp>
#include <cmrc/cmrc.hpp> // mnode::resources
#include <list>
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_math.h>
#include <mesytec-mnode/mnode_nng.h>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mutex>
#include <nlohmann/json.hpp>
2024-12-25 03:19:29 +01:00
#include <sstream>
#include "internal/argh.h"
#include "internal/mana_analysis.h"
#include "internal/mana_arena.h"
2024-12-25 05:47:38 +01:00
#include "internal/mana_lib.hpp"
// Declare the embedded resource library 'mnode::resources' so that
// cmrc::mnode::resources::get_filesystem() can be called from this file.
CMRC_DECLARE(mnode::resources);
// Unqualified names such as mvlc::, nng::, mana:: and u32 used throughout this
// file are resolved through these using-directives.
using namespace mesytec;
using namespace mesytec::mnode;
// Bundles the objects needed to read readout buffers from a listfile archive:
// the zip reader, the handle of the opened listfile entry, the reader helper
// (preamble, buffer format and buffers) and the crate config parsed from the
// preamble. Move-only.
struct ListfileContext
{
    std::unique_ptr<mvlc::listfile::ZipReader> zipReader;
    // Handle obtained from zipReader->openEntry(). Non-owning here; initialized
    // to nullptr so a default-constructed context never carries an
    // indeterminate pointer.
    // NOTE(review): presumably the handle stays valid as long as zipReader is
    // alive - confirm against the ZipReader documentation.
    mvlc::listfile::ReadHandle *readHandle = nullptr;
    mvlc::listfile::ListfileReaderHelper readerHelper;
    mvlc::CrateConfig crateConfig;

    ListfileContext() = default;
    ListfileContext(const ListfileContext &) = delete;
    ListfileContext &operator=(const ListfileContext &) = delete;
    ListfileContext(ListfileContext &&) = default;
    ListfileContext &operator=(ListfileContext &&) = default;
};
std::optional<ListfileContext> make_listfile_context(const std::string &filename)
{
try
{
ListfileContext ctx;
ctx.zipReader = std::make_unique<mvlc::listfile::ZipReader>();
ctx.zipReader->openArchive(filename);
ctx.readHandle = ctx.zipReader->openEntry(ctx.zipReader->firstListfileEntryName());
ctx.readerHelper = mvlc::listfile::make_listfile_reader_helper(ctx.readHandle);
auto configData = ctx.readerHelper.preamble.findCrateConfig();
if (!configData)
{
std::cerr << fmt::format("No MVLC crate config found in {}\n", filename);
return {};
}
ctx.crateConfig = mvlc::crate_config_from_yaml(configData->contentsToString());
std::cout << fmt::format("Found MVLC crate config in {}\n", filename);
for (size_t i = 0; i < ctx.crateConfig.stacks.size(); ++i)
{
std::cout << fmt::format("event[{}] {}:\n", i, ctx.crateConfig.stacks[i].getName());
size_t mi = 0;
for (const auto &module_: ctx.crateConfig.stacks[i].getGroups())
{
auto meta = fmt::format("{}", fmt::join(module_.meta, ", "));
std::cout << fmt::format(" module[{}]: name={}, meta={{{}}}", mi++, module_.name,
meta);
if (!mvlc::produces_output(module_))
std::cout << ", (no output)";
std::cout << "\n";
}
}
return ctx;
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error: {}\n", e.what());
return {};
}
}
// Everything the mvlc readout parser needs for one replay: the parser state,
// its counters, the callbacks it invokes and a copy of the crate config the
// parser was built from.
struct ParserContext
{
    // Short names for the readout_parser types; also used by callers
    // (e.g. ParserContext::Callbacks).
    using Parser = mvlc::readout_parser::ReadoutParserState;
    using Counters = mvlc::readout_parser::ReadoutParserCounters;
    using Callbacks = mvlc::readout_parser::ReadoutParserCallbacks;

    mvlc::readout_parser::ReadoutParserState parser;
    mvlc::readout_parser::ReadoutParserCounters counters;
    mvlc::readout_parser::ReadoutParserCallbacks callbacks;
    mvlc::CrateConfig crateConfig;
};
// Builds a ParserContext for the stacks of 'crateConfig' and stores the given
// callbacks and a copy of the crate config in it.
//
// Returns an empty optional if a std::exception is thrown while setting up the
// context; the exception message is written to stderr.
std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateConfig,
                                                 ParserContext::Callbacks callbacks)
{
    try
    {
        ParserContext parserContext{};

        parserContext.parser = mvlc::readout_parser::make_readout_parser(crateConfig.stacks);
        parserContext.crateConfig = crateConfig;
        parserContext.callbacks = callbacks;

        return parserContext;
    }
    catch (const std::exception &error)
    {
        std::cerr << fmt::format("Error: {}\n", error.what());
    }

    return std::nullopt;
}
// Interface for the different ways of driving a replay: an implementation
// reads buffers via the listfile context and feeds them to the parser held in
// the parser context.
struct ProcessingStrategy
{
    // Runs the replay using the given listfile and parser contexts.
    virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;

    // Virtual so that strategies can be destroyed through a base pointer.
    virtual ~ProcessingStrategy() = default;
};
struct SingleThreadedStrategy: public ProcessingStrategy
{
inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext)
{
listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
if (!buffer->used())
return 0;
auto bufferView = buffer->viewU32();
mvlc::readout_parser::parse_readout_buffer(listfileContext.readerHelper.bufferFormat,
parserContext.parser, parserContext.callbacks,
parserContext.counters, bufferNumber,
bufferView.data(), bufferView.size());
return buffer->used();
}
2024-12-26 03:17:33 +01:00
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
{
size_t bufferNumber = 1;
2024-12-26 03:17:33 +01:00
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed = process_one_buffer(bufferNumber, listfileContext, parserContext);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
while (bytesProcessed > 0);
report();
}
};
struct NngPairStrategy: public ProcessingStrategy
{
struct SocketCounters
{
size_t bytes;
size_t messages;
};
nng_socket producerSocket;
nng_socket consumerSocket;
SocketCounters producerTx;
SocketCounters consumerRx;
// std::mutex producerTxMutex;
explicit NngPairStrategy(const std::string &url_)
{
producerSocket = nng::make_pair_socket();
consumerSocket = nng::make_pair_socket();
if (int res = nng_listen(producerSocket, url_.c_str(), nullptr, 0))
throw std::runtime_error(
fmt::format("error listening on '{}': {}", url_, nng_strerror(res)));
if (int res = nng_dial(consumerSocket, url_.c_str(), nullptr, 0))
throw std::runtime_error(
fmt::format("error connecting to '{}': {}", url_, nng_strerror(res)));
};
~NngPairStrategy()
{
nng_close(consumerSocket);
nng_close(producerSocket);
}
// message format is: u32 bufferNumber, u32 bufferSize, u32 bufferData[bufferSize]
void producer(ListfileContext &listfileContext, std::atomic<bool> &quit)
{
size_t bufferNumber = 1;
while (!quit)
{
listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
if (!buffer->used())
break;
auto bufferView = buffer->viewU32();
auto msg = nng::allocate_message(2 * sizeof(u32) + bufferView.size() * sizeof(u32));
size_t msgLen = nng_msg_len(msg.get());
auto data = reinterpret_cast<u32 *>(nng_msg_body(msg.get()));
*(data + 0) = bufferNumber++;
*(data + 1) = static_cast<u32>(bufferView.size());
std::copy(std::begin(bufferView), std::end(bufferView), data + 2);
int res = 0;
do
{
if ((res = nng::send_message(producerSocket, msg)))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("NngPairStrategy: error sending message: {}",
nng_strerror(res));
return;
}
}
// std::lock_guard<std::mutex> lock(producerTxMutex);
producerTx.bytes += msgLen;
++producerTx.messages;
}
while (res == NNG_ETIMEDOUT);
}
nng::send_empty_message(producerSocket);
}
void consumer(mvlc::ConnectionType bufferFormat, ParserContext &parserContext)
{
size_t bufferNumber = 1;
size_t totalBytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
2024-12-26 18:42:25 +01:00
std::cout << fmt::format(
"Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
while (true)
{
auto [msg, res] = nng::receive_message(consumerSocket);
if (res && res != NNG_ETIMEDOUT)
{
spdlog::error("NngPairStrategy: error receiving message: {}", nng_strerror(res));
return;
}
auto data =
mvlc::util::span<const u32>(reinterpret_cast<const u32 *>(nng_msg_body(msg.get())),
nng_msg_len(msg.get()) / sizeof(u32));
if (data.size() < 2)
break;
bufferNumber = data[0];
size_t bufferSize = data[1];
if (data.size() != bufferSize + 2)
{
spdlog::error("NngPairStrategy: invalid message size: {}", data.size());
return;
}
std::basic_string_view<u32> bufferView(data.data() + 2, bufferSize);
mvlc::readout_parser::parse_readout_buffer(
bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters,
bufferNumber, bufferView.data(), bufferView.size());
consumerRx.bytes += nng_msg_len(msg.get());
++consumerRx.messages;
totalBytesProcessed += bufferView.size() * sizeof(u32);
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
report();
2024-12-26 18:42:25 +01:00
spdlog::debug("NngPairStrategy: producerTx: {:.2f} MiB, {} messages",
producerTx.bytes / (1024.0 * 1024), producerTx.messages);
spdlog::debug("NngPairStrategy: consumerRx: {:.2f} MiB, {} messages",
consumerRx.bytes / (1024.0 * 1024), consumerRx.messages);
}
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
{
auto bufferFormat = listfileContext.readerHelper.bufferFormat;
std::atomic<bool> quitProducer = false;
producerTx = {};
consumerRx = {};
std::thread producerThread([this, &listfileContext, &quitProducer]
{ producer(listfileContext, quitProducer); });
consumer(bufferFormat, parserContext);
quitProducer = true;
if (producerThread.joinable())
producerThread.join();
}
};
// Prints the command line synopsis and a description of all options to stdout.
// 'self' is the program name shown in the synopsis.
void usage(const char *self)
{
    // The synopsis now lists --log-level too; it was documented below and
    // handled in main() but missing here.
    std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] [--processing-strategy=<name>] "
                             "[--log-level=<level>] <zipfile>\n",
                             self);
    std::cout
        << " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
           "simple counting plugin.\n"
        << " --processing-strategy=<name> Use a specific processing strategy. "
           "Available: 'direct', 'nng-pair'. Default: 'direct'\n"
        << " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
           "'off'.\n";
}
// Entry point: parses the command line, opens the listfile, sets up the sink
// (plugin or built-in counting sink), the processing strategy, the module data
// stage and the readout parser, then replays the listfile.
// Returns 0 on success or after printing the usage text, 1 on error.
int main(int argc, char *argv[])
{
    auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
    const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());

    argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"});
    parser.parse(argc, argv);

    auto filename = parser[1];

    if (parser["-h"] || parser["--help"] || filename.empty())
    {
        usage(argv[0]);
        return 0;
    }

    spdlog::set_level(spdlog::level::info);

    if (parser("--log-level"))
    {
        spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
    }

    auto listfileContext = make_listfile_context(filename);

    if (!listfileContext)
    {
        std::cerr << fmt::format("Error: could not open {}\n", filename);
        return 1;
    }

    // Declared before the sink on purpose: locals are destroyed in reverse
    // order, so the sink wrapping the plugin's entry points is destroyed while
    // the shared library is still loaded.
    boost::dll::shared_library pluginHandle;
    std::unique_ptr<mana::IManaSink> manaPlugin;

    if (parser("--plugin"))
    {
        auto pluginFile = parser("--plugin").str();

        try
        {
            pluginHandle = boost::dll::shared_library(pluginFile);
            manaPlugin = std::make_unique<mana::ManaCSink>(
                pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")());
        }
        catch (const std::exception &e)
        {
            std::cerr << fmt::format("Error loading plugin: {}\n", e.what());
            return 1;
        }
    }
    else
    {
        manaPlugin = std::make_unique<mana::ManaCountingSink>();
    }

    std::string strategyName = "direct";

    if (parser("--processing-strategy"))
        strategyName = parser("--processing-strategy").str();

    std::unique_ptr<ProcessingStrategy> strategy;

    // The NngPairStrategy constructor throws if its sockets cannot be set up;
    // report that as an error instead of terminating on an uncaught exception.
    try
    {
        if (strategyName == "nng-pair")
            strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
        else if (strategyName == "direct")
            strategy = std::make_unique<SingleThreadedStrategy>();
    }
    catch (const std::exception &e)
    {
        std::cerr << fmt::format("Error: {}\n", e.what());
        return 1;
    }

    if (!strategy)
    {
        std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
        usage(argv[0]);
        return 1;
    }

    auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
                                             jModuleDataSources, manaPlugin.get(), nullptr);

    // Parser callbacks: forward event data and system events to the module
    // data stage passed in through the parser's user context pointer.
    auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
                         const mvlc::readout_parser::ModuleData *moduleDataList,
                         unsigned moduleCount)
    {
        (void)crateIndex;
        auto ctx = reinterpret_cast<mana::ModuleDataStage *>(ctx_);
        mana::module_data_stage_process_module_data(*ctx, eventIndex, moduleDataList, moduleCount);
    };

    auto system_event = [](void *ctx_, int crateIndex, const u32 *header, u32 size)
    {
        (void)crateIndex;
        auto ctx = reinterpret_cast<mana::ModuleDataStage *>(ctx_);
        mana::module_data_stage_process_system_event(*ctx, header, size);
    };

    auto parserContext =
        make_parser_context(listfileContext->crateConfig, {event_data, system_event});

    if (!parserContext)
        return 1;

    // make the analysis instance available to the parser callbacks
    parserContext->parser.userContext = &mana;

    mana.sink->init(0, nullptr);
    mana.sink->begin_run(mana.runDescriptor.dump().c_str());

    strategy->run(*listfileContext, *parserContext);

    mana.sink->end_run(mana.runDescriptor.dump().c_str());
    mana.sink->shutdown();

    return 0;
}