improve parser speed by flushing the output message before a realloc happens
This commit is contained in:
parent
c258a2c8c0
commit
72cd10ba46
1 changed files with 129 additions and 8 deletions
|
@ -240,6 +240,8 @@ void listfile_reader_producer(
|
|||
struct ParserNngContext
|
||||
{
|
||||
nng_msg *outputMessage = nullptr;
|
||||
u32 outputMessageNumber = 0u;
|
||||
nng_socket outputSocket = NNG_SOCKET_INITIALIZER;
|
||||
size_t totalReadoutEvents = 0u;
|
||||
size_t totalSystemEvents = 0u;
|
||||
};
|
||||
|
@ -268,7 +270,7 @@ struct __attribute__((packed, aligned(1))) ParsedModuleHeader
|
|||
u32 dynamicSize;
|
||||
};
|
||||
|
||||
void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
||||
void parser_nng_eventdata_v1(void *ctx_, int crateIndex, int eventIndex,
|
||||
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
|
||||
{
|
||||
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
|
||||
|
@ -289,7 +291,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
|||
|
||||
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -305,7 +307,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
|||
|
||||
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -317,7 +319,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
|||
{
|
||||
if (int res = nng_msg_append(msg, dataSpan.data, dataSpan.size * sizeof(u32)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata: nng_msg_append: {}", nng_strerror(res));
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -325,6 +327,122 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex,
|
|||
#endif
|
||||
}
|
||||
|
||||
void parser_nng_eventdata_v2(void *ctx_, int crateIndex, int eventIndex,
|
||||
const readout_parser::ModuleData *moduleDataList, unsigned moduleCount)
|
||||
{
|
||||
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
|
||||
assert(eventIndex >= 0 && eventIndex <= std::numeric_limits<u8>::max());
|
||||
assert(moduleCount < std::numeric_limits<u8>::max());
|
||||
|
||||
static const size_t OutputMessageReserved= util::Megabytes(1);
|
||||
|
||||
auto &parserContext = *reinterpret_cast<ParserNngContext *>(ctx_);
|
||||
++parserContext.totalReadoutEvents;
|
||||
auto &msg = parserContext.outputMessage;
|
||||
|
||||
auto flush_output_message = [&]
|
||||
{
|
||||
const auto msgSize = nng_msg_len(parserContext.outputMessage);
|
||||
|
||||
if (auto res = send_message_retry(parserContext.outputSocket, parserContext.outputMessage, 0, "listfile_parser_nng"))
|
||||
{
|
||||
nng_msg_free(parserContext.outputMessage);
|
||||
parserContext.outputMessage = nullptr;
|
||||
spdlog::error("listfile_parser_nng: send_message_retry: {}:", nng_strerror(res));
|
||||
return false;
|
||||
}
|
||||
|
||||
parserContext.outputMessage = nullptr;
|
||||
|
||||
spdlog::debug("listfile_parser_nng: sent message {} of size {}",
|
||||
parserContext.outputMessageNumber, msgSize);
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
size_t requiredBytes = sizeof(ParsedDataEventHeader);
|
||||
|
||||
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
|
||||
{
|
||||
auto &moduleData = moduleDataList[moduleIndex];
|
||||
requiredBytes += sizeof(ParsedModuleHeader);
|
||||
requiredBytes += moduleData.data.size * sizeof(u32);
|
||||
}
|
||||
|
||||
size_t bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u;
|
||||
|
||||
if (msg && bytesFree < requiredBytes)
|
||||
{
|
||||
if (!flush_output_message())
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg)
|
||||
{
|
||||
if (auto res = nng_msg_alloc(&msg, 0))
|
||||
{
|
||||
spdlog::error("listfile_parser_nng - nng_msg_alloc: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
if (auto res = nng_msg_reserve(msg, OutputMessageReserved))
|
||||
{
|
||||
spdlog::error("listfile_parser_nng - nng_msg_reserve: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
ParsedEventsMessageHeader header =
|
||||
{
|
||||
MessageType::ParsedEvents,
|
||||
++parserContext.outputMessageNumber,
|
||||
};
|
||||
|
||||
nng_msg_append(msg, &header, sizeof(header));
|
||||
assert(nng_msg_len(msg) == sizeof(header));
|
||||
}
|
||||
|
||||
bytesFree = msg ? OutputMessageReserved - nng_msg_len(msg) : 0u;
|
||||
assert(bytesFree >= requiredBytes);
|
||||
|
||||
ParsedDataEventHeader eventHeader =
|
||||
{
|
||||
0xF3u,
|
||||
static_cast<u8>(crateIndex),
|
||||
static_cast<u8>(eventIndex),
|
||||
static_cast<u8>(moduleCount),
|
||||
};
|
||||
|
||||
if (int res = nng_msg_append(msg, &eventHeader, sizeof(eventHeader)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
#if 1
|
||||
for (size_t moduleIndex = 0; moduleIndex < moduleCount; ++moduleIndex)
|
||||
{
|
||||
auto &moduleData = moduleDataList[moduleIndex];
|
||||
|
||||
ParsedModuleHeader moduleHeader = {};
|
||||
moduleHeader.prefixSize = moduleData.prefixSize;
|
||||
moduleHeader.dynamicSize = moduleData.dynamicSize;
|
||||
moduleHeader.suffixSize = moduleData.suffixSize;
|
||||
|
||||
if (int res = nng_msg_append(msg, &moduleHeader, sizeof(moduleHeader)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
if (int res = nng_msg_append(msg, moduleData.data.data, moduleData.data.size * sizeof(u32)))
|
||||
{
|
||||
spdlog::error("parser_nng_eventdata_v1: nng_msg_append: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void parser_nng_systemevent(void *ctx_, int crateIndex, const u32 *header, u32 size)
|
||||
{
|
||||
assert(crateIndex >= 0 && crateIndex <= std::numeric_limits<u8>::max());
|
||||
|
@ -418,17 +536,18 @@ void listfile_parser_nng(
|
|||
|
||||
auto crateIndex = 0;
|
||||
ParserNngContext parserContext;
|
||||
parserContext.outputSocket = outputSocket;
|
||||
auto parserState = mvlc::readout_parser::make_readout_parser(
|
||||
crateConfig.stacks, crateIndex, &parserContext);
|
||||
mvlc::readout_parser::ReadoutParserCounters parserCounters = {};
|
||||
mvlc::readout_parser::ReadoutParserCallbacks parserCallbacks =
|
||||
{
|
||||
parser_nng_eventdata,
|
||||
parser_nng_eventdata_v2,
|
||||
parser_nng_systemevent,
|
||||
};
|
||||
|
||||
nng_msg *inputMsg = nullptr;
|
||||
u32 outputMessageNumber = 0u;
|
||||
u32 &outputMessageNumber = parserContext.outputMessageNumber;
|
||||
std::chrono::microseconds tReceive(0);
|
||||
std::chrono::microseconds tProcess(0);
|
||||
std::chrono::microseconds tSend(0);
|
||||
|
@ -456,8 +575,8 @@ void listfile_parser_nng(
|
|||
auto totalMiB = totalBytes / (1024.0*1024.0);
|
||||
//auto bytesPerSecond = 1.0 * totalBytes / totalElapsed.count();
|
||||
auto MiBperSecond = totalMiB / totalElapsed.count();
|
||||
spdlog::info("listfile_parser_nng: bytesProcessed={:.2f}, rate={:.2f} MiB/s",
|
||||
totalMiB, MiBperSecond);
|
||||
spdlog::info("listfile_parser_nng: outputMessages={}, bytesProcessed={:.2f} MiB, rate={:.2f} MiB/s",
|
||||
outputMessageNumber, totalMiB, MiBperSecond);
|
||||
};
|
||||
|
||||
while (true)
|
||||
|
@ -523,6 +642,7 @@ void listfile_parser_nng(
|
|||
tProcess += stopWatch.interval();
|
||||
|
||||
// TODO: also flush after a certain time
|
||||
#if 0
|
||||
if (nng_msg_len(parserContext.outputMessage) >= util::Megabytes(1))
|
||||
{
|
||||
const auto msgSize = nng_msg_len(parserContext.outputMessage);
|
||||
|
@ -542,6 +662,7 @@ void listfile_parser_nng(
|
|||
}
|
||||
|
||||
tSend += stopWatch.interval();
|
||||
#endif
|
||||
tTotal += stopWatch.end();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue