fixes for windows msys2 ucrt64

Builds with protobuf v29.2 compiled and installed locally. Set
CMAKE_PREFIX_PATH via cmake.configureArgs.

The find_package(Protobuf CONFIG REQUIRED) will likely break the debian
build.
This commit is contained in:
Florian Lüke 2024-12-28 05:33:42 +01:00
parent d0b6089536
commit c08f339c95
4 changed files with 140 additions and 136 deletions

View file

@ -1,4 +1,4 @@
find_package(Protobuf REQUIRED) find_package(Protobuf CONFIG REQUIRED)
set(MNODE_PROTOS set(MNODE_PROTOS
google/rpc/status.proto google/rpc/status.proto
@ -18,5 +18,6 @@ target_include_directories(mnode-proto
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/../>) PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/../>)
protobuf_generate(TARGET mnode-proto) protobuf_generate(TARGET mnode-proto)
protobuf_generate_python(PROTO_PY ${MNODE_PROTOS}) protobuf_generate(LANGUAGE python OUT_VAR PROTO_PY PROTOS ${MNODE_PROTOS})
#protobuf_generate_python(PROTO_PY ${MNODE_PROTOS})
add_custom_target(mnode-proto-py ALL DEPENDS ${PROTO_PY}) add_custom_target(mnode-proto-py ALL DEPENDS ${PROTO_PY})

View file

@ -1,10 +1,10 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h" #include "proto/service.pb.h"
#include <mesytec-mnode/mnode_nng_async.h> #include <mesytec-mnode/mnode_nng_async.h>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h>
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mnode; using namespace mesytec::mnode;
@ -80,7 +80,7 @@ int main()
for (;;) for (;;)
{ {
nng_msleep(100); std::this_thread::sleep_for(100ms);
if (sw.get_interval() >= 1s) if (sw.get_interval() >= 1s)
{ {
for (auto &client: clients) for (auto &client: clients)

View file

@ -1,10 +1,10 @@
#include <memory> #include <memory>
#include <mesytec-mvlc/mesytec-mvlc.h> #include "internal/argh.h"
#include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h" #include "proto/service.pb.h"
#include <mesytec-mnode/mnode_nng_async.h> #include <mesytec-mnode/mnode_nng_async.h>
#include "internal/argh.h" #include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h>
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mnode; using namespace mesytec::mnode;
@ -65,7 +65,7 @@ int main()
for (;;) for (;;)
{ {
nng_msleep(100); std::this_thread::sleep_for(100ms);
if (sw.get_interval() >= 1s) if (sw.get_interval() >= 1s)
{ {
for (auto &server: servers) for (auto &server: servers)

View file

@ -1,8 +1,10 @@
#include <limits> #include <limits>
#include <optional>
#include <mesytec-mvlc/mesytec-mvlc.h> #include <mesytec-mvlc/mesytec-mvlc.h>
#include <nng/nng.h> #include <nng/nng.h>
#include <optional>
#ifndef __WIN32
#include <sys/prctl.h> #include <sys/prctl.h>
#endif
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng.h>
using namespace mesytec; using namespace mesytec;
@ -11,13 +13,13 @@ using namespace mesytec::mnode::nng;
static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1); static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1);
enum class BufferType: u32 enum class BufferType : u32
{ {
MVLC_USB, MVLC_USB,
MVLC_ETH, MVLC_ETH,
}; };
enum class MessageType: u8 enum class MessageType : u8
{ {
ListfileBuffer, ListfileBuffer,
ParsedEvents, ParsedEvents,
@ -44,11 +46,12 @@ struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader
static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0);
size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector<u8> &tmpBuf) size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg,
std::vector<u8> &tmpBuf)
{ {
size_t bytesMoved = 0u; size_t bytesMoved = 0u;
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) const u8 *msgBufferData =
+ sizeof(ListfileBufferMessageHeader); reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader);
const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader);
if (bufferType == BufferType::MVLC_USB) if (bufferType == BufferType::MVLC_USB)
@ -61,15 +64,14 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg,
return bytesMoved; return bytesMoved;
} }
void listfile_reader_producer(nng_socket outputSocket, mvlc::listfile::ReadHandle &input,
void listfile_reader_producer( const BufferType &bufferType)
nng_socket outputSocket,
mvlc::listfile::ReadHandle &input,
const BufferType &bufferType)
{ {
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); #ifndef __WIN32
prctl(PR_SET_NAME, "listfile_reader_producer", 0, 0, 0);
#endif
#if 0 #if 0
log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); log_socket_info(outputSocket, "listfile_reader_producer - outputSocket");
if (auto res = nng_close(outputSocket)) if (auto res = nng_close(outputSocket))
@ -80,7 +82,7 @@ void listfile_reader_producer(
} }
return; return;
#endif #endif
try try
{ {
@ -96,8 +98,7 @@ void listfile_reader_producer(
if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
return; return;
ListfileBufferMessageHeader header ListfileBufferMessageHeader header{
{
MessageType::ListfileBuffer, MessageType::ListfileBuffer,
++bufferNumber, ++bufferNumber,
static_cast<u32>(bufferType), static_cast<u32>(bufferType),
@ -112,9 +113,8 @@ void listfile_reader_producer(
size_t msgUsed = nng_msg_len(msg); size_t msgUsed = nng_msg_len(msg);
nng_msg_realloc(msg, DefaultOutputMessageReserve); nng_msg_realloc(msg, DefaultOutputMessageReserve);
size_t bytesRead = input.read( size_t bytesRead = input.read(reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed, nng_msg_len(msg) - msgUsed);
nng_msg_len(msg) - msgUsed);
nng_msg_realloc(msg, msgUsed + bytesRead); nng_msg_realloc(msg, msgUsed + bytesRead);
fixup_listfile_buffer_message(bufferType, msg, previousData); fixup_listfile_buffer_message(bufferType, msg, previousData);
@ -130,24 +130,24 @@ void listfile_reader_producer(
{ {
nng_msg_free(msg); nng_msg_free(msg);
msg = nullptr; msg = nullptr;
spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res)); spdlog::error("listfile_reader_producer: send_message_retry: {}",
nng_strerror(res));
return; return;
} }
spdlog::debug("listfile_reader_producer: sent message {} of size {}", spdlog::debug("listfile_reader_producer: sent message {} of size {}", bufferNumber,
bufferNumber, msgSize); msgSize);
totalBytesSent += msgSize; totalBytesSent += msgSize;
} }
spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB",
bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1));
} }
catch(const std::exception& e) catch (const std::exception &e)
{ {
spdlog::error("listfile_reader_prroducer: exception: {}", e.what()); spdlog::error("listfile_reader_prroducer: exception: {}", e.what());
return; return;
} }
} }
struct ReadoutParserNngContext struct ReadoutParserNngContext
@ -202,8 +202,7 @@ bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx)
if (allocate_reserve_message(&msg, DefaultOutputMessageReserve)) if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
return false; return false;
ParsedEventsMessageHeader header = ParsedEventsMessageHeader header = {
{
MessageType::ParsedEvents, MessageType::ParsedEvents,
++ctx.outputMessageNumber, ++ctx.outputMessageNumber,
}; };
@ -228,20 +227,18 @@ bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo =
ctx.outputMessage = nullptr; ctx.outputMessage = nullptr;
spdlog::debug("{}: sent message {} of size {}", spdlog::debug("{}: sent message {} of size {}", debugInfo, ctx.outputMessageNumber, msgSize);
debugInfo, ctx.outputMessageNumber, msgSize);
return true; return true;
} }
void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
{ {
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max()); assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max()); assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max());
assert(moduleCount < std::numeric_limits<u8>::max()); assert(moduleCount < std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_); auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_);
++ctx.totalReadoutEvents; ++ctx.totalReadoutEvents;
auto &msg = ctx.outputMessage; auto &msg = ctx.outputMessage;
@ -255,7 +252,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
requiredBytes += moduleData.data.size * sizeof(u32); requiredBytes += moduleData.data.size * sizeof(u32);
} }
size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes) if (msg && bytesFree < requiredBytes)
{ {
@ -269,8 +266,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes); assert(bytesFree >= requiredBytes);
ParsedDataEventHeader eventHeader = ParsedDataEventHeader eventHeader = {
{
ParsedDataEventMagic, ParsedDataEventMagic,
static_cast<u8>(crateIndex), static_cast<u8>(crateIndex),
static_cast<u8>(eventIndex), static_cast<u8>(eventIndex),
@ -314,7 +310,7 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s
auto &msg = ctx.outputMessage; auto &msg = ctx.outputMessage;
size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32);
size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes) if (msg && bytesFree < requiredBytes)
{ {
@ -325,11 +321,10 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s
if (!msg && !parser_maybe_alloc_output(ctx)) if (!msg && !parser_maybe_alloc_output(ctx))
return; return;
bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes); assert(bytesFree >= requiredBytes);
ParsedSystemEventHeader eventHeader = ParsedSystemEventHeader eventHeader = {
{
ParsedSystemEventMagic, ParsedSystemEventMagic,
static_cast<u8>(crateIndex), static_cast<u8>(crateIndex),
size, size,
@ -350,13 +345,10 @@ void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 s
class StopWatch class StopWatch
{ {
public: public:
using duration_type = std::chrono::microseconds; using duration_type = std::chrono::microseconds;
void start() void start() { tStart_ = tInterval_ = std::chrono::high_resolution_clock::now(); }
{
tStart_ = tInterval_ = std::chrono::high_resolution_clock::now();
}
duration_type interval() duration_type interval()
{ {
@ -373,17 +365,17 @@ public:
return result; return result;
} }
private: private:
std::chrono::high_resolution_clock::time_point tStart_; std::chrono::high_resolution_clock::time_point tStart_;
std::chrono::high_resolution_clock::time_point tInterval_; std::chrono::high_resolution_clock::time_point tInterval_;
}; };
void listfile_parser_nng( void listfile_parser_nng(nng_socket inputSocket, nng_socket outputSocket,
nng_socket inputSocket, const mvlc::CrateConfig &crateConfig)
nng_socket outputSocket,
const mvlc::CrateConfig &crateConfig)
{ {
prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); #ifndef __WIN32
prctl(PR_SET_NAME, "listfile_parser_nng", 0, 0, 0);
#endif
size_t totalInputBytes = 0u; size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u; u32 lastInputMessageNumber = 0u;
@ -399,11 +391,10 @@ void listfile_parser_nng(
auto crateIndex = 0; auto crateIndex = 0;
ReadoutParserNngContext parserContext; ReadoutParserNngContext parserContext;
parserContext.outputSocket = outputSocket; parserContext.outputSocket = outputSocket;
auto parserState = mvlc::readout_parser::make_readout_parser( auto parserState =
crateConfig.stacks, &parserContext); mvlc::readout_parser::make_readout_parser(crateConfig.stacks, &parserContext);
mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = {
{
parser_nng_eventdata, parser_nng_eventdata,
parser_nng_systemevent, parser_nng_systemevent,
}; };
@ -419,25 +410,26 @@ void listfile_parser_nng(
auto log_stats = [&] auto log_stats = [&]
{ {
spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, "
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); "totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost,
1.0 * totalInputBytes / mvlc::util::Megabytes(1));
spdlog::info("listfile_parser_nng: time budget: " spdlog::info("listfile_parser_nng: time budget: "
" tReceive = {} ms, " " tReceive = {} ms, "
" tProcess = {} ms, " " tProcess = {} ms, "
" tSend = {} ms, " " tSend = {} ms, "
" tTotal = {} ms", " tTotal = {} ms",
tReceive.count() / 1000.0, tReceive.count() / 1000.0, tProcess.count() / 1000.0, tSend.count() / 1000.0,
tProcess.count() / 1000.0, tTotal.count() / 1000.0);
tSend.count() / 1000.0,
tTotal.count() / 1000.0);
auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>( auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - tStart); std::chrono::steady_clock::now() - tStart);
auto totalBytes = parserCounters.bytesProcessed; auto totalBytes = parserCounters.bytesProcessed;
auto totalMiB = totalBytes / (1024.0*1024.0); auto totalMiB = totalBytes / (1024.0 * 1024.0);
//auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); // auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count();
auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0; auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0;
spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", spdlog::info(
"listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s",
outputMessageNumber, totalMiB, MiBperSecond); outputMessageNumber, totalMiB, MiBperSecond);
}; };
@ -460,31 +452,29 @@ void listfile_parser_nng(
break; break;
spdlog::warn("listfile_parser_nng - incoming message too short (len={})", spdlog::warn("listfile_parser_nng - incoming message too short (len={})",
nng_msg_len(inputMsg)); nng_msg_len(inputMsg));
} }
else else
{ {
tReceive += stopWatch.interval(); tReceive += stopWatch.interval();
totalInputBytes += nng_msg_len(inputMsg); totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = *reinterpret_cast<const ListfileBufferMessageHeader *>( auto inputHeader =
nng_msg_body(inputMsg)); *reinterpret_cast<const ListfileBufferMessageHeader *>(nng_msg_body(inputMsg));
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); auto bufferLoss =
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;
;
lastInputMessageNumber = inputHeader.messageNumber; lastInputMessageNumber = inputHeader.messageNumber;
spdlog::debug("listfile_parser_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); spdlog::debug("listfile_parser_nng: received message {} of size {}",
lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader));
auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg)); auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32); size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32);
readout_parser::parse_readout_buffer( readout_parser::parse_readout_buffer(listfileFormat, parserState, parserCallbacks,
listfileFormat, parserCounters, inputHeader.messageNumber,
parserState, inputData, inputLen);
parserCallbacks,
parserCounters,
inputHeader.messageNumber,
inputData,
inputLen);
tProcess += stopWatch.interval(); tProcess += stopWatch.interval();
@ -529,7 +519,8 @@ void listfile_parser_nng(
return; return;
} }
if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng")) if (auto res =
send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
{ {
nng_msg_free(parserContext.outputMessage); nng_msg_free(parserContext.outputMessage);
parserContext.outputMessage = nullptr; parserContext.outputMessage = nullptr;
@ -546,11 +537,11 @@ void listfile_parser_nng(
} }
} }
void analysis_nng( void analysis_nng(nng_socket inputSocket)
nng_socket inputSocket
)
{ {
prctl(PR_SET_NAME,"analysis_nng",0,0,0); #ifndef __WIN32
prctl(PR_SET_NAME, "analysis_nng", 0, 0, 0);
#endif
nng_msg *inputMsg = nullptr; nng_msg *inputMsg = nullptr;
size_t totalInputBytes = 0u; size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u; u32 lastInputMessageNumber = 0u;
@ -573,22 +564,27 @@ void analysis_nng(
break; break;
spdlog::warn("analysis_nng - incoming message too short (len={})", spdlog::warn("analysis_nng - incoming message too short (len={})",
nng_msg_len(inputMsg)); nng_msg_len(inputMsg));
} }
else else
{ {
totalInputBytes += nng_msg_len(inputMsg); totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = msg_trim_read<ParsedEventsMessageHeader>(inputMsg).value(); auto inputHeader = msg_trim_read<ParsedEventsMessageHeader>(inputMsg).value();
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); auto bufferLoss =
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;
;
lastInputMessageNumber = inputHeader.messageNumber; lastInputMessageNumber = inputHeader.messageNumber;
if (inputHeader.messageType != MessageType::ParsedEvents) if (inputHeader.messageType != MessageType::ParsedEvents)
{ {
spdlog::error("Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}", spdlog::error(
static_cast<u8>(inputHeader.messageType), static_cast<u8>(MessageType::ParsedEvents)); "Received input message with unhandled type 0x{:02x}, expected type 0x{:02x}",
break; static_cast<u8>(inputHeader.messageType),
static_cast<u8>(MessageType::ParsedEvents));
break;
} }
spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber,
nng_msg_len(inputMsg));
while (true) while (true)
{ {
@ -603,7 +599,8 @@ void analysis_nng(
if (!eventHeader) if (!eventHeader)
break; break;
for (size_t moduleIndex=0u; moduleIndex<eventHeader->moduleCount; ++moduleIndex) for (size_t moduleIndex = 0u; moduleIndex < eventHeader->moduleCount;
++moduleIndex)
{ {
auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg); auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg);
if (!moduleHeader) if (!moduleHeader)
@ -611,10 +608,13 @@ void analysis_nng(
if (moduleHeader->totalBytes()) if (moduleHeader->totalBytes())
{ {
const u32 *moduleData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg)); const u32 *moduleData =
reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
//util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(), fmt::format("crate={}, event={}, module={}, size={}", // util::log_buffer(std::cout, moduleData, moduleHeader->totalSize(),
// eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex, moduleHeader->totalSize())); // fmt::format("crate={}, event={}, module={}, size={}",
// eventHeader->crateIndex, eventHeader->eventIndex, moduleIndex,
// moduleHeader->totalSize()));
if (nng_msg_trim(inputMsg, moduleHeader->totalBytes())) if (nng_msg_trim(inputMsg, moduleHeader->totalBytes()))
break; break;
@ -638,25 +638,26 @@ void analysis_nng(
} }
} }
spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", spdlog::info(
"analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1));
} }
void pipe_cb(nng_pipe p, nng_pipe_ev event, void */*arg*/) void pipe_cb(nng_pipe p, nng_pipe_ev event, void * /*arg*/)
{ {
switch (event) switch (event)
{ {
case::NNG_PIPE_EV_ADD_PRE: case ::NNG_PIPE_EV_ADD_PRE:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE");
break; break;
case::NNG_PIPE_EV_ADD_POST: case ::NNG_PIPE_EV_ADD_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST");
log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); log_pipe_info(p, "NNG_PIPE_EV_ADD_POST");
break; break;
case::NNG_PIPE_EV_REM_POST: case ::NNG_PIPE_EV_REM_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST");
break; break;
case::NNG_PIPE_EV_NUM: // silence warning case ::NNG_PIPE_EV_NUM: // silence warning
break; break;
} }
} }
@ -680,7 +681,8 @@ int main(int argc, char *argv[])
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, e.what()); std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename,
e.what());
return 1; return 1;
} }
@ -690,12 +692,13 @@ int main(int argc, char *argv[])
{ {
readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
} }
catch(const std::exception& e) catch (const std::exception &e)
{ {
if (zipReader.firstListfileEntryName().empty()) if (zipReader.firstListfileEntryName().empty())
std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename); std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename);
else else
std::cout << fmt::format("Error: could not open listfile '{}' in '{}' for reading: {}\n", std::cout << fmt::format(
"Error: could not open listfile '{}' in '{}' for reading: {}\n",
zipReader.firstListfileEntryName(), inputFilename, e.what()); zipReader.firstListfileEntryName(), inputFilename, e.what());
return 1; return 1;
@ -710,9 +713,10 @@ int main(int argc, char *argv[])
auto parserOutputSocket = make_pair_socket(); auto parserOutputSocket = make_pair_socket();
auto analysisInputSocket = make_pair_socket(); auto analysisInputSocket = make_pair_socket();
for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) for (auto &socket:
{readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket})
{ {
for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) for (auto event: {NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST})
{ {
if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr))
mnode_nng_fatal("nng_pipe_notify", res); mnode_nng_fatal("nng_pipe_notify", res);
@ -724,25 +728,22 @@ int main(int argc, char *argv[])
log_socket_info(readerOutputSocket, "readerOutputSocket"); log_socket_info(readerOutputSocket, "readerOutputSocket");
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
mnode_nng_fatal("nng_dial inproc", res); mnode_nng_fatal("nng_dial inproc", res);
log_socket_info(parserInputSocket, "parserInputSocket"); log_socket_info(parserInputSocket, "parserInputSocket");
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
mnode_nng_fatal("nng_listen inproc", res); mnode_nng_fatal("nng_listen inproc", res);
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
mnode_nng_fatal("nng_dial inproc", res); mnode_nng_fatal("nng_dial inproc", res);
spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
auto preamble = mvlc::listfile::read_preamble(*readHandle); auto preamble = mvlc::listfile::read_preamble(*readHandle);
const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() const auto bufferType =
? BufferType::MVLC_ETH (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH
: BufferType::MVLC_USB); : BufferType::MVLC_USB);
auto crateConfigSection = preamble.findCrateConfig(); auto crateConfigSection = preamble.findCrateConfig();
if (!crateConfigSection) if (!crateConfigSection)
@ -757,26 +758,28 @@ int main(int argc, char *argv[])
// Seek to start, then read past the magic bytes at the beginning of the // Seek to start, then read past the magic bytes at the beginning of the
// listfile. // listfile.
(void) listfile::read_magic(*readHandle); (void)listfile::read_magic(*readHandle);
std::vector<std::thread> threads; std::vector<std::thread> threads;
threads.emplace_back(std::thread(listfile_reader_producer, threads.emplace_back(std::thread(listfile_reader_producer, readerOutputSocket,
readerOutputSocket, std::ref(*readHandle), std::cref(bufferType))); std::ref(*readHandle), std::cref(bufferType)));
threads.emplace_back(std::thread(listfile_parser_nng, threads.emplace_back(std::thread(listfile_parser_nng, parserInputSocket, parserOutputSocket,
parserInputSocket, parserOutputSocket, std::cref(crateConfig))); std::cref(crateConfig)));
threads.emplace_back(std::thread(analysis_nng, threads.emplace_back(std::thread(analysis_nng, analysisInputSocket));
analysisInputSocket));
for (auto &t: threads) if (t.joinable()) t.join(); for (auto &t: threads)
if (t.joinable())
t.join();
for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) for (auto &socket:
{readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket})
if (auto res = nng_close(socket)) if (auto res = nng_close(socket))
mnode_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
} }
catch(const std::exception& e) catch (const std::exception &e)
{ {
spdlog::error("exception in main(): {}", e.what()); spdlog::error("exception in main(): {}", e.what());
return 1; return 1;