From 32aa3d6d08d7e787dbbb5f5b60051ee4b50977ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Mon, 3 Jul 2023 20:00:19 +0200 Subject: [PATCH] slow replay --- src/mvlc_nng_replay.cc | 567 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 528 insertions(+), 39 deletions(-) diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 6095404..6d106e3 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -1,9 +1,36 @@ +#include #include #include +#include +#include "common.h" using namespace mesytec; using namespace mesytec::mvlc; +int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "") +{ + int res = 0; + size_t attempt = 0u; + + do + { + res = nng_sendmsg(socket, msg, 0); + + if (res && res == NNG_ETIMEDOUT) + spdlog::warn("send_message_retry: {} - send timeout", debugInfo); + + if (res && res != NNG_ETIMEDOUT) + return res; + + if (res && maxTries > 0 && attempt >= maxTries) + return res; + + ++attempt; + } while (res == NNG_ETIMEDOUT); + + return 0; +} + // Follows the framing structure inside the buffer until an incomplete frame // which doesn't fit into the buffer is detected. The incomplete data is moved // over to the tempBuffer so that the readBuffer ends with a complete frame. @@ -94,44 +121,64 @@ enum class BufferType: u32 MVLC_ETH, }; -#pragma pack(push, 1) -struct BufferMessageHeader +enum class MessageType { - u32 bufferType; - u32 bufferNumber; + ListfileBuffer, + ParsedEvents, }; -#pragma pack(pop) +struct __attribute__((packed, aligned(1))) MessageHeaderBase +{ + MessageType messageType; + u32 messageNumber; +}; -size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, size_t msgUsed, std::vector &tmpBuf) +struct __attribute__((packed, aligned(1))) ListfileBufferMessageHeader: public MessageHeaderBase +{ + u32 bufferType; +}; + +static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); + +struct __attribute__((packed, aligned(1))) ParsedEventsMessageHeader: public MessageHeaderBase +{ +}; + +static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); + +void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector &tmpBuf) { size_t bytesMoved = 0u; - const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(BufferMessageHeader); - const auto msgBufferSize = msgUsed - sizeof(BufferMessageHeader); + const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); + const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); if (bufferType == BufferType::MVLC_USB) bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf); else bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); - nng_msg_chop(msg, msgUsed - bytesMoved); - return msgUsed - bytesMoved; + nng_msg_chop(msg, bytesMoved); } -void listfile_reader_produer( +void listfile_reader_producer( nng_socket socket, - mvlc::listfile::ReadHandle &input) + mvlc::listfile::ReadHandle &input, + const mvlc::listfile::Preamble &preamble) { + prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); + try { - auto preamble = mvlc::listfile::read_preamble(input); - auto bufferType = BufferType::MVLC_USB; - if (preamble.magic == mvlc::listfile::get_filemagic_eth()) - bufferType = BufferType::MVLC_ETH; - u32 bufferNumber = 0u; - std::vector previousData; + auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() + ? BufferType::MVLC_ETH + : BufferType::MVLC_USB); - while (true) + u32 bufferNumber = 0u; + size_t totalBytesSent = 0u; + std::vector previousData; + bool done = false; + + while (!done) { nng_msg *msg = {}; if (int res = nng_msg_alloc(&msg, util::Megabytes(1))) @@ -140,57 +187,499 @@ void listfile_reader_produer( return; } - BufferMessageHeader msgHeader = {}; - msgHeader.bufferType = static_cast(bufferType); - msgHeader.bufferNumber = bufferNumber; - nng_msg_append(msg, &msgHeader, sizeof(msgHeader)); - size_t msgUsed = sizeof(msgHeader); + auto msgHeader = reinterpret_cast(nng_msg_body(msg)); + msgHeader->messageType = MessageType::ListfileBuffer; + msgHeader->messageNumber = ++bufferNumber; + msgHeader->bufferType = static_cast(bufferType); + nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader)); - std::copy(std::begin(previousData), std::end(previousData), - reinterpret_cast(nng_msg_body(msg)) + msgUsed); - msgUsed += previousData.size(); + nng_msg_append(msg, previousData.data(), previousData.size()); previousData.clear(); + size_t msgUsed = nng_msg_len(msg); + nng_msg_realloc(msg, util::Megabytes(1)); + size_t bytesRead = input.read( reinterpret_cast(nng_msg_body(msg)) + msgUsed, nng_msg_len(msg) - msgUsed); - msgUsed += bytesRead; - msgUsed = fixup_listfile_buffer_message(bufferType, msg, msgUsed, previousData); - assert(msgUsed == nng_msg_len(msg)); - if (bytesRead == 0 && nng_msg_len(msg) == sizeof(BufferMessageHeader)) + nng_msg_realloc(msg, msgUsed + bytesRead); + fixup_listfile_buffer_message(bufferType, msg, previousData); + + done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader)); + + if (done) + nng_msg_realloc(msg, 0); + + const auto msgSize = nng_msg_len(msg); + + if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer")) { nng_msg_free(msg); + msg = nullptr; + spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res)); return; } - else if (auto res = nng_sendmsg(socket, msg, 0)) + + spdlog::info("listfile_reader_producer: sent message {} of size {}", + bufferNumber, msgSize); + totalBytesSent += msgSize; + } + + spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", + bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1)); + } + catch(const std::exception& e) + { + spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); + return; + } + +} + +struct ParserNngContext +{ + nng_msg *outputMessage = nullptr; + size_t totalReadoutEvents = 0u; + size_t totalSystemEvents = 0u; +}; + +struct __attribute__((packed, aligned(1))) ParsedEventHeader +{ + u32 magicByte: 8; + u8 crateIndex: 8; +}; + +struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader +{ + u8 eventIndex: 8; + u8 moduleCount: 8; +}; + +struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader +{ + u32 eventSize; +}; + +struct __attribute__((packed, aligned(1))) ParsedModuleHeader +{ + u16 prefixSize; + u16 suffixSize; + u32 dynamicSize; +}; + +void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, + const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) +{ + assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); + assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); + assert(moduleCount < std::numeric_limits::max()); + + auto &ctx = *reinterpret_cast(ctx_); + ++ctx.totalReadoutEvents; + auto msg = ctx.outputMessage; + + ParsedDataEventHeader eventHeader = + { + 0xF3u, + static_cast(crateIndex), + static_cast(eventIndex), + static_cast(moduleCount), + }; + + if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) + { + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); + return; + } + + #if 1 + for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) + { + auto &moduleData = moduleDataList[moduleIndex]; + + ParsedModuleHeader moduleHeader = {}; + moduleHeader.prefixSize = moduleData.prefixSize; + moduleHeader.dynamicSize = moduleData.dynamicSize; + moduleHeader.suffixSize = moduleData.suffixSize; + + if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) + { + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); + return; + } + + auto prefixSpan = prefix_span(moduleData); + auto dynamicSpan = dynamic_span(moduleData); + auto suffixSpan = suffix_span(moduleData); + + for (auto &dataSpan: { prefixSpan, dynamicSpan, suffixSpan }) + { + if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32))) { - // TODO: how to handle blocking here? need some way to ensure - // nng_sendmsg() returns - // -> maybe: timeout and loop with check if the producer should quit - spdlog::error("nng_sendmsg: {}", nng_strerror(res)); - nng_msg_free(msg); + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } } } - catch(const std::exception& e) + #endif +} + +void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) +{ + assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); + + auto &ctx = *reinterpret_cast(ctx_); + ++ctx.totalSystemEvents; + + auto msg = ctx.outputMessage; + + ParsedSystemEventHeader eventHeader = { - spdlog::error(e.what()); + 0xFAu, + static_cast(crateIndex), + size, + }; + + if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) + { + spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); + return; + } + + + if (int res = nng_msg_append(msg, header, size * sizeof(u32))) + { + spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); return; } } +class StopWatch +{ +public: + using duration_type = std::chrono::microseconds; + + void start() + { + tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); + } + + duration_type interval() + { + auto now = std::chrono::high_resolution_clock::now(); + auto result = std::chrono::duration_cast(now - tInterval_); + tInterval_ = now; + return result; + } + + duration_type end() + { + auto now = std::chrono::high_resolution_clock::now(); + auto result = std::chrono::duration_cast(now - tStart_); + return result; + } + +private: + std::chrono::high_resolution_clock::time_point tStart_; + std::chrono::high_resolution_clock::time_point tInterval_; +}; + +void listfile_parser_nng( + nng_socket inputSocket, + nng_socket outputSocket, + const mvlc::listfile::Preamble &preamble) +{ + prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); + + size_t totalInputBytes = 0u; + u32 lastInputMessageNumber = 0u; + size_t inputBuffersLost = 0; + const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth() + ? ConnectionType::ETH + : ConnectionType::USB); + + auto configSection = preamble.findCrateConfig(); + + if (!configSection) + { + spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); + return; + } + + auto configYaml = configSection->contentsToString(); + auto crateConfig = mvlc::crate_config_from_yaml(configYaml); + + spdlog::info("listfile_parser_nng: CrateConfig:\n{}", to_yaml(crateConfig)); + + auto crateIndex = 0; + ParserNngContext parserContext; + auto parserState = mvlc::readout_parser::make_readout_parser( + crateConfig.stacks, crateIndex, &parserContext); + mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; + mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = + { + parser_nng_eventdata, + parser_nng_systemevent, + }; + + nng_msg *inputMsg = nullptr; + u32 outputMessageNumber = 0u; + std::chrono::microseconds tReceive; + std::chrono::microseconds tProcess; + std::chrono::microseconds tSend; + std::chrono::microseconds tTotal; + auto tLastReport = std::chrono::steady_clock::now(); + + while (true) + { + StopWatch stopWatch; + stopWatch.start(); + if (auto res = receive_message(inputSocket, &inputMsg)) + { + if (res != NNG_ETIMEDOUT) + { + spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); + return; + } + } + else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) + { + if (nng_msg_len(inputMsg) == 0) + break; + + spdlog::warn("listfile_parser_nng - incoming message too short (len={})", + nng_msg_len(inputMsg)); + } + else + { + tReceive += stopWatch.interval(); + totalInputBytes += nng_msg_len(inputMsg); + auto inputHeader = *reinterpret_cast( + nng_msg_body(inputMsg)); + auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); + inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; + lastInputMessageNumber = inputHeader.messageNumber; + spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); + + nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); + auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); + size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); + + if (!parserContext.outputMessage) + { + if (auto res = nng_msg_alloc(&parserContext.outputMessage, util::Megabytes(1))) + { + spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); + break; + } + + auto &outputHeader = *reinterpret_cast(nng_msg_body(parserContext.outputMessage)); + outputHeader.messageType = MessageType::ParsedEvents; + outputHeader.messageNumber = ++outputMessageNumber; + nng_msg_realloc(parserContext.outputMessage, sizeof(ParsedEventsMessageHeader)); + } + + assert(parserContext.outputMessage); + + readout_parser::parse_readout_buffer( + listfileFormat, + parserState, + parserCallbacks, + parserCounters, + inputHeader.messageNumber, + inputData, + inputLen); + + tProcess += stopWatch.interval(); + + // TODO: also flush after a certain time + if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1)) + { + const auto msgSize = nng_msg_len(parserContext.outputMessage); + + if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) + { + nng_msg_free(parserContext.outputMessage); + parserContext.outputMessage = nullptr; + spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); + break; + } + + parserContext.outputMessage = nullptr; + + spdlog::info("listfile_parser_nng: sent message {} of size {}", + outputMessageNumber, msgSize); + } + + tSend += stopWatch.interval(); + tTotal += stopWatch.end(); + } + + if (inputMsg) + { + nng_msg_free(inputMsg); + inputMsg = nullptr; + } + + { + auto now = std::chrono::steady_clock::now(); + auto tElapsed = now - tLastReport; + static const auto ReportInterval = std::chrono::seconds(1); + + if (tElapsed >= ReportInterval) + { + spdlog::info("listfile_parser_nng: time budget:\n" + " tReceive = {}\n" + " tProcess = {}\n" + " tSend = {}\n" + " tTotal = {}\n", + tReceive.count(), + tProcess.count(), + tSend.count(), + tTotal.count()); + tLastReport = now; + } + } + } + + if (inputMsg) + { + nng_msg_free(inputMsg); + inputMsg = nullptr; + } + + assert(!parserContext.outputMessage); + + spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", + lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); + spdlog::info("listfile_parser_nng: time budget:\n" + " tReceive = {} ms\n" + " tProcess = {} ms\n" + " tSend = {} ms\n" + " tTotal = {} ms\n", + tReceive.count() / 1000.0, + tProcess.count() / 1000.0, + tSend.count() / 1000.0, + tTotal.count() / 1000.0); + + // send empty message + if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0)) + { + spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); + return; + } + + if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) + { + nng_msg_free(parserContext.outputMessage); + parserContext.outputMessage = nullptr; + spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); + return; + } +} + +void analysis_nng( + nng_socket inputSocket + ) +{ + prctl(PR_SET_NAME,"analysis_nng",0,0,0); + nng_msg *inputMsg = nullptr; + size_t totalInputBytes = 0u; + u32 lastInputMessageNumber = 0u; + size_t inputBuffersLost = 0; + + while (true) + { + if (auto res = receive_message(inputSocket, &inputMsg)) + { + if (res != NNG_ETIMEDOUT) + { + spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res)); + return; + } + } + else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader)) + { + if (nng_msg_len(inputMsg) == 0) + break; + + spdlog::warn("analysis_nng - incoming message too short (len={})", + nng_msg_len(inputMsg)); + } + else + { + totalInputBytes += nng_msg_len(inputMsg); + auto inputHeader = *reinterpret_cast( + nng_msg_body(inputMsg)); + auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); + inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; + lastInputMessageNumber = inputHeader.messageNumber; + spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); + + nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader)); + + //while (nng_msg_len(inputMsg)) + //{ + + //} + nng_msg_free(inputMsg); + inputMsg = nullptr; + } + } + + spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", + lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); +} + int main(int argc, char *argv[]) { + spdlog::set_level(spdlog::level::info); + if (argc < 2) return 1; try { + auto producerSocket = make_pair_socket(); + + if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_listen inproc", res); + + auto parserInputSocket = make_pair_socket(); + + if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_dial inproc", res); + + auto parserOutputSocket = make_pair_socket(); + + if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) + mesy_nng_fatal("nng_listen inproc", res); + + auto analysisInputSocket = make_pair_socket(); + + if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) + mesy_nng_fatal("nng_dial inproc", res); + listfile::ZipReader zipReader; zipReader.openArchive(argv[1]); + spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); + auto preamble = mvlc::listfile::read_preamble(*readHandle); + (void) listfile::read_magic(*readHandle); + + std::vector threads; + + threads.emplace_back(std::thread(listfile_reader_producer, + producerSocket, std::ref(*readHandle), std::cref(preamble))); + + threads.emplace_back(std::thread(listfile_parser_nng, + parserInputSocket, parserOutputSocket, std::cref(preamble))); + + threads.emplace_back(std::thread(analysis_nng, + analysisInputSocket)); + + for (auto &t: threads) if (t.joinable()) t.join(); } catch(const std::exception& e) {