add a NngServerSink and somehow get the plugin to link

Things are messy, doesn't link with clang in vscode, works in the shell.
This commit is contained in:
Florian Lüke 2024-12-28 07:06:05 +01:00
parent c08f339c95
commit b40d60e015
4 changed files with 216 additions and 9 deletions

View file

@ -16,6 +16,7 @@ set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "ON")
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
option(MNODE_BUILD_TESTS "Build mnode test binaries" ${MESYTEC_MNODE_MAIN_PROJECT}) option(MNODE_BUILD_TESTS "Build mnode test binaries" ${MESYTEC_MNODE_MAIN_PROJECT})

View file

@ -1,5 +1,11 @@
set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic -Werror=return-type) set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic -Werror=return-type)
add_library(mesytec-mnode-nng mnode_nng.cc)
target_include_directories(mesytec-mnode-nng
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode)
target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog)
add_library(mesytec-mnode mnode_nng.cc mnode_nng_async.cc mnode_nng_proto.cc) add_library(mesytec-mnode mnode_nng.cc mnode_nng_async.cc mnode_nng_proto.cc)
target_include_directories(mesytec-mnode target_include_directories(mesytec-mnode
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
@ -10,6 +16,9 @@ target_compile_options(mesytec-mnode PRIVATE ${MVLC_NNG_MNODE_WARN_FLAGS})
add_library(mana INTERFACE) add_library(mana INTERFACE)
target_link_libraries(mana INTERFACE nlohmann_json::nlohmann_json mesytec-mvlc) target_link_libraries(mana INTERFACE nlohmann_json::nlohmann_json mesytec-mvlc)
target_include_directories(mana
INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode>)
add_subdirectory(tools) add_subdirectory(tools)
@ -47,3 +56,6 @@ if (pybind11_FOUND)
target_link_libraries(mana-plugin-python PRIVATE mana mesytec-mnode rxi-logc pybind11::embed) target_link_libraries(mana-plugin-python PRIVATE mana mesytec-mnode rxi-logc pybind11::embed)
file(CREATE_LINK ${CMAKE_CURRENT_SOURCE_DIR}/mana_python_test_plugin.py ${CMAKE_BINARY_DIR}/mana_python_test_plugin.py COPY_ON_ERROR SYMBOLIC) file(CREATE_LINK ${CMAKE_CURRENT_SOURCE_DIR}/mana_python_test_plugin.py ${CMAKE_BINARY_DIR}/mana_python_test_plugin.py COPY_ON_ERROR SYMBOLIC)
endif() endif()
add_library(mana-plugin-nng-server SHARED mana_plugin_nng_server.cc)
target_link_libraries(mana-plugin-nng-server PRIVATE mesytec-mnode-nng mana rxi-logc)

View file

@ -7,11 +7,32 @@
namespace mesytec::mnode::mana namespace mesytec::mnode::mana
{ {
class NngServer: public IManaSink struct MessageHeader
{
enum MessageType
{
BeginRun,
EndRun,
EventData,
};
u32 messageType;
};
// TODO: don't need sizeBytes as the client can infer from the descriptor in begin_run()
// could transmit eventindex and array count as u16|u16 or sequence number and eventindex
struct EventDataHeader
{
u32 eventIndex;
u32 sizeBytes;
};
class NngServerSink: public IManaSink
{ {
public: public:
explicit NngServer(nng_socket socket) explicit NngServerSink(nng_socket socket)
: socket_(socket) : socket_(socket)
, msg_(nng::make_unique_msg())
{ {
} }
@ -21,19 +42,110 @@ class NngServer: public IManaSink
(void)plugin_argv; (void)plugin_argv;
} }
void shutdown() override { nng_close(socket_); } void shutdown() override {}
void begin_run(const char *descriptor_json) override { (void)descriptor_json; } void begin_run(const char *descriptor_json) override
{
auto len = std::strlen(descriptor_json);
auto required = sizeof(MessageHeader) + len + 1;
auto msg = nng::allocate_message(required);
MessageHeader header{MessageHeader::BeginRun};
nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1);
void end_run(const char *descriptor_json) override { (void)descriptor_json; } spdlog::info("Sending BeginRun message");
int res = 0;
do
{
if ((res = nng::send_message(socket_, msg)))
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
return;
}
}
}
while (res == NNG_ETIMEDOUT);
spdlog::info("Waiting for response from client");
while (true)
{
auto [msg, res] = nng::receive_message(socket_);
if (res && res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_recvmsg", res);
return;
}
else if (!res)
break;
}
}
void end_run(const char *descriptor_json) override
{
auto len = std::strlen(descriptor_json);
auto required = sizeof(MessageHeader) + len + 1;
auto msg = nng::allocate_message(required);
MessageHeader header{MessageHeader::EndRun};
nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1);
spdlog::info("Sending EndRun message");
int res = 0;
do
{
if ((res = nng::send_message(socket_, msg)))
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
return;
}
}
}
while (res == NNG_ETIMEDOUT);
}
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override size_t totalBytes) override
{ {
(void)eventIndex; if (!msg_)
(void)arrays; {
(void)arrayCount; msg_ = nng::allocate_reserve_message(InitialMessageReserve);
(void)totalBytes; MessageHeader header{MessageHeader::EventData};
nng_msg_append(msg_.get(), &header, sizeof(header));
}
const size_t required = sizeof(EventDataHeader) + totalBytes;
if (nng::allocated_free_space(msg_.get()) < required &&
nng_msg_len(msg_.get()) > sizeof(MessageHeader))
{
if (!flush())
return;
assert(!msg_);
msg_ = nng::allocate_reserve_message(InitialMessageReserve);
MessageHeader header{MessageHeader::EventData};
nng_msg_append(msg_.get(), &header, sizeof(header));
}
// second check in case the event is larger than the initial reserve
if (nng::allocated_free_space(msg_.get()) < required)
nng_msg_reserve(msg_.get(), required);
assert(nng::allocated_free_space(msg_.get()) >= required);
assert(nng_msg_len(msg_.get()) >= sizeof(MessageHeader));
EventDataHeader header{};
header.eventIndex = eventIndex;
header.sizeBytes = totalBytes;
nng_msg_append(msg_.get(), &header, sizeof(header));
nng_msg_append(msg_.get(), arrays, totalBytes);
} }
void process_system_event(const uint32_t *data, size_t size) override void process_system_event(const uint32_t *data, size_t size) override
@ -43,7 +155,33 @@ class NngServer: public IManaSink
} }
private: private:
bool flush()
{
if (!msg_)
return false;
spdlog::info("Sending EventData message of size {}", nng_msg_len(msg_.get()));
while (true)
{
if (auto res = nng::send_message(socket_, msg_))
{
if (res != NNG_ETIMEDOUT)
{
nng::mnode_nng_error("nng_sendmsg", res);
return false;
}
}
else
break;
}
return true;
}
static constexpr size_t InitialMessageReserve = 2 * 1024 * 1024;
nng_socket socket_; nng_socket socket_;
nng::unique_msg msg_;
}; };
} // namespace mesytec::mnode::mana } // namespace mesytec::mnode::mana

View file

@ -0,0 +1,56 @@
#include "internal/mana_nng.hpp"
#include "internal/rxi/log.hpp"
using namespace mesytec::mnode;
using namespace mesytec::mnode::mana;
class Plugin: public IManaPlugin
{
public:
explicit Plugin(nng_socket socket)
: socket_(socket)
{
}
~Plugin() override
{
#if 0
if (nng_socket_id(socket_) >= 0)
{
nng_close(socket_);
socket_ = NNG_SOCKET_INITIALIZER;
}
#endif
}
std::unique_ptr<IManaSink> makeSink() override
{
return std::make_unique<NngServerSink>(socket_);
}
private:
nng_socket socket_;
};
MANA_CPP_PLUGIN()
{
static Plugin *plugin;
static const char *listenUrl = "ipc://mana_nng_server.socket";
if (plugin)
{
log_warn("plugin already created");
return plugin;
}
nng_socket socket = nng::make_pair_socket();
if (int res = nng_listen(socket, listenUrl, nullptr, 0))
{
nng::mnode_nng_error("nng_listen", res);
return nullptr;
}
log_info("listening on %s", listenUrl);
return plugin = new Plugin(socket);
}