From ae815d4adb2f7c4b263d4dd6d253c7acc0053c5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Sun, 2 Jul 2023 23:33:22 +0200 Subject: [PATCH] untested replay over nng code --- external/mesytec-mvlc | 2 +- src/CMakeLists.txt | 13 ++- src/mvlc_nng_replay.cc | 202 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 5 deletions(-) create mode 100644 src/mvlc_nng_replay.cc diff --git a/external/mesytec-mvlc b/external/mesytec-mvlc index 79a194b..ecbcd56 160000 --- a/external/mesytec-mvlc +++ b/external/mesytec-mvlc @@ -1 +1 @@ -Subproject commit 79a194b029bb2a63ca69cde0df81ae91c104e661 +Subproject commit ecbcd56ab69a802f7173fa28d25b8eb6dd824015 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fd714fe..4750d58 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,4 +1,4 @@ -set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic) +set(MVLC_NNG_NODE_WARN_FLAGS -Wall -Wextra -Wpedantic) # Bit of a hack to set the variables here. If set earlier clang-tidy will for # some reason pickup log.c and warn about some va_list stuff. Might be because @@ -23,17 +23,22 @@ set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic) add_executable(pair_producer pair_producer.cc) target_compile_features(pair_producer PRIVATE cxx_std_17) target_link_libraries(pair_producer PRIVATE mesytec-mvlc PRIVATE nng) -target_compile_options(pair_producer PRIVATE ${MY_WARN_FLAGS}) +target_compile_options(pair_producer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) add_executable(pair_consumer pair_consumer.cc) target_compile_features(pair_consumer PRIVATE cxx_std_17) target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng) -target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS}) +target_compile_options(pair_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) add_executable(pair_inproc pair_inproc.cc) target_compile_features(pair_inproc PRIVATE cxx_std_17) target_link_libraries(pair_inproc PRIVATE mesytec-mvlc PRIVATE nng) -target_compile_options(pair_inproc PRIVATE ${MY_WARN_FLAGS}) +target_compile_options(pair_inproc PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) + +add_executable(mvlc_nng_replay mvlc_nng_replay.cc) +target_compile_features(mvlc_nng_replay PRIVATE cxx_std_17) +target_link_libraries(mvlc_nng_replay PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(mvlc_nng_replay PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc new file mode 100644 index 0000000..6095404 --- /dev/null +++ b/src/mvlc_nng_replay.cc @@ -0,0 +1,202 @@ +#include +#include + +using namespace mesytec; +using namespace mesytec::mvlc; + +// Follows the framing structure inside the buffer until an incomplete frame +// which doesn't fit into the buffer is detected. The incomplete data is moved +// over to the tempBuffer so that the readBuffer ends with a complete frame. +// +// The input buffer must start with a frame header (skip_count will be called +// with the first word of the input buffer on the first iteration). +// +// The SkipCountFunc must return the number of words to skip to get to the next +// frame header or 0 if there is not enough data left in the input iterator to +// determine the frames size. +// Signature of SkipCountFunc: u32 skip_count(const basic_string_view &view); +// Returns the number of trailing bytes copied from msgBuf into tmpBuf. +template +inline size_t fixup_buffer( + const u8 *msgBuf, size_t msgUsed, + std::vector &tmpBuf, + SkipCountFunc skip_count) +{ + auto view = basic_string_view(msgBuf, msgUsed); + + while (!view.empty()) + { + if (view.size() >= sizeof(u32)) + { + u32 wordsToSkip = skip_count(view); + + //cout << "wordsToSkip=" << wordsToSkip << ", view.size()=" << view.size() << ", in words:" << view.size() / sizeof(u32)); + + if (wordsToSkip == 0 || wordsToSkip > view.size() / sizeof(u32)) + { + tmpBuf.reserve(tmpBuf.size() + view.size()); + std::copy(std::begin(view), std::end(view), std::back_inserter(tmpBuf)); + return view.size(); + } + + // Skip over the SystemEvent frame or the ETH packet data. + view.remove_prefix(wordsToSkip * sizeof(u32)); + } + } + + return 0u; +} + +inline size_t fixup_buffer_mvlc_usb(const u8 *buf, size_t bufUsed, std::vector &tmpBuf) +{ + auto skip_func = [] (const basic_string_view &view) -> u32 + { + if (view.size() < sizeof(u32)) + return 0u; + + u32 header = *reinterpret_cast(view.data()); + return 1u + extract_frame_info(header).len; + }; + + return fixup_buffer(buf, bufUsed, tmpBuf, skip_func); +} + +inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector &tmpBuf) +{ + auto skip_func = [](const basic_string_view &view) -> u32 + { + if (view.size() < sizeof(u32)) + return 0u; + + // Either a SystemEvent header or the first of the two ETH packet headers + u32 header = *reinterpret_cast(view.data()); + + if (get_frame_type(header) == frame_headers::SystemEvent) + return 1u + extract_frame_info(header).len; + + if (view.size() >= 2 * sizeof(u32)) + { + u32 header1 = *reinterpret_cast(view.data() + sizeof(u32)); + eth::PayloadHeaderInfo ethHdrs{ header, header1 }; + return eth::HeaderWords + ethHdrs.dataWordCount(); + } + + // Not enough data to get the 2nd ETH header word. + return 0u; + }; + + return fixup_buffer(buf, bufUsed, tmpBuf, skip_func); +} + +enum class BufferType: u32 +{ + MVLC_USB, + MVLC_ETH, +}; + +#pragma pack(push, 1) +struct BufferMessageHeader +{ + u32 bufferType; + u32 bufferNumber; +}; + +#pragma pack(pop) + +size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, size_t msgUsed, std::vector &tmpBuf) +{ + size_t bytesMoved = 0u; + const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) + sizeof(BufferMessageHeader); + const auto msgBufferSize = msgUsed - sizeof(BufferMessageHeader); + + if (bufferType == BufferType::MVLC_USB) + bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf); + else + bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); + + nng_msg_chop(msg, msgUsed - bytesMoved); + return msgUsed - bytesMoved; +} + +void listfile_reader_produer( + nng_socket socket, + mvlc::listfile::ReadHandle &input) +{ + try + { + auto preamble = mvlc::listfile::read_preamble(input); + auto bufferType = BufferType::MVLC_USB; + if (preamble.magic == mvlc::listfile::get_filemagic_eth()) + bufferType = BufferType::MVLC_ETH; + u32 bufferNumber = 0u; + std::vector previousData; + + while (true) + { + nng_msg *msg = {}; + if (int res = nng_msg_alloc(&msg, util::Megabytes(1))) + { + spdlog::error("nng_msg_alloc: {}", nng_strerror(res)); + return; + } + + BufferMessageHeader msgHeader = {}; + msgHeader.bufferType = static_cast(bufferType); + msgHeader.bufferNumber = bufferNumber; + nng_msg_append(msg, &msgHeader, sizeof(msgHeader)); + size_t msgUsed = sizeof(msgHeader); + + std::copy(std::begin(previousData), std::end(previousData), + reinterpret_cast(nng_msg_body(msg)) + msgUsed); + msgUsed += previousData.size(); + previousData.clear(); + + size_t bytesRead = input.read( + reinterpret_cast(nng_msg_body(msg)) + msgUsed, + nng_msg_len(msg) - msgUsed); + msgUsed += bytesRead; + msgUsed = fixup_listfile_buffer_message(bufferType, msg, msgUsed, previousData); + assert(msgUsed == nng_msg_len(msg)); + + if (bytesRead == 0 && nng_msg_len(msg) == sizeof(BufferMessageHeader)) + { + nng_msg_free(msg); + return; + } + else if (auto res = nng_sendmsg(socket, msg, 0)) + { + // TODO: how to handle blocking here? need some way to ensure + // nng_sendmsg() returns + // -> maybe: timeout and loop with check if the producer should quit + spdlog::error("nng_sendmsg: {}", nng_strerror(res)); + nng_msg_free(msg); + return; + } + } + } + catch(const std::exception& e) + { + spdlog::error(e.what()); + return; + } +} + +int main(int argc, char *argv[]) +{ + if (argc < 2) + return 1; + + try + { + listfile::ZipReader zipReader; + zipReader.openArchive(argv[1]); + auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); + } + catch(const std::exception& e) + { + spdlog::error("exception in main(): {}", e.what()); + return 1; + } + + return 0; +} \ No newline at end of file