mnode_nng: reformat
This commit is contained in:
parent
9bdd79e910
commit
0eaa5aa942
1 changed files with 24 additions and 24 deletions
|
@ -7,8 +7,8 @@
|
||||||
#include <nng/protocol/pipeline0/push.h>
|
#include <nng/protocol/pipeline0/push.h>
|
||||||
#include <nng/protocol/pubsub0/pub.h>
|
#include <nng/protocol/pubsub0/pub.h>
|
||||||
#include <nng/protocol/pubsub0/sub.h>
|
#include <nng/protocol/pubsub0/sub.h>
|
||||||
#include <nng/protocol/reqrep0/req.h>
|
|
||||||
#include <nng/protocol/reqrep0/rep.h>
|
#include <nng/protocol/reqrep0/rep.h>
|
||||||
|
#include <nng/protocol/reqrep0/req.h>
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
|
@ -93,7 +93,7 @@ inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = Default
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
using socket_factory = std::function<int (nng_socket *)>;
|
using socket_factory = std::function<int(nng_socket *)>;
|
||||||
|
|
||||||
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
|
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
|
||||||
{
|
{
|
||||||
|
@ -159,20 +159,25 @@ inline std::string get_local_address(nng_socket s)
|
||||||
void log_pipe_info(nng_pipe p, const char *info_text);
|
void log_pipe_info(nng_pipe p, const char *info_text);
|
||||||
|
|
||||||
// 'nng_res' is the result of the last nng function call.
|
// 'nng_res' is the result of the last nng function call.
|
||||||
using retry_predicate = std::function<bool (int nng_res)>;
|
using retry_predicate = std::function<bool(int nng_res)>;
|
||||||
|
|
||||||
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "");
|
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp,
|
||||||
|
const char *debugInfo = "");
|
||||||
|
|
||||||
struct RetryNTimes
|
struct RetryNTimes
|
||||||
{
|
{
|
||||||
explicit RetryNTimes(size_t maxTries = 3) : maxTries(maxTries) {}
|
explicit RetryNTimes(size_t maxTries = 3)
|
||||||
|
: maxTries(maxTries)
|
||||||
|
{
|
||||||
|
}
|
||||||
bool operator()(int) { return attempt++ < maxTries; }
|
bool operator()(int) { return attempt++ < maxTries; }
|
||||||
|
|
||||||
size_t maxTries;
|
size_t maxTries;
|
||||||
size_t attempt = 0u;
|
size_t attempt = 0u;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "")
|
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3,
|
||||||
|
const char *debugInfo = "")
|
||||||
{
|
{
|
||||||
return send_message_retry(socket, msg, RetryNTimes(maxTries), debugInfo);
|
return send_message_retry(socket, msg, RetryNTimes(maxTries), debugInfo);
|
||||||
}
|
}
|
||||||
|
@ -185,16 +190,17 @@ inline int send_empty_message(nng_socket socket, size_t maxTries = 3)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read type T from the front of msg and trim the message by sizeof(T).
|
// Read type T from the front of msg and trim the message by sizeof(T).
|
||||||
template<typename T>
|
template <typename T> std::optional<T> msg_trim_read(nng_msg *msg)
|
||||||
std::optional<T> msg_trim_read(nng_msg *msg)
|
|
||||||
{
|
{
|
||||||
const auto oldlen = nng_msg_len(msg); (void) oldlen;
|
const auto oldlen = nng_msg_len(msg);
|
||||||
|
(void)oldlen;
|
||||||
if (nng_msg_len(msg) < sizeof(T))
|
if (nng_msg_len(msg) < sizeof(T))
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
|
|
||||||
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
|
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
|
||||||
nng_msg_trim(msg, sizeof(T));
|
nng_msg_trim(msg, sizeof(T));
|
||||||
const auto newlen = nng_msg_len(msg); (void) newlen;
|
const auto newlen = nng_msg_len(msg);
|
||||||
|
(void)newlen;
|
||||||
assert(newlen + sizeof(T) == oldlen);
|
assert(newlen + sizeof(T) == oldlen);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -205,8 +211,8 @@ const char *nng_stat_type_to_string(int nng_stat_type);
|
||||||
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
|
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
|
||||||
// sockets!
|
// sockets!
|
||||||
|
|
||||||
template<typename Visitor>
|
template <typename Visitor>
|
||||||
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
|
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth = 0)
|
||||||
{
|
{
|
||||||
visitor(stat, depth);
|
visitor(stat, depth);
|
||||||
|
|
||||||
|
@ -216,7 +222,7 @@ void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
|
||||||
{
|
{
|
||||||
for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child))
|
for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child))
|
||||||
{
|
{
|
||||||
visit_nng_stats(child, visitor, depth+1);
|
visit_nng_stats(child, visitor, depth + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -225,10 +231,7 @@ std::string format_stat(nng_stat *stat);
|
||||||
|
|
||||||
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
|
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
|
||||||
|
|
||||||
inline unique_msg make_unique_msg(nng_msg *msg = nullptr)
|
inline unique_msg make_unique_msg(nng_msg *msg = nullptr) { return unique_msg(msg, &nng_msg_free); }
|
||||||
{
|
|
||||||
return unique_msg(msg, &nng_msg_free);
|
|
||||||
}
|
|
||||||
|
|
||||||
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
|
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
|
||||||
{
|
{
|
||||||
|
@ -236,10 +239,10 @@ inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0
|
||||||
|
|
||||||
if (auto res = nng_recvmsg(sock, &msg, flags))
|
if (auto res = nng_recvmsg(sock, &msg, flags))
|
||||||
{
|
{
|
||||||
return { make_unique_msg(), res };
|
return {make_unique_msg(), res};
|
||||||
}
|
}
|
||||||
|
|
||||||
return { make_unique_msg(msg), 0 };
|
return {make_unique_msg(msg), 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
|
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
|
||||||
|
@ -329,11 +332,8 @@ inline unique_msg clone_message(const nng_msg *msg)
|
||||||
return make_unique_msg(newMsg);
|
return make_unique_msg(newMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline unique_msg clone_message(const unique_msg &msg)
|
inline unique_msg clone_message(const unique_msg &msg) { return clone_message(msg.get()); }
|
||||||
{
|
|
||||||
return clone_message(msg.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
} // namespace mesytec::mnode::nng
|
||||||
|
|
||||||
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */
|
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */
|
||||||
|
|
Loading…
Reference in a new issue