diff --git a/src/doompanning.cc b/src/doompanning.cc index c84e3d8..a98b360 100644 --- a/src/doompanning.cc +++ b/src/doompanning.cc @@ -5,21 +5,36 @@ #include "dp_common.h" #include "log.h" -static const char *CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here -static const char *DoomUrl = "ipc://666_doom.socket"; // dooms publish here -//static const char *CtrlUrl = "tcp://127.0.0.1:6666"; // controller publishes here -//static const char *DoomUrl = "tcp://127.0.0.1:6667"; // dooms publish here static const size_t NumDooms = 3; -int run_controller(const size_t numDooms) +int doom_controller_loop(nng_socket pubSock, nng_socket subSock) { - nng_set_ncpu_max(1); - nng_set_pool_thread_limit_max(1); - nng_set_resolve_thread_max(1); + + return 0; +} + +int main(int argc, char *argv[]) +{ + (void) argc; + (void) argv; + + log_info("doompanning ctrl starting"); + + dp_nng_init_limits(1, 1, 1); auto pubSock = make_ctrl_pub(CtrlUrl); auto subSock = make_ctrl_sub(DoomUrl); + int ret = doom_controller_loop(pubSock, subSock); + + nng_close(pubSock); + nng_close(subSock); + + return ret; +} + +// doomctrl test code + #if 0 size_t readyCount = 0; log_debug("ctrl waiting for hello from dooms"); @@ -41,24 +56,10 @@ int run_controller(const size_t numDooms) if (auto res = nng_send(pubSock, (void *)&readyCount, sizeof(readyCount), 0)) dp_nng_fatal("run_ctrl/pub/unleash", res); - nng_close(pubSock); - nng_close(subSock); - - return 0; -} - -int run_doom(const size_t id) -{ - log_debug("doom#%zu started", id); - - nng_set_ncpu_max(1); - nng_set_pool_thread_limit_max(1); - nng_set_resolve_thread_max(1); - - auto pubSock = make_doom_pub(DoomUrl); - auto subSock = make_doom_sub(CtrlUrl); - + #endif +// doomsim test code + #if 0 while (true) // FIXME: ctrl code does not work this way { log_debug("doom%zu sending hello", id); @@ -85,28 +86,4 @@ int run_doom(const size_t id) break; } } - - nng_close(pubSock); - nng_close(subSock); - - return 0; -} - -int main(int argc, char *argv[]) -{ - log_info("doompanning ctrl starting"); - - for (size_t i=0; i #include +#include +#include #include #include #include -#include -#include #include "dp_common.h" #include "log.h" +enum DP_DoomState +{ + DP_DS_Ready, + DP_DS_Running, + DP_DS_Quit, + DP_DS_COUNT, +}; + +static const char *const DP_DoomState_Strings[DP_DS_COUNT] = +{ + "DoomState_Ready", + "DoomState_Running", + "DoomState_Quit", +}; + +#define doomstate_str(s) DP_DoomState_Strings[s] + +struct DoomContext; + +#define DEF_DOOM_STATE_FUNC(fn) int fn(DoomContext *ctx) + +typedef DEF_DOOM_STATE_FUNC(DoomStateFunc); + +typedef struct DoomContext +{ + nng_socket pub; + nng_socket sub; + doomid_t id; + DP_DoomState state; + DoomStateFunc *f; +} DoomContext; + +DEF_DOOM_STATE_FUNC(do_doom_ready) +{ + assert(ctx->state == DP_DS_Ready); + + int res = 0; + + // Publish { DP_MT_DoomReady, doomid }. + { + nng_msg *msg = NULL; + + if ((res = nng_msg_alloc(&msg, sizeof(dmt_t) + sizeof(doomid_t)))) + dp_nng_fatal("doom/alloc", res); + + if ((res = nng_msg_append_u16(msg, DP_MT_DoomReady))) + dp_nng_fatal("doom/msg", res); + + if ((res = nng_msg_append_u32(msg, ctx->id))) + dp_nng_fatal("doom/msg", res); + + if ((res = nng_sendmsg(ctx->pub, msg, 0))) + dp_nng_fatal("doom/sendmsg", res); + } + + // Incoming Message | Next State + // ------------------------------- + // DP_MT_RunDoom -> DP_DS_Running + // DP_MT_QuitDoom -> DP_DS_Quit + // */none -> DP_DS_Ready + { + nng_msg *msg = NULL; + + if ((res = nng_msg_alloc(&msg, sizeof(int) + sizeof(doomid_t)))) + dp_nng_fatal("doom/alloc", res); + + if ((res = nng_recvmsg(ctx->sub, &msg, 0))) + { + if (res != NNG_ETIMEDOUT) + dp_nng_fatal("doom/recvmsg", res); + + return 0; + } + + dmt_t mt = DP_MT_Invalid; + + if ((res = nng_msg_trim_u16(msg, &mt))) + dp_nng_fatal("doom/msg_trim", res); + + nng_msg_free(msg); + + if (mt == DP_MT_RunDoom) + ctx->state = DP_DS_Running; + else if (mt == DP_MT_QuitDoom) + ctx->state = DP_DS_Quit; + } + + return res; +} + +DEF_DOOM_STATE_FUNC(do_doom_running) +{ + assert(ctx->state == DP_DS_Running); + + // Read and handle input messages + + // Let doom render a new frame + + // Publish the frame + + // Check if we should quit + + return 0; +} + +int doom_loop(nng_socket pubSock, nng_socket subSock, doomid_t doomId) +{ + log_debug("doom#%zu started", doomId); + + DoomContext ctx + { + pubSock, + subSock, + doomId, + DP_DS_Ready, + do_doom_ready, + }; + + int res = 0; + + while (ctx.state != DP_DS_Quit && res == 0) + { + DP_DoomState prevState = ctx.state; + res = ctx.f(&ctx); + if (prevState != ctx.state) + { + log_info("transition %s -> %s", + doomstate_str(prevState), doomstate_str(ctx.state)); + } + } + + return res; +} + int main(int argc, char *argv[]) { log_info("doomsim starting"); - return 0; + std::optional doomId; + + for (int i=1; i parameter!"); + return 1; + } + + dp_nng_init_limits(1, 1, 1); + + auto pubSock = make_doom_pub(DoomUrl); + auto subSock = make_doom_sub(CtrlUrl); + + int ret = doom_loop(pubSock, subSock, *doomId); + + nng_close(pubSock); + nng_close(subSock); + + return ret; } \ No newline at end of file diff --git a/src/dp_common.c b/src/dp_common.c index b2dd8a3..f4a4c23 100644 --- a/src/dp_common.c +++ b/src/dp_common.c @@ -10,16 +10,23 @@ void dp_nng_fatal(const char *const msg, int rv) { - log_error("%s: %s", msg, nng_strerror(rv)); + log_fatal("%s: %s", msg, nng_strerror(rv)); abort(); } void dp_errno_fatal(const char *const msg) { - log_error("%s: %s", msg, strerror(errno)); + log_fatal("%s: %s", msg, strerror(errno)); abort(); } +void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit) +{ + nng_set_ncpu_max(ncpu_max); + nng_set_pool_thread_limit_max(pool_thread_limit_max); + nng_set_resolve_thread_max(resolv_thread_limit); +} + nng_socket make_ctrl_pub(const char *url) { nng_socket sock; @@ -42,9 +49,13 @@ nng_socket make_ctrl_sub(const char *url) if ((res = nng_sub0_open(&sock))) dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res); - if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0))) + if ((res = nng_socket_set_string(sock, NNG_OPT_SUB_SUBSCRIBE, ""))) dp_nng_fatal("make_ctrl_sub/subscribe", res); + if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100))) + dp_nng_fatal("make_ctrl_sub/recvtimeo", res); + + if ((res = nng_listen(sock, url, NULL, 0))) dp_nng_fatal("make_ctrl_sub/nng_listen", res); @@ -73,9 +84,12 @@ nng_socket make_doom_sub(const char *url) if ((res = nng_sub0_open(&sock))) dp_nng_fatal("make_doom_sub/nng_sub0_open", res); - if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0))) + if ((res = nng_socket_set_string(sock, NNG_OPT_SUB_SUBSCRIBE, ""))) dp_nng_fatal("make_doom_sub/subscribe", res); + if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100))) + dp_nng_fatal("make_doom_sub/recvtimeo", res); + if ((res = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK))) dp_nng_fatal("make_doom_sub/nng_dial", res); diff --git a/src/dp_common.h b/src/dp_common.h index 919d92e..d52909a 100644 --- a/src/dp_common.h +++ b/src/dp_common.h @@ -2,19 +2,40 @@ #define DP_COMMON_H #include +#include #ifdef __cplusplus extern "C" { #endif +#define dp_fatal(...) log_fatal(__VA_ARGS__) + void dp_nng_fatal(const char *const msg, int rv); void dp_errno_fatal(const char *const msg); +void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit); + nng_socket make_ctrl_pub(const char *url); nng_socket make_ctrl_sub(const char *url); nng_socket make_doom_pub(const char *url); nng_socket make_doom_sub(const char *url); +static const char *const CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here +static const char *const DoomUrl = "ipc://666_doom.socket"; // dooms publish here + +typedef uint32_t doomid_t; // unique id for each doom instance +typedef uint16_t dmt_t; // for DP_MessageType values + +enum DP_MessageType +{ + DP_MT_Invalid, + DP_MT_DoomReady, + DP_MT_RunDoom, + DP_MT_QuitDoom, + DP_MT_COUNT +}; + + #ifdef __cplusplus } #endif