From b40d60e015451b1acf3294498400b7f50a3372fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Sat, 28 Dec 2024 07:06:05 +0100 Subject: [PATCH] add a NngServerSink and somehow get the plugin to link Things are messy, doesn't link with clang in vscode, works in the shell. --- CMakeLists.txt | 1 + src/CMakeLists.txt | 12 +++ src/internal/mana_nng.hpp | 156 ++++++++++++++++++++++++++++++++-- src/mana_plugin_nng_server.cc | 56 ++++++++++++ 4 files changed, 216 insertions(+), 9 deletions(-) create mode 100644 src/mana_plugin_nng_server.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index f04e231..acc2ad5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "ON") set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) option(MNODE_BUILD_TESTS "Build mnode test binaries" ${MESYTEC_MNODE_MAIN_PROJECT}) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a5d02a0..33a855d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,11 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic -Werror=return-type) +add_library(mesytec-mnode-nng mnode_nng.cc) +target_include_directories(mesytec-mnode-nng + PUBLIC $ + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode) +target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog) + add_library(mesytec-mnode mnode_nng.cc mnode_nng_async.cc mnode_nng_proto.cc) target_include_directories(mesytec-mnode PUBLIC $ @@ -10,6 +16,9 @@ target_compile_options(mesytec-mnode PRIVATE ${MVLC_NNG_MNODE_WARN_FLAGS}) add_library(mana INTERFACE) target_link_libraries(mana INTERFACE nlohmann_json::nlohmann_json mesytec-mvlc) +target_include_directories(mana + INTERFACE $ + INTERFACE $) add_subdirectory(tools) @@ -47,3 +56,6 @@ if (pybind11_FOUND) target_link_libraries(mana-plugin-python PRIVATE mana mesytec-mnode rxi-logc pybind11::embed) file(CREATE_LINK ${CMAKE_CURRENT_SOURCE_DIR}/mana_python_test_plugin.py ${CMAKE_BINARY_DIR}/mana_python_test_plugin.py COPY_ON_ERROR SYMBOLIC) endif() + +add_library(mana-plugin-nng-server SHARED mana_plugin_nng_server.cc) +target_link_libraries(mana-plugin-nng-server PRIVATE mesytec-mnode-nng mana rxi-logc) diff --git a/src/internal/mana_nng.hpp b/src/internal/mana_nng.hpp index 69cd1a1..d43644f 100644 --- a/src/internal/mana_nng.hpp +++ b/src/internal/mana_nng.hpp @@ -7,11 +7,32 @@ namespace mesytec::mnode::mana { -class NngServer: public IManaSink +struct MessageHeader +{ + enum MessageType + { + BeginRun, + EndRun, + EventData, + }; + + u32 messageType; +}; + +// TODO: don't need sizeBytes as the client can infer from the descriptor in begin_run() +// could transmit eventindex and array count as u16|u16 or sequence number and eventindex +struct EventDataHeader +{ + u32 eventIndex; + u32 sizeBytes; +}; + +class NngServerSink: public IManaSink { public: - explicit NngServer(nng_socket socket) + explicit NngServerSink(nng_socket socket) : socket_(socket) + , msg_(nng::make_unique_msg()) { } @@ -21,19 +42,110 @@ class NngServer: public IManaSink (void)plugin_argv; } - void shutdown() override { nng_close(socket_); } + void shutdown() override {} - void begin_run(const char *descriptor_json) override { (void)descriptor_json; } + void begin_run(const char *descriptor_json) override + { + auto len = std::strlen(descriptor_json); + auto required = sizeof(MessageHeader) + len + 1; + auto msg = nng::allocate_message(required); + MessageHeader header{MessageHeader::BeginRun}; + nng_msg_append(msg.get(), &header, sizeof(header)); + nng_msg_append(msg.get(), descriptor_json, len + 1); - void end_run(const char *descriptor_json) override { (void)descriptor_json; } + spdlog::info("Sending BeginRun message"); + int res = 0; + + do + { + if ((res = nng::send_message(socket_, msg))) + { + if (res != NNG_ETIMEDOUT) + { + nng::mnode_nng_error("nng_sendmsg", res); + return; + } + } + } + while (res == NNG_ETIMEDOUT); + + spdlog::info("Waiting for response from client"); + + while (true) + { + auto [msg, res] = nng::receive_message(socket_); + + if (res && res != NNG_ETIMEDOUT) + { + nng::mnode_nng_error("nng_recvmsg", res); + return; + } + else if (!res) + break; + } + } + + void end_run(const char *descriptor_json) override + { + auto len = std::strlen(descriptor_json); + auto required = sizeof(MessageHeader) + len + 1; + auto msg = nng::allocate_message(required); + MessageHeader header{MessageHeader::EndRun}; + nng_msg_append(msg.get(), &header, sizeof(header)); + nng_msg_append(msg.get(), descriptor_json, len + 1); + spdlog::info("Sending EndRun message"); + int res = 0; + + do + { + if ((res = nng::send_message(socket_, msg))) + { + if (res != NNG_ETIMEDOUT) + { + nng::mnode_nng_error("nng_sendmsg", res); + return; + } + } + } + while (res == NNG_ETIMEDOUT); + } void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) override { - (void)eventIndex; - (void)arrays; - (void)arrayCount; - (void)totalBytes; + if (!msg_) + { + msg_ = nng::allocate_reserve_message(InitialMessageReserve); + MessageHeader header{MessageHeader::EventData}; + nng_msg_append(msg_.get(), &header, sizeof(header)); + } + + const size_t required = sizeof(EventDataHeader) + totalBytes; + + if (nng::allocated_free_space(msg_.get()) < required && + nng_msg_len(msg_.get()) > sizeof(MessageHeader)) + { + if (!flush()) + return; + + assert(!msg_); + msg_ = nng::allocate_reserve_message(InitialMessageReserve); + MessageHeader header{MessageHeader::EventData}; + nng_msg_append(msg_.get(), &header, sizeof(header)); + } + + // second check in case the event is larger than the initial reserve + if (nng::allocated_free_space(msg_.get()) < required) + nng_msg_reserve(msg_.get(), required); + + assert(nng::allocated_free_space(msg_.get()) >= required); + assert(nng_msg_len(msg_.get()) >= sizeof(MessageHeader)); + + EventDataHeader header{}; + header.eventIndex = eventIndex; + header.sizeBytes = totalBytes; + nng_msg_append(msg_.get(), &header, sizeof(header)); + nng_msg_append(msg_.get(), arrays, totalBytes); } void process_system_event(const uint32_t *data, size_t size) override @@ -43,7 +155,33 @@ class NngServer: public IManaSink } private: + bool flush() + { + if (!msg_) + return false; + + spdlog::info("Sending EventData message of size {}", nng_msg_len(msg_.get())); + + while (true) + { + if (auto res = nng::send_message(socket_, msg_)) + { + if (res != NNG_ETIMEDOUT) + { + nng::mnode_nng_error("nng_sendmsg", res); + return false; + } + } + else + break; + } + + return true; + } + + static constexpr size_t InitialMessageReserve = 2 * 1024 * 1024; nng_socket socket_; + nng::unique_msg msg_; }; } // namespace mesytec::mnode::mana diff --git a/src/mana_plugin_nng_server.cc b/src/mana_plugin_nng_server.cc new file mode 100644 index 0000000..a50217a --- /dev/null +++ b/src/mana_plugin_nng_server.cc @@ -0,0 +1,56 @@ +#include "internal/mana_nng.hpp" +#include "internal/rxi/log.hpp" + +using namespace mesytec::mnode; +using namespace mesytec::mnode::mana; + +class Plugin: public IManaPlugin +{ + public: + explicit Plugin(nng_socket socket) + : socket_(socket) + { + } + + ~Plugin() override + { +#if 0 + if (nng_socket_id(socket_) >= 0) + { + nng_close(socket_); + socket_ = NNG_SOCKET_INITIALIZER; + } +#endif + } + + std::unique_ptr makeSink() override + { + return std::make_unique(socket_); + } + + private: + nng_socket socket_; +}; + +MANA_CPP_PLUGIN() +{ + static Plugin *plugin; + static const char *listenUrl = "ipc://mana_nng_server.socket"; + + if (plugin) + { + log_warn("plugin already created"); + return plugin; + } + + nng_socket socket = nng::make_pair_socket(); + if (int res = nng_listen(socket, listenUrl, nullptr, 0)) + { + nng::mnode_nng_error("nng_listen", res); + return nullptr; + } + + log_info("listening on %s", listenUrl); + + return plugin = new Plugin(socket); +}