diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f719490..5f02f85 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -60,5 +60,8 @@ target_compile_features(mesy_nng_sub_consumer PRIVATE cxx_std_17) target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng) target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) +add_executable(mnode-proto-test1 mnode_proto_test1.cc thread_name.cc) +target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto) + #unset(CMAKE_C_CLANG_TIDY) #unset(CMAKE_CXX_CLANG_TIDY) diff --git a/src/mesy_nng.h b/src/mesy_nng.h index a7cb1f9..2b9b45a 100644 --- a/src/mesy_nng.h +++ b/src/mesy_nng.h @@ -1,9 +1,145 @@ -#pragma once +#ifndef B18E3651_CA9A_43BC_AA25_810EA16533CD +#define B18E3651_CA9A_43BC_AA25_810EA16533CD #include +#include +#include +#include #include #include -#include "common.h" +#include + +#include +#include "thread_name.h" + +namespace mesytec::nng +{ + +inline void mesy_nng_fatal(const char *const msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); + std::abort(); +} + +inline void mesy_nng_error(const std::string &msg, int rv) +{ + spdlog::error("{} ({})", msg, nng_strerror(rv)); +} + +inline nng_msg *alloc_message(size_t size) +{ + nng_msg *msg = {}; + if (int res = nng_msg_alloc(&msg, size)) + { + mesy_nng_error("nng_msg_alloc", res); + return nullptr; + } + return msg; +} + +inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0) +{ + if (auto res = nng_recvmsg(sock, msg_ptr, flags)) + { + nng_msg_free(*msg_ptr); + *msg_ptr = NULL; + return res; + } + + return 0; +} + +inline int allocate_reserve_message(nng_msg **msg, size_t reserve = 0) +{ + assert(msg); + + if (auto res = nng_msg_alloc(msg, 0)) + { + mesy_nng_error("nng_msg_alloc", res); + return res; + } + + if (auto res = nng_msg_reserve(*msg, reserve)) + { + mesy_nng_error("nng_msg_reserve", res); + return res; + } + + return 0; +} + +inline size_t allocated_free_space(nng_msg *msg) +{ + auto capacity = nng_msg_capacity(msg); + auto used = nng_msg_len(msg); + assert(capacity >= used); + return capacity - used; +} + +static nng_duration DefaultTimeout = 16; + +inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = DefaultTimeout) +{ + if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) + { + mesy_nng_error("nng_socket_set", res); + return res; + } + + if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) + { + mesy_nng_error("nng_socket_set", res); + return res; + } + + return 0; +} + +using socket_factory = std::function; + +inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout) +{ + nng_socket socket; + + if (int res = factory(&socket)) + { + mesy_nng_error("make_socket", res); + return NNG_SOCKET_INITIALIZER; + } + + if (set_socket_timeouts(socket, timeout) != 0) + { + nng_close(socket); + return NNG_SOCKET_INITIALIZER; + } + + return socket; +} + +inline nng_socket make_pair_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_pair0_open, timeout); +} + +inline nng_socket make_push_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_push0_open, timeout); +} + +inline nng_socket make_pull_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_pull0_open, timeout); +} + +inline nng_socket make_pub_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_pub0_open, timeout); +} + +inline nng_socket make_sub_socket(nng_duration timeout = DefaultTimeout) +{ + return make_socket(nng_sub0_open, timeout); +} inline std::string socket_get_string_opt(nng_socket s, const char *opt) { @@ -40,6 +176,11 @@ inline void log_socket_info(nng_socket s, const char *info) spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); } +inline std::string get_local_address(nng_socket s) +{ + return socket_get_string_opt(s, NNG_OPT_LOCADDR); +} + inline void log_pipe_info(nng_pipe p, const char *info) { auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME); @@ -51,10 +192,28 @@ inline void log_pipe_info(nng_pipe p, const char *info) spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); } -inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "") +using retry_predicate = std::function; + +class RetryNTimes +{ + public: + RetryNTimes(size_t maxTries = 3) + : maxTries_(maxTries) + {} + + bool operator()() + { + return attempt_++ < maxTries_; + } + + private: + size_t maxTries_; + size_t attempt_ = 0u; +}; + +inline int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "") { int res = 0; - size_t attempt = 0u; do { @@ -63,17 +222,206 @@ inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = if (res) { if (res != NNG_ETIMEDOUT) + { + spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg)); return res; + } if (res == NNG_ETIMEDOUT) - spdlog::warn("send_message_retry: {} - send timeout", debugInfo); + spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg)); - if (maxTries > 0 && attempt >= maxTries) + if (!rp()) return res; - - ++attempt; } } while (res == NNG_ETIMEDOUT); return 0; } + +inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "") +{ + RetryNTimes retryPredicate(maxTries); + + return send_message_retry(socket, msg, retryPredicate, debugInfo); +} + +inline int send_empty_message(nng_socket socket, retry_predicate rp) +{ + nng_msg *msg = nullptr; + + if (int res = allocate_reserve_message(&msg, 0)) + return res; + + if (int res = send_message_retry(socket, msg, rp, "send_empty_message")) + { + nng_msg_free(msg); + return res; + } + + return 0; +} + +inline int send_empty_message(nng_socket socket, size_t maxTries = 3) +{ + RetryNTimes retryPredicate(maxTries); + + return send_empty_message(socket, retryPredicate); +} + +// Read type T from the front of msg and trim the message by sizeof(T). +template +std::optional msg_trim_read(nng_msg *msg) +{ + const auto oldlen = nng_msg_len(msg); (void) oldlen; + if (nng_msg_len(msg) < sizeof(T)) + return {}; + + T result = *reinterpret_cast(nng_msg_body(msg)); + nng_msg_trim(msg, sizeof(T)); + const auto newlen = nng_msg_len(msg); (void) newlen; + assert(newlen + sizeof(T) == oldlen); + return result; +} + +inline const char *nng_stat_unit_to_string(int unit) +{ + switch (unit) + { + case NNG_UNIT_BYTES: + return "B"; + case NNG_UNIT_MESSAGES: + return "msgs"; + case NNG_UNIT_MILLIS: + return "ms"; + case NNG_UNIT_EVENTS: + return "events"; + } + + return ""; +} + +inline const char *nng_stat_type_to_string(int type) +{ + switch (type) + { + case NNG_STAT_SCOPE: + return "scope"; + case NNG_STAT_LEVEL: + return "level"; + case NNG_STAT_COUNTER: + return "counter"; + case NNG_STAT_STRING: + return "string"; + case NNG_STAT_BOOLEAN: + return "bool"; + case NNG_STAT_ID: + return "id"; + } + + return ""; +} + +// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type +// sockets! + +template +void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0) +{ + visitor(stat, depth); + + auto statType = nng_stat_type(stat); + + if (statType == NNG_STAT_SCOPE) + { + for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child)) + { + visit_nng_stats(child, visitor, depth+1); + } + } +} + +template +std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit) +{ + return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}", + nng::nng_stat_type_to_string(type), + name, desc, ts, value, + nng::nng_stat_unit_to_string(unit)); +} + +inline std::string format_stat(nng_stat *stat) +{ + switch (nng_stat_type(stat)) + { + case NNG_STAT_BOOLEAN: + return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); + default: + return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat)); + } +} + +static void custom_nng_msg_free(nng_msg *msg) +{ + //spdlog::warn("custom_nng_msg_free: msg={}", fmt::ptr(msg)); + nng_msg_free(msg); +} + +using unique_msg = std::unique_ptr; + +inline unique_msg make_unique_msg(nng_msg *msg = nullptr, decltype(&nng_msg_free) deleter = custom_nng_msg_free) +{ + return unique_msg(msg, deleter); +} + +inline std::pair receive_message(nng_socket sock, int flags = 0) +{ + nng_msg *msg = nullptr; + + if (auto res = nng_recvmsg(sock, &msg, flags)) + { + return { make_unique_msg(), res }; + } + + return { make_unique_msg(msg), 0 }; +} + +inline unique_msg allocate_reserve_message(size_t reserve = 0) +{ + nng_msg *msg = nullptr; + + if (auto res = nng_msg_alloc(&msg, 0)) + { + mesy_nng_error("allocate_reserve_message", res); + return make_unique_msg(); + } + + if (auto res = nng_msg_reserve(msg, reserve)) + { + mesy_nng_error("allocate_reserve_message", res); + nng_msg_free(msg); + return make_unique_msg(); + } + + return make_unique_msg(msg); +} + +inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url) +{ + if (int res = nng_listen(listen, url, nullptr, 0)) + { + mesy_nng_error("nng_listen", res); + return res; + } + + if (int res = nng_dial(dial, url, nullptr, 0)) + { + mesy_nng_error("nng_dial", res); + return res; + } + + return 0; +} + +} + +#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */ diff --git a/src/mesy_nng_pipeline_main.cc b/src/mesy_nng_pipeline_main.cc index cfe6515..766ae55 100644 --- a/src/mesy_nng_pipeline_main.cc +++ b/src/mesy_nng_pipeline_main.cc @@ -1,6 +1,8 @@ #include #include "mesy_nng.h" +using namespace mesytec::nng; + struct PipelineElement { nng_socket inputSocket = NNG_SOCKET_INITIALIZER; @@ -380,4 +382,4 @@ int main(int argc, char *argv[]) mesy_nng_fatal("main close_pipeline", res); return ret; -} \ No newline at end of file +} diff --git a/src/mesy_nng_push_pull_main.cc b/src/mesy_nng_push_pull_main.cc index 24da0f1..355d755 100644 --- a/src/mesy_nng_push_pull_main.cc +++ b/src/mesy_nng_push_pull_main.cc @@ -1,8 +1,12 @@ -#include "common.h" +#include "mesy_nng.h" #include #include #include +#include "test_producer_consumer.h" + +using namespace mesytec::nng; + struct Header { unsigned id; diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc new file mode 100644 index 0000000..85ea08c --- /dev/null +++ b/src/mnode_proto_test1.cc @@ -0,0 +1,11 @@ +#include "proto/service.pb.h" + +int main() +{ + mesytec::mnode::SearchRequest request; + request.set_query("query"); + request.set_page_number(42); + request.set_results_per_page(10); + + mesytec::mnode::Ping pingRequest; +} diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 244deb8..7fdc8e8 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -3,32 +3,13 @@ #include #include #include -#include "common.h" #include "mesy_nng.h" using namespace mesytec; using namespace mesytec::mvlc; +using namespace mesytec::nng; -int allocate_reserve_message(nng_msg **msg, size_t reserve = 0) -{ - assert(msg); - - if (auto res = nng_msg_alloc(msg, 0)) - { - spdlog::error("nng_msg_alloc: {}", nng_strerror(res)); - return res; - } - - if (auto res = nng_msg_reserve(*msg, reserve)) - { - spdlog::error("nng_msg_reserve: {}", nng_strerror(res)); - return res; - } - - return 0; -} - -static const size_t DefaultOutputMessageReserve = util::Megabytes(1); +static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1); enum class BufferType: u32 { @@ -159,7 +140,7 @@ void listfile_reader_producer( } spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB", - bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1)); + bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1)); } catch(const std::exception& e) { @@ -439,7 +420,7 @@ void listfile_parser_nng( auto log_stats = [&] { spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", - lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); + lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); spdlog::info("listfile_parser_nng: time budget: " " tReceive = {} ms, " " tProcess = {} ms, " @@ -565,20 +546,6 @@ void listfile_parser_nng( } } -template -std::optional msg_trim_read(nng_msg *msg) -{ - const auto oldlen = nng_msg_len(msg); - if (nng_msg_len(msg) < sizeof(T)) - return {}; - - T result = *reinterpret_cast(nng_msg_body(msg)); - nng_msg_trim(msg, sizeof(T)); - const auto newlen = nng_msg_len(msg); - assert(newlen + sizeof(T) == oldlen); - return result; -} - void analysis_nng( nng_socket inputSocket ) @@ -672,10 +639,10 @@ void analysis_nng( } spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB", - lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); + lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1)); } -void pipe_cb(nng_pipe p, nng_pipe_ev event, void *arg) +void pipe_cb(nng_pipe p, nng_pipe_ev event, void */*arg*/) { switch (event) { diff --git a/src/pair_consumer.cc b/src/pair_consumer.cc index e10c46c..02c0383 100644 --- a/src/pair_consumer.cc +++ b/src/pair_consumer.cc @@ -1,6 +1,6 @@ -#include "common.h" +#include "test_producer_consumer.h" -int main(int argc, char *argv[]) +int main() { spdlog::set_level(spdlog::level::info); auto consumerSocket = make_pair_socket(); diff --git a/src/pair_inproc.cc b/src/pair_inproc.cc index 5d9cc1d..8e4cd6b 100644 --- a/src/pair_inproc.cc +++ b/src/pair_inproc.cc @@ -1,4 +1,4 @@ -#include "common.h" +#include "test_producer_consumer.h" #include diff --git a/src/pair_producer.cc b/src/pair_producer.cc index 32e4b76..a8d1db0 100644 --- a/src/pair_producer.cc +++ b/src/pair_producer.cc @@ -1,4 +1,4 @@ -#include "common.h" +#include "test_producer_consumer.h" int main(int argc, char *argv[]) { diff --git a/src/pub_producer.cc b/src/pub_producer.cc index 9115224..9e04953 100644 --- a/src/pub_producer.cc +++ b/src/pub_producer.cc @@ -1,9 +1,10 @@ -#include "common.h" +#include "test_producer_consumer.h" #include #include #include -int main(int argc, char *argv[]) + +int main() { spdlog::set_level(spdlog::level::info); diff --git a/src/sub_consumer.cc b/src/sub_consumer.cc index 1a61fd0..55e07d6 100644 --- a/src/sub_consumer.cc +++ b/src/sub_consumer.cc @@ -1,4 +1,4 @@ -#include "common.h" +#include "test_producer_consumer.h" #include #include #include diff --git a/src/common.h b/src/test_producer_consumer.h similarity index 73% rename from src/common.h rename to src/test_producer_consumer.h index 75f1c8c..d793f91 100644 --- a/src/common.h +++ b/src/test_producer_consumer.h @@ -1,57 +1,13 @@ -#ifndef __MESYTEC_MVLC_NNG_NODE_COMMON_H__ -#define __MESYTEC_MVLC_NNG_NODE_COMMON_H__ +#ifndef E6EFFE63_EB2C_4573_B723_61840850BBF6 +#define E6EFFE63_EB2C_4573_B723_61840850BBF6 -#include -#include -#include +#include "mesy_nng.h" + +using namespace mesytec::nng; // to make the old code compile. for test code only. static const size_t BufferSize = 1024u * 1024u; static const size_t BuffersToSend = 100000u; -inline void mesy_nng_fatal(const char *const msg, int rv) -{ - spdlog::error("{} ({})", msg, nng_strerror(rv)); - abort(); -} - -inline nng_msg *alloc_message(size_t size) -{ - nng_msg *msg = {}; - if (int res = nng_msg_alloc(&msg, size)) - mesy_nng_fatal("nng_msg_alloc", res); - return msg; -} - -inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0) -{ - if (auto res = nng_recvmsg(sock, msg_ptr, flags)) - { - nng_msg_free(*msg_ptr); - *msg_ptr = NULL; - return res; - } - - return 0; -} - -inline nng_socket make_pair_socket() -{ - nng_socket socket; - - if (int res = nng_pair0_open(&socket)) - mesy_nng_fatal("nng_pair0_open", res); - - nng_duration timeout = 100; - - if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout))) - mesy_nng_fatal("nng_socket_set", res); - - if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout))) - mesy_nng_fatal("nng_socket_set", res); - - return socket; -} - inline void producer(nng_socket socket) { while (true) @@ -165,4 +121,4 @@ inline void consumer(nng_socket socket) spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs); } -#endif /* __MESYTEC_MVLC_NNG_NODE_COMMON_H__ */ +#endif /* BD81CA64_A8E6_4938_9E27_773D6E7C4E9B */ diff --git a/src/thread_name.cc b/src/thread_name.cc new file mode 100644 index 0000000..005e237 --- /dev/null +++ b/src/thread_name.cc @@ -0,0 +1,22 @@ +#include "thread_name.h" +#ifdef __linux__ +#include +#endif + +namespace mesytec::util +{ + +#ifdef __linux__ +void set_thread_name(const char *name) +{ + prctl(PR_SET_NAME,name,0,0,0); +} + +#else + +void set_thread_name(const char *) +{ +} + +#endif +} diff --git a/src/thread_name.h b/src/thread_name.h new file mode 100644 index 0000000..0500f27 --- /dev/null +++ b/src/thread_name.h @@ -0,0 +1,9 @@ +#ifndef CAF8B988_F0C9_475A_8E38_8789949859DB +#define CAF8B988_F0C9_475A_8E38_8789949859DB + +namespace mesytec::util +{ +void set_thread_name(const char *name); +} + +#endif /* CAF8B988_F0C9_475A_8E38_8789949859DB */