add a barebones nng async abstraction

This commit is contained in:
Florian Lüke 2024-12-07 18:33:35 +01:00
parent 4c940d4e0a
commit 25b018b1bb
3 changed files with 174 additions and 11 deletions

View file

@ -1,25 +1,59 @@
#ifndef C7F35237_1097_46F2_9573_490355317C24
#define C7F35237_1097_46F2_9573_490355317C24
#include <mesytec-node/mesytec_node_nng.h>
#include <mesytec-mnode/mnode_nng.h>
namespace mesytec::mnode::nng
{
class IWork
struct IWork
{
public:
virtual ~IWork() = default;
virtual void work() = 0;
virtual ~IWork() = default;
virtual void work() = 0;
};
class IAsyncReqRepWork: public IWork
struct IAsyncReqWork: public IWork
{
public:
virtual nng::unique_msg make_request() = 0;
virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0;
virtual nng::unique_msg make_request() = 0;
virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0;
};
}
struct IAsyncRepWork: public IWork
{
virtual nng::unique_msg handle_request(nng::unique_msg &&request) = 0;
};
class AsyncReqWork: public IAsyncReqWork
{
public:
explicit AsyncReqWork(nng_socket socket);
~AsyncReqWork() override;
void work() override;
private:
enum State { INIT, RECEIVE, SEND, };
State state = INIT;
nng_socket socket;
nng_aio *aio = nullptr;
nng_ctx ctx;
nng::unique_msg request_;
};
class AsyncRepWork: public IAsyncRepWork
{
public:
explicit AsyncRepWork(nng_socket socket);
~AsyncRepWork() override;
void work() override;
private:
enum State { INIT, RECEIVE, SEND, };
State state = INIT;
nng_socket socket;
nng_aio *aio = nullptr;
nng_ctx ctx;
};
} // namespace mesytec::mnode::nng
#endif /* C7F35237_1097_46F2_9573_490355317C24 */

View file

@ -20,7 +20,7 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic)
#)
#target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
add_library(mesytec-mnode-nng mnode_nng.cc)
add_library(mesytec-mnode-nng mnode_nng.cc mnode_nng_async.cc)
target_include_directories(mesytec-mnode-nng PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog)
target_compile_features(mesytec-mnode-nng PUBLIC cxx_std_17)

129
src/mnode_nng_async.cc Normal file
View file

@ -0,0 +1,129 @@
#include <mesytec-mnode/mnode_nng_async.h>
namespace mesytec::mnode::nng
{
static void work_callback(void *arg)
{
reinterpret_cast<IWork *>(arg)->work();;
}
AsyncReqWork::AsyncReqWork(nng_socket socket)
: socket(socket)
, request_(nng::make_unique_msg())
{
}
AsyncReqWork::~AsyncReqWork()
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void AsyncReqWork::work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, work_callback, this);
nng_ctx_open(&ctx, socket);
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
state = RECEIVE;
nng_ctx_recv(ctx, aio);
}
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_recv", rv);
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
else
{
auto reply = nng::make_unique_msg(nng_aio_get_msg(aio));
handle_reply(std::move(request_), std::move(reply));
state = SEND;
request_ = make_request();
nng_aio_set_msg(aio, nng::clone_message(request_.get()).release());
nng_ctx_send(ctx, aio);
}
break;
}
}
AsyncRepWork::AsyncRepWork(nng_socket socket)
: socket(socket)
{
}
AsyncRepWork::~AsyncRepWork()
{
nng_ctx_close(ctx);
nng_aio_free(aio);
}
void AsyncRepWork::work()
{
switch (state)
{
case INIT:
nng_aio_alloc(&aio, work_callback, this);
nng_ctx_open(&ctx, socket);
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
case RECEIVE:
if (auto rv = nng_aio_result(aio))
{
if (rv != NNG_ETIMEDOUT)
nng::mnode_nng_error("nng_ctx_recv", rv);
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
else
{
auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio)));
nng_aio_set_msg(aio, reply.release());
state = SEND;
nng_ctx_send(ctx, aio);
}
break;
case SEND:
if (auto rv = nng_aio_result(aio))
{
nng::mnode_nng_error("nng_ctx_send", rv);
nng::make_unique_msg(nng_aio_get_msg(aio)).reset();
}
state = RECEIVE;
nng_ctx_recv(ctx, aio);
break;
}
}
}