From 5ca8329db1d5e1f947c56101623a57b8854beed9 Mon Sep 17 00:00:00 2001 From: oxmox Date: Sat, 4 Feb 2023 01:56:30 +0100 Subject: [PATCH] buggy pub sub code --- src/doompanning.cc | 126 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 108 insertions(+), 18 deletions(-) diff --git a/src/doompanning.cc b/src/doompanning.cc index ad26ae3..02ba0f9 100644 --- a/src/doompanning.cc +++ b/src/doompanning.cc @@ -36,7 +36,10 @@ nng_socket make_ctrl_sub(const char *url) nng_socket sock; if (auto res = nng_sub0_open(&sock)) - dp_nng_fatal("make_ctrl_pub/nng_sub0_open", res); + dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res); + + if (auto res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) + dp_nng_fatal("make_ctrl_sub/subscribe", res); if (auto res = nng_listen(sock, url, nullptr, 0)) dp_nng_fatal("make_ctrl_sub/nng_listen", res); @@ -46,19 +49,117 @@ nng_socket make_ctrl_sub(const char *url) nng_socket make_doom_pub(const char *url) { + nng_socket sock; + + if (auto res = nng_pub0_open(&sock)) + dp_nng_fatal("make_doom_pub/nng_pub0_open", res); + + if (auto res = nng_dial(sock, url, nullptr, NNG_FLAG_NONBLOCK)) + dp_nng_fatal("make_doom_pub/nng_dial", res); + + return sock; } nng_socket make_doom_sub(const char *url) { + nng_socket sock; + + if (auto res = nng_sub0_open(&sock)) + dp_nng_fatal("make_doom_sub/nng_sub0_open", res); + + if (auto res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) + dp_nng_fatal("make_doom_sub/subscribe", res); + + if (auto res = nng_dial(sock, url, nullptr, NNG_FLAG_NONBLOCK)) + dp_nng_fatal("make_doom_sub/nng_dial", res); + + return sock; } -static const char *CtrlUrl = "ipc://666_ctrl.socket"; -static const char *DoomUrl = "ipc://666_doom.socket"; -static const size_t NumDooms = 2; +//static const char *CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here +//static const char *DoomUrl = "ipc://666_doom.socket"; // dooms publish here +static const char *CtrlUrl = "tcp://127.0.0.1:6666"; // controller publishes here +static const char *DoomUrl = "tcp://127.0.0.1:6667"; // dooms publish here +static const size_t NumDooms = 10; -int run_doom(size_t id) +int run_controller(const size_t numDooms) { - spdlog::debug("doom#{} running", id); + nng_set_ncpu_max(1); + nng_set_pool_thread_limit_max(1); + nng_set_resolve_thread_max(1); + + auto pubSock = make_ctrl_pub(CtrlUrl); + auto subSock = make_ctrl_sub(DoomUrl); + + size_t readyCount = 0; + + spdlog::debug("ctrl waiting for hello from dooms"); + while (readyCount < numDooms) + { + char *buf = nullptr; + size_t sz = 0; + + if (auto res = nng_recv(subSock, &buf, &sz, NNG_FLAG_ALLOC)) + dp_nng_fatal("run_ctrl/sub/go", res); + + spdlog::debug("ctrl received {} bytes from a doom", sz); + nng_free(buf, sz); + ++readyCount; + } + + spdlog::debug("ctrl is unleashing dooms"); + + if (auto res = nng_send(pubSock, (void *)&readyCount, sizeof(readyCount), 0)) + dp_nng_fatal("run_ctrl/pub/unleash", res); + + nng_close(pubSock); + nng_close(subSock); + + return 0; +} + +int run_doom(const size_t id) +{ + spdlog::debug("doom#{} started", id); + + nng_set_ncpu_max(1); + nng_set_pool_thread_limit_max(1); + nng_set_resolve_thread_max(1); + + auto pubSock = make_doom_pub(DoomUrl); + auto subSock = make_doom_sub(CtrlUrl); + + + while (true) // FIXME: ctrl code does not work this way + { + spdlog::debug("doom{} sending hello", id); + if (auto res = nng_send(pubSock, (void *)&id, sizeof(id), 0)) + dp_nng_fatal("run_doom/pub/hello", res); + + { + char *buf = nullptr; + size_t sz = 0; + + spdlog::debug("doom{} waiting for unleash", id); + auto res = nng_recv(subSock, &buf, &sz, NNG_FLAG_ALLOC | NNG_FLAG_NONBLOCK); + + if (res == NNG_EAGAIN) + { + usleep(10 * 1000); + continue; + } + else if (res) + dp_nng_fatal("run_doom/sub/go", res); + + spdlog::debug("doom{} unleashed by {} bytes!", id, sz); + nng_free(buf, sz); + break; + } + } + + nng_close(pubSock); + nng_close(subSock); + return 0; } @@ -80,16 +181,5 @@ int main(int argc, char *argv[]) spdlog::debug("spawned doom#{}", i); } - return 0; - - auto ctrlPubSock = make_ctrl_pub(CtrlUrl); - auto ctrlSubSock = make_ctrl_sub(DoomUrl); - - if (auto res = nng_send(ctrlPubSock, (void *)"foobar", 7, 0)) - dp_nng_fatal("ctrl send", res); - - - nng_close(ctrlPubSock); - nng_close(ctrlSubSock); - return 0; + return run_controller(NumDooms); }