diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 7d619fc..7e2f941 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -143,26 +143,28 @@ enum class BufferType: u32 MVLC_ETH, }; -enum class MessageType +enum class MessageType: u8 { ListfileBuffer, ParsedEvents, }; -struct __attribute__((packed, aligned(4))) MessageHeaderBase +#define PACKED_AND_ALIGNED __attribute__((packed, aligned(4))) + +struct PACKED_AND_ALIGNED MessageHeaderBase { MessageType messageType; u32 messageNumber; }; -struct __attribute__((packed, aligned(4))) ListfileBufferMessageHeader: public MessageHeaderBase +struct PACKED_AND_ALIGNED ListfileBufferMessageHeader: public MessageHeaderBase { u32 bufferType; }; static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); -struct __attribute__((packed, aligned(4))) ParsedEventsMessageHeader: public MessageHeaderBase +struct PACKED_AND_ALIGNED ParsedEventsMessageHeader: public MessageHeaderBase { }; @@ -269,19 +271,22 @@ struct ReadoutParserNngContext size_t totalSystemEvents = 0u; }; -struct __attribute__((packed, aligned(4))) ParsedEventHeader +struct PACKED_AND_ALIGNED ParsedEventHeader { - u32 magicByte: 8; - u8 crateIndex: 8; + u8 magicByte; + u8 crateIndex; }; -struct __attribute__((packed, aligned(4))) ParsedDataEventHeader: public ParsedEventHeader +static const u8 ParsedDataEventMagic = 0xF3u; +static const u8 ParsedSystemEventMagic = 0xFAu; + +struct PACKED_AND_ALIGNED ParsedDataEventHeader: public ParsedEventHeader { - u8 eventIndex: 8; - u8 moduleCount: 8; + u8 eventIndex; + u8 moduleCount; }; -struct __attribute__((packed, aligned(4))) ParsedModuleHeader +struct PACKED_AND_ALIGNED ParsedModuleHeader { u16 prefixSize; u16 suffixSize; @@ -291,9 +296,12 @@ struct __attribute__((packed, aligned(4))) ParsedModuleHeader size_t totalBytes() const { return totalSize() * sizeof(u32); } }; -struct __attribute__((packed, aligned(4))) ParsedSystemEventHeader: public ParsedEventHeader +struct PACKED_AND_ALIGNED ParsedSystemEventHeader: public ParsedEventHeader { u32 eventSize; + + size_t totalSize() const { return eventSize; } + size_t totalBytes() const { return totalSize() * sizeof(u32); } }; bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) @@ -375,7 +383,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, ParsedDataEventHeader eventHeader = { - 0xF3u, + ParsedDataEventMagic, static_cast(crateIndex), static_cast(eventIndex), static_cast(moduleCount), @@ -434,7 +442,7 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s ParsedSystemEventHeader eventHeader = { - 0xFAu, + ParsedSystemEventMagic, static_cast(crateIndex), size, }; @@ -554,7 +562,7 @@ void listfile_parser_nng( auto totalMiB = totalBytes / (1024.0*1024.0); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; - spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:} MiB, rate={:.2f} MiB/s", + spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", outputMessageNumber, totalMiB, MiBperSecond); }; @@ -713,34 +721,57 @@ void analysis_nng( auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; + if (inputHeader.messageType != MessageType::ParsedEvents) + { + spdlog::error("Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", + static_cast(inputHeader.messageType), static_cast(MessageType::ParsedEvents)); + break; + } spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); - #if 0 - while (auto msgLen = nng_msg_len(inputMsg)) + while (true) { - auto eventHeader = msg_trim_read(inputMsg); - - if (!eventHeader) + if (nng_msg_len(inputMsg) < 1) break; - if (eventHeader->magicByte != 0xF3u) - { - spdlog::error("wrong ParsedDataEventHeader magic byte"); - error = true; - break; - } + const u8 eventMagic = *reinterpret_cast(nng_msg_body(inputMsg)); - for (size_t moduleIndex = 0; moduleIndex < eventHeader->moduleCount; ++moduleIndex) + if (eventMagic == ParsedDataEventMagic) { - auto moduleHeader = msg_trim_read(inputMsg); + auto eventHeader = msg_trim_read(inputMsg); + if (!eventHeader) + break; - if (moduleHeader) + for (size_t moduleIndex=0u; moduleIndexmoduleCount; ++moduleIndex) { - nng_msg_trim(inputMsg, moduleHeader->totalBytes()); + auto moduleHeader = msg_trim_read(inputMsg); + if (!moduleHeader) + break; + + if (moduleHeader->totalBytes()) + { + const u32 *moduleData = reinterpret_cast(nng_msg_body(inputMsg)); + + //util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), fmt::format("crate={}, event={}, module={}, size={}", + // eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, moduleHeader->totalSize())); + + if (nng_msg_trim(inputMsg, moduleHeader->totalBytes())) + break; + } } } + else if (eventMagic == ParsedSystemEventMagic) + { + auto eventHeader = msg_trim_read(inputMsg); + if (!eventHeader) + break; + if (nng_msg_trim(inputMsg, eventHeader->totalBytes())) + break; + } } - #endif + + assert(nng_msg_len(inputMsg) == 0); + nng_msg_free(inputMsg); inputMsg = nullptr; } @@ -752,7 +783,7 @@ void analysis_nng( int main(int argc, char *argv[]) { - spdlog::set_level(spdlog::level::debug); + spdlog::set_level(spdlog::level::info); if (argc < 2) return 1;