possible low level mvlc proto interface

This commit is contained in:
Florian Lüke 2024-12-17 23:27:48 +01:00
parent 2c75142cb8
commit 728bad6841
5 changed files with 79 additions and 60 deletions

@ -1 +1 @@
Subproject commit 3347a186c7c3eb0a12b6baa76b90e71d573fcd39 Subproject commit 51cf6051d87a52984fe7a8feaf6f3516be5358cd

View file

@ -12,18 +12,18 @@ struct IWork
virtual void work() = 0; virtual void work() = 0;
}; };
struct IAsyncReqWork: public IWork struct IReqWork: public IWork
{ {
virtual nng::unique_msg make_request() = 0; virtual nng::unique_msg make_request() = 0;
virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0; virtual void handle_reply(nng::unique_msg &&request, nng::unique_msg &&reply) = 0;
}; };
struct IAsyncRepWork: public IWork struct IRepWork: public IWork
{ {
virtual nng::unique_msg handle_request(nng::unique_msg &&request) = 0; virtual nng::unique_msg handle_request(nng::unique_msg &&request) = 0;
}; };
class AsyncReqWork: public IAsyncReqWork class AsyncReqWork: public IReqWork
{ {
public: public:
explicit AsyncReqWork(nng_socket socket); explicit AsyncReqWork(nng_socket socket);
@ -39,7 +39,7 @@ class AsyncReqWork: public IAsyncReqWork
nng::unique_msg request_; nng::unique_msg request_;
}; };
class AsyncRepWork: public IAsyncRepWork class AsyncRepWork: public IRepWork
{ {
public: public:
explicit AsyncRepWork(nng_socket socket); explicit AsyncRepWork(nng_socket socket);

View file

@ -29,8 +29,72 @@ message ReadRegisterResponse
optional google.rpc.Status status = 2; optional google.rpc.Status status = 2;
} }
message Buffer
{
repeated uint32 data = 1;
}
message LowLevelRequest
{
Buffer contents = 1;
}
message LowLevelResponse
{
Buffer contents = 1;
}
message CommandRequest
{
Buffer contents = 1;
};
message CommandResponse
{
Buffer contents = 1;
};
enum Pipe
{
Command = 0;
Data = 1;
}
message WriteRequest
{
Pipe pipe = 1;
bytes data = 2;
}
message WriteResponse
{
google.rpc.Status status = 1;
uint32 bytes_written = 2;
}
message ReadRequest
{
Pipe pipe = 1;
uint32 bytes_to_read = 2;
}
message ReadResponse
{
google.rpc.Status status = 1;
bytes data = 2;
}
service MvlcCoreService
{
rpc Write(WriteRequest) returns (WriteResponse);
rpc Read(ReadRequest) returns (ReadResponse);
}
service MVLCService service MVLCService
{ {
rpc LowLevelTransaction(LowLevelRequest) returns (LowLevelResponse);
rpc CommandTransaction(CommandRequest) returns (CommandResponse);
rpc ReadRegister(ReadRegisterRequest) returns (ReadRegisterResponse); rpc ReadRegister(ReadRegisterRequest) returns (ReadRegisterResponse);
rpc WriteRegister(WriteRegisterRequest) returns (WriteRegisterResponse); rpc WriteRegister(WriteRegisterRequest) returns (WriteRegisterResponse);
rpc VMERead(mesytec.mnode.proto.vme.ReadRequest) returns (mesytec.mnode.proto.vme.ReadResponse); rpc VMERead(mesytec.mnode.proto.vme.ReadRequest) returns (mesytec.mnode.proto.vme.ReadResponse);

View file

@ -60,7 +60,7 @@ enum ResponseFlags
message ReadResponse message ReadResponse
{ {
repeated uint32 value = 1; repeated uint32 values = 1;
optional ResponseFlags flags = 2; optional ResponseFlags flags = 2;
optional google.rpc.Status status = 3; optional google.rpc.Status status = 3;
} }

View file

@ -27,13 +27,13 @@ class MVLCService: public mnode::proto::mvlc::MVLCService
mnode::proto::mvlc::ReadRegisterResponse *response, mnode::proto::mvlc::ReadRegisterResponse *response,
::google::protobuf::Closure *done) override ::google::protobuf::Closure *done) override
{ {
(void)controller;
spdlog::info("MVLCService::ReadRegister"); spdlog::info("MVLCService::ReadRegister");
u32 value; u32 value;
if (auto ec = mvlc_.readRegister(request->address(), value)) if (auto ec = mvlc_.readRegister(request->address(), value))
{ {
response->mutable_status()->set_code(ec.value()); response->mutable_status()->set_code(ec.value());
response->mutable_status()->set_message(ec.message()); response->mutable_status()->set_message(ec.message());
controller->SetFailed(ec.message());
} }
else else
{ {
@ -62,14 +62,14 @@ class MVLCService: public mnode::proto::mvlc::MVLCService
ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo); ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo);
for (auto d: dest) for (auto d: dest)
{ {
response->add_value(d); response->add_values(d);
} }
} }
else else
{ {
u32 dest = 0; u32 dest = 0;
ec = mvlc_.vmeRead(request->address(), dest, amod, data_width); ec = mvlc_.vmeRead(request->address(), dest, amod, data_width);
response->add_value(dest); response->add_values(dest);
} }
done->Run(); done->Run();
@ -102,64 +102,19 @@ struct RpcClosure: public google::protobuf::Closure
struct RpcController: public google::protobuf::RpcController struct RpcController: public google::protobuf::RpcController
{ {
void Reset() override { spdlog::debug("RpcController::Reset"); } // Client-side methods ---------------------------------------------
void Reset() override {}
bool Failed() const override { return false; } bool Failed() const override { return false; }
std::string ErrorText() const override { return ""; } std::string ErrorText() const override { return {}; }
void StartCancel() override {} void StartCancel() override {}
void SetFailed(const std::string &reason) override
{ // Server-side methods ---------------------------------------------
spdlog::error("RpcController::SetFailed: reason={}", reason); void SetFailed(const std::string &) override { assert(!"SetFailed not implemented"); }
}
bool IsCanceled() const override { return false; } bool IsCanceled() const override { return false; }
void NotifyOnCancel(google::protobuf::Closure * /*callback*/) override {} void NotifyOnCancel(google::protobuf::Closure * /*callback*/) override {}
}; };
struct RpcChannel: public google::protobuf::RpcChannel
{
void CallMethod(const google::protobuf::MethodDescriptor *method,
google::protobuf::RpcController *controller,
const google::protobuf::Message *request, google::protobuf::Message *response,
google::protobuf::Closure *done) override
{
spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}",
method->full_name(), request->ShortDebugString(), response->GetTypeName());
}
};
bool serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_msg *msg)
{
auto messageSize = message.ByteSizeLong();
if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize); res != 0)
{
spdlog::error("nng_msg_realloc: {}", nng_strerror(res));
return false;
}
*reinterpret_cast<u32 *>(nng_msg_body(msg)) = messageSize;
return message.SerializeToArray(reinterpret_cast<u8 *>(nng_msg_body(msg)) + sizeof(u32),
messageSize);
}
bool deserialize_proto_from_nng(google::protobuf::MessageLite &message, nng_msg *msg)
{
auto messageSize = mnode::nng::msg_trim_read<u32>(msg);
if (!messageSize)
return false;
if (nng_msg_len(msg) < *messageSize)
{
spdlog::error("message too short");
return false;
}
bool ret = message.ParseFromArray(reinterpret_cast<u8 *>(nng_msg_body(msg)), *messageSize);
nng_msg_trim(msg, *messageSize);
return ret;
}
struct RPCServer struct RPCServer
{ {
explicit RPCServer(nng_socket socket) explicit RPCServer(nng_socket socket)