From 1d5a957043ce4d4873f3159f0efc57cd1b61cb5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Sat, 7 Dec 2024 18:34:06 +0100 Subject: [PATCH] port mnode_proto_ping_client to the new async work --- src/mnode_proto_ping_client.cc | 138 +++++++++------------------------ src/mnode_proto_ping_server.cc | 2 +- 2 files changed, 37 insertions(+), 103 deletions(-) diff --git a/src/mnode_proto_ping_client.cc b/src/mnode_proto_ping_client.cc index 618e1ab..097828e 100644 --- a/src/mnode_proto_ping_client.cc +++ b/src/mnode_proto_ping_client.cc @@ -1,133 +1,61 @@ +#include #include #include #include #include "proto/service.pb.h" -#include +#include using namespace mesytec; using namespace mesytec::mnode; using namespace std::literals; -void client_cb(void *arg); - -class Work +class PingClient: public nng::AsyncReqWork { public: - explicit Work(nng_socket socket) - : socket(socket) - , request_(nng::make_unique_msg()) + explicit PingClient(nng_socket socket) + : AsyncReqWork(socket) { } - ~Work() + nng::unique_msg make_request() override { - nng_ctx_close(ctx); - nng_aio_free(aio); + ping_.set_peer_id(peer_id); + ping_.set_sequence_number(sequence_number++); + return nng::make_message(ping_.SerializeAsString()); } - void work() + void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) override { - switch (state) - { - case INIT: - nng_aio_alloc(&aio, client_cb, this); - nng_ctx_open(&ctx, socket); + ping_.ParseFromArray(nng_msg_body(request.get()), nng_msg_len(request.get())); + pong_.ParseFromArray(nng_msg_body(reply.get()), nng_msg_len(reply.get())); - state = SEND; - request_ = make_request(); - nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); - nng_ctx_send(ctx, aio); - break; + if (ping_.sequence_number() != pong_.sequence_number()) + spdlog::error("Work: sequence_number mismatch: ping={}, pong={}", + ping_.sequence_number(), pong_.sequence_number()); - case SEND: - if (auto rv = nng_aio_result(aio)) - { - nng::mnode_nng_error("nng_ctx_send", rv); - nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); - state = SEND; - request_ = make_request(); - nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); - nng_ctx_send(ctx, aio); - } - else - { - state = RECEIVE; - nng_ctx_recv(ctx, aio); - } - break; + if (ping_.peer_id() != pong_.peer_id()) + spdlog::error("Work: peer_id mismatch: ping={}, pong={}", ping_.peer_id(), + pong_.peer_id()); - case RECEIVE: - if (auto rv = nng_aio_result(aio)) - { - nng::mnode_nng_error("nng_ctx_recv", rv); - state = SEND; - request_ = make_request(); - nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); - nng_ctx_send(ctx, aio); - } - else - { - auto reply = nng::make_unique_msg(nng_aio_get_msg(aio)); - handle_reply(std::move(request_), std::move(reply)); - state = SEND; - request_ = make_request(); - nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); - nng_ctx_send(ctx, aio); - } - - break; - } - - } - - nng::unique_msg make_request() - { - Ping ping; - ping.set_peer_id(42); - ping.set_sequence_number(++sequence_number); - return nng::make_message(ping.SerializeAsString()); - } - - void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) - { - Pong pong; - pong.ParseFromArray(nng_msg_body(reply.get()), nng_msg_len(reply.get())); - if (pong.peer_id() != 42 || pong.sequence_number() != sequence_number) - { - spdlog::error("received pong with unexpected values: {}", pong.ShortDebugString()); - } + ++num_transactions; } void report() { - spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + spdlog::info("PingClient, peer_id={}, transactions={}", peer_id, num_transactions); } private: - enum State - { - INIT, // allocate aio, open ctx, send first request - SEND, // send request - RECEIVE, // receive response - }; - - size_t sequence_number = 0; - State state = INIT; - nng_socket socket; - nng_aio *aio = nullptr; - nng_ctx ctx; - - nng::unique_msg request_; - Ping ping; - Pong pong; + Ping ping_; + Pong pong_; + size_t sequence_number = 1; + static std::atomic next_peer_id; + size_t peer_id = next_peer_id++; + size_t num_transactions = 0; }; -void client_cb(void *arg) -{ - auto work = static_cast(arg); - work->work(); -} +std::atomic PingClient::next_peer_id; int main() { @@ -142,15 +70,21 @@ int main() } mvlc::util::Stopwatch sw; - Work work(socket); - work.work(); + std::vector> clients; + + for (int i = 0; i < 10; ++i) + clients.emplace_back(std::make_unique(socket)); + + for (auto &client: clients) + client->work(); for (;;) { nng_msleep(100); if (sw.get_interval() >= 1s) { - work.report(); + for (auto &client: clients) + client->report(); sw.interval(); } } diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc index 49e8c67..1cec3b2 100644 --- a/src/mnode_proto_ping_server.cc +++ b/src/mnode_proto_ping_server.cc @@ -3,7 +3,7 @@ #include #include #include "proto/service.pb.h" -#include +#include using namespace mesytec; using namespace mesytec::mnode;