From 9e5b79cb3421696f8e636f26b7bd21c93fcad487 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Thu, 26 Dec 2024 15:37:15 +0100 Subject: [PATCH] mana: implement nng pair based threaded strategy It's a tiny bit faster than the single threaded strategy: - Counting plugin, is690-9Li_3H_run094: - direct, single threaded: 217.72411048962562 MiB/s - nng-pair, multi threaded: 228.50186892946155 MiB/s - root-histogram plugin (hitcount histos only), is690-9Li_3H_run094: - direct, single threaded: 210.73162379794522 MiB/s - nng-pair, multi threaded: 219.66463320034057 MiB/s --- src/tools/mana_auto_replay.cc | 256 ++++++++++++++++++++++++++++++---- 1 file changed, 231 insertions(+), 25 deletions(-) diff --git a/src/tools/mana_auto_replay.cc b/src/tools/mana_auto_replay.cc index 49cfea2..02e24ec 100644 --- a/src/tools/mana_auto_replay.cc +++ b/src/tools/mana_auto_replay.cc @@ -31,18 +31,22 @@ // - create root trees or the new rntuples(?) // -> want plugins. similar to the mvme listfile_reader but on analysis data +#include #include #include // mnode::resources -#include +#include #include #include +#include +#include +#include #include -#include #include + #include "internal/argh.h" +#include "internal/mana_analysis.h" #include "internal/mana_arena.h" #include "internal/mana_lib.hpp" -#include "internal/mana_analysis.h" CMRC_DECLARE(mnode::resources); @@ -141,29 +145,30 @@ struct ProcessingStrategy virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0; }; -inline size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, - ParserContext &parserContext) -{ - listfileContext.readerHelper.destBuf().clear(); - auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); - - if (!buffer->used()) - return 0; - - auto bufferView = buffer->viewU32(); - - mvlc::readout_parser::parse_readout_buffer( - listfileContext.readerHelper.bufferFormat, parserContext.parser, parserContext.callbacks, - parserContext.counters, bufferNumber, bufferView.data(), bufferView.size()); - - return buffer->used(); -} - struct SingleThreadedStrategy: public ProcessingStrategy { + inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext, + ParserContext &parserContext) + { + listfileContext.readerHelper.destBuf().clear(); + auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); + + if (!buffer->used()) + return 0; + + auto bufferView = buffer->viewU32(); + + mvlc::readout_parser::parse_readout_buffer(listfileContext.readerHelper.bufferFormat, + parserContext.parser, parserContext.callbacks, + parserContext.counters, bufferNumber, + bufferView.data(), bufferView.size()); + + return buffer->used(); + } + void run(ListfileContext &listfileContext, ParserContext &parserContext) override { - size_t bufferNumber = 0; + size_t bufferNumber = 1; size_t totalBytesProcessed = 0; size_t bytesProcessed = 0; const std::chrono::milliseconds ReportInterval(500); @@ -199,22 +204,206 @@ struct SingleThreadedStrategy: public ProcessingStrategy } }; +struct NngPairStrategy: public ProcessingStrategy +{ + struct SocketCounters + { + size_t bytes; + size_t messages; + }; + + nng_socket producerSocket; + nng_socket consumerSocket; + SocketCounters producerTx; + SocketCounters consumerRx; + // std::mutex producerTxMutex; + + explicit NngPairStrategy(const std::string &url_) + { + producerSocket = nng::make_pair_socket(); + consumerSocket = nng::make_pair_socket(); + if (int res = nng_listen(producerSocket, url_.c_str(), nullptr, 0)) + throw std::runtime_error( + fmt::format("error listening on '{}': {}", url_, nng_strerror(res))); + + if (int res = nng_dial(consumerSocket, url_.c_str(), nullptr, 0)) + throw std::runtime_error( + fmt::format("error connecting to '{}': {}", url_, nng_strerror(res))); + }; + + ~NngPairStrategy() + { + nng_close(consumerSocket); + nng_close(producerSocket); + } + + // message format is: u32 bufferNumber, u32 bufferSize, u32 bufferData[bufferSize] + void producer(ListfileContext &listfileContext, std::atomic &quit) + { + size_t bufferNumber = 1; + + while (!quit) + { + listfileContext.readerHelper.destBuf().clear(); + auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); + + if (!buffer->used()) + break; + + auto bufferView = buffer->viewU32(); + auto msg = nng::allocate_message(2 * sizeof(u32) + bufferView.size() * sizeof(u32)); + size_t msgLen = nng_msg_len(msg.get()); + auto data = reinterpret_cast(nng_msg_body(msg.get())); + *(data + 0) = bufferNumber++; + *(data + 1) = static_cast(bufferView.size()); + std::copy(std::begin(bufferView), std::end(bufferView), data + 2); + int res = 0; + + do + { + if ((res = nng::send_message(producerSocket, msg))) + { + if (res != NNG_ETIMEDOUT) + { + spdlog::error("NngPairStrategy: error sending message: {}", + nng_strerror(res)); + return; + } + } + + // std::lock_guard lock(producerTxMutex); + producerTx.bytes += msgLen; + ++producerTx.messages; + } + while (res == NNG_ETIMEDOUT); + } + + nng::send_empty_message(producerSocket); + } + + void consumer(mvlc::ConnectionType bufferFormat, ParserContext &parserContext) + { + size_t bufferNumber = 1; + size_t totalBytesProcessed = 0; + const std::chrono::milliseconds ReportInterval(500); + mvlc::util::Stopwatch sw; + + auto report = [&] + { + [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) + { + auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); + auto bytesPerSecond = totalBytesProcessed / s; + auto MiBPerSecond = bytesPerSecond / (1u << 20); + std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n", + bufferNumber, totalBytesProcessed, s, MiBPerSecond); + }(sw, bufferNumber, totalBytesProcessed); + }; + + while (true) + { + auto [msg, res] = nng::receive_message(consumerSocket); + + if (res && res != NNG_ETIMEDOUT) + { + spdlog::error("NngPairStrategy: error receiving message: {}", nng_strerror(res)); + return; + } + + auto data = + mvlc::util::span(reinterpret_cast(nng_msg_body(msg.get())), + nng_msg_len(msg.get()) / sizeof(u32)); + if (data.size() < 2) + break; + + bufferNumber = data[0]; + size_t bufferSize = data[1]; + + if (data.size() != bufferSize + 2) + { + spdlog::error("NngPairStrategy: invalid message size: {}", data.size()); + return; + } + + std::basic_string_view bufferView(data.data() + 2, bufferSize); + + mvlc::readout_parser::parse_readout_buffer( + bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters, + bufferNumber, bufferView.data(), bufferView.size()); + + consumerRx.bytes += nng_msg_len(msg.get()); + ++consumerRx.messages; + totalBytesProcessed += bufferView.size() * sizeof(u32); + + if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval) + { + report(); + sw.interval(); + } + } + + report(); + spdlog::info("NngPairStrategy: producerTx: {} MiB, {} messages", + producerTx.bytes / (1024.0 * 1024), producerTx.messages); + spdlog::info("NngPairStrategy: consumerRx: {} MiB, {} messages", + consumerRx.bytes / (1024.0 * 1024), consumerRx.messages); + } + + void run(ListfileContext &listfileContext, ParserContext &parserContext) override + { + auto bufferFormat = listfileContext.readerHelper.bufferFormat; + std::atomic quitProducer = false; + producerTx = {}; + consumerRx = {}; + + std::thread producerThread([this, &listfileContext, &quitProducer] + { producer(listfileContext, quitProducer); }); + + consumer(bufferFormat, parserContext); + + quitProducer = true; + + if (producerThread.joinable()) + producerThread.join(); + } +}; + +void usage(const char *self) +{ + std::cout << fmt::format( + "usage: {} [--plugin=] [--processing-strategy=] \n", self); + std::cout + << " --plugin= Load a plugin to process the data. Default is a " + "simple counting plugin.\n" + << " --processing-strategy= Use a specific processing strategy. " + "Available: 'direct', 'nng-pair'. Default: 'direct'\n" + << " --log-level= One of 'trace', 'debug', 'info', 'warn', 'error', " + "'off'.\n"; +} + int main(int argc, char *argv[]) { auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); - argh::parser parser({"-h", "--help", "--plugin"}); + argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"}); parser.parse(argc, argv); auto filename = parser[1]; if (parser["-h"] || parser["--help"] || filename.empty()) { - std::cout << fmt::format("usage: {} [--plugin=] \n", argv[0]); + usage(argv[0]); return 0; } + spdlog::set_level(spdlog::level::info); + + if (parser("--log-level")) + { + spdlog::set_level(spdlog::level::from_str(parser("--log-level").str())); + } + auto listfileContext = make_listfile_context(filename); if (!listfileContext) @@ -246,6 +435,23 @@ int main(int argc, char *argv[]) manaPlugin = std::make_unique(); } + std::string strategyName = "direct"; + if (parser("--processing-strategy")) + strategyName = parser("--processing-strategy").str(); + + std::unique_ptr strategy; //= std::make_unique(); + + if (strategyName == "nng-pair") + strategy = std::make_unique("inproc://mana_module_data_stage"); + else if (strategyName == "direct") + strategy = std::make_unique(); + else + { + std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName); + usage(argv[0]); + return 1; + } + auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, jModuleDataSources, manaPlugin.get(), nullptr); @@ -271,8 +477,8 @@ int main(int argc, char *argv[]) if (!parserContext) return 1; + // make the analysis instance available to the parser callbacks parserContext->parser.userContext = &mana; - std::unique_ptr strategy = std::make_unique(); mana.sinkContext = mana.sink->init(); mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());