add pub sub test programs
Lesson learned: subscription working for empty messages can be done, but needs nng_socket_set() instead of nng_socket_set_string().
This commit is contained in:
parent
a423e602aa
commit
8a2c2aa7fd
4 changed files with 90 additions and 1 deletions
|
|
@ -50,5 +50,15 @@ target_compile_features(mesy_nng_push_pull_main PRIVATE cxx_std_17)
|
||||||
target_link_libraries(mesy_nng_push_pull_main PRIVATE mesytec-mvlc PRIVATE nng)
|
target_link_libraries(mesy_nng_push_pull_main PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
target_compile_options(mesy_nng_push_pull_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
target_compile_options(mesy_nng_push_pull_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||||
|
|
||||||
|
add_executable(mesy_nng_pub_producer pub_producer.cc)
|
||||||
|
target_compile_features(mesy_nng_pub_producer PRIVATE cxx_std_17)
|
||||||
|
target_link_libraries(mesy_nng_pub_producer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
|
target_compile_options(mesy_nng_pub_producer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||||
|
|
||||||
|
add_executable(mesy_nng_sub_consumer sub_consumer.cc)
|
||||||
|
target_compile_features(mesy_nng_sub_consumer PRIVATE cxx_std_17)
|
||||||
|
target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
|
target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||||
|
|
||||||
#unset(CMAKE_C_CLANG_TIDY)
|
#unset(CMAKE_C_CLANG_TIDY)
|
||||||
#unset(CMAKE_CXX_CLANG_TIDY)
|
#unset(CMAKE_CXX_CLANG_TIDY)
|
||||||
|
|
|
||||||
36
src/pub_producer.cc
Normal file
36
src/pub_producer.cc
Normal file
|
|
@ -0,0 +1,36 @@
|
||||||
|
#include "common.h"
|
||||||
|
#include <nng/protocol/pubsub0/pub.h>
|
||||||
|
#include <thread>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
|
||||||
|
nng_socket pubSocket = NNG_SOCKET_INITIALIZER;
|
||||||
|
|
||||||
|
if (int res = nng_pub0_open(&pubSocket))
|
||||||
|
mesy_nng_fatal("nng_pub0_open", res);
|
||||||
|
|
||||||
|
if (int res = nng_listen(pubSocket, "tcp://*:42777", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen tcp", res);
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
std::cout << "Enter size of pub message:";
|
||||||
|
size_t size = 0;
|
||||||
|
std::cin >> size;
|
||||||
|
|
||||||
|
fmt::print("Publishing message of size {}\n", size);
|
||||||
|
|
||||||
|
nng_msg *msg = nullptr;
|
||||||
|
|
||||||
|
if (int res = nng_msg_alloc(&msg, size))
|
||||||
|
mesy_nng_fatal("nng_msg_alloc", res);
|
||||||
|
|
||||||
|
if (auto res = nng_sendmsg(pubSocket, msg, 0))
|
||||||
|
mesy_nng_fatal("nng_sendmsg", res);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
43
src/sub_consumer.cc
Normal file
43
src/sub_consumer.cc
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
#include "common.h"
|
||||||
|
#include <nng/protocol/pubsub0/sub.h>
|
||||||
|
#include <thread>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
|
||||||
|
nng_socket subSocket = NNG_SOCKET_INITIALIZER;
|
||||||
|
|
||||||
|
if (int res = nng_sub0_open(&subSocket))
|
||||||
|
mesy_nng_fatal("nng_sub0_open", res);
|
||||||
|
|
||||||
|
//if (int res = nng_socket_set_string(subSocket, NNG_OPT_SUB_SUBSCRIBE, ""))
|
||||||
|
// mesy_nng_fatal("consumer socket subscribe", res);
|
||||||
|
|
||||||
|
if (int res = nng_socket_set(subSocket, NNG_OPT_SUB_SUBSCRIBE, nullptr, 0))
|
||||||
|
mesy_nng_fatal("consumer socket subscribe", res);
|
||||||
|
|
||||||
|
if (int res = nng_dial(subSocket, "tcp://127.0.0.1:42777", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_dial tcp", res);
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
std::cout << "Waiting for incoming message...\n";
|
||||||
|
nng_msg *msg = nullptr;
|
||||||
|
|
||||||
|
if (auto res = receive_message(subSocket, &msg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
mesy_nng_fatal("receive_message", res);
|
||||||
|
spdlog::warn("consumer timed out in recv");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt::print("Received message of size {}\n", nng_msg_len(msg));
|
||||||
|
|
||||||
|
nng_msg_free(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue