diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..458dedc --- /dev/null +++ b/.clang-format @@ -0,0 +1,224 @@ +--- +Language: Cpp +# BasedOnStyle: LLVM +AccessModifierOffset: -2 +AlignAfterOpenBracket: Align +AlignArrayOfStructures: None +AlignConsecutiveAssignments: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: true +AlignConsecutiveBitFields: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: false +AlignConsecutiveDeclarations: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: false +AlignConsecutiveMacros: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: false +AlignEscapedNewlines: Right +AlignOperands: Align +AlignTrailingComments: + Kind: Always + OverEmptyLines: 0 +AllowAllArgumentsOnNextLine: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: Always +AllowShortCaseLabelsOnASingleLine: false +AllowShortEnumsOnASingleLine: true +AllowShortFunctionsOnASingleLine: All +AllowShortIfStatementsOnASingleLine: Never +AllowShortLambdasOnASingleLine: All +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +AlwaysBreakTemplateDeclarations: MultiLine +AttributeMacros: + - __capability +BinPackArguments: true +BinPackParameters: true +BitFieldColonSpacing: Both +BraceWrapping: + AfterCaseLabel: true + AfterClass: true + AfterControlStatement: Always + AfterEnum: true + AfterExternBlock: true + AfterFunction: true + AfterNamespace: true + AfterObjCDeclaration: true + AfterStruct: true + AfterUnion: true + BeforeCatch: true + BeforeElse: true + BeforeLambdaBody: true + BeforeWhile: true + IndentBraces: false + SplitEmptyFunction: true + SplitEmptyRecord: true + SplitEmptyNamespace: true +BreakAfterAttributes: Never +BreakAfterJavaFieldAnnotations: false +BreakArrays: true +BreakBeforeBinaryOperators: None +BreakBeforeConceptDeclarations: Always +BreakBeforeBraces: Custom +BreakBeforeInlineASMColon: OnlyMultiline +BreakBeforeTernaryOperators: true +BreakConstructorInitializers: BeforeColon +BreakInheritanceList: BeforeColon +BreakStringLiterals: true +ColumnLimit: 80 +CommentPragmas: '^ IWYU pragma:' +CompactNamespaces: false +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +EmptyLineAfterAccessModifier: Never +EmptyLineBeforeAccessModifier: LogicalBlock +ExperimentalAutoDetectBinPacking: false +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH +IfMacros: + - KJ_IF_MAYBE +IncludeBlocks: Preserve +IncludeCategories: + - Regex: '^"(llvm|llvm-c|clang|clang-c)/' + Priority: 2 + SortPriority: 0 + CaseSensitive: false + - Regex: '^(<|"(gtest|gmock|isl|json)/)' + Priority: 3 + SortPriority: 0 + CaseSensitive: false + - Regex: '.*' + Priority: 1 + SortPriority: 0 + CaseSensitive: false +IncludeIsMainRegex: '(Test)?$' +IncludeIsMainSourceRegex: '' +IndentAccessModifiers: false +IndentCaseBlocks: false +IndentCaseLabels: false +IndentExternBlock: AfterExternBlock +IndentGotoLabels: true +IndentPPDirectives: None +IndentRequiresClause: true +IndentWidth: 4 +IndentWrappedFunctionNames: false +InsertBraces: false +InsertNewlineAtEOF: false +InsertTrailingCommas: None +IntegerLiteralSeparator: + Binary: 0 + BinaryMinDigits: 0 + Decimal: 0 + DecimalMinDigits: 0 + Hex: 0 + HexMinDigits: 0 +JavaScriptQuotes: Leave +JavaScriptWrapImports: true +KeepEmptyLinesAtTheStartOfBlocks: true +LambdaBodyIndentation: Signature +LineEnding: DeriveLF +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBinPackProtocolList: Auto +ObjCBlockIndentWidth: 2 +ObjCBreakBeforeNestedBlockParam: true +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: true +PackConstructorInitializers: BinPack +PenaltyBreakAssignment: 2 +PenaltyBreakBeforeFirstCallParameter: 19 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakOpenParenthesis: 0 +PenaltyBreakString: 1000 +PenaltyBreakTemplateDeclaration: 10 +PenaltyExcessCharacter: 1000000 +PenaltyIndentedWhitespace: 0 +PenaltyReturnTypeOnItsOwnLine: 60 +PointerAlignment: Right +PPIndentWidth: -1 +QualifierAlignment: Leave +ReferenceAlignment: Pointer +ReflowComments: true +RemoveBracesLLVM: false +RemoveSemicolon: false +RequiresClausePosition: OwnLine +RequiresExpressionIndentation: OuterScope +SeparateDefinitionBlocks: Leave +ShortNamespaceLines: 1 +SortIncludes: Never +SortJavaStaticImport: Before +SortUsingDeclarations: LexicographicNumeric +SpaceAfterCStyleCast: false +SpaceAfterLogicalNot: false +SpaceAfterTemplateKeyword: true +SpaceAroundPointerQualifiers: Default +SpaceBeforeAssignmentOperators: true +SpaceBeforeCaseColon: false +SpaceBeforeCpp11BracedList: false +SpaceBeforeCtorInitializerColon: true +SpaceBeforeInheritanceColon: false +SpaceBeforeParens: ControlStatements +SpaceBeforeParensOptions: + AfterControlStatements: true + AfterForeachMacros: true + AfterFunctionDefinitionName: false + AfterFunctionDeclarationName: false + AfterIfMacros: true + AfterOverloadedOperator: false + AfterRequiresInClause: false + AfterRequiresInExpression: false + BeforeNonEmptyParentheses: false +SpaceBeforeRangeBasedForLoopColon: true +SpaceBeforeSquareBrackets: false +SpaceInEmptyBlock: false +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: Never +SpacesInConditionalStatement: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInLineCommentPrefix: + Minimum: 1 + Maximum: -1 +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: Latest +StatementAttributeLikeMacros: + - Q_EMIT +StatementMacros: + - Q_UNUSED + - QT_REQUIRE_VERSION +TabWidth: 8 +UseTab: Never +WhitespaceSensitiveMacros: + - BOOST_PP_STRINGIZE + - CF_SWIFT_NAME + - NS_SWIFT_NAME + - PP_STRINGIZE + - STRINGIZE +... diff --git a/external/mesytec-mvlc b/external/mesytec-mvlc index b77701f..31606b3 160000 --- a/external/mesytec-mvlc +++ b/external/mesytec-mvlc @@ -1 +1 @@ -Subproject commit b77701fb3904fd46da95de89336af702a900b541 +Subproject commit 31606b39eab4b3581633d6693329082328ff7c62 diff --git a/external/spdlog b/external/spdlog index 8e56133..24dde31 160000 --- a/external/spdlog +++ b/external/spdlog @@ -1 +1 @@ -Subproject commit 8e5613379f5140fefb0b60412fbf1f5406e7c7f8 +Subproject commit 24dde318fe034f0c7e809daa5a6e81dc85e1155a diff --git a/include/mesytec-node/mesytec_node_nng.h b/include/mesytec-node/mesytec_node_nng.h index 106676d..02e40f5 100644 --- a/include/mesytec-node/mesytec_node_nng.h +++ b/include/mesytec-node/mesytec_node_nng.h @@ -2,6 +2,14 @@ #define B18E3651_CA9A_43BC_AA25_810EA16533CD #include +#include +#include +#include +#include +#include +#include +#include + #include #include @@ -70,7 +78,7 @@ inline size_t allocated_free_space(nng_msg *msg) return capacity - used; } -static nng_duration DefaultTimeout = 16; +static nng_duration DefaultTimeout = 250; inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = DefaultTimeout) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bc7a47c..0429290 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,6 +34,10 @@ function(add_node_dev_executable name) target_compile_options(${name} PRIVATE ${MVLC_NNG_NODE_WARN_FLAGS}) endfunction() +function(add_node_proto_dev_executable name) + add_node_dev_executable(${name}) + target_link_libraries(${name} PRIVATE mnode-proto mesytec-mvlc) +endfunction() add_node_dev_executable(pair_producer) add_node_dev_executable(pair_consumer) @@ -43,9 +47,10 @@ add_node_dev_executable(mesy_nng_pipeline_main) add_node_dev_executable(mesy_nng_push_pull_main) add_node_dev_executable(mesy_nng_pub_producer) add_node_dev_executable(mesy_nng_sub_consumer) -add_node_dev_executable(mnode_proto_test1) -target_sources(mnode_proto_test1 PRIVATE thread_name.cc) -target_link_libraries(mnode_proto_test1 PRIVATE mnode-proto mesytec-mvlc) + +add_node_proto_dev_executable(mnode_proto_test1) +add_node_proto_dev_executable(mnode_proto_ping_client) +add_node_proto_dev_executable(mnode_proto_ping_server) #add_subdirectory(qt) diff --git a/src/mesytec_node_nng.cc b/src/mesytec_node_nng.cc index a20179a..8bedd41 100644 --- a/src/mesytec_node_nng.cc +++ b/src/mesytec_node_nng.cc @@ -1,13 +1,5 @@ #include "mesytec-node/mesytec_node_nng.h" -#include -#include -#include -#include -#include -#include -#include - namespace mesytec::nng { diff --git a/src/mnode_proto_ping_client.cc b/src/mnode_proto_ping_client.cc new file mode 100644 index 0000000..16c52f3 --- /dev/null +++ b/src/mnode_proto_ping_client.cc @@ -0,0 +1,151 @@ +#include + +#include +#include +#include "proto/service.pb.h" +#include + +using namespace mesytec; +using namespace std::literals; + +void client_cb(void *arg); + +class Work +{ + public: + explicit Work(nng_socket socket) + : socket(socket) + { + } + + ~Work() + { + nng_ctx_close(ctx); + nng_aio_free(aio); + } + + void work() + { + switch (state) + { + case INIT: + nng_aio_alloc(&aio, client_cb, this); + nng_ctx_open(&ctx, socket); + + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_send", rv); + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + else + { + state = RECEIVE; + nng_ctx_recv(ctx, aio); + } + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_recv", rv); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + else + { + auto reply = nng::make_unique_msg(nng_aio_get_msg(aio)); + handle_reply(std::move(reply)); + state = SEND; + nng_aio_set_msg(aio, make_request().release()); + nng_ctx_send(ctx, aio); + } + + break; + } + + } + + nng::unique_msg make_request() + { + mnode::Ping ping; + ping.set_peer_id(42); + ping.set_sequence_number(++sequence_number); + return nng::make_message(ping.SerializeAsString()); + } + + void handle_reply(nng::unique_msg &&msg) + { + mnode::Pong pong; + pong.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); + if (pong.peer_id() != 42 || pong.sequence_number() != sequence_number) + { + spdlog::error("received pong with unexpected values: {}", pong.ShortDebugString()); + } + } + + void report() + { + spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + } + + private: + enum State + { + INIT, // allocate aio, open ctx, send first request + SEND, // send request + RECEIVE, // receive response + }; + + mnode::Ping ping; + mnode::Pong pong; + size_t sequence_number = 0; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; +}; + +void client_cb(void *arg) +{ + auto work = static_cast(arg); + work->work(); +} + +int main() +{ + spdlog::set_level(spdlog::level::info); + + auto socket = nng::make_req_socket(); + + if (int res = nng_dial(socket, "tcp://localhost:5555", nullptr, 0)) + { + nng::mesy_nng_error("nng_dial", res); + return res; + } + + mvlc::util::Stopwatch sw; + Work work(socket); + work.work(); + + for (;;) + { + nng_msleep(100); + if (sw.get_interval() >= 1s) + { + work.report(); + sw.interval(); + } + } + + return 0; +} diff --git a/src/mnode_proto_ping_server.cc b/src/mnode_proto_ping_server.cc new file mode 100644 index 0000000..fd75ff6 --- /dev/null +++ b/src/mnode_proto_ping_server.cc @@ -0,0 +1,139 @@ +#include + +#include +#include +#include "proto/service.pb.h" +#include + +using namespace mesytec; +using namespace std::literals; + +void server_cb(void *arg); + +class Work +{ + public: + explicit Work(nng_socket socket) + : socket(socket) + { + recvErrors_.fill(0); + sendErrors_.fill(0); + } + + ~Work() + { + nng_ctx_close(ctx); + nng_aio_free(aio); + } + + void work() + { + switch (state) + { + case INIT: + nng_aio_alloc(&aio, server_cb, this); + nng_ctx_open(&ctx, socket); + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + + case RECEIVE: + if (auto rv = nng_aio_result(aio)) + { + if (rv != NNG_ETIMEDOUT) + nng::mesy_nng_error("nng_ctx_recv", rv); + if (rv < static_cast(recvErrors_.size())) + ++recvErrors_[rv]; + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } + else + { + auto reply = handle_request(nng::make_unique_msg(nng_aio_get_msg(aio))); + nng_aio_set_msg(aio, reply.release()); + state = SEND; + nng_ctx_send(ctx, aio); + } + break; + + case SEND: + if (auto rv = nng_aio_result(aio)) + { + nng::mesy_nng_error("nng_ctx_send", rv); + if (rv < static_cast(sendErrors_.size())) + ++sendErrors_[rv]; + nng::make_unique_msg(nng_aio_get_msg(aio)).reset(); + } + state = RECEIVE; + nng_ctx_recv(ctx, aio); + break; + } + } + + nng::unique_msg handle_request(nng::unique_msg &&msg) + { + ping_.ParseFromArray(nng_msg_body(msg.get()), nng_msg_len(msg.get())); + pong_.set_peer_id(ping_.peer_id()); + pong_.set_sequence_number(ping_.sequence_number()); + sequence_number = ping_.sequence_number(); + return nng::make_message(ping_.SerializeAsString()); + } + + void report() + { + spdlog::info("Work: state={}, sequence_number={}", static_cast(state), sequence_number); + } + + private: + enum State + { + INIT, // allocate aio, open ctx, receive first request + RECEIVE, // receive request + SEND, // send response + }; + + mnode::Ping ping_; + mnode::Pong pong_; + std::uint32_t sequence_number = 0; + State state = INIT; + nng_socket socket; + nng_aio *aio = nullptr; + nng_ctx ctx; + std::array recvErrors_; + std::array sendErrors_; + +}; + +void server_cb(void *arg) +{ + auto work = static_cast(arg); + work->work(); +} + +int main() +{ + spdlog::set_level(spdlog::level::debug); + + auto socket = nng::make_rep_socket(); + + if (int res = nng_listen(socket, "tcp://localhost:5555", nullptr, 0)) + { + nng::mesy_nng_error("nng_listen", res); + return res; + } + + mvlc::util::Stopwatch sw; + Work work(socket); + work.work(); + + for (;;) + { + nng_msleep(100); + if (sw.get_interval() >= 1s) + { + work.report(); + sw.interval(); + } + } +} diff --git a/src/mnode_proto_test1.cc b/src/mnode_proto_test1.cc index cab5b50..c829352 100644 --- a/src/mnode_proto_test1.cc +++ b/src/mnode_proto_test1.cc @@ -83,14 +83,6 @@ void responder(nng_socket socket, unsigned id) } } -void client_aio(nng_socket socket, unsigned id) -{ -} - -void server_aio(nng_socket socket, unsigned id) -{ -} - int main() { spdlog::set_level(spdlog::level::info); @@ -98,8 +90,6 @@ int main() unsigned peerId = 0; std::vector threads; - #if 1 - { auto reqSocket = nng::make_req_socket(); auto repSocket = nng::make_rep_socket(); @@ -108,21 +98,6 @@ int main() threads.emplace_back(std::thread(requester, reqSocket, peerId++)); threads.emplace_back(std::thread(responder, repSocket, peerId++)); - } - #endif - - #if 1 - { - auto reqSocket = nng::make_req_socket(); - auto repSocket = nng::make_rep_socket(); - - if (int rc = nng::marry_listen_dial(reqSocket, repSocket, fmt::format("inproc://test{}", peerId).c_str())) - return rc; - - threads.emplace_back(std::thread(client_aio, reqSocket, peerId++)); - threads.emplace_back(std::thread(server_aio, repSocket, peerId++)); - } - #endif for (auto &t: threads) if (t.joinable()) t.join();