mana: implement nng pair based threaded strategy
It's a tiny bit faster than the single threaded strategy: - Counting plugin, is690-9Li_3H_run094: - direct, single threaded: 217.72411048962562 MiB/s - nng-pair, multi threaded: 228.50186892946155 MiB/s - root-histogram plugin (hitcount histos only), is690-9Li_3H_run094: - direct, single threaded: 210.73162379794522 MiB/s - nng-pair, multi threaded: 219.66463320034057 MiB/s
This commit is contained in:
parent
e38ada8854
commit
9e5b79cb34
1 changed files with 231 additions and 25 deletions
|
@ -31,18 +31,22 @@
|
|||
// - create root trees or the new rntuples(?)
|
||||
// -> want plugins. similar to the mvme listfile_reader but on analysis data
|
||||
|
||||
#include <atomic>
|
||||
#include <boost/dll.hpp>
|
||||
#include <cmrc/cmrc.hpp> // mnode::resources
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <list>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include <mesytec-mnode/mnode_math.h>
|
||||
#include <mesytec-mnode/mnode_nng.h>
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <mutex>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <list>
|
||||
#include <sstream>
|
||||
|
||||
#include "internal/argh.h"
|
||||
#include "internal/mana_analysis.h"
|
||||
#include "internal/mana_arena.h"
|
||||
#include "internal/mana_lib.hpp"
|
||||
#include "internal/mana_analysis.h"
|
||||
|
||||
CMRC_DECLARE(mnode::resources);
|
||||
|
||||
|
@ -141,9 +145,11 @@ struct ProcessingStrategy
|
|||
virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;
|
||||
};
|
||||
|
||||
inline size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
|
||||
ParserContext &parserContext)
|
||||
struct SingleThreadedStrategy: public ProcessingStrategy
|
||||
{
|
||||
inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
|
||||
ParserContext &parserContext)
|
||||
{
|
||||
listfileContext.readerHelper.destBuf().clear();
|
||||
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
|
||||
|
||||
|
@ -152,18 +158,17 @@ inline size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileC
|
|||
|
||||
auto bufferView = buffer->viewU32();
|
||||
|
||||
mvlc::readout_parser::parse_readout_buffer(
|
||||
listfileContext.readerHelper.bufferFormat, parserContext.parser, parserContext.callbacks,
|
||||
parserContext.counters, bufferNumber, bufferView.data(), bufferView.size());
|
||||
mvlc::readout_parser::parse_readout_buffer(listfileContext.readerHelper.bufferFormat,
|
||||
parserContext.parser, parserContext.callbacks,
|
||||
parserContext.counters, bufferNumber,
|
||||
bufferView.data(), bufferView.size());
|
||||
|
||||
return buffer->used();
|
||||
}
|
||||
}
|
||||
|
||||
struct SingleThreadedStrategy: public ProcessingStrategy
|
||||
{
|
||||
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
|
||||
{
|
||||
size_t bufferNumber = 0;
|
||||
size_t bufferNumber = 1;
|
||||
size_t totalBytesProcessed = 0;
|
||||
size_t bytesProcessed = 0;
|
||||
const std::chrono::milliseconds ReportInterval(500);
|
||||
|
@ -199,22 +204,206 @@ struct SingleThreadedStrategy: public ProcessingStrategy
|
|||
}
|
||||
};
|
||||
|
||||
struct NngPairStrategy: public ProcessingStrategy
|
||||
{
|
||||
struct SocketCounters
|
||||
{
|
||||
size_t bytes;
|
||||
size_t messages;
|
||||
};
|
||||
|
||||
nng_socket producerSocket;
|
||||
nng_socket consumerSocket;
|
||||
SocketCounters producerTx;
|
||||
SocketCounters consumerRx;
|
||||
// std::mutex producerTxMutex;
|
||||
|
||||
explicit NngPairStrategy(const std::string &url_)
|
||||
{
|
||||
producerSocket = nng::make_pair_socket();
|
||||
consumerSocket = nng::make_pair_socket();
|
||||
if (int res = nng_listen(producerSocket, url_.c_str(), nullptr, 0))
|
||||
throw std::runtime_error(
|
||||
fmt::format("error listening on '{}': {}", url_, nng_strerror(res)));
|
||||
|
||||
if (int res = nng_dial(consumerSocket, url_.c_str(), nullptr, 0))
|
||||
throw std::runtime_error(
|
||||
fmt::format("error connecting to '{}': {}", url_, nng_strerror(res)));
|
||||
};
|
||||
|
||||
~NngPairStrategy()
|
||||
{
|
||||
nng_close(consumerSocket);
|
||||
nng_close(producerSocket);
|
||||
}
|
||||
|
||||
// message format is: u32 bufferNumber, u32 bufferSize, u32 bufferData[bufferSize]
|
||||
void producer(ListfileContext &listfileContext, std::atomic<bool> &quit)
|
||||
{
|
||||
size_t bufferNumber = 1;
|
||||
|
||||
while (!quit)
|
||||
{
|
||||
listfileContext.readerHelper.destBuf().clear();
|
||||
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
|
||||
|
||||
if (!buffer->used())
|
||||
break;
|
||||
|
||||
auto bufferView = buffer->viewU32();
|
||||
auto msg = nng::allocate_message(2 * sizeof(u32) + bufferView.size() * sizeof(u32));
|
||||
size_t msgLen = nng_msg_len(msg.get());
|
||||
auto data = reinterpret_cast<u32 *>(nng_msg_body(msg.get()));
|
||||
*(data + 0) = bufferNumber++;
|
||||
*(data + 1) = static_cast<u32>(bufferView.size());
|
||||
std::copy(std::begin(bufferView), std::end(bufferView), data + 2);
|
||||
int res = 0;
|
||||
|
||||
do
|
||||
{
|
||||
if ((res = nng::send_message(producerSocket, msg)))
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: error sending message: {}",
|
||||
nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// std::lock_guard<std::mutex> lock(producerTxMutex);
|
||||
producerTx.bytes += msgLen;
|
||||
++producerTx.messages;
|
||||
}
|
||||
while (res == NNG_ETIMEDOUT);
|
||||
}
|
||||
|
||||
nng::send_empty_message(producerSocket);
|
||||
}
|
||||
|
||||
void consumer(mvlc::ConnectionType bufferFormat, ParserContext &parserContext)
|
||||
{
|
||||
size_t bufferNumber = 1;
|
||||
size_t totalBytesProcessed = 0;
|
||||
const std::chrono::milliseconds ReportInterval(500);
|
||||
mvlc::util::Stopwatch sw;
|
||||
|
||||
auto report = [&]
|
||||
{
|
||||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||
{
|
||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||
auto bytesPerSecond = totalBytesProcessed / s;
|
||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
||||
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
|
||||
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
|
||||
}(sw, bufferNumber, totalBytesProcessed);
|
||||
};
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto [msg, res] = nng::receive_message(consumerSocket);
|
||||
|
||||
if (res && res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: error receiving message: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
auto data =
|
||||
mvlc::util::span<const u32>(reinterpret_cast<const u32 *>(nng_msg_body(msg.get())),
|
||||
nng_msg_len(msg.get()) / sizeof(u32));
|
||||
if (data.size() < 2)
|
||||
break;
|
||||
|
||||
bufferNumber = data[0];
|
||||
size_t bufferSize = data[1];
|
||||
|
||||
if (data.size() != bufferSize + 2)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: invalid message size: {}", data.size());
|
||||
return;
|
||||
}
|
||||
|
||||
std::basic_string_view<u32> bufferView(data.data() + 2, bufferSize);
|
||||
|
||||
mvlc::readout_parser::parse_readout_buffer(
|
||||
bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters,
|
||||
bufferNumber, bufferView.data(), bufferView.size());
|
||||
|
||||
consumerRx.bytes += nng_msg_len(msg.get());
|
||||
++consumerRx.messages;
|
||||
totalBytesProcessed += bufferView.size() * sizeof(u32);
|
||||
|
||||
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
|
||||
{
|
||||
report();
|
||||
sw.interval();
|
||||
}
|
||||
}
|
||||
|
||||
report();
|
||||
spdlog::info("NngPairStrategy: producerTx: {} MiB, {} messages",
|
||||
producerTx.bytes / (1024.0 * 1024), producerTx.messages);
|
||||
spdlog::info("NngPairStrategy: consumerRx: {} MiB, {} messages",
|
||||
consumerRx.bytes / (1024.0 * 1024), consumerRx.messages);
|
||||
}
|
||||
|
||||
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
|
||||
{
|
||||
auto bufferFormat = listfileContext.readerHelper.bufferFormat;
|
||||
std::atomic<bool> quitProducer = false;
|
||||
producerTx = {};
|
||||
consumerRx = {};
|
||||
|
||||
std::thread producerThread([this, &listfileContext, &quitProducer]
|
||||
{ producer(listfileContext, quitProducer); });
|
||||
|
||||
consumer(bufferFormat, parserContext);
|
||||
|
||||
quitProducer = true;
|
||||
|
||||
if (producerThread.joinable())
|
||||
producerThread.join();
|
||||
}
|
||||
};
|
||||
|
||||
void usage(const char *self)
|
||||
{
|
||||
std::cout << fmt::format(
|
||||
"usage: {} [--plugin=<plugin.so>] [--processing-strategy=<name>] <zipfile>\n", self);
|
||||
std::cout
|
||||
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
|
||||
"simple counting plugin.\n"
|
||||
<< " --processing-strategy=<name> Use a specific processing strategy. "
|
||||
"Available: 'direct', 'nng-pair'. Default: 'direct'\n"
|
||||
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
|
||||
"'off'.\n";
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
|
||||
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
|
||||
|
||||
argh::parser parser({"-h", "--help", "--plugin"});
|
||||
argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"});
|
||||
parser.parse(argc, argv);
|
||||
|
||||
auto filename = parser[1];
|
||||
|
||||
if (parser["-h"] || parser["--help"] || filename.empty())
|
||||
{
|
||||
std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] <zipfile>\n", argv[0]);
|
||||
usage(argv[0]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
|
||||
if (parser("--log-level"))
|
||||
{
|
||||
spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
|
||||
}
|
||||
|
||||
auto listfileContext = make_listfile_context(filename);
|
||||
|
||||
if (!listfileContext)
|
||||
|
@ -246,6 +435,23 @@ int main(int argc, char *argv[])
|
|||
manaPlugin = std::make_unique<mana::ManaCountingSink>();
|
||||
}
|
||||
|
||||
std::string strategyName = "direct";
|
||||
if (parser("--processing-strategy"))
|
||||
strategyName = parser("--processing-strategy").str();
|
||||
|
||||
std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
|
||||
|
||||
if (strategyName == "nng-pair")
|
||||
strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
||||
else if (strategyName == "direct")
|
||||
strategy = std::make_unique<SingleThreadedStrategy>();
|
||||
else
|
||||
{
|
||||
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
|
||||
jModuleDataSources, manaPlugin.get(), nullptr);
|
||||
|
||||
|
@ -271,8 +477,8 @@ int main(int argc, char *argv[])
|
|||
if (!parserContext)
|
||||
return 1;
|
||||
|
||||
// make the analysis instance available to the parser callbacks
|
||||
parserContext->parser.userContext = &mana;
|
||||
std::unique_ptr<ProcessingStrategy> strategy = std::make_unique<SingleThreadedStrategy>();
|
||||
|
||||
mana.sinkContext = mana.sink->init();
|
||||
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
|
||||
|
|
Loading…
Reference in a new issue