#include "dp_common.h" #include #include #include #include #include #include #include "log.h" void dp_errno_fatal(const char *const msg) { log_fatal("%s: %s", msg, strerror(errno)); abort(); } void dp_nng_fatal(const char *const msg, int rv) { log_fatal("%s: %s", msg, nng_strerror(rv)); abort(); } void dp_nng_init_limits(int ncpu_max, int pool_thread_limit_max, int resolv_thread_limit) { nng_set_ncpu_max(ncpu_max); nng_set_pool_thread_limit_max(pool_thread_limit_max); nng_set_resolve_thread_max(resolv_thread_limit); } nng_socket make_ctrl_pub(const char *url) { nng_socket sock; int res = 0; if ((res = nng_pub0_open(&sock))) dp_nng_fatal("make_ctrl_pub/nng_pub0_open", res); if ((res = nng_listen(sock, url, NULL, 0))) dp_nng_fatal("make_ctrl_pub/nng_listen", res); return sock; } nng_socket make_ctrl_sub(const char *url) { nng_socket sock; int res = 0; if ((res = nng_sub0_open(&sock))) dp_nng_fatal("make_ctrl_sub/nng_sub0_open", res); if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0))) dp_nng_fatal("make_ctrl_sub/subscribe", res); if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100))) dp_nng_fatal("make_ctrl_sub/recvtimeo", res); if ((res = nng_listen(sock, url, NULL, 0))) dp_nng_fatal("make_ctrl_sub/nng_listen", res); return sock; } nng_socket make_doom_pub(const char *url) { nng_socket sock; int res = 0; if ((res = nng_pub0_open(&sock))) dp_nng_fatal("make_doom_pub/nng_pub0_open", res); if ((res = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK))) dp_nng_fatal("make_doom_pub/nng_dial", res); return sock; } nng_socket make_doom_sub(const char *url) { nng_socket sock; int res = 0; if ((res = nng_sub0_open(&sock))) dp_nng_fatal("make_doom_sub/nng_sub0_open", res); if ((res = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0))) dp_nng_fatal("make_doom_sub/subscribe", res); if ((res = nng_socket_set_ms(sock, NNG_OPT_RECVTIMEO, 100))) dp_nng_fatal("make_doom_sub/recvtimeo", res); if ((res = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK))) dp_nng_fatal("make_doom_sub/nng_dial", res); return sock; } const char *const DP_DoomState_Strings[] = { "DoomState_Unknown", "DoomState_Ready", "DoomState_Running", "DoomState_Endoom", }; _Static_assert(sizeof(DP_DoomState_Strings) / sizeof(DP_DoomState_Strings[0]) == DP_DS_COUNT, "DP_DoomState enum and strings do not match up"); int dp_recv_new_msg(nng_socket sock, nng_msg **msg_ptr) { return dp_recv_new_msg_flags(sock, msg_ptr, 0); } int dp_recv_new_msg_nonblock(nng_socket sock, nng_msg **msg_ptr) { return dp_recv_new_msg_flags(sock, msg_ptr, NNG_FLAG_NONBLOCK); } int dp_recv_new_msg_flags(nng_socket sock, nng_msg **msg_ptr, int flags) { assert(msg_ptr); int res = 0; if ((res = nng_recvmsg(sock, msg_ptr, flags))) { nng_msg_free(*msg_ptr); *msg_ptr = NULL; } return res; }