hide spdlog from mnode_nng.h

This commit is contained in:
Florian Lüke 2024-12-07 17:36:59 +01:00
parent d3a5f2c75c
commit 4e4606c454
15 changed files with 181 additions and 167 deletions

View file

@ -10,30 +10,26 @@
#include <nng/protocol/reqrep0/req.h> #include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/rep.h>
#include <spdlog/spdlog.h> #include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional> #include <optional>
#include <string>
namespace mesytec::mnode::nng namespace mesytec::mnode::nng
{ {
inline void mesy_nng_fatal(const char *const msg, int rv) void mnode_nng_fatal(const char *const msg, int rv);
{ void mnode_nng_error(const std::string &msg, int rv);
spdlog::error("{} ({})", msg, nng_strerror(rv));
std::abort();
}
inline void mesy_nng_error(const std::string &msg, int rv)
{
spdlog::error("{} ({})", msg, nng_strerror(rv));
}
inline nng_msg *alloc_message(size_t size) inline nng_msg *alloc_message(size_t size)
{ {
nng_msg *msg = {}; nng_msg *msg = {};
if (int res = nng_msg_alloc(&msg, size)) if (int res = nng_msg_alloc(&msg, size))
{ {
mesy_nng_error("nng_msg_alloc", res); mnode_nng_error("nng_msg_alloc", res);
return nullptr; return nullptr;
} }
return msg; return msg;
@ -57,13 +53,13 @@ inline int allocate_reserve_message(nng_msg **msg, size_t reserve = 0)
if (auto res = nng_msg_alloc(msg, 0)) if (auto res = nng_msg_alloc(msg, 0))
{ {
mesy_nng_error("nng_msg_alloc", res); mnode_nng_error("nng_msg_alloc", res);
return res; return res;
} }
if (auto res = nng_msg_reserve(*msg, reserve)) if (auto res = nng_msg_reserve(*msg, reserve))
{ {
mesy_nng_error("nng_msg_reserve", res); mnode_nng_error("nng_msg_reserve", res);
return res; return res;
} }
@ -84,13 +80,13 @@ inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = Default
{ {
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
{ {
mesy_nng_error("nng_socket_set", res); mnode_nng_error("nng_socket_set", res);
return res; return res;
} }
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
{ {
mesy_nng_error("nng_socket_set", res); mnode_nng_error("nng_socket_set", res);
return res; return res;
} }
@ -105,7 +101,7 @@ inline nng_socket make_socket(socket_factory factory, nng_duration timeout = Def
if (int res = factory(&socket)) if (int res = factory(&socket))
{ {
mesy_nng_error("make_socket", res); mnode_nng_error("make_socket", res);
return NNG_SOCKET_INITIALIZER; return NNG_SOCKET_INITIALIZER;
} }
@ -153,35 +149,17 @@ inline std::string pipe_get_string_opt(nng_pipe p, const char *opt)
return result; return result;
} }
inline void log_socket_info(nng_socket s, const char *info) void log_socket_info(nng_socket s, const std::string &info_text);
{
auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME);
auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR);
auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR);
spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName);
spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress);
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
}
inline std::string get_local_address(nng_socket s) inline std::string get_local_address(nng_socket s)
{ {
return socket_get_string_opt(s, NNG_OPT_LOCADDR); return socket_get_string_opt(s, NNG_OPT_LOCADDR);
} }
inline void log_pipe_info(nng_pipe p, const char *info) void log_pipe_info(nng_pipe p, const char *info_text);
{
auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME);
auto localAddress = pipe_get_string_opt(p, NNG_OPT_LOCADDR);
auto remoteAddress = pipe_get_string_opt(p, NNG_OPT_REMADDR);
spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName); // 'nng_res' is the result of the last nng function call.
spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress); using retry_predicate = std::function<bool (int nng_res)>;
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
}
// 'res' is the result of the last nng function call.
using retry_predicate = std::function<bool (int res)>;
class RetryNTimes class RetryNTimes
{ {
@ -200,32 +178,7 @@ class RetryNTimes
size_t attempt_ = 0u; size_t attempt_ = 0u;
}; };
inline int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "") int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "");
{
int res = 0;
do
{
res = nng_sendmsg(socket, msg, 0);
if (res)
{
if (res != NNG_ETIMEDOUT)
{
spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg));
return res;
}
if (res == NNG_ETIMEDOUT)
spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg));
if (!rp(res))
return res;
}
} while (res == NNG_ETIMEDOUT);
return 0;
}
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "") inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "")
{ {
@ -329,37 +282,13 @@ void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
} }
} }
template<typename Value> std::string format_stat(nng_stat *stat);
std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit)
{
return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}",
nng::nng_stat_type_to_string(type),
name, desc, ts, value,
nng::nng_stat_unit_to_string(unit));
}
inline std::string format_stat(nng_stat *stat)
{
switch (nng_stat_type(stat))
{
case NNG_STAT_BOOLEAN:
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
default:
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
}
}
static void custom_nng_msg_free(nng_msg *msg)
{
//spdlog::warn("custom_nng_msg_free: msg={}", fmt::ptr(msg));
nng_msg_free(msg);
}
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>; using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
inline unique_msg make_unique_msg(nng_msg *msg = nullptr, decltype(&nng_msg_free) deleter = custom_nng_msg_free) inline unique_msg make_unique_msg(nng_msg *msg = nullptr)
{ {
return unique_msg(msg, deleter); return unique_msg(msg, &nng_msg_free);
} }
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0) inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
@ -380,13 +309,13 @@ inline unique_msg allocate_reserve_message(size_t reserve = 0)
if (auto res = nng_msg_alloc(&msg, 0)) if (auto res = nng_msg_alloc(&msg, 0))
{ {
mesy_nng_error("allocate_reserve_message", res); mnode_nng_error("allocate_reserve_message", res);
return make_unique_msg(); return make_unique_msg();
} }
if (auto res = nng_msg_reserve(msg, reserve)) if (auto res = nng_msg_reserve(msg, reserve))
{ {
mesy_nng_error("allocate_reserve_message", res); mnode_nng_error("allocate_reserve_message", res);
nng_msg_free(msg); nng_msg_free(msg);
return make_unique_msg(); return make_unique_msg();
} }
@ -398,13 +327,13 @@ inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url
{ {
if (int res = nng_listen(listen, url, nullptr, 0)) if (int res = nng_listen(listen, url, nullptr, 0))
{ {
mesy_nng_error("nng_listen", res); mnode_nng_error("nng_listen", res);
return res; return res;
} }
if (int res = nng_dial(dial, url, nullptr, 0)) if (int res = nng_dial(dial, url, nullptr, 0))
{ {
mesy_nng_error("nng_dial", res); mnode_nng_error("nng_dial", res);
return res; return res;
} }
@ -416,7 +345,7 @@ inline unique_msg make_message(const std::string &data)
nng_msg *msg = nullptr; nng_msg *msg = nullptr;
if (int res = nng_msg_alloc(&msg, data.size())) if (int res = nng_msg_alloc(&msg, data.size()))
{ {
mesy_nng_error("nng_msg_alloc", res); mnode_nng_error("nng_msg_alloc", res);
return make_unique_msg(); return make_unique_msg();
} }
@ -429,7 +358,7 @@ inline unique_msg clone_message(const nng_msg *msg)
nng_msg *newMsg = nullptr; nng_msg *newMsg = nullptr;
if (int res = nng_msg_dup(&newMsg, msg)) if (int res = nng_msg_dup(&newMsg, msg))
{ {
mesy_nng_error("nng_msg_dup", res); mnode_nng_error("nng_msg_dup", res);
return make_unique_msg(); return make_unique_msg();
} }

View file

@ -22,7 +22,7 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic)
add_library(mesytec-mnode-nng mnode_nng.cc) add_library(mesytec-mnode-nng mnode_nng.cc)
target_include_directories(mesytec-mnode-nng PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>) target_include_directories(mesytec-mnode-nng PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries(mesytec-mnode-nng PUBLIC nng PUBLIC spdlog) target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog)
target_compile_features(mesytec-mnode-nng PUBLIC cxx_std_17) target_compile_features(mesytec-mnode-nng PUBLIC cxx_std_17)
add_library(mesytec-mnode-dev INTERFACE) add_library(mesytec-mnode-dev INTERFACE)

View file

@ -1,5 +1,6 @@
#include <future> #include <future>
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng.h>
#include <spdlog/spdlog.h>
using namespace mesytec::mnode::nng; using namespace mesytec::mnode::nng;
@ -184,10 +185,10 @@ int pipe_end_receiver(PipelineElement e, const char *prefix)
auto resultSocket = make_pair_socket(); auto resultSocket = make_pair_socket();
if (auto res = nng_listen(resultSocket, resultUri.c_str(), nullptr, 0)) if (auto res = nng_listen(resultSocket, resultUri.c_str(), nullptr, 0))
mesy_nng_fatal("pipe_end_receiver nng_listen(resultSocket)", res); mnode_nng_fatal("pipe_end_receiver nng_listen(resultSocket)", res);
if (auto res = nng_dial(e.outputSocket, resultUri.c_str(), nullptr, 0)) if (auto res = nng_dial(e.outputSocket, resultUri.c_str(), nullptr, 0))
mesy_nng_fatal("pipe_end_receiver nng_dial(pipe0.output)", res); mnode_nng_fatal("pipe_end_receiver nng_dial(pipe0.output)", res);
bool quit = false; bool quit = false;
@ -218,7 +219,7 @@ int pipe_end_receiver(PipelineElement e, const char *prefix)
spdlog::info("pipe_end exiting"); spdlog::info("pipe_end exiting");
if (auto res = nng_close(resultSocket)) if (auto res = nng_close(resultSocket))
mesy_nng_fatal("pipe_end_receiver nng_close", res); mnode_nng_fatal("pipe_end_receiver nng_close", res);
return 0; return 0;
} }
@ -231,14 +232,14 @@ int main(int argc, char *argv[])
std::vector<PipelineElement> pipe0(10); std::vector<PipelineElement> pipe0(10);
if (auto res = init_inproc_pipeline(pipe0, "pipe0_")) if (auto res = init_inproc_pipeline(pipe0, "pipe0_"))
mesy_nng_fatal("main init_inproc_pipeline", res); mnode_nng_fatal("main init_inproc_pipeline", res);
auto resultSocket = make_pair_socket(); auto resultSocket = make_pair_socket();
if (auto res = nng_listen(resultSocket, "inproc://pipe0_result", nullptr, 0)) if (auto res = nng_listen(resultSocket, "inproc://pipe0_result", nullptr, 0))
mesy_nng_fatal("main nng_listen(resultSocket)", res); mnode_nng_fatal("main nng_listen(resultSocket)", res);
if (auto res = nng_dial(pipe0.back().outputSocket, "inproc://pipe0_result", nullptr, 0)) if (auto res = nng_dial(pipe0.back().outputSocket, "inproc://pipe0_result", nullptr, 0))
mesy_nng_fatal("main nng_dial(pipe0.output)", res); mnode_nng_fatal("main nng_dial(pipe0.output)", res);
std::vector<std::future<int>> pipelineJobs; std::vector<std::future<int>> pipelineJobs;
@ -254,7 +255,7 @@ int main(int argc, char *argv[])
for (int i=0; i<2; ++i) for (int i=0; i<2; ++i)
{ {
if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) if (auto res = pipeline_add_inproc_part(pipe0, "pipe0"))
mesy_nng_fatal("pipeline_add_inproc_part", res); mnode_nng_fatal("pipeline_add_inproc_part", res);
if (i > 0) // first pipe element is fed from main if (i > 0) // first pipe element is fed from main
pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back())); pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back()));
} }
@ -263,10 +264,10 @@ int main(int argc, char *argv[])
nng_socket pipe0PubSocket = make_pub_socket(); nng_socket pipe0PubSocket = make_pub_socket();
if (auto res = nng_listen(pipe0PubSocket, "inproc://pipe0_pub", nullptr, 0)) if (auto res = nng_listen(pipe0PubSocket, "inproc://pipe0_pub", nullptr, 0))
mesy_nng_fatal("nng_listen pipe0_pub", res); mnode_nng_fatal("nng_listen pipe0_pub", res);
if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) if (auto res = pipeline_add_inproc_part(pipe0, "pipe0"))
mesy_nng_fatal("pipeline_add_inproc_part", res); mnode_nng_fatal("pipeline_add_inproc_part", res);
pipelineJobs.emplace_back(std::async(std::launch::async, pipe_filter_publisher, pipe0.back(), pipe0PubSocket)); pipelineJobs.emplace_back(std::async(std::launch::async, pipe_filter_publisher, pipe0.back(), pipe0PubSocket));
#endif #endif
@ -274,7 +275,7 @@ int main(int argc, char *argv[])
for (int i=0; i<2; ++i) for (int i=0; i<2; ++i)
{ {
if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) if (auto res = pipeline_add_inproc_part(pipe0, "pipe0"))
mesy_nng_fatal("pipeline_add_inproc_part", res); mnode_nng_fatal("pipeline_add_inproc_part", res);
pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back())); pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back()));
} }
@ -284,33 +285,33 @@ int main(int argc, char *argv[])
const char *pipe0ResultUri = "inproc://pipe0_result"; const char *pipe0ResultUri = "inproc://pipe0_result";
if (auto res = nng_listen(pipe0.back().outputSocket, pipe0ResultUri, nullptr, 0)) if (auto res = nng_listen(pipe0.back().outputSocket, pipe0ResultUri, nullptr, 0))
mesy_nng_fatal("main pipeline_end_inproc", res); mnode_nng_fatal("main pipeline_end_inproc", res);
auto resultSocket = make_pair_socket(); auto resultSocket = make_pair_socket();
if (auto res = nng_dial(resultSocket, pipe0ResultUri, nullptr, 0)) if (auto res = nng_dial(resultSocket, pipe0ResultUri, nullptr, 0))
mesy_nng_fatal("main nng_dial(resultSocket)", res); mnode_nng_fatal("main nng_dial(resultSocket)", res);
#endif #endif
#endif #endif
nng_msg *outMsg = nullptr; nng_msg *outMsg = nullptr;
if (auto res = nng_msg_alloc(&outMsg, 0)) if (auto res = nng_msg_alloc(&outMsg, 0))
mesy_nng_fatal("main msg alloc", res); mnode_nng_fatal("main msg alloc", res);
if (auto res = nng_msg_append_u32(outMsg, 0xdeadbeef)) if (auto res = nng_msg_append_u32(outMsg, 0xdeadbeef))
mesy_nng_fatal("main msg append", res); mnode_nng_fatal("main msg append", res);
if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input")) if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input"))
mesy_nng_fatal("main send_message_retry", res); mnode_nng_fatal("main send_message_retry", res);
outMsg = nullptr; outMsg = nullptr;
spdlog::info("main sent first message"); spdlog::info("main sent first message");
if (auto res = nng_msg_alloc(&outMsg, 0)) if (auto res = nng_msg_alloc(&outMsg, 0))
mesy_nng_fatal("main msg alloc", res); mnode_nng_fatal("main msg alloc", res);
if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input")) if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input"))
mesy_nng_fatal("main send_message_retry", res); mnode_nng_fatal("main send_message_retry", res);
outMsg = nullptr; outMsg = nullptr;
@ -327,7 +328,7 @@ int main(int argc, char *argv[])
{ {
if (res != NNG_ETIMEDOUT) if (res != NNG_ETIMEDOUT)
{ {
mesy_nng_fatal("main receive_message", res); mnode_nng_fatal("main receive_message", res);
return res; return res;
} }
@ -376,7 +377,7 @@ int main(int argc, char *argv[])
} }
if (auto res = close_pipeline(pipe0)) if (auto res = close_pipeline(pipe0))
mesy_nng_fatal("main close_pipeline", res); mnode_nng_fatal("main close_pipeline", res);
return ret; return ret;
} }

View file

@ -11,10 +11,10 @@ int main()
nng_socket pubSocket = NNG_SOCKET_INITIALIZER; nng_socket pubSocket = NNG_SOCKET_INITIALIZER;
if (int res = nng_pub0_open(&pubSocket)) if (int res = nng_pub0_open(&pubSocket))
mesy_nng_fatal("nng_pub0_open", res); mnode_nng_fatal("nng_pub0_open", res);
if (int res = nng_listen(pubSocket, "tcp://*:42777", nullptr, 0)) if (int res = nng_listen(pubSocket, "tcp://*:42777", nullptr, 0))
mesy_nng_fatal("nng_listen tcp", res); mnode_nng_fatal("nng_listen tcp", res);
while (true) while (true)
{ {
@ -27,10 +27,10 @@ int main()
nng_msg *msg = nullptr; nng_msg *msg = nullptr;
if (int res = nng_msg_alloc(&msg, size)) if (int res = nng_msg_alloc(&msg, size))
mesy_nng_fatal("nng_msg_alloc", res); mnode_nng_fatal("nng_msg_alloc", res);
if (auto res = nng_sendmsg(pubSocket, msg, 0)) if (auto res = nng_sendmsg(pubSocket, msg, 0))
mesy_nng_fatal("nng_sendmsg", res); mnode_nng_fatal("nng_sendmsg", res);
} }
return 0; return 0;

View file

@ -5,6 +5,7 @@
#include "test_producer_consumer.h" #include "test_producer_consumer.h"
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng.h>
#include <spdlog/spdlog.h>
using namespace mesytec::mnode::nng; using namespace mesytec::mnode::nng;
@ -28,13 +29,13 @@ static void push_producer(nng_socket socket, unsigned id)
nng_msg *msg = nullptr; nng_msg *msg = nullptr;
if (int res = nng_msg_alloc(&msg, BufferSize)) if (int res = nng_msg_alloc(&msg, BufferSize))
mesy_nng_fatal("nng_msg_alloc", res); mnode_nng_fatal("nng_msg_alloc", res);
std::memset(nng_msg_body(msg), 0, BufferSize); std::memset(nng_msg_body(msg), 0, BufferSize);
*reinterpret_cast<Header *>(nng_msg_body(msg)) = header; *reinterpret_cast<Header *>(nng_msg_body(msg)) = header;
if (auto res = nng_sendmsg(socket, msg, 0)) if (auto res = nng_sendmsg(socket, msg, 0))
mesy_nng_fatal("nng_sendmsg", res); mnode_nng_fatal("nng_sendmsg", res);
totalBytes += BufferSize; totalBytes += BufferSize;
++header.bufferNumber; ++header.bufferNumber;
@ -82,7 +83,7 @@ static void pull_consumer(nng_socket socket)
if (auto res = receive_message(socket, &msg)) if (auto res = receive_message(socket, &msg))
{ {
if (res != NNG_ETIMEDOUT) if (res != NNG_ETIMEDOUT)
mesy_nng_fatal("receive_message", res); mnode_nng_fatal("receive_message", res);
else else
spdlog::warn("consumer timed out in recv"); spdlog::warn("consumer timed out in recv");
} }
@ -157,18 +158,18 @@ int main(int argc, char *argv[])
nng_socket consumerSocket = NNG_SOCKET_INITIALIZER; nng_socket consumerSocket = NNG_SOCKET_INITIALIZER;
if (int res = nng_pull0_open(&consumerSocket)) if (int res = nng_pull0_open(&consumerSocket))
mesy_nng_fatal("nng_pull0_open", res); mnode_nng_fatal("nng_pull0_open", res);
if (int res = nng_listen(consumerSocket, "inproc://pushpull", nullptr, 0)) if (int res = nng_listen(consumerSocket, "inproc://pushpull", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mnode_nng_fatal("nng_listen inproc", res);
nng_socket producerSocket = NNG_SOCKET_INITIALIZER; nng_socket producerSocket = NNG_SOCKET_INITIALIZER;
if (int res = nng_push0_open(&producerSocket)) if (int res = nng_push0_open(&producerSocket))
mesy_nng_fatal("nng_push0_open", res); mnode_nng_fatal("nng_push0_open", res);
if (int res = nng_dial(producerSocket, "inproc://pushpull", nullptr, 0)) if (int res = nng_dial(producerSocket, "inproc://pushpull", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res); mnode_nng_fatal("nng_dial inproc", res);
std::vector<std::thread> threads; std::vector<std::thread> threads;
@ -187,10 +188,10 @@ int main(int argc, char *argv[])
if (t.joinable()) t.join(); if (t.joinable()) t.join();
if (int res = nng_close(consumerSocket)) if (int res = nng_close(consumerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
if (int res = nng_close(producerSocket)) if (int res = nng_close(producerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
return 0; return 0;
} }

View file

@ -10,16 +10,16 @@ int main(int argc, char *argv[])
nng_socket subSocket = NNG_SOCKET_INITIALIZER; nng_socket subSocket = NNG_SOCKET_INITIALIZER;
if (int res = nng_sub0_open(&subSocket)) if (int res = nng_sub0_open(&subSocket))
mesy_nng_fatal("nng_sub0_open", res); mnode_nng_fatal("nng_sub0_open", res);
//if (int res = nng_socket_set_string(subSocket, NNG_OPT_SUB_SUBSCRIBE, "")) //if (int res = nng_socket_set_string(subSocket, NNG_OPT_SUB_SUBSCRIBE, ""))
// mesy_nng_fatal("consumer socket subscribe", res); // mnode_nng_fatal("consumer socket subscribe", res);
if (int res = nng_socket_set(subSocket, NNG_OPT_SUB_SUBSCRIBE, nullptr, 0)) if (int res = nng_socket_set(subSocket, NNG_OPT_SUB_SUBSCRIBE, nullptr, 0))
mesy_nng_fatal("consumer socket subscribe", res); mnode_nng_fatal("consumer socket subscribe", res);
if (int res = nng_dial(subSocket, "tcp://127.0.0.1:42777", nullptr, 0)) if (int res = nng_dial(subSocket, "tcp://127.0.0.1:42777", nullptr, 0))
mesy_nng_fatal("nng_dial tcp", res); mnode_nng_fatal("nng_dial tcp", res);
while (true) while (true)
{ {
@ -29,7 +29,7 @@ int main(int argc, char *argv[])
if (auto res = receive_message(subSocket, &msg)) if (auto res = receive_message(subSocket, &msg))
{ {
if (res != NNG_ETIMEDOUT) if (res != NNG_ETIMEDOUT)
mesy_nng_fatal("receive_message", res); mnode_nng_fatal("receive_message", res);
spdlog::warn("consumer timed out in recv"); spdlog::warn("consumer timed out in recv");
continue; continue;
} }

View file

@ -1,8 +1,90 @@
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng.h>
#include <spdlog/spdlog.h>
namespace mesytec::mnode::nng namespace mesytec::mnode::nng
{ {
void mnode_nng_fatal(const char *const msg, int rv)
{
spdlog::error("{} ({})", msg, nng_strerror(rv));
std::abort();
}
void mnode_nng_error(const std::string &msg, int rv)
{
spdlog::error("{} ({})", msg, nng_strerror(rv));
}
void log_socket_info(nng_socket s, const std::string &info_text)
{
auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME);
auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR);
auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR);
spdlog::info("{}: {}={}", info_text, NNG_OPT_SOCKNAME, sockName);
spdlog::info("{}: {}={}", info_text, NNG_OPT_LOCADDR, localAddress);
spdlog::info("{}: {}={}", info_text, NNG_OPT_REMADDR, remoteAddress);
}
void log_pipe_info(nng_pipe p, const char *info_text)
{
auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME);
auto localAddress = pipe_get_string_opt(p, NNG_OPT_LOCADDR);
auto remoteAddress = pipe_get_string_opt(p, NNG_OPT_REMADDR);
spdlog::info("{}: {}={}", info_text, NNG_OPT_SOCKNAME, sockName);
spdlog::info("{}: {}={}", info_text, NNG_OPT_LOCADDR, localAddress);
spdlog::info("{}: {}={}", info_text, NNG_OPT_REMADDR, remoteAddress);
}
template<typename Value>
std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit)
{
return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}",
nng::nng_stat_type_to_string(type),
name, desc, ts, value,
nng::nng_stat_unit_to_string(unit));
}
std::string format_stat(nng_stat *stat)
{
switch (nng_stat_type(stat))
{
case NNG_STAT_BOOLEAN:
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
default:
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
}
}
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo)
{
int res = 0;
do
{
res = nng_sendmsg(socket, msg, 0);
if (res)
{
if (res != NNG_ETIMEDOUT)
{
spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg));
return res;
}
if (res == NNG_ETIMEDOUT)
spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg));
if (!rp(res))
return res;
}
} while (res == NNG_ETIMEDOUT);
return 0;
}
nng_socket make_pair_socket(nng_duration timeout) nng_socket make_pair_socket(nng_duration timeout)
{ {
return make_socket(nng_pair0_open, timeout); return make_socket(nng_pair0_open, timeout);

View file

@ -43,7 +43,7 @@ class Work
case SEND: case SEND:
if (auto rv = nng_aio_result(aio)) if (auto rv = nng_aio_result(aio))
{ {
nng::mesy_nng_error("nng_ctx_send", rv); nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
state = SEND; state = SEND;
request_ = make_request(); request_ = make_request();
@ -60,7 +60,7 @@ class Work
case RECEIVE: case RECEIVE:
if (auto rv = nng_aio_result(aio)) if (auto rv = nng_aio_result(aio))
{ {
nng::mesy_nng_error("nng_ctx_recv", rv); nng::mnode_nng_error("nng_ctx_recv", rv);
state = SEND; state = SEND;
request_ = make_request(); request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
@ -137,7 +137,7 @@ int main()
if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0)) if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0))
{ {
nng::mesy_nng_error("nng_dial", res); nng::mnode_nng_error("nng_dial", res);
return res; return res;
} }

View file

@ -42,7 +42,7 @@ class Work
if (auto rv = nng_aio_result(aio)) if (auto rv = nng_aio_result(aio))
{ {
if (rv != NNG_ETIMEDOUT) if (rv != NNG_ETIMEDOUT)
nng::mesy_nng_error("nng_ctx_recv", rv); nng::mnode_nng_error("nng_ctx_recv", rv);
if (rv < static_cast<int>(recvErrors_.size())) if (rv < static_cast<int>(recvErrors_.size()))
++recvErrors_[rv]; ++recvErrors_[rv];
state = RECEIVE; state = RECEIVE;
@ -61,7 +61,7 @@ class Work
case SEND: case SEND:
if (auto rv = nng_aio_result(aio)) if (auto rv = nng_aio_result(aio))
{ {
nng::mesy_nng_error("nng_ctx_send", rv); nng::mnode_nng_error("nng_ctx_send", rv);
if (rv < static_cast<int>(sendErrors_.size())) if (rv < static_cast<int>(sendErrors_.size()))
++sendErrors_[rv]; ++sendErrors_[rv];
nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
@ -120,7 +120,7 @@ int main()
if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0)) if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0))
{ {
nng::mesy_nng_error("nng_listen", res); nng::mnode_nng_error("nng_listen", res);
return res; return res;
} }

View file

@ -26,7 +26,7 @@ void requester(nng_socket socket, unsigned id)
auto reqMsg = nng::make_message(ping.SerializeAsString()); auto reqMsg = nng::make_message(ping.SerializeAsString());
if (int rc = nng_sendmsg(socket, reqMsg.release(), 0)) if (int rc = nng_sendmsg(socket, reqMsg.release(), 0))
{ {
nng::mesy_nng_error("req: nng_sendmsg", rc); nng::mnode_nng_error("req: nng_sendmsg", rc);
if (rc != NNG_ETIMEDOUT) if (rc != NNG_ETIMEDOUT)
return; return;
continue; continue;
@ -35,7 +35,7 @@ void requester(nng_socket socket, unsigned id)
auto [repMsg, rc] = nng::receive_message(socket); auto [repMsg, rc] = nng::receive_message(socket);
if (rc) if (rc)
{ {
nng::mesy_nng_error("requester: nng_recvmsg", rc); nng::mnode_nng_error("requester: nng_recvmsg", rc);
if (rc != NNG_ETIMEDOUT) if (rc != NNG_ETIMEDOUT)
return; return;
continue; continue;
@ -62,7 +62,7 @@ void responder(nng_socket socket, unsigned id)
auto [reqMsg, rc] = nng::receive_message(socket); auto [reqMsg, rc] = nng::receive_message(socket);
if (rc) if (rc)
{ {
nng::mesy_nng_error("responder: nng_recvmsg", rc); nng::mnode_nng_error("responder: nng_recvmsg", rc);
if (rc != NNG_ETIMEDOUT) if (rc != NNG_ETIMEDOUT)
return; return;
continue; continue;
@ -75,7 +75,7 @@ void responder(nng_socket socket, unsigned id)
auto repMsg = nng::make_message(pong.SerializeAsString()); auto repMsg = nng::make_message(pong.SerializeAsString());
if (int rc = nng_sendmsg(socket, repMsg.release(), 0)) if (int rc = nng_sendmsg(socket, repMsg.release(), 0))
{ {
nng::mesy_nng_error("responder: nng_sendmsg", rc); nng::mnode_nng_error("responder: nng_sendmsg", rc);
if (rc != NNG_ETIMEDOUT) if (rc != NNG_ETIMEDOUT)
return; return;
continue; continue;

View file

@ -715,28 +715,28 @@ int main(int argc, char *argv[])
for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST })
{ {
if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr))
mesy_nng_fatal("nng_pipe_notify", res); mnode_nng_fatal("nng_pipe_notify", res);
} }
} }
if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mnode_nng_fatal("nng_listen inproc", res);
log_socket_info(readerOutputSocket, "readerOutputSocket"); log_socket_info(readerOutputSocket, "readerOutputSocket");
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res); mnode_nng_fatal("nng_dial inproc", res);
log_socket_info(parserInputSocket, "parserInputSocket"); log_socket_info(parserInputSocket, "parserInputSocket");
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mnode_nng_fatal("nng_listen inproc", res);
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res); mnode_nng_fatal("nng_dial inproc", res);
spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
auto preamble = mvlc::listfile::read_preamble(*readHandle); auto preamble = mvlc::listfile::read_preamble(*readHandle);
@ -774,7 +774,7 @@ int main(int argc, char *argv[])
for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket })
if (auto res = nng_close(socket)) if (auto res = nng_close(socket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
} }
catch(const std::exception& e) catch(const std::exception& e)
{ {

View file

@ -6,12 +6,12 @@ int main()
auto consumerSocket = make_pair_socket(); auto consumerSocket = make_pair_socket();
if (int res = nng_dial(consumerSocket, "tcp6://[::]:41234", nullptr, 0)) if (int res = nng_dial(consumerSocket, "tcp6://[::]:41234", nullptr, 0))
mesy_nng_fatal("nng_dial", res); mnode_nng_fatal("nng_dial", res);
consumer(consumerSocket); consumer(consumerSocket);
if (int res = nng_close(consumerSocket)) if (int res = nng_close(consumerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
return 0; return 0;
} }

View file

@ -8,12 +8,12 @@ int main(int argc, char *argv[])
auto producerSocket = make_pair_socket(); auto producerSocket = make_pair_socket();
if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mnode_nng_fatal("nng_listen inproc", res);
auto consumerSocket = make_pair_socket(); auto consumerSocket = make_pair_socket();
if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_dial", res); mnode_nng_fatal("nng_dial", res);
std::vector<std::thread> threads; std::vector<std::thread> threads;
@ -24,10 +24,10 @@ int main(int argc, char *argv[])
if (t.joinable()) t.join(); if (t.joinable()) t.join();
if (int res = nng_close(consumerSocket)) if (int res = nng_close(consumerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
if (int res = nng_close(producerSocket)) if (int res = nng_close(producerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
return 0; return 0;
} }

View file

@ -6,12 +6,12 @@ int main(int argc, char *argv[])
auto producerSocket = make_pair_socket(); auto producerSocket = make_pair_socket();
if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0)) if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0))
mesy_nng_fatal("nng_listen tcp", res); mnode_nng_fatal("nng_listen tcp", res);
producer(producerSocket); producer(producerSocket);
if (int res = nng_close(producerSocket)) if (int res = nng_close(producerSocket))
mesy_nng_fatal("nng_close", res); mnode_nng_fatal("nng_close", res);
return 0; return 0;
} }

View file

@ -2,6 +2,7 @@
#define E6EFFE63_EB2C_4573_B723_61840850BBF6 #define E6EFFE63_EB2C_4573_B723_61840850BBF6
#include <mesytec-mnode/mnode_nng.h> #include <mesytec-mnode/mnode_nng.h>
#include <spdlog/spdlog.h>
using namespace mesytec::mnode::nng; // to make the old code compile. for test code only. using namespace mesytec::mnode::nng; // to make the old code compile. for test code only.
@ -18,7 +19,7 @@ inline void producer(nng_socket socket)
if (auto res = receive_message(socket, &msg)) if (auto res = receive_message(socket, &msg))
{ {
if (res != NNG_ETIMEDOUT) if (res != NNG_ETIMEDOUT)
mesy_nng_fatal("receive_message", res); mnode_nng_fatal("receive_message", res);
} }
else else
{ {
@ -36,13 +37,13 @@ inline void producer(nng_socket socket)
nng_msg *msg = nullptr; nng_msg *msg = nullptr;
if (int res = nng_msg_alloc(&msg, BufferSize)) if (int res = nng_msg_alloc(&msg, BufferSize))
mesy_nng_fatal("nng_msg_alloc", res); mnode_nng_fatal("nng_msg_alloc", res);
std::memset(nng_msg_body(msg), 0, BufferSize); std::memset(nng_msg_body(msg), 0, BufferSize);
*reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf; *reinterpret_cast<size_t *>(nng_msg_body(msg)) = nbuf;
if (auto res = nng_sendmsg(socket, msg, 0)) if (auto res = nng_sendmsg(socket, msg, 0))
mesy_nng_fatal("nng_sendmsg", res); mnode_nng_fatal("nng_sendmsg", res);
totalBytes += BufferSize; totalBytes += BufferSize;
@ -53,7 +54,7 @@ inline void producer(nng_socket socket)
spdlog::trace("producer sending 'quit' message"); spdlog::trace("producer sending 'quit' message");
auto msg = alloc_message(0); auto msg = alloc_message(0);
if (auto res = nng_sendmsg(socket, msg, 0)) if (auto res = nng_sendmsg(socket, msg, 0))
mesy_nng_fatal("nng_sendmsg", res); mnode_nng_fatal("nng_sendmsg", res);
} }
auto tEnd = std::chrono::steady_clock::now(); auto tEnd = std::chrono::steady_clock::now();
@ -71,7 +72,7 @@ inline void consumer(nng_socket socket)
auto msg = alloc_message(0); auto msg = alloc_message(0);
if (auto res = nng_sendmsg(socket, msg, 0)) if (auto res = nng_sendmsg(socket, msg, 0))
mesy_nng_fatal("nng_sendmsg", res); mnode_nng_fatal("nng_sendmsg", res);
} }
spdlog::info("consumer ready to receive"); spdlog::info("consumer ready to receive");
@ -87,7 +88,7 @@ inline void consumer(nng_socket socket)
if (auto res = receive_message(socket, &msg)) if (auto res = receive_message(socket, &msg))
{ {
if (res != NNG_ETIMEDOUT) if (res != NNG_ETIMEDOUT)
mesy_nng_fatal("receive_message", res); mnode_nng_fatal("receive_message", res);
else else
spdlog::warn("consumer timed out in recv"); spdlog::warn("consumer timed out in recv");
} }