diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 71ad1e4..cfbf3e1 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -270,61 +270,57 @@ struct __attribute__((packed, aligned(1))) ParsedModuleHeader u32 dynamicSize; }; -void parser_nng_eventdata_v1(void *ctx_, int crateIndex, int eventIndex, - const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) +static const size_t OutputMessageReserved = util::Megabytes(1); + +bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "") { - assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); - assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); - assert(moduleCount < std::numeric_limits::max()); + auto &msg = ctx.outputMessage; - auto &ctx = *reinterpret_cast(ctx_); - ++ctx.totalReadoutEvents; - auto msg = ctx.outputMessage; + if (msg) + return false; - ParsedDataEventHeader eventHeader = + if (auto res = nng_msg_alloc(&msg, 0)) { - 0xF3u, - static_cast(crateIndex), - static_cast(eventIndex), - static_cast(moduleCount), + spdlog::error("{} - nng_msg_alloc: {}", debugInfo, nng_strerror(res)); + return false; + } + + if (auto res = nng_msg_reserve(msg, OutputMessageReserved)) + { + spdlog::error("{} - nng_msg_reserve: {}", debugInfo, nng_strerror(res)); + return false; + } + + ParsedEventsMessageHeader header = + { + MessageType::ParsedEvents, + ++ctx.outputMessageNumber, }; - if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) + nng_msg_append(msg, &header, sizeof(header)); + assert(nng_msg_len(msg) == sizeof(header)); + + return true; +} + +bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "") +{ + const auto msgSize = nng_msg_len(ctx.outputMessage); + + if (auto res = send_message_retry(ctx.outputSocket, ctx.outputMessage, 0, debugInfo)) { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); - return; + nng_msg_free(ctx.outputMessage); + ctx.outputMessage = nullptr; + spdlog::error("{}: send_message_retry: {}:", debugInfo, nng_strerror(res)); + return false; } - #if 1 - for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) - { - auto &moduleData = moduleDataList[moduleIndex]; + ctx.outputMessage = nullptr; - ParsedModuleHeader moduleHeader = {}; - moduleHeader.prefixSize = moduleData.prefixSize; - moduleHeader.dynamicSize = moduleData.dynamicSize; - moduleHeader.suffixSize = moduleData.suffixSize; + spdlog::debug("{}: sent message {} of size {}", + debugInfo, ctx.outputMessageNumber, msgSize); - if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) - { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); - return; - } - - auto prefixSpan = prefix_span(moduleData); - auto dynamicSpan = dynamic_span(moduleData); - auto suffixSpan = suffix_span(moduleData); - - for (auto &dataSpan: { prefixSpan, dynamicSpan, suffixSpan }) - { - if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32))) - { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); - return; - } - } - } - #endif + return true; } void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, @@ -334,31 +330,10 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); assert(moduleCount < std::numeric_limits::max()); - static const size_t OutputMessageReserved= util::Megabytes(1); - auto &parserContext = *reinterpret_cast(ctx_); - ++parserContext.totalReadoutEvents; - auto &msg = parserContext.outputMessage; - - auto flush_output_message = [&] - { - const auto msgSize = nng_msg_len(parserContext.outputMessage); - - if (auto res = send_message_retry(parserContext.outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) - { - nng_msg_free(parserContext.outputMessage); - parserContext.outputMessage = nullptr; - spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); - return false; - } - - parserContext.outputMessage = nullptr; - - spdlog::debug("listfile_parser_nng: sent message {} of size {}", - parserContext.outputMessageNumber, msgSize); - - return true; - }; + auto &ctx = *reinterpret_cast(ctx_); + ++ctx.totalReadoutEvents; + auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedDataEventHeader); @@ -373,33 +348,12 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, if (msg && bytesFree < requiredBytes) { - if (!flush_output_message()) + if (!flush_output_message(ctx, "parser_nng_eventdata_v2")) return; } - if (!msg) - { - if (auto res = nng_msg_alloc(&msg, 0)) - { - spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); - return; - } - - if (auto res = nng_msg_reserve(msg, OutputMessageReserved)) - { - spdlog::error("listfile_parser_nng - nng_msg_reserve: {}", nng_strerror(res)); - return; - } - - ParsedEventsMessageHeader header = - { - MessageType::ParsedEvents, - ++parserContext.outputMessageNumber, - }; - - nng_msg_append(msg, &header, sizeof(header)); - assert(nng_msg_len(msg) == sizeof(header)); - } + if (!msg && !allocate_output_message(ctx, "parser_nng_eventdata_v2")) + return; bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); @@ -414,11 +368,10 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } - #if 1 for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) { auto &moduleData = moduleDataList[moduleIndex]; @@ -430,27 +383,39 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) { - spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res)); + spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } } - #endif } -void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) +void parser_nng_systemevent_v2(void *ctx_, int crateIndex, const u32 *header, u32 size) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); - auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalSystemEvents; + auto &msg = ctx.outputMessage; - auto msg = ctx.outputMessage; + size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); + size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + + if (msg && bytesFree < requiredBytes) + { + if (!flush_output_message(ctx, "parser_nng_systemevent_v2")) + return; + } + + if (!msg && !allocate_output_message(ctx, "parser_nng_systemevent_v2")) + return; + + bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; + assert(bytesFree >= requiredBytes); ParsedSystemEventHeader eventHeader = { @@ -465,7 +430,6 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s return; } - if (int res = nng_msg_append(msg, header, size * sizeof(u32))) { spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); @@ -543,7 +507,7 @@ void listfile_parser_nng( mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { parser_nng_eventdata_v2, - parser_nng_systemevent, + parser_nng_systemevent_v2, }; nng_msg *inputMsg = nullptr; @@ -614,22 +578,6 @@ void listfile_parser_nng( auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); - if (!parserContext.outputMessage) - { - if (auto res = nng_msg_alloc(&parserContext.outputMessage, nng_msg_len(inputMsg) * 1.5)) - { - spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); - break; - } - - auto &outputHeader = *reinterpret_cast(nng_msg_body(parserContext.outputMessage)); - outputHeader.messageType = MessageType::ParsedEvents; - outputHeader.messageNumber = ++outputMessageNumber; - nng_msg_realloc(parserContext.outputMessage, sizeof(ParsedEventsMessageHeader)); - } - - assert(parserContext.outputMessage); - readout_parser::parse_readout_buffer( listfileFormat, parserState, @@ -642,27 +590,6 @@ void listfile_parser_nng( tProcess += stopWatch.interval(); // TODO: also flush after a certain time - #if 0 - if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1)) - { - const auto msgSize = nng_msg_len(parserContext.outputMessage); - - if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) - { - nng_msg_free(parserContext.outputMessage); - parserContext.outputMessage = nullptr; - spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); - break; - } - - parserContext.outputMessage = nullptr; - - spdlog::debug("listfile_parser_nng: sent message {} of size {}", - outputMessageNumber, msgSize); - } - - tSend += stopWatch.interval(); - #endif tTotal += stopWatch.end(); } @@ -686,21 +613,7 @@ void listfile_parser_nng( } if (parserContext.outputMessage) - { - const auto msgSize = nng_msg_len(parserContext.outputMessage); - - if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) - { - nng_msg_free(parserContext.outputMessage); - parserContext.outputMessage = nullptr; - spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); - } - - parserContext.outputMessage = nullptr; - - spdlog::debug("listfile_parser_nng: sent message {} of size {}", - outputMessageNumber, msgSize); - } + flush_output_message(parserContext, "listfile_parser_nng_v2"); if (inputMsg) { @@ -844,4 +757,4 @@ int main(int argc, char *argv[]) } return 0; -} \ No newline at end of file +}