add prototype nng reqrep based blocking protobuf rpc server and client
The code requires cc_generic_services = true in the service proto files. nng_msg payload is: request format : size:u32 + serialized MethodCall response format: size:u32 + serialized MethodCallResponse Dispatch is done using descriptors, message prototypes and protobuf.Any. Error handling is missing apart from error logging.
This commit is contained in:
parent
7ab1af93e3
commit
bf03a19820
13 changed files with 529 additions and 28 deletions
21
include/mesytec-mnode/mnode_cpp_types.h
Normal file
21
include/mesytec-mnode/mnode_cpp_types.h
Normal file
|
@ -0,0 +1,21 @@
|
|||
#ifndef DB95D276_B1C8_4643_AEF5_5224D93DDD22
|
||||
#define DB95D276_B1C8_4643_AEF5_5224D93DDD22
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace mesytec::mnode::cpp_types
|
||||
{
|
||||
|
||||
using u8 = std::uint8_t;
|
||||
using u16 = std::uint16_t;
|
||||
using u32 = std::uint32_t;
|
||||
using u64 = std::uint64_t;
|
||||
|
||||
using s8 = std::int8_t;
|
||||
using s16 = std::int16_t;
|
||||
using s32 = std::int32_t;
|
||||
using s64 = std::int64_t;
|
||||
|
||||
} // namespace mesytec::mnode::cpp_types
|
||||
|
||||
#endif /* DB95D276_B1C8_4643_AEF5_5224D93DDD22 */
|
|
@ -242,6 +242,31 @@ inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0
|
|||
return { make_unique_msg(msg), 0 };
|
||||
}
|
||||
|
||||
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
|
||||
{
|
||||
if (auto res = nng_sendmsg(sock, msg.get(), flags))
|
||||
{
|
||||
return res;
|
||||
}
|
||||
|
||||
msg.release();
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline unique_msg allocate_message(size_t size)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
||||
if (auto res = nng_msg_alloc(&msg, size))
|
||||
{
|
||||
mnode_nng_error("allocate_message", res);
|
||||
return make_unique_msg();
|
||||
}
|
||||
|
||||
return make_unique_msg(msg);
|
||||
}
|
||||
|
||||
// Returned message has size 0 and capacity 'reserve'.
|
||||
inline unique_msg allocate_reserve_message(size_t reserve = 0)
|
||||
{
|
||||
nng_msg *msg = nullptr;
|
||||
|
|
|
@ -1,10 +1,22 @@
|
|||
find_package(Protobuf REQUIRED)
|
||||
add_library(mnode-proto service.proto vme.proto mvlc.proto google/rpc/status.proto google/rpc/error_details.proto google/rpc/code.proto)
|
||||
|
||||
set(MNODE_PROTOS
|
||||
google/rpc/status.proto
|
||||
google/rpc/error_details.proto
|
||||
google/rpc/code.proto
|
||||
mrpc.proto
|
||||
mvlc.proto
|
||||
service.proto
|
||||
vme.proto
|
||||
)
|
||||
|
||||
add_library(mnode-proto ${MNODE_PROTOS})
|
||||
target_link_libraries(mnode-proto PUBLIC protobuf::libprotobuf)
|
||||
|
||||
target_include_directories(mnode-proto
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/>
|
||||
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}/../>)
|
||||
|
||||
protobuf_generate(TARGET mnode-proto)
|
||||
protobuf_generate_python(PROTO_PY service.proto vme.proto)
|
||||
protobuf_generate_python(PROTO_PY ${MNODE_PROTOS})
|
||||
add_custom_target(mnode-proto-py ALL DEPENDS ${PROTO_PY})
|
||||
|
|
19
proto/mrpc.proto
Normal file
19
proto/mrpc.proto
Normal file
|
@ -0,0 +1,19 @@
|
|||
syntax = "proto3";
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/rpc/status.proto";
|
||||
|
||||
package mesytec.mnode.proto.mrpc;
|
||||
|
||||
message MethodCall
|
||||
{
|
||||
string service = 1;
|
||||
string method = 2;
|
||||
google.protobuf.Any request = 3;
|
||||
}
|
||||
|
||||
message MethodCallResponse
|
||||
{
|
||||
google.rpc.Status status = 1;
|
||||
google.protobuf.Any response = 2;
|
||||
}
|
|
@ -5,7 +5,7 @@ import "vme.proto";
|
|||
|
||||
option cc_generic_services = true;
|
||||
|
||||
package mesytec.mnode.mvlc;
|
||||
package mesytec.mnode.proto.mvlc;
|
||||
|
||||
message ReadRegisterRequest
|
||||
{
|
||||
|
@ -29,10 +29,10 @@ message ReadRegisterResponse
|
|||
optional google.rpc.Status status = 2;
|
||||
}
|
||||
|
||||
service MVLC
|
||||
service MVLCService
|
||||
{
|
||||
rpc ReadRegister(ReadRegisterRequest) returns (ReadRegisterResponse);
|
||||
rpc WriteRegister(WriteRegisterRequest) returns (WriteRegisterResponse);
|
||||
rpc VMERead(mesytec.mnode.vme.ReadRequest) returns (mesytec.mnode.vme.ReadResponse);
|
||||
rpc VMEWrite(mesytec.mnode.vme.WriteRequest) returns (mesytec.mnode.vme.WriteResponse);
|
||||
rpc VMERead(mesytec.mnode.proto.vme.ReadRequest) returns (mesytec.mnode.proto.vme.ReadResponse);
|
||||
rpc VMEWrite(mesytec.mnode.proto.vme.WriteRequest) returns (mesytec.mnode.proto.vme.WriteResponse);
|
||||
}
|
||||
|
|
|
@ -4,10 +4,10 @@ import "google/protobuf/timestamp.proto";
|
|||
|
||||
option cc_generic_services = true;
|
||||
|
||||
package mesytec.mnode;
|
||||
package mesytec.mnode.proto;
|
||||
|
||||
service PingService {
|
||||
rpc Ping(mesytec.mnode.Ping) returns (Pong);
|
||||
rpc Ping(mesytec.mnode.proto.Ping) returns (Pong);
|
||||
}
|
||||
|
||||
message Ping {
|
||||
|
|
|
@ -4,7 +4,7 @@ import "google/rpc/status.proto";
|
|||
|
||||
option cc_generic_services = true;
|
||||
|
||||
package mesytec.mnode.vme;
|
||||
package mesytec.mnode.proto.vme;
|
||||
|
||||
enum DataWidth
|
||||
{
|
||||
|
@ -38,7 +38,7 @@ message ReadRequest
|
|||
{
|
||||
uint32 address = 1;
|
||||
AddressModifier amod = 2;
|
||||
DataWidth width = 3;
|
||||
DataWidth data_width = 3;
|
||||
ReadRequestOptions options = 4;
|
||||
}
|
||||
|
||||
|
@ -47,7 +47,7 @@ message WriteRequest
|
|||
uint32 address = 1;
|
||||
uint32 value = 2;
|
||||
AddressModifier amod = 3;
|
||||
DataWidth width = 4;
|
||||
DataWidth data_width = 4;
|
||||
}
|
||||
|
||||
enum ResponseFlags
|
||||
|
@ -60,7 +60,7 @@ enum ResponseFlags
|
|||
|
||||
message ReadResponse
|
||||
{
|
||||
repeated uint32 values = 1;
|
||||
repeated uint32 value = 1;
|
||||
optional ResponseFlags flags = 2;
|
||||
optional google.rpc.Status status = 3;
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ add_mnode_proto_dev_executable(mnode_proto_test1)
|
|||
add_mnode_proto_dev_executable(mnode_proto_ping_client)
|
||||
add_mnode_proto_dev_executable(mnode_proto_ping_server)
|
||||
add_mnode_proto_dev_executable(mnode_proto_rpc_ping_server)
|
||||
add_mnode_proto_dev_executable(mnode_proto_rpc_ping_client)
|
||||
|
||||
#add_subdirectory(qt)
|
||||
|
||||
|
|
|
@ -47,8 +47,8 @@ class PingClient: public nng::AsyncReqWork
|
|||
}
|
||||
|
||||
private:
|
||||
Ping ping_;
|
||||
Pong pong_;
|
||||
proto::Ping ping_;
|
||||
proto::Pong pong_;
|
||||
size_t sequence_number = 1;
|
||||
static std::atomic<size_t> next_peer_id;
|
||||
size_t peer_id = next_peer_id++;
|
||||
|
|
|
@ -33,8 +33,8 @@ class PingServer: public nng::AsyncRepWork
|
|||
}
|
||||
|
||||
private:
|
||||
Ping ping_;
|
||||
Pong pong_;
|
||||
proto::Ping ping_;
|
||||
proto::Pong pong_;
|
||||
static std::atomic<size_t> next_peer_id;
|
||||
size_t peer_id = next_peer_id++;
|
||||
size_t num_transactions = 0;
|
||||
|
|
172
src/mnode_proto_rpc_ping_client.cc
Normal file
172
src/mnode_proto_rpc_ping_client.cc
Normal file
|
@ -0,0 +1,172 @@
|
|||
#include <memory>
|
||||
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <mesytec-mvlc/util/signal_handling.h>
|
||||
#include <mesytec-mnode/mnode_nng_async.h>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include "internal/argh.h"
|
||||
#include "proto/google/rpc/code.pb.h"
|
||||
#include "proto/mrpc.pb.h"
|
||||
#include "proto/mvlc.pb.h"
|
||||
#include "proto/service.pb.h"
|
||||
|
||||
using namespace mesytec;
|
||||
using namespace std::literals;
|
||||
using namespace mesytec::mnode::cpp_types;
|
||||
|
||||
class MVLCService: public mnode::proto::mvlc::MVLCService
|
||||
{
|
||||
public:
|
||||
explicit MVLCService(mesytec::mvlc::MVLC mvlc)
|
||||
: mvlc_(mvlc)
|
||||
{ }
|
||||
|
||||
void ReadRegister(::google::protobuf::RpcController *controller,
|
||||
const mnode::proto::mvlc::ReadRegisterRequest *request,
|
||||
mnode::proto::mvlc::ReadRegisterResponse *response,
|
||||
::google::protobuf::Closure *done) override
|
||||
{
|
||||
spdlog::info("MVLCService::ReadRegister");
|
||||
u32 value;
|
||||
if (auto ec = mvlc_.readRegister(request->address(), value))
|
||||
{
|
||||
response->mutable_status()->set_code(ec.value());
|
||||
response->mutable_status()->set_message(ec.message());
|
||||
controller->SetFailed(ec.message());
|
||||
}
|
||||
else
|
||||
{
|
||||
response->set_value(value);
|
||||
}
|
||||
|
||||
done->Run();
|
||||
}
|
||||
|
||||
private:
|
||||
mesytec::mvlc::MVLC mvlc_;
|
||||
};
|
||||
|
||||
void print_service_descriptors(google::protobuf::Service &service)
|
||||
{
|
||||
auto sd = service.GetDescriptor();
|
||||
spdlog::info("service: full_name={}, index={}, method_count={}",
|
||||
sd->full_name(), sd->index(), sd->method_count());
|
||||
|
||||
for (int i = 0; i < sd->method_count(); ++i)
|
||||
{
|
||||
auto md = sd->method(i);
|
||||
auto it = md->input_type();
|
||||
auto ot = md->output_type();
|
||||
spdlog::info(" method: full_name={}, index={}, input_type={}, output_type={}",
|
||||
md->full_name(), md->index(), it->full_name(), ot->full_name());
|
||||
}
|
||||
}
|
||||
|
||||
struct RPCClosure: public google::protobuf::Closure
|
||||
{
|
||||
void Run() override { }
|
||||
};
|
||||
|
||||
struct RPCController: public google::protobuf::RpcController
|
||||
{
|
||||
void Reset() override { }
|
||||
bool Failed() const override { return false; }
|
||||
std::string ErrorText() const override { return ""; }
|
||||
void StartCancel() override { }
|
||||
void SetFailed(const std::string &reason) override { }
|
||||
bool IsCanceled() const override { return false; }
|
||||
void NotifyOnCancel(google::protobuf::Closure *callback) override { }
|
||||
};
|
||||
|
||||
struct RpcChannel: public google::protobuf::RpcChannel
|
||||
{
|
||||
explicit RpcChannel(nng_socket socket)
|
||||
: socket_(socket)
|
||||
{
|
||||
}
|
||||
|
||||
int send_request(const mnode::proto::mrpc::MethodCall &methodCall)
|
||||
{
|
||||
auto methodCallSize = methodCall.ByteSizeLong();
|
||||
auto msg = mnode::nng::allocate_message(sizeof(u32) + methodCallSize);
|
||||
*reinterpret_cast<u32 *>(nng_msg_body(msg.get())) = methodCallSize;
|
||||
if (!methodCall.SerializeToArray(reinterpret_cast<u8 *>(nng_msg_body(msg.get())) + sizeof(u32), methodCallSize))
|
||||
{
|
||||
spdlog::error("methodCall.SerializeToArray failed");
|
||||
}
|
||||
spdlog::info("sending request: nng_msg_len={}, methodCallSize={}", nng_msg_len(msg.get()), methodCallSize);
|
||||
return mnode::nng::send_message(socket_, msg);
|
||||
}
|
||||
|
||||
void CallMethod(const google::protobuf::MethodDescriptor *method,
|
||||
google::protobuf::RpcController *controller,
|
||||
const google::protobuf::Message *request,
|
||||
google::protobuf::Message *response,
|
||||
google::protobuf::Closure *done) override
|
||||
{
|
||||
spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}",
|
||||
method->full_name(), request->ShortDebugString(), response->ShortDebugString());
|
||||
|
||||
methodCall_.set_service(method->service()->full_name());
|
||||
methodCall_.set_method(method->full_name());
|
||||
methodCall_.mutable_request()->PackFrom(*request);
|
||||
|
||||
if (auto nng_res = send_request(methodCall_))
|
||||
{
|
||||
spdlog::error("nng_sendmsg: {}", nng_strerror(nng_res));
|
||||
return;
|
||||
}
|
||||
|
||||
auto [nng_response, nng_res] = mnode::nng::receive_message(socket_);
|
||||
|
||||
if (nng_res)
|
||||
{
|
||||
spdlog::error("nng_recvmsg: {}", nng_strerror(nng_res));
|
||||
return;
|
||||
}
|
||||
else if (nng_res == NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::trace("nng_recvmsg: NNG_ETIMEDOUT");
|
||||
return;
|
||||
}
|
||||
|
||||
auto callResponseSize = mnode::nng::msg_trim_read<u32>(nng_response.get());
|
||||
}
|
||||
|
||||
nng_socket socket_;
|
||||
mnode::proto::mrpc::MethodCall methodCall_;
|
||||
mnode::proto::mrpc::MethodCallResponse callResponse_;
|
||||
|
||||
};
|
||||
|
||||
int main()
|
||||
{
|
||||
namespace nng = mnode::nng;
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
auto socket = nng::make_req_socket();
|
||||
|
||||
if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, NNG_FLAG_NONBLOCK))
|
||||
{
|
||||
nng::mnode_nng_error("nng_dial", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
RPCController rpcController;
|
||||
RPCClosure done;
|
||||
RpcChannel rpcChannel(socket);
|
||||
mnode::proto::mvlc::MVLCService::Stub mvlcService(&rpcChannel);
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
mnode::proto::vme::ReadRequest request;
|
||||
request.set_address(0xffff600e);
|
||||
request.mutable_amod()->set_value(mesytec::mvlc::vme_amods::A32);
|
||||
request.set_data_width(mnode::proto::vme::DataWidth::D32);
|
||||
mnode::proto::vme::ReadResponse response;
|
||||
spdlog::info("request: {}", request.ShortDebugString());
|
||||
mvlcService.VMERead(&rpcController, &request, &response, &done);
|
||||
spdlog::info("response: {}", response.ShortDebugString());
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -2,19 +2,88 @@
|
|||
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <mesytec-mvlc/util/signal_handling.h>
|
||||
#include "proto/service.pb.h"
|
||||
#include "proto/mvlc.pb.h"
|
||||
#include <mesytec-mnode/mnode_nng_async.h>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include "internal/argh.h"
|
||||
#include "proto/google/rpc/code.pb.h"
|
||||
#include "proto/mrpc.pb.h"
|
||||
#include "proto/mvlc.pb.h"
|
||||
#include "proto/service.pb.h"
|
||||
|
||||
using namespace mesytec;
|
||||
using namespace std::literals;
|
||||
using namespace mesytec::mnode::cpp_types;
|
||||
|
||||
class MVLCService: public mnode::proto::mvlc::MVLCService
|
||||
{
|
||||
public:
|
||||
explicit MVLCService(mesytec::mvlc::MVLC mvlc)
|
||||
: mvlc_(mvlc)
|
||||
{
|
||||
}
|
||||
|
||||
void ReadRegister(::google::protobuf::RpcController *controller,
|
||||
const mnode::proto::mvlc::ReadRegisterRequest *request,
|
||||
mnode::proto::mvlc::ReadRegisterResponse *response,
|
||||
::google::protobuf::Closure *done) override
|
||||
{
|
||||
spdlog::info("MVLCService::ReadRegister");
|
||||
u32 value;
|
||||
if (auto ec = mvlc_.readRegister(request->address(), value))
|
||||
{
|
||||
response->mutable_status()->set_code(ec.value());
|
||||
response->mutable_status()->set_message(ec.message());
|
||||
controller->SetFailed(ec.message());
|
||||
}
|
||||
else
|
||||
{
|
||||
response->set_value(value);
|
||||
}
|
||||
|
||||
done->Run();
|
||||
}
|
||||
|
||||
void VMERead(::google::protobuf::RpcController *controller,
|
||||
const mnode::proto::vme::ReadRequest *request,
|
||||
mnode::proto::vme::ReadResponse *response,
|
||||
::google::protobuf::Closure *done) override
|
||||
{
|
||||
spdlog::info("MVLCService::VMERead");
|
||||
|
||||
auto amod = request->amod().value();
|
||||
auto data_width = static_cast<mesytec::mvlc::VMEDataWidth>(request->data_width());
|
||||
u16 maxTransfers = request->options().max_transfers();
|
||||
bool fifo = request->options().fifo_read();
|
||||
std::error_code ec;
|
||||
|
||||
if (mesytec::mvlc::vme_amods::is_block_mode(amod))
|
||||
{
|
||||
std::vector<u32> dest;
|
||||
ec = mvlc_.vmeBlockRead(request->address(), amod, maxTransfers, dest, fifo);
|
||||
for (auto d : dest)
|
||||
{
|
||||
response->add_value(d);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
u32 dest = 0;
|
||||
ec = mvlc_.vmeRead(request->address(), dest, amod, data_width);
|
||||
response->add_value(dest);
|
||||
}
|
||||
|
||||
done->Run();
|
||||
}
|
||||
|
||||
private:
|
||||
mesytec::mvlc::MVLC mvlc_;
|
||||
};
|
||||
|
||||
void print_service_descriptors(google::protobuf::Service &service)
|
||||
{
|
||||
auto sd = service.GetDescriptor();
|
||||
spdlog::info("service: full_name={}, index={}, method_count={}",
|
||||
sd->full_name(), sd->index(), sd->method_count());
|
||||
spdlog::info("service: full_name={}, index={}, method_count={}", sd->full_name(), sd->index(),
|
||||
sd->method_count());
|
||||
|
||||
for (int i = 0; i < sd->method_count(); ++i)
|
||||
{
|
||||
|
@ -22,22 +91,201 @@ void print_service_descriptors(google::protobuf::Service &service)
|
|||
auto it = md->input_type();
|
||||
auto ot = md->output_type();
|
||||
spdlog::info(" method: full_name={}, index={}, input_type={}, output_type={}",
|
||||
md->full_name(), md->index(), it->full_name(), ot->full_name());
|
||||
md->full_name(), md->index(), it->full_name(), ot->full_name());
|
||||
}
|
||||
}
|
||||
|
||||
struct RPCClosure: public google::protobuf::Closure
|
||||
{
|
||||
void Run() override {}
|
||||
};
|
||||
|
||||
struct RPCController: public google::protobuf::RpcController
|
||||
{
|
||||
void Reset() override
|
||||
{
|
||||
spdlog::debug("RPCController::Reset");
|
||||
}
|
||||
bool Failed() const override { return false; }
|
||||
std::string ErrorText() const override { return ""; }
|
||||
void StartCancel() override {}
|
||||
void SetFailed(const std::string &reason) override
|
||||
{
|
||||
spdlog::error("RPCController::SetFailed: reason={}", reason);
|
||||
}
|
||||
|
||||
bool IsCanceled() const override { return false; }
|
||||
void NotifyOnCancel(google::protobuf::Closure */*callback*/) override {}
|
||||
};
|
||||
|
||||
struct RpcChannel: public google::protobuf::RpcChannel
|
||||
{
|
||||
void CallMethod(const google::protobuf::MethodDescriptor *method,
|
||||
google::protobuf::RpcController *controller,
|
||||
const google::protobuf::Message *request,
|
||||
google::protobuf::Message *response,
|
||||
google::protobuf::Closure *done) override
|
||||
{
|
||||
spdlog::info("RpcChannel::CallMethod: method={}, request={}, response={}",
|
||||
method->full_name(), request->ShortDebugString(), response->GetTypeName());
|
||||
}
|
||||
};
|
||||
|
||||
struct RPCServer
|
||||
{
|
||||
explicit RPCServer(nng_socket socket)
|
||||
: socket_(socket)
|
||||
{
|
||||
}
|
||||
|
||||
void registerService(google::protobuf::Service *service)
|
||||
{
|
||||
auto sd = service->GetDescriptor();
|
||||
services_[sd->full_name()] = service;
|
||||
|
||||
for (int i = 0; i < sd->method_count(); ++i)
|
||||
{
|
||||
auto md = sd->method(i);
|
||||
methods_[md->full_name()] = md;
|
||||
}
|
||||
}
|
||||
|
||||
// request format is u32: size + serialize MethodCall
|
||||
// response format is u32:size + serialized MethodCallResponse
|
||||
|
||||
int send_response(const mnode::proto::mrpc::MethodCallResponse &response)
|
||||
{
|
||||
auto responseSize = response.ByteSizeLong();
|
||||
auto msg = mnode::nng::allocate_reserve_message(sizeof(u32) + responseSize);
|
||||
*reinterpret_cast<u32 *>(nng_msg_body(msg.get())) = responseSize;
|
||||
response.SerializeToArray(reinterpret_cast<u8 *>(nng_msg_body(msg.get())) + sizeof(u32),
|
||||
responseSize);
|
||||
return mnode::nng::send_message(socket_, msg);
|
||||
}
|
||||
|
||||
void call_method(google::protobuf::Service *service,
|
||||
const google::protobuf::MethodDescriptor *method,
|
||||
const google::protobuf::Any &anyRequest,
|
||||
mnode::proto::mrpc::MethodCallResponse &callResponse)
|
||||
{
|
||||
auto request =
|
||||
std::unique_ptr<google::protobuf::Message>(service->GetRequestPrototype(method).New());
|
||||
|
||||
anyRequest.UnpackTo(request.get()); // FIXME: check bool return
|
||||
|
||||
auto response =
|
||||
std::unique_ptr<google::protobuf::Message>(service->GetResponsePrototype(method).New());
|
||||
|
||||
spdlog::debug("call_method: method={}, request={}, responsePrototype={}",
|
||||
method->full_name(), request->ShortDebugString(), response->GetTypeName());
|
||||
|
||||
service->CallMethod(method, &controller_, request.get(), response.get(), &done_);
|
||||
|
||||
spdlog::debug("call_method: response={}", response->ShortDebugString());
|
||||
|
||||
callResponse.mutable_status()->set_code(google::rpc::Code::OK);
|
||||
callResponse.mutable_response()->PackFrom(*response);
|
||||
}
|
||||
|
||||
void run()
|
||||
{
|
||||
mnode::proto::mrpc::MethodCall methodCall_;
|
||||
mnode::proto::mrpc::MethodCallResponse methodCallResponse_;
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto [request, nng_res] = mnode::nng::receive_message(socket_);
|
||||
|
||||
if (nng_res != 0 && nng_res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::error("nng_recvmsg: {}", nng_strerror(nng_res));
|
||||
return;
|
||||
}
|
||||
else if (nng_res == NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::trace("nng_recvmsg: NNG_ETIMEDOUT");
|
||||
continue;
|
||||
}
|
||||
|
||||
spdlog::info("received request: nng_msg_len={}", nng_msg_len(request.get()));
|
||||
|
||||
methodCallResponse_.Clear();
|
||||
|
||||
auto methodCallSize = mnode::nng::msg_trim_read<u32>(request.get());
|
||||
|
||||
if (!methodCallSize)
|
||||
{
|
||||
spdlog::error("msg_trim_read<u32> failed");
|
||||
methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL);
|
||||
send_response(methodCallResponse_);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nng_msg_len(request.get()) < *methodCallSize)
|
||||
{
|
||||
spdlog::error("message too short");
|
||||
methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL);
|
||||
send_response(methodCallResponse_);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: check bool return
|
||||
if (!methodCall_.ParseFromArray(nng_msg_body(request.get()), *methodCallSize))
|
||||
{
|
||||
methodCallResponse_.mutable_status()->set_code(google::rpc::Code::INTERNAL);
|
||||
send_response(methodCallResponse_);
|
||||
continue;
|
||||
}
|
||||
|
||||
spdlog::debug("methodCall: service={}, method={}, request={}", methodCall_.service(),
|
||||
methodCall_.method(), methodCall_.request().ShortDebugString());
|
||||
|
||||
auto serviceIt = services_.find(methodCall_.service());
|
||||
|
||||
if (serviceIt == services_.end())
|
||||
{
|
||||
spdlog::error("service not found: {}", methodCall_.service());
|
||||
methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND);
|
||||
send_response(methodCallResponse_);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto methodIt = methods_.find(methodCall_.method());
|
||||
|
||||
if (methodIt == methods_.end())
|
||||
{
|
||||
spdlog::error("method not found: {}", methodCall_.method());
|
||||
continue;
|
||||
methodCallResponse_.mutable_status()->set_code(google::rpc::Code::NOT_FOUND);
|
||||
send_response(methodCallResponse_);
|
||||
}
|
||||
|
||||
call_method(serviceIt->second, methodIt->second, methodCall_.request(),
|
||||
methodCallResponse_);
|
||||
}
|
||||
}
|
||||
|
||||
nng_socket socket_;
|
||||
std::unordered_map<std::string, google::protobuf::Service *> services_;
|
||||
std::unordered_map<std::string, const google::protobuf::MethodDescriptor *> methods_;
|
||||
RPCClosure done_;
|
||||
RPCController controller_;
|
||||
};
|
||||
|
||||
int main()
|
||||
{
|
||||
namespace nng = mnode::nng;
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
|
||||
mnode::PingService_Stub pingService(nullptr);
|
||||
mnode::proto::PingService_Stub pingService(nullptr);
|
||||
print_service_descriptors(pingService);
|
||||
|
||||
mnode::vme::VmeAccess_Stub vmeService(nullptr);
|
||||
mnode::proto::vme::VmeAccess_Stub vmeService(nullptr);
|
||||
print_service_descriptors(vmeService);
|
||||
|
||||
mnode::mvlc::MVLC_Stub mvlcService(nullptr);
|
||||
auto mvlc = mesytec::mvlc::make_mvlc("mvlc-0124");
|
||||
|
||||
MVLCService mvlcService(mvlc);
|
||||
print_service_descriptors(mvlcService);
|
||||
|
||||
auto socket = nng::make_rep_socket();
|
||||
|
@ -48,6 +296,9 @@ int main()
|
|||
return res;
|
||||
}
|
||||
|
||||
RPCServer rpcServer(socket);
|
||||
rpcServer.registerService(&mvlcService);
|
||||
rpcServer.run();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -10,8 +10,8 @@ void requester(nng_socket socket, unsigned id)
|
|||
{
|
||||
mvlc::util::Stopwatch sw;
|
||||
std::uint32_t seq = 0;
|
||||
mnode::Ping ping;
|
||||
mnode::Pong pong;
|
||||
mnode::proto::Ping ping;
|
||||
mnode::proto::Pong pong;
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
@ -48,8 +48,8 @@ void requester(nng_socket socket, unsigned id)
|
|||
void responder(nng_socket socket, unsigned id)
|
||||
{
|
||||
mvlc::util::Stopwatch sw;
|
||||
mnode::Ping ping;
|
||||
mnode::Pong pong;
|
||||
mnode::proto::Ping ping;
|
||||
mnode::proto::Pong pong;
|
||||
|
||||
while (true)
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue