pass --plugin-args to mana c sink plugins

This commit is contained in:
Florian Lüke 2024-12-30 18:59:04 +01:00
parent ab05cab596
commit b85096f649
8 changed files with 84 additions and 91 deletions

View file

@ -2,7 +2,7 @@
## How to build ## How to build
Requires git, ninja, cmake, gcc or clang, boost-filesystem, boost-system. Requires: git ninja cmake gcc/clang libboost-filesystem-dev libboost-system-dev libboost-program-options-dev
If building under msys2 use ucrt64. If building under msys2 use ucrt64.

View file

@ -14,8 +14,9 @@ target_link_libraries(mesytec-mnode PUBLIC nng mnode-proto PRIVATE spdlog)
target_compile_features(mesytec-mnode PRIVATE cxx_std_17) target_compile_features(mesytec-mnode PRIVATE cxx_std_17)
target_compile_options(mesytec-mnode PRIVATE ${MVLC_NNG_MNODE_WARN_FLAGS}) target_compile_options(mesytec-mnode PRIVATE ${MVLC_NNG_MNODE_WARN_FLAGS})
find_package(Boost CONFIG COMPONENTS filesystem program_options system REQUIRED)
add_library(mana INTERFACE) add_library(mana INTERFACE)
target_link_libraries(mana INTERFACE nlohmann_json::nlohmann_json mesytec-mvlc) target_link_libraries(mana INTERFACE nlohmann_json::nlohmann_json mesytec-mvlc Boost::filesystem Boost::program_options Boost::system)
target_include_directories(mana target_include_directories(mana
INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include> INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode>) INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode>)

View file

@ -3,6 +3,8 @@
#include "mana_api.hpp" #include "mana_api.hpp"
#include "mana_arena.h" #include "mana_arena.h"
#include <boost/dll.hpp>
#include <boost/program_options/parsers.hpp>
#include <cassert> #include <cassert>
#include <chrono> #include <chrono>
#include <mesytec-mnode/mnode_cpp_types.h> #include <mesytec-mnode/mnode_cpp_types.h>
@ -304,6 +306,42 @@ struct ObjectPath
} }
}; };
struct PluginWrapper
{
boost::dll::shared_library pluginHandle;
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
std::unique_ptr<mana::IManaSink> destSink;
};
PluginWrapper load_mana_plugin(const std::string &filename,
const std::vector<std::string> &pluginArgs)
{
PluginWrapper result;
result.pluginHandle = boost::dll::shared_library(filename);
if (auto entryPoint = result.pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin"))
{
std::vector<const char *> argv;
for (const auto &arg: pluginArgs)
argv.push_back(arg.data());
result.destSink = std::make_unique<mana::ManaCSink>(entryPoint(), argv.size(), argv.data());
}
else if (auto entryPoint = result.pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin"))
{
result.manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(entryPoint());
if (!result.manaCppPlugin)
throw std::runtime_error("plugin {}: mana_get_plugin() returned nullptr");
result.destSink = result.manaCppPlugin->makeSink();
}
return result;
}
PluginWrapper load_mana_plugin(const std::string &filename, const std::string &cmdline)
{
return load_mana_plugin(filename, boost::program_options::split_unix(cmdline));
}
} // namespace mesytec::mnode::mana } // namespace mesytec::mnode::mana
#endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */ #endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */

View file

@ -22,8 +22,6 @@ struct MessageHeader
u32 messageType; u32 messageType;
}; };
// TODO: don't need sizeBytes as the client can infer from the descriptor in begin_run()
// could transmit eventindex and array count as u16|u16 or sequence number and eventindex
struct EventDataHeader struct EventDataHeader
{ {
u16 eventIndex; u16 eventIndex;
@ -31,6 +29,9 @@ struct EventDataHeader
u32 sizeBytes; u32 sizeBytes;
}; };
// EventData message layout: MessageHeader, repeated(EventDataHeader +
// mana_offset_array_t[arrayCount])
class NngServerSink: public IManaSink class NngServerSink: public IManaSink
{ {
public: public:

View file

@ -11,7 +11,11 @@ MANA_DEFINE_PLUGIN_INIT(init)
{ {
log_set_level(LOG_INFO); log_set_level(LOG_INFO);
struct Context *ctx = calloc(1, sizeof(*ctx)); struct Context *ctx = calloc(1, sizeof(*ctx));
log_info("init: ctx=%p", ctx); log_info("init: ctx=%p, argc=%d", ctx, plugin_argc);
for (int i = 0; i < plugin_argc; ++i)
{
log_info("init: argv[%d]=%s", i, plugin_argv[i]);
}
return ctx; return ctx;
} }

View file

@ -21,13 +21,10 @@ add_mnode_dev_executable(mesy_nng_push_pull_main)
add_mnode_dev_executable(mesy_nng_pub_producer) add_mnode_dev_executable(mesy_nng_pub_producer)
add_mnode_dev_executable(mesy_nng_sub_consumer) add_mnode_dev_executable(mesy_nng_sub_consumer)
find_package(Boost CONFIG COMPONENTS filesystem system REQUIRED)
if (Boost_FOUND)
add_mnode_dev_executable(mana_auto_replay) add_mnode_dev_executable(mana_auto_replay)
target_link_libraries(mana_auto_replay PRIVATE nlohmann_json::nlohmann_json mnode::resources Boost::filesystem Boost::system) target_link_libraries(mana_auto_replay PRIVATE mana mnode::resources)
add_mnode_dev_executable(mana_nng_client) add_mnode_dev_executable(mana_nng_client)
target_link_libraries(mana_nng_client PRIVATE nlohmann_json::nlohmann_json mnode::resources Boost::filesystem Boost::system) target_link_libraries(mana_nng_client PRIVATE mana mnode::resources)
endif()
add_mnode_proto_dev_executable(mnode_proto_test1) add_mnode_proto_dev_executable(mnode_proto_test1)
add_mnode_proto_dev_executable(mnode_proto_ping_client) add_mnode_proto_dev_executable(mnode_proto_ping_client)

View file

@ -32,7 +32,6 @@
// -> want plugins. similar to the mvme listfile_reader but on analysis data // -> want plugins. similar to the mvme listfile_reader but on analysis data
#include <atomic> #include <atomic>
#include <boost/dll.hpp>
#include <cmrc/cmrc.hpp> // mnode::resources #include <cmrc/cmrc.hpp> // mnode::resources
#include <list> #include <list>
#include <mesytec-mnode/mnode_cpp_types.h> #include <mesytec-mnode/mnode_cpp_types.h>
@ -375,13 +374,16 @@ struct NngPairStrategy: public ReplayStrategy
void usage(const char *self) void usage(const char *self)
{ {
std::cout << fmt::format( std::cout << fmt::format("usage: {} [--plugin <plugin.so>] [--plugin-args <args>] "
"usage: {} [--plugin=<plugin.so>] [--replay-strategy=<name>] <zipfile>\n", self); "[--replay-strategy=<name>] <zipfile>\n",
self);
std::cout std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a " << " --plugin <plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n" "simple counting plugin.\n"
<< " --plugin-args <args> Optional arguments to pass to the plugin.\n"
<< " --replay-strategy=<name> Use a specific replay strategy. " << " --replay-strategy=<name> Use a specific replay strategy. "
"Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n" "Available: 'single-threaded', 'multi-threaded'. Default: 'multi-threaded'\n"
@ -397,8 +399,8 @@ int main(int argc, char *argv[])
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json"); auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end()); const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
argh::parser parser( argh::parser parser({"-h", "--help", "--plugin", "--plugin-args", "--replay-strategy",
{"-h", "--help", "--plugin", "--replay-strategy", "--analysis-strategy", "--log-level"}); "--analysis-strategy", "--log-level"});
parser.parse(argc, argv); parser.parse(argc, argv);
@ -425,57 +427,30 @@ int main(int argc, char *argv[])
return 1; return 1;
} }
boost::dll::shared_library pluginHandle; mana::PluginWrapper plugin;
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
std::unique_ptr<mana::IManaSink> destSink;
if (parser("--plugin")) if (parser("--plugin"))
{ {
auto pluginFile = parser("--plugin").str(); auto pluginFile = parser("--plugin").str();
try
{
pluginHandle = boost::dll::shared_library(pluginFile);
try try
{ {
if (auto entryPoint = plugin = mana::load_mana_plugin(pluginFile, parser("--plugin-args").str());
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin"))
{
destSink = std::make_unique<mana::ManaCSink>(entryPoint(), 0, nullptr);
}
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
} std::cerr << fmt::format("Error loading mana plugin from {}: {}\n", pluginFile,
e.what());
if (!destSink)
{
if (auto entryPoint = pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin"))
{
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(entryPoint());
if (!manaCppPlugin)
throw std::runtime_error("plugin {}: mana_get_plugin() returned nullptr");
destSink = manaCppPlugin->makeSink();
}
}
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error loading shared library {}: {}\n", pluginFile, e.what());
return 1;
}
if (!destSink)
{
std::cerr << fmt::format("Error: plugin {} does not provide a sink\n", pluginFile);
return 1; return 1;
} }
} }
else else
{ {
destSink = std::make_unique<mana::ManaCountingSink>(); plugin.destSink = std::make_unique<mana::ManaCountingSink>();
} }
auto &destSink = plugin.destSink;
std::string strategyName = "multi-threaded"; std::string strategyName = "multi-threaded";
if (parser("--replay-strategy")) if (parser("--replay-strategy"))

View file

@ -13,12 +13,15 @@ using namespace mesytec::mnode;
void usage(const char *self) void usage(const char *self)
{ {
std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] <url>\n", self); std::cout << fmt::format("usage: {} [--plugin <plugin.so>] [--plugin-args <args>] <url>\n",
self);
std::cout std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a " << " --plugin <plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n" "simple counting plugin.\n"
<< " --plugin-args <args> Optional arguments to pass to the plugin.\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', " << " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n"; "'off'.\n";
} }
@ -30,7 +33,8 @@ void run(nng_socket socket_, mana::IManaSink *sink, std::atomic<bool> &quit)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
argh::parser parser({"-h", "--help", "--plugin", "--log-level"}); argh::parser parser(
{"-h", "--help", "--plugin", "--plugin-args", "--plugin-args", "--log-level"});
parser.parse(argc, argv); parser.parse(argc, argv);
auto url = parser[1]; auto url = parser[1];
@ -48,57 +52,30 @@ int main(int argc, char *argv[])
spdlog::set_level(spdlog::level::from_str(parser("--log-level").str())); spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
} }
boost::dll::shared_library pluginHandle; mana::PluginWrapper plugin;
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
std::unique_ptr<mana::IManaSink> destSink;
if (parser("--plugin")) if (parser("--plugin"))
{ {
auto pluginFile = parser("--plugin").str(); auto pluginFile = parser("--plugin").str();
try
{
pluginHandle = boost::dll::shared_library(pluginFile);
try try
{ {
if (auto entryPoint = plugin = mana::load_mana_plugin(pluginFile, parser("--plugin-args").str());
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin"))
{
destSink = std::make_unique<mana::ManaCSink>(entryPoint(), 0, nullptr);
}
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
} std::cerr << fmt::format("Error loading mana plugin from {}: {}\n", pluginFile,
e.what());
if (!destSink)
{
if (auto entryPoint = pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin"))
{
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(entryPoint());
if (!manaCppPlugin)
throw std::runtime_error("plugin {}: mana_get_plugin() returned nullptr");
destSink = manaCppPlugin->makeSink();
}
}
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error loading shared library {}: {}\n", pluginFile, e.what());
return 1;
}
if (!destSink)
{
std::cerr << fmt::format("Error: plugin {} does not provide a sink\n", pluginFile);
return 1; return 1;
} }
} }
else else
{ {
destSink = std::make_unique<mana::ManaCountingSink>(); plugin.destSink = std::make_unique<mana::ManaCountingSink>();
} }
auto &destSink = plugin.destSink;
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get()); auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
nng_socket socket = nng::make_pair_socket(); nng_socket socket = nng::make_pair_socket();