implement multi-threaded stage1 analysis using an internal nng pipeline

This commit is contained in:
Florian Lüke 2024-12-29 21:31:44 +01:00
parent 4dcaf2ee47
commit 24542b4a6d
5 changed files with 263 additions and 174 deletions

View file

@ -221,6 +221,26 @@ class ManaSinkPerfProxy: public IManaSink
Perf perf_;
};
inline std::string to_string(const ManaSinkPerfProxy::Perf &perf)
{
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
static_cast<size_t>(0));
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
static_cast<size_t>(0));
double elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
.count();
double elapsed_s = elapsed_ms / 1000.0;
double MiB = totalBytes / (1024.0 * 1024);
double MiB_s = MiB / elapsed_s;
double hit_s = totalHits / elapsed_s;
return fmt::format("events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s",
totalHits, MiB, elapsed_s, hit_s, MiB_s);
}
// wraps a mana_api.h mana_sink_plugin_t instance
struct ManaCSink: public IManaSink
{

View file

@ -2,9 +2,9 @@
#define CF5E5AFF_F218_4A25_95DF_8097D7C5685B
#include "mana_api.hpp"
#include "mnode_spdlog.hpp"
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_nng.h>
#include <spdlog/spdlog.h>
namespace mesytec::mnode::mana
{
@ -15,8 +15,8 @@ struct MessageHeader
{
Invalid = 0,
BeginRun = 42,
EndRun,
EventData,
EndRun = 43,
EventData = 44,
};
u32 messageType;
@ -37,6 +37,7 @@ class NngServerSink: public IManaSink
explicit NngServerSink(nng_socket socket)
: socket_(socket)
, msg_(nng::make_unique_msg())
, logger_(get_spdlog_logger("mana::NngServerSink"))
{
}
@ -57,7 +58,8 @@ class NngServerSink: public IManaSink
nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1);
spdlog::info("Sending BeginRun message (size={})", nng_msg_len(msg.get()));
logger_->info("Sending BeginRun message (size={})", nng_msg_len(msg.get()));
int res = 0;
do
@ -66,14 +68,14 @@ class NngServerSink: public IManaSink
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
logger_->error("Error sending BeginRun message: {}", nng_strerror(res));
return;
}
}
}
while (res == NNG_ETIMEDOUT);
spdlog::info("Waiting for response from client");
logger_->debug("Waiting for response from client");
while (true)
{
@ -81,12 +83,13 @@ class NngServerSink: public IManaSink
if (res && res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_recvmsg", res);
logger_->error("Error receiving response from client: {}", nng_strerror(res));
return;
}
else if (!res)
{
spdlog::info("begin_run: Received response from client");
logger_->info("Received response to BeginRun from client (size={})",
nng_msg_len(msg.get()));
break;
}
}
@ -106,7 +109,7 @@ class NngServerSink: public IManaSink
MessageHeader header{MessageHeader::EndRun};
nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1);
spdlog::info("Sending EndRun message (size={})", nng_msg_len(msg.get()));
logger_->info("Sending EndRun message (size={})", nng_msg_len(msg.get()));
int res = 0;
do
@ -115,12 +118,14 @@ class NngServerSink: public IManaSink
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
logger_->error("Error sending EndRun message: {}", nng_strerror(res));
return;
}
}
}
while (res == NNG_ETIMEDOUT);
logger_->info("Sent EndRun message");
}
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
@ -174,7 +179,7 @@ class NngServerSink: public IManaSink
if (!msg_)
return false;
spdlog::debug("Sending EventData message of size {}", nng_msg_len(msg_.get()));
logger_->trace("Sending EventData message of size {}", nng_msg_len(msg_.get()));
while (true)
{
@ -182,7 +187,7 @@ class NngServerSink: public IManaSink
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
logger_->error("Error sending EventData message: {}", nng_strerror(res));
return false;
}
}
@ -196,8 +201,156 @@ class NngServerSink: public IManaSink
static constexpr size_t InitialMessageReserve = 2 * 1024 * 1024;
nng_socket socket_;
nng::unique_msg msg_;
std::shared_ptr<spdlog::logger> logger_;
};
inline void nng_client_run(nng_socket socket_, mana::IManaSink *sink, std::atomic<bool> &quit)
{
auto receive_message = [socket_, &quit]()
{
auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; };
return nng::receive_message_retry(socket_, 0, rp);
};
enum class State
{
BeginRun,
Run,
};
auto logger = get_spdlog_logger("mana::nng_client");
auto process_event_data = [&logger](nng_msg *msg, mana::IManaSink *sink)
{
while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader))
{
auto header = nng::msg_trim_read<mana::EventDataHeader>(msg);
assert(header);
if (nng_msg_len(msg) < header->sizeBytes)
{
logger->error("Error reading event data: expected {} bytes, got {}",
header->sizeBytes, nng_msg_len(msg));
return false;
}
auto arrays = reinterpret_cast<mana_offset_array_t *>(nng_msg_body(msg));
sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes);
nng_msg_trim(msg, header->sizeBytes);
}
return nng_msg_len(msg) == 0;
};
auto state = State::BeginRun;
while (!quit)
{
switch (state)
{
case State::BeginRun:
{
logger->info("Waiting for BeginRun from server");
auto [msg, res] = receive_message();
if (res)
{
logger->error("Error receiving BeginRun message: {}", nng_strerror(res));
return;
}
const size_t msgSize = nng_msg_len(msg.get());
logger->info("State::BeginRun: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
logger->error("Error reading message header");
return;
}
logger->info("State::BeginRun: messageType={}", static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::BeginRun)
{
logger->info("Received BeginRun message (size={})", msgSize);
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->begin_run(descriptor_json.c_str());
if (auto res = nng::send_empty_message(socket_))
{
logger->error("Error sending response to BeginRun message: {}",
nng_strerror(res));
return;
}
logger->info("Receiving Data...");
state = State::Run;
}
else
{
logger->error("Error reading BeginRun header: messageType={}",
static_cast<int>(header->messageType));
return;
}
}
break;
case State::Run:
{
auto [msg, res] = receive_message();
if (res)
{
logger->error("State::Run: Error receiving message: {}", nng_strerror(res));
return;
}
const size_t msgSize = nng_msg_len(msg.get());
logger->trace("State::Run: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
logger->error("State::Run: Error reading message header, message too short");
return;
}
logger->trace("State::Run: received message: messageType={}",
static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::EndRun)
{
logger->info("State::Run: Received EndRun message");
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->end_run(descriptor_json.c_str());
return;
}
else if (header->messageType == mana::MessageHeader::EventData)
{
if (!process_event_data(msg.get(), sink))
return;
}
else
{
logger->error("State::Run: Received unknown message type: {}",
static_cast<int>(header->messageType));
return;
}
}
break;
}
}
}
} // namespace mesytec::mnode::mana
#endif /* CF5E5AFF_F218_4A25_95DF_8097D7C5685B */

View file

@ -126,6 +126,8 @@ inline std::string histo_info(const std::vector<std::vector<TH1 *>> &histos)
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{
assert(context);
assert(context == g_ctx);
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
auto jRun = nlohmann::json::parse(descriptor_json);
std::filesystem::path rp(jRun["name"].get<std::string>());
@ -142,6 +144,8 @@ MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
MANA_DEFINE_PLUGIN_END_RUN(end_run)
{
assert(context);
assert(context == g_ctx);
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
auto ctx = reinterpret_cast<Context *>(context);
ctx->outputFile->Write();
@ -150,6 +154,8 @@ MANA_DEFINE_PLUGIN_END_RUN(end_run)
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
{
assert(context);
assert(context == g_ctx);
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
arrayCount, totalBytes);
auto ctx = reinterpret_cast<Context *>(context);

View file

@ -47,6 +47,7 @@
#include "internal/mana_analysis.h"
#include "internal/mana_arena.h"
#include "internal/mana_lib.hpp"
#include "internal/mana_nng.hpp"
CMRC_DECLARE(mnode::resources);
@ -377,10 +378,16 @@ void usage(const char *self)
std::cout << fmt::format(
"usage: {} [--plugin=<plugin.so>] [--replay-strategy=<name>] <zipfile>\n", self);
std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n"
<< " --replay-strategy=<name> Use a specific replay strategy. "
"Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n"
<< " --analysis-strategy=<name> Use a specific analysis strategy. "
"Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n";
}
@ -391,7 +398,8 @@ int main(int argc, char *argv[])
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
argh::parser parser(
{"-h", "--help", "--plugin", "--replay-strategy", "--no-buffering ", "--log-level"});
{"-h", "--help", "--plugin", "--replay-strategy", "--analysis-strategy", "--log-level"});
parser.parse(argc, argv);
auto filename = parser[1];
@ -460,8 +468,6 @@ int main(int argc, char *argv[])
destSink = std::make_unique<mana::ManaCountingSink>();
}
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
std::string strategyName = "multi-threaded";
if (parser("--replay-strategy"))
@ -507,42 +513,75 @@ int main(int argc, char *argv[])
// make the analysis instance available to the parser callbacks
parserContext->parser.userContext = &mana;
auto perfSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
mana.sink = perfSink.get();
auto run_replay = [&]
{
manaSink->init(0, nullptr);
manaSink->begin_run(mana.runDescriptor.dump().c_str());
mana.sink->init(0, nullptr);
mana.sink->begin_run(mana.runDescriptor.dump().c_str());
replayStrategy->run(*listfileContext, *parserContext);
manaSink->end_run(mana.runDescriptor.dump().c_str());
manaSink->shutdown();
mana.sink->end_run(mana.runDescriptor.dump().c_str());
mana.sink->shutdown();
};
// if (parser["--no-buffering"])
strategyName = "multi-threaded";
if (parser("--analysis-strategy"))
strategyName = parser("--analysis-strategy").str();
if (strategyName == "single-threaded")
{
mana.sink = manaSink.get();
// replay strategy handles threading of the file io.
// the target mana.sink is called in the main thread.
run_replay();
}
auto perf = manaSink->perf();
else if (strategyName == "multi-threaded")
{
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
static_cast<size_t>(0));
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
static_cast<size_t>(0));
double elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
.count();
double elapsed_s = elapsed_ms / 1000.0;
// Create a NngServerSink and use it as the destination sink of the parsing stage.
// In the main thread run a mana nng client, reading from the server
// socket, feeding the destination sink.
static const char *url = "inproc://mana_analysis_stage1";
nng_socket serverSocket = nng::make_pair_socket();
if (int res = nng_listen(serverSocket, url, nullptr, 0))
{
spdlog::error("Error listening on '{}': {}", url, nng_strerror(res));
return 1;
}
nng_socket clientSocket = nng::make_pair_socket();
if (int res = nng_dial(clientSocket, url, nullptr, 0))
{
spdlog::error("Error connecting to '{}': {}", url, nng_strerror(res));
return 1;
}
// create the server sink and set it as the destination sink for the module data stage
auto serverSink = std::make_unique<mana::NngServerSink>(serverSocket);
auto serverPerfSink = std::make_unique<mana::ManaSinkPerfProxy>(serverSink.get());
auto destSink = mana.sink;
mana.sink = serverPerfSink.get();
double MiB = totalBytes / (1024.0 * 1024);
double MiB_s = MiB / elapsed_s;
double hit_s = totalHits / elapsed_s;
std::thread replayThread(run_replay);
// FIXME: this is useless when running the client loop blocking like this
std::atomic<bool> clientQuit = false;
destSink->init(0, nullptr);
mana::nng_client_run(clientSocket, destSink, clientQuit);
destSink->shutdown();
if (replayThread.joinable())
replayThread.join();
fmt::print("Internal NngServerSink: {}\n", to_string(serverPerfSink->perf()));
}
else
{
std::cerr << fmt::format("Error: unknown analysis strategy '{}'\n", strategyName);
usage(argv[0]);
return 1;
}
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
totalHits, MiB, elapsed_s, hit_s, MiB_s);
if (auto perfProxy = dynamic_cast<mana::ManaSinkPerfProxy *>(perfSink.get()))
{
fmt::print("Destination Sink: {}\n", to_string(perfProxy->perf()));
}
return 0;

View file

@ -15,151 +15,17 @@ void usage(const char *self)
{
std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] <url>\n", self);
std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n"
<< " --processing-strategy=<name> Use a specific processing strategy. "
"Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n";
}
bool process_event_data(nng_msg *msg, mana::IManaSink *sink)
{
while (nng_msg_len(msg) >= sizeof(mana::EventDataHeader))
{
auto header = nng::msg_trim_read<mana::EventDataHeader>(msg);
assert(header);
if (nng_msg_len(msg) < header->sizeBytes)
{
spdlog::error("Error reading event data: expected {} bytes, got {}", header->sizeBytes,
nng_msg_len(msg));
return false;
}
auto arrays = reinterpret_cast<mana_offset_array_t *>(nng_msg_body(msg));
sink->process_event(header->eventIndex, arrays, header->arrayCount, header->sizeBytes);
nng_msg_trim(msg, header->sizeBytes);
}
return nng_msg_len(msg) == 0;
}
void run(nng_socket socket_, mana::IManaSink *sink, std::atomic<bool> &quit)
{
auto receive_message = [socket_, &quit]()
{
auto rp = [&quit](int res) { return !quit && res == NNG_ETIMEDOUT; };
return nng::receive_message_retry(socket_, 0, rp);
};
enum class State
{
BeginRun,
Run,
};
auto state = State::BeginRun;
while (!quit)
{
switch (state)
{
case State::BeginRun:
{
spdlog::info("Waiting for BeginRun from server");
auto [msg, res] = receive_message();
if (res)
{
nng::mnode_nng_error("nng_recvmsg", res);
return;
}
const size_t msgSize = nng_msg_len(msg.get());
spdlog::info("State::BeginRun: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
spdlog::error("Error reading message header");
return;
}
spdlog::info("State::BeginRun: messageType={}", static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::BeginRun)
{
spdlog::info("Received BeginRun message (size={})", msgSize);
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->begin_run(descriptor_json.c_str());
if (auto res = nng::send_empty_message(socket_))
{
nng::mnode_nng_error("nng_sendmsg", res);
return;
}
spdlog::info("Receiving Data");
state = State::Run;
}
else
{
spdlog::error("Error reading BeginRun header");
return;
}
}
break;
case State::Run:
{
auto [msg, res] = receive_message();
if (res)
{
nng::mnode_nng_error("nng_recvmsg", res);
return;
}
const size_t msgSize = nng_msg_len(msg.get());
spdlog::debug("State::Run: Received message of size {}", msgSize);
auto header = nng::msg_trim_read<mana::MessageHeader>(msg.get());
if (!header)
{
spdlog::error("Error reading message header");
return;
}
spdlog::debug("State::Run: messageType={}", static_cast<int>(header->messageType));
if (header->messageType == mana::MessageHeader::EndRun)
{
spdlog::info("Received EndRun message");
std::string descriptor_json;
std::copy(reinterpret_cast<const char *>(nng_msg_body(msg.get())),
reinterpret_cast<const char *>(nng_msg_body(msg.get())) + msgSize,
std::back_inserter(descriptor_json));
sink->end_run(descriptor_json.c_str());
return;
}
else if (header->messageType == mana::MessageHeader::EventData)
{
if (!process_event_data(msg.get(), sink))
return;
}
else
{
spdlog::error("Unknown message type: {}", static_cast<int>(header->messageType));
return;
}
}
break;
}
}
mana::nng_client_run(socket_, sink, quit);
}
int main(int argc, char *argv[])
@ -177,6 +43,11 @@ int main(int argc, char *argv[])
spdlog::set_level(spdlog::level::info);
if (parser("--log-level"))
{
spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
}
boost::dll::shared_library pluginHandle;
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
std::unique_ptr<mana::IManaSink> destSink;