From 72cd10ba466b0b820cfd1fa6d82059c4159de3cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Tue, 11 Jul 2023 21:42:02 +0200 Subject: [PATCH] improve parser speed by flushing the output message before a realloc happens --- src/mvlc_nng_replay.cc | 137 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 8 deletions(-) diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index c5aa5c8..71ad1e4 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -240,6 +240,8 @@ void listfile_reader_producer( struct ParserNngContext { nng_msg *outputMessage = nullptr; + u32 outputMessageNumber = 0u; + nng_socket outputSocket = NNG_SOCKET_INITIALIZER; size_t totalReadoutEvents = 0u; size_t totalSystemEvents = 0u; }; @@ -268,7 +270,7 @@ struct __attribute__((packed, aligned(1))) ParsedModuleHeader u32 dynamicSize; }; -void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, +void parser_nng_eventdata_v1(void *ctx_, int crateIndex, int eventIndex, const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); @@ -289,7 +291,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); return; } @@ -305,7 +307,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); return; } @@ -317,7 +319,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, { if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32))) { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); return; } } @@ -325,6 +327,122 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, #endif } +void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, + const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) +{ + assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); + assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); + assert(moduleCount < std::numeric_limits::max()); + + static const size_t OutputMessageReserved= util::Megabytes(1); + + auto &parserContext = *reinterpret_cast(ctx_); + ++parserContext.totalReadoutEvents; + auto &msg = parserContext.outputMessage; + + auto flush_output_message = [&] + { + const auto msgSize = nng_msg_len(parserContext.outputMessage); + + if (auto res = send_message_retry(parserContext.outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) + { + nng_msg_free(parserContext.outputMessage); + parserContext.outputMessage = nullptr; + spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); + return false; + } + + parserContext.outputMessage = nullptr; + + spdlog::debug("listfile_parser_nng: sent message {} of size {}", + parserContext.outputMessageNumber, msgSize); + + return true; + }; + + size_t requiredBytes = sizeof(ParsedDataEventHeader); + + for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) + { + auto &moduleData = moduleDataList[moduleIndex]; + requiredBytes += sizeof(ParsedModuleHeader); + requiredBytes += moduleData.data.size * sizeof(u32); + } + + size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + + if (msg && bytesFree < requiredBytes) + { + if (!flush_output_message()) + return; + } + + if (!msg) + { + if (auto res = nng_msg_alloc(&msg, 0)) + { + spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); + return; + } + + if (auto res = nng_msg_reserve(msg, OutputMessageReserved)) + { + spdlog::error("listfile_parser_nng - nng_msg_reserve: {}", nng_strerror(res)); + return; + } + + ParsedEventsMessageHeader header = + { + MessageType::ParsedEvents, + ++parserContext.outputMessageNumber, + }; + + nng_msg_append(msg, &header, sizeof(header)); + assert(nng_msg_len(msg) == sizeof(header)); + } + + bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + assert(bytesFree >= requiredBytes); + + ParsedDataEventHeader eventHeader = + { + 0xF3u, + static_cast(crateIndex), + static_cast(eventIndex), + static_cast(moduleCount), + }; + + if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) + { + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + return; + } + + #if 1 + for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) + { + auto &moduleData = moduleDataList[moduleIndex]; + + ParsedModuleHeader moduleHeader = {}; + moduleHeader.prefixSize = moduleData.prefixSize; + moduleHeader.dynamicSize = moduleData.dynamicSize; + moduleHeader.suffixSize = moduleData.suffixSize; + + if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) + { + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + return; + } + + if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) + { + spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + return; + } + } + #endif +} + void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); @@ -418,17 +536,18 @@ void listfile_parser_nng( auto crateIndex = 0; ParserNngContext parserContext; + parserContext.outputSocket = outputSocket; auto parserState = mvlc::readout_parser::make_readout_parser( crateConfig.stacks, crateIndex, &parserContext); mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { - parser_nng_eventdata, + parser_nng_eventdata_v2, parser_nng_systemevent, }; nng_msg *inputMsg = nullptr; - u32 outputMessageNumber = 0u; + u32 &outputMessageNumber = parserContext.outputMessageNumber; std::chrono::microseconds tReceive(0); std::chrono::microseconds tProcess(0); std::chrono::microseconds tSend(0); @@ -456,8 +575,8 @@ void listfile_parser_nng( auto totalMiB = totalBytes / (1024.0*1024.0); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count(); - spdlog::info("listfile_parser_nng: bytesProcessed={:.2f}, rate={:.2f} MiB/s", - totalMiB, MiBperSecond); + spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", + outputMessageNumber, totalMiB, MiBperSecond); }; while (true) @@ -523,6 +642,7 @@ void listfile_parser_nng( tProcess += stopWatch.interval(); // TODO: also flush after a certain time + #if 0 if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1)) { const auto msgSize = nng_msg_len(parserContext.outputMessage); @@ -542,6 +662,7 @@ void listfile_parser_nng( } tSend += stopWatch.interval(); + #endif tTotal += stopWatch.end(); }