implement a working mana_nng_client

Speed is great: it's faster when using the root-histogram plugin. In the
case of the cpp-test plugin, which does not do anything, it's slower:
6 GB/s vs 9 GB/s when running the plugin directly.

-> As soon as a bit of work is done per event it should always be better to
buffer up parsed event data and process it in a tight loop (in a different
thread).
This commit is contained in:
Florian Lüke 2024-12-29 00:17:38 +01:00
parent 99949f08e4
commit 8dfa2d798a

View file

@ -4,6 +4,7 @@
#include "internal/argh.h" #include "internal/argh.h"
#include "internal/mana_analysis.h" // ManaCountingSink #include "internal/mana_analysis.h" // ManaCountingSink
#include "internal/mana_api.hpp" #include "internal/mana_api.hpp"
#include "internal/mana_nng.hpp"
#include <iostream> #include <iostream>
@ -22,6 +23,145 @@ void usage(const char *self)
"'off'.\n"; "'off'.\n";
} }
bool process_event_data(nng_msg *msg, mana::IManaSink *sink)
{
while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader))
{
auto header = nng::msg_trim_read<mana::EventDataHeader>(msg);
assert(header);
if (nng_msg_len(msg) < header->sizeBytes)
{
spdlog::error("Error reading event data: expected {} bytes, got {}", header->sizeBytes,
nng_msg_len(msg));
return false;
}
auto arrays = reinterpret_cast<mana_offset_array_t *>(nng_msg_body(msg));
sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes);
nng_msg_trim(msg, header->sizeBytes);
}
return nng_msg_len(msg) == 0;
}
void run(nng_socket socket_, mana::IManaSink *sink, std::atomic<bool> &quit)
{
auto receive_message = [socket_, &quit]()
{
auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; };
return nng::receive_message_retry(socket_, 0, rp);
};
enum class State
{
BeginRun,
Run,
};
auto state = State::BeginRun;
while (!quit)
{
switch (state)
{
case State::BeginRun:
{
spdlog::info("Waiting for BeginRun from server");
auto [msg, res] = receive_message();
if (res)
{
nng::mnode_nng_error("nng_recvmsg", res);
return;
}
const size_t msgSize = nng_msg_len(msg.get());
spdlog::info("State::BeginRun: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
spdlog::error("Error reading message header");
return;
}
spdlog::info("State::BeginRun: messageType={}", static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::BeginRun)
{
spdlog::info("Received BeginRun message (size={})", msgSize);
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->begin_run(descriptor_json.c_str());
if (auto res = nng::send_empty_message(socket_))
{
nng::mnode_nng_error("nng_sendmsg", res);
return;
}
spdlog::info("Receiving Data");
state = State::Run;
}
else
{
spdlog::error("Error reading BeginRun header");
return;
}
}
break;
case State::Run:
{
auto [msg, res] = receive_message();
if (res)
{
nng::mnode_nng_error("nng_recvmsg", res);
return;
}
const size_t msgSize = nng_msg_len(msg.get());
spdlog::debug("State::Run: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
spdlog::error("Error reading message header");
return;
}
spdlog::debug("State::Run: messageType={}", static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::EndRun)
{
spdlog::info("Received EndRun message");
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->end_run(descriptor_json.c_str());
return;
}
else if (header->messageType == mana::MessageHeader::EventData)
{
if (!process_event_data(msg.get(), sink))
return;
}
else
{
spdlog::error("Unknown message type: {}", static_cast<int>(header->messageType));
return;
}
}
break;
}
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
argh::parser parser({"-h", "--help", "--plugin", "--log-level"}); argh::parser parser({"-h", "--help", "--plugin", "--log-level"});
@ -79,4 +219,42 @@ int main(int argc, char *argv[])
{ {
destSink = std::make_unique<mana::ManaCountingSink>(); destSink = std::make_unique<mana::ManaCountingSink>();
} }
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
nng_socket socket = nng::make_pair_socket();
if (int res = nng_dial(socket, url.c_str(), nullptr, 0))
{
nng::mnode_nng_error("nng_dial", res);
return 1;
}
spdlog::info("connected to {}", url);
std::atomic<bool> quit = false;
manaSink->init(0, nullptr);
run(socket, manaSink.get(), quit);
manaSink->shutdown();
auto perf = manaSink->perf();
{
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
static_cast<size_t>(0));
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
static_cast<size_t>(0));
double elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
.count();
double elapsed_s = elapsed_ms / 1000.0;
double MiB = totalBytes / (1024.0 * 1024);
double MiB_s = MiB / elapsed_s;
double hit_s = totalHits / elapsed_s;
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
totalHits, MiB, elapsed_s, hit_s, MiB_s);
}
return 0;
} }