port mnode_proto_ping_client to the new async work

This commit is contained in:
Florian Lüke 2024-12-07 18:34:06 +01:00
parent 25b018b1bb
commit 1d5a957043
2 changed files with 37 additions and 103 deletions

View file

@ -1,133 +1,61 @@
#include <atomic>
#include <memory> #include <memory>
#include <mesytec-mvlc/mesytec-mvlc.h> #include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h> #include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h" #include "proto/service.pb.h"
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng_async.h>
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mnode; using namespace mesytec::mnode;
using namespace std::literals; using namespace std::literals;
void client_cb(void *arg); class PingClient: public nng::AsyncReqWork
class Work
{ {
public: public:
explicit Work(nng_socket socket) explicit PingClient(nng_socket socket)
: socket(socket) : AsyncReqWork(socket)
, request_(nng::make_unique_msg())
{ {
} }
~Work() nng::unique_msg make_request() override
{ {
nng_ctx_close(ctx); ping_.set_peer_id(peer_id);
nng_aio_free(aio); ping_.set_sequence_number(sequence_number++);
return nng::make_message(ping_.SerializeAsString());
} }
void work() void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) override
{ {
switch (state) ping_.ParseFromArray(nng_msg_body(request.get()), nng_msg_len(request.get()));
{ pong_.ParseFromArray(nng_msg_body(reply.get()), nng_msg_len(reply.get()));
case INIT:
nng_aio_alloc(&aio, client_cb, this);
nng_ctx_open(&ctx, socket);
state = SEND; if (ping_.sequence_number() != pong_.sequence_number())
request_ = make_request(); spdlog::error("Work: sequence_number mismatch: ping={}, pong={}",
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); ping_.sequence_number(), pong_.sequence_number());
nng_ctx_send(ctx, aio);
break;
case SEND: if (ping_.peer_id() != pong_.peer_id())
if (auto rv = nng_aio_result(aio)) spdlog::error("Work: peer_id mismatch: ping={}, pong={}", ping_.peer_id(),
{ pong_.peer_id());
nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
state = RECEIVE;
nng_ctx_recv(ctx, aio);
}
break;
case RECEIVE: ++num_transactions;
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_recv", rv);
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
auto reply = nng::make_unique_msg(nng_aio_get_msg(aio));
handle_reply(std::move(request_), std::move(reply));
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
break;
}
}
nng::unique_msg make_request()
{
Ping ping;
ping.set_peer_id(42);
ping.set_sequence_number(++sequence_number);
return nng::make_message(ping.SerializeAsString());
}
void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply)
{
Pong pong;
pong.ParseFromArray(nng_msg_body(reply.get()), nng_msg_len(reply.get()));
if (pong.peer_id() != 42 || pong.sequence_number() != sequence_number)
{
spdlog::error("received pong with unexpected values: {}", pong.ShortDebugString());
}
} }
void report() void report()
{ {
spdlog::info("Work: state={}, sequence_number={}", static_cast<int>(state), sequence_number); spdlog::info("PingClient, peer_id={}, transactions={}", peer_id, num_transactions);
} }
private: private:
enum State Ping ping_;
{ Pong pong_;
INIT, // allocate aio, open ctx, send first request size_t sequence_number = 1;
SEND, // send request static std::atomic<size_t> next_peer_id;
RECEIVE, // receive response size_t peer_id = next_peer_id++;
}; size_t num_transactions = 0;
size_t sequence_number = 0;
State state = INIT;
nng_socket socket;
nng_aio *aio = nullptr;
nng_ctx ctx;
nng::unique_msg request_;
Ping ping;
Pong pong;
}; };
void client_cb(void *arg) std::atomic<size_t> PingClient::next_peer_id;
{
auto work = static_cast<Work *>(arg);
work->work();
}
int main() int main()
{ {
@ -142,15 +70,21 @@ int main()
} }
mvlc::util::Stopwatch sw; mvlc::util::Stopwatch sw;
Work work(socket); std::vector<std::unique_ptr<PingClient>> clients;
work.work();
for (int i = 0; i < 10; ++i)
clients.emplace_back(std::make_unique<PingClient>(socket));
for (auto &client: clients)
client->work();
for (;;) for (;;)
{ {
nng_msleep(100); nng_msleep(100);
if (sw.get_interval() >= 1s) if (sw.get_interval() >= 1s)
{ {
work.report(); for (auto &client: clients)
client->report();
sw.interval(); sw.interval();
} }
} }

View file

@ -3,7 +3,7 @@
#include <mesytec-mvlc/mesytec-mvlc.h> #include <mesytec-mvlc/mesytec-mvlc.h>
#include <mesytec-mvlc/util/signal_handling.h> #include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h" #include "proto/service.pb.h"
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng_async.h>
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mnode; using namespace mesytec::mnode;