mesytec-mnode/include/mesytec-mnode/mnode_nng.h
2024-12-26 15:28:00 +01:00

339 lines
8.1 KiB
C++

#ifndef B18E3651_CA9A_43BC_AA25_810EA16533CD
#define B18E3651_CA9A_43BC_AA25_810EA16533CD
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/protocol/pipeline0/pull.h>
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <string>
namespace mesytec::mnode::nng
{
// Error reporting helpers. Only declared in this header; the inline functions
// in this file pass a short description of the failed call as 'msg' and the
// nng result code as 'rv'.
// NOTE(review): judging by its name mnode_nng_fatal presumably does not return
// normally - confirm against the implementation.
void mnode_nng_fatal(const char *const msg, int rv);
void mnode_nng_error(const std::string &msg, int rv);
// Allocates a new nng message with a body length of 'size' bytes.
// Returns the raw message pointer or nullptr if nng_msg_alloc() failed, in
// which case the error is reported through mnode_nng_error().
inline nng_msg *alloc_message(size_t size)
{
    nng_msg *result = nullptr;
    const int rc = nng_msg_alloc(&result, size);

    if (rc != 0)
    {
        mnode_nng_error("nng_msg_alloc", rc);
        return nullptr;
    }

    return result;
}
// Receives a message from 'sock' and stores it in '*msg_ptr'.
// Returns 0 on success, otherwise the nng error code. On failure whatever
// '*msg_ptr' refers to is handed to nng_msg_free() and '*msg_ptr' is reset to
// null.
// NOTE(review): due to the free on the error path '*msg_ptr' must be null or a
// valid message owned by the caller when this is called - confirm that all
// callers initialize it.
inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
{
    const auto rc = nng_recvmsg(sock, msg_ptr, flags);

    if (rc == 0)
        return 0;

    nng_msg_free(*msg_ptr);
    *msg_ptr = nullptr;
    return rc;
}
// Allocates an empty (length 0) message into '*msg' and reserves 'reserve'
// bytes of body capacity for it.
//
// 'msg' must not be null. Returns 0 on success, otherwise the nng error code
// of the failing call, which is also reported through mnode_nng_error().
// If reserving the capacity fails the freshly allocated message is freed and
// '*msg' is set to null, so a failed call never hands out (or leaks) a message.
inline int allocate_reserve_message(nng_msg **msg, size_t reserve = 0)
{
    assert(msg);

    if (auto res = nng_msg_alloc(msg, 0))
    {
        mnode_nng_error("nng_msg_alloc", res);
        return res;
    }

    if (auto res = nng_msg_reserve(*msg, reserve))
    {
        mnode_nng_error("nng_msg_reserve", res);
        // Callers treat a non-zero result as "no message", so release the
        // allocation here. Nulling the pointer keeps a defensive
        // nng_msg_free(*msg) on the caller side harmless.
        nng_msg_free(*msg);
        *msg = nullptr;
        return res;
    }

    return 0;
}
// Returns the number of bytes between the current body length of 'msg' and
// its allocated capacity.
inline size_t allocated_free_space(nng_msg *msg)
{
    const auto used = nng_msg_len(msg);
    const auto capacity = nng_msg_capacity(msg);

    assert(used <= capacity);

    return capacity - used;
}
// Default for the 'timeout' parameters of the socket helpers in this header.
// nng documents nng_duration as a relative time in milliseconds.
// The variable is 'static' at namespace scope in a header, so every translation
// unit including this file gets its own, independently modifiable copy.
static nng_duration DefaultTimeout = 250;
// Applies 'timeout' as both the receive and the send timeout of 'socket'.
// Returns 0 on success or the nng error code of the first option that could
// not be set; the error is reported through mnode_nng_error().
inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = DefaultTimeout)
{
    // Receive timeout first, then send timeout. Stops at the first failure.
    const char *const options[] = {NNG_OPT_RECVTIMEO, NNG_OPT_SENDTIMEO};

    for (const char *option: options)
    {
        const int rc = nng_socket_set(socket, option, &timeout, sizeof(timeout));

        if (rc != 0)
        {
            mnode_nng_error("nng_socket_set", rc);
            return rc;
        }
    }

    return 0;
}
// Callable that opens a socket into the given nng_socket and returns 0 on
// success or a non-zero error code.
using socket_factory = std::function<int(nng_socket *)>;

// Opens a socket using 'factory' and sets its send and receive timeouts to
// 'timeout'. Returns NNG_SOCKET_INITIALIZER if the factory fails or if the
// timeouts cannot be applied; in the latter case the socket is closed again.
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
{
    nng_socket result = NNG_SOCKET_INITIALIZER;
    const int rc = factory(&result);

    if (rc != 0)
    {
        mnode_nng_error("make_socket", rc);
        return NNG_SOCKET_INITIALIZER;
    }

    if (set_socket_timeouts(result, timeout) == 0)
        return result;

    // set_socket_timeouts() already reported the error; do not leave a
    // half-configured socket open.
    nng_close(result);
    return NNG_SOCKET_INITIALIZER;
}
// Factory functions for the individual nng socket types (pair, push, pull, pub,
// sub, req, rep). Only declared in this header; 'timeout' defaults to
// DefaultTimeout.
// NOTE(review): presumably implemented via make_socket() and therefore
// returning NNG_SOCKET_INITIALIZER on error - confirm against the
// implementations.
nng_socket make_pair_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_push_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_pull_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_pub_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_sub_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_req_socket(nng_duration timeout = DefaultTimeout);
nng_socket make_rep_socket(nng_duration timeout = DefaultTimeout);
// Reads the string valued option 'opt' from socket 's'.
// Returns the complete option value, or an empty string if the option could
// not be read.
inline std::string socket_get_string_opt(nng_socket s, const char *opt)
{
    char *dest = nullptr;

    if (nng_socket_get_string(s, opt, &dest) != 0 || !dest)
        return {};

    // Construct from the char pointer using parentheses: brace-initializing
    // from '*dest' would pick the initializer_list<char> constructor and
    // yield a one character string.
    std::string result(dest);

    // The string was allocated by nng and has to be released with nng_strfree().
    nng_strfree(dest);
    return result;
}
// Reads the string valued option 'opt' from pipe 'p'.
// Returns the complete option value, or an empty string if the option could
// not be read.
inline std::string pipe_get_string_opt(nng_pipe p, const char *opt)
{
    char *dest = nullptr;

    if (nng_pipe_get_string(p, opt, &dest) != 0 || !dest)
        return {};

    // Construct from the char pointer using parentheses: brace-initializing
    // from '*dest' would pick the initializer_list<char> constructor and
    // yield a one character string.
    std::string result(dest);

    // The string was allocated by nng and has to be released with nng_strfree().
    nng_strfree(dest);
    return result;
}
// Only declared in this header.
// NOTE(review): presumably logs details about socket 's' together with
// 'info_text' - confirm against the implementation.
void log_socket_info(nng_socket s, const std::string &info_text);
// Returns the value of the NNG_OPT_LOCADDR string option of socket 's' as
// produced by socket_get_string_opt().
inline std::string get_local_address(nng_socket s)
{
    auto address = socket_get_string_opt(s, NNG_OPT_LOCADDR);
    return address;
}
// Only declared in this header.
// NOTE(review): presumably logs details about pipe 'p' together with
// 'info_text' - confirm against the implementation.
void log_pipe_info(nng_pipe p, const char *info_text);
// Predicate type used by the retrying send functions.
// 'nng_res' is the result of the last nng function call.
// NOTE(review): presumably a return value of true means "try again" - confirm
// against the implementation of send_message_retry().
using retry_predicate = std::function<bool(int nng_res)>;
// Only declared in this header.
// NOTE(review): presumably sends 'msg' on 'socket', consulting 'rp' after a
// failed attempt, and uses 'debugInfo' for log output - confirm. Also confirm
// who owns 'msg' when the call fails.
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp,
const char *debugInfo = "");
// Retry predicate that answers true for the first 'maxTries' invocations and
// false afterwards. The result code argument is ignored. 'attempt' counts
// every invocation, including the ones that returned false.
struct RetryNTimes
{
    explicit RetryNTimes(size_t maxTries = 3)
        : maxTries(maxTries)
    {
    }

    bool operator()(int)
    {
        const size_t current = attempt;
        ++attempt;
        return current < maxTries;
    }

    size_t maxTries;
    size_t attempt = 0u;
};
// Convenience overload of send_message_retry() using a RetryNTimes(maxTries)
// predicate.
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3,
                              const char *debugInfo = "")
{
    RetryNTimes limiter(maxTries);
    return send_message_retry(socket, msg, limiter, debugInfo);
}
// Only declared in this header.
// NOTE(review): presumably sends a zero length message on 'socket', consulting
// 'rp' after a failed attempt - confirm against the implementation.
int send_empty_message(nng_socket socket, retry_predicate rp);
// Convenience overload of send_empty_message() using a RetryNTimes(maxTries)
// predicate.
inline int send_empty_message(nng_socket socket, size_t maxTries = 3)
{
    RetryNTimes limiter(maxTries);
    return send_empty_message(socket, limiter);
}
// Read type T from the front of msg and trim the message by sizeof(T).
//
// Returns std::nullopt and leaves the message untouched if its body holds
// fewer than sizeof(T) bytes. The value is copied out bytewise, so T has to be
// trivially copyable and default constructible.
template <typename T> std::optional<T> msg_trim_read(nng_msg *msg)
{
    const auto oldlen = nng_msg_len(msg);

    if (oldlen < sizeof(T))
        return std::nullopt;

    // The message body carries no alignment guarantee for T (e.g. after an
    // earlier trim of an odd number of bytes), so copy the bytes instead of
    // dereferencing a casted pointer.
    T result{};
    std::memcpy(&result, nng_msg_body(msg), sizeof(T));

    nng_msg_trim(msg, sizeof(T));
    assert(nng_msg_len(msg) + sizeof(T) == oldlen);

    return result;
}
// Only declared in this header.
// NOTE(review): presumably these map the values returned by nng_stat_unit()
// and nng_stat_type() to human readable names - confirm against the
// implementations, including what is returned for unknown values.
const char *nng_stat_unit_to_string(int nng_unit);
const char *nng_stat_type_to_string(int nng_stat_type);
// Walks the statistics tree rooted at 'stat' depth first. 'visitor' is invoked
// as visitor(stat, depth) for every node before its children are visited;
// children are only enumerated for nodes of type NNG_STAT_SCOPE. The visitor
// is passed by value to each recursive call.
//
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
// sockets!
template <typename Visitor>
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth = 0)
{
    visitor(stat, depth);

    if (nng_stat_type(stat) != NNG_STAT_SCOPE)
        return;

    auto child = nng_stat_child(stat);

    while (child)
    {
        visit_nng_stats(child, visitor, depth + 1);
        child = nng_stat_next(child);
    }
}
// Only declared in this header.
// NOTE(review): presumably renders a single nng_stat entry as text - confirm
// against the implementation.
std::string format_stat(nng_stat *stat);
// Owning handle for an nng message: the held pointer is passed to
// nng_msg_free() when the handle is destroyed or reset.
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;

// Wraps 'msg' in a unique_msg. Without an argument an empty handle is returned.
inline unique_msg make_unique_msg(nng_msg *msg = nullptr)
{
    return unique_msg{msg, &nng_msg_free};
}
// Receives a message from 'sock'.
// Returns the message together with 0 on success, otherwise an empty
// unique_msg together with the nng error code.
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
{
    nng_msg *received = nullptr;
    const auto rc = nng_recvmsg(sock, &received, flags);

    if (rc != 0)
        return {make_unique_msg(), rc};

    return {make_unique_msg(received), 0};
}
// Sends 'msg' on 'sock'.
// Returns 0 on success, in which case 'msg' is released and left empty.
// On failure the nng error code is returned and 'msg' keeps the message.
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
{
    const auto rc = nng_sendmsg(sock, msg.get(), flags);

    if (rc == 0)
    {
        // The message has been handed over to nng, so it must not be freed
        // by the unique_msg anymore.
        msg.release();
    }

    return rc;
}
// Allocates a message with a body length of 'size' bytes.
// Returns an empty unique_msg if the allocation fails; the error is reported
// through mnode_nng_error().
inline unique_msg allocate_message(size_t size)
{
    nng_msg *raw = nullptr;
    const auto rc = nng_msg_alloc(&raw, size);

    if (rc == 0)
        return make_unique_msg(raw);

    mnode_nng_error("allocate_message", rc);
    return make_unique_msg();
}
// Returned message has size 0 and capacity 'reserve'.
// An empty unique_msg is returned if either the allocation or the reservation
// fails; errors are reported through mnode_nng_error().
inline unique_msg allocate_reserve_message(size_t reserve = 0)
{
    nng_msg *raw = nullptr;
    const auto allocResult = nng_msg_alloc(&raw, 0);

    if (allocResult != 0)
    {
        mnode_nng_error("allocate_reserve_message", allocResult);
        return make_unique_msg();
    }

    // Take ownership right away so the message is freed on the error path.
    auto result = make_unique_msg(raw);
    const auto reserveResult = nng_msg_reserve(result.get(), reserve);

    if (reserveResult != 0)
    {
        mnode_nng_error("allocate_reserve_message", reserveResult);
        return make_unique_msg();
    }

    return result;
}
// Connects two sockets through 'url': 'listen' starts listening on the url,
// then 'dial' dials it. Returns 0 on success or the nng error code of the
// first failing step, which is also reported through mnode_nng_error().
inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url)
{
    int rc = nng_listen(listen, url, nullptr, 0);

    if (rc != 0)
    {
        mnode_nng_error("nng_listen", rc);
        return rc;
    }

    rc = nng_dial(dial, url, nullptr, 0);

    if (rc != 0)
        mnode_nng_error("nng_dial", rc);

    return rc;
}
// Creates a message whose body is a copy of the bytes in 'data'.
// Returns an empty unique_msg if the allocation fails; the error is reported
// through mnode_nng_error().
inline unique_msg make_message(const std::string &data)
{
    const auto size = data.size();
    nng_msg *raw = nullptr;
    const int rc = nng_msg_alloc(&raw, size);

    if (rc != 0)
    {
        mnode_nng_error("nng_msg_alloc", rc);
        return make_unique_msg();
    }

    auto result = make_unique_msg(raw);
    std::memcpy(nng_msg_body(result.get()), data.data(), size);
    return result;
}
// Duplicates 'msg' using nng_msg_dup().
// Returns an empty unique_msg if duplication fails; the error is reported
// through mnode_nng_error().
inline unique_msg clone_message(const nng_msg *msg)
{
    nng_msg *duplicate = nullptr;
    const int rc = nng_msg_dup(&duplicate, msg);

    if (rc == 0)
        return make_unique_msg(duplicate);

    mnode_nng_error("nng_msg_dup", rc);
    return make_unique_msg();
}
// Duplicates the message held by 'msg'; forwards to the raw pointer overload.
inline unique_msg clone_message(const unique_msg &msg)
{
    const nng_msg *source = msg.get();
    return clone_message(source);
}
} // namespace mesytec::mnode::nng
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */