mesytec-mnode/src/mvlc_nng_replay.cc
2023-07-03 21:32:47 +02:00

691 lines
No EOL
22 KiB
C++

#include <limits>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <nng/nng.h>
#include <sys/prctl.h>
#include "common.h"
using namespace mesytec;
using namespace mesytec::mvlc;
int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "")
{
int res = 0;
size_t attempt = 0u;
do
{
res = nng_sendmsg(socket, msg, 0);
if (res && res == NNG_ETIMEDOUT)
spdlog::warn("send_message_retry: {} - send timeout", debugInfo);
if (res && res != NNG_ETIMEDOUT)
return res;
if (res && maxTries > 0 && attempt >= maxTries)
return res;
++attempt;
} while (res == NNG_ETIMEDOUT);
return 0;
}
// Follows the framing structure inside the buffer until an incomplete frame
// which doesn't fit into the buffer is detected. The incomplete data is moved
// over to the tempBuffer so that the readBuffer ends with a complete frame.
//
// The input buffer must start with a frame header (skip_count will be called
// with the first word of the input buffer on the first iteration).
//
// The SkipCountFunc must return the number of words to skip to get to the next
// frame header or 0 if there is not enough data left in the input iterator to
// determine the frames size.
// Signature of SkipCountFunc: u32 skip_count(const basic_string_view<const u8> &view);
// Returns the number of trailing bytes copied from msgBuf into tmpBuf.
template<typename SkipCountFunc>
inline size_t fixup_buffer(
const u8 *msgBuf, size_t msgUsed,
std::vector<u8> &tmpBuf,
SkipCountFunc skip_count)
{
auto view = basic_string_view<const u8>(msgBuf, msgUsed);
while (!view.empty())
{
if (view.size() >= sizeof(u32))
{
u32 wordsToSkip = skip_count(view);
//cout << "wordsToSkip=" << wordsToSkip << ", view.size()=" << view.size() << ", in words:" << view.size() / sizeof(u32));
if (wordsToSkip == 0 || wordsToSkip > view.size() / sizeof(u32))
{
tmpBuf.reserve(tmpBuf.size() + view.size());
std::copy(std::begin(view), std::end(view), std::back_inserter(tmpBuf));
return view.size();
}
// Skip over the SystemEvent frame or the ETH packet data.
view.remove_prefix(wordsToSkip * sizeof(u32));
}
}
return 0u;
}
inline size_t fixup_buffer_mvlc_usb(const u8 *buf, size_t bufUsed, std::vector<u8> &tmpBuf)
{
auto skip_func = [] (const basic_string_view<const u8> &view) -> u32
{
if (view.size() < sizeof(u32))
return 0u;
u32 header = *reinterpret_cast<const u32 *>(view.data());
return 1u + extract_frame_info(header).len;
};
return fixup_buffer(buf, bufUsed, tmpBuf, skip_func);
}
inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector<u8> &tmpBuf)
{
auto skip_func = [](const basic_string_view<const u8> &view) -> u32
{
if (view.size() < sizeof(u32))
return 0u;
// Either a SystemEvent header or the first of the two ETH packet headers
u32 header = *reinterpret_cast<const u32 *>(view.data());
if (get_frame_type(header) == frame_headers::SystemEvent)
return 1u + extract_frame_info(header).len;
if (view.size() >= 2 * sizeof(u32))
{
u32 header1 = *reinterpret_cast<const u32 *>(view.data() + sizeof(u32));
eth::PayloadHeaderInfo ethHdrs{ header, header1 };
return eth::HeaderWords + ethHdrs.dataWordCount();
}
// Not enough data to get the 2nd ETH header word.
return 0u;
};
return fixup_buffer(buf, bufUsed, tmpBuf, skip_func);
}
enum class BufferType: u32
{
MVLC_USB,
MVLC_ETH,
};
enum class MessageType
{
ListfileBuffer,
ParsedEvents,
};
struct __attribute__((packed, aligned(1))) MessageHeaderBase
{
MessageType messageType;
u32 messageNumber;
};
struct __attribute__((packed, aligned(1))) ListfileBufferMessageHeader: public MessageHeaderBase
{
u32 bufferType;
};
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
struct __attribute__((packed, aligned(1))) ParsedEventsMessageHeader: public MessageHeaderBase
{
};
static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0);
void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector<u8> &tmpBuf)
{
size_t bytesMoved = 0u;
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader);
const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader);
if (bufferType == BufferType::MVLC_USB)
bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf);
else
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
nng_msg_chop(msg, bytesMoved);
}
void listfile_reader_producer(
nng_socket socket,
mvlc::listfile::ReadHandle &input,
const mvlc::listfile::Preamble &preamble)
{
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0);
try
{
auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
? BufferType::MVLC_ETH
: BufferType::MVLC_USB);
u32 bufferNumber = 0u;
size_t totalBytesSent = 0u;
std::vector<u8> previousData;
bool done = false;
while (!done)
{
nng_msg *msg = {};
if (int res = nng_msg_alloc(&msg, util::Megabytes(1)))
{
spdlog::error("nng_msg_alloc: {}", nng_strerror(res));
return;
}
auto msgHeader = reinterpret_cast<ListfileBufferMessageHeader *>(nng_msg_body(msg));
msgHeader->messageType = MessageType::ListfileBuffer;
msgHeader->messageNumber = ++bufferNumber;
msgHeader->bufferType = static_cast<u32>(bufferType);
nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader));
nng_msg_append(msg, previousData.data(), previousData.size());
previousData.clear();
size_t msgUsed = nng_msg_len(msg);
nng_msg_realloc(msg, util::Megabytes(1));
size_t bytesRead = input.read(
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
nng_msg_len(msg) - msgUsed);
nng_msg_realloc(msg, msgUsed + bytesRead);
fixup_listfile_buffer_message(bufferType, msg, previousData);
done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader));
if (done)
nng_msg_realloc(msg, 0);
const auto msgSize = nng_msg_len(msg);
if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer"))
{
nng_msg_free(msg);
msg = nullptr;
spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res));
return;
}
spdlog::info("listfile_reader_producer: sent message {} of size {}",
bufferNumber, msgSize);
totalBytesSent += msgSize;
}
spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB",
bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1));
}
catch(const std::exception& e)
{
spdlog::error("listfile_reader_prroducer: exception: {}", e.what());
return;
}
}
struct ParserNngContext
{
nng_msg *outputMessage = nullptr;
size_t totalReadoutEvents = 0u;
size_t totalSystemEvents = 0u;
};
struct __attribute__((packed, aligned(1))) ParsedEventHeader
{
u32 magicByte: 8;
u8 crateIndex: 8;
};
struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader
{
u8 eventIndex: 8;
u8 moduleCount: 8;
};
struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader
{
u32 eventSize;
};
struct __attribute__((packed, aligned(1))) ParsedModuleHeader
{
u16 prefixSize;
u16 suffixSize;
u32 dynamicSize;
};
void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
{
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max());
assert(moduleCount < std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_);
++ctx.totalReadoutEvents;
auto msg = ctx.outputMessage;
ParsedDataEventHeader eventHeader =
{
0xF3u,
static_cast<u8>(crateIndex),
static_cast<u8>(eventIndex),
static_cast<u8>(moduleCount),
};
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
#if 1
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
{
auto &moduleData = moduleDataList[moduleIndex];
ParsedModuleHeader moduleHeader = {};
moduleHeader.prefixSize = moduleData.prefixSize;
moduleHeader.dynamicSize = moduleData.dynamicSize;
moduleHeader.suffixSize = moduleData.suffixSize;
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
auto prefixSpan = prefix_span(moduleData);
auto dynamicSpan = dynamic_span(moduleData);
auto suffixSpan = suffix_span(moduleData);
for (auto &dataSpan: { prefixSpan, dynamicSpan, suffixSpan })
{
if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
}
}
#endif
}
void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size)
{
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_);
++ctx.totalSystemEvents;
auto msg = ctx.outputMessage;
ParsedSystemEventHeader eventHeader =
{
0xFAu,
static_cast<u8>(crateIndex),
size,
};
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
{
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
return;
}
if (int res = nng_msg_append(msg, header, size * sizeof(u32)))
{
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
return;
}
}
class StopWatch
{
public:
using duration_type = std::chrono::microseconds;
void start()
{
tStart_ = tInterval_ = std::chrono::high_resolution_clock::now();
}
duration_type interval()
{
auto now = std::chrono::high_resolution_clock::now();
auto result = std::chrono::duration_cast<duration_type>(now - tInterval_);
tInterval_ = now;
return result;
}
duration_type end()
{
auto now = std::chrono::high_resolution_clock::now();
auto result = std::chrono::duration_cast<duration_type>(now - tStart_);
return result;
}
private:
std::chrono::high_resolution_clock::time_point tStart_;
std::chrono::high_resolution_clock::time_point tInterval_;
};
void listfile_parser_nng(
nng_socket inputSocket,
nng_socket outputSocket,
const mvlc::listfile::Preamble &preamble)
{
prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0);
size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u;
size_t inputBuffersLost = 0;
const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth()
? ConnectionType::ETH
: ConnectionType::USB);
auto configSection = preamble.findCrateConfig();
if (!configSection)
{
spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble");
return;
}
auto configYaml = configSection->contentsToString();
auto crateConfig = mvlc::crate_config_from_yaml(configYaml);
spdlog::info("listfile_parser_nng: CrateConfig:\n{}", to_yaml(crateConfig));
auto crateIndex = 0;
ParserNngContext parserContext;
auto parserState = mvlc::readout_parser::make_readout_parser(
crateConfig.stacks, crateIndex, &parserContext);
mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks =
{
parser_nng_eventdata,
parser_nng_systemevent,
};
nng_msg *inputMsg = nullptr;
u32 outputMessageNumber = 0u;
std::chrono::microseconds tReceive(0);
std::chrono::microseconds tProcess(0);
std::chrono::microseconds tSend(0);
std::chrono::microseconds tTotal(0);
auto tLastReport = std::chrono::steady_clock::now();
while (true)
{
StopWatch stopWatch;
stopWatch.start();
if (auto res = receive_message(inputSocket, &inputMsg))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res));
return;
}
}
else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader))
{
if (nng_msg_len(inputMsg) == 0)
break;
spdlog::warn("listfile_parser_nng - incoming message too short (len={})",
nng_msg_len(inputMsg));
}
else
{
tReceive += stopWatch.interval();
totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = *reinterpret_cast<const ListfileBufferMessageHeader *>(
nng_msg_body(inputMsg));
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
lastInputMessageNumber = inputHeader.messageNumber;
spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader));
auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32);
if (!parserContext.outputMessage)
{
if (auto res = nng_msg_alloc(&parserContext.outputMessage, util::Megabytes(1)))
{
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
break;
}
auto &outputHeader = *reinterpret_cast<ParsedEventsMessageHeader *>(nng_msg_body(parserContext.outputMessage));
outputHeader.messageType = MessageType::ParsedEvents;
outputHeader.messageNumber = ++outputMessageNumber;
nng_msg_realloc(parserContext.outputMessage, sizeof(ParsedEventsMessageHeader));
}
assert(parserContext.outputMessage);
readout_parser::parse_readout_buffer(
listfileFormat,
parserState,
parserCallbacks,
parserCounters,
inputHeader.messageNumber,
inputData,
inputLen);
tProcess += stopWatch.interval();
// TODO: also flush after a certain time
if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1))
{
const auto msgSize = nng_msg_len(parserContext.outputMessage);
if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
{
nng_msg_free(parserContext.outputMessage);
parserContext.outputMessage = nullptr;
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
break;
}
parserContext.outputMessage = nullptr;
spdlog::info("listfile_parser_nng: sent message {} of size {}",
outputMessageNumber, msgSize);
}
tSend += stopWatch.interval();
tTotal += stopWatch.end();
}
if (inputMsg)
{
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
{
auto now = std::chrono::steady_clock::now();
auto tElapsed = now - tLastReport;
static const auto ReportInterval = std::chrono::seconds(1);
if (tElapsed >= ReportInterval)
{
spdlog::info("listfile_parser_nng: time budget:\n"
" tReceive = {}\n"
" tProcess = {}\n"
" tSend = {}\n"
" tTotal = {}\n",
tReceive.count(),
tProcess.count(),
tSend.count(),
tTotal.count());
tLastReport = now;
}
}
}
if (inputMsg)
{
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
assert(!parserContext.outputMessage);
spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
spdlog::info("listfile_parser_nng: time budget:\n"
" tReceive = {} ms\n"
" tProcess = {} ms\n"
" tSend = {} ms\n"
" tTotal = {} ms\n",
tReceive.count() / 1000.0,
tProcess.count() / 1000.0,
tSend.count() / 1000.0,
tTotal.count() / 1000.0);
// send empty message
if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0))
{
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
return;
}
if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
{
nng_msg_free(parserContext.outputMessage);
parserContext.outputMessage = nullptr;
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
return;
}
}
void analysis_nng(
nng_socket inputSocket
)
{
prctl(PR_SET_NAME,"analysis_nng",0,0,0);
nng_msg *inputMsg = nullptr;
size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u;
size_t inputBuffersLost = 0;
while (true)
{
if (auto res = receive_message(inputSocket, &inputMsg))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res));
return;
}
}
else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader))
{
if (nng_msg_len(inputMsg) == 0)
break;
spdlog::warn("analysis_nng - incoming message too short (len={})",
nng_msg_len(inputMsg));
}
else
{
totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = *reinterpret_cast<const ParsedEventsMessageHeader *>(
nng_msg_body(inputMsg));
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
lastInputMessageNumber = inputHeader.messageNumber;
spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader));
//while (nng_msg_len(inputMsg))
//{
//}
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
}
spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
}
int main(int argc, char *argv[])
{
spdlog::set_level(spdlog::level::info);
if (argc < 2)
return 1;
try
{
auto producerSocket = make_pair_socket();
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res);
auto parserInputSocket = make_pair_socket();
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res);
auto parserOutputSocket = make_pair_socket();
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res);
auto analysisInputSocket = make_pair_socket();
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res);
listfile::ZipReader zipReader;
zipReader.openArchive(argv[1]);
spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
auto preamble = mvlc::listfile::read_preamble(*readHandle);
(void) listfile::read_magic(*readHandle);
std::vector<std::thread> threads;
threads.emplace_back(std::thread(listfile_reader_producer,
producerSocket, std::ref(*readHandle), std::cref(preamble)));
threads.emplace_back(std::thread(listfile_parser_nng,
parserInputSocket, parserOutputSocket, std::cref(preamble)));
threads.emplace_back(std::thread(analysis_nng,
analysisInputSocket));
for (auto &t: threads) if (t.joinable()) t.join();
}
catch(const std::exception& e)
{
spdlog::error("exception in main(): {}", e.what());
return 1;
}
return 0;
}