mana: remove int() and shutdown() from IManaSink

This commit is contained in:
Florian Lüke 2024-12-30 10:38:41 +01:00
parent 17ce37b480
commit e7e7690d8f
7 changed files with 28 additions and 90 deletions

View file

@ -284,8 +284,6 @@ struct ManaCountingSink: public IManaSink
totalBytes = 0; totalBytes = 0;
} }
void init(int, const char **) override {}
void shutdown() override {}
void begin_run(const char *descriptor_json) override void begin_run(const char *descriptor_json) override
{ {
auto jRun = nlohmann::json::parse(descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json);

View file

@ -16,8 +16,6 @@ class IManaSink
public: public:
virtual ~IManaSink() = default; virtual ~IManaSink() = default;
virtual void init(int plugin_argc, const char **plugin_argv) = 0;
virtual void shutdown() = 0;
virtual void begin_run(const char *descriptor_json) = 0; virtual void begin_run(const char *descriptor_json) = 0;
virtual void end_run(const char *descriptor_json) = 0; virtual void end_run(const char *descriptor_json) = 0;
virtual void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, virtual void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
@ -25,14 +23,6 @@ class IManaSink
virtual void process_system_event(const uint32_t *data, size_t size) = 0; virtual void process_system_event(const uint32_t *data, size_t size) = 0;
template <typename StringHolder> void init(StringHolder args)
{
std::vector<const char *> cargs(args.size());
std::transform(args.begin(), args.end(), cargs.begin(),
[](const std::string &s) { return s.c_str(); });
init(cargs.size(), cargs.data());
}
protected: protected:
IManaSink() = default; IManaSink() = default;

View file

@ -142,8 +142,6 @@ class ManaSinkPerfProxy: public IManaSink
} }
std::vector<size_t> eventHits; std::vector<size_t> eventHits;
std::vector<size_t> eventBytes; std::vector<size_t> eventBytes;
duration dt_init;
duration dt_shutdown;
duration dt_beginRun; duration dt_beginRun;
duration dt_endRun; duration dt_endRun;
std::vector<duration> dt_processEvent; std::vector<duration> dt_processEvent;
@ -158,20 +156,6 @@ class ManaSinkPerfProxy: public IManaSink
const Perf &perf() const { return perf_; } const Perf &perf() const { return perf_; }
void init(int plugin_argc, const char **plugin_argv) override
{
auto t = Perf::clock::now();
sink_->init(plugin_argc, plugin_argv);
perf_.dt_init = Perf::duration_cast(Perf::clock::now() - t);
}
void shutdown() override
{
auto t = Perf::clock::now();
sink_->shutdown();
perf_.dt_shutdown = Perf::duration_cast(Perf::clock::now() - t);
}
void begin_run(const char *descriptor_json) override void begin_run(const char *descriptor_json) override
{ {
perf_.eventHits.clear(); perf_.eventHits.clear();
@ -247,16 +231,16 @@ struct ManaCSink: public IManaSink
mana_sink_plugin_t plugin_; mana_sink_plugin_t plugin_;
void *context_ = nullptr; void *context_ = nullptr;
explicit ManaCSink(mana_sink_plugin_t plugin) explicit ManaCSink(mana_sink_plugin_t plugin, int plugin_argc, const char **plugin_argv)
: plugin_(plugin) : plugin_(plugin)
, context_(plugin_.init(plugin_argc, plugin_argv))
{ {
if (!context_)
throw std::runtime_error("ManaCSink: plugin init failed");
} }
void init(int plugin_argc, const char **plugin_argv) override ~ManaCSink() override { plugin_.shutdown(context_); }
{
context_ = plugin_.init(plugin_argc, plugin_argv);
}
void shutdown() override { plugin_.shutdown(context_); }
void begin_run(const char *descriptor_json) override void begin_run(const char *descriptor_json) override
{ {
plugin_.begin_run(context_, descriptor_json); plugin_.begin_run(context_, descriptor_json);

View file

@ -41,14 +41,6 @@ class NngServerSink: public IManaSink
{ {
} }
void init(int plugin_argc, const char **plugin_argv) override
{
(void)plugin_argc;
(void)plugin_argv;
}
void shutdown() override {}
void begin_run(const char *descriptor_json) override void begin_run(const char *descriptor_json) override
{ {
auto len = std::strlen(descriptor_json); auto len = std::strlen(descriptor_json);

View file

@ -7,16 +7,6 @@ using namespace mesytec::mnode::mana;
class Sink: public IManaSink class Sink: public IManaSink
{ {
public: public:
void init(int plugin_argc, const char **plugin_argv) override
{
(void)plugin_argc;
(void)plugin_argv;
log_set_level(LOG_INFO);
log_info("init: this=%p", this);
}
void shutdown() override { log_info("shutdown: this=%p", this); }
void begin_run(const char *descriptor_json) override void begin_run(const char *descriptor_json) override
{ {
(void)descriptor_json; (void)descriptor_json;

View file

@ -441,7 +441,7 @@ int main(int argc, char *argv[])
if (auto entryPoint = if (auto entryPoint =
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")) pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin"))
{ {
destSink = std::make_unique<mana::ManaCSink>(entryPoint()); destSink = std::make_unique<mana::ManaCSink>(entryPoint(), 0, nullptr);
} }
} }
catch (const std::exception &e) catch (const std::exception &e)
@ -526,13 +526,11 @@ int main(int argc, char *argv[])
auto run_replay = [&] auto run_replay = [&]
{ {
mana.sink->init(0, nullptr);
mana.sink->begin_run(mana.runDescriptor.dump().c_str()); mana.sink->begin_run(mana.runDescriptor.dump().c_str());
replayStrategy->run(*listfileContext, *parserContext); replayStrategy->run(*listfileContext, *parserContext);
mana.sink->end_run(mana.runDescriptor.dump().c_str()); mana.sink->end_run(mana.runDescriptor.dump().c_str());
mana.sink->shutdown();
}; };
strategyName = "multi-threaded"; strategyName = "multi-threaded";
@ -573,9 +571,7 @@ int main(int argc, char *argv[])
std::thread replayThread(run_replay); std::thread replayThread(run_replay);
// FIXME: this is useless when running the client loop blocking like this // FIXME: this is useless when running the client loop blocking like this
std::atomic<bool> clientQuit = false; std::atomic<bool> clientQuit = false;
destSink->init(0, nullptr);
mana::nng_client_run(clientSocket, destSink, clientQuit); mana::nng_client_run(clientSocket, destSink, clientQuit);
destSink->shutdown();
if (replayThread.joinable()) if (replayThread.joinable())
replayThread.join(); replayThread.join();
fmt::print("Internal NngServerSink: {}\n", to_string(serverPerfSink->perf())); fmt::print("Internal NngServerSink: {}\n", to_string(serverPerfSink->perf()));

View file

@ -61,28 +61,36 @@ int main(int argc, char *argv[])
try try
{ {
destSink = std::make_unique<mana::ManaCSink>( if (auto entryPoint =
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")()); pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin"))
{
destSink = std::make_unique<mana::ManaCSink>(entryPoint(), 0, nullptr);
}
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
spdlog::debug("plugin {} is not a MANA_C_SINK_PLUGIN", pluginFile); }
try
if (!destSink)
{ {
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>( if (auto entryPoint = pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin"))
pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin")()); {
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(entryPoint());
if (!manaCppPlugin)
throw std::runtime_error("plugin {}: mana_get_plugin() returned nullptr");
destSink = manaCppPlugin->makeSink(); destSink = manaCppPlugin->makeSink();
} }
}
}
catch (const std::exception &e) catch (const std::exception &e)
{ {
std::cerr << fmt::format("Error loading plugin: {}\n", e.what()); std::cerr << fmt::format("Error loading shared library {}: {}\n", pluginFile, e.what());
return 1; return 1;
} }
}
} if (!destSink)
catch (const std::exception &e)
{ {
std::cerr << fmt::format("Error loading plugin: {}\n", e.what()); std::cerr << fmt::format("Error: plugin {} does not provide a sink\n", pluginFile);
return 1; return 1;
} }
} }
@ -103,29 +111,9 @@ int main(int argc, char *argv[])
spdlog::info("connected to {}", url); spdlog::info("connected to {}", url);
std::atomic<bool> quit = false; std::atomic<bool> quit = false;
manaSink->init(0, nullptr);
run(socket, manaSink.get(), quit); run(socket, manaSink.get(), quit);
manaSink->shutdown();
auto perf = manaSink->perf(); fmt::print("Destination Sink: {}\n", to_string(manaSink->perf()));
{
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
static_cast<size_t>(0));
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
static_cast<size_t>(0));
double elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
.count();
double elapsed_s = elapsed_ms / 1000.0;
double MiB = totalBytes / (1024.0 * 1024);
double MiB_s = MiB / elapsed_s;
double hit_s = totalHits / elapsed_s;
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
totalHits, MiB, elapsed_s, hit_s, MiB_s);
}
return 0; return 0;
} }