diff --git a/src/mvlc_nng_replay.cc b/src/mvlc_nng_replay.cc index 7e2f941..f17b89b 100644 --- a/src/mvlc_nng_replay.cc +++ b/src/mvlc_nng_replay.cc @@ -187,13 +187,49 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg, return bytesMoved; } +std::string socket_get_string_opt(nng_socket s, const char *opt) +{ + char *dest = nullptr; + + if (nng_socket_get_string(s, opt, &dest)) + return {}; + + std::string result{*dest}; + nng_strfree(dest); + return result; +} + +void log_socket_info(nng_socket s, const char *info) +{ + auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME); + auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR); + auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR); + + spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName); + spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress); + spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress); +} + void listfile_reader_producer( - nng_socket socket, + nng_socket outputSocket, mvlc::listfile::ReadHandle &input, const BufferType &bufferType) { prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0); + #if 0 + log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); + + if (auto res = nng_close(outputSocket)) + { + spdlog::error("nng_close: {}", nng_strerror(res)); + log_socket_info(outputSocket, "listfile_reader_producer - outputSocket"); + return; + } + + return; + #endif + try { u32 bufferNumber = 0u; @@ -238,7 +274,7 @@ void listfile_reader_producer( const auto msgSize = nng_msg_len(msg); - if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer")) + if (auto res = send_message_retry(outputSocket, msg, 0, "listfile_reader_producer")) { nng_msg_free(msg); msg = nullptr; @@ -493,27 +529,14 @@ private: void listfile_parser_nng( nng_socket inputSocket, nng_socket outputSocket, - const mvlc::listfile::Preamble &preamble) + const mvlc::CrateConfig &crateConfig) { prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0); size_t totalInputBytes = 0u; u32 lastInputMessageNumber = 0u; size_t inputBuffersLost = 0; - const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth() - ? ConnectionType::ETH - : ConnectionType::USB); - - auto configSection = preamble.findCrateConfig(); - - if (!configSection) - { - spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); - return; - } - - auto configYaml = configSection->contentsToString(); - auto crateConfig = mvlc::crate_config_from_yaml(configYaml); + const auto listfileFormat = crateConfig.connectionType; std::string stacksYaml; for (const auto &stack: crateConfig.stacks) @@ -786,7 +809,42 @@ int main(int argc, char *argv[]) spdlog::set_level(spdlog::level::info); if (argc < 2) + { + std::cerr << fmt::format("Usage: mvlc_nng_replay \n"); return 1; + } + + const auto inputFilename = argv[1]; + + listfile::ZipReader zipReader; + try + { + zipReader.openArchive(inputFilename); + } + catch (const std::exception &e) + { + std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, e.what()); + return 1; + } + + listfile::ReadHandle *readHandle = nullptr; + + try + { + readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); + } + catch(const std::exception& e) + { + if (zipReader.firstListfileEntryName().empty()) + std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename); + else + std::cout << fmt::format("Error: could not open listfile '{}' in '{}' for reading: {}\n", + zipReader.firstListfileEntryName(), inputFilename, e.what()); + + return 1; + } + + assert(readHandle); try { @@ -795,11 +853,15 @@ int main(int argc, char *argv[]) if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_listen inproc", res); + log_socket_info(readerOutputSocket, "readerOutputSocket"); + auto parserInputSocket = make_pair_socket(); if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); + log_socket_info(parserInputSocket, "parserInputSocket"); + auto parserOutputSocket = make_pair_socket(); if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0)) @@ -810,15 +872,25 @@ int main(int argc, char *argv[]) if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0)) mesy_nng_fatal("nng_dial inproc", res); - listfile::ZipReader zipReader; - zipReader.openArchive(argv[1]); spdlog::info("replaying from {}", zipReader.firstListfileEntryName()); - auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName()); auto preamble = mvlc::listfile::read_preamble(*readHandle); const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth() ? BufferType::MVLC_ETH : BufferType::MVLC_USB); - // Read past the magic bytes at the start of each listfile. + auto crateConfigSection = preamble.findCrateConfig(); + + if (!crateConfigSection) + { + spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble"); + // FIXME: close sockets + return 1; + } + + auto configYaml = crateConfigSection->contentsToString(); + auto crateConfig = mvlc::crate_config_from_yaml(configYaml); + + // Seek to start, then read past the magic bytes at the beginning of the + // listfile. (void) listfile::read_magic(*readHandle); std::vector threads; @@ -827,7 +899,7 @@ int main(int argc, char *argv[]) readerOutputSocket, std::ref(*readHandle), std::cref(bufferType))); threads.emplace_back(std::thread(listfile_parser_nng, - parserInputSocket, parserOutputSocket, std::cref(preamble))); + parserInputSocket, parserOutputSocket, std::cref(crateConfig))); threads.emplace_back(std::thread(analysis_nng, analysisInputSocket));