working pair producer and consumer
This commit is contained in:
parent
a47eed6d0e
commit
74d3b947a3
5 changed files with 231 additions and 175 deletions
|
@ -22,10 +22,15 @@ find_package(Threads)
|
||||||
#)
|
#)
|
||||||
#target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
|
#target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
|
||||||
|
|
||||||
add_executable(main main.cc)
|
add_executable(pair_producer pair_producer.cc)
|
||||||
target_compile_features(main PRIVATE cxx_std_17)
|
target_compile_features(pair_producer PRIVATE cxx_std_17)
|
||||||
target_link_libraries(main PRIVATE mesytec-mvlc PRIVATE nng)
|
target_link_libraries(pair_producer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
target_compile_options(main PRIVATE ${MY_WARN_FLAGS})
|
target_compile_options(pair_producer PRIVATE ${MY_WARN_FLAGS})
|
||||||
|
|
||||||
|
add_executable(pair_consumer pair_consumer.cc)
|
||||||
|
target_compile_features(pair_consumer PRIVATE cxx_std_17)
|
||||||
|
target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||||
|
target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS})
|
||||||
|
|
||||||
|
|
||||||
#unset(CMAKE_C_CLANG_TIDY)
|
#unset(CMAKE_C_CLANG_TIDY)
|
||||||
|
|
112
src/common.h
Normal file
112
src/common.h
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
#ifndef __MESYTEC_MVLC_NNG_NODE_COMMON_H__
|
||||||
|
#define __MESYTEC_MVLC_NNG_NODE_COMMON_H__
|
||||||
|
|
||||||
|
#include <nng/nng.h>
|
||||||
|
#include <nng/protocol/pair0/pair.h>
|
||||||
|
#include <spdlog/spdlog.h>
|
||||||
|
|
||||||
|
static const size_t BufferSize = 1024u * 1024u;
|
||||||
|
static const size_t BuffersToSend = 100000u;
|
||||||
|
|
||||||
|
inline void mesy_nng_fatal(const char *const msg, int rv)
|
||||||
|
{
|
||||||
|
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline nng_msg *alloc_message(size_t size)
|
||||||
|
{
|
||||||
|
nng_msg *msg = {};
|
||||||
|
if (int res = nng_msg_alloc(&msg, size))
|
||||||
|
mesy_nng_fatal("nng_msg_alloc", res);
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
|
||||||
|
{
|
||||||
|
if (auto res = nng_recvmsg(sock, msg_ptr, flags))
|
||||||
|
{
|
||||||
|
nng_msg_free(*msg_ptr);
|
||||||
|
*msg_ptr = NULL;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline nng_socket make_pair_socket()
|
||||||
|
{
|
||||||
|
nng_socket socket;
|
||||||
|
|
||||||
|
if (int res = nng_pair0_open(&socket))
|
||||||
|
mesy_nng_fatal("nng_pair0_open", res);
|
||||||
|
|
||||||
|
nng_duration timeout = 100;
|
||||||
|
|
||||||
|
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
|
||||||
|
mesy_nng_fatal("nng_socket_set", res);
|
||||||
|
|
||||||
|
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
|
||||||
|
mesy_nng_fatal("nng_socket_set", res);
|
||||||
|
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void consumer(nng_socket socket)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
spdlog::trace("consumer sending start message");
|
||||||
|
auto msg = alloc_message(0);
|
||||||
|
|
||||||
|
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||||
|
mesy_nng_fatal("nng_sendmsg", res);
|
||||||
|
}
|
||||||
|
|
||||||
|
spdlog::info("consumer ready to receive");
|
||||||
|
|
||||||
|
size_t recvCount = 0u;
|
||||||
|
size_t totalBytes = 0u;
|
||||||
|
auto tStart = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
nng_msg *msg = {};
|
||||||
|
|
||||||
|
if (auto res = receive_message(socket, &msg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
mesy_nng_fatal("receive_message", res);
|
||||||
|
else
|
||||||
|
spdlog::warn("consumer timed out in recv");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
size_t msglen = nng_msg_len(msg);
|
||||||
|
|
||||||
|
if (msglen)
|
||||||
|
{
|
||||||
|
spdlog::trace("consumer received message {}/{} of size {}",
|
||||||
|
recvCount + 1, BuffersToSend, nng_msg_len(msg));
|
||||||
|
++recvCount;
|
||||||
|
totalBytes += msglen;
|
||||||
|
}
|
||||||
|
|
||||||
|
nng_msg_free(msg);
|
||||||
|
|
||||||
|
if (!msglen)
|
||||||
|
{
|
||||||
|
spdlog::info("consumer received 'quit' message");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto tEnd = std::chrono::steady_clock::now();
|
||||||
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tEnd - tStart) / 1000.0;
|
||||||
|
auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count();
|
||||||
|
auto bufs = 1.0 * recvCount / elapsed.count();
|
||||||
|
|
||||||
|
spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* __MESYTEC_MVLC_NNG_NODE_COMMON_H__ */
|
171
src/main.cc
171
src/main.cc
|
@ -1,171 +0,0 @@
|
||||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
|
||||||
#include <nng/nng.h>
|
|
||||||
#include <nng/protocol/pair0/pair.h>
|
|
||||||
|
|
||||||
void mesy_nng_fatal(const char *const msg, int rv)
|
|
||||||
{
|
|
||||||
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
|
||||||
abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
nng_msg *alloc_message(size_t size)
|
|
||||||
{
|
|
||||||
nng_msg *msg = {};
|
|
||||||
if (int res = nng_msg_alloc(&msg, size))
|
|
||||||
mesy_nng_fatal("nng_msg_alloc", res);
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
|
|
||||||
{
|
|
||||||
if (auto res = nng_recvmsg(sock, msg_ptr, flags))
|
|
||||||
{
|
|
||||||
nng_msg_free(*msg_ptr);
|
|
||||||
*msg_ptr = NULL;
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static const size_t BufferSize = 1024u * 1024u;
|
|
||||||
static const size_t BuffersToSend = 100000u;
|
|
||||||
|
|
||||||
void producer(nng_socket socket)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
spdlog::trace("producer waiting for start message");
|
|
||||||
nng_msg *msg = {};
|
|
||||||
|
|
||||||
if (auto res = receive_message(socket, &msg))
|
|
||||||
mesy_nng_fatal("receive_message", res);
|
|
||||||
nng_msg_free(msg);
|
|
||||||
spdlog::trace("producer received 'start' from consumer");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t nbuf=0u; nbuf<BuffersToSend; ++nbuf)
|
|
||||||
{
|
|
||||||
nng_msg *msg = nullptr;
|
|
||||||
|
|
||||||
if (int res = nng_msg_alloc(&msg, BufferSize))
|
|
||||||
mesy_nng_fatal("nng_msg_alloc", res);
|
|
||||||
|
|
||||||
//std::memset(nng_msg_body(msg), 0, BufferSize);
|
|
||||||
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
|
|
||||||
|
|
||||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
|
||||||
mesy_nng_fatal("nng_sendmsg", res);
|
|
||||||
|
|
||||||
spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
spdlog::trace("producer sending 'quit' message");
|
|
||||||
auto msg = alloc_message(0);
|
|
||||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
|
||||||
mesy_nng_fatal("nng_sendmsg", res);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (int res = nng_close(socket))
|
|
||||||
mesy_nng_fatal("nng_close", res);
|
|
||||||
|
|
||||||
spdlog::info("producer sent {} messages, {} MiB/s", BuffersToSend, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void consumer(nng_socket socket)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
spdlog::trace("consumer sending start message");
|
|
||||||
auto msg = alloc_message(0);
|
|
||||||
|
|
||||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
|
||||||
mesy_nng_fatal("nng_sendmsg", res);
|
|
||||||
}
|
|
||||||
|
|
||||||
spdlog::trace("consumer ready to receive");
|
|
||||||
|
|
||||||
size_t nmsg = 0u;
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
nng_msg *msg = {};
|
|
||||||
|
|
||||||
if (auto res = receive_message(socket, &msg))
|
|
||||||
{
|
|
||||||
if (res != NNG_ETIMEDOUT)
|
|
||||||
mesy_nng_fatal("receive_message", res);
|
|
||||||
else
|
|
||||||
spdlog::warn("consumer timed out in recv");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
size_t msglen = nng_msg_len(msg);
|
|
||||||
|
|
||||||
if (msglen)
|
|
||||||
{
|
|
||||||
spdlog::trace("consumer received message {}/{} of size {}",
|
|
||||||
nmsg + 1, BuffersToSend, nng_msg_len(msg));
|
|
||||||
++nmsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
nng_msg_free(msg);
|
|
||||||
|
|
||||||
if (!msglen)
|
|
||||||
{
|
|
||||||
spdlog::trace("consumer received 'quit' message");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (int res = nng_close(socket))
|
|
||||||
mesy_nng_fatal("nng_close", res);
|
|
||||||
|
|
||||||
spdlog::info("consumer received {} messages, {} MiB/s", nmsg, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
nng_socket make_pair_socket()
|
|
||||||
{
|
|
||||||
nng_socket socket;
|
|
||||||
|
|
||||||
if (int res = nng_pair0_open(&socket))
|
|
||||||
mesy_nng_fatal("nng_pair0_open", res);
|
|
||||||
|
|
||||||
nng_duration timeout = 100;
|
|
||||||
|
|
||||||
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
|
|
||||||
mesy_nng_fatal("nng_socket_set", res);
|
|
||||||
|
|
||||||
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
|
|
||||||
mesy_nng_fatal("nng_socket_set", res);
|
|
||||||
|
|
||||||
return socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char *argv[])
|
|
||||||
{
|
|
||||||
auto producerSocket = make_pair_socket();
|
|
||||||
|
|
||||||
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
|
||||||
mesy_nng_fatal("nng_listen inproc", res);
|
|
||||||
|
|
||||||
//if (int res = nng_listen(producerSocket, "tcp://*:41234", nullptr, 0))
|
|
||||||
// mesy_nng_fatal("nng_listen tcp", res);
|
|
||||||
|
|
||||||
auto consumerSocket = make_pair_socket();
|
|
||||||
|
|
||||||
if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
|
|
||||||
mesy_nng_fatal("nng_dial", res);
|
|
||||||
|
|
||||||
std::vector<std::thread> threads;
|
|
||||||
|
|
||||||
threads.emplace_back(std::thread(producer, producerSocket));
|
|
||||||
threads.emplace_back(std::thread(consumer, consumerSocket));
|
|
||||||
|
|
||||||
for (auto &t: threads)
|
|
||||||
if (t.joinable())
|
|
||||||
t.join();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
17
src/pair_consumer.cc
Normal file
17
src/pair_consumer.cc
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
auto consumerSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_dial(consumerSocket, "tcp6://[::]:41234", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_dial", res);
|
||||||
|
|
||||||
|
consumer(consumerSocket);
|
||||||
|
|
||||||
|
if (int res = nng_close(consumerSocket))
|
||||||
|
mesy_nng_fatal("nng_close", res);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
93
src/pair_producer.cc
Normal file
93
src/pair_producer.cc
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||||
|
|
||||||
|
void producer(nng_socket socket)
|
||||||
|
{
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
spdlog::trace("producer waiting for start message");
|
||||||
|
nng_msg *msg = {};
|
||||||
|
|
||||||
|
if (auto res = receive_message(socket, &msg))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
mesy_nng_fatal("receive_message", res);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
nng_msg_free(msg);
|
||||||
|
spdlog::info("producer received 'start' from consumer");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t totalBytes = 0u;
|
||||||
|
auto tStart = std::chrono::steady_clock::now();
|
||||||
|
|
||||||
|
for (size_t nbuf=0u; nbuf<BuffersToSend; ++nbuf)
|
||||||
|
{
|
||||||
|
nng_msg *msg = nullptr;
|
||||||
|
|
||||||
|
if (int res = nng_msg_alloc(&msg, BufferSize))
|
||||||
|
mesy_nng_fatal("nng_msg_alloc", res);
|
||||||
|
|
||||||
|
std::memset(nng_msg_body(msg), 0, BufferSize);
|
||||||
|
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
|
||||||
|
|
||||||
|
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||||
|
mesy_nng_fatal("nng_sendmsg", res);
|
||||||
|
|
||||||
|
totalBytes += BufferSize;
|
||||||
|
|
||||||
|
spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
spdlog::trace("producer sending 'quit' message");
|
||||||
|
auto msg = alloc_message(0);
|
||||||
|
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||||
|
mesy_nng_fatal("nng_sendmsg", res);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto tEnd = std::chrono::steady_clock::now();
|
||||||
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tEnd - tStart) / 1000.0;
|
||||||
|
auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count();
|
||||||
|
auto bufs = 1.0 * BuffersToSend / elapsed.count();
|
||||||
|
|
||||||
|
spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
auto producerSocket = make_pair_socket();
|
||||||
|
|
||||||
|
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen inproc", res);
|
||||||
|
|
||||||
|
if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0))
|
||||||
|
mesy_nng_fatal("nng_listen tcp", res);
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
|
||||||
|
threads.emplace_back(std::thread(producer, producerSocket));
|
||||||
|
//threads.emplace_back(std::thread(consumer, consumerSocket));
|
||||||
|
|
||||||
|
//auto consumerSocket = make_pair_socket();
|
||||||
|
|
||||||
|
//if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
|
||||||
|
// mesy_nng_fatal("nng_dial", res);
|
||||||
|
|
||||||
|
for (auto &t: threads)
|
||||||
|
if (t.joinable())
|
||||||
|
t.join();
|
||||||
|
|
||||||
|
if (int res = nng_close(producerSocket))
|
||||||
|
mesy_nng_fatal("nng_close", res);
|
||||||
|
|
||||||
|
//if (int res = nng_close(consumerSocket))
|
||||||
|
// mesy_nng_fatal("nng_close", res);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in a new issue