add sync reqrep ping pong

This commit is contained in:
Florian Lüke 2024-11-21 22:23:52 +01:00
parent f017d2406e
commit 78cab52b91
4 changed files with 109 additions and 6 deletions

View file

@ -9,7 +9,11 @@ message SearchRequest {
}
message Ping {
int32 peer_id = 1;
uint32 sequence_number = 2;
}
message Pong {
int32 peer_id = 1;
uint32 sequence_number = 2;
}

View file

@ -61,7 +61,7 @@ target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng)
target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
add_executable(mnode-proto-test1 mnode_proto_test1.cc thread_name.cc)
target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto)
target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto PRIVATE mesytec-mvlc PRIVATE nng)
#unset(CMAKE_C_CLANG_TIDY)
#unset(CMAKE_CXX_CLANG_TIDY)

View file

@ -7,6 +7,8 @@
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h>
#include <spdlog/spdlog.h>
#include <optional>
@ -141,6 +143,16 @@ inline nng_socket make_sub_socket(nng_duration timeout = DefaultTimeout)
return make_socket(nng_sub0_open, timeout);
}
inline nng_socket make_req_socket(nng_duration timeout = DefaultTimeout)
{
return make_socket(nng_req0_open, timeout);
}
inline nng_socket make_rep_socket(nng_duration timeout = DefaultTimeout)
{
return make_socket(nng_rep0_open, timeout);
}
inline std::string socket_get_string_opt(nng_socket s, const char *opt)
{
char *dest = nullptr;
@ -422,6 +434,19 @@ inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url
return 0;
}
inline unique_msg make_message(const std::string &data)
{
nng_msg *msg = nullptr;
if (int res = nng_msg_alloc(&msg, data.size()))
{
mesy_nng_error("nng_msg_alloc", res);
return unique_msg(nullptr, nng_msg_free);
}
std::memcpy(nng_msg_body(msg), data.data(), data.size());
return unique_msg(msg, nng_msg_free);
}
}
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */

View file

@ -1,11 +1,85 @@
#include "proto/service.pb.h"
#include "mesy_nng.h"
using namespace mesytec;
void requester(nng_socket socket, unsigned id)
{
std::uint32_t seq = 0;
while (true)
{
mnode::Ping ping;
ping.set_peer_id(id);
ping.set_sequence_number(seq++);
auto reqMsg = nng::make_message(ping.SerializeAsString());
if (int rc = nng_sendmsg(socket, reqMsg.release(), 0))
{
nng::mesy_nng_error("nng_sendmsg", rc);
if (rc != NNG_ETIMEDOUT)
return;
continue;
}
spdlog::info("Sent ping request: {}", ping.ShortDebugString());
auto [repMsg, rc] = nng::receive_message(socket);
if (rc)
{
nng::mesy_nng_error("nng_recvmsg", rc);
if (rc != NNG_ETIMEDOUT)
return;
continue;
}
mnode::Pong pong;
pong.ParseFromArray(nng_msg_body(repMsg.get()), nng_msg_len(repMsg.get()));
spdlog::info("Received pong response: {}", pong.ShortDebugString());
}
}
void responder(nng_socket socket, unsigned id)
{
while (true)
{
auto [reqMsg, rc] = nng::receive_message(socket);
if (rc)
{
nng::mesy_nng_error("nng_recvmsg", rc);
if (rc != NNG_ETIMEDOUT)
return;
continue;
}
mnode::Ping ping;
ping.ParseFromArray(nng_msg_body(reqMsg.get()), nng_msg_len(reqMsg.get()));
spdlog::info("Received ping request: {}", ping.ShortDebugString());
mnode::Pong pong;
pong.set_peer_id(id);
pong.set_sequence_number(ping.sequence_number());
auto repMsg = nng::make_message(pong.SerializeAsString());
if (int rc = nng_sendmsg(socket, repMsg.release(), 0))
{
nng::mesy_nng_error("nng_sendmsg", rc);
if (rc != NNG_ETIMEDOUT)
return;
continue;
}
spdlog::info("Sent pong response: {}", pong.ShortDebugString());
}
}
int main()
{
mesytec::mnode::SearchRequest request;
request.set_query("query");
request.set_page_number(42);
request.set_results_per_page(10);
spdlog::set_level(spdlog::level::info);
auto reqSocket = nng::make_req_socket();
auto repSocket = nng::make_rep_socket();
mesytec::mnode::Ping pingRequest;
if (int rc = nng::marry_listen_dial(reqSocket, repSocket, "inproc://test"))
return rc;
std::vector<std::thread> threads;
threads.emplace_back(std::thread(requester, reqSocket, 0));
threads.emplace_back(std::thread(responder, repSocket, 1));
for (auto &t: threads)
if (t.joinable()) t.join();
return 0;
}