#include #include "proto/service.pb.h" #include "mesy_nng.h" using namespace mesytec; using namespace std::literals; void requester(nng_socket socket, unsigned id) { mvlc::util::Stopwatch sw; std::uint32_t seq = 0; mnode::Ping ping; mnode::Pong pong; while (true) { if (sw.get_elapsed() >= 1s) { spdlog::info("Requester {} sent {} requests", id, seq); break; } ping.set_peer_id(id); ping.set_sequence_number(seq++); auto reqMsg = nng::make_message(ping.SerializeAsString()); if (int rc = nng_sendmsg(socket, reqMsg.release(), 0)) { nng::mesy_nng_error("nng_sendmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; } spdlog::debug("Sent ping request: {}", ping.ShortDebugString()); auto [repMsg, rc] = nng::receive_message(socket); if (rc) { nng::mesy_nng_error("nng_recvmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; } pong.ParseFromArray(nng_msg_body(repMsg.get()), nng_msg_len(repMsg.get())); spdlog::debug("Received pong response: {}", pong.ShortDebugString()); } } void responder(nng_socket socket, unsigned id) { mvlc::util::Stopwatch sw; mnode::Ping ping; mnode::Pong pong; while (true) { if (sw.get_elapsed() >= 1s) { spdlog::info("Responder {} answered {} requests", id, ping.sequence_number()); break; } auto [reqMsg, rc] = nng::receive_message(socket); if (rc) { nng::mesy_nng_error("nng_recvmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; } ping.ParseFromArray(nng_msg_body(reqMsg.get()), nng_msg_len(reqMsg.get())); spdlog::debug("Received ping request: {}", ping.ShortDebugString()); pong.set_peer_id(id); pong.set_sequence_number(ping.sequence_number()); auto repMsg = nng::make_message(pong.SerializeAsString()); if (int rc = nng_sendmsg(socket, repMsg.release(), 0)) { nng::mesy_nng_error("nng_sendmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; } spdlog::debug("Sent pong response: {}", pong.ShortDebugString()); } } void client_aio(nng_socket socket, unsigned id) { } void server_aio(nng_socket socket, unsigned id) { } int main() { spdlog::set_level(spdlog::level::info); unsigned peerId = 0; std::vector threads; #if 1 { auto reqSocket = nng::make_req_socket(); auto repSocket = nng::make_rep_socket(); if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) return rc; threads.emplace_back(std::thread(requester, reqSocket, peerId++)); threads.emplace_back(std::thread(responder, repSocket, peerId++)); } #endif #if 1 { auto reqSocket = nng::make_req_socket(); auto repSocket = nng::make_rep_socket(); if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) return rc; threads.emplace_back(std::thread(client_aio, reqSocket, peerId++)); threads.emplace_back(std::thread(server_aio, repSocket, peerId++)); } #endif for (auto &t: threads) if (t.joinable()) t.join(); return 0; }