#include #include #include #include #include #include using namespace mesytec; using namespace mesytec::mvlc; using namespace mesytec::nng; static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1); enum class BufferType: u32 { MVLC_USB, MVLC_ETH, }; enum class MessageType: u8 { ListfileBuffer, ParsedEvents, }; #define PACK_AND_ALIGN4 __attribute__((packed, aligned(4))) struct PACK_AND_ALIGN4 BaseMessageHeader { MessageType messageType; u32 messageNumber; }; struct PACK_AND_ALIGN4 ListfileBufferMessageHeader: public BaseMessageHeader { u32 bufferType; }; static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader { }; static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector &tmpBuf) { size_t bytesMoved = 0u; const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); if (bufferType == BufferType::MVLC_USB) bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf); else bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); nng_msg_chop(msg, bytesMoved); return bytesMoved; } void listfile_reader_producer( nng_socket outputSocket, mvlc::listfile::ReadHandle &input, const BufferType &bufferType) { prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); #if 0 log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); if (auto res = nng_close(outputSocket)) { spdlog::error("nng_close: {}", nng_strerror(res)); log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); return; } return; #endif try { u32 bufferNumber = 0u; size_t totalBytesSent = 0u; std::vector previousData; bool done = false; while (!done) { nng_msg *msg = {}; if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) return; ListfileBufferMessageHeader header { MessageType::ListfileBuffer, ++bufferNumber, static_cast(bufferType), }; nng_msg_append(msg, &header, sizeof(header)); assert(nng_msg_len(msg) == sizeof(header)); nng_msg_append(msg, previousData.data(), previousData.size()); previousData.clear(); size_t msgUsed = nng_msg_len(msg); nng_msg_realloc(msg, DefaultOutputMessageReserve); size_t bytesRead = input.read( reinterpret_cast(nng_msg_body(msg)) + msgUsed, nng_msg_len(msg) - msgUsed); nng_msg_realloc(msg, msgUsed + bytesRead); fixup_listfile_buffer_message(bufferType, msg, previousData); done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader)); if (done) nng_msg_realloc(msg, 0); const auto msgSize = nng_msg_len(msg); if (auto res = send_message_retry(outputSocket, msg, 0, "listfile_reader_producer")) { nng_msg_free(msg); msg = nullptr; spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res)); return; } spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber, msgSize); totalBytesSent += msgSize; } spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); } catch(const std::exception& e) { spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); return; } } struct ReadoutParserNngContext { nng_msg *outputMessage = nullptr; u32 outputMessageNumber = 0u; nng_socket outputSocket = NNG_SOCKET_INITIALIZER; size_t totalReadoutEvents = 0u; size_t totalSystemEvents = 0u; }; struct PACK_AND_ALIGN4 ParsedEventHeader { u8 magicByte; u8 crateIndex; }; static const u8 ParsedDataEventMagic = 0xF3u; static const u8 ParsedSystemEventMagic = 0xFAu; struct PACK_AND_ALIGN4 ParsedDataEventHeader: public ParsedEventHeader { u8 eventIndex; u8 moduleCount; }; struct PACK_AND_ALIGN4 ParsedModuleHeader { u16 prefixSize; u16 suffixSize; u32 dynamicSize; size_t totalSize() const { return prefixSize + suffixSize + dynamicSize; } size_t totalBytes() const { return totalSize() * sizeof(u32); } }; struct PACK_AND_ALIGN4 ParsedSystemEventHeader: public ParsedEventHeader { u32 eventSize; size_t totalSize() const { return eventSize; } size_t totalBytes() const { return totalSize() * sizeof(u32); } }; bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) { auto &msg = ctx.outputMessage; if (msg) return false; if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) return false; ParsedEventsMessageHeader header = { MessageType::ParsedEvents, ++ctx.outputMessageNumber, }; nng_msg_append(msg, &header, sizeof(header)); assert(nng_msg_len(msg) == sizeof(header)); return true; } bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = "") { const auto msgSize = nng_msg_len(ctx.outputMessage); if (auto res = send_message_retry(ctx.outputSocket, ctx.outputMessage, 0, debugInfo)) { nng_msg_free(ctx.outputMessage); ctx.outputMessage = nullptr; spdlog::error("{}: send_message_retry: {}:", debugInfo, nng_strerror(res)); return false; } ctx.outputMessage = nullptr; spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize); return true; } void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); assert(moduleCount < std::numeric_limits::max()); auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalReadoutEvents; auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedDataEventHeader); for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) { auto &moduleData = moduleDataList[moduleIndex]; requiredBytes += sizeof(ParsedModuleHeader); requiredBytes += moduleData.data.size * sizeof(u32); } size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { if (!flush_output_message(ctx, "parser_nng_eventdata")) return; } if (!msg && !parser_maybe_alloc_output(ctx)) return; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedDataEventHeader eventHeader = { ParsedDataEventMagic, static_cast(crateIndex), static_cast(eventIndex), static_cast(moduleCount), }; if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) { auto &moduleData = moduleDataList[moduleIndex]; ParsedModuleHeader moduleHeader = {}; moduleHeader.prefixSize = moduleData.prefixSize; moduleHeader.dynamicSize = moduleData.dynamicSize; moduleHeader.suffixSize = moduleData.suffixSize; if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) { spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) { spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } } } void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalSystemEvents; auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { if (!flush_output_message(ctx, "parser_nng_systemevent")) return; } if (!msg && !parser_maybe_alloc_output(ctx)) return; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedSystemEventHeader eventHeader = { ParsedSystemEventMagic, static_cast(crateIndex), size, }; if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, header, size * sizeof(u32))) { spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); return; } } class StopWatch { public: using duration_type = std::chrono::microseconds; void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); } duration_type interval() { auto now = std::chrono::high_resolution_clock::now(); auto result = std::chrono::duration_cast(now - tInterval_); tInterval_ = now; return result; } duration_type end() { auto now = std::chrono::high_resolution_clock::now(); auto result = std::chrono::duration_cast(now - tStart_); return result; } private: std::chrono::high_resolution_clock::time_point tStart_; std::chrono::high_resolution_clock::time_point tInterval_; }; void listfile_parser_nng( nng_socket inputSocket, nng_socket outputSocket, const mvlc::CrateConfig &crateConfig) { prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; const auto listfileFormat = crateConfig.connectionType; std::string stacksYaml; for (const auto &stack: crateConfig.stacks) stacksYaml += to_yaml(stack); spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml); auto crateIndex = 0; ReadoutParserNngContext parserContext; parserContext.outputSocket = outputSocket; auto parserState = mvlc::readout_parser::make_readout_parser( crateConfig.stacks, &parserContext); mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { parser_nng_eventdata, parser_nng_systemevent, }; nng_msg *inputMsg = nullptr; u32 &outputMessageNumber = parserContext.outputMessageNumber; std::chrono::microseconds tReceive(0); std::chrono::microseconds tProcess(0); std::chrono::microseconds tSend(0); std::chrono::microseconds tTotal(0); auto tLastReport = std::chrono::steady_clock::now(); const auto tStart = std::chrono::steady_clock::now(); auto log_stats = [&] { spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); spdlog::info("listfile_parser_nng: time budget: " " tReceive = {} ms, " " tProcess = {} ms, " " tSend = {} ms, " " tTotal = {} ms", tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0, tTotal.count() / 1000.0); auto totalElapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - tStart); auto totalBytes = parserCounters.bytesProcessed; auto totalMiB = totalBytes / (1024.0*1024.0); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", outputMessageNumber, totalMiB, MiBperSecond); }; while (true) { StopWatch stopWatch; stopWatch.start(); if (auto res = receive_message(inputSocket, &inputMsg)) { if (res != NNG_ETIMEDOUT) { spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); return; } spdlog::trace("listfile_parser_nng - receive_message: timeout"); } else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) { if (nng_msg_len(inputMsg) == 0) break; spdlog::warn("listfile_parser_nng - incoming message too short (len={})", nng_msg_len(inputMsg)); } else { tReceive += stopWatch.interval(); totalInputBytes += nng_msg_len(inputMsg); auto inputHeader = *reinterpret_cast( nng_msg_body(inputMsg)); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; spdlog::debug("listfile_parser_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); readout_parser::parse_readout_buffer( listfileFormat, parserState, parserCallbacks, parserCounters, inputHeader.messageNumber, inputData, inputLen); tProcess += stopWatch.interval(); // TODO: also flush after a certain time tTotal += stopWatch.end(); } if (inputMsg) { nng_msg_free(inputMsg); inputMsg = nullptr; } { auto now = std::chrono::steady_clock::now(); auto tReportElapsed = now - tLastReport; static const auto ReportInterval = std::chrono::seconds(1); if (tReportElapsed >= ReportInterval) { log_stats(); tLastReport = now; } } } if (parserContext.outputMessage) flush_output_message(parserContext, "listfile_parser_nng"); if (inputMsg) { nng_msg_free(inputMsg); inputMsg = nullptr; } assert(!parserContext.outputMessage); // send empty message if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0)) { spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); return; } if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) { nng_msg_free(parserContext.outputMessage); parserContext.outputMessage = nullptr; spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); return; } log_stats(); { std::ostringstream ss; readout_parser::print_counters(ss, parserCounters); spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str()); } } void analysis_nng( nng_socket inputSocket ) { prctl(PR_SET_NAME,"analysis_nng",0,0,0); nng_msg *inputMsg = nullptr; size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; bool error = false; while (!error) { if (auto res = receive_message(inputSocket, &inputMsg)) { if (res != NNG_ETIMEDOUT) { spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res)); return; } } else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader)) { if (nng_msg_len(inputMsg) == 0) break; spdlog::warn("analysis_nng - incoming message too short (len={})", nng_msg_len(inputMsg)); } else { totalInputBytes += nng_msg_len(inputMsg); auto inputHeader = msg_trim_read(inputMsg).value(); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; if (inputHeader.messageType != MessageType::ParsedEvents) { spdlog::error("Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", static_cast(inputHeader.messageType), static_cast(MessageType::ParsedEvents)); break; } spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); while (true) { if (nng_msg_len(inputMsg) < 1) break; const u8 eventMagic = *reinterpret_cast(nng_msg_body(inputMsg)); if (eventMagic == ParsedDataEventMagic) { auto eventHeader = msg_trim_read(inputMsg); if (!eventHeader) break; for (size_t moduleIndex=0u; moduleIndexmoduleCount; ++moduleIndex) { auto moduleHeader = msg_trim_read(inputMsg); if (!moduleHeader) break; if (moduleHeader->totalBytes()) { const u32 *moduleData = reinterpret_cast(nng_msg_body(inputMsg)); //util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), fmt::format("crate={}, event={}, module={}, size={}", // eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, moduleHeader->totalSize())); if (nng_msg_trim(inputMsg, moduleHeader->totalBytes())) break; } } } else if (eventMagic == ParsedSystemEventMagic) { auto eventHeader = msg_trim_read(inputMsg); if (!eventHeader) break; if (nng_msg_trim(inputMsg, eventHeader->totalBytes())) break; } } assert(nng_msg_len(inputMsg) == 0); nng_msg_free(inputMsg); inputMsg = nullptr; } } spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); } void pipe_cb(nng_pipe p, nng_pipe_ev event, void */*arg*/) { switch (event) { case::NNG_PIPE_EV_ADD_PRE: spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); break; case::NNG_PIPE_EV_ADD_POST: spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); break; case::NNG_PIPE_EV_REM_POST: spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); break; case::NNG_PIPE_EV_NUM: // silence warning break; } } int main(int argc, char *argv[]) { spdlog::set_level(spdlog::level::info); if (argc < 2) { std::cerr << fmt::format("Usage: mvlc_nng_replay \n"); return 1; } const auto inputFilename = argv[1]; listfile::ZipReader zipReader; try { zipReader.openArchive(inputFilename); } catch (const std::exception &e) { std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, e.what()); return 1; } listfile::ReadHandle *readHandle = nullptr; try { readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); } catch(const std::exception& e) { if (zipReader.firstListfileEntryName().empty()) std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename); else std::cout << fmt::format("Error: could not open listfile '{}' in '{}' for reading: {}\n", zipReader.firstListfileEntryName(), inputFilename, e.what()); return 1; } assert(readHandle); try { auto readerOutputSocket = make_pair_socket(); auto parserInputSocket = make_pair_socket(); auto parserOutputSocket = make_pair_socket(); auto analysisInputSocket = make_pair_socket(); for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) { for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) { if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) mesy_nng_fatal("nng_pipe_notify", res); } } if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); log_socket_info(readerOutputSocket, "readerOutputSocket"); if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); log_socket_info(parserInputSocket, "parserInputSocket"); if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH : BufferType::MVLC_USB); auto crateConfigSection = preamble.findCrateConfig(); if (!crateConfigSection) { spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); // FIXME: close sockets return 1; } auto configYaml = crateConfigSection->contentsToString(); auto crateConfig = mvlc::crate_config_from_yaml(configYaml); // Seek to start, then read past the magic bytes at the beginning of the // listfile. (void) listfile::read_magic(*readHandle); std::vector threads; threads.emplace_back(std::thread(listfile_reader_producer, readerOutputSocket, std::ref(*readHandle), std::cref(bufferType))); threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket, std::cref(crateConfig))); threads.emplace_back(std::thread(analysis_nng, analysisInputSocket)); for (auto &t: threads) if (t.joinable()) t.join(); for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) if (auto res = nng_close(socket)) mesy_nng_fatal("nng_close", res); } catch(const std::exception& e) { spdlog::error("exception in main(): {}", e.what()); return 1; } return 0; }