bit of refactoring and more error handling
This commit is contained in:
parent
31df88ff23
commit
ee5b451434
1 changed files with 94 additions and 22 deletions
|
@ -187,13 +187,49 @@ size_t fixup_listfile_buffer_message(const BufferType &bufferType, nng_msg *msg,
|
||||||
return bytesMoved;
|
return bytesMoved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string socket_get_string_opt(nng_socket s, const char *opt)
|
||||||
|
{
|
||||||
|
char *dest = nullptr;
|
||||||
|
|
||||||
|
if (nng_socket_get_string(s, opt, &dest))
|
||||||
|
return {};
|
||||||
|
|
||||||
|
std::string result{*dest};
|
||||||
|
nng_strfree(dest);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void log_socket_info(nng_socket s, const char *info)
|
||||||
|
{
|
||||||
|
auto sockName = socket_get_string_opt(s, NNG_OPT_SOCKNAME);
|
||||||
|
auto localAddress = socket_get_string_opt(s, NNG_OPT_LOCADDR);
|
||||||
|
auto remoteAddress = socket_get_string_opt(s, NNG_OPT_REMADDR);
|
||||||
|
|
||||||
|
spdlog::info("{}: {}={}", info, NNG_OPT_SOCKNAME, sockName);
|
||||||
|
spdlog::info("{}: {}={}", info, NNG_OPT_LOCADDR, localAddress);
|
||||||
|
spdlog::info("{}: {}={}", info, NNG_OPT_REMADDR, remoteAddress);
|
||||||
|
}
|
||||||
|
|
||||||
void listfile_reader_producer(
|
void listfile_reader_producer(
|
||||||
nng_socket socket,
|
nng_socket outputSocket,
|
||||||
mvlc::listfile::ReadHandle &input,
|
mvlc::listfile::ReadHandle &input,
|
||||||
const BufferType &bufferType)
|
const BufferType &bufferType)
|
||||||
{
|
{
|
||||||
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0);
|
prctl(PR_SET_NAME,"listfile_reader_producer",0,0,0);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
log_socket_info(outputSocket, "listfile_reader_producer - outputSocket");
|
||||||
|
|
||||||
|
if (auto res = nng_close(outputSocket))
|
||||||
|
{
|
||||||
|
spdlog::error("nng_close: {}", nng_strerror(res));
|
||||||
|
log_socket_info(outputSocket, "listfile_reader_producer - outputSocket");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
#endif
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
u32 bufferNumber = 0u;
|
u32 bufferNumber = 0u;
|
||||||
|
@ -238,7 +274,7 @@ void listfile_reader_producer(
|
||||||
|
|
||||||
const auto msgSize = nng_msg_len(msg);
|
const auto msgSize = nng_msg_len(msg);
|
||||||
|
|
||||||
if (auto res = send_message_retry(socket, msg, 0, "listfile_reader_producer"))
|
if (auto res = send_message_retry(outputSocket, msg, 0, "listfile_reader_producer"))
|
||||||
{
|
{
|
||||||
nng_msg_free(msg);
|
nng_msg_free(msg);
|
||||||
msg = nullptr;
|
msg = nullptr;
|
||||||
|
@ -493,27 +529,14 @@ private:
|
||||||
void listfile_parser_nng(
|
void listfile_parser_nng(
|
||||||
nng_socket inputSocket,
|
nng_socket inputSocket,
|
||||||
nng_socket outputSocket,
|
nng_socket outputSocket,
|
||||||
const mvlc::listfile::Preamble &preamble)
|
const mvlc::CrateConfig &crateConfig)
|
||||||
{
|
{
|
||||||
prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0);
|
prctl(PR_SET_NAME,"listfile_parser_nng",0,0,0);
|
||||||
|
|
||||||
size_t totalInputBytes = 0u;
|
size_t totalInputBytes = 0u;
|
||||||
u32 lastInputMessageNumber = 0u;
|
u32 lastInputMessageNumber = 0u;
|
||||||
size_t inputBuffersLost = 0;
|
size_t inputBuffersLost = 0;
|
||||||
const auto listfileFormat = (preamble.magic == mvlc::listfile::get_filemagic_eth()
|
const auto listfileFormat = crateConfig.connectionType;
|
||||||
? ConnectionType::ETH
|
|
||||||
: ConnectionType::USB);
|
|
||||||
|
|
||||||
auto configSection = preamble.findCrateConfig();
|
|
||||||
|
|
||||||
if (!configSection)
|
|
||||||
{
|
|
||||||
spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto configYaml = configSection->contentsToString();
|
|
||||||
auto crateConfig = mvlc::crate_config_from_yaml(configYaml);
|
|
||||||
|
|
||||||
std::string stacksYaml;
|
std::string stacksYaml;
|
||||||
for (const auto &stack: crateConfig.stacks)
|
for (const auto &stack: crateConfig.stacks)
|
||||||
|
@ -786,7 +809,42 @@ int main(int argc, char *argv[])
|
||||||
spdlog::set_level(spdlog::level::info);
|
spdlog::set_level(spdlog::level::info);
|
||||||
|
|
||||||
if (argc < 2)
|
if (argc < 2)
|
||||||
|
{
|
||||||
|
std::cerr << fmt::format("Usage: mvlc_nng_replay <zipfile>\n");
|
||||||
return 1;
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto inputFilename = argv[1];
|
||||||
|
|
||||||
|
listfile::ZipReader zipReader;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
zipReader.openArchive(inputFilename);
|
||||||
|
}
|
||||||
|
catch (const std::exception &e)
|
||||||
|
{
|
||||||
|
std::cout << fmt::format("Error: could not open '{}' for reading: {}\n", inputFilename, e.what());
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
listfile::ReadHandle *readHandle = nullptr;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
|
||||||
|
}
|
||||||
|
catch(const std::exception& e)
|
||||||
|
{
|
||||||
|
if (zipReader.firstListfileEntryName().empty())
|
||||||
|
std::cout << fmt::format("Error: no MVLC listfile found in '{}'\n", inputFilename);
|
||||||
|
else
|
||||||
|
std::cout << fmt::format("Error: could not open listfile '{}' in '{}' for reading: {}\n",
|
||||||
|
zipReader.firstListfileEntryName(), inputFilename, e.what());
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(readHandle);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -795,11 +853,15 @@ int main(int argc, char *argv[])
|
||||||
if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
|
if (int res = nng_listen(readerOutputSocket, "inproc://1", nullptr, 0))
|
||||||
mesy_nng_fatal("nng_listen inproc", res);
|
mesy_nng_fatal("nng_listen inproc", res);
|
||||||
|
|
||||||
|
log_socket_info(readerOutputSocket, "readerOutputSocket");
|
||||||
|
|
||||||
auto parserInputSocket = make_pair_socket();
|
auto parserInputSocket = make_pair_socket();
|
||||||
|
|
||||||
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
|
if (int res = nng_dial(parserInputSocket, "inproc://1", nullptr, 0))
|
||||||
mesy_nng_fatal("nng_dial inproc", res);
|
mesy_nng_fatal("nng_dial inproc", res);
|
||||||
|
|
||||||
|
log_socket_info(parserInputSocket, "parserInputSocket");
|
||||||
|
|
||||||
auto parserOutputSocket = make_pair_socket();
|
auto parserOutputSocket = make_pair_socket();
|
||||||
|
|
||||||
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
|
if (int res = nng_listen(parserOutputSocket, "inproc://2", nullptr, 0))
|
||||||
|
@ -810,15 +872,25 @@ int main(int argc, char *argv[])
|
||||||
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
|
if (int res = nng_dial(analysisInputSocket, "inproc://2", nullptr, 0))
|
||||||
mesy_nng_fatal("nng_dial inproc", res);
|
mesy_nng_fatal("nng_dial inproc", res);
|
||||||
|
|
||||||
listfile::ZipReader zipReader;
|
|
||||||
zipReader.openArchive(argv[1]);
|
|
||||||
spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
|
spdlog::info("replaying from {}", zipReader.firstListfileEntryName());
|
||||||
auto readHandle = zipReader.openEntry(zipReader.firstListfileEntryName());
|
|
||||||
auto preamble = mvlc::listfile::read_preamble(*readHandle);
|
auto preamble = mvlc::listfile::read_preamble(*readHandle);
|
||||||
const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
|
const auto bufferType = (preamble.magic == mvlc::listfile::get_filemagic_eth()
|
||||||
? BufferType::MVLC_ETH
|
? BufferType::MVLC_ETH
|
||||||
: BufferType::MVLC_USB);
|
: BufferType::MVLC_USB);
|
||||||
// Read past the magic bytes at the start of each listfile.
|
auto crateConfigSection = preamble.findCrateConfig();
|
||||||
|
|
||||||
|
if (!crateConfigSection)
|
||||||
|
{
|
||||||
|
spdlog::error("listfile_parser_nng - no CrateConfig found in listfile preamble");
|
||||||
|
// FIXME: close sockets
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto configYaml = crateConfigSection->contentsToString();
|
||||||
|
auto crateConfig = mvlc::crate_config_from_yaml(configYaml);
|
||||||
|
|
||||||
|
// Seek to start, then read past the magic bytes at the beginning of the
|
||||||
|
// listfile.
|
||||||
(void) listfile::read_magic(*readHandle);
|
(void) listfile::read_magic(*readHandle);
|
||||||
|
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
|
@ -827,7 +899,7 @@ int main(int argc, char *argv[])
|
||||||
readerOutputSocket, std::ref(*readHandle), std::cref(bufferType)));
|
readerOutputSocket, std::ref(*readHandle), std::cref(bufferType)));
|
||||||
|
|
||||||
threads.emplace_back(std::thread(listfile_parser_nng,
|
threads.emplace_back(std::thread(listfile_parser_nng,
|
||||||
parserInputSocket, parserOutputSocket, std::cref(preamble)));
|
parserInputSocket, parserOutputSocket, std::cref(crateConfig)));
|
||||||
|
|
||||||
threads.emplace_back(std::thread(analysis_nng,
|
threads.emplace_back(std::thread(analysis_nng,
|
||||||
analysisInputSocket));
|
analysisInputSocket));
|
||||||
|
|
Loading…
Reference in a new issue