diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index c7605fe..f83477d 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -16,7 +16,6 @@ endfunction() add_mnode_dev_executable(pair_producer) add_mnode_dev_executable(pair_consumer) add_mnode_dev_executable(pair_inproc) -add_mnode_dev_executable(mvlc_nng_replay) add_mnode_dev_executable(mesy_nng_pipeline_main) add_mnode_dev_executable(mesy_nng_push_pull_main) add_mnode_dev_executable(mesy_nng_pub_producer) diff --git a/src/tools/mvlc_nng_replay.cc b/src/tools/mvlc_nng_replay.cc deleted file mode 100644 index 5492bdb..0000000 --- a/src/tools/mvlc_nng_replay.cc +++ /dev/null @@ -1,789 +0,0 @@ -#include -#include -#include -#include -#ifndef __WIN32 -#include -#endif -#include - -using namespace mesytec; -using namespace mesytec::mvlc; -using namespace mesytec::mnode::nng; - -static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1); - -enum class BufferType : u32 -{ - MVLC_USB, - MVLC_ETH, -}; - -enum class MessageType : u8 -{ - ListfileBuffer, - ParsedEvents, -}; - -#define PACK_AND_ALIGN4 __attribute__((packed, aligned(4))) - -struct PACK_AND_ALIGN4 BaseMessageHeader -{ - MessageType messageType; - u32 messageNumber; -}; - -struct PACK_AND_ALIGN4 ListfileBufferMessageHeader: public BaseMessageHeader -{ - u32 bufferType; -}; - -static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); - -struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader -{ -}; - -static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); - -size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, - std::vector &tmpBuf) -{ - size_t bytesMoved = 0u; - const u8 *msgBufferData = - reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); - const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); - - if (bufferType == BufferType::MVLC_USB) - bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf); - else - bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); - - nng_msg_chop(msg, bytesMoved); - - return bytesMoved; -} - -void listfile_reader_producer(nng_socket outputSocket, mvlc::listfile::ReadHandle &input, - const BufferType &bufferType) -{ -#ifndef __WIN32 - prctl(PR_SET_NAME, "listfile_reader_producer", 0, 0, 0); -#endif - -#if 0 - log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); - - if (auto res = nng_close(outputSocket)) - { - spdlog::error("nng_close: {}", nng_strerror(res)); - log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); - return; - } - - return; -#endif - - try - { - u32 bufferNumber = 0u; - size_t totalBytesSent = 0u; - std::vector previousData; - bool done = false; - - while (!done) - { - nng_msg *msg = {}; - - if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) - return; - - ListfileBufferMessageHeader header{ - MessageType::ListfileBuffer, - ++bufferNumber, - static_cast(bufferType), - }; - - nng_msg_append(msg, &header, sizeof(header)); - assert(nng_msg_len(msg) == sizeof(header)); - - nng_msg_append(msg, previousData.data(), previousData.size()); - previousData.clear(); - - size_t msgUsed = nng_msg_len(msg); - nng_msg_realloc(msg, DefaultOutputMessageReserve); - - size_t bytesRead = input.read(reinterpret_cast(nng_msg_body(msg)) + msgUsed, - nng_msg_len(msg) - msgUsed); - - nng_msg_realloc(msg, msgUsed + bytesRead); - fixup_listfile_buffer_message(bufferType, msg, previousData); - - done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader)); - - if (done) - nng_msg_realloc(msg, 0); - - const auto msgSize = nng_msg_len(msg); - - if (auto res = send_message_retry(outputSocket, msg, 0, "listfile_reader_producer")) - { - nng_msg_free(msg); - msg = nullptr; - spdlog::error("listfile_reader_producer: send_message_retry: {}", - nng_strerror(res)); - return; - } - - spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber, - msgSize); - totalBytesSent += msgSize; - } - - spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", - bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); - } - catch (const std::exception &e) - { - spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); - return; - } -} - -struct ReadoutParserNngContext -{ - nng_msg *outputMessage = nullptr; - u32 outputMessageNumber = 0u; - nng_socket outputSocket = NNG_SOCKET_INITIALIZER; - size_t totalReadoutEvents = 0u; - size_t totalSystemEvents = 0u; -}; - -struct PACK_AND_ALIGN4 ParsedEventHeader -{ - u8 magicByte; - u8 crateIndex; -}; - -static const u8 ParsedDataEventMagic = 0xF3u; -static const u8 ParsedSystemEventMagic = 0xFAu; - -struct PACK_AND_ALIGN4 ParsedDataEventHeader: public ParsedEventHeader -{ - u8 eventIndex; - u8 moduleCount; -}; - -struct PACK_AND_ALIGN4 ParsedModuleHeader -{ - u16 prefixSize; - u16 suffixSize; - u32 dynamicSize; - - size_t totalSize() const { return prefixSize + suffixSize + dynamicSize; } - size_t totalBytes() const { return totalSize() * sizeof(u32); } -}; - -struct PACK_AND_ALIGN4 ParsedSystemEventHeader: public ParsedEventHeader -{ - u32 eventSize; - - size_t totalSize() const { return eventSize; } - size_t totalBytes() const { return totalSize() * sizeof(u32); } -}; - -bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) -{ - auto &msg = ctx.outputMessage; - - if (msg) - return false; - - if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) - return false; - - ParsedEventsMessageHeader header = { - MessageType::ParsedEvents, - ++ctx.outputMessageNumber, - }; - - nng_msg_append(msg, &header, sizeof(header)); - assert(nng_msg_len(msg) == sizeof(header)); - - return true; -} - -bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = "") -{ - const auto msgSize = nng_msg_len(ctx.outputMessage); - - if (auto res = send_message_retry(ctx.outputSocket, ctx.outputMessage, 0, debugInfo)) - { - nng_msg_free(ctx.outputMessage); - ctx.outputMessage = nullptr; - spdlog::error("{}: send_message_retry: {}:", debugInfo, nng_strerror(res)); - return false; - } - - ctx.outputMessage = nullptr; - - spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize); - - return true; -} - -void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, - const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) -{ - assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); - assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); - assert(moduleCount < std::numeric_limits::max()); - - auto &ctx = *reinterpret_cast(ctx_); - ++ctx.totalReadoutEvents; - auto &msg = ctx.outputMessage; - - size_t requiredBytes = sizeof(ParsedDataEventHeader); - - for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) - { - auto &moduleData = moduleDataList[moduleIndex]; - requiredBytes += sizeof(ParsedModuleHeader); - requiredBytes += moduleData.data.size * sizeof(u32); - } - - size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; - - if (msg && bytesFree < requiredBytes) - { - if (!flush_output_message(ctx, "parser_nng_eventdata")) - return; - } - - if (!msg && !parser_maybe_alloc_output(ctx)) - return; - - bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; - assert(bytesFree >= requiredBytes); - - ParsedDataEventHeader eventHeader = { - ParsedDataEventMagic, - static_cast(crateIndex), - static_cast(eventIndex), - static_cast(moduleCount), - }; - - if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) - { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); - return; - } - - for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex) - { - auto &moduleData = moduleDataList[moduleIndex]; - - ParsedModuleHeader moduleHeader = {}; - moduleHeader.prefixSize = moduleData.prefixSize; - moduleHeader.dynamicSize = moduleData.dynamicSize; - moduleHeader.suffixSize = moduleData.suffixSize; - - if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) - { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); - return; - } - - if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) - { - spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res)); - return; - } - } -} - -void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size) -{ - assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); - auto &ctx = *reinterpret_cast(ctx_); - ++ctx.totalSystemEvents; - auto &msg = ctx.outputMessage; - - size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); - size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; - - if (msg && bytesFree < requiredBytes) - { - if (!flush_output_message(ctx, "parser_nng_systemevent")) - return; - } - - if (!msg && !parser_maybe_alloc_output(ctx)) - return; - - bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; - assert(bytesFree >= requiredBytes); - - ParsedSystemEventHeader eventHeader = { - ParsedSystemEventMagic, - static_cast(crateIndex), - size, - }; - - if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) - { - spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); - return; - } - - if (int res = nng_msg_append(msg, header, size * sizeof(u32))) - { - spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res)); - return; - } -} - -class StopWatch -{ - public: - using duration_type = std::chrono::microseconds; - - void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); } - - duration_type interval() - { - auto now = std::chrono::high_resolution_clock::now(); - auto result = std::chrono::duration_cast(now - tInterval_); - tInterval_ = now; - return result; - } - - duration_type end() - { - auto now = std::chrono::high_resolution_clock::now(); - auto result = std::chrono::duration_cast(now - tStart_); - return result; - } - - private: - std::chrono::high_resolution_clock::time_point tStart_; - std::chrono::high_resolution_clock::time_point tInterval_; -}; - -void listfile_parser_nng(nng_socket inputSocket, nng_socket outputSocket, - const mvlc::CrateConfig &crateConfig) -{ -#ifndef __WIN32 - prctl(PR_SET_NAME, "listfile_parser_nng", 0, 0, 0); -#endif - - size_t totalInputBytes = 0u; - u32 lastInputMessageNumber = 0u; - size_t inputBuffersLost = 0; - const auto listfileFormat = crateConfig.connectionType; - - std::string stacksYaml; - for (const auto &stack: crateConfig.stacks) - stacksYaml += to_yaml(stack); - - spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml); - - auto crateIndex = 0; - ReadoutParserNngContext parserContext; - parserContext.outputSocket = outputSocket; - auto parserState = - mvlc::readout_parser::make_readout_parser(crateConfig.stacks, &parserContext); - mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; - mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { - parser_nng_eventdata, - parser_nng_systemevent, - }; - - nng_msg *inputMsg = nullptr; - u32 &outputMessageNumber = parserContext.outputMessageNumber; - std::chrono::microseconds tReceive(0); - std::chrono::microseconds tProcess(0); - std::chrono::microseconds tSend(0); - std::chrono::microseconds tTotal(0); - auto tLastReport = std::chrono::steady_clock::now(); - const auto tStart = std::chrono::steady_clock::now(); - - auto log_stats = [&] - { - spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, " - "totalInput={:.2f} MiB", - lastInputMessageNumber, inputBuffersLost, - 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); - spdlog::info("listfile_parser_nng: time budget: " - " tReceive = {} ms, " - " tProcess = {} ms, " - " tSend = {} ms, " - " tTotal = {} ms", - tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0, - tTotal.count() / 1000.0); - - auto totalElapsed = std::chrono::duration_cast( - std::chrono::steady_clock::now() - tStart); - auto totalBytes = parserCounters.bytesProcessed; - auto totalMiB = totalBytes / (1024.0 * 1024.0); - // auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); - auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; - spdlog::info( - "listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", - outputMessageNumber, totalMiB, MiBperSecond); - }; - - while (true) - { - StopWatch stopWatch; - stopWatch.start(); - if (auto res = receive_message(inputSocket, &inputMsg)) - { - if (res != NNG_ETIMEDOUT) - { - spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); - return; - } - spdlog::trace("listfile_parser_nng - receive_message: timeout"); - } - else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) - { - if (nng_msg_len(inputMsg) == 0) - break; - - spdlog::warn("listfile_parser_nng - incoming message too short (len={})", - nng_msg_len(inputMsg)); - } - else - { - tReceive += stopWatch.interval(); - totalInputBytes += nng_msg_len(inputMsg); - auto inputHeader = - *reinterpret_cast(nng_msg_body(inputMsg)); - auto bufferLoss = - readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); - inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u; - ; - lastInputMessageNumber = inputHeader.messageNumber; - spdlog::debug("listfile_parser_nng: received message {} of size {}", - lastInputMessageNumber, nng_msg_len(inputMsg)); - - nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); - auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); - size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); - - readout_parser::parse_readout_buffer(listfileFormat, parserState, parserCallbacks, - parserCounters, inputHeader.messageNumber, - inputData, inputLen); - - tProcess += stopWatch.interval(); - - // TODO: also flush after a certain time - tTotal += stopWatch.end(); - } - - if (inputMsg) - { - nng_msg_free(inputMsg); - inputMsg = nullptr; - } - - { - auto now = std::chrono::steady_clock::now(); - auto tReportElapsed = now - tLastReport; - static const auto ReportInterval = std::chrono::seconds(1); - - if (tReportElapsed >= ReportInterval) - { - log_stats(); - tLastReport = now; - } - } - } - - if (parserContext.outputMessage) - flush_output_message(parserContext, "listfile_parser_nng"); - - if (inputMsg) - { - nng_msg_free(inputMsg); - inputMsg = nullptr; - } - - assert(!parserContext.outputMessage); - - // send empty message - if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0)) - { - spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res)); - return; - } - - if (auto res = - send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) - { - nng_msg_free(parserContext.outputMessage); - parserContext.outputMessage = nullptr; - spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res)); - return; - } - - log_stats(); - - { - std::ostringstream ss; - readout_parser::print_counters(ss, parserCounters); - spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str()); - } -} - -void analysis_nng(nng_socket inputSocket) -{ -#ifndef __WIN32 - prctl(PR_SET_NAME, "analysis_nng", 0, 0, 0); -#endif - nng_msg *inputMsg = nullptr; - size_t totalInputBytes = 0u; - u32 lastInputMessageNumber = 0u; - size_t inputBuffersLost = 0; - bool error = false; - - while (!error) - { - if (auto res = receive_message(inputSocket, &inputMsg)) - { - if (res != NNG_ETIMEDOUT) - { - spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res)); - return; - } - } - else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader)) - { - if (nng_msg_len(inputMsg) == 0) - break; - - spdlog::warn("analysis_nng - incoming message too short (len={})", - nng_msg_len(inputMsg)); - } - else - { - totalInputBytes += nng_msg_len(inputMsg); - auto inputHeader = msg_trim_read(inputMsg).value(); - auto bufferLoss = - readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); - inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u; - ; - lastInputMessageNumber = inputHeader.messageNumber; - if (inputHeader.messageType != MessageType::ParsedEvents) - { - spdlog::error( - "Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", - static_cast(inputHeader.messageType), - static_cast(MessageType::ParsedEvents)); - break; - } - spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, - nng_msg_len(inputMsg)); - - while (true) - { - if (nng_msg_len(inputMsg) < 1) - break; - - const u8 eventMagic = *reinterpret_cast(nng_msg_body(inputMsg)); - - if (eventMagic == ParsedDataEventMagic) - { - auto eventHeader = msg_trim_read(inputMsg); - if (!eventHeader) - break; - - for (size_t moduleIndex = 0u; moduleIndex < eventHeader->moduleCount; - ++moduleIndex) - { - auto moduleHeader = msg_trim_read(inputMsg); - if (!moduleHeader) - break; - - if (moduleHeader->totalBytes()) - { - const u32 *moduleData = - reinterpret_cast(nng_msg_body(inputMsg)); - - // util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), - // fmt::format("crate={}, event={}, module={}, size={}", - // eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, - // moduleHeader->totalSize())); - - if (nng_msg_trim(inputMsg, moduleHeader->totalBytes())) - break; - } - } - } - else if (eventMagic == ParsedSystemEventMagic) - { - auto eventHeader = msg_trim_read(inputMsg); - if (!eventHeader) - break; - if (nng_msg_trim(inputMsg, eventHeader->totalBytes())) - break; - } - } - - assert(nng_msg_len(inputMsg) == 0); - - nng_msg_free(inputMsg); - inputMsg = nullptr; - } - } - - spdlog::info( - "analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", - lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); -} - -void pipe_cb(nng_pipe p, nng_pipe_ev event, void * /*arg*/) -{ - switch (event) - { - case ::NNG_PIPE_EV_ADD_PRE: - spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); - break; - case ::NNG_PIPE_EV_ADD_POST: - spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); - log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); - break; - case ::NNG_PIPE_EV_REM_POST: - spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); - break; - case ::NNG_PIPE_EV_NUM: // silence warning - break; - } -} - -int main(int argc, char *argv[]) -{ - spdlog::set_level(spdlog::level::info); - - if (argc < 2) - { - std::cerr << fmt::format("Usage: mvlc_nng_replay \n"); - return 1; - } - - const auto inputFilename = argv[1]; - - listfile::ZipReader zipReader; - try - { - zipReader.openArchive(inputFilename); - } - catch (const std::exception &e) - { - std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, - e.what()); - return 1; - } - - listfile::ReadHandle *readHandle = nullptr; - - try - { - readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); - } - catch (const std::exception &e) - { - if (zipReader.firstListfileEntryName().empty()) - std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename); - else - std::cout << fmt::format( - "Error: could not open listfile '{}' in '{}' for reading: {}\n", - zipReader.firstListfileEntryName(), inputFilename, e.what()); - - return 1; - } - - assert(readHandle); - - try - { - auto readerOutputSocket = make_pair_socket(); - auto parserInputSocket = make_pair_socket(); - auto parserOutputSocket = make_pair_socket(); - auto analysisInputSocket = make_pair_socket(); - - for (auto &socket: - {readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket}) - { - for (auto event: {NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST}) - { - if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) - mnode_nng_fatal("nng_pipe_notify", res); - } - } - - if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) - mnode_nng_fatal("nng_listen inproc", res); - - log_socket_info(readerOutputSocket, "readerOutputSocket"); - - if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) - mnode_nng_fatal("nng_dial inproc", res); - - log_socket_info(parserInputSocket, "parserInputSocket"); - - if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) - mnode_nng_fatal("nng_listen inproc", res); - - if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) - mnode_nng_fatal("nng_dial inproc", res); - - spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); - auto preamble = mvlc::listfile::read_preamble(*readHandle); - const auto bufferType = - (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH - : BufferType::MVLC_USB); - auto crateConfigSection = preamble.findCrateConfig(); - - if (!crateConfigSection) - { - spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); - // FIXME: close sockets - return 1; - } - - auto configYaml = crateConfigSection->contentsToString(); - auto crateConfig = mvlc::crate_config_from_yaml(configYaml); - - // Seek to start, then read past the magic bytes at the beginning of the - // listfile. - (void)listfile::read_magic(*readHandle); - - std::vector threads; - - threads.emplace_back(std::thread(listfile_reader_producer, readerOutputSocket, - std::ref(*readHandle), std::cref(bufferType))); - - threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket, - std::cref(crateConfig))); - - threads.emplace_back(std::thread(analysis_nng, analysisInputSocket)); - - for (auto &t: threads) - if (t.joinable()) - t.join(); - - for (auto &socket: - {readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket}) - if (auto res = nng_close(socket)) - mnode_nng_fatal("nng_close", res); - } - catch (const std::exception &e) - { - spdlog::error("exception in main(): {}", e.what()); - return 1; - } - - return 0; -}