untested replay over nng code

This commit is contained in:
Florian Lüke 2023-07-02 23:33:22 +02:00
parent e1c77492e2
commit ae815d4adb
3 changed files with 212 additions and 5 deletions

@ -1 +1 @@
Subproject commit 79a194b029bb2a63ca69cde0df81ae91c104e661
Subproject commit ecbcd56ab69a802f7173fa28d25b8eb6dd824015

View file

@ -1,4 +1,4 @@
set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic)
set(MVLC_NNG_NODE_WARN_FLAGS -Wall -Wextra -Wpedantic)
# Bit of a hack to set the variables here. If set earlier clang-tidy will for
# some reason pickup log.c and warn about some va_list stuff. Might be because
@ -23,17 +23,22 @@ set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic)
add_executable(pair_producer pair_producer.cc)
target_compile_features(pair_producer PRIVATE cxx_std_17)
target_link_libraries(pair_producer PRIVATE mesytec-mvlc PRIVATE nng)
target_compile_options(pair_producer PRIVATE ${MY_WARN_FLAGS})
target_compile_options(pair_producer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
add_executable(pair_consumer pair_consumer.cc)
target_compile_features(pair_consumer PRIVATE cxx_std_17)
target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng)
target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS})
target_compile_options(pair_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
add_executable(pair_inproc pair_inproc.cc)
target_compile_features(pair_inproc PRIVATE cxx_std_17)
target_link_libraries(pair_inproc PRIVATE mesytec-mvlc PRIVATE nng)
target_compile_options(pair_inproc PRIVATE ${MY_WARN_FLAGS})
target_compile_options(pair_inproc PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
add_executable(mvlc_nng_replay mvlc_nng_replay.cc)
target_compile_features(mvlc_nng_replay PRIVATE cxx_std_17)
target_link_libraries(mvlc_nng_replay PRIVATE mesytec-mvlc PRIVATE nng)
target_compile_options(mvlc_nng_replay PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
#unset(CMAKE_C_CLANG_TIDY)
#unset(CMAKE_CXX_CLANG_TIDY)

202
src/mvlc_nng_replay.cc Normal file
View file

@ -0,0 +1,202 @@
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <nng/nng.h>
using namespace mesytec;
using namespace mesytec::mvlc;
// Follows the framing structure inside the buffer until an incomplete frame
// which doesn't fit into the buffer is detected. The incomplete data is moved
// over to the tempBuffer so that the readBuffer ends with a complete frame.
//
// The input buffer must start with a frame header (skip_count will be called
// with the first word of the input buffer on the first iteration).
//
// The SkipCountFunc must return the number of words to skip to get to the next
// frame header or 0 if there is not enough data left in the input iterator to
// determine the frames size.
// Signature of SkipCountFunc: u32 skip_count(const basic_string_view<const u8> &view);
// Returns the number of trailing bytes copied from msgBuf into tmpBuf.
template<typename SkipCountFunc>
inline size_t fixup_buffer(
const u8 *msgBuf, size_t msgUsed,
std::vector<u8> &tmpBuf,
SkipCountFunc skip_count)
{
auto view = basic_string_view<const u8>(msgBuf, msgUsed);
while (!view.empty())
{
if (view.size() >= sizeof(u32))
{
u32 wordsToSkip = skip_count(view);
//cout << "wordsToSkip=" << wordsToSkip << ", view.size()=" << view.size() << ", in words:" << view.size() / sizeof(u32));
if (wordsToSkip == 0 || wordsToSkip > view.size() / sizeof(u32))
{
tmpBuf.reserve(tmpBuf.size() + view.size());
std::copy(std::begin(view), std::end(view), std::back_inserter(tmpBuf));
return view.size();
}
// Skip over the SystemEvent frame or the ETH packet data.
view.remove_prefix(wordsToSkip * sizeof(u32));
}
}
return 0u;
}
inline size_t fixup_buffer_mvlc_usb(const u8 *buf, size_t bufUsed, std::vector<u8> &tmpBuf)
{
auto skip_func = [] (const basic_string_view<const u8> &view) -> u32
{
if (view.size() < sizeof(u32))
return 0u;
u32 header = *reinterpret_cast<const u32 *>(view.data());
return 1u + extract_frame_info(header).len;
};
return fixup_buffer(buf, bufUsed, tmpBuf, skip_func);
}
inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector<u8> &tmpBuf)
{
auto skip_func = [](const basic_string_view<const u8> &view) -> u32
{
if (view.size() < sizeof(u32))
return 0u;
// Either a SystemEvent header or the first of the two ETH packet headers
u32 header = *reinterpret_cast<const u32 *>(view.data());
if (get_frame_type(header) == frame_headers::SystemEvent)
return 1u + extract_frame_info(header).len;
if (view.size() >= 2 * sizeof(u32))
{
u32 header1 = *reinterpret_cast<const u32 *>(view.data() + sizeof(u32));
eth::PayloadHeaderInfo ethHdrs{ header, header1 };
return eth::HeaderWords + ethHdrs.dataWordCount();
}
// Not enough data to get the 2nd ETH header word.
return 0u;
};
return fixup_buffer(buf, bufUsed, tmpBuf, skip_func);
}
enum class BufferType: u32
{
MVLC_USB,
MVLC_ETH,
};
#pragma pack(push, 1)
struct BufferMessageHeader
{
u32 bufferType;
u32 bufferNumber;
};
#pragma pack(pop)
size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, size_t msgUsed, std::vector<u8> &tmpBuf)
{
size_t bytesMoved = 0u;
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(BufferMessageHeader);
const auto msgBufferSize = msgUsed - sizeof(BufferMessageHeader);
if (bufferType == BufferType::MVLC_USB)
bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf);
else
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
nng_msg_chop(msg, msgUsed - bytesMoved);
return msgUsed - bytesMoved;
}
void listfile_reader_produer(
nng_socket socket,
mvlc::listfile::ReadHandle &input)
{
try
{
auto preamble = mvlc::listfile::read_preamble(input);
auto bufferType = BufferType::MVLC_USB;
if (preamble.magic == mvlc::listfile::get_filemagic_eth())
bufferType = BufferType::MVLC_ETH;
u32 bufferNumber = 0u;
std::vector<u8> previousData;
while (true)
{
nng_msg *msg = {};
if (int res = nng_msg_alloc(&msg, util::Megabytes(1)))
{
spdlog::error("nng_msg_alloc: {}", nng_strerror(res));
return;
}
BufferMessageHeader msgHeader = {};
msgHeader.bufferType = static_cast<u32>(bufferType);
msgHeader.bufferNumber = bufferNumber;
nng_msg_append(msg, &msgHeader, sizeof(msgHeader));
size_t msgUsed = sizeof(msgHeader);
std::copy(std::begin(previousData), std::end(previousData),
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed);
msgUsed += previousData.size();
previousData.clear();
size_t bytesRead = input.read(
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
nng_msg_len(msg) - msgUsed);
msgUsed += bytesRead;
msgUsed = fixup_listfile_buffer_message(bufferType, msg, msgUsed, previousData);
assert(msgUsed == nng_msg_len(msg));
if (bytesRead == 0 && nng_msg_len(msg) == sizeof(BufferMessageHeader))
{
nng_msg_free(msg);
return;
}
else if (auto res = nng_sendmsg(socket, msg, 0))
{
// TODO: how to handle blocking here? need some way to ensure
// nng_sendmsg() returns
// -> maybe: timeout and loop with check if the producer should quit
spdlog::error("nng_sendmsg: {}", nng_strerror(res));
nng_msg_free(msg);
return;
}
}
}
catch(const std::exception& e)
{
spdlog::error(e.what());
return;
}
}
int main(int argc, char *argv[])
{
if (argc < 2)
return 1;
try
{
listfile::ZipReader zipReader;
zipReader.openArchive(argv[1]);
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
}
catch(const std::exception& e)
{
spdlog::error("exception in main(): {}", e.what());
return 1;
}
return 0;
}