separate program for inproc producer/consumer and cleanup
This commit is contained in:
parent
2bb7581aa6
commit
b5c5450877
4 changed files with 94 additions and 79 deletions
|
@ -1,7 +1,5 @@
|
|||
set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic)
|
||||
|
||||
find_package(Threads)
|
||||
|
||||
# Bit of a hack to set the variables here. If set earlier clang-tidy will for
|
||||
# some reason pickup log.c and warn about some va_list stuff. Might be because
|
||||
# log.c is in the source directory as the same does not happen with the imgui
|
||||
|
@ -32,6 +30,10 @@ target_compile_features(pair_consumer PRIVATE cxx_std_17)
|
|||
target_link_libraries(pair_consumer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||
target_compile_options(pair_consumer PRIVATE ${MY_WARN_FLAGS})
|
||||
|
||||
add_executable(pair_inproc pair_inproc.cc)
|
||||
target_compile_features(pair_inproc PRIVATE cxx_std_17)
|
||||
target_link_libraries(pair_inproc PRIVATE mesytec-mvlc PRIVATE nng)
|
||||
target_compile_options(pair_inproc PRIVATE ${MY_WARN_FLAGS})
|
||||
|
||||
#unset(CMAKE_C_CLANG_TIDY)
|
||||
#unset(CMAKE_CXX_CLANG_TIDY)
|
||||
|
|
56
src/common.h
56
src/common.h
|
@ -52,6 +52,62 @@ inline nng_socket make_pair_socket()
|
|||
return socket;
|
||||
}
|
||||
|
||||
inline void producer(nng_socket socket)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
spdlog::trace("producer waiting for start message");
|
||||
nng_msg *msg = {};
|
||||
|
||||
if (auto res = receive_message(socket, &msg))
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
mesy_nng_fatal("receive_message", res);
|
||||
}
|
||||
else
|
||||
{
|
||||
nng_msg_free(msg);
|
||||
spdlog::info("producer received 'start' from consumer");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t totalBytes = 0u;
|
||||
auto tStart = std::chrono::steady_clock::now();
|
||||
|
||||
for (size_t nbuf=0u; nbuf<BuffersToSend; ++nbuf)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (int res = nng_msg_alloc(&msg, BufferSize))
|
||||
mesy_nng_fatal("nng_msg_alloc", res);
|
||||
|
||||
std::memset(nng_msg_body(msg), 0, BufferSize);
|
||||
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
|
||||
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
|
||||
totalBytes += BufferSize;
|
||||
|
||||
spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
|
||||
}
|
||||
|
||||
{
|
||||
spdlog::trace("producer sending 'quit' message");
|
||||
auto msg = alloc_message(0);
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
}
|
||||
|
||||
auto tEnd = std::chrono::steady_clock::now();
|
||||
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tEnd - tStart) / 1000.0;
|
||||
auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count();
|
||||
auto bufs = 1.0 * BuffersToSend / elapsed.count();
|
||||
|
||||
spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs);
|
||||
}
|
||||
|
||||
inline void consumer(nng_socket socket)
|
||||
{
|
||||
{
|
||||
|
|
33
src/pair_inproc.cc
Normal file
33
src/pair_inproc.cc
Normal file
|
@ -0,0 +1,33 @@
|
|||
#include "common.h"
|
||||
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
auto producerSocket = make_pair_socket();
|
||||
|
||||
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
||||
mesy_nng_fatal("nng_listen inproc", res);
|
||||
|
||||
auto consumerSocket = make_pair_socket();
|
||||
|
||||
if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
|
||||
mesy_nng_fatal("nng_dial", res);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
threads.emplace_back(std::thread(producer, producerSocket));
|
||||
threads.emplace_back(std::thread(consumer, consumerSocket));
|
||||
|
||||
for (auto &t: threads)
|
||||
if (t.joinable()) t.join();
|
||||
|
||||
if (int res = nng_close(consumerSocket))
|
||||
mesy_nng_fatal("nng_close", res);
|
||||
|
||||
if (int res = nng_close(producerSocket))
|
||||
mesy_nng_fatal("nng_close", res);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1,93 +1,17 @@
|
|||
#include "common.h"
|
||||
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
|
||||
void producer(nng_socket socket)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
spdlog::trace("producer waiting for start message");
|
||||
nng_msg *msg = {};
|
||||
|
||||
if (auto res = receive_message(socket, &msg))
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
mesy_nng_fatal("receive_message", res);
|
||||
}
|
||||
else
|
||||
{
|
||||
nng_msg_free(msg);
|
||||
spdlog::info("producer received 'start' from consumer");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t totalBytes = 0u;
|
||||
auto tStart = std::chrono::steady_clock::now();
|
||||
|
||||
for (size_t nbuf=0u; nbuf<BuffersToSend; ++nbuf)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (int res = nng_msg_alloc(&msg, BufferSize))
|
||||
mesy_nng_fatal("nng_msg_alloc", res);
|
||||
|
||||
std::memset(nng_msg_body(msg), 0, BufferSize);
|
||||
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
|
||||
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
|
||||
totalBytes += BufferSize;
|
||||
|
||||
spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
|
||||
}
|
||||
|
||||
{
|
||||
spdlog::trace("producer sending 'quit' message");
|
||||
auto msg = alloc_message(0);
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
}
|
||||
|
||||
auto tEnd = std::chrono::steady_clock::now();
|
||||
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(tEnd - tStart) / 1000.0;
|
||||
auto MiBs = (totalBytes / (1024.0 * 1024.0)) / elapsed.count();
|
||||
auto bufs = 1.0 * BuffersToSend / elapsed.count();
|
||||
|
||||
spdlog::info("producer sent {} messages, {:.2f} MiB/s. {:.2f} buffers/s", BuffersToSend, MiBs, bufs);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
auto producerSocket = make_pair_socket();
|
||||
|
||||
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
||||
mesy_nng_fatal("nng_listen inproc", res);
|
||||
|
||||
if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0))
|
||||
mesy_nng_fatal("nng_listen tcp", res);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
threads.emplace_back(std::thread(producer, producerSocket));
|
||||
//threads.emplace_back(std::thread(consumer, consumerSocket));
|
||||
|
||||
//auto consumerSocket = make_pair_socket();
|
||||
|
||||
//if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
|
||||
// mesy_nng_fatal("nng_dial", res);
|
||||
|
||||
for (auto &t: threads)
|
||||
if (t.joinable())
|
||||
t.join();
|
||||
producer(producerSocket);
|
||||
|
||||
if (int res = nng_close(producerSocket))
|
||||
mesy_nng_fatal("nng_close", res);
|
||||
|
||||
//if (int res = nng_close(consumerSocket))
|
||||
// mesy_nng_fatal("nng_close", res);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue