#include #include #include #include #include #include "internal/argh.h" #include "proto/google/rpc/code.pb.h" #include "proto/mrpc.pb.h" #include "proto/mvlc.pb.h" #include "proto/service.pb.h" using namespace mesytec; using namespace std::literals; using namespace mesytec::mnode; class MVLCService: public mnode::proto::mvlc::MVLCService { public: explicit MVLCService(mesytec::mvlc::MVLC mvlc) : mvlc_(mvlc) { } void ReadRegister(::google::protobuf::RpcController *controller, const mnode::proto::mvlc::ReadRegisterRequest *request, mnode::proto::mvlc::ReadRegisterResponse *response, ::google::protobuf::Closure *done) override { spdlog::info("MVLCService::ReadRegister"); u32 value; if (auto ec = mvlc_.readRegister(request->address(), value)) { response->mutable_status()->set_code(ec.value()); response->mutable_status()->set_message(ec.message()); controller->SetFailed(ec.message()); } else { response->set_value(value); } done->Run(); } void VMERead(::google::protobuf::RpcController *controller, const mnode::proto::vme::ReadRequest *request, mnode::proto::vme::ReadResponse *response, ::google::protobuf::Closure *done) override { spdlog::info("MVLCService::VMERead"); auto amod = request->amod().value(); auto data_width = static_cast(request->data_width()); u16 maxTransfers = request->options().max_transfers(); bool fifo = request->options().fifo_read(); std::error_code ec; if (mesytec::mvlc::vme_amods::is_block_mode(amod)) { std::vector dest; ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo); for (auto d: dest) { response->add_value(d); } } else { u32 dest = 0; ec = mvlc_.vmeRead(request->address(), dest, amod, data_width); response->add_value(dest); } done->Run(); } private: mesytec::mvlc::MVLC mvlc_; }; void print_service_descriptors(google::protobuf::Service &service) { auto sd = service.GetDescriptor(); spdlog::info("service: full_name={}, index={}, method_count={}", sd->full_name(), sd->index(), sd->method_count()); for (int i = 0; i < sd->method_count(); ++i) { auto md = sd->method(i); auto it = md->input_type(); auto ot = md->output_type(); spdlog::info(" method: full_name={}, index={}, input_type={}, output_type={}", md->full_name(), md->index(), it->full_name(), ot->full_name()); } } struct RpcClosure: public google::protobuf::Closure { void Run() override {} }; struct RpcController: public google::protobuf::RpcController { void Reset() override { spdlog::debug("RpcController::Reset"); } bool Failed() const override { return false; } std::string ErrorText() const override { return ""; } void StartCancel() override {} void SetFailed(const std::string &reason) override { spdlog::error("RpcController::SetFailed: reason={}", reason); } bool IsCanceled() const override { return false; } void NotifyOnCancel(google::protobuf::Closure * /*callback*/) override {} }; struct RpcChannel: public google::protobuf::RpcChannel { void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, const google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done) override { spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}", method->full_name(), request->ShortDebugString(), response->GetTypeName()); } }; bool serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_msg *msg) { auto messageSize = message.ByteSizeLong(); if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize); res != 0) { spdlog::error("nng_msg_realloc: {}", nng_strerror(res)); return false; } *reinterpret_cast(nng_msg_body(msg)) = messageSize; return message.SerializeToArray(reinterpret_cast(nng_msg_body(msg)) + sizeof(u32), messageSize); } bool deserialize_proto_from_nng(google::protobuf::MessageLite &message, nng_msg *msg) { auto messageSize = mnode::nng::msg_trim_read(msg); if (!messageSize) return false; if (nng_msg_len(msg) < *messageSize) { spdlog::error("message too short"); return false; } bool ret = message.ParseFromArray(reinterpret_cast(nng_msg_body(msg)), *messageSize); nng_msg_trim(msg, *messageSize); return ret; } struct RPCServer { explicit RPCServer(nng_socket socket) : socket_(socket) { } void registerService(google::protobuf::Service *service) { auto sd = service->GetDescriptor(); services_[sd->full_name()] = service; for (int i = 0; i < sd->method_count(); ++i) { auto md = sd->method(i); methods_[md->full_name()] = md; } } // request format is u32: size + serialize MethodCall // response format is u32:size + serialized MethodCallResponse int send_response(const mnode::proto::mrpc::MethodCallResponse &response) { auto responseSize = response.ByteSizeLong(); auto msg = mnode::nng::allocate_reserve_message(sizeof(u32) + responseSize); *reinterpret_cast(nng_msg_body(msg.get())) = responseSize; response.SerializeToArray(reinterpret_cast(nng_msg_body(msg.get())) + sizeof(u32), responseSize); return mnode::nng::send_message(socket_, msg); } void call_method(google::protobuf::Service *service, const google::protobuf::MethodDescriptor *method, const google::protobuf::Any &anyRequest, mnode::proto::mrpc::MethodCallResponse &callResponse) { auto request = std::unique_ptr(service->GetRequestPrototype(method).New()); anyRequest.UnpackTo(request.get()); // FIXME: check bool return auto response = std::unique_ptr(service->GetResponsePrototype(method).New()); spdlog::debug("call_method: method={}, request={}, responsePrototype={}", method->full_name(), request->ShortDebugString(), response->GetTypeName()); service->CallMethod(method, &controller_, request.get(), response.get(), &done_); spdlog::debug("call_method: response={}", response->ShortDebugString()); callResponse.mutable_status()->set_code(google::rpc::Code::OK); callResponse.mutable_response()->PackFrom(*response); } void run() { mnode::proto::mrpc::MethodCall methodCall_; mnode::proto::mrpc::MethodCallResponse methodCallResponse_; while (true) { auto [request, nng_res] = mnode::nng::receive_message(socket_); if (nng_res != 0 && nng_res != NNG_ETIMEDOUT) { spdlog::error("nng_recvmsg: {}", nng_strerror(nng_res)); return; } else if (nng_res == NNG_ETIMEDOUT) { spdlog::trace("nng_recvmsg: NNG_ETIMEDOUT"); continue; } spdlog::info("received request: nng_msg_len={}", nng_msg_len(request.get())); methodCallResponse_.Clear(); auto methodCallSize = mnode::nng::msg_trim_read(request.get()); if (!methodCallSize) { spdlog::error("msg_trim_read failed"); methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); send_response(methodCallResponse_); continue; } if (nng_msg_len(request.get()) < *methodCallSize) { spdlog::error("message too short"); methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); send_response(methodCallResponse_); continue; } // TODO: check bool return if (!methodCall_.ParseFromArray(nng_msg_body(request.get()), *methodCallSize)) { methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL); send_response(methodCallResponse_); continue; } spdlog::debug("methodCall: service={}, method={}, request={}", methodCall_.service(), methodCall_.method(), methodCall_.request().ShortDebugString()); auto serviceIt = services_.find(methodCall_.service()); if (serviceIt == services_.end()) { spdlog::error("service not found: {}", methodCall_.service()); methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND); send_response(methodCallResponse_); continue; } auto methodIt = methods_.find(methodCall_.method()); if (methodIt == methods_.end()) { spdlog::error("method not found: {}", methodCall_.method()); continue; methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND); send_response(methodCallResponse_); } call_method(serviceIt->second, methodIt->second, methodCall_.request(), methodCallResponse_); } } nng_socket socket_; std::unordered_map services_; std::unordered_map methods_; RpcClosure done_; RpcController controller_; }; int main() { namespace nng = mnode::nng; spdlog::set_level(spdlog::level::debug); mnode::proto::PingService_Stub pingService(nullptr); print_service_descriptors(pingService); mnode::proto::vme::VmeAccess_Stub vmeService(nullptr); print_service_descriptors(vmeService); auto mvlc = mesytec::mvlc::make_mvlc("mvlc-0124"); MVLCService mvlcService(mvlc); print_service_descriptors(mvlcService); auto socket = nng::make_rep_socket(); if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0)) { nng::mnode_nng_error("nng_listen", res); return res; } RPCServer rpcServer(socket); rpcServer.registerService(&mvlcService); rpcServer.run(); return 0; }