try out nng_pipe_notify

This commit is contained in:
Florian Lüke 2023-07-16 06:35:26 +02:00
parent 6c450d3a47
commit ea2b18ce83

View file

@ -4,34 +4,11 @@
#include <nng/nng.h> #include <nng/nng.h>
#include <sys/prctl.h> #include <sys/prctl.h>
#include "common.h" #include "common.h"
#include "mesy_nng.h"
using namespace mesytec; using namespace mesytec;
using namespace mesytec::mvlc; using namespace mesytec::mvlc;
int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "")
{
int res = 0;
size_t attempt = 0u;
do
{
res = nng_sendmsg(socket, msg, 0);
if (res && res == NNG_ETIMEDOUT)
spdlog::warn("send_message_retry: {} - send timeout", debugInfo);
if (res && res != NNG_ETIMEDOUT)
return res;
if (res && maxTries > 0 && attempt >= maxTries)
return res;
++attempt;
} while (res == NNG_ETIMEDOUT);
return 0;
}
// Follows the framing structure inside the buffer until an incomplete frame // Follows the framing structure inside the buffer until an incomplete frame
// which doesn't fit into the buffer is detected. The incomplete data is moved // which doesn't fit into the buffer is detected. The incomplete data is moved
// over to the tempBuffer so that the readBuffer ends with a complete frame. // over to the tempBuffer so that the readBuffer ends with a complete frame.
@ -149,22 +126,22 @@ enum class MessageType: u8
ParsedEvents, ParsedEvents,
}; };
#define PACKED_AND_ALIGNED __attribute__((packed, aligned(4))) #define PACK_AND_ALIGN4 __attribute__((packed, aligned(4)))
struct PACKED_AND_ALIGNED MessageHeaderBase struct PACK_AND_ALIGN4 BaseMessageHeader
{ {
MessageType messageType; MessageType messageType;
u32 messageNumber; u32 messageNumber;
}; };
struct PACKED_AND_ALIGNED ListfileBufferMessageHeader: public MessageHeaderBase struct PACK_AND_ALIGN4 ListfileBufferMessageHeader: public BaseMessageHeader
{ {
u32 bufferType; u32 bufferType;
}; };
static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0);
struct PACKED_AND_ALIGNED ParsedEventsMessageHeader: public MessageHeaderBase struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader
{ {
}; };
@ -187,28 +164,6 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg,
return bytesMoved; return bytesMoved;
} }
std::string socket_get_string_opt(nng_socket s, const char *opt)
{
char *dest = nullptr;
if (nng_socket_get_string(s, opt, &dest))
return {};
std::string result{*dest};
nng_strfree(dest);
return result;
}
void log_socket_info(nng_socket s, const char *info)
{
auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME);
auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR);
auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR);
spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName);
spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress);
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
}
void listfile_reader_producer( void listfile_reader_producer(
nng_socket outputSocket, nng_socket outputSocket,
@ -307,7 +262,7 @@ struct ReadoutParserNngContext
size_t totalSystemEvents = 0u; size_t totalSystemEvents = 0u;
}; };
struct PACKED_AND_ALIGNED ParsedEventHeader struct PACK_AND_ALIGN4 ParsedEventHeader
{ {
u8 magicByte; u8 magicByte;
u8 crateIndex; u8 crateIndex;
@ -316,13 +271,13 @@ struct PACKED_AND_ALIGNED ParsedEventHeader
static const u8 ParsedDataEventMagic = 0xF3u; static const u8 ParsedDataEventMagic = 0xF3u;
static const u8 ParsedSystemEventMagic = 0xFAu; static const u8 ParsedSystemEventMagic = 0xFAu;
struct PACKED_AND_ALIGNED ParsedDataEventHeader: public ParsedEventHeader struct PACK_AND_ALIGN4 ParsedDataEventHeader: public ParsedEventHeader
{ {
u8 eventIndex; u8 eventIndex;
u8 moduleCount; u8 moduleCount;
}; };
struct PACKED_AND_ALIGNED ParsedModuleHeader struct PACK_AND_ALIGN4 ParsedModuleHeader
{ {
u16 prefixSize; u16 prefixSize;
u16 suffixSize; u16 suffixSize;
@ -332,7 +287,7 @@ struct PACKED_AND_ALIGNED ParsedModuleHeader
size_t totalBytes() const { return totalSize() * sizeof(u32); } size_t totalBytes() const { return totalSize() * sizeof(u32); }
}; };
struct PACKED_AND_ALIGNED ParsedSystemEventHeader: public ParsedEventHeader struct PACK_AND_ALIGN4 ParsedSystemEventHeader: public ParsedEventHeader
{ {
u32 eventSize; u32 eventSize;
@ -804,6 +759,25 @@ void analysis_nng(
lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1));
} }
void pipe_cb(nng_pipe p, nng_pipe_ev event, void *arg)
{
switch (event)
{
case::NNG_PIPE_EV_ADD_PRE:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE");
break;
case::NNG_PIPE_EV_ADD_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST");
log_pipe_info(p, "NNG_PIPE_EV_ADD_POST");
break;
case::NNG_PIPE_EV_REM_POST:
spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST");
break;
case::NNG_PIPE_EV_NUM: // silence warning
break;
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
spdlog::set_level(spdlog::level::info); spdlog::set_level(spdlog::level::info);
@ -849,25 +823,34 @@ int main(int argc, char *argv[])
try try
{ {
auto readerOutputSocket = make_pair_socket(); auto readerOutputSocket = make_pair_socket();
auto parserInputSocket = make_pair_socket();
auto parserOutputSocket = make_pair_socket();
auto analysisInputSocket = make_pair_socket();
for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket })
{
for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST })
{
if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr))
mesy_nng_fatal("nng_pipe_notify", res);
}
}
if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mesy_nng_fatal("nng_listen inproc", res);
log_socket_info(readerOutputSocket, "readerOutputSocket"); log_socket_info(readerOutputSocket, "readerOutputSocket");
auto parserInputSocket = make_pair_socket();
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res); mesy_nng_fatal("nng_dial inproc", res);
log_socket_info(parserInputSocket, "parserInputSocket"); log_socket_info(parserInputSocket, "parserInputSocket");
auto parserOutputSocket = make_pair_socket();
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_listen inproc", res); mesy_nng_fatal("nng_listen inproc", res);
auto analysisInputSocket = make_pair_socket();
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
mesy_nng_fatal("nng_dial inproc", res); mesy_nng_fatal("nng_dial inproc", res);
@ -905,6 +888,10 @@ int main(int argc, char *argv[])
analysisInputSocket)); analysisInputSocket));
for (auto &t: threads) if (t.joinable()) t.join(); for (auto &t: threads) if (t.joinable()) t.join();
for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket })
if (auto res = nng_close(socket))
mesy_nng_fatal("nng_close", res);
} }
catch(const std::exception& e) catch(const std::exception& e)
{ {