add a push-pull test program
No buffer loss seen, rate is ~40GB/s starting at around 4 producer threads.
This commit is contained in:
parent
3d53c99114
commit
68ce15fedf
2 changed files with 188 additions and 0 deletions
|
@ -45,5 +45,10 @@ target_compile_features(mesy_nng_pipeline_main PRIVATE cxx_std_17)
|
||||||
target_link_libraries(mesy_nng_pipeline_main PRIVATE mesytec-mvlc PRIVATE nng)
|
target_link_libraries(mesy_nng_pipeline_main PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
target_compile_options(mesy_nng_pipeline_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
target_compile_options(mesy_nng_pipeline_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||||
|
|
||||||
|
add_executable(mesy_nng_push_pull_main mesy_nng_push_pull_main.cc)
|
||||||
|
target_compile_features(mesy_nng_push_pull_main PRIVATE cxx_std_17)
|
||||||
|
target_link_libraries(mesy_nng_push_pull_main PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
|
target_compile_options(mesy_nng_push_pull_main PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||||
|
|
||||||
#unset(CMAKE_C_CLANG_TIDY)
|
#unset(CMAKE_C_CLANG_TIDY)
|
||||||
#unset(CMAKE_CXX_CLANG_TIDY)
|
#unset(CMAKE_CXX_CLANG_TIDY)
|
||||||
|
|
183
src/mesy_nng_push_pull_main.cc
Normal file
183
src/mesy_nng_push_pull_main.cc
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
#include "common.h"
|
||||||
|
#include <nng/protocol/pipeline0/push.h>
|
||||||
|
#include <nng/protocol/pipeline0/pull.h>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
struct Header
|
||||||
|
{
|
||||||
|
unsigned id;
|
||||||
|
unsigned bufferNumber;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void push_producer(nng_socket socket, unsigned id)
|
||||||
|
{
|
||||||
|
Header header = {};
|
||||||
|
header.id = id;
|
||||||
|
header.bufferNumber = 1;
|
||||||
|
|
||||||
|
size_t totalBytes = 0u;
|
||||||
|
auto tStart = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
nng_msg *msg = nullptr;
|
||||||
|
|
||||||
|
if (int res = nng_msg_alloc(&msg, BufferSize))
|
||||||
|
mesy_nng_fatal("nng_msg_alloc", res);
|
||||||
|
|
||||||
|
std::memset(nng_msg_body(msg), 0, BufferSize);
|
||||||
|
*reinterpret_cast<Header *>(nng_msg_body(msg)) = header;
|
||||||
|
|
||||||
|
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||||
|
mesy_nng_fatal("nng_sendmsg", res);
|
||||||
|
|
||||||
|
totalBytes += BufferSize;
|
||||||
|
++header.bufferNumber;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static const size_t MaxProducers = 100;
|
||||||
|
|
||||||
|
ssize_t calc_packet_loss(size_t lastPacketNumber, size_t packetNumber)
|
||||||
|
{
|
||||||
|
static const auto PacketNumberMax = std::numeric_limits<size_t>::max();
|
||||||
|
|
||||||
|
ssize_t diff = packetNumber - lastPacketNumber;
|
||||||
|
|
||||||
|
if (diff < 1)
|
||||||
|
{
|
||||||
|
diff = PacketNumberMax + diff;
|
||||||
|
return diff;
|
||||||
|
}
|
||||||
|
|
||||||
|
return diff - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void pull_consumer(nng_socket socket)
|
||||||
|
{
|
||||||
|
size_t recvCount = 0u;
|
||||||
|
size_t totalBytes = 0u;
|
||||||
|
std::array<size_t, MaxProducers> producerBuffers;
|
||||||
|
std::array<size_t, MaxProducers> producerBytes;
|
||||||
|
std::array<size_t, MaxProducers> producerLastBufferNumber;
|
||||||
|
std::array<size_t, MaxProducers> producerBufferLoss;
|
||||||
|
|
||||||
|
producerBuffers.fill(0);
|
||||||
|
producerBytes.fill(0);
|
||||||
|
producerLastBufferNumber.fill(0);
|
||||||
|
producerBufferLoss.fill(0);
|
||||||
|
|
||||||
|
auto tStart = std::chrono::steady_clock::now();
|
||||||
|
auto tReport = tStart;
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
nng_msg *msg = {};
|
||||||
|
|
||||||
|
if (auto res = receive_message(socket, &msg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
mesy_nng_fatal("receive_message", res);
|
||||||
|
else
|
||||||
|
spdlog::warn("consumer timed out in recv");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
size_t msglen = nng_msg_len(msg);
|
||||||
|
|
||||||
|
if (msglen)
|
||||||
|
{
|
||||||
|
spdlog::trace("consumer received message {}/{} of size {}",
|
||||||
|
recvCount + 1, BuffersToSend, nng_msg_len(msg));
|
||||||
|
++recvCount;
|
||||||
|
totalBytes += msglen;
|
||||||
|
|
||||||
|
if (msglen >= sizeof(Header))
|
||||||
|
{
|
||||||
|
auto header = reinterpret_cast<const Header *>(nng_msg_body(msg));
|
||||||
|
if (header->id < MaxProducers)
|
||||||
|
{
|
||||||
|
++producerBuffers[header->id];
|
||||||
|
producerBytes[header->id] += nng_msg_len(msg);
|
||||||
|
auto loss = calc_packet_loss(producerLastBufferNumber[header->id], header->bufferNumber);
|
||||||
|
producerBufferLoss[header->id] += loss;
|
||||||
|
producerLastBufferNumber[header->id] = header->bufferNumber;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
spdlog::error("Producer id out of range: {}", header->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nng_msg_free(msg);
|
||||||
|
|
||||||
|
if (!msglen)
|
||||||
|
{
|
||||||
|
spdlog::info("consumer received 'quit' message");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
auto tNow = std::chrono::steady_clock::now();
|
||||||
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tNow - tReport);
|
||||||
|
if (elapsed.count() >= 1000)
|
||||||
|
{
|
||||||
|
tReport = tNow;
|
||||||
|
auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tNow - tStart) / 1000.0;
|
||||||
|
auto MiBs = (totalBytes / (1024.0 * 1024.0)) / totalElapsed.count();
|
||||||
|
auto bufs = 1.0 * recvCount / totalElapsed.count();
|
||||||
|
spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs);
|
||||||
|
for (size_t id=0; id<MaxProducers; ++id)
|
||||||
|
{
|
||||||
|
if (producerBuffers[id])
|
||||||
|
{
|
||||||
|
spdlog::info("buffers from producer {}: received={}, lost={}: ",
|
||||||
|
id, producerBuffers[id], producerBufferLoss[id]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
|
||||||
|
nng_socket consumerSocket = NNG_SOCKET_INITIALIZER;
|
||||||
|
|
||||||
|
if (int res = nng_pull0_open(&consumerSocket))
|
||||||
|
mesy_nng_fatal("nng_pull0_open", res);
|
||||||
|
|
||||||
|
if (int res = nng_listen(consumerSocket, "inproc://pushpull", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen inproc", res);
|
||||||
|
|
||||||
|
nng_socket producerSocket = NNG_SOCKET_INITIALIZER;
|
||||||
|
|
||||||
|
if (int res = nng_push0_open(&producerSocket))
|
||||||
|
mesy_nng_fatal("nng_push0_open", res);
|
||||||
|
|
||||||
|
if (int res = nng_dial(producerSocket, "inproc://pushpull", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_dial inproc", res);
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
|
||||||
|
const size_t ProducerCount = 10;
|
||||||
|
for (size_t i=0; i<ProducerCount; ++i)
|
||||||
|
threads.emplace_back(std::thread(push_producer, producerSocket, i));
|
||||||
|
|
||||||
|
// Deliberately start the consumer after the producers. There should be no buffer loss.
|
||||||
|
threads.emplace_back(std::thread(pull_consumer, consumerSocket));
|
||||||
|
|
||||||
|
|
||||||
|
for (auto &t: threads)
|
||||||
|
if (t.joinable()) t.join();
|
||||||
|
|
||||||
|
if (int res = nng_close(consumerSocket))
|
||||||
|
mesy_nng_fatal("nng_close", res);
|
||||||
|
|
||||||
|
if (int res = nng_close(producerSocket))
|
||||||
|
mesy_nng_fatal("nng_close", res);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in a new issue