diff --git a/include/mesytec-mnode/mnode_cpp_types.h b/include/mesytec-mnode/mnode_cpp_types.h new file mode 100644 index 0000000..31d7e43 --- /dev/null +++ b/include/mesytec-mnode/mnode_cpp_types.h @@ -0,0 +1,21 @@ +#ifndef DB95D276_B1C8_4643_AEF5_5224D93DDD22 +#define DB95D276_B1C8_4643_AEF5_5224D93DDD22 + +#include + +namespace mesytec::mnode::cpp_types +{ + +using u8 = std::uint8_t; +using u16 = std::uint16_t; +using u32 = std::uint32_t; +using u64 = std::uint64_t; + +using s8 = std::int8_t; +using s16 = std::int16_t; +using s32 = std::int32_t; +using s64 = std::int64_t; + +} // namespace mesytec::mnode::cpp_types + +#endif /* DB95D276_B1C8_4643_AEF5_5224D93DDD22 */ diff --git a/include/mesytec-mnode/mnode_nng.h b/include/mesytec-mnode/mnode_nng.h index dd88a0c..5fb8f27 100644 --- a/include/mesytec-mnode/mnode_nng.h +++ b/include/mesytec-mnode/mnode_nng.h @@ -242,6 +242,31 @@ inline std::pair receive_message(nng_socket sock, int flags = 0 return { make_unique_msg(msg), 0 }; } +inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0) +{ + if (auto res = nng_sendmsg(sock, msg.get(), flags)) + { + return res; + } + + msg.release(); + return 0; +} + +inline unique_msg allocate_message(size_t size) +{ + nng_msg *msg = nullptr; + + if (auto res = nng_msg_alloc(&msg, size)) + { + mnode_nng_error("allocate_message", res); + return make_unique_msg(); + } + + return make_unique_msg(msg); +} + +// Returned message has size 0 and capacity 'reserve'. inline unique_msg allocate_reserve_message(size_t reserve = 0) { nng_msg *msg = nullptr; diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt index 3adfeb9..dd408c7 100644 --- a/proto/CMakeLists.txt +++ b/proto/CMakeLists.txt @@ -1,10 +1,22 @@ find_package(Protobuf REQUIRED) -add_library(mnode-proto service.proto vme.proto mvlc.proto google/rpc/status.proto google/rpc/error_details.proto google/rpc/code.proto) + +set(MNODE_PROTOS + google/rpc/status.proto + google/rpc/error_details.proto + google/rpc/code.proto + mrpc.proto + mvlc.proto + service.proto + vme.proto +) + +add_library(mnode-proto ${MNODE_PROTOS}) target_link_libraries(mnode-proto PUBLIC protobuf::libprotobuf) + target_include_directories(mnode-proto PUBLIC $ PUBLIC $) protobuf_generate(TARGET mnode-proto) -protobuf_generate_python(PROTO_PY service.proto vme.proto) +protobuf_generate_python(PROTO_PY ${MNODE_PROTOS}) add_custom_target(mnode-proto-py ALL DEPENDS ${PROTO_PY}) diff --git a/proto/mrpc.proto b/proto/mrpc.proto new file mode 100644 index 0000000..76a31fc --- /dev/null +++ b/proto/mrpc.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +import "google/protobuf/any.proto"; +import "google/rpc/status.proto"; + +package mesytec.mnode.proto.mrpc; + +message MethodCall +{ + string service = 1; + string method = 2; + google.protobuf.Any request = 3; +} + +message MethodCallResponse +{ + google.rpc.Status status = 1; + google.protobuf.Any response = 2; +} diff --git a/proto/mvlc.proto b/proto/mvlc.proto index 312690f..4f2120f 100644 --- a/proto/mvlc.proto +++ b/proto/mvlc.proto @@ -5,7 +5,7 @@ import "vme.proto"; option cc_generic_services = true; -package mesytec.mnode.mvlc; +package mesytec.mnode.proto.mvlc; message ReadRegisterRequest { @@ -29,10 +29,10 @@ message ReadRegisterResponse optional google.rpc.Status status = 2; } -service MVLC +service MVLCService { rpc ReadRegister(ReadRegisterRequest) returns (ReadRegisterResponse); rpc WriteRegister(WriteRegisterRequest) returns (WriteRegisterResponse); - rpc VMERead(mesytec.mnode.vme.ReadRequest) returns (mesytec.mnode.vme.ReadResponse); - rpc VMEWrite(mesytec.mnode.vme.WriteRequest) returns (mesytec.mnode.vme.WriteResponse); + rpc VMERead(mesytec.mnode.proto.vme.ReadRequest) returns (mesytec.mnode.proto.vme.ReadResponse); + rpc VMEWrite(mesytec.mnode.proto.vme.WriteRequest) returns (mesytec.mnode.proto.vme.WriteResponse); } diff --git a/proto/service.proto b/proto/service.proto index 4591a5c..a98acc3 100644 --- a/proto/service.proto +++ b/proto/service.proto @@ -4,10 +4,10 @@ import "google/protobuf/timestamp.proto"; option cc_generic_services = true; -package mesytec.mnode; +package mesytec.mnode.proto; service PingService { - rpc Ping(mesytec.mnode.Ping) returns (Pong); + rpc Ping(mesytec.mnode.proto.Ping) returns (Pong); } message Ping { diff --git a/proto/vme.proto b/proto/vme.proto index 99e4dff..e67ea72 100644 --- a/proto/vme.proto +++ b/proto/vme.proto @@ -4,7 +4,7 @@ import "google/rpc/status.proto"; option cc_generic_services = true; -package mesytec.mnode.vme; +package mesytec.mnode.proto.vme; enum DataWidth { @@ -38,7 +38,7 @@ message ReadRequest { uint32 address = 1; AddressModifier amod = 2; - DataWidth width = 3; + DataWidth data_width = 3; ReadRequestOptions options = 4; } @@ -47,7 +47,7 @@ message WriteRequest uint32 address = 1; uint32 value = 2; AddressModifier amod = 3; - DataWidth width = 4; + DataWidth data_width = 4; } enum ResponseFlags @@ -60,7 +60,7 @@ enum ResponseFlags message ReadResponse { - repeated uint32 values = 1; + repeated uint32 value = 1; optional ResponseFlags flags = 2; optional google.rpc.Status status = 3; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 26785bb..2ee959a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,6 +40,7 @@ add_mnode_proto_dev_executable(mnode_proto_test1) add_mnode_proto_dev_executable(mnode_proto_ping_client) add_mnode_proto_dev_executable(mnode_proto_ping_server) add_mnode_proto_dev_executable(mnode_proto_rpc_ping_server) +add_mnode_proto_dev_executable(mnode_proto_rpc_ping_client) #add_subdirectory(qt) diff --git a/src/mnode_proto_ping_client.cc b/src/mnode_proto_ping_client.cc index 37baff8..4c11234 100644 --- a/src/mnode_proto_ping_client.cc +++ b/src/mnode_proto_ping_client.cc @@ -47,8 +47,8 @@ class PingClient: public nng::AsyncReqWork } private: - Ping ping_; - Pong pong_; + proto::Ping ping_; + proto::Pong pong_; size_t sequence_number = 1; static std::atomic next_peer_id; size_t peer_id = next_peer_id++; diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc index 64b71f8..dc97afe 100644 --- a/src/mnode_proto_ping_server.cc +++ b/src/mnode_proto_ping_server.cc @@ -33,8 +33,8 @@ class PingServer: public nng::AsyncRepWork } private: - Ping ping_; - Pong pong_; + proto::Ping ping_; + proto::Pong pong_; static std::atomic next_peer_id; size_t peer_id = next_peer_id++; size_t num_transactions = 0; diff --git a/src/mnode_proto_rpc_ping_client.cc b/src/mnode_proto_rpc_ping_client.cc new file mode 100644 index 0000000..153baa2 --- /dev/null +++ b/src/mnode_proto_rpc_ping_client.cc @@ -0,0 +1,172 @@ +#include + +#include +#include +#include +#include +#include "internal/argh.h" +#include "proto/google/rpc/code.pb.h" +#include "proto/mrpc.pb.h" +#include "proto/mvlc.pb.h" +#include "proto/service.pb.h" + +using namespace mesytec; +using namespace std::literals; +using namespace mesytec::mnode::cpp_types; + +class MVLCService: public mnode::proto::mvlc::MVLCService +{ +public: + explicit MVLCService(mesytec::mvlc::MVLC mvlc) + : mvlc_(mvlc) + { } + + void ReadRegister(::google::protobuf::RpcController *controller, + const mnode::proto::mvlc::ReadRegisterRequest *request, + mnode::proto::mvlc::ReadRegisterResponse *response, + ::google::protobuf::Closure *done) override + { + spdlog::info("MVLCService::ReadRegister"); + u32 value; + if (auto ec = mvlc_.readRegister(request->address(), value)) + { + response->mutable_status()->set_code(ec.value()); + response->mutable_status()->set_message(ec.message()); + controller->SetFailed(ec.message()); + } + else + { + response->set_value(value); + } + + done->Run(); + } + +private: + mesytec::mvlc::MVLC mvlc_; +}; + +void print_service_descriptors(google::protobuf::Service &service) +{ + auto sd = service.GetDescriptor(); + spdlog::info("service: full_name={}, index={}, method_count={}", + sd->full_name(), sd->index(), sd->method_count()); + + for (int i = 0; i < sd->method_count(); ++i) + { + auto md = sd->method(i); + auto it = md->input_type(); + auto ot = md->output_type(); + spdlog::info(" method: full_name={}, index={}, input_type={}, output_type={}", + md->full_name(), md->index(), it->full_name(), ot->full_name()); + } +} + +struct RPCClosure: public google::protobuf::Closure +{ + void Run() override { } +}; + +struct RPCController: public google::protobuf::RpcController +{ + void Reset() override { } + bool Failed() const override { return false; } + std::string ErrorText() const override { return ""; } + void StartCancel() override { } + void SetFailed(const std::string &reason) override { } + bool IsCanceled() const override { return false; } + void NotifyOnCancel(google::protobuf::Closure *callback) override { } +}; + +struct RpcChannel: public google::protobuf::RpcChannel +{ + explicit RpcChannel(nng_socket socket) + : socket_(socket) + { + } + + int send_request(const mnode::proto::mrpc::MethodCall &methodCall) + { + auto methodCallSize = methodCall.ByteSizeLong(); + auto msg = mnode::nng::allocate_message(sizeof(u32) + methodCallSize); + *reinterpret_cast(nng_msg_body(msg.get())) = methodCallSize; + if (!methodCall.SerializeToArray(reinterpret_cast(nng_msg_body(msg.get())) + sizeof(u32), methodCallSize)) + { + spdlog::error("methodCall.SerializeToArray failed"); + } + spdlog::info("sending request: nng_msg_len={}, methodCallSize={}", nng_msg_len(msg.get()), methodCallSize); + return mnode::nng::send_message(socket_, msg); + } + + void CallMethod(const google::protobuf::MethodDescriptor *method, + google::protobuf::RpcController *controller, + const google::protobuf::Message *request, + google::protobuf::Message *response, + google::protobuf::Closure *done) override + { + spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}", + method->full_name(), request->ShortDebugString(), response->ShortDebugString()); + + methodCall_.set_service(method->service()->full_name()); + methodCall_.set_method(method->full_name()); + methodCall_.mutable_request()->PackFrom(*request); + + if (auto nng_res = send_request(methodCall_)) + { + spdlog::error("nng_sendmsg: {}", nng_strerror(nng_res)); + return; + } + + auto [nng_response, nng_res] = mnode::nng::receive_message(socket_); + + if (nng_res) + { + spdlog::error("nng_recvmsg: {}", nng_strerror(nng_res)); + return; + } + else if (nng_res == NNG_ETIMEDOUT) + { + spdlog::trace("nng_recvmsg: NNG_ETIMEDOUT"); + return; + } + + auto callResponseSize = mnode::nng::msg_trim_read(nng_response.get()); + } + + nng_socket socket_; + mnode::proto::mrpc::MethodCall methodCall_; + mnode::proto::mrpc::MethodCallResponse callResponse_; + +}; + +int main() +{ + namespace nng = mnode::nng; + spdlog::set_level(spdlog::level::debug); + auto socket = nng::make_req_socket(); + + if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, NNG_FLAG_NONBLOCK)) + { + nng::mnode_nng_error("nng_dial", res); + return res; + } + + RPCController rpcController; + RPCClosure done; + RpcChannel rpcChannel(socket); + mnode::proto::mvlc::MVLCService::Stub mvlcService(&rpcChannel); + + for (int i = 0; i < 10; ++i) + { + mnode::proto::vme::ReadRequest request; + request.set_address(0xffff600e); + request.mutable_amod()->set_value(mesytec::mvlc::vme_amods::A32); + request.set_data_width(mnode::proto::vme::DataWidth::D32); + mnode::proto::vme::ReadResponse response; + spdlog::info("request: {}", request.ShortDebugString()); + mvlcService.VMERead(&rpcController, &request, &response, &done); + spdlog::info("response: {}", response.ShortDebugString()); + } + + return 0; +} diff --git a/src/mnode_proto_rpc_ping_server.cc b/src/mnode_proto_rpc_ping_server.cc index 1b8d8d8..26bc236 100644 --- a/src/mnode_proto_rpc_ping_server.cc +++ b/src/mnode_proto_rpc_ping_server.cc @@ -2,19 +2,88 @@ #include #include -#include "proto/service.pb.h" -#include "proto/mvlc.pb.h" #include +#include #include "internal/argh.h" +#include "proto/google/rpc/code.pb.h" +#include "proto/mrpc.pb.h" +#include "proto/mvlc.pb.h" +#include "proto/service.pb.h" using namespace mesytec; using namespace std::literals; +using namespace mesytec::mnode::cpp_types; + +class MVLCService: public mnode::proto::mvlc::MVLCService +{ + public: + explicit MVLCService(mesytec::mvlc::MVLC mvlc) + : mvlc_(mvlc) + { + } + + void ReadRegister(::google::protobuf::RpcController *controller, + const mnode::proto::mvlc::ReadRegisterRequest *request, + mnode::proto::mvlc::ReadRegisterResponse *response, + ::google::protobuf::Closure *done) override + { + spdlog::info("MVLCService::ReadRegister"); + u32 value; + if (auto ec = mvlc_.readRegister(request->address(), value)) + { + response->mutable_status()->set_code(ec.value()); + response->mutable_status()->set_message(ec.message()); + controller->SetFailed(ec.message()); + } + else + { + response->set_value(value); + } + + done->Run(); + } + + void VMERead(::google::protobuf::RpcController *controller, + const mnode::proto::vme::ReadRequest *request, + mnode::proto::vme::ReadResponse *response, + ::google::protobuf::Closure *done) override + { + spdlog::info("MVLCService::VMERead"); + + auto amod = request->amod().value(); + auto data_width = static_cast(request->data_width()); + u16 maxTransfers = request->options().max_transfers(); + bool fifo = request->options().fifo_read(); + std::error_code ec; + + if (mesytec::mvlc::vme_amods::is_block_mode(amod)) + { + std::vector dest; + ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo); + for (auto d : dest) + { + response->add_value(d); + } + } + else + { + u32 dest = 0; + ec = mvlc_.vmeRead(request->address(), dest, amod, data_width); + response->add_value(dest); + } + + done->Run(); + } + + private: + mesytec::mvlc::MVLC mvlc_; +}; void print_service_descriptors(google::protobuf::Service &service) { auto sd = service.GetDescriptor(); - spdlog::info("service: full_name={}, index={}, method_count={}", - sd->full_name(), sd->index(), sd->method_count()); + spdlog::info("service: full_name={}, index={}, method_count={}", sd->full_name(), sd->index(), + sd->method_count()); for (int i = 0; i < sd->method_count(); ++i) { @@ -22,22 +91,201 @@ void print_service_descriptors(google::protobuf::Service &service) auto it = md->input_type(); auto ot = md->output_type(); spdlog::info(" method: full_name={}, index={}, input_type={}, output_type={}", - md->full_name(), md->index(), it->full_name(), ot->full_name()); + md->full_name(), md->index(), it->full_name(), ot->full_name()); } } +struct RPCClosure: public google::protobuf::Closure +{ + void Run() override {} +}; + +struct RPCController: public google::protobuf::RpcController +{ + void Reset() override + { + spdlog::debug("RPCController::Reset"); + } + bool Failed() const override { return false; } + std::string ErrorText() const override { return ""; } + void StartCancel() override {} + void SetFailed(const std::string &reason) override + { + spdlog::error("RPCController::SetFailed: reason={}", reason); + } + + bool IsCanceled() const override { return false; } + void NotifyOnCancel(google::protobuf::Closure */*callback*/) override {} +}; + +struct RpcChannel: public google::protobuf::RpcChannel +{ + void CallMethod(const google::protobuf::MethodDescriptor *method, + google::protobuf::RpcController *controller, + const google::protobuf::Message *request, + google::protobuf::Message *response, + google::protobuf::Closure *done) override + { + spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}", + method->full_name(), request->ShortDebugString(), response->GetTypeName()); + } +}; + +struct RPCServer +{ + explicit RPCServer(nng_socket socket) + : socket_(socket) + { + } + + void registerService(google::protobuf::Service *service) + { + auto sd = service->GetDescriptor(); + services_[sd->full_name()] = service; + + for (int i = 0; i < sd->method_count(); ++i) + { + auto md = sd->method(i); + methods_[md->full_name()] = md; + } + } + + // request format is u32: size + serialize MethodCall + // response format is u32:size + serialized MethodCallResponse + + int send_response(const mnode::proto::mrpc::MethodCallResponse &response) + { + auto responseSize = response.ByteSizeLong(); + auto msg = mnode::nng::allocate_reserve_message(sizeof(u32) + responseSize); + *reinterpret_cast(nng_msg_body(msg.get())) = responseSize; + response.SerializeToArray(reinterpret_cast(nng_msg_body(msg.get())) + sizeof(u32), + responseSize); + return mnode::nng::send_message(socket_, msg); + } + + void call_method(google::protobuf::Service *service, + const google::protobuf::MethodDescriptor *method, + const google::protobuf::Any &anyRequest, + mnode::proto::mrpc::MethodCallResponse &callResponse) + { + auto request = + std::unique_ptr(service->GetRequestPrototype(method).New()); + + anyRequest.UnpackTo(request.get()); // FIXME: check bool return + + auto response = + std::unique_ptr(service->GetResponsePrototype(method).New()); + + spdlog::debug("call_method: method={}, request={}, responsePrototype={}", + method->full_name(), request->ShortDebugString(), response->GetTypeName()); + + service->CallMethod(method, &controller_, request.get(), response.get(), &done_); + + spdlog::debug("call_method: response={}", response->ShortDebugString()); + + callResponse.mutable_status()->set_code(google::rpc::Code::OK); + callResponse.mutable_response()->PackFrom(*response); + } + + void run() + { + mnode::proto::mrpc::MethodCall methodCall_; + mnode::proto::mrpc::MethodCallResponse methodCallResponse_; + + while (true) + { + auto [request, nng_res] = mnode::nng::receive_message(socket_); + + if (nng_res != 0 && nng_res != NNG_ETIMEDOUT) + { + spdlog::error("nng_recvmsg: {}", nng_strerror(nng_res)); + return; + } + else if (nng_res == NNG_ETIMEDOUT) + { + spdlog::trace("nng_recvmsg: NNG_ETIMEDOUT"); + continue; + } + + spdlog::info("received request: nng_msg_len={}", nng_msg_len(request.get())); + + methodCallResponse_.Clear(); + + auto methodCallSize = mnode::nng::msg_trim_read(request.get()); + + if (!methodCallSize) + { + spdlog::error("msg_trim_read failed"); + methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); + send_response(methodCallResponse_); + continue; + } + + if (nng_msg_len(request.get()) < *methodCallSize) + { + spdlog::error("message too short"); + methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); + send_response(methodCallResponse_); + continue; + } + + // TODO: check bool return + if (!methodCall_.ParseFromArray(nng_msg_body(request.get()), *methodCallSize)) + { + methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); + send_response(methodCallResponse_); + continue; + } + + spdlog::debug("methodCall: service={}, method={}, request={}", methodCall_.service(), + methodCall_.method(), methodCall_.request().ShortDebugString()); + + auto serviceIt = services_.find(methodCall_.service()); + + if (serviceIt == services_.end()) + { + spdlog::error("service not found: {}", methodCall_.service()); + methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND); + send_response(methodCallResponse_); + continue; + } + + auto methodIt = methods_.find(methodCall_.method()); + + if (methodIt == methods_.end()) + { + spdlog::error("method not found: {}", methodCall_.method()); + continue; + methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND); + send_response(methodCallResponse_); + } + + call_method(serviceIt->second, methodIt->second, methodCall_.request(), + methodCallResponse_); + } + } + + nng_socket socket_; + std::unordered_map services_; + std::unordered_map methods_; + RPCClosure done_; + RPCController controller_; +}; + int main() { namespace nng = mnode::nng; spdlog::set_level(spdlog::level::debug); - mnode::PingService_Stub pingService(nullptr); + mnode::proto::PingService_Stub pingService(nullptr); print_service_descriptors(pingService); - mnode::vme::VmeAccess_Stub vmeService(nullptr); + mnode::proto::vme::VmeAccess_Stub vmeService(nullptr); print_service_descriptors(vmeService); - mnode::mvlc::MVLC_Stub mvlcService(nullptr); + auto mvlc = mesytec::mvlc::make_mvlc("mvlc-0124"); + + MVLCService mvlcService(mvlc); print_service_descriptors(mvlcService); auto socket = nng::make_rep_socket(); @@ -48,6 +296,9 @@ int main() return res; } + RPCServer rpcServer(socket); + rpcServer.registerService(&mvlcService); + rpcServer.run(); return 0; } diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index d7be51f..9abff19 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -10,8 +10,8 @@ void requester(nng_socket socket, unsigned id) { mvlc::util::Stopwatch sw; std::uint32_t seq = 0; - mnode::Ping ping; - mnode::Pong pong; + mnode::proto::Ping ping; + mnode::proto::Pong pong; while (true) { @@ -48,8 +48,8 @@ void requester(nng_socket socket, unsigned id) void responder(nng_socket socket, unsigned id) { mvlc::util::Stopwatch sw; - mnode::Ping ping; - mnode::Pong pong; + mnode::proto::Ping ping; + mnode::proto::Pong pong; while (true) {