diff --git a/external/mesytec-mvlc b/external/mesytec-mvlc index 22b099f..10bdc1b 160000 --- a/external/mesytec-mvlc +++ b/external/mesytec-mvlc @@ -1 +1 @@ -Subproject commit 22b099f2f4785122086620d25c43060e78ef7e7a +Subproject commit 10bdc1b16dc3c9f6bbb678009fa548dab4a104d5 diff --git a/include/mesytec-node/mesytec_node_nng.h b/include/mesytec-node/mesytec_node_nng.h index 106676d..02e40f5 100644 --- a/include/mesytec-node/mesytec_node_nng.h +++ b/include/mesytec-node/mesytec_node_nng.h @@ -2,6 +2,14 @@ #define B18E3651_CA9A_43BC_AA25_810EA16533CD #include +#include +#include +#include +#include +#include +#include +#include + #include #include @@ -70,7 +78,7 @@ inline size_t allocated_free_space(nng_msg *msg) return capacity - used; } -static nng_duration DefaultTimeout = 16; +static nng_duration DefaultTimeout = 250; inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = DefaultTimeout) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6cb72ee..35eda19 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,6 +34,10 @@ function(add_node_dev_executable name) target_compile_options(${name} PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) endfunction() +function(add_node_proto_dev_executable name) + add_node_dev_executable(${name}) + target_link_libraries(${name} PRIVATE mnode-proto mesytec-mvlc) +endfunction() add_node_dev_executable(pair_producer) add_node_dev_executable(pair_consumer) @@ -43,9 +47,10 @@ add_node_dev_executable(mesy_nng_pipeline_main) add_node_dev_executable(mesy_nng_push_pull_main) add_node_dev_executable(mesy_nng_pub_producer) add_node_dev_executable(mesy_nng_sub_consumer) -add_node_dev_executable(mnode_proto_test1) -target_sources(mnode_proto_test1 PRIVATE thread_name.cc) -target_link_libraries(mnode_proto_test1 PRIVATE mnode-proto mesytec-mvlc) + +add_node_proto_dev_executable(mnode_proto_test1) +add_node_proto_dev_executable(mnode_proto_ping_client) +add_node_proto_dev_executable(mnode_proto_ping_server) #add_subdirectory(qt) diff --git a/src/mesytec_node_nng.cc b/src/mesytec_node_nng.cc index a20179a..8bedd41 100644 --- a/src/mesytec_node_nng.cc +++ b/src/mesytec_node_nng.cc @@ -1,13 +1,5 @@ #include "mesytec-node/mesytec_node_nng.h" -#include -#include -#include -#include -#include -#include -#include - namespace mesytec::nng { diff --git a/src/mnode_proto_ping_client.cc b/src/mnode_proto_ping_client.cc new file mode 100644 index 0000000..16c52f3 --- /dev/null +++ b/src/mnode_proto_ping_client.cc @@ -0,0 +1,151 @@ +#include + +#include +#include +#include "proto/service.pb.h" +#include + +using namespace mesytec; +using namespace std::literals; + +void client_cb(void *arg); + +class Work +{ + public: + explicit Work(nng_socket socket) + : socket(socket) + { + } + + ~Work() + { + nng_ctx_close(ctx); + nng_aio_free(aio); + } + + void work() + { + switch (state) + { + case INIT: + nng_aio_alloc(&aio, client_cb, this); + nng_ctx_open(&ctx, socket); + + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_send", rv); + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + else + { + state = RECEIVE; + nng_ctx_recv(ctx, aio); + } + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_recv", rv); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + else + { + auto reply = nng::make_unique_msg(nng_aio_get_msg(aio)); + handle_reply(std::move(reply)); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + + break; + } + + } + + nng::unique_msg make_request() + { + mnode::Ping ping; + ping.set_peer_id(42); + ping.set_sequence_number(++sequence_number); + return nng::make_message(ping.SerializeAsString()); + } + + void handle_reply(nng::unique_msg &&msg) + { + mnode::Pong pong; + pong.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); + if (pong.peer_id() != 42 || pong.sequence_number() != sequence_number) + { + spdlog::error("received pong with unexpected values: {}", pong.ShortDebugString()); + } + } + + void report() + { + spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + } + + private: + enum State + { + INIT, // allocate aio, open ctx, send first request + SEND, // send request + RECEIVE, // receive response + }; + + mnode::Ping ping; + mnode::Pong pong; + size_t sequence_number = 0; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; +}; + +void client_cb(void *arg) +{ + auto work = static_cast(arg); + work->work(); +} + +int main() +{ + spdlog::set_level(spdlog::level::info); + + auto socket = nng::make_req_socket(); + + if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0)) + { + nng::mesy_nng_error("nng_dial", res); + return res; + } + + mvlc::util::Stopwatch sw; + Work work(socket); + work.work(); + + for (;;) + { + nng_msleep(100); + if (sw.get_interval() >= 1s) + { + work.report(); + sw.interval(); + } + } + + return 0; +} diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc new file mode 100644 index 0000000..fd75ff6 --- /dev/null +++ b/src/mnode_proto_ping_server.cc @@ -0,0 +1,139 @@ +#include + +#include +#include +#include "proto/service.pb.h" +#include + +using namespace mesytec; +using namespace std::literals; + +void server_cb(void *arg); + +class Work +{ + public: + explicit Work(nng_socket socket) + : socket(socket) + { + recvErrors_.fill(0); + sendErrors_.fill(0); + } + + ~Work() + { + nng_ctx_close(ctx); + nng_aio_free(aio); + } + + void work() + { + switch (state) + { + case INIT: + nng_aio_alloc(&aio, server_cb, this); + nng_ctx_open(&ctx, socket); + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + if (rv != NNG_ETIMEDOUT) + nng::mesy_nng_error("nng_ctx_recv", rv); + if (rv < static_cast(recvErrors_.size())) + ++recvErrors_[rv]; + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } + else + { + auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio))); + nng_aio_set_msg(aio, reply.release()); + state = SEND; + nng_ctx_send(ctx, aio); + } + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_send", rv); + if (rv < static_cast(sendErrors_.size())) + ++sendErrors_[rv]; + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + } + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } + } + + nng::unique_msg handle_request(nng::unique_msg &&msg) + { + ping_.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); + pong_.set_peer_id(ping_.peer_id()); + pong_.set_sequence_number(ping_.sequence_number()); + sequence_number = ping_.sequence_number(); + return nng::make_message(ping_.SerializeAsString()); + } + + void report() + { + spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + } + + private: + enum State + { + INIT, // allocate aio, open ctx, receive first request + RECEIVE, // receive request + SEND, // send response + }; + + mnode::Ping ping_; + mnode::Pong pong_; + std::uint32_t sequence_number = 0; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; + std::array recvErrors_; + std::array sendErrors_; + +}; + +void server_cb(void *arg) +{ + auto work = static_cast(arg); + work->work(); +} + +int main() +{ + spdlog::set_level(spdlog::level::debug); + + auto socket = nng::make_rep_socket(); + + if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0)) + { + nng::mesy_nng_error("nng_listen", res); + return res; + } + + mvlc::util::Stopwatch sw; + Work work(socket); + work.work(); + + for (;;) + { + nng_msleep(100); + if (sw.get_interval() >= 1s) + { + work.report(); + sw.interval(); + } + } +} diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index cab5b50..c829352 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -83,14 +83,6 @@ void responder(nng_socket socket, unsigned id) } } -void client_aio(nng_socket socket, unsigned id) -{ -} - -void server_aio(nng_socket socket, unsigned id) -{ -} - int main() { spdlog::set_level(spdlog::level::info); @@ -98,8 +90,6 @@ int main() unsigned peerId = 0; std::vector threads; - #if 1 - { auto reqSocket = nng::make_req_socket(); auto repSocket = nng::make_rep_socket(); @@ -108,21 +98,6 @@ int main() threads.emplace_back(std::thread(requester, reqSocket, peerId++)); threads.emplace_back(std::thread(responder, repSocket, peerId++)); - } - #endif - - #if 1 - { - auto reqSocket = nng::make_req_socket(); - auto repSocket = nng::make_rep_socket(); - - if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) - return rc; - - threads.emplace_back(std::thread(client_aio, reqSocket, peerId++)); - threads.emplace_back(std::thread(server_aio, repSocket, peerId++)); - } - #endif for (auto &t: threads) if (t.joinable()) t.join();