From 78cab52b91b3e2101c1063ac61299da0bf815950 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Thu, 21 Nov 2024 22:23:52 +0100 Subject: [PATCH] add sync reqrep ping pong --- proto/service.proto | 4 ++ src/CMakeLists.txt | 2 +- src/mesy_nng.h | 25 ++++++++++++ src/mnode_proto_test1.cc | 84 +++++++++++++++++++++++++++++++++++++--- 4 files changed, 109 insertions(+), 6 deletions(-) diff --git a/proto/service.proto b/proto/service.proto index 0044b69..2680a1e 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -9,7 +9,11 @@ message SearchRequest { } message Ping { + int32 peer_id = 1; + uint32 sequence_number = 2; } message Pong { + int32 peer_id = 1; + uint32 sequence_number = 2; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5f02f85..77235a2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -61,7 +61,7 @@ target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng) target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) add_executable(mnode-proto-test1 mnode_proto_test1.cc thread_name.cc) -target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto) +target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto PRIVATE mesytec-mvlc PRIVATE nng) #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/mesy_nng.h b/src/mesy_nng.h index 2b9b45a..8f5ed95 100644 --- a/src/mesy_nng.h +++ b/src/mesy_nng.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include @@ -141,6 +143,16 @@ inline nng_socket make_sub_socket(nng_duration timeout = DefaultTimeout) return make_socket(nng_sub0_open, timeout); } +inline nng_socket make_req_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_req0_open, timeout); +} + +inline nng_socket make_rep_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_rep0_open, timeout); +} + inline std::string socket_get_string_opt(nng_socket s, const char *opt) { char *dest = nullptr; @@ -422,6 +434,19 @@ inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url return 0; } +inline unique_msg make_message(const std::string &data) +{ + nng_msg *msg = nullptr; + if (int res = nng_msg_alloc(&msg, data.size())) + { + mesy_nng_error("nng_msg_alloc", res); + return unique_msg(nullptr, nng_msg_free); + } + + std::memcpy(nng_msg_body(msg), data.data(), data.size()); + return unique_msg(msg, nng_msg_free); +} + } #endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */ diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index 85ea08c..f0f5690 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -1,11 +1,85 @@ #include "proto/service.pb.h" +#include "mesy_nng.h" + +using namespace mesytec; + +void requester(nng_socket socket, unsigned id) +{ + std::uint32_t seq = 0; + while (true) + { + mnode::Ping ping; + ping.set_peer_id(id); + ping.set_sequence_number(seq++); + auto reqMsg = nng::make_message(ping.SerializeAsString()); + if (int rc = nng_sendmsg(socket, reqMsg.release(), 0)) + { + nng::mesy_nng_error("nng_sendmsg", rc); + if (rc != NNG_ETIMEDOUT) + return; + continue; + } + spdlog::info("Sent ping request: {}", ping.ShortDebugString()); + auto [repMsg, rc] = nng::receive_message(socket); + if (rc) + { + nng::mesy_nng_error("nng_recvmsg", rc); + if (rc != NNG_ETIMEDOUT) + return; + continue; + } + mnode::Pong pong; + pong.ParseFromArray(nng_msg_body(repMsg.get()), nng_msg_len(repMsg.get())); + spdlog::info("Received pong response: {}", pong.ShortDebugString()); + } +} + +void responder(nng_socket socket, unsigned id) +{ + while (true) + { + auto [reqMsg, rc] = nng::receive_message(socket); + if (rc) + { + nng::mesy_nng_error("nng_recvmsg", rc); + if (rc != NNG_ETIMEDOUT) + return; + continue; + } + mnode::Ping ping; + ping.ParseFromArray(nng_msg_body(reqMsg.get()), nng_msg_len(reqMsg.get())); + spdlog::info("Received ping request: {}", ping.ShortDebugString()); + + mnode::Pong pong; + pong.set_peer_id(id); + pong.set_sequence_number(ping.sequence_number()); + auto repMsg = nng::make_message(pong.SerializeAsString()); + if (int rc = nng_sendmsg(socket, repMsg.release(), 0)) + { + nng::mesy_nng_error("nng_sendmsg", rc); + if (rc != NNG_ETIMEDOUT) + return; + continue; + } + spdlog::info("Sent pong response: {}", pong.ShortDebugString()); + } +} int main() { - mesytec::mnode::SearchRequest request; - request.set_query("query"); - request.set_page_number(42); - request.set_results_per_page(10); + spdlog::set_level(spdlog::level::info); + auto reqSocket = nng::make_req_socket(); + auto repSocket = nng::make_rep_socket(); - mesytec::mnode::Ping pingRequest; + if (int rc = nng::marry_listen_dial(reqSocket, repSocket, "inproc://test")) + return rc; + + std::vector threads; + threads.emplace_back(std::thread(requester, reqSocket, 0)); + threads.emplace_back(std::thread(responder, repSocket, 1)); + + for (auto &t: threads) + if (t.joinable()) t.join(); + + return 0; }