From 74d3b947a3ec1b9b2eaa1d5f04fe7f047f239050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Tue, 27 Jun 2023 23:17:56 +0200 Subject: [PATCH] working pair producer and consumer --- src/CMakeLists.txt | 13 +++- src/common.h | 112 ++++++++++++++++++++++++++++ src/main.cc | 171 ------------------------------------------- src/pair_consumer.cc | 17 +++++ src/pair_producer.cc | 93 +++++++++++++++++++++++ 5 files changed, 231 insertions(+), 175 deletions(-) create mode 100644 src/common.h delete mode 100644 src/main.cc create mode 100644 src/pair_consumer.cc create mode 100644 src/pair_producer.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ec99c99..9da1ed2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,10 +22,15 @@ find_package(Threads) #) #target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) -add_executable(main main.cc) -target_compile_features(main PRIVATE cxx_std_17) -target_link_libraries(main PRIVATE mesytec-mvlc PRIVATE nng) -target_compile_options(main PRIVATE ${MY_WARN_FLAGS}) +add_executable(pair_producer pair_producer.cc) +target_compile_features(pair_producer PRIVATE cxx_std_17) +target_link_libraries(pair_producer PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(pair_producer PRIVATE ${MY_WARN_FLAGS}) + +add_executable(pair_consumer pair_consumer.cc) +target_compile_features(pair_consumer PRIVATE cxx_std_17) +target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS}) #unset(CMAKE_C_CLANG_TIDY) diff --git a/src/common.h b/src/common.h new file mode 100644 index 0000000..ea6a073 --- /dev/null +++ b/src/common.h @@ -0,0 +1,112 @@ +#ifndef __MESYTEC_MVLC_NNG_NODE_COMMON_H__ +#define __MESYTEC_MVLC_NNG_NODE_COMMON_H__ + +#include +#include +#include + +static const size_t BufferSize = 1024u * 1024u; +static const size_t BuffersToSend = 100000u; + +inline void mesy_nng_fatal(const char *const msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); + abort(); +} + +inline nng_msg *alloc_message(size_t size) +{ + nng_msg *msg = {}; + if (int res = nng_msg_alloc(&msg, size)) + mesy_nng_fatal("nng_msg_alloc", res); + return msg; +} + +inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0) +{ + if (auto res = nng_recvmsg(sock, msg_ptr, flags)) + { + nng_msg_free(*msg_ptr); + *msg_ptr = NULL; + return res; + } + + return 0; +} + +inline nng_socket make_pair_socket() +{ + nng_socket socket; + + if (int res = nng_pair0_open(&socket)) + mesy_nng_fatal("nng_pair0_open", res); + + nng_duration timeout = 100; + + if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) + mesy_nng_fatal("nng_socket_set", res); + + if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) + mesy_nng_fatal("nng_socket_set", res); + + return socket; +} + +inline void consumer(nng_socket socket) +{ + { + spdlog::trace("consumer sending start message"); + auto msg = alloc_message(0); + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + spdlog::info("consumer ready to receive"); + + size_t recvCount = 0u; + size_t totalBytes = 0u; + auto tStart = std::chrono::steady_clock::now(); + + while (true) + { + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + else + spdlog::warn("consumer timed out in recv"); + } + else + { + size_t msglen = nng_msg_len(msg); + + if (msglen) + { + spdlog::trace("consumer received message {}/{} of size {}", + recvCount + 1, BuffersToSend, nng_msg_len(msg)); + ++recvCount; + totalBytes += msglen; + } + + nng_msg_free(msg); + + if (!msglen) + { + spdlog::info("consumer received 'quit' message"); + break; + } + } + } + + auto tEnd = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(tEnd - tStart) / 1000.0; + auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count(); + auto bufs = 1.0 * recvCount / elapsed.count(); + + spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs); +} + +#endif /* __MESYTEC_MVLC_NNG_NODE_COMMON_H__ */ diff --git a/src/main.cc b/src/main.cc deleted file mode 100644 index 880e9d0..0000000 --- a/src/main.cc +++ /dev/null @@ -1,171 +0,0 @@ -#include -#include -#include - -void mesy_nng_fatal(const char *const msg, int rv) -{ - spdlog::error("{} ({})", msg, nng_strerror(rv)); - abort(); -} - -nng_msg *alloc_message(size_t size) -{ - nng_msg *msg = {}; - if (int res = nng_msg_alloc(&msg, size)) - mesy_nng_fatal("nng_msg_alloc", res); - return msg; -} - -int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0) -{ - if (auto res = nng_recvmsg(sock, msg_ptr, flags)) - { - nng_msg_free(*msg_ptr); - *msg_ptr = NULL; - return res; - } - - return 0; -} - -static const size_t BufferSize = 1024u * 1024u; -static const size_t BuffersToSend = 100000u; - -void producer(nng_socket socket) -{ - { - spdlog::trace("producer waiting for start message"); - nng_msg *msg = {}; - - if (auto res = receive_message(socket, &msg)) - mesy_nng_fatal("receive_message", res); - nng_msg_free(msg); - spdlog::trace("producer received 'start' from consumer"); - } - - for (size_t nbuf=0u; nbuf(nng_msg_body(msg)) = nbuf; - - if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); - - spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend); - } - - { - spdlog::trace("producer sending 'quit' message"); - auto msg = alloc_message(0); - if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); - } - - if (int res = nng_close(socket)) - mesy_nng_fatal("nng_close", res); - - spdlog::info("producer sent {} messages, {} MiB/s", BuffersToSend, 0); -} - -void consumer(nng_socket socket) -{ - { - spdlog::trace("consumer sending start message"); - auto msg = alloc_message(0); - - if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); - } - - spdlog::trace("consumer ready to receive"); - - size_t nmsg = 0u; - - while (true) - { - nng_msg *msg = {}; - - if (auto res = receive_message(socket, &msg)) - { - if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); - else - spdlog::warn("consumer timed out in recv"); - } - else - { - size_t msglen = nng_msg_len(msg); - - if (msglen) - { - spdlog::trace("consumer received message {}/{} of size {}", - nmsg + 1, BuffersToSend, nng_msg_len(msg)); - ++nmsg; - } - - nng_msg_free(msg); - - if (!msglen) - { - spdlog::trace("consumer received 'quit' message"); - break; - } - } - } - - if (int res = nng_close(socket)) - mesy_nng_fatal("nng_close", res); - - spdlog::info("consumer received {} messages, {} MiB/s", nmsg, 0); -} - -nng_socket make_pair_socket() -{ - nng_socket socket; - - if (int res = nng_pair0_open(&socket)) - mesy_nng_fatal("nng_pair0_open", res); - - nng_duration timeout = 100; - - if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) - mesy_nng_fatal("nng_socket_set", res); - - if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) - mesy_nng_fatal("nng_socket_set", res); - - return socket; -} - - -int main(int argc, char *argv[]) -{ - auto producerSocket = make_pair_socket(); - - if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); - - //if (int res = nng_listen(producerSocket, "tcp://*:41234", nullptr, 0)) - // mesy_nng_fatal("nng_listen tcp", res); - - auto consumerSocket = make_pair_socket(); - - if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_dial", res); - - std::vector threads; - - threads.emplace_back(std::thread(producer, producerSocket)); - threads.emplace_back(std::thread(consumer, consumerSocket)); - - for (auto &t: threads) - if (t.joinable()) - t.join(); - - return 0; -} diff --git a/src/pair_consumer.cc b/src/pair_consumer.cc new file mode 100644 index 0000000..e10c46c --- /dev/null +++ b/src/pair_consumer.cc @@ -0,0 +1,17 @@ +#include "common.h" + +int main(int argc, char *argv[]) +{ + spdlog::set_level(spdlog::level::info); + auto consumerSocket = make_pair_socket(); + + if (int res = nng_dial(consumerSocket, "tcp6://[::]:41234", nullptr, 0)) + mesy_nng_fatal("nng_dial", res); + + consumer(consumerSocket); + + if (int res = nng_close(consumerSocket)) + mesy_nng_fatal("nng_close", res); + + return 0; +} diff --git a/src/pair_producer.cc b/src/pair_producer.cc new file mode 100644 index 0000000..fbac4d4 --- /dev/null +++ b/src/pair_producer.cc @@ -0,0 +1,93 @@ +#include "common.h" + +#include + +void producer(nng_socket socket) +{ + while (true) + { + spdlog::trace("producer waiting for start message"); + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + } + else + { + nng_msg_free(msg); + spdlog::info("producer received 'start' from consumer"); + break; + } + } + + size_t totalBytes = 0u; + auto tStart = std::chrono::steady_clock::now(); + + for (size_t nbuf=0u; nbuf(nng_msg_body(msg)) = nbuf; + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + + totalBytes += BufferSize; + + spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend); + } + + { + spdlog::trace("producer sending 'quit' message"); + auto msg = alloc_message(0); + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + auto tEnd = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(tEnd - tStart) / 1000.0; + auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count(); + auto bufs = 1.0 * BuffersToSend / elapsed.count(); + + spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs); +} + +int main(int argc, char *argv[]) +{ + spdlog::set_level(spdlog::level::info); + auto producerSocket = make_pair_socket(); + + if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_listen inproc", res); + + if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0)) + mesy_nng_fatal("nng_listen tcp", res); + + std::vector threads; + + threads.emplace_back(std::thread(producer, producerSocket)); + //threads.emplace_back(std::thread(consumer, consumerSocket)); + + //auto consumerSocket = make_pair_socket(); + + //if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) + // mesy_nng_fatal("nng_dial", res); + + for (auto &t: threads) + if (t.joinable()) + t.join(); + + if (int res = nng_close(producerSocket)) + mesy_nng_fatal("nng_close", res); + + //if (int res = nng_close(consumerSocket)) + // mesy_nng_fatal("nng_close", res); + + return 0; +}