diff --git a/include/mesytec-mnode/mnode_nng.h b/include/mesytec-mnode/mnode_nng.h index c71fc25..38d6772 100644 --- a/include/mesytec-mnode/mnode_nng.h +++ b/include/mesytec-mnode/mnode_nng.h @@ -10,30 +10,26 @@ #include #include -#include - +#include +#include +#include +#include +#include #include +#include namespace mesytec::mnode::nng { -inline void mesy_nng_fatal(const char *const msg, int rv) -{ - spdlog::error("{} ({})", msg, nng_strerror(rv)); - std::abort(); -} - -inline void mesy_nng_error(const std::string &msg, int rv) -{ - spdlog::error("{} ({})", msg, nng_strerror(rv)); -} +void mnode_nng_fatal(const char *const msg, int rv); +void mnode_nng_error(const std::string &msg, int rv); inline nng_msg *alloc_message(size_t size) { nng_msg *msg = {}; if (int res = nng_msg_alloc(&msg, size)) { - mesy_nng_error("nng_msg_alloc", res); + mnode_nng_error("nng_msg_alloc", res); return nullptr; } return msg; @@ -57,13 +53,13 @@ inline int allocate_reserve_message(nng_msg **msg, size_t reserve = 0) if (auto res = nng_msg_alloc(msg, 0)) { - mesy_nng_error("nng_msg_alloc", res); + mnode_nng_error("nng_msg_alloc", res); return res; } if (auto res = nng_msg_reserve(*msg, reserve)) { - mesy_nng_error("nng_msg_reserve", res); + mnode_nng_error("nng_msg_reserve", res); return res; } @@ -84,13 +80,13 @@ inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = Default { if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) { - mesy_nng_error("nng_socket_set", res); + mnode_nng_error("nng_socket_set", res); return res; } if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) { - mesy_nng_error("nng_socket_set", res); + mnode_nng_error("nng_socket_set", res); return res; } @@ -105,7 +101,7 @@ inline nng_socket make_socket(socket_factory factory, nng_duration timeout = Def if (int res = factory(&socket)) { - mesy_nng_error("make_socket", res); + mnode_nng_error("make_socket", res); return NNG_SOCKET_INITIALIZER; } @@ -153,35 +149,17 @@ inline std::string pipe_get_string_opt(nng_pipe p, const char *opt) return result; } -inline void log_socket_info(nng_socket s, const char *info) -{ - auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME); - auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR); - auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR); - - spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName); - spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress); - spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); -} +void log_socket_info(nng_socket s, const std::string &info_text); inline std::string get_local_address(nng_socket s) { return socket_get_string_opt(s, NNG_OPT_LOCADDR); } -inline void log_pipe_info(nng_pipe p, const char *info) -{ - auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME); - auto localAddress = pipe_get_string_opt(p, NNG_OPT_LOCADDR); - auto remoteAddress = pipe_get_string_opt(p, NNG_OPT_REMADDR); +void log_pipe_info(nng_pipe p, const char *info_text); - spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName); - spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress); - spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); -} - -// 'res' is the result of the last nng function call. -using retry_predicate = std::function; +// 'nng_res' is the result of the last nng function call. +using retry_predicate = std::function; class RetryNTimes { @@ -200,32 +178,7 @@ class RetryNTimes size_t attempt_ = 0u; }; -inline int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "") -{ - int res = 0; - - do - { - res = nng_sendmsg(socket, msg, 0); - - if (res) - { - if (res != NNG_ETIMEDOUT) - { - spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg)); - return res; - } - - if (res == NNG_ETIMEDOUT) - spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg)); - - if (!rp(res)) - return res; - } - } while (res == NNG_ETIMEDOUT); - - return 0; -} +int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = ""); inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "") { @@ -329,37 +282,13 @@ void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0) } } -template -std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit) -{ - return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}", - nng::nng_stat_type_to_string(type), - name, desc, ts, value, - nng::nng_stat_unit_to_string(unit)); -} - -inline std::string format_stat(nng_stat *stat) -{ - switch (nng_stat_type(stat)) - { - case NNG_STAT_BOOLEAN: - return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); - default: - return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); - } -} - -static void custom_nng_msg_free(nng_msg *msg) -{ - //spdlog::warn("custom_nng_msg_free: msg={}", fmt::ptr(msg)); - nng_msg_free(msg); -} +std::string format_stat(nng_stat *stat); using unique_msg = std::unique_ptr; -inline unique_msg make_unique_msg(nng_msg *msg = nullptr, decltype(&nng_msg_free) deleter = custom_nng_msg_free) +inline unique_msg make_unique_msg(nng_msg *msg = nullptr) { - return unique_msg(msg, deleter); + return unique_msg(msg, &nng_msg_free); } inline std::pair receive_message(nng_socket sock, int flags = 0) @@ -380,13 +309,13 @@ inline unique_msg allocate_reserve_message(size_t reserve = 0) if (auto res = nng_msg_alloc(&msg, 0)) { - mesy_nng_error("allocate_reserve_message", res); + mnode_nng_error("allocate_reserve_message", res); return make_unique_msg(); } if (auto res = nng_msg_reserve(msg, reserve)) { - mesy_nng_error("allocate_reserve_message", res); + mnode_nng_error("allocate_reserve_message", res); nng_msg_free(msg); return make_unique_msg(); } @@ -398,13 +327,13 @@ inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url { if (int res = nng_listen(listen, url, nullptr, 0)) { - mesy_nng_error("nng_listen", res); + mnode_nng_error("nng_listen", res); return res; } if (int res = nng_dial(dial, url, nullptr, 0)) { - mesy_nng_error("nng_dial", res); + mnode_nng_error("nng_dial", res); return res; } @@ -416,7 +345,7 @@ inline unique_msg make_message(const std::string &data) nng_msg *msg = nullptr; if (int res = nng_msg_alloc(&msg, data.size())) { - mesy_nng_error("nng_msg_alloc", res); + mnode_nng_error("nng_msg_alloc", res); return make_unique_msg(); } @@ -429,7 +358,7 @@ inline unique_msg clone_message(const nng_msg *msg) nng_msg *newMsg = nullptr; if (int res = nng_msg_dup(&newMsg, msg)) { - mesy_nng_error("nng_msg_dup", res); + mnode_nng_error("nng_msg_dup", res); return make_unique_msg(); } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dfb2e65..66c83cc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,7 +22,7 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic) add_library(mesytec-mnode-nng mnode_nng.cc) target_include_directories(mesytec-mnode-nng PUBLIC $) -target_link_libraries(mesytec-mnode-nng PUBLIC nng PUBLIC spdlog) +target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog) target_compile_features(mesytec-mnode-nng PUBLIC cxx_std_17) add_library(mesytec-mnode-dev INTERFACE) diff --git a/src/mesy_nng_pipeline_main.cc b/src/mesy_nng_pipeline_main.cc index f85835d..ddcb689 100644 --- a/src/mesy_nng_pipeline_main.cc +++ b/src/mesy_nng_pipeline_main.cc @@ -1,5 +1,6 @@ #include #include +#include using namespace mesytec::mnode::nng; @@ -184,10 +185,10 @@ int pipe_end_receiver(PipelineElement e, const char *prefix) auto resultSocket = make_pair_socket(); if (auto res = nng_listen(resultSocket, resultUri.c_str(), nullptr, 0)) - mesy_nng_fatal("pipe_end_receiver nng_listen(resultSocket)", res); + mnode_nng_fatal("pipe_end_receiver nng_listen(resultSocket)", res); if (auto res = nng_dial(e.outputSocket, resultUri.c_str(), nullptr, 0)) - mesy_nng_fatal("pipe_end_receiver nng_dial(pipe0.output)", res); + mnode_nng_fatal("pipe_end_receiver nng_dial(pipe0.output)", res); bool quit = false; @@ -218,7 +219,7 @@ int pipe_end_receiver(PipelineElement e, const char *prefix) spdlog::info("pipe_end exiting"); if (auto res = nng_close(resultSocket)) - mesy_nng_fatal("pipe_end_receiver nng_close", res); + mnode_nng_fatal("pipe_end_receiver nng_close", res); return 0; } @@ -231,14 +232,14 @@ int main(int argc, char *argv[]) std::vector pipe0(10); if (auto res = init_inproc_pipeline(pipe0, "pipe0_")) - mesy_nng_fatal("main init_inproc_pipeline", res); + mnode_nng_fatal("main init_inproc_pipeline", res); auto resultSocket = make_pair_socket(); if (auto res = nng_listen(resultSocket, "inproc://pipe0_result", nullptr, 0)) - mesy_nng_fatal("main nng_listen(resultSocket)", res); + mnode_nng_fatal("main nng_listen(resultSocket)", res); if (auto res = nng_dial(pipe0.back().outputSocket, "inproc://pipe0_result", nullptr, 0)) - mesy_nng_fatal("main nng_dial(pipe0.output)", res); + mnode_nng_fatal("main nng_dial(pipe0.output)", res); std::vector> pipelineJobs; @@ -254,7 +255,7 @@ int main(int argc, char *argv[]) for (int i=0; i<2; ++i) { if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) - mesy_nng_fatal("pipeline_add_inproc_part", res); + mnode_nng_fatal("pipeline_add_inproc_part", res); if (i > 0) // first pipe element is fed from main pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back())); } @@ -263,10 +264,10 @@ int main(int argc, char *argv[]) nng_socket pipe0PubSocket = make_pub_socket(); if (auto res = nng_listen(pipe0PubSocket, "inproc://pipe0_pub", nullptr, 0)) - mesy_nng_fatal("nng_listen pipe0_pub", res); + mnode_nng_fatal("nng_listen pipe0_pub", res); if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) - mesy_nng_fatal("pipeline_add_inproc_part", res); + mnode_nng_fatal("pipeline_add_inproc_part", res); pipelineJobs.emplace_back(std::async(std::launch::async, pipe_filter_publisher, pipe0.back(), pipe0PubSocket)); #endif @@ -274,7 +275,7 @@ int main(int argc, char *argv[]) for (int i=0; i<2; ++i) { if (auto res = pipeline_add_inproc_part(pipe0, "pipe0")) - mesy_nng_fatal("pipeline_add_inproc_part", res); + mnode_nng_fatal("pipeline_add_inproc_part", res); pipelineJobs.emplace_back(std::async(std::launch::async, pipe_forwarder, pipe0.back())); } @@ -284,33 +285,33 @@ int main(int argc, char *argv[]) const char *pipe0ResultUri = "inproc://pipe0_result"; if (auto res = nng_listen(pipe0.back().outputSocket, pipe0ResultUri, nullptr, 0)) - mesy_nng_fatal("main pipeline_end_inproc", res); + mnode_nng_fatal("main pipeline_end_inproc", res); auto resultSocket = make_pair_socket(); if (auto res = nng_dial(resultSocket, pipe0ResultUri, nullptr, 0)) - mesy_nng_fatal("main nng_dial(resultSocket)", res); + mnode_nng_fatal("main nng_dial(resultSocket)", res); #endif #endif nng_msg *outMsg = nullptr; if (auto res = nng_msg_alloc(&outMsg, 0)) - mesy_nng_fatal("main msg alloc", res); + mnode_nng_fatal("main msg alloc", res); if (auto res = nng_msg_append_u32(outMsg, 0xdeadbeef)) - mesy_nng_fatal("main msg append", res); + mnode_nng_fatal("main msg append", res); if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input")) - mesy_nng_fatal("main send_message_retry", res); + mnode_nng_fatal("main send_message_retry", res); outMsg = nullptr; spdlog::info("main sent first message"); if (auto res = nng_msg_alloc(&outMsg, 0)) - mesy_nng_fatal("main msg alloc", res); + mnode_nng_fatal("main msg alloc", res); if (auto res = send_message_retry(pipe0.front().outputSocket, outMsg, 0, "main input")) - mesy_nng_fatal("main send_message_retry", res); + mnode_nng_fatal("main send_message_retry", res); outMsg = nullptr; @@ -327,7 +328,7 @@ int main(int argc, char *argv[]) { if (res != NNG_ETIMEDOUT) { - mesy_nng_fatal("main receive_message", res); + mnode_nng_fatal("main receive_message", res); return res; } @@ -376,7 +377,7 @@ int main(int argc, char *argv[]) } if (auto res = close_pipeline(pipe0)) - mesy_nng_fatal("main close_pipeline", res); + mnode_nng_fatal("main close_pipeline", res); return ret; } diff --git a/src/mesy_nng_pub_producer.cc b/src/mesy_nng_pub_producer.cc index 9e04953..c682824 100644 --- a/src/mesy_nng_pub_producer.cc +++ b/src/mesy_nng_pub_producer.cc @@ -11,10 +11,10 @@ int main() nng_socket pubSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_pub0_open(&pubSocket)) - mesy_nng_fatal("nng_pub0_open", res); + mnode_nng_fatal("nng_pub0_open", res); if (int res = nng_listen(pubSocket, "tcp://*:42777", nullptr, 0)) - mesy_nng_fatal("nng_listen tcp", res); + mnode_nng_fatal("nng_listen tcp", res); while (true) { @@ -27,10 +27,10 @@ int main() nng_msg *msg = nullptr; if (int res = nng_msg_alloc(&msg, size)) - mesy_nng_fatal("nng_msg_alloc", res); + mnode_nng_fatal("nng_msg_alloc", res); if (auto res = nng_sendmsg(pubSocket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); + mnode_nng_fatal("nng_sendmsg", res); } return 0; diff --git a/src/mesy_nng_push_pull_main.cc b/src/mesy_nng_push_pull_main.cc index 886b90a..7e6b431 100644 --- a/src/mesy_nng_push_pull_main.cc +++ b/src/mesy_nng_push_pull_main.cc @@ -5,6 +5,7 @@ #include "test_producer_consumer.h" #include +#include using namespace mesytec::mnode::nng; @@ -28,13 +29,13 @@ static void push_producer(nng_socket socket, unsigned id) nng_msg *msg = nullptr; if (int res = nng_msg_alloc(&msg, BufferSize)) - mesy_nng_fatal("nng_msg_alloc", res); + mnode_nng_fatal("nng_msg_alloc", res); std::memset(nng_msg_body(msg), 0, BufferSize); *reinterpret_cast
(nng_msg_body(msg)) = header; if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); + mnode_nng_fatal("nng_sendmsg", res); totalBytes += BufferSize; ++header.bufferNumber; @@ -82,7 +83,7 @@ static void pull_consumer(nng_socket socket) if (auto res = receive_message(socket, &msg)) { if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); + mnode_nng_fatal("receive_message", res); else spdlog::warn("consumer timed out in recv"); } @@ -157,18 +158,18 @@ int main(int argc, char *argv[]) nng_socket consumerSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_pull0_open(&consumerSocket)) - mesy_nng_fatal("nng_pull0_open", res); + mnode_nng_fatal("nng_pull0_open", res); if (int res = nng_listen(consumerSocket, "inproc://pushpull", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); + mnode_nng_fatal("nng_listen inproc", res); nng_socket producerSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_push0_open(&producerSocket)) - mesy_nng_fatal("nng_push0_open", res); + mnode_nng_fatal("nng_push0_open", res); if (int res = nng_dial(producerSocket, "inproc://pushpull", nullptr, 0)) - mesy_nng_fatal("nng_dial inproc", res); + mnode_nng_fatal("nng_dial inproc", res); std::vector threads; @@ -187,10 +188,10 @@ int main(int argc, char *argv[]) if (t.joinable()) t.join(); if (int res = nng_close(consumerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); if (int res = nng_close(producerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); return 0; } diff --git a/src/mesy_nng_sub_consumer.cc b/src/mesy_nng_sub_consumer.cc index 55e07d6..85fbb0d 100644 --- a/src/mesy_nng_sub_consumer.cc +++ b/src/mesy_nng_sub_consumer.cc @@ -10,16 +10,16 @@ int main(int argc, char *argv[]) nng_socket subSocket = NNG_SOCKET_INITIALIZER; if (int res = nng_sub0_open(&subSocket)) - mesy_nng_fatal("nng_sub0_open", res); + mnode_nng_fatal("nng_sub0_open", res); //if (int res = nng_socket_set_string(subSocket, NNG_OPT_SUB_SUBSCRIBE, "")) - // mesy_nng_fatal("consumer socket subscribe", res); + // mnode_nng_fatal("consumer socket subscribe", res); if (int res = nng_socket_set(subSocket, NNG_OPT_SUB_SUBSCRIBE, nullptr, 0)) - mesy_nng_fatal("consumer socket subscribe", res); + mnode_nng_fatal("consumer socket subscribe", res); if (int res = nng_dial(subSocket, "tcp://127.0.0.1:42777", nullptr, 0)) - mesy_nng_fatal("nng_dial tcp", res); + mnode_nng_fatal("nng_dial tcp", res); while (true) { @@ -29,7 +29,7 @@ int main(int argc, char *argv[]) if (auto res = receive_message(subSocket, &msg)) { if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); + mnode_nng_fatal("receive_message", res); spdlog::warn("consumer timed out in recv"); continue; } diff --git a/src/mnode_nng.cc b/src/mnode_nng.cc index d1c097e..9f2eedf 100644 --- a/src/mnode_nng.cc +++ b/src/mnode_nng.cc @@ -1,8 +1,90 @@ #include +#include + namespace mesytec::mnode::nng { +void mnode_nng_fatal(const char *const msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); + std::abort(); +} + +void mnode_nng_error(const std::string &msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); +} + +void log_socket_info(nng_socket s, const std::string &info_text) +{ + auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME); + auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR); + auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR); + + spdlog::info("{}: {}={}", info_text, NNG_OPT_SOCKNAME, sockName); + spdlog::info("{}: {}={}", info_text, NNG_OPT_LOCADDR, localAddress); + spdlog::info("{}: {}={}", info_text, NNG_OPT_REMADDR, remoteAddress); +} + +void log_pipe_info(nng_pipe p, const char *info_text) +{ + auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME); + auto localAddress = pipe_get_string_opt(p, NNG_OPT_LOCADDR); + auto remoteAddress = pipe_get_string_opt(p, NNG_OPT_REMADDR); + + spdlog::info("{}: {}={}", info_text, NNG_OPT_SOCKNAME, sockName); + spdlog::info("{}: {}={}", info_text, NNG_OPT_LOCADDR, localAddress); + spdlog::info("{}: {}={}", info_text, NNG_OPT_REMADDR, remoteAddress); +} + +template +std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit) +{ + return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}", + nng::nng_stat_type_to_string(type), + name, desc, ts, value, + nng::nng_stat_unit_to_string(unit)); +} + +std::string format_stat(nng_stat *stat) +{ + switch (nng_stat_type(stat)) + { + case NNG_STAT_BOOLEAN: + return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); + default: + return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); + } +} + +int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo) +{ + int res = 0; + + do + { + res = nng_sendmsg(socket, msg, 0); + + if (res) + { + if (res != NNG_ETIMEDOUT) + { + spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg)); + return res; + } + + if (res == NNG_ETIMEDOUT) + spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg)); + + if (!rp(res)) + return res; + } + } while (res == NNG_ETIMEDOUT); + + return 0; +} + nng_socket make_pair_socket(nng_duration timeout) { return make_socket(nng_pair0_open, timeout); diff --git a/src/mnode_proto_ping_client.cc b/src/mnode_proto_ping_client.cc index 84c8962..618e1ab 100644 --- a/src/mnode_proto_ping_client.cc +++ b/src/mnode_proto_ping_client.cc @@ -43,7 +43,7 @@ class Work case SEND: if (auto rv = nng_aio_result(aio)) { - nng::mesy_nng_error("nng_ctx_send", rv); + nng::mnode_nng_error("nng_ctx_send", rv); nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); state = SEND; request_ = make_request(); @@ -60,7 +60,7 @@ class Work case RECEIVE: if (auto rv = nng_aio_result(aio)) { - nng::mesy_nng_error("nng_ctx_recv", rv); + nng::mnode_nng_error("nng_ctx_recv", rv); state = SEND; request_ = make_request(); nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); @@ -137,7 +137,7 @@ int main() if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0)) { - nng::mesy_nng_error("nng_dial", res); + nng::mnode_nng_error("nng_dial", res); return res; } diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc index 0ac204e..49e8c67 100644 --- a/src/mnode_proto_ping_server.cc +++ b/src/mnode_proto_ping_server.cc @@ -42,7 +42,7 @@ class Work if (auto rv = nng_aio_result(aio)) { if (rv != NNG_ETIMEDOUT) - nng::mesy_nng_error("nng_ctx_recv", rv); + nng::mnode_nng_error("nng_ctx_recv", rv); if (rv < static_cast(recvErrors_.size())) ++recvErrors_[rv]; state = RECEIVE; @@ -61,7 +61,7 @@ class Work case SEND: if (auto rv = nng_aio_result(aio)) { - nng::mesy_nng_error("nng_ctx_send", rv); + nng::mnode_nng_error("nng_ctx_send", rv); if (rv < static_cast(sendErrors_.size())) ++sendErrors_[rv]; nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); @@ -120,7 +120,7 @@ int main() if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0)) { - nng::mesy_nng_error("nng_listen", res); + nng::mnode_nng_error("nng_listen", res); return res; } diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index b0b1e4b..d7be51f 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -26,7 +26,7 @@ void requester(nng_socket socket, unsigned id) auto reqMsg = nng::make_message(ping.SerializeAsString()); if (int rc = nng_sendmsg(socket, reqMsg.release(), 0)) { - nng::mesy_nng_error("req: nng_sendmsg", rc); + nng::mnode_nng_error("req: nng_sendmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; @@ -35,7 +35,7 @@ void requester(nng_socket socket, unsigned id) auto [repMsg, rc] = nng::receive_message(socket); if (rc) { - nng::mesy_nng_error("requester: nng_recvmsg", rc); + nng::mnode_nng_error("requester: nng_recvmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; @@ -62,7 +62,7 @@ void responder(nng_socket socket, unsigned id) auto [reqMsg, rc] = nng::receive_message(socket); if (rc) { - nng::mesy_nng_error("responder: nng_recvmsg", rc); + nng::mnode_nng_error("responder: nng_recvmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; @@ -75,7 +75,7 @@ void responder(nng_socket socket, unsigned id) auto repMsg = nng::make_message(pong.SerializeAsString()); if (int rc = nng_sendmsg(socket, repMsg.release(), 0)) { - nng::mesy_nng_error("responder: nng_sendmsg", rc); + nng::mnode_nng_error("responder: nng_sendmsg", rc); if (rc != NNG_ETIMEDOUT) return; continue; diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index f75fbb6..ff225a9 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -715,28 +715,28 @@ int main(int argc, char *argv[]) for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) { if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) - mesy_nng_fatal("nng_pipe_notify", res); + mnode_nng_fatal("nng_pipe_notify", res); } } if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); + mnode_nng_fatal("nng_listen inproc", res); log_socket_info(readerOutputSocket, "readerOutputSocket"); if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_dial inproc", res); + mnode_nng_fatal("nng_dial inproc", res); log_socket_info(parserInputSocket, "parserInputSocket"); if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); + mnode_nng_fatal("nng_listen inproc", res); if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) - mesy_nng_fatal("nng_dial inproc", res); + mnode_nng_fatal("nng_dial inproc", res); spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); @@ -774,7 +774,7 @@ int main(int argc, char *argv[]) for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) if (auto res = nng_close(socket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); } catch(const std::exception& e) { diff --git a/src/pair_consumer.cc b/src/pair_consumer.cc index 02c0383..8f4bd55 100644 --- a/src/pair_consumer.cc +++ b/src/pair_consumer.cc @@ -6,12 +6,12 @@ int main() auto consumerSocket = make_pair_socket(); if (int res = nng_dial(consumerSocket, "tcp6://[::]:41234", nullptr, 0)) - mesy_nng_fatal("nng_dial", res); + mnode_nng_fatal("nng_dial", res); consumer(consumerSocket); if (int res = nng_close(consumerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); return 0; } diff --git a/src/pair_inproc.cc b/src/pair_inproc.cc index 8e4cd6b..c30daee 100644 --- a/src/pair_inproc.cc +++ b/src/pair_inproc.cc @@ -8,12 +8,12 @@ int main(int argc, char *argv[]) auto producerSocket = make_pair_socket(); if (int res = nng_listen(producerSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_listen inproc", res); + mnode_nng_fatal("nng_listen inproc", res); auto consumerSocket = make_pair_socket(); if (int res = nng_dial(consumerSocket, "inproc://1", nullptr, 0)) - mesy_nng_fatal("nng_dial", res); + mnode_nng_fatal("nng_dial", res); std::vector threads; @@ -24,10 +24,10 @@ int main(int argc, char *argv[]) if (t.joinable()) t.join(); if (int res = nng_close(consumerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); if (int res = nng_close(producerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); return 0; } diff --git a/src/pair_producer.cc b/src/pair_producer.cc index a8d1db0..79e06c9 100644 --- a/src/pair_producer.cc +++ b/src/pair_producer.cc @@ -6,12 +6,12 @@ int main(int argc, char *argv[]) auto producerSocket = make_pair_socket(); if (int res = nng_listen(producerSocket, "tcp6://*:41234", nullptr, 0)) - mesy_nng_fatal("nng_listen tcp", res); + mnode_nng_fatal("nng_listen tcp", res); producer(producerSocket); if (int res = nng_close(producerSocket)) - mesy_nng_fatal("nng_close", res); + mnode_nng_fatal("nng_close", res); return 0; } diff --git a/src/test_producer_consumer.h b/src/test_producer_consumer.h index 1ab5577..3c55f9d 100644 --- a/src/test_producer_consumer.h +++ b/src/test_producer_consumer.h @@ -2,6 +2,7 @@ #define E6EFFE63_EB2C_4573_B723_61840850BBF6 #include +#include using namespace mesytec::mnode::nng; // to make the old code compile. for test code only. @@ -18,7 +19,7 @@ inline void producer(nng_socket socket) if (auto res = receive_message(socket, &msg)) { if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); + mnode_nng_fatal("receive_message", res); } else { @@ -36,13 +37,13 @@ inline void producer(nng_socket socket) nng_msg *msg = nullptr; if (int res = nng_msg_alloc(&msg, BufferSize)) - mesy_nng_fatal("nng_msg_alloc", res); + mnode_nng_fatal("nng_msg_alloc", res); std::memset(nng_msg_body(msg), 0, BufferSize); *reinterpret_cast(nng_msg_body(msg)) = nbuf; if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); + mnode_nng_fatal("nng_sendmsg", res); totalBytes += BufferSize; @@ -53,7 +54,7 @@ inline void producer(nng_socket socket) spdlog::trace("producer sending 'quit' message"); auto msg = alloc_message(0); if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); + mnode_nng_fatal("nng_sendmsg", res); } auto tEnd = std::chrono::steady_clock::now(); @@ -71,7 +72,7 @@ inline void consumer(nng_socket socket) auto msg = alloc_message(0); if (auto res = nng_sendmsg(socket, msg, 0)) - mesy_nng_fatal("nng_sendmsg", res); + mnode_nng_fatal("nng_sendmsg", res); } spdlog::info("consumer ready to receive"); @@ -87,7 +88,7 @@ inline void consumer(nng_socket socket) if (auto res = receive_message(socket, &msg)) { if (res != NNG_ETIMEDOUT) - mesy_nng_fatal("receive_message", res); + mnode_nng_fatal("receive_message", res); else spdlog::warn("consumer timed out in recv"); }