#include #include #include #include #include #include #include void dp_nng_fatal(const std::string &msg, int rv) { spdlog::error("{}: {}", msg, nng_strerror(rv)); abort(); } void dp_errno_fatal(const std::string &msg) { spdlog::error("{}: {}", msg, strerror(errno)); abort(); } nng_socket make_ctrl_pub(const char *url) { nng_socket sock; if (auto res = nng_pub0_open(&sock)) dp_nng_fatal("make_ctrl_pub/nng_pub0_open", res); if (auto res = nng_listen(sock, url, nullptr, 0)) dp_nng_fatal("make_ctrl_pub/nng_listen", res); return sock; } nng_socket make_ctrl_sub(const char *url) { nng_socket sock; if (auto res = nng_sub0_open(&sock)) dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res); if (auto res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) dp_nng_fatal("make_ctrl_sub/subscribe", res); if (auto res = nng_listen(sock, url, nullptr, 0)) dp_nng_fatal("make_ctrl_sub/nng_listen", res); return sock; } nng_socket make_doom_pub(const char *url) { nng_socket sock; if (auto res = nng_pub0_open(&sock)) dp_nng_fatal("make_doom_pub/nng_pub0_open", res); if (auto res = nng_dial(sock, url, nullptr, NNG_FLAG_NONBLOCK)) dp_nng_fatal("make_doom_pub/nng_dial", res); return sock; } nng_socket make_doom_sub(const char *url) { nng_socket sock; if (auto res = nng_sub0_open(&sock)) dp_nng_fatal("make_doom_sub/nng_sub0_open", res); if (auto res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) dp_nng_fatal("make_doom_sub/subscribe", res); if (auto res = nng_dial(sock, url, nullptr, NNG_FLAG_NONBLOCK)) dp_nng_fatal("make_doom_sub/nng_dial", res); return sock; } //static const char *CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here //static const char *DoomUrl = "ipc://666_doom.socket"; // dooms publish here static const char *CtrlUrl = "tcp://127.0.0.1:6666"; // controller publishes here static const char *DoomUrl = "tcp://127.0.0.1:6667"; // dooms publish here static const size_t NumDooms = 10; int run_controller(const size_t numDooms) { nng_set_ncpu_max(1); nng_set_pool_thread_limit_max(1); nng_set_resolve_thread_max(1); auto pubSock = make_ctrl_pub(CtrlUrl); auto subSock = make_ctrl_sub(DoomUrl); size_t readyCount = 0; spdlog::debug("ctrl waiting for hello from dooms"); while (readyCount < numDooms) { char *buf = nullptr; size_t sz = 0; if (auto res = nng_recv(subSock, &buf, &sz, NNG_FLAG_ALLOC)) dp_nng_fatal("run_ctrl/sub/go", res); spdlog::debug("ctrl received {} bytes from a doom", sz); nng_free(buf, sz); ++readyCount; } spdlog::debug("ctrl is unleashing dooms"); if (auto res = nng_send(pubSock, (void *)&readyCount, sizeof(readyCount), 0)) dp_nng_fatal("run_ctrl/pub/unleash", res); nng_close(pubSock); nng_close(subSock); return 0; } int run_doom(const size_t id) { spdlog::debug("doom#{} started", id); nng_set_ncpu_max(1); nng_set_pool_thread_limit_max(1); nng_set_resolve_thread_max(1); auto pubSock = make_doom_pub(DoomUrl); auto subSock = make_doom_sub(CtrlUrl); while (true) // FIXME: ctrl code does not work this way { spdlog::debug("doom{} sending hello", id); if (auto res = nng_send(pubSock, (void *)&id, sizeof(id), 0)) dp_nng_fatal("run_doom/pub/hello", res); { char *buf = nullptr; size_t sz = 0; spdlog::debug("doom{} waiting for unleash", id); auto res = nng_recv(subSock, &buf, &sz, NNG_FLAG_ALLOC | NNG_FLAG_NONBLOCK); if (res == NNG_EAGAIN) { usleep(10 * 1000); continue; } else if (res) dp_nng_fatal("run_doom/sub/go", res); spdlog::debug("doom{} unleashed by {} bytes!", id, sz); nng_free(buf, sz); break; } } nng_close(pubSock); nng_close(subSock); return 0; } int main(int argc, char *argv[]) { spdlog::set_level(spdlog::level::debug); spdlog::set_pattern("[%H:%M:%S.%e] [pid=%P] [%^%l%$] %v"); spdlog::info("Hello, doomed world!"); for (size_t i=0; i