diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9da1ed2..fd714fe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,5 @@ set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic) -find_package(Threads) - # Bit of a hack to set the variables here. If set earlier clang-tidy will for # some reason pickup log.c and warn about some va_list stuff. Might be because # log.c is in the source directory as the same does not happen with the imgui @@ -32,6 +30,10 @@ target_compile_features(pair_consumer PRIVATE cxx_std_17) target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng) target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS}) +add_executable(pair_inproc pair_inproc.cc) +target_compile_features(pair_inproc PRIVATE cxx_std_17) +target_link_libraries(pair_inproc PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(pair_inproc PRIVATE ${MY_WARN_FLAGS}) #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/common.h b/src/common.h index ea6a073..75f1c8c 100644 --- a/src/common.h +++ b/src/common.h @@ -52,6 +52,62 @@ inline nng_socket make_pair_socket() return socket; } +inline void producer(nng_socket socket) +{ + while (true) + { + spdlog::trace("producer waiting for start message"); + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + } + else + { + nng_msg_free(msg); + spdlog::info("producer received 'start' from consumer"); + break; + } + } + + size_t totalBytes = 0u; + auto tStart = std::chrono::steady_clock::now(); + + for (size_t nbuf=0u; nbuf(nng_msg_body(msg)) = nbuf; + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + + totalBytes += BufferSize; + + spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend); + } + + { + spdlog::trace("producer sending 'quit' message"); + auto msg = alloc_message(0); + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + auto tEnd = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(tEnd - tStart) / 1000.0; + auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count(); + auto bufs = 1.0 * BuffersToSend / elapsed.count(); + + spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs); +} + inline void consumer(nng_socket socket) { { diff --git a/src/pair_inproc.cc b/src/pair_inproc.cc new file mode 100644 index 0000000..5d9cc1d --- /dev/null +++ b/src/pair_inproc.cc @@ -0,0 +1,33 @@ +#include "common.h" + +#include + +int main(int argc, char *argv[]) +{ + spdlog::set_level(spdlog::level::info); + auto producerSocket = make_pair_socket(); + + if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_listen inproc", res); + + auto consumerSocket = make_pair_socket(); + + if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_dial", res); + + std::vector threads; + + threads.emplace_back(std::thread(producer, producerSocket)); + threads.emplace_back(std::thread(consumer, consumerSocket)); + + for (auto &t: threads) + if (t.joinable()) t.join(); + + if (int res = nng_close(consumerSocket)) + mesy_nng_fatal("nng_close", res); + + if (int res = nng_close(producerSocket)) + mesy_nng_fatal("nng_close", res); + + return 0; +} diff --git a/src/pair_producer.cc b/src/pair_producer.cc index fbac4d4..32e4b76 100644 --- a/src/pair_producer.cc +++ b/src/pair_producer.cc @@ -1,93 +1,17 @@ #include "common.h" -#include - -void producer(nng_socket socket) -{ - while (true) - { - spdlog::trace("producer waiting for start message"); - nng_msg *msg = {}; - - if (auto res = receive_message(socket, &msg)) - { - if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); - } - else - { - nng_msg_free(msg); - spdlog::info("producer received 'start' from consumer"); - break; - } - } - - size_t totalBytes = 0u; - auto tStart = std::chrono::steady_clock::now(); - - for (size_t nbuf=0u; nbuf(nng_msg_body(msg)) = nbuf; - - if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); - - totalBytes += BufferSize; - - spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend); - } - - { - spdlog::trace("producer sending 'quit' message"); - auto msg = alloc_message(0); - if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); - } - - auto tEnd = std::chrono::steady_clock::now(); - auto elapsed = std::chrono::duration_cast(tEnd - tStart) / 1000.0; - auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count(); - auto bufs = 1.0 * BuffersToSend / elapsed.count(); - - spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs); -} - int main(int argc, char *argv[]) { spdlog::set_level(spdlog::level::info); auto producerSocket = make_pair_socket(); - if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); - if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0)) mesy_nng_fatal("nng_listen tcp", res); - std::vector threads; - - threads.emplace_back(std::thread(producer, producerSocket)); - //threads.emplace_back(std::thread(consumer, consumerSocket)); - - //auto consumerSocket = make_pair_socket(); - - //if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) - // mesy_nng_fatal("nng_dial", res); - - for (auto &t: threads) - if (t.joinable()) - t.join(); + producer(producerSocket); if (int res = nng_close(producerSocket)) mesy_nng_fatal("nng_close", res); - //if (int res = nng_close(consumerSocket)) - // mesy_nng_fatal("nng_close", res); - return 0; }