From 8dfa2d798a529820d145d9c5549019b24d08d67f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Sun, 29 Dec 2024 00:17:38 +0100 Subject: [PATCH] implement a working mana_nng_client Speed is great: it's faster when using the root-histogram plugin. In the case of the cpp-test plugin, which does not do anything, it's slower: 6 GB/s vs 9 GB/s when running the plugin directly. -> As soon as a bit of work is done per event it should always be better to buffer up parsed event data and process it in a tight loop (in a different thread). --- src/tools/mana_nng_client.cc | 178 +++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/src/tools/mana_nng_client.cc b/src/tools/mana_nng_client.cc index fcc1b39..7c09109 100644 --- a/src/tools/mana_nng_client.cc +++ b/src/tools/mana_nng_client.cc @@ -4,6 +4,7 @@ #include "internal/argh.h" #include "internal/mana_analysis.h" // ManaCountingSink #include "internal/mana_api.hpp" +#include "internal/mana_nng.hpp" #include @@ -22,6 +23,145 @@ void usage(const char *self) "'off'.\n"; } +bool process_event_data(nng_msg *msg, mana::IManaSink *sink) +{ + while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader)) + { + auto header = nng::msg_trim_read(msg); + assert(header); + + if (nng_msg_len(msg) < header->sizeBytes) + { + spdlog::error("Error reading event data: expected {} bytes, got {}", header->sizeBytes, + nng_msg_len(msg)); + return false; + } + + auto arrays = reinterpret_cast(nng_msg_body(msg)); + sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes); + nng_msg_trim(msg, header->sizeBytes); + } + + return nng_msg_len(msg) == 0; +} + +void run(nng_socket socket_, mana::IManaSink *sink, std::atomic &quit) +{ + auto receive_message = [socket_, &quit]() + { + auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; }; + return nng::receive_message_retry(socket_, 0, rp); + }; + + enum class State + { + BeginRun, + Run, + }; + + auto state = State::BeginRun; + + while (!quit) + { + switch (state) + { + case State::BeginRun: + { + spdlog::info("Waiting for BeginRun from server"); + + auto [msg, res] = receive_message(); + + if (res) + { + nng::mnode_nng_error("nng_recvmsg", res); + return; + } + + const size_t msgSize = nng_msg_len(msg.get()); + spdlog::info("State::BeginRun: Received message of size {}", msgSize); + + auto header = nng::msg_trim_read(msg.get()); + + if (!header) + { + spdlog::error("Error reading message header"); + return; + } + + spdlog::info("State::BeginRun: messageType={}", static_cast(header->messageType)); + + if (header->messageType == mana::MessageHeader::BeginRun) + { + spdlog::info("Received BeginRun message (size={})", msgSize); + std::string descriptor_json; + std::copy(reinterpret_cast(nng_msg_body(msg.get())), + reinterpret_cast(nng_msg_body(msg.get())) + msgSize, + std::back_inserter(descriptor_json)); + sink->begin_run(descriptor_json.c_str()); + if (auto res = nng::send_empty_message(socket_)) + { + nng::mnode_nng_error("nng_sendmsg", res); + return; + } + spdlog::info("Receiving Data"); + state = State::Run; + } + else + { + spdlog::error("Error reading BeginRun header"); + return; + } + } + break; + case State::Run: + { + auto [msg, res] = receive_message(); + + if (res) + { + nng::mnode_nng_error("nng_recvmsg", res); + return; + } + + const size_t msgSize = nng_msg_len(msg.get()); + spdlog::debug("State::Run: Received message of size {}", msgSize); + + auto header = nng::msg_trim_read(msg.get()); + + if (!header) + { + spdlog::error("Error reading message header"); + return; + } + + spdlog::debug("State::Run: messageType={}", static_cast(header->messageType)); + + if (header->messageType == mana::MessageHeader::EndRun) + { + spdlog::info("Received EndRun message"); + std::string descriptor_json; + std::copy(reinterpret_cast(nng_msg_body(msg.get())), + reinterpret_cast(nng_msg_body(msg.get())) + msgSize, + std::back_inserter(descriptor_json)); + sink->end_run(descriptor_json.c_str()); + return; + } + else if (header->messageType == mana::MessageHeader::EventData) + { + if (!process_event_data(msg.get(), sink)) + return; + } + else + { + spdlog::error("Unknown message type: {}", static_cast(header->messageType)); + return; + } + } + break; + } + } +} + int main(int argc, char *argv[]) { argh::parser parser({"-h", "--help", "--plugin", "--log-level"}); @@ -79,4 +219,42 @@ int main(int argc, char *argv[]) { destSink = std::make_unique(); } + + auto manaSink = std::make_unique(destSink.get()); + nng_socket socket = nng::make_pair_socket(); + + if (int res = nng_dial(socket, url.c_str(), nullptr, 0)) + { + nng::mnode_nng_error("nng_dial", res); + return 1; + } + + spdlog::info("connected to {}", url); + std::atomic quit = false; + + manaSink->init(0, nullptr); + run(socket, manaSink.get(), quit); + manaSink->shutdown(); + + auto perf = manaSink->perf(); + { + auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes), + static_cast(0)); + auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits), + static_cast(0)); + double elapsed_ms = + std::chrono::duration_cast(perf.t_endRun - perf.t_beginRun) + .count(); + double elapsed_s = elapsed_ms / 1000.0; + + double MiB = totalBytes / (1024.0 * 1024); + double MiB_s = MiB / elapsed_s; + double hit_s = totalHits / elapsed_s; + + fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, " + "event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n", + totalHits, MiB, elapsed_s, hit_s, MiB_s); + } + + return 0; }