From 9e95897283beef52ee402ae5deda86eac81557a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Thu, 21 Nov 2024 23:00:13 +0100 Subject: [PATCH] tweak ping pong --- src/mnode_proto_test1.cc | 70 +++++++++++++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index f0f5690..662e018 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -1,14 +1,25 @@ +#include #include "proto/service.pb.h" #include "mesy_nng.h" using namespace mesytec; +using namespace std::literals; void requester(nng_socket socket, unsigned id) { + mvlc::util::Stopwatch sw; std::uint32_t seq = 0; + mnode::Ping ping; + mnode::Pong pong; + while (true) { - mnode::Ping ping; + if (sw.get_elapsed() >= 1s) + { + spdlog::info("Requester {} sent {} requests", id, seq); + break; + } + ping.set_peer_id(id); ping.set_sequence_number(seq++); auto reqMsg = nng::make_message(ping.SerializeAsString()); @@ -19,7 +30,7 @@ void requester(nng_socket socket, unsigned id) return; continue; } - spdlog::info("Sent ping request: {}", ping.ShortDebugString()); + spdlog::debug("Sent ping request: {}", ping.ShortDebugString()); auto [repMsg, rc] = nng::receive_message(socket); if (rc) { @@ -28,16 +39,25 @@ void requester(nng_socket socket, unsigned id) return; continue; } - mnode::Pong pong; pong.ParseFromArray(nng_msg_body(repMsg.get()), nng_msg_len(repMsg.get())); - spdlog::info("Received pong response: {}", pong.ShortDebugString()); + spdlog::debug("Received pong response: {}", pong.ShortDebugString()); } } void responder(nng_socket socket, unsigned id) { + mvlc::util::Stopwatch sw; + mnode::Ping ping; + mnode::Pong pong; + while (true) { + if (sw.get_elapsed() >= 1s) + { + spdlog::info("Responder {} answered {} requests", id, ping.sequence_number()); + break; + } + auto [reqMsg, rc] = nng::receive_message(socket); if (rc) { @@ -46,11 +66,9 @@ void responder(nng_socket socket, unsigned id) return; continue; } - mnode::Ping ping; ping.ParseFromArray(nng_msg_body(reqMsg.get()), nng_msg_len(reqMsg.get())); - spdlog::info("Received ping request: {}", ping.ShortDebugString()); + spdlog::debug("Received ping request: {}", ping.ShortDebugString()); - mnode::Pong pong; pong.set_peer_id(id); pong.set_sequence_number(ping.sequence_number()); auto repMsg = nng::make_message(pong.SerializeAsString()); @@ -61,22 +79,50 @@ void responder(nng_socket socket, unsigned id) return; continue; } - spdlog::info("Sent pong response: {}", pong.ShortDebugString()); + spdlog::debug("Sent pong response: {}", pong.ShortDebugString()); } } +void client_aio(nng_socket socket, unsigned id) +{ +} + +void server_aio(nng_socket socket, unsigned id) +{ +} + int main() { spdlog::set_level(spdlog::level::info); + + unsigned peerId = 0; + std::vector threads; + + #if 1 + { auto reqSocket = nng::make_req_socket(); auto repSocket = nng::make_rep_socket(); - if (int rc = nng::marry_listen_dial(reqSocket, repSocket, "inproc://test")) + if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) return rc; - std::vector threads; - threads.emplace_back(std::thread(requester, reqSocket, 0)); - threads.emplace_back(std::thread(responder, repSocket, 1)); + threads.emplace_back(std::thread(requester, reqSocket, peerId++)); + threads.emplace_back(std::thread(responder, repSocket, peerId++)); + } + #endif + + #if 1 + { + auto reqSocket = nng::make_req_socket(); + auto repSocket = nng::make_rep_socket(); + + if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) + return rc; + + threads.emplace_back(std::thread(client_aio, reqSocket, peerId++)); + threads.emplace_back(std::thread(server_aio, repSocket, peerId++)); + } + #endif for (auto &t: threads) if (t.joinable()) t.join();