fix analysis side message processing

This commit is contained in:
Florian Lüke 2023-07-13 09:38:14 +02:00
parent 0bf9947a29
commit 31df88ff23

View file

@ -143,26 +143,28 @@ enum class BufferType: u32
MVLC_ETH, MVLC_ETH,
}; };
enum class MessageType enum class MessageType: u8
{ {
ListfileBuffer, ListfileBuffer,
ParsedEvents, ParsedEvents,
}; };
struct __attribute__((packed, aligned(4))) MessageHeaderBase #define PACKED_AND_ALIGNED __attribute__((packed, aligned(4)))
struct PACKED_AND_ALIGNED MessageHeaderBase
{ {
MessageType messageType; MessageType messageType;
u32 messageNumber; u32 messageNumber;
}; };
struct __attribute__((packed, aligned(4))) ListfileBufferMessageHeader: public MessageHeaderBase struct PACKED_AND_ALIGNED ListfileBufferMessageHeader: public MessageHeaderBase
{ {
u32 bufferType; u32 bufferType;
}; };
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
struct __attribute__((packed, aligned(4))) ParsedEventsMessageHeader: public MessageHeaderBase struct PACKED_AND_ALIGNED ParsedEventsMessageHeader: public MessageHeaderBase
{ {
}; };
@ -269,19 +271,22 @@ struct ReadoutParserNngContext
size_t totalSystemEvents = 0u; size_t totalSystemEvents = 0u;
}; };
struct __attribute__((packed, aligned(4))) ParsedEventHeader struct PACKED_AND_ALIGNED ParsedEventHeader
{ {
u32 magicByte: 8; u8 magicByte;
u8 crateIndex: 8; u8 crateIndex;
}; };
struct __attribute__((packed, aligned(4))) ParsedDataEventHeader: public ParsedEventHeader static const u8 ParsedDataEventMagic = 0xF3u;
static const u8 ParsedSystemEventMagic = 0xFAu;
struct PACKED_AND_ALIGNED ParsedDataEventHeader: public ParsedEventHeader
{ {
u8 eventIndex: 8; u8 eventIndex;
u8 moduleCount: 8; u8 moduleCount;
}; };
struct __attribute__((packed, aligned(4))) ParsedModuleHeader struct PACKED_AND_ALIGNED ParsedModuleHeader
{ {
u16 prefixSize; u16 prefixSize;
u16 suffixSize; u16 suffixSize;
@ -291,9 +296,12 @@ struct __attribute__((packed, aligned(4))) ParsedModuleHeader
size_t totalBytes() const { return totalSize() * sizeof(u32); } size_t totalBytes() const { return totalSize() * sizeof(u32); }
}; };
struct __attribute__((packed, aligned(4))) ParsedSystemEventHeader: public ParsedEventHeader struct PACKED_AND_ALIGNED ParsedSystemEventHeader: public ParsedEventHeader
{ {
u32 eventSize; u32 eventSize;
size_t totalSize() const { return eventSize; }
size_t totalBytes() const { return totalSize() * sizeof(u32); }
}; };
bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx)
@ -375,7 +383,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
ParsedDataEventHeader eventHeader = ParsedDataEventHeader eventHeader =
{ {
0xF3u, ParsedDataEventMagic,
static_cast<u8>(crateIndex), static_cast<u8>(crateIndex),
static_cast<u8>(eventIndex), static_cast<u8>(eventIndex),
static_cast<u8>(moduleCount), static_cast<u8>(moduleCount),
@ -434,7 +442,7 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s
ParsedSystemEventHeader eventHeader = ParsedSystemEventHeader eventHeader =
{ {
0xFAu, ParsedSystemEventMagic,
static_cast<u8>(crateIndex), static_cast<u8>(crateIndex),
size, size,
}; };
@ -554,7 +562,7 @@ void listfile_parser_nng(
auto totalMiB = totalBytes / (1024.0*1024.0); auto totalMiB = totalBytes / (1024.0*1024.0);
//auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count();
auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0;
spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:} MiB, rate={:.2f} MiB/s", spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s",
outputMessageNumber, totalMiB, MiBperSecond); outputMessageNumber, totalMiB, MiBperSecond);
}; };
@ -713,34 +721,57 @@ void analysis_nng(
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
lastInputMessageNumber = inputHeader.messageNumber; lastInputMessageNumber = inputHeader.messageNumber;
if (inputHeader.messageType != MessageType::ParsedEvents)
{
spdlog::error("Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}",
static_cast<u8>(inputHeader.messageType), static_cast<u8>(MessageType::ParsedEvents));
break;
}
spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
#if 0 while (true)
while (auto msgLen = nng_msg_len(inputMsg))
{ {
auto eventHeader = msg_trim_read<ParsedDataEventHeader>(inputMsg); if (nng_msg_len(inputMsg) < 1)
if (!eventHeader)
break; break;
if (eventHeader->magicByte != 0xF3u) const u8 eventMagic = *reinterpret_cast<u8 *>(nng_msg_body(inputMsg));
{
spdlog::error("wrong ParsedDataEventHeader magic byte");
error = true;
break;
}
for (size_t moduleIndex = 0; moduleIndex < eventHeader->moduleCount; ++moduleIndex) if (eventMagic == ParsedDataEventMagic)
{ {
auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg); auto eventHeader = msg_trim_read<ParsedDataEventHeader>(inputMsg);
if (!eventHeader)
break;
if (moduleHeader) for (size_t moduleIndex=0u; moduleIndex<eventHeader->moduleCount; ++moduleIndex)
{ {
nng_msg_trim(inputMsg, moduleHeader->totalBytes()); auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg);
if (!moduleHeader)
break;
if (moduleHeader->totalBytes())
{
const u32 *moduleData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
//util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), fmt::format("crate={}, event={}, module={}, size={}",
// eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, moduleHeader->totalSize()));
if (nng_msg_trim(inputMsg, moduleHeader->totalBytes()))
break;
}
} }
} }
else if (eventMagic == ParsedSystemEventMagic)
{
auto eventHeader = msg_trim_read<ParsedSystemEventHeader>(inputMsg);
if (!eventHeader)
break;
if (nng_msg_trim(inputMsg, eventHeader->totalBytes()))
break;
}
} }
#endif
assert(nng_msg_len(inputMsg) == 0);
nng_msg_free(inputMsg); nng_msg_free(inputMsg);
inputMsg = nullptr; inputMsg = nullptr;
} }
@ -752,7 +783,7 @@ void analysis_nng(
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
spdlog::set_level(spdlog::level::debug); spdlog::set_level(spdlog::level::info);
if (argc < 2) if (argc < 2)
return 1; return 1;