#include #include #include #include #include "common.h" using namespace mesytec; using namespace mesytec::mvlc; int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "") { int res = 0; size_t attempt = 0u; do { res = nng_sendmsg(socket, msg, 0); if (res && res == NNG_ETIMEDOUT) spdlog::warn("send_message_retry: {} - send timeout", debugInfo); if (res && res != NNG_ETIMEDOUT) return res; if (res && maxTries > 0 && attempt >= maxTries) return res; ++attempt; } while (res == NNG_ETIMEDOUT); return 0; } // Follows the framing structure inside the buffer until an incomplete frame // which doesn't fit into the buffer is detected. The incomplete data is moved // over to the tempBuffer so that the readBuffer ends with a complete frame. // // The input buffer must start with a frame header (skip_count will be called // with the first word of the input buffer on the first iteration). // // The SkipCountFunc must return the number of words to skip to get to the next // frame header or 0 if there is not enough data left in the input iterator to // determine the frames size. // Signature of SkipCountFunc: u32 skip_count(const basic_string_view &view); // Returns the number of trailing bytes copied from msgBuf into tmpBuf. template inline size_t fixup_buffer( const u8 *msgBuf, size_t msgUsed, std::vector &tmpBuf, SkipCountFunc skip_count) { auto view = basic_string_view(msgBuf, msgUsed); while (!view.empty()) { if (view.size() >= sizeof(u32)) { u32 wordsToSkip = skip_count(view); //cout << "wordsToSkip=" << wordsToSkip << ", view.size()=" << view.size() << ", in words:" << view.size() / sizeof(u32)); if (wordsToSkip == 0 || wordsToSkip > view.size() / sizeof(u32)) { tmpBuf.reserve(tmpBuf.size() + view.size()); std::copy(std::begin(view), std::end(view), std::back_inserter(tmpBuf)); return view.size(); } // Skip over the SystemEvent frame or the ETH packet data. view.remove_prefix(wordsToSkip * sizeof(u32)); } } return 0u; } inline size_t fixup_buffer_mvlc_usb(const u8 *buf, size_t bufUsed, std::vector &tmpBuf) { auto skip_func = [] (const basic_string_view &view) -> u32 { if (view.size() < sizeof(u32)) return 0u; u32 header = *reinterpret_cast(view.data()); return 1u + extract_frame_info(header).len; }; return fixup_buffer(buf, bufUsed, tmpBuf, skip_func); } inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector &tmpBuf) { auto skip_func = [](const basic_string_view &view) -> u32 { if (view.size() < sizeof(u32)) return 0u; // Either a SystemEvent header or the first of the two ETH packet headers u32 header = *reinterpret_cast(view.data()); if (get_frame_type(header) == frame_headers::SystemEvent) return 1u + extract_frame_info(header).len; if (view.size() >= 2 * sizeof(u32)) { u32 header1 = *reinterpret_cast(view.data() + sizeof(u32)); eth::PayloadHeaderInfo ethHdrs{ header, header1 }; return eth::HeaderWords + ethHdrs.dataWordCount(); } // Not enough data to get the 2nd ETH header word. return 0u; }; return fixup_buffer(buf, bufUsed, tmpBuf, skip_func); } enum class BufferType: u32 { MVLC_USB, MVLC_ETH, }; enum class MessageType { ListfileBuffer, ParsedEvents, }; struct __attribute__((packed, aligned(1))) MessageHeaderBase { MessageType messageType; u32 messageNumber; }; struct __attribute__((packed, aligned(1))) ListfileBufferMessageHeader: public MessageHeaderBase { u32 bufferType; }; static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); struct __attribute__((packed, aligned(1))) ParsedEventsMessageHeader: public MessageHeaderBase { }; static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector &tmpBuf) { size_t bytesMoved = 0u; const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); if (bufferType == BufferType::MVLC_USB) bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf); else bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); nng_msg_chop(msg, bytesMoved); } void listfile_reader_producer( nng_socket socket, mvlc::listfile::ReadHandle &input, const mvlc::listfile::Preamble &preamble) { prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); try { auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH : BufferType::MVLC_USB); u32 bufferNumber = 0u; size_t totalBytesSent = 0u; std::vector previousData; bool done = false; while (!done) { nng_msg *msg = {}; if (int res = nng_msg_alloc(&msg, util::Megabytes(1))) { spdlog::error("nng_msg_alloc: {}", nng_strerror(res)); return; } auto msgHeader = reinterpret_cast(nng_msg_body(msg)); msgHeader->messageType = MessageType::ListfileBuffer; msgHeader->messageNumber = ++bufferNumber; msgHeader->bufferType = static_cast(bufferType); nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader)); nng_msg_append(msg, previousData.data(), previousData.size()); previousData.clear(); size_t msgUsed = nng_msg_len(msg); nng_msg_realloc(msg, util::Megabytes(1)); size_t bytesRead = input.read( reinterpret_cast(nng_msg_body(msg)) + msgUsed, nng_msg_len(msg) - msgUsed); nng_msg_realloc(msg, msgUsed + bytesRead); fixup_listfile_buffer_message(bufferType, msg, previousData); done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader)); if (done) nng_msg_realloc(msg, 0); const auto msgSize = nng_msg_len(msg); if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer")) { nng_msg_free(msg); msg = nullptr; spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res)); return; } spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber, msgSize); totalBytesSent += msgSize; } spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1)); } catch(const std::exception& e) { spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); return; } } struct ParserNngContext { nng_msg *outputMessage = nullptr; u32 outputMessageNumber = 0u; nng_socket outputSocket = NNG_SOCKET_INITIALIZER; size_t totalReadoutEvents = 0u; size_t totalSystemEvents = 0u; }; struct __attribute__((packed, aligned(1))) ParsedEventHeader { u32 magicByte: 8; u8 crateIndex: 8; }; struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader { u8 eventIndex: 8; u8 moduleCount: 8; }; struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader { u32 eventSize; }; struct __attribute__((packed, aligned(1))) ParsedModuleHeader { u16 prefixSize; u16 suffixSize; u32 dynamicSize; }; static const size_t OutputMessageReserved = util::Megabytes(1); bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "") { auto &msg = ctx.outputMessage; if (msg) return false; if (auto res = nng_msg_alloc(&msg, 0)) { spdlog::error("{} - nng_msg_alloc: {}", debugInfo, nng_strerror(res)); return false; } if (auto res = nng_msg_reserve(msg, OutputMessageReserved)) { spdlog::error("{} - nng_msg_reserve: {}", debugInfo, nng_strerror(res)); return false; } ParsedEventsMessageHeader header = { MessageType::ParsedEvents, ++ctx.outputMessageNumber, }; nng_msg_append(msg, &header, sizeof(header)); assert(nng_msg_len(msg) == sizeof(header)); return true; } bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "") { const auto msgSize = nng_msg_len(ctx.outputMessage); if (auto res = send_message_retry(ctx.outputSocket, ctx.outputMessage, 0, debugInfo)) { nng_msg_free(ctx.outputMessage); ctx.outputMessage = nullptr; spdlog::error("{}: send_message_retry: {}:", debugInfo, nng_strerror(res)); return false; } ctx.outputMessage = nullptr; spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize); return true; } void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); assert(moduleCount < std::numeric_limits::max()); auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalReadoutEvents; auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedDataEventHeader); for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) { auto &moduleData = moduleDataList[moduleIndex]; requiredBytes += sizeof(ParsedModuleHeader); requiredBytes += moduleData.data.size * sizeof(u32); } size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { if (!flush_output_message(ctx, "parser_nng_eventdata_v2")) return; } if (!msg && !allocate_output_message(ctx, "parser_nng_eventdata_v2")) return; bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedDataEventHeader eventHeader = { 0xF3u, static_cast(crateIndex), static_cast(eventIndex), static_cast(moduleCount), }; if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) { auto &moduleData = moduleDataList[moduleIndex]; ParsedModuleHeader moduleHeader = {}; moduleHeader.prefixSize = moduleData.prefixSize; moduleHeader.dynamicSize = moduleData.dynamicSize; moduleHeader.suffixSize = moduleData.suffixSize; if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) { spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) { spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); return; } } } void parser_nng_systemevent_v2(void *ctx_, int crateIndex, const u32 *header, u32 size) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalSystemEvents; auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { if (!flush_output_message(ctx, "parser_nng_systemevent_v2")) return; } if (!msg && !allocate_output_message(ctx, "parser_nng_systemevent_v2")) return; bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedSystemEventHeader eventHeader = { 0xFAu, static_cast(crateIndex), size, }; if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) { spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); return; } if (int res = nng_msg_append(msg, header, size * sizeof(u32))) { spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); return; } } class StopWatch { public: using duration_type = std::chrono::microseconds; void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); } duration_type interval() { auto now = std::chrono::high_resolution_clock::now(); auto result = std::chrono::duration_cast(now - tInterval_); tInterval_ = now; return result; } duration_type end() { auto now = std::chrono::high_resolution_clock::now(); auto result = std::chrono::duration_cast(now - tStart_); return result; } private: std::chrono::high_resolution_clock::time_point tStart_; std::chrono::high_resolution_clock::time_point tInterval_; }; void listfile_parser_nng( nng_socket inputSocket, nng_socket outputSocket, const mvlc::listfile::Preamble &preamble) { prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth() ? ConnectionType::ETH : ConnectionType::USB); auto configSection = preamble.findCrateConfig(); if (!configSection) { spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); return; } auto configYaml = configSection->contentsToString(); auto crateConfig = mvlc::crate_config_from_yaml(configYaml); std::string stacksYaml; for (const auto &stack: crateConfig.stacks) stacksYaml += to_yaml(stack); spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml); auto crateIndex = 0; ParserNngContext parserContext; parserContext.outputSocket = outputSocket; auto parserState = mvlc::readout_parser::make_readout_parser( crateConfig.stacks, crateIndex, &parserContext); mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { parser_nng_eventdata_v2, parser_nng_systemevent_v2, }; nng_msg *inputMsg = nullptr; u32 &outputMessageNumber = parserContext.outputMessageNumber; std::chrono::microseconds tReceive(0); std::chrono::microseconds tProcess(0); std::chrono::microseconds tSend(0); std::chrono::microseconds tTotal(0); auto tLastReport = std::chrono::steady_clock::now(); const auto tStart = std::chrono::steady_clock::now(); auto log_stats = [&] { spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); spdlog::info("listfile_parser_nng: time budget: " " tReceive = {} ms, " " tProcess = {} ms, " " tSend = {} ms, " " tTotal = {} ms", tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0, tTotal.count() / 1000.0); auto totalElapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - tStart); auto totalBytes = parserCounters.bytesProcessed; auto totalMiB = totalBytes / (1024.0*1024.0); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count(); spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", outputMessageNumber, totalMiB, MiBperSecond); }; while (true) { StopWatch stopWatch; stopWatch.start(); if (auto res = receive_message(inputSocket, &inputMsg)) { if (res != NNG_ETIMEDOUT) { spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); return; } } else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) { if (nng_msg_len(inputMsg) == 0) break; spdlog::warn("listfile_parser_nng - incoming message too short (len={})", nng_msg_len(inputMsg)); } else { tReceive += stopWatch.interval(); totalInputBytes += nng_msg_len(inputMsg); auto inputHeader = *reinterpret_cast( nng_msg_body(inputMsg)); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); readout_parser::parse_readout_buffer( listfileFormat, parserState, parserCallbacks, parserCounters, inputHeader.messageNumber, inputData, inputLen); tProcess += stopWatch.interval(); // TODO: also flush after a certain time tTotal += stopWatch.end(); } if (inputMsg) { nng_msg_free(inputMsg); inputMsg = nullptr; } { auto now = std::chrono::steady_clock::now(); auto tReportElapsed = now - tLastReport; static const auto ReportInterval = std::chrono::seconds(1); if (tReportElapsed >= ReportInterval) { log_stats(); tLastReport = now; } } } if (parserContext.outputMessage) flush_output_message(parserContext, "listfile_parser_nng_v2"); if (inputMsg) { nng_msg_free(inputMsg); inputMsg = nullptr; } assert(!parserContext.outputMessage); // send empty message if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0)) { spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); return; } if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) { nng_msg_free(parserContext.outputMessage); parserContext.outputMessage = nullptr; spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); return; } log_stats(); { std::ostringstream ss; readout_parser::print_counters(ss, parserCounters); spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str()); } } void analysis_nng( nng_socket inputSocket ) { prctl(PR_SET_NAME,"analysis_nng",0,0,0); nng_msg *inputMsg = nullptr; size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; while (true) { if (auto res = receive_message(inputSocket, &inputMsg)) { if (res != NNG_ETIMEDOUT) { spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res)); return; } } else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader)) { if (nng_msg_len(inputMsg) == 0) break; spdlog::warn("analysis_nng - incoming message too short (len={})", nng_msg_len(inputMsg)); } else { totalInputBytes += nng_msg_len(inputMsg); auto inputHeader = *reinterpret_cast( nng_msg_body(inputMsg)); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; lastInputMessageNumber = inputHeader.messageNumber; spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader)); //while (nng_msg_len(inputMsg)) //{ //} nng_msg_free(inputMsg); inputMsg = nullptr; } } spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); } int main(int argc, char *argv[]) { spdlog::set_level(spdlog::level::info); if (argc < 2) return 1; try { auto producerSocket = make_pair_socket(); if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); auto parserInputSocket = make_pair_socket(); if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); auto parserOutputSocket = make_pair_socket(); if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); auto analysisInputSocket = make_pair_socket(); if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); listfile::ZipReader zipReader; zipReader.openArchive(argv[1]); spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); (void) listfile::read_magic(*readHandle); std::vector threads; threads.emplace_back(std::thread(listfile_reader_producer, producerSocket, std::ref(*readHandle), std::cref(preamble))); threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket, std::cref(preamble))); threads.emplace_back(std::thread(analysis_nng, analysisInputSocket)); for (auto &t: threads) if (t.joinable()) t.join(); } catch(const std::exception& e) { spdlog::error("exception in main(): {}", e.what()); return 1; } return 0; }