diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc index 1cec3b2..64b71f8 100644 --- a/src/mnode_proto_ping_server.cc +++ b/src/mnode_proto_ping_server.cc @@ -4,113 +4,43 @@ #include #include "proto/service.pb.h" #include +#include "internal/argh.h" using namespace mesytec; using namespace mesytec::mnode; using namespace std::literals; -void server_cb(void *arg); - -class Work +class PingServer: public nng::AsyncRepWork { public: - explicit Work(nng_socket socket) - : socket(socket) + explicit PingServer(nng_socket socket) + : AsyncRepWork(socket) { - recvErrors_.fill(0); - sendErrors_.fill(0); } - ~Work() - { - nng_ctx_close(ctx); - nng_aio_free(aio); - } - - void work() - { - switch (state) - { - case INIT: - nng_aio_alloc(&aio, server_cb, this); - nng_ctx_open(&ctx, socket); - state = RECEIVE; - nng_ctx_recv(ctx, aio); - break; - - case RECEIVE: - if (auto rv = nng_aio_result(aio)) - { - if (rv != NNG_ETIMEDOUT) - nng::mnode_nng_error("nng_ctx_recv", rv); - if (rv < static_cast(recvErrors_.size())) - ++recvErrors_[rv]; - state = RECEIVE; - nng_ctx_recv(ctx, aio); - break; - } - else - { - auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio))); - nng_aio_set_msg(aio, reply.release()); - state = SEND; - nng_ctx_send(ctx, aio); - } - break; - - case SEND: - if (auto rv = nng_aio_result(aio)) - { - nng::mnode_nng_error("nng_ctx_send", rv); - if (rv < static_cast(sendErrors_.size())) - ++sendErrors_[rv]; - nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); - } - state = RECEIVE; - nng_ctx_recv(ctx, aio); - break; - } - } - - nng::unique_msg handle_request(nng::unique_msg &&msg) + nng::unique_msg handle_request(nng::unique_msg &&msg) override { ping_.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); pong_.set_peer_id(ping_.peer_id()); pong_.set_sequence_number(ping_.sequence_number()); - sequence_number = ping_.sequence_number(); - return nng::make_message(ping_.SerializeAsString()); + ++num_transactions; + return nng::make_message(pong_.SerializeAsString()); } void report() { - spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + spdlog::info("PingServer, peer_id={}, transactions={}", peer_id, num_transactions); } private: - enum State - { - INIT, // allocate aio, open ctx, receive first request - RECEIVE, // receive request - SEND, // send response - }; - - mnode::Ping ping_; - mnode::Pong pong_; - std::uint32_t sequence_number = 0; - State state = INIT; - nng_socket socket; - nng_aio *aio = nullptr; - nng_ctx ctx; - std::array recvErrors_; - std::array sendErrors_; - + Ping ping_; + Pong pong_; + static std::atomic next_peer_id; + size_t peer_id = next_peer_id++; + size_t num_transactions = 0; }; -void server_cb(void *arg) -{ - auto work = static_cast(arg); - work->work(); -} +std::atomic PingServer::next_peer_id; int main() { @@ -125,16 +55,24 @@ int main() } mvlc::util::Stopwatch sw; - Work work(socket); - work.work(); + std::vector> servers; + + for (int i = 0; i < 10; ++i) + servers.emplace_back(std::make_unique(socket)); + + for (auto &server: servers) + server->work(); for (;;) { nng_msleep(100); if (sw.get_interval() >= 1s) { - work.report(); + for (auto &server: servers) + server->report(); sw.interval(); } } + + return 0; }