mesytec-mnode/src/mnode_proto_ping_client.cc

160 lines
3.8 KiB
C++
Raw Normal View History

2024-12-07 16:12:40 +01:00
#include <memory>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h"
2024-12-07 16:48:24 +01:00
#include <mesytec-mnode/mnode_nng.h>
2024-12-07 16:12:40 +01:00
using namespace mesytec;
2024-12-07 16:48:24 +01:00
using namespace mesytec::mnode;
2024-12-07 16:12:40 +01:00
using namespace std::literals;
void client_cb(void *arg);
class Work
{
public:
explicit Work(nng_socket socket)
: socket(socket)
2024-12-07 16:48:24 +01:00
, request_(nng::make_unique_msg())
2024-12-07 16:12:40 +01:00
{
}
~Work()
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, client_cb, this);
nng_ctx_open(&ctx, socket);
state = SEND;
2024-12-07 16:48:24 +01:00
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
2024-12-07 16:12:40 +01:00
nng_ctx_send(ctx, aio);
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mesy_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
state = SEND;
2024-12-07 16:48:24 +01:00
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
2024-12-07 16:12:40 +01:00
nng_ctx_send(ctx, aio);
}
else
{
state = RECEIVE;
nng_ctx_recv(ctx, aio);
}
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
nng::mesy_nng_error("nng_ctx_recv", rv);
state = SEND;
2024-12-07 16:48:24 +01:00
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
2024-12-07 16:12:40 +01:00
nng_ctx_send(ctx, aio);
}
else
{
auto reply = nng::make_unique_msg(nng_aio_get_msg(aio));
2024-12-07 16:48:24 +01:00
handle_reply(std::move(request_), std::move(reply));
2024-12-07 16:12:40 +01:00
state = SEND;
2024-12-07 16:48:24 +01:00
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
2024-12-07 16:12:40 +01:00
nng_ctx_send(ctx, aio);
}
break;
}
}
nng::unique_msg make_request()
{
2024-12-07 16:48:24 +01:00
Ping ping;
2024-12-07 16:12:40 +01:00
ping.set_peer_id(42);
ping.set_sequence_number(++sequence_number);
return nng::make_message(ping.SerializeAsString());
}
2024-12-07 16:48:24 +01:00
void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply)
2024-12-07 16:12:40 +01:00
{
2024-12-07 16:48:24 +01:00
Pong pong;
pong.ParseFromArray(nng_msg_body(reply.get()), nng_msg_len(reply.get()));
2024-12-07 16:12:40 +01:00
if (pong.peer_id() != 42 || pong.sequence_number() != sequence_number)
{
spdlog::error("received pong with unexpected values: {}", pong.ShortDebugString());
}
}
void report()
{
spdlog::info("Work: state={}, sequence_number={}", static_cast<int>(state), sequence_number);
}
private:
enum State
{
INIT, // allocate aio, open ctx, send first request
SEND, // send request
RECEIVE, // receive response
};
size_t sequence_number = 0;
State state = INIT;
nng_socket socket;
nng_aio *aio = nullptr;
nng_ctx ctx;
2024-12-07 16:48:24 +01:00
nng::unique_msg request_;
Ping ping;
Pong pong;
2024-12-07 16:12:40 +01:00
};
void client_cb(void *arg)
{
auto work = static_cast<Work *>(arg);
work->work();
}
int main()
{
spdlog::set_level(spdlog::level::info);
auto socket = nng::make_req_socket();
if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0))
{
nng::mesy_nng_error("nng_dial", res);
return res;
}
mvlc::util::Stopwatch sw;
Work work(socket);
work.work();
for (;;)
{
nng_msleep(100);
if (sw.get_interval() >= 1s)
{
work.report();
sw.interval();
}
}
return 0;
}