port ping server to async work

This commit is contained in:
Florian Lüke 2024-12-07 20:36:37 +01:00
parent c729069c2b
commit fea9cd07b1

View file

@ -4,113 +4,43 @@
#include <mesytec-mvlc/util/signal_handling.h> #include <mesytec-mvlc/util/signal_handling.h>
#include "proto/service.pb.h" #include "proto/service.pb.h"
#include <mesytec-mnode/mnode_nng_async.h> #include <mesytec-mnode/mnode_nng_async.h>
#include "internal/argh.h"
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mnode; using namespace mesytec::mnode;
using namespace std::literals; using namespace std::literals;
void server_cb(void *arg); class PingServer: public nng::AsyncRepWork
class Work
{ {
public: public:
explicit Work(nng_socket socket) explicit PingServer(nng_socket socket)
: socket(socket) : AsyncRepWork(socket)
{ {
recvErrors_.fill(0);
sendErrors_.fill(0);
} }
~Work() nng::unique_msg handle_request(nng::unique_msg &&msg) override
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, server_cb, this);
nng_ctx_open(&ctx, socket);
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
if (rv != NNG_ETIMEDOUT)
nng::mnode_nng_error("nng_ctx_recv", rv);
if (rv < static_cast<int>(recvErrors_.size()))
++recvErrors_[rv];
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
else
{
auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio)));
nng_aio_set_msg(aio, reply.release());
state = SEND;
nng_ctx_send(ctx, aio);
}
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_send", rv);
if (rv < static_cast<int>(sendErrors_.size()))
++sendErrors_[rv];
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
}
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
}
nng::unique_msg handle_request(nng::unique_msg &&msg)
{ {
ping_.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); ping_.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get()));
pong_.set_peer_id(ping_.peer_id()); pong_.set_peer_id(ping_.peer_id());
pong_.set_sequence_number(ping_.sequence_number()); pong_.set_sequence_number(ping_.sequence_number());
sequence_number = ping_.sequence_number(); ++num_transactions;
return nng::make_message(ping_.SerializeAsString()); return nng::make_message(pong_.SerializeAsString());
} }
void report() void report()
{ {
spdlog::info("Work: state={}, sequence_number={}", static_cast<int>(state), sequence_number); spdlog::info("PingServer, peer_id={}, transactions={}", peer_id, num_transactions);
} }
private: private:
enum State Ping ping_;
{ Pong pong_;
INIT, // allocate aio, open ctx, receive first request static std::atomic<size_t> next_peer_id;
RECEIVE, // receive request size_t peer_id = next_peer_id++;
SEND, // send response size_t num_transactions = 0;
}; };
mnode::Ping ping_; std::atomic<size_t> PingServer::next_peer_id;
mnode::Pong pong_;
std::uint32_t sequence_number = 0;
State state = INIT;
nng_socket socket;
nng_aio *aio = nullptr;
nng_ctx ctx;
std::array<size_t, 64> recvErrors_;
std::array<size_t, 64> sendErrors_;
};
void server_cb(void *arg)
{
auto work = static_cast<Work *>(arg);
work->work();
}
int main() int main()
{ {
@ -125,16 +55,24 @@ int main()
} }
mvlc::util::Stopwatch sw; mvlc::util::Stopwatch sw;
Work work(socket); std::vector<std::unique_ptr<PingServer>> servers;
work.work();
for (int i = 0; i < 10; ++i)
servers.emplace_back(std::make_unique<PingServer>(socket));
for (auto &server: servers)
server->work();
for (;;) for (;;)
{ {
nng_msleep(100); nng_msleep(100);
if (sw.get_interval() >= 1s) if (sw.get_interval() >= 1s)
{ {
work.report(); for (auto &server: servers)
server->report();
sw.interval(); sw.interval();
} }
} }
return 0;
} }