mesytec-mnode/src/mnode_nng_async.cc
2024-12-07 18:33:35 +01:00

129 lines
3 KiB
C++

#include <mesytec-mnode/mnode_nng_async.h>
namespace mesytec::mnode::nng
{
// Completion callback passed to nng_aio_alloc() by the work objects in this
// file. `arg` is the user pointer registered alongside the callback; it is
// converted to IWork* and the object's work() method is invoked.
//
// NOTE(review): the callers register `this` of the concrete work class, so this
// assumes that pointer has the same address as its IWork subobject — confirm
// against the class definitions.
static void work_callback(void *arg)
{
    // static_cast is the conventional way to turn a void* back into an object
    // pointer; reinterpret_cast is not needed for this conversion.
    static_cast<IWork *>(arg)->work();
}
AsyncReqWork::AsyncReqWork(nng_socket socket)
: socket(socket)
, request_(nng::make_unique_msg())
{
}
// Tears down the nng resources used by work(): the context is closed first,
// then the aio is freed.
// NOTE(review): if work() never ran its INIT state, ctx and aio still hold
// whatever the class definition initialises them to — confirm those values are
// safe to pass to nng_ctx_close() / nng_aio_free().
AsyncReqWork::~AsyncReqWork()
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void AsyncReqWork::work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, work_callback, this);
nng_ctx_open(&ctx, socket);
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
state = RECEIVE;
nng_ctx_recv(ctx, aio);
}
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_recv", rv);
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
auto reply = nng::make_unique_msg(nng_aio_get_msg(aio));
handle_reply(std::move(request_), std::move(reply));
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
break;
}
}
AsyncRepWork::AsyncRepWork(nng_socket socket)
: socket(socket)
{
}
// Tears down the nng resources used by work(): the context is closed first,
// then the aio is freed.
// NOTE(review): if work() never ran its INIT state, ctx and aio still hold
// whatever the class definition initialises them to — confirm those values are
// safe to pass to nng_ctx_close() / nng_aio_free().
AsyncRepWork::~AsyncRepWork()
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void AsyncRepWork::work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, work_callback, this);
nng_ctx_open(&ctx, socket);
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
if (rv != NNG_ETIMEDOUT)
nng::mnode_nng_error("nng_ctx_recv", rv);
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
else
{
auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio)));
nng_aio_set_msg(aio, reply.release());
state = SEND;
nng_ctx_send(ctx, aio);
}
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
}
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
}
}