diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ae0077e..21d3bd2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,5 +45,10 @@ target_compile_features(mesy_nng_pipeline_main PRIVATE cxx_std_17) target_link_libraries(mesy_nng_pipeline_main PRIVATE mesytec-mvlc PRIVATE nng) target_compile_options(mesy_nng_pipeline_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) +add_executable(mesy_nng_push_pull_main mesy_nng_push_pull_main.cc) +target_compile_features(mesy_nng_push_pull_main PRIVATE cxx_std_17) +target_link_libraries(mesy_nng_push_pull_main PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(mesy_nng_push_pull_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) + #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/mesy_nng_push_pull_main.cc b/src/mesy_nng_push_pull_main.cc new file mode 100644 index 0000000..925a1f8 --- /dev/null +++ b/src/mesy_nng_push_pull_main.cc @@ -0,0 +1,183 @@ +#include "common.h" +#include +#include +#include + +struct Header +{ + unsigned id; + unsigned bufferNumber; +}; + +static void push_producer(nng_socket socket, unsigned id) +{ + Header header = {}; + header.id = id; + header.bufferNumber = 1; + + size_t totalBytes = 0u; + auto tStart = std::chrono::steady_clock::now(); + + while (true) + { + nng_msg *msg = nullptr; + + if (int res = nng_msg_alloc(&msg, BufferSize)) + mesy_nng_fatal("nng_msg_alloc", res); + + std::memset(nng_msg_body(msg), 0, BufferSize); + *reinterpret_cast
(nng_msg_body(msg)) = header; + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + + totalBytes += BufferSize; + ++header.bufferNumber; + } +} + +static const size_t MaxProducers = 100; + +ssize_t calc_packet_loss(size_t lastPacketNumber, size_t packetNumber) +{ + static const auto PacketNumberMax = std::numeric_limits::max(); + + ssize_t diff = packetNumber - lastPacketNumber; + + if (diff < 1) + { + diff = PacketNumberMax + diff; + return diff; + } + + return diff - 1; +} + +static void pull_consumer(nng_socket socket) +{ + size_t recvCount = 0u; + size_t totalBytes = 0u; + std::array producerBuffers; + std::array producerBytes; + std::array producerLastBufferNumber; + std::array producerBufferLoss; + + producerBuffers.fill(0); + producerBytes.fill(0); + producerLastBufferNumber.fill(0); + producerBufferLoss.fill(0); + + auto tStart = std::chrono::steady_clock::now(); + auto tReport = tStart; + + while (true) + { + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + else + spdlog::warn("consumer timed out in recv"); + } + else + { + size_t msglen = nng_msg_len(msg); + + if (msglen) + { + spdlog::trace("consumer received message {}/{} of size {}", + recvCount + 1, BuffersToSend, nng_msg_len(msg)); + ++recvCount; + totalBytes += msglen; + + if (msglen >= sizeof(Header)) + { + auto header = reinterpret_cast(nng_msg_body(msg)); + if (header->id < MaxProducers) + { + ++producerBuffers[header->id]; + producerBytes[header->id] += nng_msg_len(msg); + auto loss = calc_packet_loss(producerLastBufferNumber[header->id], header->bufferNumber); + producerBufferLoss[header->id] += loss; + producerLastBufferNumber[header->id] = header->bufferNumber; + } + else + spdlog::error("Producer id out of range: {}", header->id); + } + } + + nng_msg_free(msg); + + if (!msglen) + { + spdlog::info("consumer received 'quit' message"); + break; + } + + { + auto tNow = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(tNow - tReport); + if (elapsed.count() >= 1000) + { + tReport = tNow; + auto totalElapsed = std::chrono::duration_cast(tNow - tStart) / 1000.0; + auto MiBs = (totalBytes / (1024.0 * 1024.0)) / totalElapsed.count(); + auto bufs = 1.0 * recvCount / totalElapsed.count(); + spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs); + for (size_t id=0; id threads; + + const size_t ProducerCount = 10; + for (size_t i=0; i