add updated mesy_nng from mvme, fix things, add mnode_proto_test1
This commit is contained in:
parent
e316f2e95a
commit
f017d2406e
14 changed files with 429 additions and 106 deletions
|
@ -60,5 +60,8 @@ target_compile_features(mesy_nng_sub_consumer PRIVATE cxx_std_17)
|
|||
target_link_libraries(mesy_nng_sub_consumer PRIVATE mesytec-mvlc PRIVATE nng)
|
||||
target_compile_options(mesy_nng_sub_consumer PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS})
|
||||
|
||||
add_executable(mnode-proto-test1 mnode_proto_test1.cc thread_name.cc)
|
||||
target_link_libraries(mnode-proto-test1 PRIVATE mnode-proto)
|
||||
|
||||
#unset(CMAKE_C_CLANG_TIDY)
|
||||
#unset(CMAKE_CXX_CLANG_TIDY)
|
||||
|
|
364
src/mesy_nng.h
364
src/mesy_nng.h
|
@ -1,9 +1,145 @@
|
|||
#pragma once
|
||||
#ifndef B18E3651_CA9A_43BC_AA25_810EA16533CD
|
||||
#define B18E3651_CA9A_43BC_AA25_810EA16533CD
|
||||
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/pair0/pair.h>
|
||||
#include <nng/protocol/pipeline0/pull.h>
|
||||
#include <nng/protocol/pipeline0/push.h>
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
#include <nng/protocol/pubsub0/sub.h>
|
||||
#include "common.h"
|
||||
#include <spdlog/spdlog.h>
|
||||
|
||||
#include <optional>
|
||||
#include "thread_name.h"
|
||||
|
||||
namespace mesytec::nng
|
||||
{
|
||||
|
||||
inline void mesy_nng_fatal(const char *const msg, int rv)
|
||||
{
|
||||
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
||||
std::abort();
|
||||
}
|
||||
|
||||
inline void mesy_nng_error(const std::string &msg, int rv)
|
||||
{
|
||||
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
||||
}
|
||||
|
||||
inline nng_msg *alloc_message(size_t size)
|
||||
{
|
||||
nng_msg *msg = {};
|
||||
if (int res = nng_msg_alloc(&msg, size))
|
||||
{
|
||||
mesy_nng_error("nng_msg_alloc", res);
|
||||
return nullptr;
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
|
||||
{
|
||||
if (auto res = nng_recvmsg(sock, msg_ptr, flags))
|
||||
{
|
||||
nng_msg_free(*msg_ptr);
|
||||
*msg_ptr = NULL;
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline int allocate_reserve_message(nng_msg **msg, size_t reserve = 0)
|
||||
{
|
||||
assert(msg);
|
||||
|
||||
if (auto res = nng_msg_alloc(msg, 0))
|
||||
{
|
||||
mesy_nng_error("nng_msg_alloc", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
if (auto res = nng_msg_reserve(*msg, reserve))
|
||||
{
|
||||
mesy_nng_error("nng_msg_reserve", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline size_t allocated_free_space(nng_msg *msg)
|
||||
{
|
||||
auto capacity = nng_msg_capacity(msg);
|
||||
auto used = nng_msg_len(msg);
|
||||
assert(capacity >= used);
|
||||
return capacity - used;
|
||||
}
|
||||
|
||||
static nng_duration DefaultTimeout = 16;
|
||||
|
||||
inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
|
||||
{
|
||||
mesy_nng_error("nng_socket_set", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
|
||||
{
|
||||
mesy_nng_error("nng_socket_set", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
using socket_factory = std::function<int (nng_socket *)>;
|
||||
|
||||
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
nng_socket socket;
|
||||
|
||||
if (int res = factory(&socket))
|
||||
{
|
||||
mesy_nng_error("make_socket", res);
|
||||
return NNG_SOCKET_INITIALIZER;
|
||||
}
|
||||
|
||||
if (set_socket_timeouts(socket, timeout) != 0)
|
||||
{
|
||||
nng_close(socket);
|
||||
return NNG_SOCKET_INITIALIZER;
|
||||
}
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
inline nng_socket make_pair_socket(nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
return make_socket(nng_pair0_open, timeout);
|
||||
}
|
||||
|
||||
inline nng_socket make_push_socket(nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
return make_socket(nng_push0_open, timeout);
|
||||
}
|
||||
|
||||
inline nng_socket make_pull_socket(nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
return make_socket(nng_pull0_open, timeout);
|
||||
}
|
||||
|
||||
inline nng_socket make_pub_socket(nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
return make_socket(nng_pub0_open, timeout);
|
||||
}
|
||||
|
||||
inline nng_socket make_sub_socket(nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
return make_socket(nng_sub0_open, timeout);
|
||||
}
|
||||
|
||||
inline std::string socket_get_string_opt(nng_socket s, const char *opt)
|
||||
{
|
||||
|
@ -40,6 +176,11 @@ inline void log_socket_info(nng_socket s, const char *info)
|
|||
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
|
||||
}
|
||||
|
||||
inline std::string get_local_address(nng_socket s)
|
||||
{
|
||||
return socket_get_string_opt(s, NNG_OPT_LOCADDR);
|
||||
}
|
||||
|
||||
inline void log_pipe_info(nng_pipe p, const char *info)
|
||||
{
|
||||
auto sockName = pipe_get_string_opt(p, NNG_OPT_SOCKNAME);
|
||||
|
@ -51,10 +192,28 @@ inline void log_pipe_info(nng_pipe p, const char *info)
|
|||
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
|
||||
}
|
||||
|
||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "")
|
||||
using retry_predicate = std::function<bool ()>;
|
||||
|
||||
class RetryNTimes
|
||||
{
|
||||
public:
|
||||
RetryNTimes(size_t maxTries = 3)
|
||||
: maxTries_(maxTries)
|
||||
{}
|
||||
|
||||
bool operator()()
|
||||
{
|
||||
return attempt_++ < maxTries_;
|
||||
}
|
||||
|
||||
private:
|
||||
size_t maxTries_;
|
||||
size_t attempt_ = 0u;
|
||||
};
|
||||
|
||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "")
|
||||
{
|
||||
int res = 0;
|
||||
size_t attempt = 0u;
|
||||
|
||||
do
|
||||
{
|
||||
|
@ -63,17 +222,206 @@ inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries =
|
|||
if (res)
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::warn("send_message_retry: {} - send failed: {} (msg={})", debugInfo, nng_strerror(res), fmt::ptr(msg));
|
||||
return res;
|
||||
}
|
||||
|
||||
if (res == NNG_ETIMEDOUT)
|
||||
spdlog::warn("send_message_retry: {} - send timeout", debugInfo);
|
||||
spdlog::trace("send_message_retry: {} - send timeout (msg={})", debugInfo, fmt::ptr(msg));
|
||||
|
||||
if (maxTries > 0 && attempt >= maxTries)
|
||||
if (!rp())
|
||||
return res;
|
||||
|
||||
++attempt;
|
||||
}
|
||||
} while (res == NNG_ETIMEDOUT);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "")
|
||||
{
|
||||
RetryNTimes retryPredicate(maxTries);
|
||||
|
||||
return send_message_retry(socket, msg, retryPredicate, debugInfo);
|
||||
}
|
||||
|
||||
inline int send_empty_message(nng_socket socket, retry_predicate rp)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (int res = allocate_reserve_message(&msg, 0))
|
||||
return res;
|
||||
|
||||
if (int res = send_message_retry(socket, msg, rp, "send_empty_message"))
|
||||
{
|
||||
nng_msg_free(msg);
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline int send_empty_message(nng_socket socket, size_t maxTries = 3)
|
||||
{
|
||||
RetryNTimes retryPredicate(maxTries);
|
||||
|
||||
return send_empty_message(socket, retryPredicate);
|
||||
}
|
||||
|
||||
// Read type T from the front of msg and trim the message by sizeof(T).
|
||||
template<typename T>
|
||||
std::optional<T> msg_trim_read(nng_msg *msg)
|
||||
{
|
||||
const auto oldlen = nng_msg_len(msg); (void) oldlen;
|
||||
if (nng_msg_len(msg) < sizeof(T))
|
||||
return {};
|
||||
|
||||
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
|
||||
nng_msg_trim(msg, sizeof(T));
|
||||
const auto newlen = nng_msg_len(msg); (void) newlen;
|
||||
assert(newlen + sizeof(T) == oldlen);
|
||||
return result;
|
||||
}
|
||||
|
||||
inline const char *nng_stat_unit_to_string(int unit)
|
||||
{
|
||||
switch (unit)
|
||||
{
|
||||
case NNG_UNIT_BYTES:
|
||||
return "B";
|
||||
case NNG_UNIT_MESSAGES:
|
||||
return "msgs";
|
||||
case NNG_UNIT_MILLIS:
|
||||
return "ms";
|
||||
case NNG_UNIT_EVENTS:
|
||||
return "events";
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
inline const char *nng_stat_type_to_string(int type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case NNG_STAT_SCOPE:
|
||||
return "scope";
|
||||
case NNG_STAT_LEVEL:
|
||||
return "level";
|
||||
case NNG_STAT_COUNTER:
|
||||
return "counter";
|
||||
case NNG_STAT_STRING:
|
||||
return "string";
|
||||
case NNG_STAT_BOOLEAN:
|
||||
return "bool";
|
||||
case NNG_STAT_ID:
|
||||
return "id";
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
|
||||
// sockets!
|
||||
|
||||
template<typename Visitor>
|
||||
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
|
||||
{
|
||||
visitor(stat, depth);
|
||||
|
||||
auto statType = nng_stat_type(stat);
|
||||
|
||||
if (statType == NNG_STAT_SCOPE)
|
||||
{
|
||||
for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child))
|
||||
{
|
||||
visit_nng_stats(child, visitor, depth+1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<typename Value>
|
||||
std::string format_stat(int type, const char *name, const char *desc, std::uint64_t ts, Value value, int unit)
|
||||
{
|
||||
return fmt::format("type={}, name={}, desc={}, ts={}, value={}, unit={}",
|
||||
nng::nng_stat_type_to_string(type),
|
||||
name, desc, ts, value,
|
||||
nng::nng_stat_unit_to_string(unit));
|
||||
}
|
||||
|
||||
inline std::string format_stat(nng_stat *stat)
|
||||
{
|
||||
switch (nng_stat_type(stat))
|
||||
{
|
||||
case NNG_STAT_BOOLEAN:
|
||||
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
|
||||
default:
|
||||
return format_stat(nng_stat_type(stat), nng_stat_name(stat), nng_stat_desc(stat), nng_stat_timestamp(stat), nng_stat_value(stat), nng_stat_unit(stat));
|
||||
}
|
||||
}
|
||||
|
||||
static void custom_nng_msg_free(nng_msg *msg)
|
||||
{
|
||||
//spdlog::warn("custom_nng_msg_free: msg={}", fmt::ptr(msg));
|
||||
nng_msg_free(msg);
|
||||
}
|
||||
|
||||
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
|
||||
|
||||
inline unique_msg make_unique_msg(nng_msg *msg = nullptr, decltype(&nng_msg_free) deleter = custom_nng_msg_free)
|
||||
{
|
||||
return unique_msg(msg, deleter);
|
||||
}
|
||||
|
||||
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (auto res = nng_recvmsg(sock, &msg, flags))
|
||||
{
|
||||
return { make_unique_msg(), res };
|
||||
}
|
||||
|
||||
return { make_unique_msg(msg), 0 };
|
||||
}
|
||||
|
||||
inline unique_msg allocate_reserve_message(size_t reserve = 0)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (auto res = nng_msg_alloc(&msg, 0))
|
||||
{
|
||||
mesy_nng_error("allocate_reserve_message", res);
|
||||
return make_unique_msg();
|
||||
}
|
||||
|
||||
if (auto res = nng_msg_reserve(msg, reserve))
|
||||
{
|
||||
mesy_nng_error("allocate_reserve_message", res);
|
||||
nng_msg_free(msg);
|
||||
return make_unique_msg();
|
||||
}
|
||||
|
||||
return make_unique_msg(msg);
|
||||
}
|
||||
|
||||
inline int marry_listen_dial(nng_socket listen, nng_socket dial, const char *url)
|
||||
{
|
||||
if (int res = nng_listen(listen, url, nullptr, 0))
|
||||
{
|
||||
mesy_nng_error("nng_listen", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
if (int res = nng_dial(dial, url, nullptr, 0))
|
||||
{
|
||||
mesy_nng_error("nng_dial", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
#include <future>
|
||||
#include "mesy_nng.h"
|
||||
|
||||
using namespace mesytec::nng;
|
||||
|
||||
struct PipelineElement
|
||||
{
|
||||
nng_socket inputSocket = NNG_SOCKET_INITIALIZER;
|
||||
|
@ -380,4 +382,4 @@ int main(int argc, char *argv[])
|
|||
mesy_nng_fatal("main close_pipeline", res);
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
#include "common.h"
|
||||
#include "mesy_nng.h"
|
||||
#include <nng/protocol/pipeline0/push.h>
|
||||
#include <nng/protocol/pipeline0/pull.h>
|
||||
#include <thread>
|
||||
|
||||
#include "test_producer_consumer.h"
|
||||
|
||||
using namespace mesytec::nng;
|
||||
|
||||
struct Header
|
||||
{
|
||||
unsigned id;
|
||||
|
|
11
src/mnode_proto_test1.cc
Normal file
11
src/mnode_proto_test1.cc
Normal file
|
@ -0,0 +1,11 @@
|
|||
#include "proto/service.pb.h"
|
||||
|
||||
int main()
|
||||
{
|
||||
mesytec::mnode::SearchRequest request;
|
||||
request.set_query("query");
|
||||
request.set_page_number(42);
|
||||
request.set_results_per_page(10);
|
||||
|
||||
mesytec::mnode::Ping pingRequest;
|
||||
}
|
|
@ -3,32 +3,13 @@
|
|||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <nng/nng.h>
|
||||
#include <sys/prctl.h>
|
||||
#include "common.h"
|
||||
#include "mesy_nng.h"
|
||||
|
||||
using namespace mesytec;
|
||||
using namespace mesytec::mvlc;
|
||||
using namespace mesytec::nng;
|
||||
|
||||
int allocate_reserve_message(nng_msg **msg, size_t reserve = 0)
|
||||
{
|
||||
assert(msg);
|
||||
|
||||
if (auto res = nng_msg_alloc(msg, 0))
|
||||
{
|
||||
spdlog::error("nng_msg_alloc: {}", nng_strerror(res));
|
||||
return res;
|
||||
}
|
||||
|
||||
if (auto res = nng_msg_reserve(*msg, reserve))
|
||||
{
|
||||
spdlog::error("nng_msg_reserve: {}", nng_strerror(res));
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const size_t DefaultOutputMessageReserve = util::Megabytes(1);
|
||||
static const size_t DefaultOutputMessageReserve = mvlc::util::Megabytes(1);
|
||||
|
||||
enum class BufferType: u32
|
||||
{
|
||||
|
@ -159,7 +140,7 @@ void listfile_reader_producer(
|
|||
}
|
||||
|
||||
spdlog::info("listfile_reader_producer: done, sent {} messages, totalSize={:.2f} MiB",
|
||||
bufferNumber, 1.0 * totalBytesSent / util::Megabytes(1));
|
||||
bufferNumber, 1.0 * totalBytesSent / mvlc::util::Megabytes(1));
|
||||
}
|
||||
catch(const std::exception& e)
|
||||
{
|
||||
|
@ -439,7 +420,7 @@ void listfile_parser_nng(
|
|||
auto log_stats = [&]
|
||||
{
|
||||
spdlog::info("listfile_parser_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
|
||||
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
|
||||
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1));
|
||||
spdlog::info("listfile_parser_nng: time budget: "
|
||||
" tReceive = {} ms, "
|
||||
" tProcess = {} ms, "
|
||||
|
@ -565,20 +546,6 @@ void listfile_parser_nng(
|
|||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
std::optional<T> msg_trim_read(nng_msg *msg)
|
||||
{
|
||||
const auto oldlen = nng_msg_len(msg);
|
||||
if (nng_msg_len(msg) < sizeof(T))
|
||||
return {};
|
||||
|
||||
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
|
||||
nng_msg_trim(msg, sizeof(T));
|
||||
const auto newlen = nng_msg_len(msg);
|
||||
assert(newlen + sizeof(T) == oldlen);
|
||||
return result;
|
||||
}
|
||||
|
||||
void analysis_nng(
|
||||
nng_socket inputSocket
|
||||
)
|
||||
|
@ -672,10 +639,10 @@ void analysis_nng(
|
|||
}
|
||||
|
||||
spdlog::info("analysis_nng: lastInputMessageNumber={}, inputBuffersLost={}, totalInput={:.2f} MiB",
|
||||
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
|
||||
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / mvlc::util::Megabytes(1));
|
||||
}
|
||||
|
||||
void pipe_cb(nng_pipe p, nng_pipe_ev event, void *arg)
|
||||
void pipe_cb(nng_pipe p, nng_pipe_ev event, void */*arg*/)
|
||||
{
|
||||
switch (event)
|
||||
{
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#include "common.h"
|
||||
#include "test_producer_consumer.h"
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
int main()
|
||||
{
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
auto consumerSocket = make_pair_socket();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#include "common.h"
|
||||
#include "test_producer_consumer.h"
|
||||
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#include "common.h"
|
||||
#include "test_producer_consumer.h"
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
#include "common.h"
|
||||
#include "test_producer_consumer.h"
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
|
||||
int main()
|
||||
{
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#include "common.h"
|
||||
#include "test_producer_consumer.h"
|
||||
#include <nng/protocol/pubsub0/sub.h>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
|
|
|
@ -1,57 +1,13 @@
|
|||
#ifndef __MESYTEC_MVLC_NNG_NODE_COMMON_H__
|
||||
#define __MESYTEC_MVLC_NNG_NODE_COMMON_H__
|
||||
#ifndef E6EFFE63_EB2C_4573_B723_61840850BBF6
|
||||
#define E6EFFE63_EB2C_4573_B723_61840850BBF6
|
||||
|
||||
#include <nng/nng.h>
|
||||
#include <nng/protocol/pair0/pair.h>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include "mesy_nng.h"
|
||||
|
||||
using namespace mesytec::nng; // to make the old code compile. for test code only.
|
||||
|
||||
static const size_t BufferSize = 1024u * 1024u;
|
||||
static const size_t BuffersToSend = 100000u;
|
||||
|
||||
inline void mesy_nng_fatal(const char *const msg, int rv)
|
||||
{
|
||||
spdlog::error("{} ({})", msg, nng_strerror(rv));
|
||||
abort();
|
||||
}
|
||||
|
||||
inline nng_msg *alloc_message(size_t size)
|
||||
{
|
||||
nng_msg *msg = {};
|
||||
if (int res = nng_msg_alloc(&msg, size))
|
||||
mesy_nng_fatal("nng_msg_alloc", res);
|
||||
return msg;
|
||||
}
|
||||
|
||||
inline int receive_message(nng_socket sock, nng_msg **msg_ptr, int flags = 0)
|
||||
{
|
||||
if (auto res = nng_recvmsg(sock, msg_ptr, flags))
|
||||
{
|
||||
nng_msg_free(*msg_ptr);
|
||||
*msg_ptr = NULL;
|
||||
return res;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline nng_socket make_pair_socket()
|
||||
{
|
||||
nng_socket socket;
|
||||
|
||||
if (int res = nng_pair0_open(&socket))
|
||||
mesy_nng_fatal("nng_pair0_open", res);
|
||||
|
||||
nng_duration timeout = 100;
|
||||
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_RECVTIMEO, &timeout, sizeof(timeout)))
|
||||
mesy_nng_fatal("nng_socket_set", res);
|
||||
|
||||
if (int res = nng_socket_set(socket, NNG_OPT_SENDTIMEO, &timeout, sizeof(timeout)))
|
||||
mesy_nng_fatal("nng_socket_set", res);
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
inline void producer(nng_socket socket)
|
||||
{
|
||||
while (true)
|
||||
|
@ -165,4 +121,4 @@ inline void consumer(nng_socket socket)
|
|||
spdlog::info("consumer received {} messages, {:.2f} MiB/s, {:.2f} buffers/s", recvCount, MiBs, bufs);
|
||||
}
|
||||
|
||||
#endif /* __MESYTEC_MVLC_NNG_NODE_COMMON_H__ */
|
||||
#endif /* BD81CA64_A8E6_4938_9E27_773D6E7C4E9B */
|
22
src/thread_name.cc
Normal file
22
src/thread_name.cc
Normal file
|
@ -0,0 +1,22 @@
|
|||
#include "thread_name.h"
|
||||
#ifdef __linux__
|
||||
#include <sys/prctl.h>
|
||||
#endif
|
||||
|
||||
namespace mesytec::util
|
||||
{
|
||||
|
||||
#ifdef __linux__
|
||||
void set_thread_name(const char *name)
|
||||
{
|
||||
prctl(PR_SET_NAME,name,0,0,0);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
void set_thread_name(const char *)
|
||||
{
|
||||
}
|
||||
|
||||
#endif
|
||||
}
|
9
src/thread_name.h
Normal file
9
src/thread_name.h
Normal file
|
@ -0,0 +1,9 @@
|
|||
#ifndef CAF8B988_F0C9_475A_8E38_8789949859DB
|
||||
#define CAF8B988_F0C9_475A_8E38_8789949859DB
|
||||
|
||||
namespace mesytec::util
|
||||
{
|
||||
void set_thread_name(const char *name);
|
||||
}
|
||||
|
||||
#endif /* CAF8B988_F0C9_475A_8E38_8789949859DB */
|
Loading…
Reference in a new issue