NngServerSink fixes and use an absolute ipc socket url for now

This commit is contained in:
Florian Lüke 2024-12-29 00:15:01 +01:00
parent af8ea1db47
commit 99949f08e4
2 changed files with 19 additions and 8 deletions

View file

@ -13,7 +13,7 @@ struct MessageHeader
{ {
enum MessageType enum MessageType
{ {
BeginRun, BeginRun = 42,
EndRun, EndRun,
EventData, EventData,
}; };
@ -25,7 +25,8 @@ struct MessageHeader
// could transmit eventindex and array count as u16|u16 or sequence number and eventindex // could transmit eventindex and array count as u16|u16 or sequence number and eventindex
struct EventDataHeader struct EventDataHeader
{ {
u32 eventIndex; u16 eventIndex;
u16 arrayCount;
u32 sizeBytes; u32 sizeBytes;
}; };
@ -50,12 +51,12 @@ class NngServerSink: public IManaSink
{ {
auto len = std::strlen(descriptor_json); auto len = std::strlen(descriptor_json);
auto required = sizeof(MessageHeader) + len + 1; auto required = sizeof(MessageHeader) + len + 1;
auto msg = nng::allocate_message(required); auto msg = nng::allocate_reserve_message(required);
MessageHeader header{MessageHeader::BeginRun}; MessageHeader header{MessageHeader::BeginRun};
nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1); nng_msg_append(msg.get(), descriptor_json, len + 1);
spdlog::info("Sending BeginRun message"); spdlog::info("Sending BeginRun message (size={})", nng_msg_len(msg.get()));
int res = 0; int res = 0;
do do
@ -83,19 +84,28 @@ class NngServerSink: public IManaSink
return; return;
} }
else if (!res) else if (!res)
{
spdlog::info("begin_run: Received response from client");
break; break;
}
} }
} }
void end_run(const char *descriptor_json) override void end_run(const char *descriptor_json) override
{ {
if (msg_)
{
if (!flush())
return;
}
auto len = std::strlen(descriptor_json); auto len = std::strlen(descriptor_json);
auto required = sizeof(MessageHeader) + len + 1; auto required = sizeof(MessageHeader) + len + 1;
auto msg = nng::allocate_message(required); auto msg = nng::allocate_reserve_message(required);
MessageHeader header{MessageHeader::EndRun}; MessageHeader header{MessageHeader::EndRun};
nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), &header, sizeof(header));
nng_msg_append(msg.get(), descriptor_json, len + 1); nng_msg_append(msg.get(), descriptor_json, len + 1);
spdlog::info("Sending EndRun message"); spdlog::info("Sending EndRun message (size={})", nng_msg_len(msg.get()));
int res = 0; int res = 0;
do do
@ -145,6 +155,7 @@ class NngServerSink: public IManaSink
EventDataHeader header{}; EventDataHeader header{};
header.eventIndex = eventIndex; header.eventIndex = eventIndex;
header.arrayCount = arrayCount;
header.sizeBytes = totalBytes; header.sizeBytes = totalBytes;
nng_msg_append(msg_.get(), &header, sizeof(header)); nng_msg_append(msg_.get(), &header, sizeof(header));
nng_msg_append(msg_.get(), arrays, totalBytes); nng_msg_append(msg_.get(), arrays, totalBytes);
@ -162,7 +173,7 @@ class NngServerSink: public IManaSink
if (!msg_) if (!msg_)
return false; return false;
spdlog::info("Sending EventData message of size {}", nng_msg_len(msg_.get())); spdlog::debug("Sending EventData message of size {}", nng_msg_len(msg_.get()));
while (true) while (true)
{ {

View file

@ -35,7 +35,7 @@ class Plugin: public IManaPlugin
MANA_CPP_PLUGIN() MANA_CPP_PLUGIN()
{ {
static Plugin *plugin; static Plugin *plugin;
static const char *listenUrl = "ipc://mana_nng_server.socket"; static const char *listenUrl = "ipc:///tmp/mana_nng_server.socket";
if (plugin) if (plugin)
{ {