From 17787b8ab19469ff836ef79ec2c58f8efa75abee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20L=C3=BCke?= Date: Thu, 12 Dec 2024 00:31:03 +0100 Subject: [PATCH] work on rpc --- external/mesytec-mvlc | 2 +- include/mesytec-mnode/mnode_cpp_types.h | 8 +- include/mesytec-mnode/mnode_nng_proto.h | 16 ++++ proto/mrpc.proto | 4 + src/CMakeLists.txt | 4 +- src/mnode_nng_proto.cc | 57 ++++++++++++ src/mnode_proto_rpc_ping_client.cc | 118 ++++++++++++------------ src/mnode_proto_rpc_ping_server.cc | 57 +++++++++--- 8 files changed, 189 insertions(+), 77 deletions(-) create mode 100644 include/mesytec-mnode/mnode_nng_proto.h create mode 100644 src/mnode_nng_proto.cc diff --git a/external/mesytec-mvlc b/external/mesytec-mvlc index 31606b3..3347a18 160000 --- a/external/mesytec-mvlc +++ b/external/mesytec-mvlc @@ -1 +1 @@ -Subproject commit 31606b39eab4b3581633d6693329082328ff7c62 +Subproject commit 3347a186c7c3eb0a12b6baa76b90e71d573fcd39 diff --git a/include/mesytec-mnode/mnode_cpp_types.h b/include/mesytec-mnode/mnode_cpp_types.h index 31d7e43..1493543 100644 --- a/include/mesytec-mnode/mnode_cpp_types.h +++ b/include/mesytec-mnode/mnode_cpp_types.h @@ -3,18 +3,20 @@ #include -namespace mesytec::mnode::cpp_types +namespace mesytec::mnode { -using u8 = std::uint8_t; +// clang-format off +using u8 = std::uint8_t; using u16 = std::uint16_t; using u32 = std::uint32_t; using u64 = std::uint64_t; -using s8 = std::int8_t; +using s8 = std::int8_t; using s16 = std::int16_t; using s32 = std::int32_t; using s64 = std::int64_t; +// clang-format on } // namespace mesytec::mnode::cpp_types diff --git a/include/mesytec-mnode/mnode_nng_proto.h b/include/mesytec-mnode/mnode_nng_proto.h new file mode 100644 index 0000000..f563127 --- /dev/null +++ b/include/mesytec-mnode/mnode_nng_proto.h @@ -0,0 +1,16 @@ +#ifndef D9BA34A9_3472_4E22_8E82_5C335EDA67C7 +#define D9BA34A9_3472_4E22_8E82_5C335EDA67C7 + +#include +#include + +namespace mesytec::mnode::nng +{ + +size_t serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_msg *msg); +size_t deserialize_proto_from_nng(google::protobuf::MessageLite &message, nng_msg *msg); +size_t deserialize_proto_from_nng_trim(google::protobuf::MessageLite &message, nng_msg *msg); + +} // namespace mesytec::mnode::nng + +#endif /* D9BA34A9_3472_4E22_8E82_5C335EDA67C7 */ diff --git a/proto/mrpc.proto b/proto/mrpc.proto index 76a31fc..ea42897 100644 --- a/proto/mrpc.proto +++ b/proto/mrpc.proto @@ -7,13 +7,17 @@ package mesytec.mnode.proto.mrpc; message MethodCall { + // Service::GetDescriptor()->full_name() string service = 1; + // MethodDescriptor::full_name() string method = 2; + // Must be of type Service::GetRequestPrototype(method) google.protobuf.Any request = 3; } message MethodCallResponse { google.rpc.Status status = 1; + // Is of type Service::GetResponsePrototype(method) google.protobuf.Any response = 2; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2ee959a..6a7230d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,11 +6,11 @@ set(MVLC_NNG_MNODE_WARN_FLAGS -Wall -Wextra -Wpedantic) # set(CMAKE_CXX_CLANG_TIDY clang-tidy -p ${CMAKE_BINARY_DIR} --extra-arg=-std=c++17) #endif() -add_library(mesytec-mnode mnode_nng.cc mnode_nng_async.cc) +add_library(mesytec-mnode mnode_nng.cc mnode_nng_async.cc mnode_nng_proto.cc) target_include_directories(mesytec-mnode PUBLIC $ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/mesytec-mnode) -target_link_libraries(mesytec-mnode PUBLIC nng PRIVATE spdlog) +target_link_libraries(mesytec-mnode PUBLIC nng mnode-proto PRIVATE spdlog) target_compile_features(mesytec-mnode PRIVATE cxx_std_17) add_library(mesytec-mnode-dev INTERFACE) diff --git a/src/mnode_nng_proto.cc b/src/mnode_nng_proto.cc new file mode 100644 index 0000000..487e92b --- /dev/null +++ b/src/mnode_nng_proto.cc @@ -0,0 +1,57 @@ +#include +#include +#include + +namespace mesytec::mnode::nng +{ + +size_t serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_msg *msg) +{ + auto messageSize = message.ByteSizeLong(); + + if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize)) + return false; + + *reinterpret_cast(nng_msg_body(msg)) = messageSize; + if (!message.SerializeToArray(reinterpret_cast(nng_msg_body(msg)) + sizeof(u32), + messageSize)) + { + return 0; + } + return sizeof(u32) + messageSize; +} + +size_t deserialize_proto_from_nng(google::protobuf::MessageLite &message, nng_msg *msg) +{ + if (nng_msg_len(msg) < sizeof(u32)) + return 0; + + auto messageSize = *reinterpret_cast(nng_msg_body(msg)); + + if (messageSize == 0) + return 0; + + if (nng_msg_len(msg) < sizeof(u32) + messageSize) + return 0; + + if (!message.ParseFromArray(reinterpret_cast(nng_msg_body(msg)) + sizeof(u32), + messageSize)) + { + return 0; + } + + return sizeof(u32) + messageSize; +} + +size_t deserialize_proto_from_nng_trim(google::protobuf::MessageLite &message, nng_msg *msg) +{ + if (auto result = deserialize_proto_from_nng(message, msg)) + { + nng_msg_trim(msg, result); + return result; + } + + return 0; +} + +} // namespace mesytec::mnode::nng diff --git a/src/mnode_proto_rpc_ping_client.cc b/src/mnode_proto_rpc_ping_client.cc index 153baa2..20be550 100644 --- a/src/mnode_proto_rpc_ping_client.cc +++ b/src/mnode_proto_rpc_ping_client.cc @@ -2,8 +2,9 @@ #include #include -#include #include +#include +#include #include "internal/argh.h" #include "proto/google/rpc/code.pb.h" #include "proto/mrpc.pb.h" @@ -12,39 +13,7 @@ using namespace mesytec; using namespace std::literals; -using namespace mesytec::mnode::cpp_types; - -class MVLCService: public mnode::proto::mvlc::MVLCService -{ -public: - explicit MVLCService(mesytec::mvlc::MVLC mvlc) - : mvlc_(mvlc) - { } - - void ReadRegister(::google::protobuf::RpcController *controller, - const mnode::proto::mvlc::ReadRegisterRequest *request, - mnode::proto::mvlc::ReadRegisterResponse *response, - ::google::protobuf::Closure *done) override - { - spdlog::info("MVLCService::ReadRegister"); - u32 value; - if (auto ec = mvlc_.readRegister(request->address(), value)) - { - response->mutable_status()->set_code(ec.value()); - response->mutable_status()->set_message(ec.message()); - controller->SetFailed(ec.message()); - } - else - { - response->set_value(value); - } - - done->Run(); - } - -private: - mesytec::mvlc::MVLC mvlc_; -}; +using namespace mesytec::mnode; void print_service_descriptors(google::protobuf::Service &service) { @@ -62,20 +31,32 @@ void print_service_descriptors(google::protobuf::Service &service) } } -struct RPCClosure: public google::protobuf::Closure +struct RpcClosure: public google::protobuf::Closure { void Run() override { } }; -struct RPCController: public google::protobuf::RpcController +struct RpcController: public google::protobuf::RpcController { - void Reset() override { } - bool Failed() const override { return false; } - std::string ErrorText() const override { return ""; } + void Reset() override + { + failed_ = false; + canceled_ = false; + errorText_.clear(); + cancelCallback_ = nullptr; + } + + bool Failed() const override { return failed_; } + std::string ErrorText() const override { return errorText_; } void StartCancel() override { } - void SetFailed(const std::string &reason) override { } - bool IsCanceled() const override { return false; } - void NotifyOnCancel(google::protobuf::Closure *callback) override { } + void SetFailed(const std::string &reason) override { errorText_ = reason; failed_ = true; } + bool IsCanceled() const override { return canceled_; } + void NotifyOnCancel(google::protobuf::Closure *callback) override { cancelCallback_ = callback; } + + bool failed_ = false; + bool canceled_ = false; + std::string errorText_; + google::protobuf::Closure *cancelCallback_ = nullptr; }; struct RpcChannel: public google::protobuf::RpcChannel @@ -85,17 +66,12 @@ struct RpcChannel: public google::protobuf::RpcChannel { } - int send_request(const mnode::proto::mrpc::MethodCall &methodCall) + void fail(google::protobuf::RpcController *controller, const std::string &reason, google::rpc::Code code) { - auto methodCallSize = methodCall.ByteSizeLong(); - auto msg = mnode::nng::allocate_message(sizeof(u32) + methodCallSize); - *reinterpret_cast(nng_msg_body(msg.get())) = methodCallSize; - if (!methodCall.SerializeToArray(reinterpret_cast(nng_msg_body(msg.get())) + sizeof(u32), methodCallSize)) - { - spdlog::error("methodCall.SerializeToArray failed"); - } - spdlog::info("sending request: nng_msg_len={}, methodCallSize={}", nng_msg_len(msg.get()), methodCallSize); - return mnode::nng::send_message(socket_, msg); + controller->SetFailed(reason); + callResponse_.mutable_status()->set_code(code); + callResponse_.mutable_status()->set_message(reason); + callResponse_.mutable_status()->clear_details(); } void CallMethod(const google::protobuf::MethodDescriptor *method, @@ -109,9 +85,25 @@ struct RpcChannel: public google::protobuf::RpcChannel methodCall_.set_service(method->service()->full_name()); methodCall_.set_method(method->full_name()); - methodCall_.mutable_request()->PackFrom(*request); - if (auto nng_res = send_request(methodCall_)) + if (!methodCall_.mutable_request()->PackFrom(*request)) + { + fail(controller, "PackFrom failed", google::rpc::Code::INTERNAL); + spdlog::error("PackFrom failed"); + return; + } + + auto msg = mnode::nng::allocate_message(0); + + if (mnode::nng::serialize_proto_to_nng(methodCall_, msg.get()) == 0) + { + spdlog::error("serialize_proto_to_nng failed"); + return; + } + + spdlog::info("sending request: nng_msg_len={}", nng_msg_len(msg.get())); + + if (auto nng_res = mnode::nng::send_message(socket_, msg)) { spdlog::error("nng_sendmsg: {}", nng_strerror(nng_res)); return; @@ -130,7 +122,19 @@ struct RpcChannel: public google::protobuf::RpcChannel return; } - auto callResponseSize = mnode::nng::msg_trim_read(nng_response.get()); + if (mnode::nng::deserialize_proto_from_nng_trim(callResponse_, nng_response.get()) == 0) + { + spdlog::error("deserialize_proto_from_nng_trim failed"); + return; + } + + if (!callResponse_.response().UnpackTo(response)) + { + spdlog::error("UnpackTo failed"); + return; + } + + done->Run(); } nng_socket socket_; @@ -151,8 +155,8 @@ int main() return res; } - RPCController rpcController; - RPCClosure done; + RpcController rpcController; + RpcClosure done; RpcChannel rpcChannel(socket); mnode::proto::mvlc::MVLCService::Stub mvlcService(&rpcChannel); diff --git a/src/mnode_proto_rpc_ping_server.cc b/src/mnode_proto_rpc_ping_server.cc index 26bc236..5dfe487 100644 --- a/src/mnode_proto_rpc_ping_server.cc +++ b/src/mnode_proto_rpc_ping_server.cc @@ -12,7 +12,7 @@ using namespace mesytec; using namespace std::literals; -using namespace mesytec::mnode::cpp_types; +using namespace mesytec::mnode; class MVLCService: public mnode::proto::mvlc::MVLCService { @@ -60,7 +60,7 @@ class MVLCService: public mnode::proto::mvlc::MVLCService { std::vector dest; ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo); - for (auto d : dest) + for (auto d: dest) { response->add_value(d); } @@ -95,35 +95,31 @@ void print_service_descriptors(google::protobuf::Service &service) } } -struct RPCClosure: public google::protobuf::Closure +struct RpcClosure: public google::protobuf::Closure { void Run() override {} }; -struct RPCController: public google::protobuf::RpcController +struct RpcController: public google::protobuf::RpcController { - void Reset() override - { - spdlog::debug("RPCController::Reset"); - } + void Reset() override { spdlog::debug("RpcController::Reset"); } bool Failed() const override { return false; } std::string ErrorText() const override { return ""; } void StartCancel() override {} void SetFailed(const std::string &reason) override { - spdlog::error("RPCController::SetFailed: reason={}", reason); + spdlog::error("RpcController::SetFailed: reason={}", reason); } bool IsCanceled() const override { return false; } - void NotifyOnCancel(google::protobuf::Closure */*callback*/) override {} + void NotifyOnCancel(google::protobuf::Closure * /*callback*/) override {} }; struct RpcChannel: public google::protobuf::RpcChannel { void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, - const google::protobuf::Message *request, - google::protobuf::Message *response, + const google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done) override { spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}", @@ -131,6 +127,39 @@ struct RpcChannel: public google::protobuf::RpcChannel } }; +bool serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_msg *msg) +{ + auto messageSize = message.ByteSizeLong(); + + if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize); res != 0) + { + spdlog::error("nng_msg_realloc: {}", nng_strerror(res)); + return false; + } + + *reinterpret_cast(nng_msg_body(msg)) = messageSize; + return message.SerializeToArray(reinterpret_cast(nng_msg_body(msg)) + sizeof(u32), + messageSize); +} + +bool deserialize_proto_from_nng(google::protobuf::MessageLite &message, nng_msg *msg) +{ + auto messageSize = mnode::nng::msg_trim_read(msg); + + if (!messageSize) + return false; + + if (nng_msg_len(msg) < *messageSize) + { + spdlog::error("message too short"); + return false; + } + + bool ret = message.ParseFromArray(reinterpret_cast(nng_msg_body(msg)), *messageSize); + nng_msg_trim(msg, *messageSize); + return ret; +} + struct RPCServer { explicit RPCServer(nng_socket socket) @@ -268,8 +297,8 @@ struct RPCServer nng_socket socket_; std::unordered_map services_; std::unordered_map methods_; - RPCClosure done_; - RPCController controller_; + RpcClosure done_; + RpcController controller_; }; int main()