diff --git a/src/internal/mana_nng.hpp b/src/internal/mana_nng.hpp index 32b0bb1..9d99630 100644 --- a/src/internal/mana_nng.hpp +++ b/src/internal/mana_nng.hpp @@ -13,7 +13,7 @@ struct MessageHeader { enum MessageType { - BeginRun, + BeginRun = 42, EndRun, EventData, }; @@ -25,7 +25,8 @@ struct MessageHeader // could transmit eventindex and array count as u16|u16 or sequence number and eventindex struct EventDataHeader { - u32 eventIndex; + u16 eventIndex; + u16 arrayCount; u32 sizeBytes; }; @@ -50,12 +51,12 @@ class NngServerSink: public IManaSink { auto len = std::strlen(descriptor_json); auto required = sizeof(MessageHeader) + len + 1; - auto msg = nng::allocate_message(required); + auto msg = nng::allocate_reserve_message(required); MessageHeader header{MessageHeader::BeginRun}; nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), descriptor_json, len + 1); - spdlog::info("Sending BeginRun message"); + spdlog::info("Sending BeginRun message (size={})", nng_msg_len(msg.get())); int res = 0; do @@ -83,19 +84,28 @@ class NngServerSink: public IManaSink return; } else if (!res) + { + spdlog::info("begin_run: Received response from client"); break; + } } } void end_run(const char *descriptor_json) override { + if (msg_) + { + if (!flush()) + return; + } + auto len = std::strlen(descriptor_json); auto required = sizeof(MessageHeader) + len + 1; - auto msg = nng::allocate_message(required); + auto msg = nng::allocate_reserve_message(required); MessageHeader header{MessageHeader::EndRun}; nng_msg_append(msg.get(), &header, sizeof(header)); nng_msg_append(msg.get(), descriptor_json, len + 1); - spdlog::info("Sending EndRun message"); + spdlog::info("Sending EndRun message (size={})", nng_msg_len(msg.get())); int res = 0; do @@ -145,6 +155,7 @@ class NngServerSink: public IManaSink EventDataHeader header{}; header.eventIndex = eventIndex; + header.arrayCount = arrayCount; header.sizeBytes = totalBytes; nng_msg_append(msg_.get(), &header, sizeof(header)); nng_msg_append(msg_.get(), arrays, totalBytes); @@ -162,7 +173,7 @@ class NngServerSink: public IManaSink if (!msg_) return false; - spdlog::info("Sending EventData message of size {}", nng_msg_len(msg_.get())); + spdlog::debug("Sending EventData message of size {}", nng_msg_len(msg_.get())); while (true) { diff --git a/src/mana_plugin_nng_server.cc b/src/mana_plugin_nng_server.cc index a50217a..1bab998 100644 --- a/src/mana_plugin_nng_server.cc +++ b/src/mana_plugin_nng_server.cc @@ -35,7 +35,7 @@ class Plugin: public IManaPlugin MANA_CPP_PLUGIN() { static Plugin *plugin; - static const char *listenUrl = "ipc://mana_nng_server.socket"; + static const char *listenUrl = "ipc:///tmp/mana_nng_server.socket"; if (plugin) {