mesytec-mnode/src/mnode_proto_test1.cc

132 lines
3.5 KiB
C++
Raw Normal View History

2024-11-21 23:00:13 +01:00
#include <mesytec-mvlc/mesytec-mvlc.h>
#include "proto/service.pb.h"
2024-11-21 22:23:52 +01:00
#include "mesy_nng.h"
using namespace mesytec;
2024-11-21 23:00:13 +01:00
using namespace std::literals;
2024-11-21 22:23:52 +01:00
/// Blocking REQ-side ping loop.
///
/// Repeatedly sends a serialized mnode::Ping carrying `id` and an increasing
/// sequence number over `socket`, then waits for the reply and parses it as a
/// mnode::Pong. The elapsed-time check runs at the top of each iteration; once
/// at least one second has passed the number of generated requests is logged
/// and the loop ends.
///
/// @param socket  nng socket used for sending requests and receiving replies.
/// @param id      Peer id written into every Ping and used in log output.
///
/// Timeouts (NNG_ETIMEDOUT) on send or receive are logged and the loop moves
/// on to the next request; any other nng error ends the function.
void requester(nng_socket socket, unsigned id)
{
    mvlc::util::Stopwatch sw;
    std::uint32_t seq = 0;
    mnode::Ping ping;
    mnode::Pong pong;

    while (true)
    {
        if (sw.get_elapsed() >= 1s)
        {
            spdlog::info("Requester {} sent {} requests", id, seq);
            break;
        }

        ping.set_peer_id(id);
        ping.set_sequence_number(seq++);

        auto reqMsg = nng::make_message(ping.SerializeAsString());

        if (reqMsg.get() == nullptr)
        {
            // Nothing to send; passing a null message to nng_sendmsg() is not valid.
            spdlog::error("Requester {}: failed to allocate request message", id);
            return;
        }

        // nng_sendmsg() takes ownership of the message only when it succeeds.
        // Keep ownership in reqMsg while sending so that a failed send frees
        // the message instead of leaking it.
        if (int rc = nng_sendmsg(socket, reqMsg.get(), 0))
        {
            nng::mesy_nng_error("req: nng_sendmsg", rc);

            if (rc != NNG_ETIMEDOUT)
                return;

            continue;
        }

        // Success: the socket owns the message now.
        static_cast<void>(reqMsg.release());

        spdlog::debug("Sent ping request: {}", ping.ShortDebugString());

        auto [repMsg, rc] = nng::receive_message(socket);

        if (rc)
        {
            nng::mesy_nng_error("requester: nng_recvmsg", rc);

            if (rc != NNG_ETIMEDOUT)
                return;

            continue;
        }

        const auto repLen = static_cast<int>(nng_msg_len(repMsg.get()));

        if (!pong.ParseFromArray(nng_msg_body(repMsg.get()), repLen))
        {
            // Do not log the contents of a message that failed to parse.
            spdlog::warn("Requester {}: failed to parse pong response ({} bytes)", id, repLen);
            continue;
        }

        spdlog::debug("Received pong response: {}", pong.ShortDebugString());
    }
}
/// Blocking REP-side pong loop.
///
/// Receives messages from `socket`, parses each as a mnode::Ping and answers
/// with a mnode::Pong carrying `id` and the sequence number of the request.
/// The elapsed-time check runs at the top of each iteration; once at least one
/// second has passed the number of successfully sent replies is logged and the
/// loop ends.
///
/// @param socket  nng socket used for receiving requests and sending replies.
/// @param id      Peer id written into every Pong and used in log output.
///
/// Timeouts (NNG_ETIMEDOUT) on receive or send are logged and the loop moves
/// on; any other nng error ends the function.
void responder(nng_socket socket, unsigned id)
{
    mvlc::util::Stopwatch sw;
    mnode::Ping ping;
    mnode::Pong pong;
    // Counts replies that were actually handed to the socket. The last seen
    // sequence number is not a count: it is off by one and reads 0 when
    // nothing was answered.
    unsigned answered = 0;

    while (true)
    {
        if (sw.get_elapsed() >= 1s)
        {
            spdlog::info("Responder {} answered {} requests", id, answered);
            break;
        }

        auto [reqMsg, rc] = nng::receive_message(socket);

        if (rc)
        {
            nng::mesy_nng_error("responder: nng_recvmsg", rc);

            if (rc != NNG_ETIMEDOUT)
                return;

            continue;
        }

        const auto reqLen = static_cast<int>(nng_msg_len(reqMsg.get()));

        if (!ping.ParseFromArray(nng_msg_body(reqMsg.get()), reqLen))
        {
            // Drop malformed requests instead of answering with stale data.
            spdlog::warn("Responder {}: failed to parse ping request ({} bytes)", id, reqLen);
            continue;
        }

        spdlog::debug("Received ping request: {}", ping.ShortDebugString());

        pong.set_peer_id(id);
        pong.set_sequence_number(ping.sequence_number());

        auto repMsg = nng::make_message(pong.SerializeAsString());

        if (repMsg.get() == nullptr)
        {
            // Nothing to send; passing a null message to nng_sendmsg() is not valid.
            spdlog::error("Responder {}: failed to allocate response message", id);
            return;
        }

        // nng_sendmsg() takes ownership of the message only when it succeeds.
        // Keep ownership in repMsg while sending so that a failed send frees
        // the message instead of leaking it.
        if (int rc = nng_sendmsg(socket, repMsg.get(), 0))
        {
            nng::mesy_nng_error("responder: nng_sendmsg", rc);

            if (rc != NNG_ETIMEDOUT)
                return;

            continue;
        }

        // Success: the socket owns the message now.
        static_cast<void>(repMsg.release());
        ++answered;

        spdlog::debug("Sent pong response: {}", pong.ShortDebugString());
    }
}
2024-11-21 23:00:13 +01:00
void client_aio(nng_socket socket, unsigned id)
{
}
void server_aio(nng_socket socket, unsigned id)
{
}
int main()
{
2024-11-21 22:23:52 +01:00
spdlog::set_level(spdlog::level::info);
2024-11-21 23:00:13 +01:00
unsigned peerId = 0;
std::vector<std::thread> threads;
#if 1
{
2024-11-21 22:23:52 +01:00
auto reqSocket = nng::make_req_socket();
auto repSocket = nng::make_rep_socket();
2024-11-21 23:00:13 +01:00
if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str()))
2024-11-21 22:23:52 +01:00
return rc;
2024-11-21 23:00:13 +01:00
threads.emplace_back(std::thread(requester, reqSocket, peerId++));
threads.emplace_back(std::thread(responder, repSocket, peerId++));
}
#endif
#if 1
{
auto reqSocket = nng::make_req_socket();
auto repSocket = nng::make_rep_socket();
if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str()))
return rc;
threads.emplace_back(std::thread(client_aio, reqSocket, peerId++));
threads.emplace_back(std::thread(server_aio, repSocket, peerId++));
}
#endif
2024-11-21 22:23:52 +01:00
for (auto &t: threads)
if (t.joinable()) t.join();
2024-11-21 22:23:52 +01:00
return 0;
}