cmake scaffolding and inproc pair test program
This commit is contained in:
parent
7f802aa09c
commit
881dc11cd1
4 changed files with 230 additions and 0 deletions
11
CMakeLists.txt
Normal file
11
CMakeLists.txt
Normal file
|
@ -0,0 +1,11 @@
|
|||
cmake_minimum_required(VERSION 3.14)
|
||||
|
||||
project(mvlc-nng-node)
|
||||
|
||||
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
|
||||
## Create binaries in the root of the build directory
|
||||
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
|
||||
set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "ON")
|
||||
|
||||
add_subdirectory(external)
|
||||
add_subdirectory(src)
|
16
external/CMakeLists.txt
vendored
Normal file
16
external/CMakeLists.txt
vendored
Normal file
|
@ -0,0 +1,16 @@
|
|||
find_package(nng)
|
||||
|
||||
if(NOT nng)
|
||||
message("-- Using nng from external/nng")
|
||||
option(NNG_SETSTACKSIZE "Use rlimit for thread stack size" ON)
|
||||
set(NNG_TESTS OFF)
|
||||
set(NNG_ENABLE_NNGCAT ON)
|
||||
add_subdirectory(nng)
|
||||
endif()
|
||||
|
||||
find_package(mesytec-mvlc)
|
||||
|
||||
if (NOT mesytec-mvlc)
|
||||
message("-- Using mesytec-mvlc from external/mesytec-mvlc")
|
||||
add_subdirectory(mesytec-mvlc)
|
||||
endif()
|
32
src/CMakeLists.txt
Normal file
32
src/CMakeLists.txt
Normal file
|
@ -0,0 +1,32 @@
|
|||
set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic)
|
||||
|
||||
find_package(Threads)
|
||||
|
||||
# Bit of a hack to set the variables here. If set earlier clang-tidy will for
|
||||
# some reason pickup log.c and warn about some va_list stuff. Might be because
|
||||
# log.c is in the source directory as the same does not happen with the imgui
|
||||
# object library. TODO: move logc to externals.
|
||||
#find_program(CLANG_TIDY_EXECUTABLE clang-tidy)
|
||||
#if (CLANG_TIDY_EXECUTABLE)
|
||||
# set(CMAKE_C_CLANG_TIDY clang-tidy -p ${CMAKE_BINARY_DIR} --extra-arg=-std=c11)
|
||||
# set(CMAKE_CXX_CLANG_TIDY clang-tidy -p ${CMAKE_BINARY_DIR} --extra-arg=-std=c++17)
|
||||
#endif()
|
||||
|
||||
#add_library(dp_common dp_common.c)
|
||||
#target_compile_features(dp_common PRIVATE c_std_11)
|
||||
#target_compile_options(dp_common PUBLIC ${DP_WARN_FLAGS}) # spread warning flags
|
||||
#target_link_libraries(dp_common
|
||||
# PUBLIC logc
|
||||
# PUBLIC nng
|
||||
# PUBLIC Threads::Threads
|
||||
#)
|
||||
#target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
add_executable(main main.cc)
|
||||
target_compile_features(main PRIVATE cxx_std_17)
|
||||
target_link_libraries(main PRIVATE mesytec-mvlc PRIVATE nng)
|
||||
target_compile_options(main PRIVATE ${MY_WARN_FLAGS})
|
||||
|
||||
|
||||
#unset(CMAKE_C_CLANG_TIDY)
|
||||
#unset(CMAKE_CXX_CLANG_TIDY)
|
171
src/main.cc
Normal file
171
src/main.cc
Normal file
|
@ -0,0 +1,171 @@
|
|||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/pair0/pair.h>
|
||||
|
||||
void mesy_nng_fatal(const char *const msg, int rv)
|
||||
{
|
||||
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
||||
abort();
|
||||
}
|
||||
|
||||
nng_msg *alloc_message(size_t size)
|
||||
{
|
||||
nng_msg *msg = {};
|
||||
if (int res = nng_msg_alloc(&msg, size))
|
||||
mesy_nng_fatal("nng_msg_alloc", res);
|
||||
return msg;
|
||||
}
|
||||
|
||||
int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
|
||||
{
|
||||
if (auto res = nng_recvmsg(sock, msg_ptr, flags))
|
||||
{
|
||||
nng_msg_free(*msg_ptr);
|
||||
*msg_ptr = NULL;
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const size_t BufferSize = 1024u * 1024u;
|
||||
static const size_t BuffersToSend = 100000u;
|
||||
|
||||
void producer(nng_socket socket)
|
||||
{
|
||||
{
|
||||
spdlog::trace("producer waiting for start message");
|
||||
nng_msg *msg = {};
|
||||
|
||||
if (auto res = receive_message(socket, &msg))
|
||||
mesy_nng_fatal("receive_message", res);
|
||||
nng_msg_free(msg);
|
||||
spdlog::trace("producer received 'start' from consumer");
|
||||
}
|
||||
|
||||
for (size_t nbuf=0u; nbuf<BuffersToSend; ++nbuf)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (int res = nng_msg_alloc(&msg, BufferSize))
|
||||
mesy_nng_fatal("nng_msg_alloc", res);
|
||||
|
||||
//std::memset(nng_msg_body(msg), 0, BufferSize);
|
||||
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
|
||||
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
|
||||
spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend);
|
||||
}
|
||||
|
||||
{
|
||||
spdlog::trace("producer sending 'quit' message");
|
||||
auto msg = alloc_message(0);
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
}
|
||||
|
||||
if (int res = nng_close(socket))
|
||||
mesy_nng_fatal("nng_close", res);
|
||||
|
||||
spdlog::info("producer sent {} messages, {} MiB/s", BuffersToSend, 0);
|
||||
}
|
||||
|
||||
void consumer(nng_socket socket)
|
||||
{
|
||||
{
|
||||
spdlog::trace("consumer sending start message");
|
||||
auto msg = alloc_message(0);
|
||||
|
||||
if (auto res = nng_sendmsg(socket, msg, 0))
|
||||
mesy_nng_fatal("nng_sendmsg", res);
|
||||
}
|
||||
|
||||
spdlog::trace("consumer ready to receive");
|
||||
|
||||
size_t nmsg = 0u;
|
||||
|
||||
while (true)
|
||||
{
|
||||
nng_msg *msg = {};
|
||||
|
||||
if (auto res = receive_message(socket, &msg))
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
mesy_nng_fatal("receive_message", res);
|
||||
else
|
||||
spdlog::warn("consumer timed out in recv");
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t msglen = nng_msg_len(msg);
|
||||
|
||||
if (msglen)
|
||||
{
|
||||
spdlog::trace("consumer received message {}/{} of size {}",
|
||||
nmsg + 1, BuffersToSend, nng_msg_len(msg));
|
||||
++nmsg;
|
||||
}
|
||||
|
||||
nng_msg_free(msg);
|
||||
|
||||
if (!msglen)
|
||||
{
|
||||
spdlog::trace("consumer received 'quit' message");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (int res = nng_close(socket))
|
||||
mesy_nng_fatal("nng_close", res);
|
||||
|
||||
spdlog::info("consumer received {} messages, {} MiB/s", nmsg, 0);
|
||||
}
|
||||
|
||||
nng_socket make_pair_socket()
|
||||
{
|
||||
nng_socket socket;
|
||||
|
||||
if (int res = nng_pair0_open(&socket))
|
||||
mesy_nng_fatal("nng_pair0_open", res);
|
||||
|
||||
nng_duration timeout = 100;
|
||||
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
|
||||
mesy_nng_fatal("nng_socket_set", res);
|
||||
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
|
||||
mesy_nng_fatal("nng_socket_set", res);
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
auto producerSocket = make_pair_socket();
|
||||
|
||||
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
|
||||
mesy_nng_fatal("nng_listen inproc", res);
|
||||
|
||||
//if (int res = nng_listen(producerSocket, "tcp://*:41234", nullptr, 0))
|
||||
// mesy_nng_fatal("nng_listen tcp", res);
|
||||
|
||||
auto consumerSocket = make_pair_socket();
|
||||
|
||||
if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
|
||||
mesy_nng_fatal("nng_dial", res);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
threads.emplace_back(std::thread(producer, producerSocket));
|
||||
threads.emplace_back(std::thread(consumer, consumerSocket));
|
||||
|
||||
for (auto &t: threads)
|
||||
if (t.joinable())
|
||||
t.join();
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in a new issue