diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt index dd408c7..930f408 100644 --- a/proto/CMakeLists.txt +++ b/proto/CMakeLists.txt @@ -1,4 +1,4 @@ -find_package(Protobuf REQUIRED) +find_package(Protobuf CONFIG REQUIRED) set(MNODE_PROTOS google/rpc/status.proto @@ -18,5 +18,6 @@ target_include_directories(mnode-proto PUBLIC $) protobuf_generate(TARGET mnode-proto) -protobuf_generate_python(PROTO_PY ${MNODE_PROTOS}) +protobuf_generate(LANGUAGE python OUT_VAR PROTO_PY PROTOS ${MNODE_PROTOS}) +#protobuf_generate_python(PROTO_PY ${MNODE_PROTOS}) add_custom_target(mnode-proto-py ALL DEPENDS ${PROTO_PY}) diff --git a/src/tools/mnode_proto_ping_client.cc b/src/tools/mnode_proto_ping_client.cc index 4c11234..fca6564 100644 --- a/src/tools/mnode_proto_ping_client.cc +++ b/src/tools/mnode_proto_ping_client.cc @@ -1,10 +1,10 @@ #include #include -#include -#include #include "proto/service.pb.h" #include +#include +#include using namespace mesytec; using namespace mesytec::mnode; @@ -80,7 +80,7 @@ int main() for (;;) { - nng_msleep(100); + std::this_thread::sleep_for(100ms); if (sw.get_interval() >= 1s) { for (auto &client: clients) diff --git a/src/tools/mnode_proto_ping_server.cc b/src/tools/mnode_proto_ping_server.cc index dc97afe..c0de1a7 100644 --- a/src/tools/mnode_proto_ping_server.cc +++ b/src/tools/mnode_proto_ping_server.cc @@ -1,10 +1,10 @@ #include -#include -#include +#include "internal/argh.h" #include "proto/service.pb.h" #include -#include "internal/argh.h" +#include +#include using namespace mesytec; using namespace mesytec::mnode; @@ -65,7 +65,7 @@ int main() for (;;) { - nng_msleep(100); + std::this_thread::sleep_for(100ms); if (sw.get_interval() >= 1s) { for (auto &server: servers) diff --git a/src/tools/mvlc_nng_replay.cc b/src/tools/mvlc_nng_replay.cc index ff225a9..5492bdb 100644 --- a/src/tools/mvlc_nng_replay.cc +++ b/src/tools/mvlc_nng_replay.cc @@ -1,8 +1,10 @@ #include -#include #include #include +#include +#ifndef __WIN32 #include +#endif #include using namespace mesytec; @@ -11,13 +13,13 @@ using namespace mesytec::mnode::nng; static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1); -enum class BufferType: u32 +enum class BufferType : u32 { MVLC_USB, MVLC_ETH, }; -enum class MessageType: u8 +enum class MessageType : u8 { ListfileBuffer, ParsedEvents, @@ -44,11 +46,12 @@ struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); -size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector &tmpBuf) +size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, + std::vector &tmpBuf) { size_t bytesMoved = 0u; - const u8 *msgBufferData = reinterpret_cast(nng_msg_body(msg)) - + sizeof(ListfileBufferMessageHeader); + const u8 *msgBufferData = + reinterpret_cast(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); if (bufferType == BufferType::MVLC_USB) @@ -61,15 +64,14 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, return bytesMoved; } - -void listfile_reader_producer( - nng_socket outputSocket, - mvlc::listfile::ReadHandle &input, - const BufferType &bufferType) +void listfile_reader_producer(nng_socket outputSocket, mvlc::listfile::ReadHandle &input, + const BufferType &bufferType) { - prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); +#ifndef __WIN32 + prctl(PR_SET_NAME, "listfile_reader_producer", 0, 0, 0); +#endif - #if 0 +#if 0 log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); if (auto res = nng_close(outputSocket)) @@ -80,7 +82,7 @@ void listfile_reader_producer( } return; - #endif +#endif try { @@ -96,8 +98,7 @@ void listfile_reader_producer( if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) return; - ListfileBufferMessageHeader header - { + ListfileBufferMessageHeader header{ MessageType::ListfileBuffer, ++bufferNumber, static_cast(bufferType), @@ -112,9 +113,8 @@ void listfile_reader_producer( size_t msgUsed = nng_msg_len(msg); nng_msg_realloc(msg, DefaultOutputMessageReserve); - size_t bytesRead = input.read( - reinterpret_cast(nng_msg_body(msg)) + msgUsed, - nng_msg_len(msg) - msgUsed); + size_t bytesRead = input.read(reinterpret_cast(nng_msg_body(msg)) + msgUsed, + nng_msg_len(msg) - msgUsed); nng_msg_realloc(msg, msgUsed + bytesRead); fixup_listfile_buffer_message(bufferType, msg, previousData); @@ -130,24 +130,24 @@ void listfile_reader_producer( { nng_msg_free(msg); msg = nullptr; - spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res)); + spdlog::error("listfile_reader_producer: send_message_retry: {}", + nng_strerror(res)); return; } - spdlog::debug("listfile_reader_producer: sent message {} of size {}", - bufferNumber, msgSize); + spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber, + msgSize); totalBytesSent += msgSize; } spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", - bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); + bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); } - catch(const std::exception& e) + catch (const std::exception &e) { spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); return; } - } struct ReadoutParserNngContext @@ -202,8 +202,7 @@ bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx) if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) return false; - ParsedEventsMessageHeader header = - { + ParsedEventsMessageHeader header = { MessageType::ParsedEvents, ++ctx.outputMessageNumber, }; @@ -228,20 +227,18 @@ bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = ctx.outputMessage = nullptr; - spdlog::debug("{}: sent message {} of size {}", - debugInfo, ctx.outputMessageNumber, msgSize); + spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize); return true; } void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, - const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) + const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) { assert(crateIndex >= 0 && crateIndex <= std::numeric_limits::max()); assert(eventIndex >= 0 && eventIndex <= std::numeric_limits::max()); assert(moduleCount < std::numeric_limits::max()); - auto &ctx = *reinterpret_cast(ctx_); ++ctx.totalReadoutEvents; auto &msg = ctx.outputMessage; @@ -255,7 +252,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, requiredBytes += moduleData.data.size * sizeof(u32); } - size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; + size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { @@ -269,8 +266,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); - ParsedDataEventHeader eventHeader = - { + ParsedDataEventHeader eventHeader = { ParsedDataEventMagic, static_cast(crateIndex), static_cast(eventIndex), @@ -314,7 +310,7 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s auto &msg = ctx.outputMessage; size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); - size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; + size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; if (msg && bytesFree < requiredBytes) { @@ -325,11 +321,10 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s if (!msg && !parser_maybe_alloc_output(ctx)) return; - bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; + bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); - ParsedSystemEventHeader eventHeader = - { + ParsedSystemEventHeader eventHeader = { ParsedSystemEventMagic, static_cast(crateIndex), size, @@ -350,13 +345,10 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s class StopWatch { -public: + public: using duration_type = std::chrono::microseconds; - void start() - { - tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); - } + void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); } duration_type interval() { @@ -373,17 +365,17 @@ public: return result; } -private: + private: std::chrono::high_resolution_clock::time_point tStart_; std::chrono::high_resolution_clock::time_point tInterval_; }; -void listfile_parser_nng( - nng_socket inputSocket, - nng_socket outputSocket, - const mvlc::CrateConfig &crateConfig) +void listfile_parser_nng(nng_socket inputSocket, nng_socket outputSocket, + const mvlc::CrateConfig &crateConfig) { - prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); +#ifndef __WIN32 + prctl(PR_SET_NAME, "listfile_parser_nng", 0, 0, 0); +#endif size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; @@ -399,11 +391,10 @@ void listfile_parser_nng( auto crateIndex = 0; ReadoutParserNngContext parserContext; parserContext.outputSocket = outputSocket; - auto parserState = mvlc::readout_parser::make_readout_parser( - crateConfig.stacks, &parserContext); + auto parserState = + mvlc::readout_parser::make_readout_parser(crateConfig.stacks, &parserContext); mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; - mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = - { + mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = { parser_nng_eventdata, parser_nng_systemevent, }; @@ -419,25 +410,26 @@ void listfile_parser_nng( auto log_stats = [&] { - spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", - lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); + spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, " + "totalInput={:.2f} MiB", + lastInputMessageNumber, inputBuffersLost, + 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); spdlog::info("listfile_parser_nng: time budget: " - " tReceive = {} ms, " - " tProcess = {} ms, " - " tSend = {} ms, " - " tTotal = {} ms", - tReceive.count() / 1000.0, - tProcess.count() / 1000.0, - tSend.count() / 1000.0, - tTotal.count() / 1000.0); + " tReceive = {} ms, " + " tProcess = {} ms, " + " tSend = {} ms, " + " tTotal = {} ms", + tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0, + tTotal.count() / 1000.0); auto totalElapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - tStart); auto totalBytes = parserCounters.bytesProcessed; - auto totalMiB = totalBytes / (1024.0*1024.0); - //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); + auto totalMiB = totalBytes / (1024.0 * 1024.0); + // auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; - spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", + spdlog::info( + "listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", outputMessageNumber, totalMiB, MiBperSecond); }; @@ -460,31 +452,29 @@ void listfile_parser_nng( break; spdlog::warn("listfile_parser_nng - incoming message too short (len={})", - nng_msg_len(inputMsg)); + nng_msg_len(inputMsg)); } else { tReceive += stopWatch.interval(); totalInputBytes += nng_msg_len(inputMsg); - auto inputHeader = *reinterpret_cast( - nng_msg_body(inputMsg)); - auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); - inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; + auto inputHeader = + *reinterpret_cast(nng_msg_body(inputMsg)); + auto bufferLoss = + readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); + inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u; + ; lastInputMessageNumber = inputHeader.messageNumber; - spdlog::debug("listfile_parser_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); + spdlog::debug("listfile_parser_nng: received message {} of size {}", + lastInputMessageNumber, nng_msg_len(inputMsg)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); auto inputData = reinterpret_cast(nng_msg_body(inputMsg)); size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); - readout_parser::parse_readout_buffer( - listfileFormat, - parserState, - parserCallbacks, - parserCounters, - inputHeader.messageNumber, - inputData, - inputLen); + readout_parser::parse_readout_buffer(listfileFormat, parserState, parserCallbacks, + parserCounters, inputHeader.messageNumber, + inputData, inputLen); tProcess += stopWatch.interval(); @@ -529,7 +519,8 @@ void listfile_parser_nng( return; } - if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) + if (auto res = + send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) { nng_msg_free(parserContext.outputMessage); parserContext.outputMessage = nullptr; @@ -546,11 +537,11 @@ void listfile_parser_nng( } } -void analysis_nng( - nng_socket inputSocket - ) +void analysis_nng(nng_socket inputSocket) { - prctl(PR_SET_NAME,"analysis_nng",0,0,0); +#ifndef __WIN32 + prctl(PR_SET_NAME, "analysis_nng", 0, 0, 0); +#endif nng_msg *inputMsg = nullptr; size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; @@ -573,22 +564,27 @@ void analysis_nng( break; spdlog::warn("analysis_nng - incoming message too short (len={})", - nng_msg_len(inputMsg)); + nng_msg_len(inputMsg)); } else { totalInputBytes += nng_msg_len(inputMsg); auto inputHeader = msg_trim_read(inputMsg).value(); - auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); - inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; + auto bufferLoss = + readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); + inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u; + ; lastInputMessageNumber = inputHeader.messageNumber; if (inputHeader.messageType != MessageType::ParsedEvents) { - spdlog::error("Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", - static_cast(inputHeader.messageType), static_cast(MessageType::ParsedEvents)); - break; + spdlog::error( + "Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", + static_cast(inputHeader.messageType), + static_cast(MessageType::ParsedEvents)); + break; } - spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); + spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, + nng_msg_len(inputMsg)); while (true) { @@ -603,7 +599,8 @@ void analysis_nng( if (!eventHeader) break; - for (size_t moduleIndex=0u; moduleIndexmoduleCount; ++moduleIndex) + for (size_t moduleIndex = 0u; moduleIndex < eventHeader->moduleCount; + ++moduleIndex) { auto moduleHeader = msg_trim_read(inputMsg); if (!moduleHeader) @@ -611,10 +608,13 @@ void analysis_nng( if (moduleHeader->totalBytes()) { - const u32 *moduleData = reinterpret_cast(nng_msg_body(inputMsg)); + const u32 *moduleData = + reinterpret_cast(nng_msg_body(inputMsg)); - //util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), fmt::format("crate={}, event={}, module={}, size={}", - // eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, moduleHeader->totalSize())); + // util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), + // fmt::format("crate={}, event={}, module={}, size={}", + // eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, + // moduleHeader->totalSize())); if (nng_msg_trim(inputMsg, moduleHeader->totalBytes())) break; @@ -638,25 +638,26 @@ void analysis_nng( } } - spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", + spdlog::info( + "analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); } -void pipe_cb(nng_pipe p, nng_pipe_ev event, void */*arg*/) +void pipe_cb(nng_pipe p, nng_pipe_ev event, void * /*arg*/) { switch (event) { - case::NNG_PIPE_EV_ADD_PRE: - spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); + case ::NNG_PIPE_EV_ADD_PRE: + spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); break; - case::NNG_PIPE_EV_ADD_POST: - spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); - log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); + case ::NNG_PIPE_EV_ADD_POST: + spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); + log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); break; - case::NNG_PIPE_EV_REM_POST: - spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); + case ::NNG_PIPE_EV_REM_POST: + spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); break; - case::NNG_PIPE_EV_NUM: // silence warning + case ::NNG_PIPE_EV_NUM: // silence warning break; } } @@ -680,7 +681,8 @@ int main(int argc, char *argv[]) } catch (const std::exception &e) { - std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, e.what()); + std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, + e.what()); return 1; } @@ -690,12 +692,13 @@ int main(int argc, char *argv[]) { readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); } - catch(const std::exception& e) + catch (const std::exception &e) { if (zipReader.firstListfileEntryName().empty()) std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename); else - std::cout << fmt::format("Error: could not open listfile '{}' in '{}' for reading: {}\n", + std::cout << fmt::format( + "Error: could not open listfile '{}' in '{}' for reading: {}\n", zipReader.firstListfileEntryName(), inputFilename, e.what()); return 1; @@ -710,9 +713,10 @@ int main(int argc, char *argv[]) auto parserOutputSocket = make_pair_socket(); auto analysisInputSocket = make_pair_socket(); - for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) + for (auto &socket: + {readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket}) { - for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) + for (auto event: {NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST}) { if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) mnode_nng_fatal("nng_pipe_notify", res); @@ -724,25 +728,22 @@ int main(int argc, char *argv[]) log_socket_info(readerOutputSocket, "readerOutputSocket"); - if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) mnode_nng_fatal("nng_dial inproc", res); log_socket_info(parserInputSocket, "parserInputSocket"); - if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) mnode_nng_fatal("nng_listen inproc", res); - if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) mnode_nng_fatal("nng_dial inproc", res); spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); - const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() - ? BufferType::MVLC_ETH - : BufferType::MVLC_USB); + const auto bufferType = + (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH + : BufferType::MVLC_USB); auto crateConfigSection = preamble.findCrateConfig(); if (!crateConfigSection) @@ -757,26 +758,28 @@ int main(int argc, char *argv[]) // Seek to start, then read past the magic bytes at the beginning of the // listfile. - (void) listfile::read_magic(*readHandle); + (void)listfile::read_magic(*readHandle); std::vector threads; - threads.emplace_back(std::thread(listfile_reader_producer, - readerOutputSocket, std::ref(*readHandle), std::cref(bufferType))); + threads.emplace_back(std::thread(listfile_reader_producer, readerOutputSocket, + std::ref(*readHandle), std::cref(bufferType))); - threads.emplace_back(std::thread(listfile_parser_nng, - parserInputSocket, parserOutputSocket, std::cref(crateConfig))); + threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket, + std::cref(crateConfig))); - threads.emplace_back(std::thread(analysis_nng, - analysisInputSocket)); + threads.emplace_back(std::thread(analysis_nng, analysisInputSocket)); - for (auto &t: threads) if (t.joinable()) t.join(); + for (auto &t: threads) + if (t.joinable()) + t.join(); - for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) + for (auto &socket: + {readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket}) if (auto res = nng_close(socket)) mnode_nng_fatal("nng_close", res); } - catch(const std::exception& e) + catch (const std::exception &e) { spdlog::error("exception in main(): {}", e.what()); return 1;