remove old mvlc_nng_replay

This commit is contained in:
Florian Lüke 2024-12-30 17:03:34 +01:00
parent 1e2eb48a32
commit a4f98c540e
2 changed files with 0 additions and 790 deletions

View file

@ -16,7 +16,6 @@ endfunction()
add_mnode_dev_executable(pair_producer) add_mnode_dev_executable(pair_producer)
add_mnode_dev_executable(pair_consumer) add_mnode_dev_executable(pair_consumer)
add_mnode_dev_executable(pair_inproc) add_mnode_dev_executable(pair_inproc)
add_mnode_dev_executable(mvlc_nng_replay)
add_mnode_dev_executable(mesy_nng_pipeline_main) add_mnode_dev_executable(mesy_nng_pipeline_main)
add_mnode_dev_executable(mesy_nng_push_pull_main) add_mnode_dev_executable(mesy_nng_push_pull_main)
add_mnode_dev_executable(mesy_nng_pub_producer) add_mnode_dev_executable(mesy_nng_pub_producer)

View file

@ -1,789 +0,0 @@
#include <limits>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <nng/nng.h>
#include <optional>
#ifndef __WIN32
#include <sys/prctl.h>
#endif
#include <mesytec-mnode/mnode_nng.h>
using namespace mesytec;
using namespace mesytec::mvlc;
using namespace mesytec::mnode::nng;
static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1);
enum class BufferType : u32
{
MVLC_USB,
MVLC_ETH,
};
enum class MessageType : u8
{
ListfileBuffer,
ParsedEvents,
};
#define PACK_AND_ALIGN4 __attribute__((packed, aligned(4)))
struct PACK_AND_ALIGN4 BaseMessageHeader
{
MessageType messageType;
u32 messageNumber;
};
struct PACK_AND_ALIGN4 ListfileBufferMessageHeader: public BaseMessageHeader
{
u32 bufferType;
};
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader
{
};
static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0);
size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg,
std::vector<u8> &tmpBuf)
{
size_t bytesMoved = 0u;
const u8 *msgBufferData =
reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader);
const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader);
if (bufferType == BufferType::MVLC_USB)
bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf);
else
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
nng_msg_chop(msg, bytesMoved);
return bytesMoved;
}
void listfile_reader_producer(nng_socket outputSocket, mvlc::listfile::ReadHandle &input,
const BufferType &bufferType)
{
#ifndef __WIN32
prctl(PR_SET_NAME, "listfile_reader_producer", 0, 0, 0);
#endif
#if 0
log_socket_info(outputSocket, "listfile_reader_producer - outputSocket");
if (auto res = nng_close(outputSocket))
{
spdlog::error("nng_close: {}", nng_strerror(res));
log_socket_info(outputSocket, "listfile_reader_producer - outputSocket");
return;
}
return;
#endif
try
{
u32 bufferNumber = 0u;
size_t totalBytesSent = 0u;
std::vector<u8> previousData;
bool done = false;
while (!done)
{
nng_msg *msg = {};
if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
return;
ListfileBufferMessageHeader header{
MessageType::ListfileBuffer,
++bufferNumber,
static_cast<u32>(bufferType),
};
nng_msg_append(msg, &header, sizeof(header));
assert(nng_msg_len(msg) == sizeof(header));
nng_msg_append(msg, previousData.data(), previousData.size());
previousData.clear();
size_t msgUsed = nng_msg_len(msg);
nng_msg_realloc(msg, DefaultOutputMessageReserve);
size_t bytesRead = input.read(reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
nng_msg_len(msg) - msgUsed);
nng_msg_realloc(msg, msgUsed + bytesRead);
fixup_listfile_buffer_message(bufferType, msg, previousData);
done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader));
if (done)
nng_msg_realloc(msg, 0);
const auto msgSize = nng_msg_len(msg);
if (auto res = send_message_retry(outputSocket, msg, 0, "listfile_reader_producer"))
{
nng_msg_free(msg);
msg = nullptr;
spdlog::error("listfile_reader_producer: send_message_retry: {}",
nng_strerror(res));
return;
}
spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber,
msgSize);
totalBytesSent += msgSize;
}
spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB",
bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1));
}
catch (const std::exception &e)
{
spdlog::error("listfile_reader_prroducer: exception: {}", e.what());
return;
}
}
struct ReadoutParserNngContext
{
nng_msg *outputMessage = nullptr;
u32 outputMessageNumber = 0u;
nng_socket outputSocket = NNG_SOCKET_INITIALIZER;
size_t totalReadoutEvents = 0u;
size_t totalSystemEvents = 0u;
};
struct PACK_AND_ALIGN4 ParsedEventHeader
{
u8 magicByte;
u8 crateIndex;
};
static const u8 ParsedDataEventMagic = 0xF3u;
static const u8 ParsedSystemEventMagic = 0xFAu;
struct PACK_AND_ALIGN4 ParsedDataEventHeader: public ParsedEventHeader
{
u8 eventIndex;
u8 moduleCount;
};
struct PACK_AND_ALIGN4 ParsedModuleHeader
{
u16 prefixSize;
u16 suffixSize;
u32 dynamicSize;
size_t totalSize() const { return prefixSize + suffixSize + dynamicSize; }
size_t totalBytes() const { return totalSize() * sizeof(u32); }
};
struct PACK_AND_ALIGN4 ParsedSystemEventHeader: public ParsedEventHeader
{
u32 eventSize;
size_t totalSize() const { return eventSize; }
size_t totalBytes() const { return totalSize() * sizeof(u32); }
};
bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx)
{
auto &msg = ctx.outputMessage;
if (msg)
return false;
if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
return false;
ParsedEventsMessageHeader header = {
MessageType::ParsedEvents,
++ctx.outputMessageNumber,
};
nng_msg_append(msg, &header, sizeof(header));
assert(nng_msg_len(msg) == sizeof(header));
return true;
}
bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = "")
{
const auto msgSize = nng_msg_len(ctx.outputMessage);
if (auto res = send_message_retry(ctx.outputSocket, ctx.outputMessage, 0, debugInfo))
{
nng_msg_free(ctx.outputMessage);
ctx.outputMessage = nullptr;
spdlog::error("{}: send_message_retry: {}:", debugInfo, nng_strerror(res));
return false;
}
ctx.outputMessage = nullptr;
spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize);
return true;
}
void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
{
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max());
assert(moduleCount < std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_);
++ctx.totalReadoutEvents;
auto &msg = ctx.outputMessage;
size_t requiredBytes = sizeof(ParsedDataEventHeader);
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
{
auto &moduleData = moduleDataList[moduleIndex];
requiredBytes += sizeof(ParsedModuleHeader);
requiredBytes += moduleData.data.size * sizeof(u32);
}
size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes)
{
if (!flush_output_message(ctx, "parser_nng_eventdata"))
return;
}
if (!msg && !parser_maybe_alloc_output(ctx))
return;
bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes);
ParsedDataEventHeader eventHeader = {
ParsedDataEventMagic,
static_cast<u8>(crateIndex),
static_cast<u8>(eventIndex),
static_cast<u8>(moduleCount),
};
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
{
auto &moduleData = moduleDataList[moduleIndex];
ParsedModuleHeader moduleHeader = {};
moduleHeader.prefixSize = moduleData.prefixSize;
moduleHeader.dynamicSize = moduleData.dynamicSize;
moduleHeader.suffixSize = moduleData.suffixSize;
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32)))
{
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return;
}
}
}
void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size)
{
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_);
++ctx.totalSystemEvents;
auto &msg = ctx.outputMessage;
size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32);
size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes)
{
if (!flush_output_message(ctx, "parser_nng_systemevent"))
return;
}
if (!msg && !parser_maybe_alloc_output(ctx))
return;
bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes);
ParsedSystemEventHeader eventHeader = {
ParsedSystemEventMagic,
static_cast<u8>(crateIndex),
size,
};
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
{
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
return;
}
if (int res = nng_msg_append(msg, header, size * sizeof(u32)))
{
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
return;
}
}
class StopWatch
{
public:
using duration_type = std::chrono::microseconds;
void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); }
duration_type interval()
{
auto now = std::chrono::high_resolution_clock::now();
auto result = std::chrono::duration_cast<duration_type>(now - tInterval_);
tInterval_ = now;
return result;
}
duration_type end()
{
auto now = std::chrono::high_resolution_clock::now();
auto result = std::chrono::duration_cast<duration_type>(now - tStart_);
return result;
}
private:
std::chrono::high_resolution_clock::time_point tStart_;
std::chrono::high_resolution_clock::time_point tInterval_;
};
void listfile_parser_nng(nng_socket inputSocket, nng_socket outputSocket,
const mvlc::CrateConfig &crateConfig)
{
#ifndef __WIN32
prctl(PR_SET_NAME, "listfile_parser_nng", 0, 0, 0);
#endif
size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u;
size_t inputBuffersLost = 0;
const auto listfileFormat = crateConfig.connectionType;
std::string stacksYaml;
for (const auto &stack: crateConfig.stacks)
stacksYaml += to_yaml(stack);
spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml);
auto crateIndex = 0;
ReadoutParserNngContext parserContext;
parserContext.outputSocket = outputSocket;
auto parserState =
mvlc::readout_parser::make_readout_parser(crateConfig.stacks, &parserContext);
mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = {
parser_nng_eventdata,
parser_nng_systemevent,
};
nng_msg *inputMsg = nullptr;
u32 &outputMessageNumber = parserContext.outputMessageNumber;
std::chrono::microseconds tReceive(0);
std::chrono::microseconds tProcess(0);
std::chrono::microseconds tSend(0);
std::chrono::microseconds tTotal(0);
auto tLastReport = std::chrono::steady_clock::now();
const auto tStart = std::chrono::steady_clock::now();
auto log_stats = [&]
{
spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, "
"totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost,
1.0 * totalInputBytes / mvlc::util::Megabytes(1));
spdlog::info("listfile_parser_nng: time budget: "
" tReceive = {} ms, "
" tProcess = {} ms, "
" tSend = {} ms, "
" tTotal = {} ms",
tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0,
tTotal.count() / 1000.0);
auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - tStart);
auto totalBytes = parserCounters.bytesProcessed;
auto totalMiB = totalBytes / (1024.0 * 1024.0);
// auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count();
auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0;
spdlog::info(
"listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s",
outputMessageNumber, totalMiB, MiBperSecond);
};
while (true)
{
StopWatch stopWatch;
stopWatch.start();
if (auto res = receive_message(inputSocket, &inputMsg))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res));
return;
}
spdlog::trace("listfile_parser_nng - receive_message: timeout");
}
else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader))
{
if (nng_msg_len(inputMsg) == 0)
break;
spdlog::warn("listfile_parser_nng - incoming message too short (len={})",
nng_msg_len(inputMsg));
}
else
{
tReceive += stopWatch.interval();
totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader =
*reinterpret_cast<const ListfileBufferMessageHeader *>(nng_msg_body(inputMsg));
auto bufferLoss =
readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;
;
lastInputMessageNumber = inputHeader.messageNumber;
spdlog::debug("listfile_parser_nng: received message {} of size {}",
lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader));
auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32);
readout_parser::parse_readout_buffer(listfileFormat, parserState, parserCallbacks,
parserCounters, inputHeader.messageNumber,
inputData, inputLen);
tProcess += stopWatch.interval();
// TODO: also flush after a certain time
tTotal += stopWatch.end();
}
if (inputMsg)
{
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
{
auto now = std::chrono::steady_clock::now();
auto tReportElapsed = now - tLastReport;
static const auto ReportInterval = std::chrono::seconds(1);
if (tReportElapsed >= ReportInterval)
{
log_stats();
tLastReport = now;
}
}
}
if (parserContext.outputMessage)
flush_output_message(parserContext, "listfile_parser_nng");
if (inputMsg)
{
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
assert(!parserContext.outputMessage);
// send empty message
if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0))
{
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
return;
}
if (auto res =
send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
{
nng_msg_free(parserContext.outputMessage);
parserContext.outputMessage = nullptr;
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
return;
}
log_stats();
{
std::ostringstream ss;
readout_parser::print_counters(ss, parserCounters);
spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str());
}
}
void analysis_nng(nng_socket inputSocket)
{
#ifndef __WIN32
prctl(PR_SET_NAME, "analysis_nng", 0, 0, 0);
#endif
nng_msg *inputMsg = nullptr;
size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u;
size_t inputBuffersLost = 0;
bool error = false;
while (!error)
{
if (auto res = receive_message(inputSocket, &inputMsg))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res));
return;
}
}
else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader))
{
if (nng_msg_len(inputMsg) == 0)
break;
spdlog::warn("analysis_nng - incoming message too short (len={})",
nng_msg_len(inputMsg));
}
else
{
totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = msg_trim_read<ParsedEventsMessageHeader>(inputMsg).value();
auto bufferLoss =
readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;
;
lastInputMessageNumber = inputHeader.messageNumber;
if (inputHeader.messageType != MessageType::ParsedEvents)
{
spdlog::error(
"Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}",
static_cast<u8>(inputHeader.messageType),
static_cast<u8>(MessageType::ParsedEvents));
break;
}
spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber,
nng_msg_len(inputMsg));
while (true)
{
if (nng_msg_len(inputMsg) < 1)
break;
const u8 eventMagic = *reinterpret_cast<u8 *>(nng_msg_body(inputMsg));
if (eventMagic == ParsedDataEventMagic)
{
auto eventHeader = msg_trim_read<ParsedDataEventHeader>(inputMsg);
if (!eventHeader)
break;
for (size_t moduleIndex = 0u; moduleIndex < eventHeader->moduleCount;
++moduleIndex)
{
auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg);
if (!moduleHeader)
break;
if (moduleHeader->totalBytes())
{
const u32 *moduleData =
reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
// util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(),
// fmt::format("crate={}, event={}, module={}, size={}",
// eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex,
// moduleHeader->totalSize()));
if (nng_msg_trim(inputMsg, moduleHeader->totalBytes()))
break;
}
}
}
else if (eventMagic == ParsedSystemEventMagic)
{
auto eventHeader = msg_trim_read<ParsedSystemEventHeader>(inputMsg);
if (!eventHeader)
break;
if (nng_msg_trim(inputMsg, eventHeader->totalBytes()))
break;
}
}
assert(nng_msg_len(inputMsg) == 0);
nng_msg_free(inputMsg);
inputMsg = nullptr;
}
}
spdlog::info(
"analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1));
}
void pipe_cb(nng_pipe p, nng_pipe_ev event, void * /*arg*/)
{
switch (event)
{
case ::NNG_PIPE_EV_ADD_PRE:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE");
break;
case ::NNG_PIPE_EV_ADD_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST");
log_pipe_info(p, "NNG_PIPE_EV_ADD_POST");
break;
case ::NNG_PIPE_EV_REM_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST");
break;
case ::NNG_PIPE_EV_NUM: // silence warning
break;
}
}
int main(int argc, char *argv[])
{
spdlog::set_level(spdlog::level::info);
if (argc < 2)
{
std::cerr << fmt::format("Usage: mvlc_nng_replay <zipfile>\n");
return 1;
}
const auto inputFilename = argv[1];
listfile::ZipReader zipReader;
try
{
zipReader.openArchive(inputFilename);
}
catch (const std::exception &e)
{
std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename,
e.what());
return 1;
}
listfile::ReadHandle *readHandle = nullptr;
try
{
readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
}
catch (const std::exception &e)
{
if (zipReader.firstListfileEntryName().empty())
std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename);
else
std::cout << fmt::format(
"Error: could not open listfile '{}' in '{}' for reading: {}\n",
zipReader.firstListfileEntryName(), inputFilename, e.what());
return 1;
}
assert(readHandle);
try
{
auto readerOutputSocket = make_pair_socket();
auto parserInputSocket = make_pair_socket();
auto parserOutputSocket = make_pair_socket();
auto analysisInputSocket = make_pair_socket();
for (auto &socket:
{readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket})
{
for (auto event: {NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST})
{
if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr))
mnode_nng_fatal("nng_pipe_notify", res);
}
}
if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
mnode_nng_fatal("nng_listen inproc", res);
log_socket_info(readerOutputSocket, "readerOutputSocket");
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
mnode_nng_fatal("nng_dial inproc", res);
log_socket_info(parserInputSocket, "parserInputSocket");
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
mnode_nng_fatal("nng_listen inproc", res);
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
mnode_nng_fatal("nng_dial inproc", res);
spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
auto preamble = mvlc::listfile::read_preamble(*readHandle);
const auto bufferType =
(preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH
: BufferType::MVLC_USB);
auto crateConfigSection = preamble.findCrateConfig();
if (!crateConfigSection)
{
spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble");
// FIXME: close sockets
return 1;
}
auto configYaml = crateConfigSection->contentsToString();
auto crateConfig = mvlc::crate_config_from_yaml(configYaml);
// Seek to start, then read past the magic bytes at the beginning of the
// listfile.
(void)listfile::read_magic(*readHandle);
std::vector<std::thread> threads;
threads.emplace_back(std::thread(listfile_reader_producer, readerOutputSocket,
std::ref(*readHandle), std::cref(bufferType)));
threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket,
std::cref(crateConfig)));
threads.emplace_back(std::thread(analysis_nng, analysisInputSocket));
for (auto &t: threads)
if (t.joinable())
t.join();
for (auto &socket:
{readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket})
if (auto res = nng_close(socket))
mnode_nng_fatal("nng_close", res);
}
catch (const std::exception &e)
{
spdlog::error("exception in main(): {}", e.what());
return 1;
}
return 0;
}