From 42a2d01f2af3031058d3e42749c7766b8daf7306 Mon Sep 17 00:00:00 2001 From: oxmox Date: Sat, 18 Feb 2023 22:08:12 +0100 Subject: [PATCH] build tiny messaging infrastructure, refactor, hack till it sort of works When usleep()'ing in doomsim it misses most of the quit messages. I think it's because the controller spams RunDoom all the time and the quit messages get discarded while doomsim is sleeping. --- src/doompanning.cc | 75 +++++++++++++------- src/doomsim.cc | 156 +++++++++++++++++++++++++---------------- src/dp_common.c | 12 ++-- src/dp_common.h | 104 +++++++++++++++++++++------ src/dp_common_c_test.c | 2 +- 5 files changed, 235 insertions(+), 114 deletions(-) diff --git a/src/doompanning.cc b/src/doompanning.cc index 43d7bc0..52ec08f 100644 --- a/src/doompanning.cc +++ b/src/doompanning.cc @@ -7,9 +7,11 @@ #include #include #include +#include #include #include +#include #include #include @@ -48,7 +50,7 @@ struct ControllerContext struct ControllerActions { int doomsToSpawn = 0; - bool quitAllDooms = false; + bool endAllDooms = false; }; void spawn_doom(ControllerContext &ctx) @@ -66,8 +68,9 @@ void spawn_doom(ControllerContext &ctx) nullptr }; - // Close stdin and stdout? Leave them open for now to see the logging + // TODO: Close stdin and stdout? Leave them open for now to see the logging // output. + // TODO: any way to get rid of the const_cast? if (auto err = posix_spawn(&ds.pid, "doomsim", nullptr, nullptr, const_cast(argv), nullptr)) { @@ -81,22 +84,29 @@ void spawn_doom(ControllerContext &ctx) log_info("Spawned doom#%zu, pid=%d", ds.id, ds.pid); } -void quit_all_dooms(ControllerContext &ctx) +void end_all_dooms(ControllerContext &ctx) { - // Publish { DP_MT_QuitDoom, doomid }. - { - nng_msg *msg = nullptr; - int res = 0; + nng_msg *msg = nullptr; + int res = 0; - if ((res = nng_msg_alloc(&msg, 0))) - dp_nng_fatal("ctrl/alloc", res); + if ((res = nng_msg_alloc(&msg, sizeof(MsgMcstCommand)))) + dp_nng_fatal("ctrl/nng_msg_alloc", res); - if ((res = nng_msg_append_u16(msg, DP_MT_QuitDoom))) - dp_nng_fatal("ctrl/msg", res); + auto dpmsg = DP_NNG_BODY_AS(msg, MsgMcstCommand); + dpmsg->head.msgType = DP_MT_McstCommand; + dpmsg->cmd = DP_DC_QuitDoom; - if ((res = nng_sendmsg(ctx.pub, msg, 0))) - dp_nng_fatal("ctrl/sendmsg", res); - } + if ((res = nng_sendmsg(ctx.pub, msg, 0))) + dp_nng_fatal("ctrl/sendmsg", res); +} + +void signal_all_dooms(ControllerContext &ctx, int signum) +{ + std::for_each(std::begin(ctx.dooms), std::end(ctx.dooms), + [signum] (const auto &ds) + { + kill(ds.pid, signum); + }); } void perform_actions(ControllerContext &ctx, const ControllerActions &actions) @@ -108,10 +118,10 @@ void perform_actions(ControllerContext &ctx, const ControllerActions &actions) spawn_doom(ctx); } - if (actions.quitAllDooms) + if (actions.endAllDooms) { log_info("Telling all dooms to quit"); - quit_all_dooms(ctx); + end_all_dooms(ctx); } } @@ -142,24 +152,35 @@ void check_on_dooms(ControllerContext &ctx) void do_networking(ControllerContext &ctx) { - // Publish { DP_MT_DoomReady, doomid }. + // FIXME: test code. spam RunDoom { nng_msg *msg = nullptr; int res = 0; - if ((res = nng_msg_alloc(&msg, 0))) - dp_nng_fatal("ctrl/alloc", res); + if ((res = nng_msg_alloc(&msg, sizeof(MsgMcstCommand)))) + dp_nng_fatal("ctrl/nng_msg_alloc", res); - if ((res = nng_msg_append_u16(msg, DP_MT_RunDoom))) - dp_nng_fatal("ctrl/msg", res); + auto dpmsg = DP_NNG_BODY_AS(msg, MsgMcstCommand); + dpmsg->head.msgType = DP_MT_McstCommand; + dpmsg->cmd = DP_DC_RunDoom; if ((res = nng_sendmsg(ctx.pub, msg, 0))) dp_nng_fatal("ctrl/sendmsg", res); } } -void do_cleanup(ControllerContext &ctx) +void final_cleanup(ControllerContext &ctx) { + log_debug("final cleanup: ending all dooms"); + end_all_dooms(ctx); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + check_on_dooms(ctx); + + if (!ctx.dooms.empty()) + { + log_warn("final cleanup: terminating all %zu remaining dooms", ctx.dooms.size()); + signal_all_dooms(ctx, SIGTERM); + } } ControllerActions run_ui(ControllerContext &ctx) @@ -239,8 +260,8 @@ ControllerActions run_ui(ControllerContext &ctx) if (ImGui::SameLine(); ImGui::Button(strbuf.data())) result.doomsToSpawn = doomsToSpawn; - if (ImGui::Button("Quit all Dooms")) - result.quitAllDooms = true; + if (ImGui::Button("End all Dooms")) + result.endAllDooms = true; ImGui::PopItemWidth(); ImGui::End(); @@ -302,7 +323,7 @@ int doom_controller_loop(ControllerContext &ctx) SDL_RenderPresent(ctx.renderer); } - do_cleanup(ctx); + final_cleanup(ctx); return 0; } @@ -350,7 +371,7 @@ int main(int argc, char *argv[]) ImGui_ImplSDL2_InitForSDLRenderer(window, renderer); ImGui_ImplSDLRenderer_Init(renderer); - dp_nng_init_limits(1, 1, 1); + dp_nng_init_limits(1, 1, 1); // int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit ControllerContext ctx; ctx.pub = make_ctrl_pub(CtrlUrl); @@ -366,4 +387,4 @@ int main(int argc, char *argv[]) nng_close(ctx.sub); return ret; -} \ No newline at end of file +} diff --git a/src/doomsim.cc b/src/doomsim.cc index 9e8cec5..edcfab4 100644 --- a/src/doomsim.cc +++ b/src/doomsim.cc @@ -5,6 +5,8 @@ #include #include #include +#include + #include "dp_common.h" typedef struct DoomContext DoomContext; @@ -22,77 +24,97 @@ typedef struct DoomContext DoomStateFunc *f; } DoomContext; +static int publish_state(DoomContext *ctx) +{ + nng_msg *msg = NULL; + int res = 0; + + if ((res = nng_msg_alloc(&msg, sizeof(MsgDoomState)))) + dp_nng_fatal("doom/nng_msg_alloc", res); + + MsgDoomState *dmsg = DP_NNG_BODY_AS(msg, MsgDoomState); + dmsg->head.msgType = DP_MT_DoomState; + dmsg->doomId = ctx->id; + dmsg->doomState = ctx->state; + + res = nng_sendmsg(ctx->pub, msg, 0); + + return res; +} + DEF_DOOM_STATE_FUNC(do_doom_ready); DEF_DOOM_STATE_FUNC(do_doom_running); +static const useconds_t IdleSleep = 0; + DEF_DOOM_STATE_FUNC(do_doom_ready) { assert(ctx->state == DP_DS_Ready); int res = 0; - // Publish { DP_MT_DoomReady, doomid }. - { - nng_msg *msg = NULL; - - if ((res = nng_msg_alloc(&msg, 0))) - dp_nng_fatal("doom/alloc", res); - - if ((res = nng_msg_append_u16(msg, DP_MT_DoomReady))) - dp_nng_fatal("doom/msg", res); - - if ((res = nng_msg_append_u32(msg, ctx->id))) - dp_nng_fatal("doom/msg", res); - - if ((res = nng_sendmsg(ctx->pub, msg, 0))) - dp_nng_fatal("doom/sendmsg", res); - } + if ((res = publish_state(ctx))) + dp_nng_fatal("doom/publish_sate", res); // Incoming Message | Next State // ------------------------------- // DP_MT_RunDoom -> DP_DS_Running // DP_MT_QuitDoom -> DP_DS_Quitting // */none -> DP_DS_Ready + nng_msg *msg = NULL; + + if ((res = dp_recv_new_msg_nonblock(ctx->sub, &msg))) { - nng_msg *msg = NULL; + if (!dp_nng_is_timeout(res)) + dp_nng_fatal("doom/recvmsg", res); - if ((res = dp_recv_new_msg(ctx->sub, &msg))) - { - if (res != NNG_ETIMEDOUT) - dp_nng_fatal("doom/recvmsg", res); - - return 0; - } - - auto len = nng_msg_len(msg); - log_trace("do_doom_ready received message of size %zu", len); - - dmt_t mt = DP_MT_Invalid; - - if ((res = nng_msg_trim_u16(msg, &mt))) - dp_nng_fatal("doom/msg_trim", res); - - nng_msg_free(msg); - - if (mt == DP_MT_RunDoom) - { - ctx->state = DP_DS_Running; - ctx->f = do_doom_running; - } - else if (mt == DP_MT_QuitDoom) - // TODO: pop off the doomid and check against ours. - // TODO: add a QuitAllDooms message - ctx->state = DP_DS_Quitting; - - // No error on receiving different message types because other dooms - // might get readied or be told to quit. + usleep(IdleSleep); + return 0; } + log_trace("do_doom_ready received message of size %zu", nng_msg_len(msg)); + + MessageBase *msgBase = DP_NNG_BODY_AS(msg, MessageBase); + + DP_DoomCommand cmd = {}; + + if (msgBase->msgType == DP_MT_Command) + { + MsgCommand *msgCmd = DP_NNG_BODY_AS(msg, MsgCommand); + + if (msgCmd->doomId == ctx->id) + cmd = msgCmd->cmd; + } + else if (msgBase->msgType == DP_MT_McstCommand) + { + MsgMcstCommand *msgCmd = DP_NNG_BODY_AS(msg, MsgMcstCommand); + cmd = msgCmd->cmd; + } + + if (cmd == DP_DC_RunDoom) + { + ctx->state = DP_DS_Running; + ctx->f = do_doom_running; + + if ((res = publish_state(ctx))) + dp_nng_fatal("doom/publish_sate", res); + } + else if (cmd == DP_DC_QuitDoom) + { + ctx->state = DP_DS_Quitting; + + if ((res = publish_state(ctx))) + dp_nng_fatal("doom/publish_sate", res); + } + + nng_msg_free(msg); + usleep(IdleSleep); return res; } DEF_DOOM_STATE_FUNC(do_doom_running) { + assert(ctx->state == DP_DS_Running); // Non-blocking receive of incoming messages. @@ -104,31 +126,45 @@ DEF_DOOM_STATE_FUNC(do_doom_running) int res = 0; nng_msg *msg = NULL; - // FIXME: this is blocking while testing things if ((res = dp_recv_new_msg_nonblock(ctx->sub, &msg))) { - if (res != NNG_ETIMEDOUT && res != NNG_EAGAIN) + if (!dp_nng_is_timeout(res)) dp_nng_fatal("doom/recvmsg", res); + usleep(IdleSleep); return 0; } - auto len = nng_msg_len(msg); - log_trace("do_doom_running received message of size %zu", len); + log_trace("do_doom_running received message of size %zu", nng_msg_len(msg)); - dmt_t mt = DP_MT_Invalid; + MessageBase *msgBase = DP_NNG_BODY_AS(msg, MessageBase); - if ((res = nng_msg_trim_u16(msg, &mt))) - dp_nng_fatal("doom/msg_trim", res); + DP_DoomCommand cmd = {}; - nng_msg_free(msg); + if (msgBase->msgType == DP_MT_Command) + { + MsgCommand *msgCmd = DP_NNG_BODY_AS(msg, MsgCommand); - if (mt == DP_MT_QuitDoom) + if (msgCmd->doomId == ctx->id) + cmd = msgCmd->cmd; + } + else if (msgBase->msgType == DP_MT_McstCommand) + { + MsgMcstCommand *msgCmd = DP_NNG_BODY_AS(msg, MsgMcstCommand); + cmd = msgCmd->cmd; + } + + if (cmd == DP_DC_QuitDoom) + { ctx->state = DP_DS_Quitting; - // Otherwise stay in running state + if ((res = publish_state(ctx))) + dp_nng_fatal("doom/publish_sate", res); + } - return 0; + nng_msg_free(msg); + usleep(IdleSleep); + return res; } int doom_loop(DoomContext *ctx) @@ -144,7 +180,7 @@ int doom_loop(DoomContext *ctx) if (prevState != ctx->state) { log_info("transition %s -> %s", - doomstate_str(prevState), doomstate_str(ctx->state)); + doomstate_to_string(prevState), doomstate_to_string(ctx->state)); } } @@ -198,4 +234,4 @@ int main(int argc, char *argv[]) nng_close(subSock); return ret; -} \ No newline at end of file +} diff --git a/src/dp_common.c b/src/dp_common.c index 437f5a2..6f17d3a 100644 --- a/src/dp_common.c +++ b/src/dp_common.c @@ -9,18 +9,18 @@ #include "log.h" -void dp_nng_fatal(const char *const msg, int rv) -{ - log_fatal("%s: %s", msg, nng_strerror(rv)); - abort(); -} - void dp_errno_fatal(const char *const msg) { log_fatal("%s: %s", msg, strerror(errno)); abort(); } +void dp_nng_fatal(const char *const msg, int rv) +{ + log_fatal("%s: %s", msg, nng_strerror(rv)); + abort(); +} + void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit) { nng_set_ncpu_max(ncpu_max); diff --git a/src/dp_common.h b/src/dp_common.h index 8670a28..fb3782b 100644 --- a/src/dp_common.h +++ b/src/dp_common.h @@ -12,28 +12,17 @@ extern "C" { #define dp_fatal(...) log_fatal(__VA_ARGS__) -void dp_nng_fatal(const char *const msg, int rv); -void dp_errno_fatal(const char *const msg); - -void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit); - -nng_socket make_ctrl_pub(const char *url); -nng_socket make_ctrl_sub(const char *url); -nng_socket make_doom_pub(const char *url); -nng_socket make_doom_sub(const char *url); - -static const char *const CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here -static const char *const DoomUrl = "ipc://666_doom.socket"; // dooms publish here - typedef u32 doomid_t; // unique id for each doom instance typedef u16 dmt_t; // for DP_MessageType values typedef enum DP_MessageType { DP_MT_Invalid, - DP_MT_DoomReady, - DP_MT_RunDoom, - DP_MT_QuitDoom, + DP_MT_DoomState, // dooms can publish their state (MsgDoomState) + DP_MT_DoomFrame, // doom framebuffer data + DP_MT_Command, // DP_DoomCommand message to a specific doom instance + DP_MT_McstCommand, // DP_DoomCmannd multicast message to all dooms + DP_MT_Inputs, // controller publishes input state for dooms to consume DP_MT_COUNT } DP_MessageType; @@ -46,17 +35,92 @@ typedef enum DP_DoomState DP_DS_COUNT, } DP_DoomState; -extern const char *const DP_DoomState_Strings[]; - -static inline const char *doomstate_str(DP_DoomState ds) +typedef enum DP_DoomCommand { - return DP_DoomState_Strings[ds]; + DP_DC_Noop, + DP_DC_RunDoom, + DP_DC_QuitDoom, + DP_DC_COUNT, +} DP_DoomCommand; + +typedef struct +{ + dmt_t msgType; +} MessageBase; + +typedef struct __attribute__((packed, aligned(4))) +{ + MessageBase head; + doomid_t doomId; + DP_DoomState doomState; +} MsgDoomState; + +typedef struct __attribute__((packed, aligned(4))) +{ + MessageBase head; + u8 frame[320 * 200]; +} MsgDoomFrame; + +typedef struct __attribute__((packed, aligned(4))) +{ + MessageBase head; + doomid_t doomId; + DP_DoomCommand cmd; +} MsgCommand; + +typedef struct __attribute__((packed, aligned(4))) +{ + MessageBase head; + DP_DoomCommand cmd; +} MsgMcstCommand; + +typedef struct __attribute__((packed, aligned(4))) +{ + MessageBase head; +} MsgInputs; + +void dp_errno_fatal(const char *const msg); +void dp_nng_fatal(const char *const msg, int rv); +void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit); +inline bool dp_nng_is_timeout(int res) +{ + return res == NNG_ETIMEDOUT || res == NNG_EAGAIN; } +static const char *const CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here +static const char *const DoomUrl = "ipc://666_doom.socket"; // dooms publish here + +nng_socket make_ctrl_pub(const char *url); +nng_socket make_ctrl_sub(const char *url); +nng_socket make_doom_pub(const char *url); +nng_socket make_doom_sub(const char *url); + int dp_recv_new_msg(nng_socket sock, nng_msg **msg_ptr); int dp_recv_new_msg_nonblock(nng_socket sock, nng_msg **msg_ptr); int dp_recv_new_msg_flags(nng_socket sock, nng_msg **msg_ptr, int flags); +inline void *dp_nng_msg_body_if_size_ge(nng_msg *msg, size_t size, bool isFatal) +{ + if (nng_msg_len(msg) >= size) + return nng_msg_body(msg); + + if (isFatal) + dp_fatal("message body too short"); + + return NULL; +} + +// Size checked cast of message body to type, fatal version +#define DP_NNG_BODY_AS(msg, ToType) (ToType *) dp_nng_msg_body_if_size_ge(msg, sizeof(ToType), true) +// Size checked cast of message body to type, non-fatal version +#define DP_NNG_BODY_AS_NF(msg, ToType) (ToType *) dp_nng_msg_body_if_size_ge(msg, sizeof(ToType), false) + +extern const char *const DP_DoomState_Strings[]; + +static inline const char *doomstate_to_string(DP_DoomState ds) +{ + return DP_DoomState_Strings[ds]; +} #ifdef __cplusplus } diff --git a/src/dp_common_c_test.c b/src/dp_common_c_test.c index 2de5b33..48c92ae 100644 --- a/src/dp_common_c_test.c +++ b/src/dp_common_c_test.c @@ -7,6 +7,6 @@ int main(int argc, char *argv[]) log_info("Hello from C11!"); for (int i=0; i