doomsim updates: state machine, nng_recvmsg/nng_sendmsg, first actual logic
This commit is contained in:
parent
3d2d6cc033
commit
cd99af391f
4 changed files with 230 additions and 57 deletions
|
@ -5,21 +5,36 @@
|
||||||
#include "dp_common.h"
|
#include "dp_common.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
|
|
||||||
static const char *CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here
|
|
||||||
static const char *DoomUrl = "ipc://666_doom.socket"; // dooms publish here
|
|
||||||
//static const char *CtrlUrl = "tcp://127.0.0.1:6666"; // controller publishes here
|
|
||||||
//static const char *DoomUrl = "tcp://127.0.0.1:6667"; // dooms publish here
|
|
||||||
static const size_t NumDooms = 3;
|
static const size_t NumDooms = 3;
|
||||||
|
|
||||||
int run_controller(const size_t numDooms)
|
int doom_controller_loop(nng_socket pubSock, nng_socket subSock)
|
||||||
{
|
{
|
||||||
nng_set_ncpu_max(1);
|
|
||||||
nng_set_pool_thread_limit_max(1);
|
return 0;
|
||||||
nng_set_resolve_thread_max(1);
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[])
|
||||||
|
{
|
||||||
|
(void) argc;
|
||||||
|
(void) argv;
|
||||||
|
|
||||||
|
log_info("doompanning ctrl starting");
|
||||||
|
|
||||||
|
dp_nng_init_limits(1, 1, 1);
|
||||||
|
|
||||||
auto pubSock = make_ctrl_pub(CtrlUrl);
|
auto pubSock = make_ctrl_pub(CtrlUrl);
|
||||||
auto subSock = make_ctrl_sub(DoomUrl);
|
auto subSock = make_ctrl_sub(DoomUrl);
|
||||||
|
|
||||||
|
int ret = doom_controller_loop(pubSock, subSock);
|
||||||
|
|
||||||
|
nng_close(pubSock);
|
||||||
|
nng_close(subSock);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// doomctrl test code
|
||||||
|
#if 0
|
||||||
size_t readyCount = 0;
|
size_t readyCount = 0;
|
||||||
|
|
||||||
log_debug("ctrl waiting for hello from dooms");
|
log_debug("ctrl waiting for hello from dooms");
|
||||||
|
@ -41,24 +56,10 @@ int run_controller(const size_t numDooms)
|
||||||
if (auto res = nng_send(pubSock, (void *)&readyCount, sizeof(readyCount), 0))
|
if (auto res = nng_send(pubSock, (void *)&readyCount, sizeof(readyCount), 0))
|
||||||
dp_nng_fatal("run_ctrl/pub/unleash", res);
|
dp_nng_fatal("run_ctrl/pub/unleash", res);
|
||||||
|
|
||||||
nng_close(pubSock);
|
#endif
|
||||||
nng_close(subSock);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int run_doom(const size_t id)
|
|
||||||
{
|
|
||||||
log_debug("doom#%zu started", id);
|
|
||||||
|
|
||||||
nng_set_ncpu_max(1);
|
|
||||||
nng_set_pool_thread_limit_max(1);
|
|
||||||
nng_set_resolve_thread_max(1);
|
|
||||||
|
|
||||||
auto pubSock = make_doom_pub(DoomUrl);
|
|
||||||
auto subSock = make_doom_sub(CtrlUrl);
|
|
||||||
|
|
||||||
|
|
||||||
|
// doomsim test code
|
||||||
|
#if 0
|
||||||
while (true) // FIXME: ctrl code does not work this way
|
while (true) // FIXME: ctrl code does not work this way
|
||||||
{
|
{
|
||||||
log_debug("doom%zu sending hello", id);
|
log_debug("doom%zu sending hello", id);
|
||||||
|
@ -85,28 +86,4 @@ int run_doom(const size_t id)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
nng_close(pubSock);
|
|
||||||
nng_close(subSock);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char *argv[])
|
|
||||||
{
|
|
||||||
log_info("doompanning ctrl starting");
|
|
||||||
|
|
||||||
for (size_t i=0; i<NumDooms; ++i)
|
|
||||||
{
|
|
||||||
auto res = fork();
|
|
||||||
|
|
||||||
if (res == -1)
|
|
||||||
dp_errno_fatal("spawn doom");
|
|
||||||
else if (res == 0)
|
|
||||||
return run_doom(i);
|
|
||||||
else
|
|
||||||
log_debug("spawned doom#%zu", i);
|
|
||||||
}
|
|
||||||
|
|
||||||
return run_controller(NumDooms);
|
|
||||||
}
|
|
167
src/doomsim.cc
167
src/doomsim.cc
|
@ -1,15 +1,176 @@
|
||||||
|
#include <cassert>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <optional>
|
||||||
#include <nng/nng.h>
|
#include <nng/nng.h>
|
||||||
#include <nng/protocol/pubsub0/pub.h>
|
#include <nng/protocol/pubsub0/pub.h>
|
||||||
#include <nng/protocol/pubsub0/sub.h>
|
#include <nng/protocol/pubsub0/sub.h>
|
||||||
#include <sys/types.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include "dp_common.h"
|
#include "dp_common.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
|
|
||||||
|
enum DP_DoomState
|
||||||
|
{
|
||||||
|
DP_DS_Ready,
|
||||||
|
DP_DS_Running,
|
||||||
|
DP_DS_Quit,
|
||||||
|
DP_DS_COUNT,
|
||||||
|
};
|
||||||
|
|
||||||
|
static const char *const DP_DoomState_Strings[DP_DS_COUNT] =
|
||||||
|
{
|
||||||
|
"DoomState_Ready",
|
||||||
|
"DoomState_Running",
|
||||||
|
"DoomState_Quit",
|
||||||
|
};
|
||||||
|
|
||||||
|
#define doomstate_str(s) DP_DoomState_Strings[s]
|
||||||
|
|
||||||
|
struct DoomContext;
|
||||||
|
|
||||||
|
#define DEF_DOOM_STATE_FUNC(fn) int fn(DoomContext *ctx)
|
||||||
|
|
||||||
|
typedef DEF_DOOM_STATE_FUNC(DoomStateFunc);
|
||||||
|
|
||||||
|
typedef struct DoomContext
|
||||||
|
{
|
||||||
|
nng_socket pub;
|
||||||
|
nng_socket sub;
|
||||||
|
doomid_t id;
|
||||||
|
DP_DoomState state;
|
||||||
|
DoomStateFunc *f;
|
||||||
|
} DoomContext;
|
||||||
|
|
||||||
|
DEF_DOOM_STATE_FUNC(do_doom_ready)
|
||||||
|
{
|
||||||
|
assert(ctx->state == DP_DS_Ready);
|
||||||
|
|
||||||
|
int res = 0;
|
||||||
|
|
||||||
|
// Publish { DP_MT_DoomReady, doomid }.
|
||||||
|
{
|
||||||
|
nng_msg *msg = NULL;
|
||||||
|
|
||||||
|
if ((res = nng_msg_alloc(&msg, sizeof(dmt_t) + sizeof(doomid_t))))
|
||||||
|
dp_nng_fatal("doom/alloc", res);
|
||||||
|
|
||||||
|
if ((res = nng_msg_append_u16(msg, DP_MT_DoomReady)))
|
||||||
|
dp_nng_fatal("doom/msg", res);
|
||||||
|
|
||||||
|
if ((res = nng_msg_append_u32(msg, ctx->id)))
|
||||||
|
dp_nng_fatal("doom/msg", res);
|
||||||
|
|
||||||
|
if ((res = nng_sendmsg(ctx->pub, msg, 0)))
|
||||||
|
dp_nng_fatal("doom/sendmsg", res);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Incoming Message | Next State
|
||||||
|
// -------------------------------
|
||||||
|
// DP_MT_RunDoom -> DP_DS_Running
|
||||||
|
// DP_MT_QuitDoom -> DP_DS_Quit
|
||||||
|
// */none -> DP_DS_Ready
|
||||||
|
{
|
||||||
|
nng_msg *msg = NULL;
|
||||||
|
|
||||||
|
if ((res = nng_msg_alloc(&msg, sizeof(int) + sizeof(doomid_t))))
|
||||||
|
dp_nng_fatal("doom/alloc", res);
|
||||||
|
|
||||||
|
if ((res = nng_recvmsg(ctx->sub, &msg, 0)))
|
||||||
|
{
|
||||||
|
if (res != NNG_ETIMEDOUT)
|
||||||
|
dp_nng_fatal("doom/recvmsg", res);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
dmt_t mt = DP_MT_Invalid;
|
||||||
|
|
||||||
|
if ((res = nng_msg_trim_u16(msg, &mt)))
|
||||||
|
dp_nng_fatal("doom/msg_trim", res);
|
||||||
|
|
||||||
|
nng_msg_free(msg);
|
||||||
|
|
||||||
|
if (mt == DP_MT_RunDoom)
|
||||||
|
ctx->state = DP_DS_Running;
|
||||||
|
else if (mt == DP_MT_QuitDoom)
|
||||||
|
ctx->state = DP_DS_Quit;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
DEF_DOOM_STATE_FUNC(do_doom_running)
|
||||||
|
{
|
||||||
|
assert(ctx->state == DP_DS_Running);
|
||||||
|
|
||||||
|
// Read and handle input messages
|
||||||
|
|
||||||
|
// Let doom render a new frame
|
||||||
|
|
||||||
|
// Publish the frame
|
||||||
|
|
||||||
|
// Check if we should quit
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int doom_loop(nng_socket pubSock, nng_socket subSock, doomid_t doomId)
|
||||||
|
{
|
||||||
|
log_debug("doom#%zu started", doomId);
|
||||||
|
|
||||||
|
DoomContext ctx
|
||||||
|
{
|
||||||
|
pubSock,
|
||||||
|
subSock,
|
||||||
|
doomId,
|
||||||
|
DP_DS_Ready,
|
||||||
|
do_doom_ready,
|
||||||
|
};
|
||||||
|
|
||||||
|
int res = 0;
|
||||||
|
|
||||||
|
while (ctx.state != DP_DS_Quit && res == 0)
|
||||||
|
{
|
||||||
|
DP_DoomState prevState = ctx.state;
|
||||||
|
res = ctx.f(&ctx);
|
||||||
|
if (prevState != ctx.state)
|
||||||
|
{
|
||||||
|
log_info("transition %s -> %s",
|
||||||
|
doomstate_str(prevState), doomstate_str(ctx.state));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[])
|
int main(int argc, char *argv[])
|
||||||
{
|
{
|
||||||
log_info("doomsim starting");
|
log_info("doomsim starting");
|
||||||
return 0;
|
|
||||||
|
|
||||||
|
std::optional<doomid_t> doomId;
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i)
|
||||||
|
{
|
||||||
|
if (i < argc-1 && strcmp(argv[i], "-doomid") == 0)
|
||||||
|
{
|
||||||
|
doomId = std::strtoull(argv[i+1], nullptr, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!doomId)
|
||||||
|
{
|
||||||
|
log_error("Missing -doomid <id> parameter!");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dp_nng_init_limits(1, 1, 1);
|
||||||
|
|
||||||
|
auto pubSock = make_doom_pub(DoomUrl);
|
||||||
|
auto subSock = make_doom_sub(CtrlUrl);
|
||||||
|
|
||||||
|
int ret = doom_loop(pubSock, subSock, *doomId);
|
||||||
|
|
||||||
|
nng_close(pubSock);
|
||||||
|
nng_close(subSock);
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
|
@ -10,16 +10,23 @@
|
||||||
|
|
||||||
void dp_nng_fatal(const char *const msg, int rv)
|
void dp_nng_fatal(const char *const msg, int rv)
|
||||||
{
|
{
|
||||||
log_error("%s: %s", msg, nng_strerror(rv));
|
log_fatal("%s: %s", msg, nng_strerror(rv));
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
void dp_errno_fatal(const char *const msg)
|
void dp_errno_fatal(const char *const msg)
|
||||||
{
|
{
|
||||||
log_error("%s: %s", msg, strerror(errno));
|
log_fatal("%s: %s", msg, strerror(errno));
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit)
|
||||||
|
{
|
||||||
|
nng_set_ncpu_max(ncpu_max);
|
||||||
|
nng_set_pool_thread_limit_max(pool_thread_limit_max);
|
||||||
|
nng_set_resolve_thread_max(resolv_thread_limit);
|
||||||
|
}
|
||||||
|
|
||||||
nng_socket make_ctrl_pub(const char *url)
|
nng_socket make_ctrl_pub(const char *url)
|
||||||
{
|
{
|
||||||
nng_socket sock;
|
nng_socket sock;
|
||||||
|
@ -42,9 +49,13 @@ nng_socket make_ctrl_sub(const char *url)
|
||||||
if ((res = nng_sub0_open(&sock)))
|
if ((res = nng_sub0_open(&sock)))
|
||||||
dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res);
|
dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res);
|
||||||
|
|
||||||
if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)))
|
if ((res = nng_socket_set_string(sock, NNG_OPT_SUB_SUBSCRIBE, "")))
|
||||||
dp_nng_fatal("make_ctrl_sub/subscribe", res);
|
dp_nng_fatal("make_ctrl_sub/subscribe", res);
|
||||||
|
|
||||||
|
if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100)))
|
||||||
|
dp_nng_fatal("make_ctrl_sub/recvtimeo", res);
|
||||||
|
|
||||||
|
|
||||||
if ((res = nng_listen(sock, url, NULL, 0)))
|
if ((res = nng_listen(sock, url, NULL, 0)))
|
||||||
dp_nng_fatal("make_ctrl_sub/nng_listen", res);
|
dp_nng_fatal("make_ctrl_sub/nng_listen", res);
|
||||||
|
|
||||||
|
@ -73,9 +84,12 @@ nng_socket make_doom_sub(const char *url)
|
||||||
if ((res = nng_sub0_open(&sock)))
|
if ((res = nng_sub0_open(&sock)))
|
||||||
dp_nng_fatal("make_doom_sub/nng_sub0_open", res);
|
dp_nng_fatal("make_doom_sub/nng_sub0_open", res);
|
||||||
|
|
||||||
if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)))
|
if ((res = nng_socket_set_string(sock, NNG_OPT_SUB_SUBSCRIBE, "")))
|
||||||
dp_nng_fatal("make_doom_sub/subscribe", res);
|
dp_nng_fatal("make_doom_sub/subscribe", res);
|
||||||
|
|
||||||
|
if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100)))
|
||||||
|
dp_nng_fatal("make_doom_sub/recvtimeo", res);
|
||||||
|
|
||||||
if ((res = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)))
|
if ((res = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)))
|
||||||
dp_nng_fatal("make_doom_sub/nng_dial", res);
|
dp_nng_fatal("make_doom_sub/nng_dial", res);
|
||||||
|
|
||||||
|
|
|
@ -2,19 +2,40 @@
|
||||||
#define DP_COMMON_H
|
#define DP_COMMON_H
|
||||||
|
|
||||||
#include <nng/nng.h>
|
#include <nng/nng.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define dp_fatal(...) log_fatal(__VA_ARGS__)
|
||||||
|
|
||||||
void dp_nng_fatal(const char *const msg, int rv);
|
void dp_nng_fatal(const char *const msg, int rv);
|
||||||
void dp_errno_fatal(const char *const msg);
|
void dp_errno_fatal(const char *const msg);
|
||||||
|
|
||||||
|
void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit);
|
||||||
|
|
||||||
nng_socket make_ctrl_pub(const char *url);
|
nng_socket make_ctrl_pub(const char *url);
|
||||||
nng_socket make_ctrl_sub(const char *url);
|
nng_socket make_ctrl_sub(const char *url);
|
||||||
nng_socket make_doom_pub(const char *url);
|
nng_socket make_doom_pub(const char *url);
|
||||||
nng_socket make_doom_sub(const char *url);
|
nng_socket make_doom_sub(const char *url);
|
||||||
|
|
||||||
|
static const char *const CtrlUrl = "ipc://666_ctrl.socket"; // controller publishes here
|
||||||
|
static const char *const DoomUrl = "ipc://666_doom.socket"; // dooms publish here
|
||||||
|
|
||||||
|
typedef uint32_t doomid_t; // unique id for each doom instance
|
||||||
|
typedef uint16_t dmt_t; // for DP_MessageType values
|
||||||
|
|
||||||
|
enum DP_MessageType
|
||||||
|
{
|
||||||
|
DP_MT_Invalid,
|
||||||
|
DP_MT_DoomReady,
|
||||||
|
DP_MT_RunDoom,
|
||||||
|
DP_MT_QuitDoom,
|
||||||
|
DP_MT_COUNT
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in a new issue