From 8a2c2aa7fd2f2ace1aa9e2f65c1bdc91e75e3f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Tue, 7 May 2024 14:59:06 +0200 Subject: [PATCH] add pub sub test programs Lesson learned: subscription working for empty messages can be done, but needs nng_socket_set() instead of nng_socket_set_string(). --- src/CMakeLists.txt | 10 ++++++++++ src/mvlc_nng_replay.cc | 2 +- src/pub_producer.cc | 36 +++++++++++++++++++++++++++++++++++ src/sub_consumer.cc | 43 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/pub_producer.cc create mode 100644 src/sub_consumer.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 21d3bd2..f719490 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -50,5 +50,15 @@ target_compile_features(mesy_nng_push_pull_main PRIVATE cxx_std_17) target_link_libraries(mesy_nng_push_pull_main PRIVATE mesytec-mvlc PRIVATE nng) target_compile_options(mesy_nng_push_pull_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) +add_executable(mesy_nng_pub_producer pub_producer.cc) +target_compile_features(mesy_nng_pub_producer PRIVATE cxx_std_17) +target_link_libraries(mesy_nng_pub_producer PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(mesy_nng_pub_producer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) + +add_executable(mesy_nng_sub_consumer sub_consumer.cc) +target_compile_features(mesy_nng_sub_consumer PRIVATE cxx_std_17) +target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) + #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 6d08d4e..1653165 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -285,7 +285,7 @@ void parser_nng_eventdata(void *ctx_, int crateIndex, int eventIndex, if (!msg && !parser_maybe_alloc_output(ctx)) return; - bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; + bytesFree = msg ? DefaultOutputMessageReserve - nng_msg_len(msg) : 0u; assert(bytesFree >= requiredBytes); ParsedDataEventHeader eventHeader = diff --git a/src/pub_producer.cc b/src/pub_producer.cc new file mode 100644 index 0000000..9115224 --- /dev/null +++ b/src/pub_producer.cc @@ -0,0 +1,36 @@ +#include "common.h" +#include +#include +#include + +int main(int argc, char *argv[]) +{ + spdlog::set_level(spdlog::level::info); + + nng_socket pubSocket = NNG_SOCKET_INITIALIZER; + + if (int res = nng_pub0_open(&pubSocket)) + mesy_nng_fatal("nng_pub0_open", res); + + if (int res = nng_listen(pubSocket, "tcp://*:42777", nullptr, 0)) + mesy_nng_fatal("nng_listen tcp", res); + + while (true) + { + std::cout << "Enter size of pub message:"; + size_t size = 0; + std::cin >> size; + + fmt::print("Publishing message of size {}\n", size); + + nng_msg *msg = nullptr; + + if (int res = nng_msg_alloc(&msg, size)) + mesy_nng_fatal("nng_msg_alloc", res); + + if (auto res = nng_sendmsg(pubSocket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + return 0; +} diff --git a/src/sub_consumer.cc b/src/sub_consumer.cc new file mode 100644 index 0000000..1a61fd0 --- /dev/null +++ b/src/sub_consumer.cc @@ -0,0 +1,43 @@ +#include "common.h" +#include +#include +#include + +int main(int argc, char *argv[]) +{ + spdlog::set_level(spdlog::level::info); + + nng_socket subSocket = NNG_SOCKET_INITIALIZER; + + if (int res = nng_sub0_open(&subSocket)) + mesy_nng_fatal("nng_sub0_open", res); + + //if (int res = nng_socket_set_string(subSocket, NNG_OPT_SUB_SUBSCRIBE, "")) + // mesy_nng_fatal("consumer socket subscribe", res); + + if (int res = nng_socket_set(subSocket, NNG_OPT_SUB_SUBSCRIBE, nullptr, 0)) + mesy_nng_fatal("consumer socket subscribe", res); + + if (int res = nng_dial(subSocket, "tcp://127.0.0.1:42777", nullptr, 0)) + mesy_nng_fatal("nng_dial tcp", res); + + while (true) + { + std::cout << "Waiting for incoming message...\n"; + nng_msg *msg = nullptr; + + if (auto res = receive_message(subSocket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + spdlog::warn("consumer timed out in recv"); + continue; + } + + fmt::print("Received message of size {}\n", nng_msg_len(msg)); + + nng_msg_free(msg); + } + + return 0; +}