From 24542b4a6db7879ba7b9dd5f1ff5e00b4660800c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Sun, 29 Dec 2024 21:31:44 +0100 Subject: [PATCH] implement multi-threaded stage1 analysis using an internal nng pipeline --- src/internal/mana_lib.hpp | 20 ++++ src/internal/mana_nng.hpp | 177 ++++++++++++++++++++++++++++-- src/mana_plugin_root_histogram.cc | 6 + src/tools/mana_auto_replay.cc | 89 ++++++++++----- src/tools/mana_nng_client.cc | 145 ++---------------------- 5 files changed, 263 insertions(+), 174 deletions(-) diff --git a/src/internal/mana_lib.hpp b/src/internal/mana_lib.hpp index 8cb8527..2b0188f 100644 --- a/src/internal/mana_lib.hpp +++ b/src/internal/mana_lib.hpp @@ -221,6 +221,26 @@ class ManaSinkPerfProxy: public IManaSink Perf perf_; }; +inline std::string to_string(const ManaSinkPerfProxy::Perf &perf) +{ + auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes), + static_cast(0)); + auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits), + static_cast(0)); + double elapsed_ms = + std::chrono::duration_cast(perf.t_endRun - perf.t_beginRun) + .count(); + double elapsed_s = elapsed_ms / 1000.0; + + double MiB = totalBytes / (1024.0 * 1024); + double MiB_s = MiB / elapsed_s; + double hit_s = totalHits / elapsed_s; + + return fmt::format("events={}, bytes={:.2f} MiB, elapsed={:.2f} s, " + "event_rate={:.2f} event/s, data_rate={:.2f} MiB/s", + totalHits, MiB, elapsed_s, hit_s, MiB_s); +} + // wraps a mana_api.h mana_sink_plugin_t instance struct ManaCSink: public IManaSink { diff --git a/src/internal/mana_nng.hpp b/src/internal/mana_nng.hpp index e176228..258e4ba 100644 --- a/src/internal/mana_nng.hpp +++ b/src/internal/mana_nng.hpp @@ -2,9 +2,9 @@ #define CF5E5AFF_F218_4A25_95DF_8097D7C5685B #include "mana_api.hpp" +#include "mnode_spdlog.hpp" #include #include -#include namespace mesytec::mnode::mana { @@ -15,8 +15,8 @@ struct MessageHeader { Invalid = 0, BeginRun = 42, - EndRun, - EventData, + EndRun = 43, + EventData = 44, }; u32 messageType; @@ -37,6 +37,7 @@ class NngServerSink: public IManaSink explicit NngServerSink(nng_socket socket) : socket_(socket) , msg_(nng::make_unique_msg()) + , logger_(get_spdlog_logger("mana::NngServerSink")) { } @@ -57,7 +58,8 @@ class NngServerSink: public IManaSink nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), descriptor_json, len + 1); - spdlog::info("Sending BeginRun message (size={})", nng_msg_len(msg.get())); + logger_->info("Sending BeginRun message (size={})", nng_msg_len(msg.get())); + int res = 0; do @@ -66,14 +68,14 @@ class NngServerSink: public IManaSink { if (res != NNG_ETIMEDOUT) { - nng::mnode_nng_error("nng_sendmsg", res); + logger_->error("Error sending BeginRun message: {}", nng_strerror(res)); return; } } } while (res == NNG_ETIMEDOUT); - spdlog::info("Waiting for response from client"); + logger_->debug("Waiting for response from client"); while (true) { @@ -81,12 +83,13 @@ class NngServerSink: public IManaSink if (res && res != NNG_ETIMEDOUT) { - nng::mnode_nng_error("nng_recvmsg", res); + logger_->error("Error receiving response from client: {}", nng_strerror(res)); return; } else if (!res) { - spdlog::info("begin_run: Received response from client"); + logger_->info("Received response to BeginRun from client (size={})", + nng_msg_len(msg.get())); break; } } @@ -106,7 +109,7 @@ class NngServerSink: public IManaSink MessageHeader header{MessageHeader::EndRun}; nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), descriptor_json, len + 1); - spdlog::info("Sending EndRun message (size={})", nng_msg_len(msg.get())); + logger_->info("Sending EndRun message (size={})", nng_msg_len(msg.get())); int res = 0; do @@ -115,12 +118,14 @@ class NngServerSink: public IManaSink { if (res != NNG_ETIMEDOUT) { - nng::mnode_nng_error("nng_sendmsg", res); + logger_->error("Error sending EndRun message: {}", nng_strerror(res)); return; } } } while (res == NNG_ETIMEDOUT); + + logger_->info("Sent EndRun message"); } void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, @@ -174,7 +179,7 @@ class NngServerSink: public IManaSink if (!msg_) return false; - spdlog::debug("Sending EventData message of size {}", nng_msg_len(msg_.get())); + logger_->trace("Sending EventData message of size {}", nng_msg_len(msg_.get())); while (true) { @@ -182,7 +187,7 @@ class NngServerSink: public IManaSink { if (res != NNG_ETIMEDOUT) { - nng::mnode_nng_error("nng_sendmsg", res); + logger_->error("Error sending EventData message: {}", nng_strerror(res)); return false; } } @@ -196,8 +201,156 @@ class NngServerSink: public IManaSink static constexpr size_t InitialMessageReserve = 2 * 1024 * 1024; nng_socket socket_; nng::unique_msg msg_; + std::shared_ptr logger_; }; +inline void nng_client_run(nng_socket socket_, mana::IManaSink *sink, std::atomic &quit) +{ + auto receive_message = [socket_, &quit]() + { + auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; }; + return nng::receive_message_retry(socket_, 0, rp); + }; + + enum class State + { + BeginRun, + Run, + }; + + auto logger = get_spdlog_logger("mana::nng_client"); + + auto process_event_data = [&logger](nng_msg *msg, mana::IManaSink *sink) + { + while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader)) + { + auto header = nng::msg_trim_read(msg); + assert(header); + + if (nng_msg_len(msg) < header->sizeBytes) + { + logger->error("Error reading event data: expected {} bytes, got {}", + header->sizeBytes, nng_msg_len(msg)); + return false; + } + + auto arrays = reinterpret_cast(nng_msg_body(msg)); + sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes); + nng_msg_trim(msg, header->sizeBytes); + } + + return nng_msg_len(msg) == 0; + }; + + auto state = State::BeginRun; + + while (!quit) + { + switch (state) + { + case State::BeginRun: + { + logger->info("Waiting for BeginRun from server"); + + auto [msg, res] = receive_message(); + + if (res) + { + logger->error("Error receiving BeginRun message: {}", nng_strerror(res)); + return; + } + + const size_t msgSize = nng_msg_len(msg.get()); + logger->info("State::BeginRun: Received message of size {}", msgSize); + + auto header = nng::msg_trim_read(msg.get()); + + if (!header) + { + logger->error("Error reading message header"); + return; + } + + logger->info("State::BeginRun: messageType={}", static_cast(header->messageType)); + + if (header->messageType == mana::MessageHeader::BeginRun) + { + logger->info("Received BeginRun message (size={})", msgSize); + std::string descriptor_json; + std::copy(reinterpret_cast(nng_msg_body(msg.get())), + reinterpret_cast(nng_msg_body(msg.get())) + msgSize, + std::back_inserter(descriptor_json)); + + sink->begin_run(descriptor_json.c_str()); + + if (auto res = nng::send_empty_message(socket_)) + { + logger->error("Error sending response to BeginRun message: {}", + nng_strerror(res)); + return; + } + logger->info("Receiving Data..."); + state = State::Run; + } + else + { + logger->error("Error reading BeginRun header: messageType={}", + static_cast(header->messageType)); + return; + } + } + break; + case State::Run: + { + auto [msg, res] = receive_message(); + + if (res) + { + logger->error("State::Run: Error receiving message: {}", nng_strerror(res)); + return; + } + + const size_t msgSize = nng_msg_len(msg.get()); + logger->trace("State::Run: Received message of size {}", msgSize); + + auto header = nng::msg_trim_read(msg.get()); + + if (!header) + { + logger->error("State::Run: Error reading message header, message too short"); + return; + } + + logger->trace("State::Run: received message: messageType={}", + static_cast(header->messageType)); + + if (header->messageType == mana::MessageHeader::EndRun) + { + logger->info("State::Run: Received EndRun message"); + std::string descriptor_json; + std::copy(reinterpret_cast(nng_msg_body(msg.get())), + reinterpret_cast(nng_msg_body(msg.get())) + msgSize, + std::back_inserter(descriptor_json)); + sink->end_run(descriptor_json.c_str()); + return; + } + else if (header->messageType == mana::MessageHeader::EventData) + { + if (!process_event_data(msg.get(), sink)) + return; + } + else + { + logger->error("State::Run: Received unknown message type: {}", + static_cast(header->messageType)); + return; + } + } + break; + } + } +} + } // namespace mesytec::mnode::mana #endif /* CF5E5AFF_F218_4A25_95DF_8097D7C5685B */ diff --git a/src/mana_plugin_root_histogram.cc b/src/mana_plugin_root_histogram.cc index e120375..cb920b9 100644 --- a/src/mana_plugin_root_histogram.cc +++ b/src/mana_plugin_root_histogram.cc @@ -126,6 +126,8 @@ inline std::string histo_info(const std::vector> &histos) MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) { + assert(context); + assert(context == g_ctx); log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json); std::filesystem::path rp(jRun["name"].get()); @@ -142,6 +144,8 @@ MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) MANA_DEFINE_PLUGIN_END_RUN(end_run) { + assert(context); + assert(context == g_ctx); log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json); auto ctx = reinterpret_cast(context); ctx->outputFile->Write(); @@ -150,6 +154,8 @@ MANA_DEFINE_PLUGIN_END_RUN(end_run) MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) { + assert(context); + assert(context == g_ctx); log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex, arrayCount, totalBytes); auto ctx = reinterpret_cast(context); diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index 0c6962a..97eae9f 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -47,6 +47,7 @@ #include "internal/mana_analysis.h" #include "internal/mana_arena.h" #include "internal/mana_lib.hpp" +#include "internal/mana_nng.hpp" CMRC_DECLARE(mnode::resources); @@ -377,10 +378,16 @@ void usage(const char *self) std::cout << fmt::format( "usage: {} [--plugin=] [--replay-strategy=] \n", self); std::cout + << " --plugin= Load a plugin to process the data. Default is a " "simple counting plugin.\n" + << " --replay-strategy= Use a specific replay strategy. " "Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n" + + << " --analysis-strategy= Use a specific analysis strategy. " + "Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n" + << " --log-level= One of 'trace', 'debug', 'info', 'warn', 'error', " "'off'.\n"; } @@ -391,7 +398,8 @@ int main(int argc, char *argv[]) const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); argh::parser parser( - {"-h", "--help", "--plugin", "--replay-strategy", "--no-buffering ", "--log-level"}); + {"-h", "--help", "--plugin", "--replay-strategy", "--analysis-strategy", "--log-level"}); + parser.parse(argc, argv); auto filename = parser[1]; @@ -460,8 +468,6 @@ int main(int argc, char *argv[]) destSink = std::make_unique(); } - auto manaSink = std::make_unique(destSink.get()); - std::string strategyName = "multi-threaded"; if (parser("--replay-strategy")) @@ -507,42 +513,75 @@ int main(int argc, char *argv[]) // make the analysis instance available to the parser callbacks parserContext->parser.userContext = &mana; + auto perfSink = std::make_unique(destSink.get()); + mana.sink = perfSink.get(); auto run_replay = [&] { - manaSink->init(0, nullptr); - manaSink->begin_run(mana.runDescriptor.dump().c_str()); + mana.sink->init(0, nullptr); + mana.sink->begin_run(mana.runDescriptor.dump().c_str()); replayStrategy->run(*listfileContext, *parserContext); - manaSink->end_run(mana.runDescriptor.dump().c_str()); - manaSink->shutdown(); + mana.sink->end_run(mana.runDescriptor.dump().c_str()); + mana.sink->shutdown(); }; - // if (parser["--no-buffering"]) + strategyName = "multi-threaded"; + + if (parser("--analysis-strategy")) + strategyName = parser("--analysis-strategy").str(); + + if (strategyName == "single-threaded") { - mana.sink = manaSink.get(); + // replay strategy handles threading of the file io. + // the target mana.sink is called in the main thread. run_replay(); } - - auto perf = manaSink->perf(); + else if (strategyName == "multi-threaded") { - auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes), - static_cast(0)); - auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits), - static_cast(0)); - double elapsed_ms = - std::chrono::duration_cast(perf.t_endRun - perf.t_beginRun) - .count(); - double elapsed_s = elapsed_ms / 1000.0; + // Create a NngServerSink and use it as the destination sink of the parsing stage. + // In the main thread run a mana nng client, reading from the server + // socket, feeding the destination sink. + static const char *url = "inproc://mana_analysis_stage1"; + nng_socket serverSocket = nng::make_pair_socket(); + if (int res = nng_listen(serverSocket, url, nullptr, 0)) + { + spdlog::error("Error listening on '{}': {}", url, nng_strerror(res)); + return 1; + } + nng_socket clientSocket = nng::make_pair_socket(); + if (int res = nng_dial(clientSocket, url, nullptr, 0)) + { + spdlog::error("Error connecting to '{}': {}", url, nng_strerror(res)); + return 1; + } + // create the server sink and set it as the destination sink for the module data stage + auto serverSink = std::make_unique(serverSocket); + auto serverPerfSink = std::make_unique(serverSink.get()); + auto destSink = mana.sink; + mana.sink = serverPerfSink.get(); - double MiB = totalBytes / (1024.0 * 1024); - double MiB_s = MiB / elapsed_s; - double hit_s = totalHits / elapsed_s; + std::thread replayThread(run_replay); + // FIXME: this is useless when running the client loop blocking like this + std::atomic clientQuit = false; + destSink->init(0, nullptr); + mana::nng_client_run(clientSocket, destSink, clientQuit); + destSink->shutdown(); + if (replayThread.joinable()) + replayThread.join(); + fmt::print("Internal NngServerSink: {}\n", to_string(serverPerfSink->perf())); + } + else + { + std::cerr << fmt::format("Error: unknown analysis strategy '{}'\n", strategyName); + usage(argv[0]); + return 1; + } - fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, " - "event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n", - totalHits, MiB, elapsed_s, hit_s, MiB_s); + if (auto perfProxy = dynamic_cast(perfSink.get())) + { + fmt::print("Destination Sink: {}\n", to_string(perfProxy->perf())); } return 0; diff --git a/src/tools/mana_nng_client.cc b/src/tools/mana_nng_client.cc index 7c09109..ee18c22 100644 --- a/src/tools/mana_nng_client.cc +++ b/src/tools/mana_nng_client.cc @@ -15,151 +15,17 @@ void usage(const char *self) { std::cout << fmt::format("usage: {} [--plugin=] \n", self); std::cout + << " --plugin= Load a plugin to process the data. Default is a " "simple counting plugin.\n" - << " --processing-strategy= Use a specific processing strategy. " - "Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n" + << " --log-level= One of 'trace', 'debug', 'info', 'warn', 'error', " "'off'.\n"; } -bool process_event_data(nng_msg *msg, mana::IManaSink *sink) -{ - while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader)) - { - auto header = nng::msg_trim_read(msg); - assert(header); - - if (nng_msg_len(msg) < header->sizeBytes) - { - spdlog::error("Error reading event data: expected {} bytes, got {}", header->sizeBytes, - nng_msg_len(msg)); - return false; - } - - auto arrays = reinterpret_cast(nng_msg_body(msg)); - sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes); - nng_msg_trim(msg, header->sizeBytes); - } - - return nng_msg_len(msg) == 0; -} - void run(nng_socket socket_, mana::IManaSink *sink, std::atomic &quit) { - auto receive_message = [socket_, &quit]() - { - auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; }; - return nng::receive_message_retry(socket_, 0, rp); - }; - - enum class State - { - BeginRun, - Run, - }; - - auto state = State::BeginRun; - - while (!quit) - { - switch (state) - { - case State::BeginRun: - { - spdlog::info("Waiting for BeginRun from server"); - - auto [msg, res] = receive_message(); - - if (res) - { - nng::mnode_nng_error("nng_recvmsg", res); - return; - } - - const size_t msgSize = nng_msg_len(msg.get()); - spdlog::info("State::BeginRun: Received message of size {}", msgSize); - - auto header = nng::msg_trim_read(msg.get()); - - if (!header) - { - spdlog::error("Error reading message header"); - return; - } - - spdlog::info("State::BeginRun: messageType={}", static_cast(header->messageType)); - - if (header->messageType == mana::MessageHeader::BeginRun) - { - spdlog::info("Received BeginRun message (size={})", msgSize); - std::string descriptor_json; - std::copy(reinterpret_cast(nng_msg_body(msg.get())), - reinterpret_cast(nng_msg_body(msg.get())) + msgSize, - std::back_inserter(descriptor_json)); - sink->begin_run(descriptor_json.c_str()); - if (auto res = nng::send_empty_message(socket_)) - { - nng::mnode_nng_error("nng_sendmsg", res); - return; - } - spdlog::info("Receiving Data"); - state = State::Run; - } - else - { - spdlog::error("Error reading BeginRun header"); - return; - } - } - break; - case State::Run: - { - auto [msg, res] = receive_message(); - - if (res) - { - nng::mnode_nng_error("nng_recvmsg", res); - return; - } - - const size_t msgSize = nng_msg_len(msg.get()); - spdlog::debug("State::Run: Received message of size {}", msgSize); - - auto header = nng::msg_trim_read(msg.get()); - - if (!header) - { - spdlog::error("Error reading message header"); - return; - } - - spdlog::debug("State::Run: messageType={}", static_cast(header->messageType)); - - if (header->messageType == mana::MessageHeader::EndRun) - { - spdlog::info("Received EndRun message"); - std::string descriptor_json; - std::copy(reinterpret_cast(nng_msg_body(msg.get())), - reinterpret_cast(nng_msg_body(msg.get())) + msgSize, - std::back_inserter(descriptor_json)); - sink->end_run(descriptor_json.c_str()); - return; - } - else if (header->messageType == mana::MessageHeader::EventData) - { - if (!process_event_data(msg.get(), sink)) - return; - } - else - { - spdlog::error("Unknown message type: {}", static_cast(header->messageType)); - return; - } - } - break; - } - } + mana::nng_client_run(socket_, sink, quit); } int main(int argc, char *argv[]) @@ -177,6 +43,11 @@ int main(int argc, char *argv[]) spdlog::set_level(spdlog::level::info); + if (parser("--log-level")) + { + spdlog::set_level(spdlog::level::from_str(parser("--log-level").str())); + } + boost::dll::shared_library pluginHandle; std::unique_ptr manaCppPlugin; std::unique_ptr destSink;