#include "common.h" #include #include #include struct Header { unsigned id; unsigned bufferNumber; }; static void push_producer(nng_socket socket, unsigned id) { Header header = {}; header.id = id; header.bufferNumber = 1; size_t totalBytes = 0u; auto tStart = std::chrono::steady_clock::now(); while (true) { nng_msg *msg = nullptr; if (int res = nng_msg_alloc(&msg, BufferSize)) mesy_nng_fatal("nng_msg_alloc", res); std::memset(nng_msg_body(msg), 0, BufferSize); *reinterpret_cast
(nng_msg_body(msg)) = header; if (auto res = nng_sendmsg(socket, msg, 0)) mesy_nng_fatal("nng_sendmsg", res); totalBytes += BufferSize; ++header.bufferNumber; } } static const size_t MaxProducers = 100; ssize_t calc_packet_loss(size_t lastPacketNumber, size_t packetNumber) { static const auto PacketNumberMax = std::numeric_limits::max(); ssize_t diff = packetNumber - lastPacketNumber; if (diff < 1) { diff = PacketNumberMax + diff; return diff; } return diff - 1; } static void pull_consumer(nng_socket socket) { size_t recvCount = 0u; size_t totalBytes = 0u; std::array producerBuffers; std::array producerBytes; std::array producerLastBufferNumber; std::array producerBufferLoss; producerBuffers.fill(0); producerBytes.fill(0); producerLastBufferNumber.fill(0); producerBufferLoss.fill(0); auto tStart = std::chrono::steady_clock::now(); auto tReport = tStart; while (true) { nng_msg *msg = {}; if (auto res = receive_message(socket, &msg)) { if (res != NNG_ETIMEDOUT) mesy_nng_fatal("receive_message", res); else spdlog::warn("consumer timed out in recv"); } else { size_t msglen = nng_msg_len(msg); if (msglen) { spdlog::trace("consumer received message {}/{} of size {}", recvCount + 1, BuffersToSend, nng_msg_len(msg)); ++recvCount; totalBytes += msglen; if (msglen >= sizeof(Header)) { auto header = reinterpret_cast(nng_msg_body(msg)); if (header->id < MaxProducers) { ++producerBuffers[header->id]; producerBytes[header->id] += nng_msg_len(msg); auto loss = calc_packet_loss(producerLastBufferNumber[header->id], header->bufferNumber); producerBufferLoss[header->id] += loss; producerLastBufferNumber[header->id] = header->bufferNumber; } else spdlog::error("Producer id out of range: {}", header->id); } } nng_msg_free(msg); if (!msglen) { spdlog::info("consumer received 'quit' message"); break; } { auto tNow = std::chrono::steady_clock::now(); auto elapsed = std::chrono::duration_cast(tNow - tReport); if (elapsed.count() >= 1000) { tReport = tNow; auto totalElapsed = std::chrono::duration_cast(tNow - tStart) / 1000.0; auto MiBs = (totalBytes / (1024.0 * 1024.0)) / totalElapsed.count(); auto bufs = 1.0 * recvCount / totalElapsed.count(); spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs); for (size_t id=0; id 1) ProducerCount = std::stoull(argv[1]); nng_socket consumerSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_pull0_open(&consumerSocket)) mesy_nng_fatal("nng_pull0_open", res); if (int res = nng_listen(consumerSocket, "inproc://pushpull", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); nng_socket producerSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_push0_open(&producerSocket)) mesy_nng_fatal("nng_push0_open", res); if (int res = nng_dial(producerSocket, "inproc://pushpull", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); std::vector threads; spdlog::info("Starting {} producers.", ProducerCount); for (size_t i=0; i