diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..0fb5248 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.14) + +project(mvlc-nng-node) + +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +## Create binaries in the root of the build directory +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) +set(CMAKE_VERBOSE_MAKEFILE ON CACHE BOOL "ON") + +add_subdirectory(external) +add_subdirectory(src) diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt new file mode 100644 index 0000000..7f4900a --- /dev/null +++ b/external/CMakeLists.txt @@ -0,0 +1,16 @@ +find_package(nng) + +if(NOT nng) + message("-- Using nng from external/nng") + option(NNG_SETSTACKSIZE "Use rlimit for thread stack size" ON) + set(NNG_TESTS OFF) + set(NNG_ENABLE_NNGCAT ON) + add_subdirectory(nng) +endif() + +find_package(mesytec-mvlc) + +if (NOT mesytec-mvlc) + message("-- Using mesytec-mvlc from external/mesytec-mvlc") + add_subdirectory(mesytec-mvlc) +endif() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..ec99c99 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,32 @@ +set(MY_WARN_FLAGS -Wall -Wextra -Wpedantic) + +find_package(Threads) + +# Bit of a hack to set the variables here. If set earlier clang-tidy will for +# some reason pickup log.c and warn about some va_list stuff. Might be because +# log.c is in the source directory as the same does not happen with the imgui +# object library. TODO: move logc to externals. +#find_program(CLANG_TIDY_EXECUTABLE clang-tidy) +#if (CLANG_TIDY_EXECUTABLE) +# set(CMAKE_C_CLANG_TIDY clang-tidy -p ${CMAKE_BINARY_DIR} --extra-arg=-std=c11) +# set(CMAKE_CXX_CLANG_TIDY clang-tidy -p ${CMAKE_BINARY_DIR} --extra-arg=-std=c++17) +#endif() + +#add_library(dp_common dp_common.c) +#target_compile_features(dp_common PRIVATE c_std_11) +#target_compile_options(dp_common PUBLIC ${DP_WARN_FLAGS}) # spread warning flags +#target_link_libraries(dp_common +# PUBLIC logc +# PUBLIC nng +# PUBLIC Threads::Threads +#) +#target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) + +add_executable(main main.cc) +target_compile_features(main PRIVATE cxx_std_17) +target_link_libraries(main PRIVATE mesytec-mvlc PRIVATE nng) +target_compile_options(main PRIVATE ${MY_WARN_FLAGS}) + + +#unset(CMAKE_C_CLANG_TIDY) +#unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/main.cc b/src/main.cc new file mode 100644 index 0000000..880e9d0 --- /dev/null +++ b/src/main.cc @@ -0,0 +1,171 @@ +#include +#include +#include + +void mesy_nng_fatal(const char *const msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); + abort(); +} + +nng_msg *alloc_message(size_t size) +{ + nng_msg *msg = {}; + if (int res = nng_msg_alloc(&msg, size)) + mesy_nng_fatal("nng_msg_alloc", res); + return msg; +} + +int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0) +{ + if (auto res = nng_recvmsg(sock, msg_ptr, flags)) + { + nng_msg_free(*msg_ptr); + *msg_ptr = NULL; + return res; + } + + return 0; +} + +static const size_t BufferSize = 1024u * 1024u; +static const size_t BuffersToSend = 100000u; + +void producer(nng_socket socket) +{ + { + spdlog::trace("producer waiting for start message"); + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + mesy_nng_fatal("receive_message", res); + nng_msg_free(msg); + spdlog::trace("producer received 'start' from consumer"); + } + + for (size_t nbuf=0u; nbuf(nng_msg_body(msg)) = nbuf; + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + + spdlog::trace("producer sent buffer {}/{}", nbuf+1, BuffersToSend); + } + + { + spdlog::trace("producer sending 'quit' message"); + auto msg = alloc_message(0); + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + if (int res = nng_close(socket)) + mesy_nng_fatal("nng_close", res); + + spdlog::info("producer sent {} messages, {} MiB/s", BuffersToSend, 0); +} + +void consumer(nng_socket socket) +{ + { + spdlog::trace("consumer sending start message"); + auto msg = alloc_message(0); + + if (auto res = nng_sendmsg(socket, msg, 0)) + mesy_nng_fatal("nng_sendmsg", res); + } + + spdlog::trace("consumer ready to receive"); + + size_t nmsg = 0u; + + while (true) + { + nng_msg *msg = {}; + + if (auto res = receive_message(socket, &msg)) + { + if (res != NNG_ETIMEDOUT) + mesy_nng_fatal("receive_message", res); + else + spdlog::warn("consumer timed out in recv"); + } + else + { + size_t msglen = nng_msg_len(msg); + + if (msglen) + { + spdlog::trace("consumer received message {}/{} of size {}", + nmsg + 1, BuffersToSend, nng_msg_len(msg)); + ++nmsg; + } + + nng_msg_free(msg); + + if (!msglen) + { + spdlog::trace("consumer received 'quit' message"); + break; + } + } + } + + if (int res = nng_close(socket)) + mesy_nng_fatal("nng_close", res); + + spdlog::info("consumer received {} messages, {} MiB/s", nmsg, 0); +} + +nng_socket make_pair_socket() +{ + nng_socket socket; + + if (int res = nng_pair0_open(&socket)) + mesy_nng_fatal("nng_pair0_open", res); + + nng_duration timeout = 100; + + if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) + mesy_nng_fatal("nng_socket_set", res); + + if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) + mesy_nng_fatal("nng_socket_set", res); + + return socket; +} + + +int main(int argc, char *argv[]) +{ + auto producerSocket = make_pair_socket(); + + if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_listen inproc", res); + + //if (int res = nng_listen(producerSocket, "tcp://*:41234", nullptr, 0)) + // mesy_nng_fatal("nng_listen tcp", res); + + auto consumerSocket = make_pair_socket(); + + if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) + mesy_nng_fatal("nng_dial", res); + + std::vector threads; + + threads.emplace_back(std::thread(producer, producerSocket)); + threads.emplace_back(std::thread(consumer, consumerSocket)); + + for (auto &t: threads) + if (t.joinable()) + t.join(); + + return 0; +}