mesytec-mnode/src/pair_producer.cc

94 lines
2.7 KiB
C++
Raw Normal View History

2023-06-27 23:17:56 +02:00
#include "common.h"
#include <mesytec-mvlc/mesytec-mvlc.h>
/// Producer side of the pair-socket throughput test.
///
/// Waits until a message can be received on `socket` (logged as the consumer's
/// 'start'), then sends `BuffersToSend` messages of `BufferSize` bytes each,
/// followed by one message obtained from `alloc_message(0)` (logged as 'quit').
/// Finally logs the number of messages sent and the achieved MiB/s and
/// buffers/s rates.
///
/// @param socket NNG socket used both to receive the start message and to send
///               the data and quit messages.
void producer(nng_socket socket)
{
    // Wait for the start message. NNG_ETIMEDOUT is not treated as an error,
    // the receive is simply retried; any other error is reported via
    // mesy_nng_fatal().
    while (true)
    {
        spdlog::trace("producer waiting for start message");

        nng_msg *msg = nullptr;

        if (auto res = receive_message(socket, &msg))
        {
            if (res != NNG_ETIMEDOUT)
                mesy_nng_fatal("receive_message", res);
        }
        else
        {
            // The content of the start message is not inspected.
            nng_msg_free(msg);
            spdlog::info("producer received 'start' from consumer");
            break;
        }
    }

    size_t totalBytes = 0u;
    auto tStart = std::chrono::steady_clock::now();

    for (size_t nbuf = 0u; nbuf < BuffersToSend; ++nbuf)
    {
        nng_msg *msg = nullptr;

        if (int res = nng_msg_alloc(&msg, BufferSize))
            mesy_nng_fatal("nng_msg_alloc", res);

        std::memset(nng_msg_body(msg), 0, BufferSize);

        // Store the buffer sequence number at the start of the body. memcpy is
        // used instead of writing through a casted pointer so the store does
        // not depend on the body's alignment or violate aliasing rules. The
        // size check keeps the write inside the allocated body.
        if (static_cast<size_t>(BufferSize) >= sizeof(nbuf))
            std::memcpy(nng_msg_body(msg), &nbuf, sizeof(nbuf));

        if (auto res = nng_sendmsg(socket, msg, 0))
        {
            // On failure nng_sendmsg() leaves ownership of the message with
            // the caller, so release it before reporting the error.
            nng_msg_free(msg);
            mesy_nng_fatal("nng_sendmsg", res);
        }

        totalBytes += BufferSize;
        spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
    }

    {
        spdlog::trace("producer sending 'quit' message");

        auto msg = alloc_message(0);

        if (auto res = nng_sendmsg(socket, msg, 0))
        {
            // Same ownership rule as above: a failed send does not consume msg.
            nng_msg_free(msg);
            mesy_nng_fatal("nng_sendmsg", res);
        }
    }

    auto tEnd = std::chrono::steady_clock::now();

    // Keep the elapsed time as fractional seconds. Truncating to whole
    // milliseconds first would yield 0 for very short runs and turn the rate
    // calculations below into divisions by zero.
    const std::chrono::duration<double> elapsed = tEnd - tStart;
    const double seconds = elapsed.count();

    const double MiBs = seconds > 0.0 ? (totalBytes / (1024.0 * 1024.0)) / seconds : 0.0;
    const double bufs = seconds > 0.0 ? 1.0 * BuffersToSend / seconds : 0.0;

    spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs);
}
/// Program entry point: creates the producer pair socket, makes it listen on
/// an in-process and a TCP (IPv6 wildcard, port 41234) endpoint, runs
/// producer() in its own thread and closes the socket once that thread has
/// finished.
///
/// @return 0 after the producer thread was joined and the socket was closed.
int main(int argc, char *argv[])
{
    spdlog::set_level(spdlog::level::info);

    auto producerSocket = make_pair_socket();

    // Register both listeners before the producer thread starts.
    const int inprocResult = nng_listen(producerSocket, "inproc://1", nullptr, 0);
    if (inprocResult != 0)
        mesy_nng_fatal("nng_listen inproc", inprocResult);

    const int tcpResult = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0);
    if (tcpResult != 0)
        mesy_nng_fatal("nng_listen tcp", tcpResult);

    // The in-process consumer is currently disabled:
    //auto consumerSocket = make_pair_socket();
    //if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
    //    mesy_nng_fatal("nng_dial", res);
    //threads.emplace_back(std::thread(consumer, consumerSocket));

    std::vector<std::thread> threads;
    threads.emplace_back(producer, producerSocket);

    // Wait for all worker threads to finish before tearing down the socket.
    for (auto &worker : threads)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    }

    const int closeResult = nng_close(producerSocket);
    if (closeResult != 0)
        mesy_nng_fatal("nng_close", closeResult);

    //if (int res = nng_close(consumerSocket))
    //    mesy_nng_fatal("nng_close", res);

    return 0;
}