diff --git a/include/mesytec-mnode/mnode_nng_async.h b/include/mesytec-mnode/mnode_nng_async.h index 4079fc7..ac0c04d 100644 --- a/include/mesytec-mnode/mnode_nng_async.h +++ b/include/mesytec-mnode/mnode_nng_async.h @@ -1,25 +1,59 @@ #ifndef C7F35237_1097_46F2_9573_490355317C24 #define C7F35237_1097_46F2_9573_490355317C24 -#include +#include namespace mesytec::mnode::nng { -class IWork +struct IWork { - public: - virtual ~IWork() = default; - virtual void work() = 0; + virtual ~IWork() = default; + virtual void work() = 0; }; -class IAsyncReqRepWork: public IWork +struct IAsyncReqWork: public IWork { - public: - virtual nng::unique_msg make_request() = 0; - virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0; + virtual nng::unique_msg make_request() = 0; + virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0; }; -} +struct IAsyncRepWork: public IWork +{ + virtual nng::unique_msg handle_request(nng::unique_msg &&request) = 0; +}; + +class AsyncReqWork: public IAsyncReqWork +{ + public: + explicit AsyncReqWork(nng_socket socket); + ~AsyncReqWork() override; + void work() override; + + private: + enum State { INIT, RECEIVE, SEND, }; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; + nng::unique_msg request_; +}; + +class AsyncRepWork: public IAsyncRepWork +{ + public: + explicit AsyncRepWork(nng_socket socket); + ~AsyncRepWork() override; + void work() override; + + private: + enum State { INIT, RECEIVE, SEND, }; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; +}; + +} // namespace mesytec::mnode::nng #endif /* C7F35237_1097_46F2_9573_490355317C24 */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 66c83cc..31913ce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,7 +20,7 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic) #) #target_include_directories(dp_common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) -add_library(mesytec-mnode-nng mnode_nng.cc) +add_library(mesytec-mnode-nng mnode_nng.cc mnode_nng_async.cc) target_include_directories(mesytec-mnode-nng PUBLIC $) target_link_libraries(mesytec-mnode-nng PUBLIC nng PRIVATE spdlog) target_compile_features(mesytec-mnode-nng PUBLIC cxx_std_17) diff --git a/src/mnode_nng_async.cc b/src/mnode_nng_async.cc new file mode 100644 index 0000000..53bc9fe --- /dev/null +++ b/src/mnode_nng_async.cc @@ -0,0 +1,129 @@ +#include + +namespace mesytec::mnode::nng +{ + +static void work_callback(void *arg) +{ + reinterpret_cast(arg)->work();; +} + +AsyncReqWork::AsyncReqWork(nng_socket socket) + : socket(socket) + , request_(nng::make_unique_msg()) +{ +} + +AsyncReqWork::~AsyncReqWork() +{ + nng_ctx_close(ctx); + nng_aio_free(aio); +} + +void AsyncReqWork::work() +{ + switch (state) + { + case INIT: + nng_aio_alloc(&aio, work_callback, this); + nng_ctx_open(&ctx, socket); + + state = SEND; + request_ = make_request(); + nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); + nng_ctx_send(ctx, aio); + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mnode_nng_error("nng_ctx_send", rv); + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + state = SEND; + request_ = make_request(); + nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); + nng_ctx_send(ctx, aio); + } + else + { + state = RECEIVE; + nng_ctx_recv(ctx, aio); + } + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + nng::mnode_nng_error("nng_ctx_recv", rv); + state = SEND; + request_ = make_request(); + nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); + nng_ctx_send(ctx, aio); + } + else + { + auto reply = nng::make_unique_msg(nng_aio_get_msg(aio)); + handle_reply(std::move(request_), std::move(reply)); + state = SEND; + request_ = make_request(); + nng_aio_set_msg(aio, nng::clone_message(request_.get()).release()); + nng_ctx_send(ctx, aio); + } + + break; + } +} + +AsyncRepWork::AsyncRepWork(nng_socket socket) + : socket(socket) +{ +} + +AsyncRepWork::~AsyncRepWork() +{ + nng_ctx_close(ctx); + nng_aio_free(aio); +} + +void AsyncRepWork::work() +{ + switch (state) + { + case INIT: + nng_aio_alloc(&aio, work_callback, this); + nng_ctx_open(&ctx, socket); + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + if (rv != NNG_ETIMEDOUT) + nng::mnode_nng_error("nng_ctx_recv", rv); + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } + else + { + auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio))); + nng_aio_set_msg(aio, reply.release()); + state = SEND; + nng_ctx_send(ctx, aio); + } + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mnode_nng_error("nng_ctx_send", rv); + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + } + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } +} + +}