diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index f17b89b..73fa73e 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -4,34 +4,11 @@ #include #include #include "common.h" +#include "mesy_nng.h" using namespace mesytec; using namespace mesytec::mvlc; -int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 0, const char *debugInfo = "") -{ - int res = 0; - size_t attempt = 0u; - - do - { - res = nng_sendmsg(socket, msg, 0); - - if (res && res == NNG_ETIMEDOUT) - spdlog::warn("send_message_retry: {} - send timeout", debugInfo); - - if (res && res != NNG_ETIMEDOUT) - return res; - - if (res && maxTries > 0 && attempt >= maxTries) - return res; - - ++attempt; - } while (res == NNG_ETIMEDOUT); - - return 0; -} - // Follows the framing structure inside the buffer until an incomplete frame // which doesn't fit into the buffer is detected. The incomplete data is moved // over to the tempBuffer so that the readBuffer ends with a complete frame. @@ -149,22 +126,22 @@ enum class MessageType: u8 ParsedEvents, }; -#define PACKED_AND_ALIGNED __attribute__((packed, aligned(4))) +#define PACK_AND_ALIGN4 __attribute__((packed, aligned(4))) -struct PACKED_AND_ALIGNED MessageHeaderBase +struct PACK_AND_ALIGN4 BaseMessageHeader { MessageType messageType; u32 messageNumber; }; -struct PACKED_AND_ALIGNED ListfileBufferMessageHeader: public MessageHeaderBase +struct PACK_AND_ALIGN4 ListfileBufferMessageHeader: public BaseMessageHeader { u32 bufferType; }; static_assert(sizeof(ListfileBufferMessageHeader) % sizeof(u32) == 0); -struct PACKED_AND_ALIGNED ParsedEventsMessageHeader: public MessageHeaderBase +struct PACK_AND_ALIGN4 ParsedEventsMessageHeader: public BaseMessageHeader { }; @@ -187,28 +164,6 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, return bytesMoved; } -std::string socket_get_string_opt(nng_socket s, const char *opt) -{ - char *dest = nullptr; - - if (nng_socket_get_string(s, opt, &dest)) - return {}; - - std::string result{*dest}; - nng_strfree(dest); - return result; -} - -void log_socket_info(nng_socket s, const char *info) -{ - auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME); - auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR); - auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR); - - spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName); - spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress); - spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); -} void listfile_reader_producer( nng_socket outputSocket, @@ -307,7 +262,7 @@ struct ReadoutParserNngContext size_t totalSystemEvents = 0u; }; -struct PACKED_AND_ALIGNED ParsedEventHeader +struct PACK_AND_ALIGN4 ParsedEventHeader { u8 magicByte; u8 crateIndex; @@ -316,13 +271,13 @@ struct PACKED_AND_ALIGNED ParsedEventHeader static const u8 ParsedDataEventMagic = 0xF3u; static const u8 ParsedSystemEventMagic = 0xFAu; -struct PACKED_AND_ALIGNED ParsedDataEventHeader: public ParsedEventHeader +struct PACK_AND_ALIGN4 ParsedDataEventHeader: public ParsedEventHeader { u8 eventIndex; u8 moduleCount; }; -struct PACKED_AND_ALIGNED ParsedModuleHeader +struct PACK_AND_ALIGN4 ParsedModuleHeader { u16 prefixSize; u16 suffixSize; @@ -332,7 +287,7 @@ struct PACKED_AND_ALIGNED ParsedModuleHeader size_t totalBytes() const { return totalSize() * sizeof(u32); } }; -struct PACKED_AND_ALIGNED ParsedSystemEventHeader: public ParsedEventHeader +struct PACK_AND_ALIGN4 ParsedSystemEventHeader: public ParsedEventHeader { u32 eventSize; @@ -804,6 +759,25 @@ void analysis_nng( lastInputMessageNumber, inputBuffersLost, 1.0 * totalInputBytes / util::Megabytes(1)); } +void pipe_cb(nng_pipe p, nng_pipe_ev event, void *arg) +{ + switch (event) + { + case::NNG_PIPE_EV_ADD_PRE: + spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_PRE"); + break; + case::NNG_PIPE_EV_ADD_POST: + spdlog::info("pipe_cb:NNG_PIPE_EV_ADD_POST"); + log_pipe_info(p, "NNG_PIPE_EV_ADD_POST"); + break; + case::NNG_PIPE_EV_REM_POST: + spdlog::info("pipe_cb:NNG_PIPE_EV_REM_POST"); + break; + case::NNG_PIPE_EV_NUM: // silence warning + break; + } +} + int main(int argc, char *argv[]) { spdlog::set_level(spdlog::level::info); @@ -849,25 +823,34 @@ int main(int argc, char *argv[]) try { auto readerOutputSocket = make_pair_socket(); + auto parserInputSocket = make_pair_socket(); + auto parserOutputSocket = make_pair_socket(); + auto analysisInputSocket = make_pair_socket(); + + for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) + { + for (auto event: { NNG_PIPE_EV_ADD_PRE, NNG_PIPE_EV_ADD_POST, NNG_PIPE_EV_REM_POST }) + { + if (int res = nng_pipe_notify(socket, event, pipe_cb, nullptr)) + mesy_nng_fatal("nng_pipe_notify", res); + } + } if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); log_socket_info(readerOutputSocket, "readerOutputSocket"); - auto parserInputSocket = make_pair_socket(); if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); log_socket_info(parserInputSocket, "parserInputSocket"); - auto parserOutputSocket = make_pair_socket(); if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); - auto analysisInputSocket = make_pair_socket(); if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); @@ -905,6 +888,10 @@ int main(int argc, char *argv[]) analysisInputSocket)); for (auto &t: threads) if (t.joinable()) t.join(); + + for (auto &socket: { readerOutputSocket, parserInputSocket, parserOutputSocket, analysisInputSocket }) + if (auto res = nng_close(socket)) + mesy_nng_fatal("nng_close", res); } catch(const std::exception& e) {