better allocations but broken analysis messages

This commit is contained in:
Florian Lüke 2023-07-12 20:41:09 +02:00
parent 2db593069d
commit 0bf9947a29

View file

@ -1,4 +1,5 @@
#include <limits> #include <limits>
#include <optional>
#include <mesytec-mvlc/mesytec-mvlc.h> #include <mesytec-mvlc/mesytec-mvlc.h>
#include <nng/nng.h> #include <nng/nng.h>
#include <sys/prctl.h> #include <sys/prctl.h>
@ -115,6 +116,27 @@ inline size_t fixup_buffer_mvlc_eth(const u8 *buf, size_t bufUsed, std::vector<u
return fixup_buffer(buf, bufUsed, tmpBuf, skip_func); return fixup_buffer(buf, bufUsed, tmpBuf, skip_func);
} }
int allocate_reserve_message(nng_msg **msg, size_t reserve = 0)
{
assert(msg);
if (auto res = nng_msg_alloc(msg, 0))
{
spdlog::error("nng_msg_alloc: {}", nng_strerror(res));
return res;
}
if (auto res = nng_msg_reserve(*msg, reserve))
{
spdlog::error("nng_msg_reserve: {}", nng_strerror(res));
return res;
}
return 0;
}
static const size_t DefaultOutputMessageReserve = util::Megabytes(1);
enum class BufferType: u32 enum class BufferType: u32
{ {
MVLC_USB, MVLC_USB,
@ -127,29 +149,30 @@ enum class MessageType
ParsedEvents, ParsedEvents,
}; };
struct __attribute__((packed, aligned(1))) MessageHeaderBase struct __attribute__((packed, aligned(4))) MessageHeaderBase
{ {
MessageType messageType; MessageType messageType;
u32 messageNumber; u32 messageNumber;
}; };
struct __attribute__((packed, aligned(1))) ListfileBufferMessageHeader: public MessageHeaderBase struct __attribute__((packed, aligned(4))) ListfileBufferMessageHeader: public MessageHeaderBase
{ {
u32 bufferType; u32 bufferType;
}; };
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
struct __attribute__((packed, aligned(1))) ParsedEventsMessageHeader: public MessageHeaderBase struct __attribute__((packed, aligned(4))) ParsedEventsMessageHeader: public MessageHeaderBase
{ {
}; };
static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0); static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0);
void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector<u8> &tmpBuf) size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector<u8> &tmpBuf)
{ {
size_t bytesMoved = 0u; size_t bytesMoved = 0u;
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader); const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg))
+ sizeof(ListfileBufferMessageHeader);
const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader); const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader);
if (bufferType == BufferType::MVLC_USB) if (bufferType == BufferType::MVLC_USB)
@ -158,21 +181,19 @@ void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, s
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf); bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
nng_msg_chop(msg, bytesMoved); nng_msg_chop(msg, bytesMoved);
return bytesMoved;
} }
void listfile_reader_producer( void listfile_reader_producer(
nng_socket socket, nng_socket socket,
mvlc::listfile::ReadHandle &input, mvlc::listfile::ReadHandle &input,
const mvlc::listfile::Preamble &preamble) const BufferType &bufferType)
{ {
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0);
try try
{ {
auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
? BufferType::MVLC_ETH
: BufferType::MVLC_USB);
u32 bufferNumber = 0u; u32 bufferNumber = 0u;
size_t totalBytesSent = 0u; size_t totalBytesSent = 0u;
std::vector<u8> previousData; std::vector<u8> previousData;
@ -181,23 +202,25 @@ void listfile_reader_producer(
while (!done) while (!done)
{ {
nng_msg *msg = {}; nng_msg *msg = {};
if (int res = nng_msg_alloc(&msg, util::Megabytes(1)))
{
spdlog::error("nng_msg_alloc: {}", nng_strerror(res));
return;
}
auto msgHeader = reinterpret_cast<ListfileBufferMessageHeader *>(nng_msg_body(msg)); if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
msgHeader->messageType = MessageType::ListfileBuffer; return;
msgHeader->messageNumber = ++bufferNumber;
msgHeader->bufferType = static_cast<u32>(bufferType); ListfileBufferMessageHeader header
nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader)); {
MessageType::ListfileBuffer,
++bufferNumber,
static_cast<u32>(bufferType),
};
nng_msg_append(msg, &header, sizeof(header));
assert(nng_msg_len(msg) == sizeof(header));
nng_msg_append(msg, previousData.data(), previousData.size()); nng_msg_append(msg, previousData.data(), previousData.size());
previousData.clear(); previousData.clear();
size_t msgUsed = nng_msg_len(msg); size_t msgUsed = nng_msg_len(msg);
nng_msg_realloc(msg, util::Megabytes(1)); nng_msg_realloc(msg, DefaultOutputMessageReserve);
size_t bytesRead = input.read( size_t bytesRead = input.read(
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed, reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
@ -237,7 +260,7 @@ void listfile_reader_producer(
} }
struct ParserNngContext struct ReadoutParserNngContext
{ {
nng_msg *outputMessage = nullptr; nng_msg *outputMessage = nullptr;
u32 outputMessageNumber = 0u; u32 outputMessageNumber = 0u;
@ -246,50 +269,42 @@ struct ParserNngContext
size_t totalSystemEvents = 0u; size_t totalSystemEvents = 0u;
}; };
struct __attribute__((packed, aligned(1))) ParsedEventHeader struct __attribute__((packed, aligned(4))) ParsedEventHeader
{ {
u32 magicByte: 8; u32 magicByte: 8;
u8 crateIndex: 8; u8 crateIndex: 8;
}; };
struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader struct __attribute__((packed, aligned(4))) ParsedDataEventHeader: public ParsedEventHeader
{ {
u8 eventIndex: 8; u8 eventIndex: 8;
u8 moduleCount: 8; u8 moduleCount: 8;
}; };
struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader struct __attribute__((packed, aligned(4))) ParsedModuleHeader
{
u32 eventSize;
};
struct __attribute__((packed, aligned(1))) ParsedModuleHeader
{ {
u16 prefixSize; u16 prefixSize;
u16 suffixSize; u16 suffixSize;
u32 dynamicSize; u32 dynamicSize;
size_t totalSize() const { return prefixSize + suffixSize + dynamicSize; }
size_t totalBytes() const { return totalSize() * sizeof(u32); }
}; };
static const size_t OutputMessageReserved = util::Megabytes(1); struct __attribute__((packed, aligned(4))) ParsedSystemEventHeader: public ParsedEventHeader
{
u32 eventSize;
};
bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "") bool parser_maybe_alloc_output(ReadoutParserNngContext &ctx)
{ {
auto &msg = ctx.outputMessage; auto &msg = ctx.outputMessage;
if (msg) if (msg)
return false; return false;
if (auto res = nng_msg_alloc(&msg, 0)) if (allocate_reserve_message(&msg, DefaultOutputMessageReserve))
{
spdlog::error("{} - nng_msg_alloc: {}", debugInfo, nng_strerror(res));
return false; return false;
}
if (auto res = nng_msg_reserve(msg, OutputMessageReserved))
{
spdlog::error("{} - nng_msg_reserve: {}", debugInfo, nng_strerror(res));
return false;
}
ParsedEventsMessageHeader header = ParsedEventsMessageHeader header =
{ {
@ -303,7 +318,7 @@ bool allocate_output_message(ParserNngContext &ctx, const char *debugInfo = "")
return true; return true;
} }
bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "") bool flush_output_message(ReadoutParserNngContext &ctx, const char *debugInfo = "")
{ {
const auto msgSize = nng_msg_len(ctx.outputMessage); const auto msgSize = nng_msg_len(ctx.outputMessage);
@ -323,7 +338,7 @@ bool flush_output_message(ParserNngContext &ctx, const char *debugInfo = "")
return true; return true;
} }
void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex, void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount) const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
{ {
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max()); assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
@ -331,7 +346,7 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex,
assert(moduleCount < std::numeric_limits<u8>::max()); assert(moduleCount < std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_); auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_);
++ctx.totalReadoutEvents; ++ctx.totalReadoutEvents;
auto &msg = ctx.outputMessage; auto &msg = ctx.outputMessage;
@ -344,18 +359,18 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex,
requiredBytes += moduleData.data.size * sizeof(u32); requiredBytes += moduleData.data.size * sizeof(u32);
} }
size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes) if (msg && bytesFree < requiredBytes)
{ {
if (!flush_output_message(ctx, "parser_nng_eventdata_v2")) if (!flush_output_message(ctx, "parser_nng_eventdata"))
return; return;
} }
if (!msg && !allocate_output_message(ctx, "parser_nng_eventdata_v2")) if (!msg && !parser_maybe_alloc_output(ctx))
return; return;
bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes); assert(bytesFree >= requiredBytes);
ParsedDataEventHeader eventHeader = ParsedDataEventHeader eventHeader =
@ -368,7 +383,7 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex,
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader))) if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
{ {
spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return; return;
} }
@ -383,38 +398,38 @@ void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex,
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader))) if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
{ {
spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return; return;
} }
if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32))) if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32)))
{ {
spdlog::error("parser_nng_eventdata_v2: nng_msg_append: {}", nng_strerror(res)); spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
return; return;
} }
} }
} }
void parser_nng_systemevent_v2(void *ctx_, int crateIndex, const u32 *header, u32 size) void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size)
{ {
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max()); assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_); auto &ctx = *reinterpret_cast<ReadoutParserNngContext *>(ctx_);
++ctx.totalSystemEvents; ++ctx.totalSystemEvents;
auto &msg = ctx.outputMessage; auto &msg = ctx.outputMessage;
size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32); size_t requiredBytes = sizeof(ParsedSystemEventHeader) + size * sizeof(u32);
size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; size_t bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
if (msg && bytesFree < requiredBytes) if (msg && bytesFree < requiredBytes)
{ {
if (!flush_output_message(ctx, "parser_nng_systemevent_v2")) if (!flush_output_message(ctx, "parser_nng_systemevent"))
return; return;
} }
if (!msg && !allocate_output_message(ctx, "parser_nng_systemevent_v2")) if (!msg && !parser_maybe_alloc_output(ctx))
return; return;
bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u; bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u;
assert(bytesFree >= requiredBytes); assert(bytesFree >= requiredBytes);
ParsedSystemEventHeader eventHeader = ParsedSystemEventHeader eventHeader =
@ -499,15 +514,15 @@ void listfile_parser_nng(
spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml); spdlog::info("listfile_parser_nng: readout stacks:\n{}", stacksYaml);
auto crateIndex = 0; auto crateIndex = 0;
ParserNngContext parserContext; ReadoutParserNngContext parserContext;
parserContext.outputSocket = outputSocket; parserContext.outputSocket = outputSocket;
auto parserState = mvlc::readout_parser::make_readout_parser( auto parserState = mvlc::readout_parser::make_readout_parser(
crateConfig.stacks, crateIndex, &parserContext); crateConfig.stacks, crateIndex, &parserContext);
mvlc::readout_parser::ReadoutParserCounters parserCounters = {}; mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks = mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks =
{ {
parser_nng_eventdata_v2, parser_nng_eventdata,
parser_nng_systemevent_v2, parser_nng_systemevent,
}; };
nng_msg *inputMsg = nullptr; nng_msg *inputMsg = nullptr;
@ -533,13 +548,13 @@ void listfile_parser_nng(
tSend.count() / 1000.0, tSend.count() / 1000.0,
tTotal.count() / 1000.0); tTotal.count() / 1000.0);
auto totalElapsed = std::chrono::duration_cast<std::chrono::seconds>( auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - tStart); std::chrono::steady_clock::now() - tStart);
auto totalBytes = parserCounters.bytesProcessed; auto totalBytes = parserCounters.bytesProcessed;
auto totalMiB = totalBytes / (1024.0*1024.0); auto totalMiB = totalBytes / (1024.0*1024.0);
//auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count(); //auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count();
auto MiBperSecond = totalMiB / totalElapsed.count(); auto MiBperSecond = totalMiB / totalElapsed.count() * 1000.0;
spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s", spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:} MiB, rate={:.2f} MiB/s",
outputMessageNumber, totalMiB, MiBperSecond); outputMessageNumber, totalMiB, MiBperSecond);
}; };
@ -554,6 +569,7 @@ void listfile_parser_nng(
spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res)); spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res));
return; return;
} }
spdlog::trace("listfile_parser_nng - receive_message: timeout");
} }
else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader)) else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader))
{ {
@ -572,7 +588,7 @@ void listfile_parser_nng(
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
lastInputMessageNumber = inputHeader.messageNumber; lastInputMessageNumber = inputHeader.messageNumber;
spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); spdlog::debug("listfile_parser_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader)); nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader));
auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg)); auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
@ -613,7 +629,7 @@ void listfile_parser_nng(
} }
if (parserContext.outputMessage) if (parserContext.outputMessage)
flush_output_message(parserContext, "listfile_parser_nng_v2"); flush_output_message(parserContext, "listfile_parser_nng");
if (inputMsg) if (inputMsg)
{ {
@ -645,7 +661,20 @@ void listfile_parser_nng(
readout_parser::print_counters(ss, parserCounters); readout_parser::print_counters(ss, parserCounters);
spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str()); spdlog::info("listfile_parser_nng: parser counters:\n{}", ss.str());
} }
}
template<typename T>
std::optional<T> msg_trim_read(nng_msg *msg)
{
const auto oldlen = nng_msg_len(msg);
if (nng_msg_len(msg) < sizeof(T))
return {};
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
nng_msg_trim(msg, sizeof(T));
const auto newlen = nng_msg_len(msg);
assert(newlen + sizeof(T) == oldlen);
return result;
} }
void analysis_nng( void analysis_nng(
@ -657,8 +686,9 @@ void analysis_nng(
size_t totalInputBytes = 0u; size_t totalInputBytes = 0u;
u32 lastInputMessageNumber = 0u; u32 lastInputMessageNumber = 0u;
size_t inputBuffersLost = 0; size_t inputBuffersLost = 0;
bool error = false;
while (true) while (!error)
{ {
if (auto res = receive_message(inputSocket, &inputMsg)) if (auto res = receive_message(inputSocket, &inputMsg))
{ {
@ -679,19 +709,38 @@ void analysis_nng(
else else
{ {
totalInputBytes += nng_msg_len(inputMsg); totalInputBytes += nng_msg_len(inputMsg);
auto inputHeader = *reinterpret_cast<const ParsedEventsMessageHeader *>( auto inputHeader = msg_trim_read<ParsedEventsMessageHeader>(inputMsg).value();
nng_msg_body(inputMsg));
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber); auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;; inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
lastInputMessageNumber = inputHeader.messageNumber; lastInputMessageNumber = inputHeader.messageNumber;
spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg)); spdlog::debug("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader)); #if 0
while (auto msgLen = nng_msg_len(inputMsg))
{
auto eventHeader = msg_trim_read<ParsedDataEventHeader>(inputMsg);
//while (nng_msg_len(inputMsg)) if (!eventHeader)
//{ break;
//} if (eventHeader->magicByte != 0xF3u)
{
spdlog::error("wrong ParsedDataEventHeader magic byte");
error = true;
break;
}
for (size_t moduleIndex = 0; moduleIndex < eventHeader->moduleCount; ++moduleIndex)
{
auto moduleHeader = msg_trim_read<ParsedModuleHeader>(inputMsg);
if (moduleHeader)
{
nng_msg_trim(inputMsg, moduleHeader->totalBytes());
}
}
}
#endif
nng_msg_free(inputMsg); nng_msg_free(inputMsg);
inputMsg = nullptr; inputMsg = nullptr;
} }
@ -703,16 +752,16 @@ void analysis_nng(
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
spdlog::set_level(spdlog::level::info); spdlog::set_level(spdlog::level::debug);
if (argc < 2) if (argc < 2)
return 1; return 1;
try try
{ {
auto producerSocket = make_pair_socket(); auto readerOutputSocket = make_pair_socket();
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mesy_nng_fatal("nng_listen inproc", res);
auto parserInputSocket = make_pair_socket(); auto parserInputSocket = make_pair_socket();
@ -735,12 +784,16 @@ int main(int argc, char *argv[])
spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
auto preamble = mvlc::listfile::read_preamble(*readHandle); auto preamble = mvlc::listfile::read_preamble(*readHandle);
const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
? BufferType::MVLC_ETH
: BufferType::MVLC_USB);
// Read past the magic bytes at the start of each listfile.
(void) listfile::read_magic(*readHandle); (void) listfile::read_magic(*readHandle);
std::vector<std::thread> threads; std::vector<std::thread> threads;
threads.emplace_back(std::thread(listfile_reader_producer, threads.emplace_back(std::thread(listfile_reader_producer,
producerSocket, std::ref(*readHandle), std::cref(preamble))); readerOutputSocket, std::ref(*readHandle), std::cref(bufferType)));
threads.emplace_back(std::thread(listfile_parser_nng, threads.emplace_back(std::thread(listfile_parser_nng,
parserInputSocket, parserOutputSocket, std::cref(preamble))); parserInputSocket, parserOutputSocket, std::cref(preamble)));