slow replay
This commit is contained in:
parent
ae815d4adb
commit
32aa3d6d08
1 changed files with 528 additions and 39 deletions
|
@ -1,9 +1,36 @@
|
||||||
|
#include <limits>
|
||||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||||
#include <nng/nng.h>
|
#include <nng/nng.h>
|
||||||
|
#include <sys/prctl.h>
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
using namespace mesytec;
|
using namespace mesytec;
|
||||||
using namespace mesytec::mvlc;
|
using namespace mesytec::mvlc;
|
||||||
|
|
||||||
|
int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "")
|
||||||
|
{
|
||||||
|
int res = 0;
|
||||||
|
size_t attempt = 0u;
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
res = nng_sendmsg(socket, msg, 0);
|
||||||
|
|
||||||
|
if (res && res == NNG_ETIMEDOUT)
|
||||||
|
spdlog::warn("send_message_retry: {} - send timeout", debugInfo);
|
||||||
|
|
||||||
|
if (res && res != NNG_ETIMEDOUT)
|
||||||
|
return res;
|
||||||
|
|
||||||
|
if (res && maxTries > 0 && attempt >= maxTries)
|
||||||
|
return res;
|
||||||
|
|
||||||
|
++attempt;
|
||||||
|
} while (res == NNG_ETIMEDOUT);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// Follows the framing structure inside the buffer until an incomplete frame
|
// Follows the framing structure inside the buffer until an incomplete frame
|
||||||
// which doesn't fit into the buffer is detected. The incomplete data is moved
|
// which doesn't fit into the buffer is detected. The incomplete data is moved
|
||||||
// over to the tempBuffer so that the readBuffer ends with a complete frame.
|
// over to the tempBuffer so that the readBuffer ends with a complete frame.
|
||||||
|
@ -94,44 +121,64 @@ enum class BufferType: u32
|
||||||
MVLC_ETH,
|
MVLC_ETH,
|
||||||
};
|
};
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
enum class MessageType
|
||||||
struct BufferMessageHeader
|
|
||||||
{
|
{
|
||||||
u32 bufferType;
|
ListfileBuffer,
|
||||||
u32 bufferNumber;
|
ParsedEvents,
|
||||||
};
|
};
|
||||||
|
|
||||||
#pragma pack(pop)
|
struct __attribute__((packed, aligned(1))) MessageHeaderBase
|
||||||
|
{
|
||||||
|
MessageType messageType;
|
||||||
|
u32 messageNumber;
|
||||||
|
};
|
||||||
|
|
||||||
size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, size_t msgUsed, std::vector<u8> &tmpBuf)
|
struct __attribute__((packed, aligned(1))) ListfileBufferMessageHeader: public MessageHeaderBase
|
||||||
|
{
|
||||||
|
u32 bufferType;
|
||||||
|
};
|
||||||
|
|
||||||
|
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
|
||||||
|
|
||||||
|
struct __attribute__((packed, aligned(1))) ParsedEventsMessageHeader: public MessageHeaderBase
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
|
static_assert(sizeof(ParsedEventsMessageHeader) % sizeof(u32) == 0);
|
||||||
|
|
||||||
|
void fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, std::vector<u8> &tmpBuf)
|
||||||
{
|
{
|
||||||
size_t bytesMoved = 0u;
|
size_t bytesMoved = 0u;
|
||||||
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(BufferMessageHeader);
|
const u8 *msgBufferData = reinterpret_cast<const u8 *>(nng_msg_body(msg)) + sizeof(ListfileBufferMessageHeader);
|
||||||
const auto msgBufferSize = msgUsed - sizeof(BufferMessageHeader);
|
const auto msgBufferSize = nng_msg_len(msg) - sizeof(ListfileBufferMessageHeader);
|
||||||
|
|
||||||
if (bufferType == BufferType::MVLC_USB)
|
if (bufferType == BufferType::MVLC_USB)
|
||||||
bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf);
|
bytesMoved = fixup_buffer_mvlc_usb(msgBufferData, msgBufferSize, tmpBuf);
|
||||||
else
|
else
|
||||||
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
|
bytesMoved = fixup_buffer_mvlc_eth(msgBufferData, msgBufferSize, tmpBuf);
|
||||||
|
|
||||||
nng_msg_chop(msg, msgUsed - bytesMoved);
|
nng_msg_chop(msg, bytesMoved);
|
||||||
return msgUsed - bytesMoved;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void listfile_reader_produer(
|
void listfile_reader_producer(
|
||||||
nng_socket socket,
|
nng_socket socket,
|
||||||
mvlc::listfile::ReadHandle &input)
|
mvlc::listfile::ReadHandle &input,
|
||||||
|
const mvlc::listfile::Preamble &preamble)
|
||||||
{
|
{
|
||||||
|
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto preamble = mvlc::listfile::read_preamble(input);
|
auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
|
||||||
auto bufferType = BufferType::MVLC_USB;
|
? BufferType::MVLC_ETH
|
||||||
if (preamble.magic == mvlc::listfile::get_filemagic_eth())
|
: BufferType::MVLC_USB);
|
||||||
bufferType = BufferType::MVLC_ETH;
|
|
||||||
u32 bufferNumber = 0u;
|
|
||||||
std::vector<u8> previousData;
|
|
||||||
|
|
||||||
while (true)
|
u32 bufferNumber = 0u;
|
||||||
|
size_t totalBytesSent = 0u;
|
||||||
|
std::vector<u8> previousData;
|
||||||
|
bool done = false;
|
||||||
|
|
||||||
|
while (!done)
|
||||||
{
|
{
|
||||||
nng_msg *msg = {};
|
nng_msg *msg = {};
|
||||||
if (int res = nng_msg_alloc(&msg, util::Megabytes(1)))
|
if (int res = nng_msg_alloc(&msg, util::Megabytes(1)))
|
||||||
|
@ -140,57 +187,499 @@ void listfile_reader_produer(
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
BufferMessageHeader msgHeader = {};
|
auto msgHeader = reinterpret_cast<ListfileBufferMessageHeader *>(nng_msg_body(msg));
|
||||||
msgHeader.bufferType = static_cast<u32>(bufferType);
|
msgHeader->messageType = MessageType::ListfileBuffer;
|
||||||
msgHeader.bufferNumber = bufferNumber;
|
msgHeader->messageNumber = ++bufferNumber;
|
||||||
nng_msg_append(msg, &msgHeader, sizeof(msgHeader));
|
msgHeader->bufferType = static_cast<u32>(bufferType);
|
||||||
size_t msgUsed = sizeof(msgHeader);
|
nng_msg_realloc(msg, sizeof(ListfileBufferMessageHeader));
|
||||||
|
|
||||||
std::copy(std::begin(previousData), std::end(previousData),
|
nng_msg_append(msg, previousData.data(), previousData.size());
|
||||||
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed);
|
|
||||||
msgUsed += previousData.size();
|
|
||||||
previousData.clear();
|
previousData.clear();
|
||||||
|
|
||||||
|
size_t msgUsed = nng_msg_len(msg);
|
||||||
|
nng_msg_realloc(msg, util::Megabytes(1));
|
||||||
|
|
||||||
size_t bytesRead = input.read(
|
size_t bytesRead = input.read(
|
||||||
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
|
reinterpret_cast<u8 *>(nng_msg_body(msg)) + msgUsed,
|
||||||
nng_msg_len(msg) - msgUsed);
|
nng_msg_len(msg) - msgUsed);
|
||||||
msgUsed += bytesRead;
|
|
||||||
msgUsed = fixup_listfile_buffer_message(bufferType, msg, msgUsed, previousData);
|
|
||||||
assert(msgUsed == nng_msg_len(msg));
|
|
||||||
|
|
||||||
if (bytesRead == 0 && nng_msg_len(msg) == sizeof(BufferMessageHeader))
|
nng_msg_realloc(msg, msgUsed + bytesRead);
|
||||||
|
fixup_listfile_buffer_message(bufferType, msg, previousData);
|
||||||
|
|
||||||
|
done = (bytesRead == 0 && nng_msg_len(msg) == sizeof(ListfileBufferMessageHeader));
|
||||||
|
|
||||||
|
if (done)
|
||||||
|
nng_msg_realloc(msg, 0);
|
||||||
|
|
||||||
|
const auto msgSize = nng_msg_len(msg);
|
||||||
|
|
||||||
|
if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer"))
|
||||||
{
|
{
|
||||||
nng_msg_free(msg);
|
nng_msg_free(msg);
|
||||||
|
msg = nullptr;
|
||||||
|
spdlog::error("listfile_reader_producer: send_message_retry: {}", nng_strerror(res));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else if (auto res = nng_sendmsg(socket, msg, 0))
|
|
||||||
|
spdlog::info("listfile_reader_producer: sent message {} of size {}",
|
||||||
|
bufferNumber, msgSize);
|
||||||
|
totalBytesSent += msgSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB",
|
||||||
|
bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1));
|
||||||
|
}
|
||||||
|
catch(const std::exception& e)
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_reader_prroducer: exception: {}", e.what());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ParserNngContext
|
||||||
|
{
|
||||||
|
nng_msg *outputMessage = nullptr;
|
||||||
|
size_t totalReadoutEvents = 0u;
|
||||||
|
size_t totalSystemEvents = 0u;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((packed, aligned(1))) ParsedEventHeader
|
||||||
|
{
|
||||||
|
u32 magicByte: 8;
|
||||||
|
u8 crateIndex: 8;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((packed, aligned(1))) ParsedDataEventHeader: public ParsedEventHeader
|
||||||
|
{
|
||||||
|
u8 eventIndex: 8;
|
||||||
|
u8 moduleCount: 8;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((packed, aligned(1))) ParsedSystemEventHeader: public ParsedEventHeader
|
||||||
|
{
|
||||||
|
u32 eventSize;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct __attribute__((packed, aligned(1))) ParsedModuleHeader
|
||||||
|
{
|
||||||
|
u16 prefixSize;
|
||||||
|
u16 suffixSize;
|
||||||
|
u32 dynamicSize;
|
||||||
|
};
|
||||||
|
|
||||||
|
void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
||||||
|
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
|
||||||
|
{
|
||||||
|
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
|
||||||
|
assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max());
|
||||||
|
assert(moduleCount < std::numeric_limits<u8>::max());
|
||||||
|
|
||||||
|
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_);
|
||||||
|
++ctx.totalReadoutEvents;
|
||||||
|
auto msg = ctx.outputMessage;
|
||||||
|
|
||||||
|
ParsedDataEventHeader eventHeader =
|
||||||
|
{
|
||||||
|
0xF3u,
|
||||||
|
static_cast<u8>(crateIndex),
|
||||||
|
static_cast<u8>(eventIndex),
|
||||||
|
static_cast<u8>(moduleCount),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
|
||||||
|
{
|
||||||
|
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
|
||||||
|
{
|
||||||
|
auto &moduleData = moduleDataList[moduleIndex];
|
||||||
|
|
||||||
|
ParsedModuleHeader moduleHeader = {};
|
||||||
|
moduleHeader.prefixSize = moduleData.prefixSize;
|
||||||
|
moduleHeader.dynamicSize = moduleData.dynamicSize;
|
||||||
|
moduleHeader.suffixSize = moduleData.suffixSize;
|
||||||
|
|
||||||
|
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
|
||||||
|
{
|
||||||
|
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto prefixSpan = prefix_span(moduleData);
|
||||||
|
auto dynamicSpan = dynamic_span(moduleData);
|
||||||
|
auto suffixSpan = suffix_span(moduleData);
|
||||||
|
|
||||||
|
for (auto &dataSpan: { prefixSpan, dynamicSpan, suffixSpan })
|
||||||
|
{
|
||||||
|
if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32)))
|
||||||
{
|
{
|
||||||
// TODO: how to handle blocking here? need some way to ensure
|
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||||
// nng_sendmsg() returns
|
|
||||||
// -> maybe: timeout and loop with check if the producer should quit
|
|
||||||
spdlog::error("nng_sendmsg: {}", nng_strerror(res));
|
|
||||||
nng_msg_free(msg);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch(const std::exception& e)
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size)
|
||||||
|
{
|
||||||
|
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
|
||||||
|
|
||||||
|
auto &ctx = *reinterpret_cast<ParserNngContext *>(ctx_);
|
||||||
|
++ctx.totalSystemEvents;
|
||||||
|
|
||||||
|
auto msg = ctx.outputMessage;
|
||||||
|
|
||||||
|
ParsedSystemEventHeader eventHeader =
|
||||||
{
|
{
|
||||||
spdlog::error(e.what());
|
0xFAu,
|
||||||
|
static_cast<u8>(crateIndex),
|
||||||
|
size,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
|
||||||
|
{
|
||||||
|
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (int res = nng_msg_append(msg, header, size * sizeof(u32)))
|
||||||
|
{
|
||||||
|
spdlog::error("parser_nng_systemevent: nng_msg_append: {}", nng_strerror(res));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class StopWatch
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using duration_type = std::chrono::microseconds;
|
||||||
|
|
||||||
|
void start()
|
||||||
|
{
|
||||||
|
tStart_ = tInterval_ = std::chrono::high_resolution_clock::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
duration_type interval()
|
||||||
|
{
|
||||||
|
auto now = std::chrono::high_resolution_clock::now();
|
||||||
|
auto result = std::chrono::duration_cast<duration_type>(now - tInterval_);
|
||||||
|
tInterval_ = now;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
duration_type end()
|
||||||
|
{
|
||||||
|
auto now = std::chrono::high_resolution_clock::now();
|
||||||
|
auto result = std::chrono::duration_cast<duration_type>(now - tStart_);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::chrono::high_resolution_clock::time_point tStart_;
|
||||||
|
std::chrono::high_resolution_clock::time_point tInterval_;
|
||||||
|
};
|
||||||
|
|
||||||
|
void listfile_parser_nng(
|
||||||
|
nng_socket inputSocket,
|
||||||
|
nng_socket outputSocket,
|
||||||
|
const mvlc::listfile::Preamble &preamble)
|
||||||
|
{
|
||||||
|
prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0);
|
||||||
|
|
||||||
|
size_t totalInputBytes = 0u;
|
||||||
|
u32 lastInputMessageNumber = 0u;
|
||||||
|
size_t inputBuffersLost = 0;
|
||||||
|
const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth()
|
||||||
|
? ConnectionType::ETH
|
||||||
|
: ConnectionType::USB);
|
||||||
|
|
||||||
|
auto configSection = preamble.findCrateConfig();
|
||||||
|
|
||||||
|
if (!configSection)
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto configYaml = configSection->contentsToString();
|
||||||
|
auto crateConfig = mvlc::crate_config_from_yaml(configYaml);
|
||||||
|
|
||||||
|
spdlog::info("listfile_parser_nng: CrateConfig:\n{}", to_yaml(crateConfig));
|
||||||
|
|
||||||
|
auto crateIndex = 0;
|
||||||
|
ParserNngContext parserContext;
|
||||||
|
auto parserState = mvlc::readout_parser::make_readout_parser(
|
||||||
|
crateConfig.stacks, crateIndex, &parserContext);
|
||||||
|
mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
|
||||||
|
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks =
|
||||||
|
{
|
||||||
|
parser_nng_eventdata,
|
||||||
|
parser_nng_systemevent,
|
||||||
|
};
|
||||||
|
|
||||||
|
nng_msg *inputMsg = nullptr;
|
||||||
|
u32 outputMessageNumber = 0u;
|
||||||
|
std::chrono::microseconds tReceive;
|
||||||
|
std::chrono::microseconds tProcess;
|
||||||
|
std::chrono::microseconds tSend;
|
||||||
|
std::chrono::microseconds tTotal;
|
||||||
|
auto tLastReport = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
StopWatch stopWatch;
|
||||||
|
stopWatch.start();
|
||||||
|
if (auto res = receive_message(inputSocket, &inputMsg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_parser_nng - receive_message: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (nng_msg_len(inputMsg) < sizeof(ListfileBufferMessageHeader))
|
||||||
|
{
|
||||||
|
if (nng_msg_len(inputMsg) == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
spdlog::warn("listfile_parser_nng - incoming message too short (len={})",
|
||||||
|
nng_msg_len(inputMsg));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tReceive += stopWatch.interval();
|
||||||
|
totalInputBytes += nng_msg_len(inputMsg);
|
||||||
|
auto inputHeader = *reinterpret_cast<const ListfileBufferMessageHeader *>(
|
||||||
|
nng_msg_body(inputMsg));
|
||||||
|
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
|
||||||
|
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
|
||||||
|
lastInputMessageNumber = inputHeader.messageNumber;
|
||||||
|
spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
|
||||||
|
|
||||||
|
nng_msg_trim(inputMsg, sizeof(ListfileBufferMessageHeader));
|
||||||
|
auto inputData = reinterpret_cast<const u32 *>(nng_msg_body(inputMsg));
|
||||||
|
size_t inputLen = nng_msg_len(inputMsg) / sizeof(u32);
|
||||||
|
|
||||||
|
if (!parserContext.outputMessage)
|
||||||
|
{
|
||||||
|
if (auto res = nng_msg_alloc(&parserContext.outputMessage, util::Megabytes(1)))
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto &outputHeader = *reinterpret_cast<ParsedEventsMessageHeader *>(nng_msg_body(parserContext.outputMessage));
|
||||||
|
outputHeader.messageType = MessageType::ParsedEvents;
|
||||||
|
outputHeader.messageNumber = ++outputMessageNumber;
|
||||||
|
nng_msg_realloc(parserContext.outputMessage, sizeof(ParsedEventsMessageHeader));
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(parserContext.outputMessage);
|
||||||
|
|
||||||
|
readout_parser::parse_readout_buffer(
|
||||||
|
listfileFormat,
|
||||||
|
parserState,
|
||||||
|
parserCallbacks,
|
||||||
|
parserCounters,
|
||||||
|
inputHeader.messageNumber,
|
||||||
|
inputData,
|
||||||
|
inputLen);
|
||||||
|
|
||||||
|
tProcess += stopWatch.interval();
|
||||||
|
|
||||||
|
// TODO: also flush after a certain time
|
||||||
|
if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1))
|
||||||
|
{
|
||||||
|
const auto msgSize = nng_msg_len(parserContext.outputMessage);
|
||||||
|
|
||||||
|
if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
|
||||||
|
{
|
||||||
|
nng_msg_free(parserContext.outputMessage);
|
||||||
|
parserContext.outputMessage = nullptr;
|
||||||
|
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
parserContext.outputMessage = nullptr;
|
||||||
|
|
||||||
|
spdlog::info("listfile_parser_nng: sent message {} of size {}",
|
||||||
|
outputMessageNumber, msgSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
tSend += stopWatch.interval();
|
||||||
|
tTotal += stopWatch.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inputMsg)
|
||||||
|
{
|
||||||
|
nng_msg_free(inputMsg);
|
||||||
|
inputMsg = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
auto now = std::chrono::steady_clock::now();
|
||||||
|
auto tElapsed = now - tLastReport;
|
||||||
|
static const auto ReportInterval = std::chrono::seconds(1);
|
||||||
|
|
||||||
|
if (tElapsed >= ReportInterval)
|
||||||
|
{
|
||||||
|
spdlog::info("listfile_parser_nng: time budget:\n"
|
||||||
|
" tReceive = {}\n"
|
||||||
|
" tProcess = {}\n"
|
||||||
|
" tSend = {}\n"
|
||||||
|
" tTotal = {}\n",
|
||||||
|
tReceive.count(),
|
||||||
|
tProcess.count(),
|
||||||
|
tSend.count(),
|
||||||
|
tTotal.count());
|
||||||
|
tLastReport = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (inputMsg)
|
||||||
|
{
|
||||||
|
nng_msg_free(inputMsg);
|
||||||
|
inputMsg = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!parserContext.outputMessage);
|
||||||
|
|
||||||
|
spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
|
||||||
|
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
|
||||||
|
spdlog::info("listfile_parser_nng: time budget:\n"
|
||||||
|
" tReceive = {} ms\n"
|
||||||
|
" tProcess = {} ms\n"
|
||||||
|
" tSend = {} ms\n"
|
||||||
|
" tTotal = {} ms\n",
|
||||||
|
tReceive.count() / 1000.0,
|
||||||
|
tProcess.count() / 1000.0,
|
||||||
|
tSend.count() / 1000.0,
|
||||||
|
tTotal.count() / 1000.0);
|
||||||
|
|
||||||
|
// send empty message
|
||||||
|
if (auto res = nng_msg_alloc(&parserContext.outputMessage, 0))
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto res = send_message_retry(outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
|
||||||
|
{
|
||||||
|
nng_msg_free(parserContext.outputMessage);
|
||||||
|
parserContext.outputMessage = nullptr;
|
||||||
|
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void analysis_nng(
|
||||||
|
nng_socket inputSocket
|
||||||
|
)
|
||||||
|
{
|
||||||
|
prctl(PR_SET_NAME,"analysis_nng",0,0,0);
|
||||||
|
nng_msg *inputMsg = nullptr;
|
||||||
|
size_t totalInputBytes = 0u;
|
||||||
|
u32 lastInputMessageNumber = 0u;
|
||||||
|
size_t inputBuffersLost = 0;
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
if (auto res = receive_message(inputSocket, &inputMsg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
{
|
||||||
|
spdlog::error("analysis_nng - receive_message: {}", nng_strerror(res));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (nng_msg_len(inputMsg) < sizeof(ParsedEventsMessageHeader))
|
||||||
|
{
|
||||||
|
if (nng_msg_len(inputMsg) == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
spdlog::warn("analysis_nng - incoming message too short (len={})",
|
||||||
|
nng_msg_len(inputMsg));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
totalInputBytes += nng_msg_len(inputMsg);
|
||||||
|
auto inputHeader = *reinterpret_cast<const ParsedEventsMessageHeader *>(
|
||||||
|
nng_msg_body(inputMsg));
|
||||||
|
auto bufferLoss = readout_parser::calc_buffer_loss(inputHeader.messageNumber, lastInputMessageNumber);
|
||||||
|
inputBuffersLost += bufferLoss >= 0 ? bufferLoss : 0u;;
|
||||||
|
lastInputMessageNumber = inputHeader.messageNumber;
|
||||||
|
spdlog::info("analysis_nng: received message {} of size {}", lastInputMessageNumber, nng_msg_len(inputMsg));
|
||||||
|
|
||||||
|
nng_msg_trim(inputMsg, sizeof(ParsedEventsMessageHeader));
|
||||||
|
|
||||||
|
//while (nng_msg_len(inputMsg))
|
||||||
|
//{
|
||||||
|
|
||||||
|
//}
|
||||||
|
nng_msg_free(inputMsg);
|
||||||
|
inputMsg = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
|
||||||
|
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[])
|
int main(int argc, char *argv[])
|
||||||
{
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
|
||||||
if (argc < 2)
|
if (argc < 2)
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
auto producerSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen inproc", res);
|
||||||
|
|
||||||
|
auto parserInputSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_dial inproc", res);
|
||||||
|
|
||||||
|
auto parserOutputSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen inproc", res);
|
||||||
|
|
||||||
|
auto analysisInputSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_dial inproc", res);
|
||||||
|
|
||||||
listfile::ZipReader zipReader;
|
listfile::ZipReader zipReader;
|
||||||
zipReader.openArchive(argv[1]);
|
zipReader.openArchive(argv[1]);
|
||||||
|
spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
|
||||||
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
|
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
|
||||||
|
auto preamble = mvlc::listfile::read_preamble(*readHandle);
|
||||||
|
(void) listfile::read_magic(*readHandle);
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
|
||||||
|
threads.emplace_back(std::thread(listfile_reader_producer,
|
||||||
|
producerSocket, std::ref(*readHandle), std::cref(preamble)));
|
||||||
|
|
||||||
|
threads.emplace_back(std::thread(listfile_parser_nng,
|
||||||
|
parserInputSocket, parserOutputSocket, std::cref(preamble)));
|
||||||
|
|
||||||
|
threads.emplace_back(std::thread(analysis_nng,
|
||||||
|
analysisInputSocket));
|
||||||
|
|
||||||
|
for (auto &t: threads) if (t.joinable()) t.join();
|
||||||
}
|
}
|
||||||
catch(const std::exception& e)
|
catch(const std::exception& e)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in a new issue