From 0bf9947a2988f3e5dd5d8c6b058e322958868d5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Wed, 12 Jul 2023 20:41:09 +0200 Subject: [PATCH] better allocations but broken analysis messages --- src/mvlc_nng_replay.cc | 207 ++++++++++++++++++++++++++--------------- 1 file changed, 130 insertions(+), 77 deletions(-) diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index cfbf3e1..7d619fc 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -115,6 +116,27 @@ inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector &tmpBuf) +size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector &tmpBuf) { size_t bytesMoved = 0u; - const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); + const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + + sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); if (bufferType == BufferType::MVLC_USB) @@ -158,21 +181,19 @@ void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, s bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); nng_msg_chop(msg, bytesMoved); + + return bytesMoved; } void listfile_reader_producer( nng_socket socket, mvlc::listfile::ReadHandle &input, - const mvlc::listfile::Preamble &preamble) + const BufferType &bufferType) { prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); try { - auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() - ? BufferType::MVLC_ETH - : BufferType::MVLC_USB); - u32 bufferNumber = 0u; size_t totalBytesSent = 0u; std::vector previousData; @@ -181,23 +202,25 @@ void listfile_reader_producer( while (!done) { nng_msg *msg = {}; - if (int res = nng_msg_alloc(&msg, util::Megabytes(1))) - { - spdlog::error("nng_msg_alloc: {}", nng_strerror(res)); - return; - } - auto msgHeader = reinterpret_cast(nng_msg_body(msg)); - msgHeader->messageType = MessageType::ListfileBuffer; - msgHeader->messageNumber = ++bufferNumber; - msgHeader->bufferType = static_cast(bufferType); - nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader)); + if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) + return; + + ListfileBufferMessageHeader header + { + MessageType::ListfileBuffer, + ++bufferNumber, + static_cast(bufferType), + }; + + nng_msg_append(msg, &header, sizeof(header)); + assert(nng_msg_len(msg) == sizeof(header)); nng_msg_append(msg, previousData.data(), previousData.size()); previousData.clear(); size_t msgUsed = nng_msg_len(msg); - nng_msg_realloc(msg, util::Megabytes(1)); + nng_msg_realloc(msg, DefaultOutputMessageReserve); size_t bytesRead = input.read( reinterpret_cast(nng_msg_body(msg)) + msgUsed, @@ -237,7 +260,7 @@ void listfile_reader_producer( } -struct ParserNngContext +struct ReadoutParserNngContext { nng_msg *outputMessage = nullptr; u32 outputMessageNumber = 0u; @@ -246,50 +269,42 @@ struct ParserNngContext size_t totalSystemEvents = 0u; }; -struct __attribute__((packed, aligned(1))) ParsedEventHeader +struct __attribute__((packed, aligned(4))) ParsedEventHeader { u32 magicByte: 8; u8 crateIndex: 8; }; -struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader +struct __attribute__((packed, aligned(4))) ParsedDataEventHeader: public ParsedEventHeader { u8 eventIndex: 8; u8 moduleCount: 8; }; -struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader -{ - u32 eventSize; -}; - -struct __attribute__((packed, aligned(1))) ParsedModuleHeader +struct __attribute__((packed, aligned(4))) ParsedModuleHeader { u16 prefixSize; u16 suffixSize; u32 dynamicSize; + + size_t totalSize() const { return prefixSize + suffixSize + dynamicSize; } + size_t totalBytes() const { return totalSize() * sizeof(u32); } }; -static const size_t OutputMessageReserved = util::Megabytes(1); +struct __attribute__((packed, aligned(4))) ParsedSystemEventHeader: public ParsedEventHeader +{ + u32 eventSize; +}; -bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "") +bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) { auto &msg = ctx.outputMessage; if (msg) return false; - if (auto res = nng_msg_alloc(&msg, 0)) - { - spdlog::error("{} - nng_msg_alloc: {}", debugInfo, nng_strerror(res)); + if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) return false; - } - - if (auto res = nng_msg_reserve(msg, OutputMessageReserved)) - { - spdlog::error("{} - nng_msg_reserve: {}", debugInfo, nng_strerror(res)); - return false; - } ParsedEventsMessageHeader header = { @@ -303,7 +318,7 @@ bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "") return true; } -bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "") +bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = "") { const auto msgSize = nng_msg_len(ctx.outputMessage); @@ -323,7 +338,7 @@ bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "") return true; } -void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, +void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); @@ -331,7 +346,7 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, assert(moduleCount < std::numeric_limits::max()); - auto &ctx = *reinterpret_cast(ctx_); + auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalReadoutEvents; auto &msg = ctx.outputMessage; @@ -344,18 +359,18 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, requiredBytes += moduleData.data.size * sizeof(u32); } - size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { - if (!flush_output_message(ctx, "parser_nng_eventdata_v2")) + if (!flush_output_message(ctx, "parser_nng_eventdata")) return; } - if (!msg && !allocate_output_message(ctx, "parser_nng_eventdata_v2")) + if (!msg && !parser_maybe_alloc_output(ctx)) return; - bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedDataEventHeader eventHeader = @@ -368,7 +383,7 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { - spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } @@ -383,38 +398,38 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) { - spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) { - spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); return; } } } -void parser_nng_systemevent_v2(void *ctx_, int crateIndex, const u32 *header, u32 size) +void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); - auto &ctx = *reinterpret_cast(ctx_); + auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalSystemEvents; auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); - size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { - if (!flush_output_message(ctx, "parser_nng_systemevent_v2")) + if (!flush_output_message(ctx, "parser_nng_systemevent")) return; } - if (!msg && !allocate_output_message(ctx, "parser_nng_systemevent_v2")) + if (!msg && !parser_maybe_alloc_output(ctx)) return; - bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedSystemEventHeader eventHeader = @@ -499,15 +514,15 @@ void listfile_parser_nng( spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml); auto crateIndex = 0; - ParserNngContext parserContext; + ReadoutParserNngContext parserContext; parserContext.outputSocket = outputSocket; auto parserState = mvlc::readout_parser::make_readout_parser( crateConfig.stacks, crateIndex, &parserContext); mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { - parser_nng_eventdata_v2, - parser_nng_systemevent_v2, + parser_nng_eventdata, + parser_nng_systemevent, }; nng_msg *inputMsg = nullptr; @@ -533,13 +548,13 @@ void listfile_parser_nng( tSend.count() / 1000.0, tTotal.count() / 1000.0); - auto totalElapsed = std::chrono::duration_cast( + auto totalElapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - tStart); auto totalBytes = parserCounters.bytesProcessed; auto totalMiB = totalBytes / (1024.0*1024.0); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); - auto MiBperSecond = totalMiB / totalElapsed.count(); - spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", + auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; + spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:} MiB, rate={:.2f} MiB/s", outputMessageNumber, totalMiB, MiBperSecond); }; @@ -554,6 +569,7 @@ void listfile_parser_nng( spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); return; } + spdlog::trace("listfile_parser_nng - receive_message: timeout"); } else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) { @@ -572,7 +588,7 @@ void listfile_parser_nng( auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; - spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); + spdlog::debug("listfile_parser_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); @@ -613,7 +629,7 @@ void listfile_parser_nng( } if (parserContext.outputMessage) - flush_output_message(parserContext, "listfile_parser_nng_v2"); + flush_output_message(parserContext, "listfile_parser_nng"); if (inputMsg) { @@ -645,7 +661,20 @@ void listfile_parser_nng( readout_parser::print_counters(ss, parserCounters); spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str()); } +} +template +std::optional msg_trim_read(nng_msg *msg) +{ + const auto oldlen = nng_msg_len(msg); + if (nng_msg_len(msg) < sizeof(T)) + return {}; + + T result = *reinterpret_cast(nng_msg_body(msg)); + nng_msg_trim(msg, sizeof(T)); + const auto newlen = nng_msg_len(msg); + assert(newlen + sizeof(T) == oldlen); + return result; } void analysis_nng( @@ -657,8 +686,9 @@ void analysis_nng( size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; + bool error = false; - while (true) + while (!error) { if (auto res = receive_message(inputSocket, &inputMsg)) { @@ -679,19 +709,38 @@ void analysis_nng( else { totalInputBytes += nng_msg_len(inputMsg); - auto inputHeader = *reinterpret_cast( - nng_msg_body(inputMsg)); + auto inputHeader = msg_trim_read(inputMsg).value(); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); - nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader)); + #if 0 + while (auto msgLen = nng_msg_len(inputMsg)) + { + auto eventHeader = msg_trim_read(inputMsg); - //while (nng_msg_len(inputMsg)) - //{ + if (!eventHeader) + break; - //} + if (eventHeader->magicByte != 0xF3u) + { + spdlog::error("wrong ParsedDataEventHeader magic byte"); + error = true; + break; + } + + for (size_t moduleIndex = 0; moduleIndex < eventHeader->moduleCount; ++moduleIndex) + { + auto moduleHeader = msg_trim_read(inputMsg); + + if (moduleHeader) + { + nng_msg_trim(inputMsg, moduleHeader->totalBytes()); + } + } + } + #endif nng_msg_free(inputMsg); inputMsg = nullptr; } @@ -703,16 +752,16 @@ void analysis_nng( int main(int argc, char *argv[]) { - spdlog::set_level(spdlog::level::info); + spdlog::set_level(spdlog::level::debug); if (argc < 2) return 1; try { - auto producerSocket = make_pair_socket(); + auto readerOutputSocket = make_pair_socket(); - if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) + if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); auto parserInputSocket = make_pair_socket(); @@ -735,12 +784,16 @@ int main(int argc, char *argv[]) spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); + const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() + ? BufferType::MVLC_ETH + : BufferType::MVLC_USB); + // Read past the magic bytes at the start of each listfile. (void) listfile::read_magic(*readHandle); std::vector threads; threads.emplace_back(std::thread(listfile_reader_producer, - producerSocket, std::ref(*readHandle), std::cref(preamble))); + readerOutputSocket, std::ref(*readHandle), std::cref(bufferType))); threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket, std::cref(preamble)));