Compare commits
10 commits
edf0d291a9
...
ebd823ae4e
Author | SHA1 | Date | |
---|---|---|---|
|
ebd823ae4e | ||
|
ead9c0ee2e | ||
|
9e5b79cb34 | ||
|
e38ada8854 | ||
|
76d85e5c39 | ||
|
0eaa5aa942 | ||
|
9bdd79e910 | ||
|
76b22c1434 | ||
|
13c73cfdfc | ||
|
6f7102549e |
16 changed files with 716 additions and 128 deletions
|
@ -170,7 +170,7 @@ RequiresClausePosition: OwnLine
|
|||
RequiresExpressionIndentation: OuterScope
|
||||
SeparateDefinitionBlocks: Leave
|
||||
ShortNamespaceLines: 1
|
||||
SortIncludes: Never
|
||||
SortIncludes: CaseSensitive
|
||||
SortJavaStaticImport: Before
|
||||
SortUsingDeclarations: LexicographicNumeric
|
||||
SpaceAfterCStyleCast: false
|
||||
|
|
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
|
@ -11,7 +11,7 @@
|
|||
"program": "${workspaceFolder}/build/mana_auto_replay",
|
||||
"args": [
|
||||
"~/Documents/is690-9Li_3H_run094_241020_124254_part001.zip",
|
||||
"--plugin=src/libmana-plugin-c-test.so"
|
||||
"--plugin=src/libmana-plugin-root-histogram.so"
|
||||
],
|
||||
"stopAtEntry": false,
|
||||
"cwd": "${workspaceFolder}/build",
|
||||
|
|
|
@ -20,7 +20,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
|||
option(MNODE_BUILD_TESTS "Build mnode test binaries" ${MESYTEC_MNODE_MAIN_PROJECT})
|
||||
|
||||
if (MNODE_BUILD_TESTS)
|
||||
find_package(GTest CONFIG)
|
||||
find_package(GTest CONFIG QUIET)
|
||||
if (NOT GTest_FOUND)
|
||||
include(FetchContent)
|
||||
FetchContent_Declare(googletest URL https://github.com/google/googletest/archive/refs/tags/v1.15.2.zip)
|
||||
|
@ -34,6 +34,14 @@ if (MNODE_BUILD_TESTS)
|
|||
enable_testing()
|
||||
endif()
|
||||
|
||||
include(FetchContent)
|
||||
FetchContent_Declare(pybind11 GIT_REPOSITORY https://github.com/pybind/pybind11 GIT_TAG v2.13.6)
|
||||
FetchContent_GetProperties(pybind11)
|
||||
if(NOT pybind11_POPULATED)
|
||||
FetchContent_Populate(pybind11)
|
||||
add_subdirectory(${pybind11_SOURCE_DIR} ${pybind11_BINARY_DIR})
|
||||
endif()
|
||||
|
||||
include(CMakeRC)
|
||||
cmrc_add_resource_library(mnode-resources
|
||||
ALIAS mnode::resources NAMESPACE mnode::resources
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#ifndef C536E080_25D7_476D_B8E3_25912B9CFC3B
|
||||
#define C536E080_25D7_476D_B8E3_25912B9CFC3B
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
|
@ -21,6 +22,6 @@ inline double make_quiet_nan()
|
|||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace mesytec::mnode
|
||||
|
||||
#endif /* C536E080_25D7_476D_B8E3_25912B9CFC3B */
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
#include <nng/protocol/pipeline0/push.h>
|
||||
#include <nng/protocol/pubsub0/pub.h>
|
||||
#include <nng/protocol/pubsub0/sub.h>
|
||||
#include <nng/protocol/reqrep0/req.h>
|
||||
#include <nng/protocol/reqrep0/rep.h>
|
||||
#include <nng/protocol/reqrep0/req.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
|
@ -93,7 +93,7 @@ inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = Default
|
|||
return 0;
|
||||
}
|
||||
|
||||
using socket_factory = std::function<int (nng_socket *)>;
|
||||
using socket_factory = std::function<int(nng_socket *)>;
|
||||
|
||||
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
|
||||
{
|
||||
|
@ -159,20 +159,25 @@ inline std::string get_local_address(nng_socket s)
|
|||
void log_pipe_info(nng_pipe p, const char *info_text);
|
||||
|
||||
// 'nng_res' is the result of the last nng function call.
|
||||
using retry_predicate = std::function<bool (int nng_res)>;
|
||||
using retry_predicate = std::function<bool(int nng_res)>;
|
||||
|
||||
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "");
|
||||
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp,
|
||||
const char *debugInfo = "");
|
||||
|
||||
struct RetryNTimes
|
||||
{
|
||||
explicit RetryNTimes(size_t maxTries = 3) : maxTries(maxTries) {}
|
||||
explicit RetryNTimes(size_t maxTries = 3)
|
||||
: maxTries(maxTries)
|
||||
{
|
||||
}
|
||||
bool operator()(int) { return attempt++ < maxTries; }
|
||||
|
||||
size_t maxTries;
|
||||
size_t attempt = 0u;
|
||||
};
|
||||
|
||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "")
|
||||
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3,
|
||||
const char *debugInfo = "")
|
||||
{
|
||||
return send_message_retry(socket, msg, RetryNTimes(maxTries), debugInfo);
|
||||
}
|
||||
|
@ -185,16 +190,17 @@ inline int send_empty_message(nng_socket socket, size_t maxTries = 3)
|
|||
}
|
||||
|
||||
// Read type T from the front of msg and trim the message by sizeof(T).
|
||||
template<typename T>
|
||||
std::optional<T> msg_trim_read(nng_msg *msg)
|
||||
template <typename T> std::optional<T> msg_trim_read(nng_msg *msg)
|
||||
{
|
||||
const auto oldlen = nng_msg_len(msg); (void) oldlen;
|
||||
const auto oldlen = nng_msg_len(msg);
|
||||
(void)oldlen;
|
||||
if (nng_msg_len(msg) < sizeof(T))
|
||||
return std::nullopt;
|
||||
|
||||
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
|
||||
nng_msg_trim(msg, sizeof(T));
|
||||
const auto newlen = nng_msg_len(msg); (void) newlen;
|
||||
const auto newlen = nng_msg_len(msg);
|
||||
(void)newlen;
|
||||
assert(newlen + sizeof(T) == oldlen);
|
||||
return result;
|
||||
}
|
||||
|
@ -205,8 +211,8 @@ const char *nng_stat_type_to_string(int nng_stat_type);
|
|||
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
|
||||
// sockets!
|
||||
|
||||
template<typename Visitor>
|
||||
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
|
||||
template <typename Visitor>
|
||||
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth = 0)
|
||||
{
|
||||
visitor(stat, depth);
|
||||
|
||||
|
@ -216,7 +222,7 @@ void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
|
|||
{
|
||||
for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child))
|
||||
{
|
||||
visit_nng_stats(child, visitor, depth+1);
|
||||
visit_nng_stats(child, visitor, depth + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -225,10 +231,7 @@ std::string format_stat(nng_stat *stat);
|
|||
|
||||
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
|
||||
|
||||
inline unique_msg make_unique_msg(nng_msg *msg = nullptr)
|
||||
{
|
||||
return unique_msg(msg, &nng_msg_free);
|
||||
}
|
||||
inline unique_msg make_unique_msg(nng_msg *msg = nullptr) { return unique_msg(msg, &nng_msg_free); }
|
||||
|
||||
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
|
||||
{
|
||||
|
@ -236,10 +239,10 @@ inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0
|
|||
|
||||
if (auto res = nng_recvmsg(sock, &msg, flags))
|
||||
{
|
||||
return { make_unique_msg(), res };
|
||||
return {make_unique_msg(), res};
|
||||
}
|
||||
|
||||
return { make_unique_msg(msg), 0 };
|
||||
return {make_unique_msg(msg), 0};
|
||||
}
|
||||
|
||||
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
|
||||
|
@ -329,11 +332,8 @@ inline unique_msg clone_message(const nng_msg *msg)
|
|||
return make_unique_msg(newMsg);
|
||||
}
|
||||
|
||||
inline unique_msg clone_message(const unique_msg &msg)
|
||||
{
|
||||
return clone_message(msg.get());
|
||||
}
|
||||
inline unique_msg clone_message(const unique_msg &msg) { return clone_message(msg.get()); }
|
||||
|
||||
}
|
||||
} // namespace mesytec::mnode::nng
|
||||
|
||||
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */
|
||||
|
|
|
@ -17,7 +17,7 @@ if (MNODE_BUILD_TESTS)
|
|||
function (add_mnode_gtest name)
|
||||
add_executable(test_${name} ${name}.test.cc)
|
||||
target_link_libraries(test_${name} PRIVATE mesytec-mnode GTest::gtest_main spdlog)
|
||||
add_test(NAME name COMMAND $<TARGET_FILE:test_${name}>)
|
||||
add_test(NAME test_${name} COMMAND $<TARGET_FILE:test_${name}>)
|
||||
endfunction()
|
||||
|
||||
add_mnode_gtest(mana)
|
||||
|
@ -36,5 +36,11 @@ if (ROOT_FOUND)
|
|||
message("-- Using ROOT installation from ${ROOT_USE_FILE}")
|
||||
include(${ROOT_USE_FILE})
|
||||
add_library(mana-plugin-root-histogram SHARED mana_plugin_root_histogram.cc)
|
||||
target_link_libraries(mana-plugin-root-histogram PRIVATE mana ${ROOT_LIBRARIES})
|
||||
target_link_libraries(mana-plugin-root-histogram PRIVATE mana mesytec-mnode rxi-logc ${ROOT_LIBRARIES})
|
||||
endif()
|
||||
|
||||
if (pybind11_FOUND)
|
||||
add_library(mana-plugin-python SHARED mana_plugin_python.cc)
|
||||
target_link_libraries(mana-plugin-python PRIVATE mana mesytec-mnode rxi-logc pybind11::embed)
|
||||
file(CREATE_LINK ${CMAKE_CURRENT_SOURCE_DIR}/mana_python_test_plugin.py ${CMAKE_BINARY_DIR}/mana_python_test_plugin.py COPY_ON_ERROR SYMBOLIC)
|
||||
endif()
|
||||
|
|
|
@ -274,14 +274,14 @@ inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u
|
|||
struct ManaCountingSink: public ManaPlugin
|
||||
{
|
||||
std::vector<size_t> eventCounts;
|
||||
std::vector<std::vector<size_t>> eventArrayHits;
|
||||
std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits;
|
||||
size_t totalBytes = 0;
|
||||
size_t systemEventCount = 0;
|
||||
size_t systemEventBytes = 0;
|
||||
void reset()
|
||||
{
|
||||
eventCounts.clear();
|
||||
eventArrayHits.clear();
|
||||
eventArrayIndexHits.clear();
|
||||
totalBytes = 0;
|
||||
}
|
||||
|
||||
|
@ -295,31 +295,58 @@ struct ManaCountingSink: public ManaPlugin
|
|||
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||
reset();
|
||||
eventCounts.resize(jRun["events"].size());
|
||||
eventArrayHits.resize(eventCounts.size());
|
||||
// TODO: resize nested vectors
|
||||
eventArrayIndexHits.resize(eventCounts.size());
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
|
||||
{
|
||||
(void)context;
|
||||
(void)descriptor_json;
|
||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||
|
||||
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
|
||||
"systemEventBytes={}",
|
||||
fmt::join(eventCounts, ", "), totalBytes, systemEventCount, systemEventBytes);
|
||||
|
||||
for (size_t ei = 0; ei < eventArrayIndexHits.size(); ++ei)
|
||||
{
|
||||
spdlog::info("event[{}]: {} hits", ei, eventCounts[ei]);
|
||||
for (size_t ai = 0; ai < eventArrayIndexHits[ei].size(); ++ai)
|
||||
{
|
||||
auto name = jRun["events"][ei]["outputs"][ai]["name"];
|
||||
auto arrayHits = eventArrayIndexHits[ei][ai];
|
||||
auto sumHits = std::accumulate(std::begin(arrayHits), std::end(arrayHits),
|
||||
static_cast<size_t>(0u));
|
||||
spdlog::info(" {}[{}]: [{}], sum={}", name, arrayHits.size(),
|
||||
fmt::join(arrayHits, ", "), sumHits);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
|
||||
{
|
||||
(void)context;
|
||||
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
|
||||
eventArrayHits.resize(eventCounts.size());
|
||||
eventArrayHits[eventIndex].resize(std::max(eventArrayHits[eventIndex].size(), arrayCount));
|
||||
eventArrayIndexHits.resize(eventCounts.size());
|
||||
eventArrayIndexHits[eventIndex].resize(
|
||||
std::max(eventArrayIndexHits[eventIndex].size(), arrayCount));
|
||||
|
||||
++eventCounts[eventIndex];
|
||||
|
||||
for (size_t ai = 0; ai < arrayCount; ++ai)
|
||||
{
|
||||
auto input = get_span<float>(arrays[ai]);
|
||||
bool hit = std::any_of(std::begin(input), std::end(input),
|
||||
[](float v) { return !std::isnan(v); });
|
||||
if (hit)
|
||||
++eventArrayHits[eventIndex][ai];
|
||||
eventArrayIndexHits[eventIndex][ai].resize(input.size());
|
||||
|
||||
for (size_t i = 0; i < input.size(); ++i)
|
||||
{
|
||||
if (!std::isnan(input[i]))
|
||||
{
|
||||
++eventArrayIndexHits[eventIndex][ai][i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this->totalBytes += totalBytes;
|
||||
}
|
||||
|
||||
|
|
|
@ -4,9 +4,11 @@
|
|||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <list>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include <mesytec-mnode/mnode_math.h>
|
||||
#include <mesytec-mvlc/util/fmt.h>
|
||||
#include <numeric>
|
||||
#include <vector>
|
||||
|
||||
namespace mesytec::mnode::mana
|
||||
{
|
||||
|
@ -15,7 +17,7 @@ class Arena
|
|||
{
|
||||
public:
|
||||
static constexpr size_t initial_segment_size = 1u << 20;
|
||||
static constexpr size_t default_pad = 8;
|
||||
static constexpr size_t default_align_to = 8;
|
||||
static constexpr size_t default_max_segments = 1;
|
||||
|
||||
Arena() = default;
|
||||
|
@ -25,11 +27,11 @@ class Arena
|
|||
Arena &operator=(Arena &&other) = default;
|
||||
|
||||
// pushes at least the required amount of memory onto the arena.
|
||||
// 'required' is rounded up to the nearest multiple of 'default_pad' to make
|
||||
// 'required' is rounded up to the nearest multiple of 'default_align_to' to make
|
||||
// subsequent allocations aligned.
|
||||
u8 *push_size(size_t required)
|
||||
{
|
||||
size_t padded = round_up(required, default_pad);
|
||||
size_t padded = round_up(required, default_align_to);
|
||||
size_t pad_waste = padded - required;
|
||||
required = padded;
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <mesytec-mvlc/util/data_filter.h>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include "mana_api.h"
|
||||
#include "mana_arena.h"
|
||||
|
||||
namespace mesytec::mnode::mana
|
||||
{
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
#include "internal/mana_arena.h"
|
||||
#include "internal/mana_lib.hpp"
|
||||
#include <gtest/gtest.h>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include "internal/mana_arena.h"
|
||||
#include <spdlog/spdlog.h>
|
||||
#include "internal/mana_lib.hpp"
|
||||
|
||||
using namespace mesytec::mnode;
|
||||
|
||||
|
@ -18,35 +18,35 @@ TEST(mnode_mana_arena, push_reset_push)
|
|||
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto mem = arena.push_size(mana::Arena::default_pad + 2);
|
||||
auto mem = arena.push_size(mana::Arena::default_align_to + 2);
|
||||
ASSERT_NE(mem, nullptr);
|
||||
}
|
||||
|
||||
ASSERT_EQ(arena.allocations(), 10);
|
||||
ASSERT_GE(arena.pad_waste(), 0);
|
||||
ASSERT_GE(arena.segment_count(), 1);
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
|
||||
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_pad + 2));
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
|
||||
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_align_to + 2));
|
||||
|
||||
arena.reset();
|
||||
|
||||
ASSERT_EQ(arena.allocations(), 0);
|
||||
ASSERT_EQ(arena.pad_waste(), 0);
|
||||
ASSERT_GE(arena.segment_count(), 1);
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
|
||||
ASSERT_EQ(arena.used(), 0);
|
||||
|
||||
for (size_t i = 0; i < 10; ++i)
|
||||
{
|
||||
auto mem = arena.push_size(mana::Arena::default_pad + 2);
|
||||
auto mem = arena.push_size(mana::Arena::default_align_to + 2);
|
||||
ASSERT_NE(mem, nullptr);
|
||||
}
|
||||
|
||||
ASSERT_EQ(arena.allocations(), 10);
|
||||
ASSERT_GE(arena.pad_waste(), 0);
|
||||
ASSERT_GE(arena.segment_count(), 1);
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
|
||||
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_pad + 2));
|
||||
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
|
||||
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_align_to + 2));
|
||||
}
|
||||
|
||||
TEST(mnode_mana_offset_ptr, basic)
|
||||
|
@ -93,8 +93,8 @@ TEST(mnode_mana_offset_ptr, strings)
|
|||
auto s0 = arena.push_cstr("hello");
|
||||
auto s1 = arena.push_cstr("world");
|
||||
|
||||
spdlog::info("&t->ptr0={}, &t->ptr1={}", fmt::ptr(&t->ptr0), fmt::ptr(&t->ptr1));
|
||||
spdlog::info("s0={} @ {}, s1={} @ {}", s0, fmt::ptr(s0), s1, fmt::ptr(s1));
|
||||
// spdlog::info("&t->ptr0={}, &t->ptr1={}", fmt::ptr(&t->ptr0), fmt::ptr(&t->ptr1));
|
||||
// spdlog::info("s0={} @ {}, s1={} @ {}", s0, fmt::ptr(s0), s1, fmt::ptr(s1));
|
||||
|
||||
set(t->ptr0, s0);
|
||||
set(t->ptr1, s1);
|
||||
|
|
|
@ -9,6 +9,7 @@ struct Context
|
|||
|
||||
MANA_DEFINE_PLUGIN_INIT(init)
|
||||
{
|
||||
log_set_level(LOG_DEBUG);
|
||||
struct Context *ctx = calloc(1, sizeof(*ctx));
|
||||
log_debug("init: ctx=%p", ctx);
|
||||
return ctx;
|
||||
|
|
108
src/mana_plugin_python.cc
Normal file
108
src/mana_plugin_python.cc
Normal file
|
@ -0,0 +1,108 @@
|
|||
#include "internal/mana_lib.hpp"
|
||||
#include <mesytec-mvlc/cpp_compat.h>
|
||||
extern "C"
|
||||
{
|
||||
#include "internal/rxi/log.h"
|
||||
}
|
||||
#include <pybind11/embed.h>
|
||||
#include <pybind11/stl.h>
|
||||
|
||||
using namespace mesytec;
|
||||
using namespace mesytec::mnode;
|
||||
namespace py = pybind11;
|
||||
|
||||
PYBIND11_EMBEDDED_MODULE(py_mana, m) {}
|
||||
|
||||
namespace
|
||||
{
|
||||
struct Context
|
||||
{
|
||||
py::scoped_interpreter interp;
|
||||
py::module usercode;
|
||||
py::object userobject;
|
||||
py::object py_begin_run;
|
||||
py::object py_event_data;
|
||||
py::object py_end_run;
|
||||
std::vector<std::vector<py::object>> eventBuffers;
|
||||
};
|
||||
} // namespace
|
||||
|
||||
MANA_DEFINE_PLUGIN_INIT(init)
|
||||
{
|
||||
log_set_level(LOG_DEBUG);
|
||||
static Context g_ctx;
|
||||
auto ctx = &g_ctx;
|
||||
log_debug("init: ctx=%p", ctx);
|
||||
// TODO: pass args to init()
|
||||
// TODO: catch exceptions
|
||||
ctx->usercode = py::module::import("mana_python_test_plugin");
|
||||
return ctx;
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
||||
{
|
||||
(void)context;
|
||||
log_debug("shutdown");
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||
{
|
||||
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
|
||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
ctx->usercode.reload();
|
||||
// TODO: check if the retrieved attributes are callable
|
||||
ctx->py_begin_run = ctx->usercode.attr("begin_run");
|
||||
ctx->py_event_data = ctx->usercode.attr("event_data");
|
||||
ctx->py_end_run = ctx->usercode.attr("end_run");
|
||||
ctx->py_begin_run(descriptor_json);
|
||||
ctx->eventBuffers.clear();
|
||||
ctx->eventBuffers.resize(jRun["events"].size());
|
||||
|
||||
for (size_t ei = 0; ei < jRun["events"].size(); ++ei)
|
||||
{
|
||||
auto &jEvent = jRun["events"][ei];
|
||||
ctx->eventBuffers[ei].resize(jEvent["outputs"].size());
|
||||
}
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
||||
{
|
||||
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
ctx->py_end_run(descriptor_json);
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
||||
{
|
||||
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
|
||||
arrayCount, totalBytes);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
auto &buffers = ctx->eventBuffers.at(eventIndex);
|
||||
assert(buffers.size() == arrayCount);
|
||||
for (size_t ai = 0; ai < arrayCount; ++ai)
|
||||
{
|
||||
auto arraySpan = mana::get_span<float>(arrays[ai]);
|
||||
ctx->eventBuffers[eventIndex][ai] =
|
||||
py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float));
|
||||
}
|
||||
ctx->py_event_data(eventIndex, ctx->eventBuffers);
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
||||
{
|
||||
log_trace("system_event: ctx=%p, size=%zu", context, size);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
}
|
||||
|
||||
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
|
||||
{
|
||||
mana_plugin_t plugin;
|
||||
plugin.init = init;
|
||||
plugin.shutdown = shutdown;
|
||||
plugin.begin_run = begin_run;
|
||||
plugin.end_run = end_run;
|
||||
plugin.process_event = process_event;
|
||||
plugin.process_system_event = process_system_event;
|
||||
return plugin;
|
||||
}
|
216
src/mana_plugin_root_histogram.cc
Normal file
216
src/mana_plugin_root_histogram.cc
Normal file
|
@ -0,0 +1,216 @@
|
|||
#include <TFile.h>
|
||||
#include <TH1.h>
|
||||
#include <regex>
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <vector>
|
||||
|
||||
#include "internal/mana_lib.hpp"
|
||||
|
||||
extern "C"
|
||||
{
|
||||
#include "internal/rxi/log.h"
|
||||
}
|
||||
|
||||
using namespace mesytec::mnode;
|
||||
|
||||
struct Context
|
||||
{
|
||||
std::unique_ptr<TFile> outputFile;
|
||||
std::vector<std::vector<TH1 *>> hitCounts;
|
||||
std::vector<std::vector<TH1 *>> rawHistograms;
|
||||
};
|
||||
|
||||
MANA_DEFINE_PLUGIN_INIT(init)
|
||||
{
|
||||
log_set_level(LOG_DEBUG);
|
||||
static Context g_ctx;
|
||||
log_debug("init: ctx=%p", &g_ctx);
|
||||
return &g_ctx;
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
||||
{
|
||||
(void)context;
|
||||
log_debug("shutdown");
|
||||
}
|
||||
|
||||
struct ObjectPath
|
||||
{
|
||||
std::string path;
|
||||
std::string objectName;
|
||||
std::vector<std::string> components;
|
||||
|
||||
ObjectPath(const std::string &path, const std::string &sep = "\\.")
|
||||
: path(path)
|
||||
{
|
||||
std::regex re(sep);
|
||||
std::sregex_token_iterator it(path.begin(), path.end(), re, -1);
|
||||
std::copy(it, std::sregex_token_iterator(), std::back_inserter(components));
|
||||
|
||||
if (!components.empty())
|
||||
{
|
||||
objectName = components.back();
|
||||
components.pop_back();
|
||||
}
|
||||
|
||||
spdlog::debug("ObjectPath: objectName={}, components={}", objectName,
|
||||
fmt::join(components, ", "));
|
||||
}
|
||||
};
|
||||
|
||||
template <typename It> TDirectory *root_mkpath_cd(It begin, It end)
|
||||
{
|
||||
TDirectory *result = nullptr;
|
||||
for (auto it = begin; it != end; ++it)
|
||||
{
|
||||
result = gDirectory->mkdir(it->c_str(), "", true);
|
||||
result->cd();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::vector<std::vector<TH1 *>> make_hitcount_histos(TDirectory *baseDir,
|
||||
const nlohmann::json &jRun)
|
||||
{
|
||||
std::vector<std::vector<TH1 *>> result;
|
||||
|
||||
for (const auto &jEvent: jRun["events"])
|
||||
{
|
||||
std::vector<TH1 *> eventHistos;
|
||||
for (const auto &jOutput: jEvent["outputs"])
|
||||
{
|
||||
auto size = jOutput["size"].get<size_t>();
|
||||
ObjectPath path(jOutput["name"].get<std::string>());
|
||||
path.components.emplace_back("hit_counts");
|
||||
baseDir->cd();
|
||||
root_mkpath_cd(path.components.begin(), path.components.end());
|
||||
auto histoName = fmt::format("{}_hits", path.path);
|
||||
auto histoTitle = fmt::format("Hit Counts {}", path.path);
|
||||
auto th1 = new TH1F(histoName.c_str(), histoTitle.c_str(), size, 0.0, size);
|
||||
th1->GetXaxis()->SetTitle("Index");
|
||||
th1->GetYaxis()->SetTitle("Hits");
|
||||
eventHistos.emplace_back(std::move(th1));
|
||||
}
|
||||
result.emplace_back(std::move(eventHistos));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::vector<std::vector<TH1 *>> make_raw_histos(TDirectory *baseDir,
|
||||
const nlohmann::json &jRun)
|
||||
{
|
||||
std::vector<std::vector<TH1 *>> result;
|
||||
|
||||
for (const auto &jEvent: jRun["events"])
|
||||
{
|
||||
std::vector<TH1 *> eventHistos;
|
||||
for (const auto &jOutput: jEvent["outputs"])
|
||||
{
|
||||
auto histoCount = jOutput["size"].get<size_t>();
|
||||
auto bits = jOutput["bits"].get<unsigned>();
|
||||
auto bins = 1u << std::min(bits, 16u);
|
||||
auto maxValue = std::pow(2.0, bits);
|
||||
ObjectPath path(jOutput["name"].get<std::string>());
|
||||
spdlog::info("path={}, bins={}, maxValue={}", path.path, bins, maxValue);
|
||||
baseDir->cd();
|
||||
root_mkpath_cd(path.components.begin(), path.components.end());
|
||||
|
||||
for (size_t i = 0; i < histoCount; ++i)
|
||||
{
|
||||
auto histoName = fmt::format("{}_{:04d}", path.objectName, i);
|
||||
auto histoTitle = fmt::format("{}[{}]", path.path, i);
|
||||
auto th1 = new TH1F(histoName.c_str(), histoTitle.c_str(), bins, 0.0, maxValue);
|
||||
eventHistos.emplace_back(std::move(th1));
|
||||
}
|
||||
}
|
||||
result.emplace_back(std::move(eventHistos));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
inline std::string histo_info(const std::vector<std::vector<TH1 *>> &histos)
|
||||
{
|
||||
size_t histoCount = 0;
|
||||
size_t histoMem = 0;
|
||||
|
||||
for (size_t ei = 0; ei < histos.size(); ++ei)
|
||||
{
|
||||
for (size_t mi = 0; mi < histos[ei].size(); ++mi)
|
||||
{
|
||||
auto histo = histos[ei][mi];
|
||||
histoCount++;
|
||||
histoMem += histo->GetNbinsX() * sizeof(float);
|
||||
}
|
||||
}
|
||||
|
||||
return fmt::format("histoCount={}, histoMem={} MiB", histoCount, histoMem / (1024.0 * 1024));
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||
{
|
||||
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
|
||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
ctx->hitCounts.clear();
|
||||
ctx->outputFile = std::make_unique<TFile>("output.root", "RECREATE");
|
||||
ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun);
|
||||
ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun);
|
||||
spdlog::info("hitCount histograms: {}", histo_info(ctx->hitCounts));
|
||||
spdlog::info("raw histograms: {}", histo_info(ctx->rawHistograms));
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
||||
{
|
||||
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
ctx->outputFile->Write();
|
||||
*ctx = {};
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
||||
{
|
||||
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
|
||||
arrayCount, totalBytes);
|
||||
auto ctx = reinterpret_cast<Context *>(context);
|
||||
|
||||
auto &hitCountHistograms = ctx->hitCounts.at(eventIndex);
|
||||
auto &rawHistograms = ctx->rawHistograms.at(eventIndex);
|
||||
size_t rawHistoIndex = 0;
|
||||
|
||||
for (size_t ai = 0; ai < arrayCount; ++ai)
|
||||
{
|
||||
auto &hitCountHisto = hitCountHistograms.at(ai);
|
||||
auto input = mana::get_span<float>(arrays[ai]);
|
||||
|
||||
for (size_t i = 0; i < input.size(); ++i)
|
||||
{
|
||||
if (!std::isnan(input[i]))
|
||||
{
|
||||
hitCountHisto->Fill(i);
|
||||
rawHistograms.at(rawHistoIndex + i)->Fill(input[i]);
|
||||
}
|
||||
}
|
||||
|
||||
rawHistoIndex += input.size();
|
||||
}
|
||||
}
|
||||
|
||||
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
||||
{
|
||||
log_trace("system_event: ctx=%p, size=%zu", context, size);
|
||||
}
|
||||
|
||||
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
|
||||
{
|
||||
mana_plugin_t plugin;
|
||||
plugin.init = init;
|
||||
plugin.shutdown = shutdown;
|
||||
plugin.begin_run = begin_run;
|
||||
plugin.end_run = end_run;
|
||||
plugin.process_event = process_event;
|
||||
plugin.process_system_event = process_system_event;
|
||||
return plugin;
|
||||
}
|
13
src/mana_python_test_plugin.py
Normal file
13
src/mana_python_test_plugin.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
|
||||
import json
|
||||
import py_mana
|
||||
|
||||
def begin_run(runDescription: str):
|
||||
print("begin_run")
|
||||
|
||||
def end_run(runDescription: str):
|
||||
print("end_run")
|
||||
|
||||
def event_data(eventIndex: int, dataArrays):
|
||||
return
|
||||
print(f"event[{eventIndex}]: {dataArrays}")
|
|
@ -1,6 +1,6 @@
|
|||
#include <mesytec-mnode/mnode_nng_proto.h>
|
||||
#include <mesytec-mnode/mnode_nng.h>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include <mesytec-mnode/mnode_nng.h>
|
||||
#include <mesytec-mnode/mnode_nng_proto.h>
|
||||
|
||||
namespace mesytec::mnode::nng
|
||||
{
|
||||
|
@ -9,10 +9,11 @@ size_t serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_
|
|||
{
|
||||
auto messageSize = message.ByteSizeLong();
|
||||
|
||||
if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize))
|
||||
if (nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize))
|
||||
return 0;
|
||||
|
||||
*reinterpret_cast<u32 *>(nng_msg_body(msg)) = messageSize;
|
||||
|
||||
if (!message.SerializeToArray(reinterpret_cast<u8 *>(nng_msg_body(msg)) + sizeof(u32),
|
||||
messageSize))
|
||||
{
|
||||
|
|
|
@ -31,18 +31,22 @@
|
|||
// - create root trees or the new rntuples(?)
|
||||
// -> want plugins. similar to the mvme listfile_reader but on analysis data
|
||||
|
||||
#include <atomic>
|
||||
#include <boost/dll.hpp>
|
||||
#include <cmrc/cmrc.hpp> // mnode::resources
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <list>
|
||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||
#include <mesytec-mnode/mnode_math.h>
|
||||
#include <mesytec-mnode/mnode_nng.h>
|
||||
#include <mesytec-mvlc/mesytec-mvlc.h>
|
||||
#include <mutex>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <list>
|
||||
#include <sstream>
|
||||
|
||||
#include "internal/argh.h"
|
||||
#include "internal/mana_analysis.h"
|
||||
#include "internal/mana_arena.h"
|
||||
#include "internal/mana_lib.hpp"
|
||||
#include "internal/mana_analysis.h"
|
||||
|
||||
CMRC_DECLARE(mnode::resources);
|
||||
|
||||
|
@ -135,9 +139,17 @@ std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateC
|
|||
}
|
||||
}
|
||||
|
||||
size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
|
||||
ParserContext &parserContext, mana::ModuleDataStage &analysisContext)
|
||||
struct ProcessingStrategy
|
||||
{
|
||||
virtual ~ProcessingStrategy() = default;
|
||||
virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;
|
||||
};
|
||||
|
||||
struct SingleThreadedStrategy: public ProcessingStrategy
|
||||
{
|
||||
inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
|
||||
ParserContext &parserContext)
|
||||
{
|
||||
listfileContext.readerHelper.destBuf().clear();
|
||||
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
|
||||
|
||||
|
@ -145,13 +157,229 @@ size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
|
|||
return 0;
|
||||
|
||||
auto bufferView = buffer->viewU32();
|
||||
parserContext.parser.userContext = &analysisContext;
|
||||
|
||||
mvlc::readout_parser::parse_readout_buffer(
|
||||
listfileContext.readerHelper.bufferFormat, parserContext.parser, parserContext.callbacks,
|
||||
parserContext.counters, bufferNumber, bufferView.data(), bufferView.size());
|
||||
mvlc::readout_parser::parse_readout_buffer(listfileContext.readerHelper.bufferFormat,
|
||||
parserContext.parser, parserContext.callbacks,
|
||||
parserContext.counters, bufferNumber,
|
||||
bufferView.data(), bufferView.size());
|
||||
|
||||
return buffer->used();
|
||||
}
|
||||
|
||||
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
|
||||
{
|
||||
size_t bufferNumber = 1;
|
||||
size_t totalBytesProcessed = 0;
|
||||
size_t bytesProcessed = 0;
|
||||
const std::chrono::milliseconds ReportInterval(500);
|
||||
mvlc::util::Stopwatch sw;
|
||||
|
||||
auto report = [&]
|
||||
{
|
||||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||
{
|
||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||
auto bytesPerSecond = totalBytesProcessed / s;
|
||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
||||
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
|
||||
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
|
||||
}(sw, bufferNumber, totalBytesProcessed);
|
||||
};
|
||||
|
||||
do
|
||||
{
|
||||
bytesProcessed = process_one_buffer(bufferNumber, listfileContext, parserContext);
|
||||
totalBytesProcessed += bytesProcessed;
|
||||
++bufferNumber;
|
||||
|
||||
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
|
||||
{
|
||||
report();
|
||||
sw.interval();
|
||||
}
|
||||
}
|
||||
while (bytesProcessed > 0);
|
||||
|
||||
report();
|
||||
}
|
||||
};
|
||||
|
||||
struct NngPairStrategy: public ProcessingStrategy
|
||||
{
|
||||
struct SocketCounters
|
||||
{
|
||||
size_t bytes;
|
||||
size_t messages;
|
||||
};
|
||||
|
||||
nng_socket producerSocket;
|
||||
nng_socket consumerSocket;
|
||||
SocketCounters producerTx;
|
||||
SocketCounters consumerRx;
|
||||
// std::mutex producerTxMutex;
|
||||
|
||||
explicit NngPairStrategy(const std::string &url_)
|
||||
{
|
||||
producerSocket = nng::make_pair_socket();
|
||||
consumerSocket = nng::make_pair_socket();
|
||||
if (int res = nng_listen(producerSocket, url_.c_str(), nullptr, 0))
|
||||
throw std::runtime_error(
|
||||
fmt::format("error listening on '{}': {}", url_, nng_strerror(res)));
|
||||
|
||||
if (int res = nng_dial(consumerSocket, url_.c_str(), nullptr, 0))
|
||||
throw std::runtime_error(
|
||||
fmt::format("error connecting to '{}': {}", url_, nng_strerror(res)));
|
||||
};
|
||||
|
||||
~NngPairStrategy()
|
||||
{
|
||||
nng_close(consumerSocket);
|
||||
nng_close(producerSocket);
|
||||
}
|
||||
|
||||
// message format is: u32 bufferNumber, u32 bufferSize, u32 bufferData[bufferSize]
|
||||
void producer(ListfileContext &listfileContext, std::atomic<bool> &quit)
|
||||
{
|
||||
size_t bufferNumber = 1;
|
||||
|
||||
while (!quit)
|
||||
{
|
||||
listfileContext.readerHelper.destBuf().clear();
|
||||
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
|
||||
|
||||
if (!buffer->used())
|
||||
break;
|
||||
|
||||
auto bufferView = buffer->viewU32();
|
||||
auto msg = nng::allocate_message(2 * sizeof(u32) + bufferView.size() * sizeof(u32));
|
||||
size_t msgLen = nng_msg_len(msg.get());
|
||||
auto data = reinterpret_cast<u32 *>(nng_msg_body(msg.get()));
|
||||
*(data + 0) = bufferNumber++;
|
||||
*(data + 1) = static_cast<u32>(bufferView.size());
|
||||
std::copy(std::begin(bufferView), std::end(bufferView), data + 2);
|
||||
int res = 0;
|
||||
|
||||
do
|
||||
{
|
||||
if ((res = nng::send_message(producerSocket, msg)))
|
||||
{
|
||||
if (res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: error sending message: {}",
|
||||
nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// std::lock_guard<std::mutex> lock(producerTxMutex);
|
||||
producerTx.bytes += msgLen;
|
||||
++producerTx.messages;
|
||||
}
|
||||
while (res == NNG_ETIMEDOUT);
|
||||
}
|
||||
|
||||
nng::send_empty_message(producerSocket);
|
||||
}
|
||||
|
||||
void consumer(mvlc::ConnectionType bufferFormat, ParserContext &parserContext)
|
||||
{
|
||||
size_t bufferNumber = 1;
|
||||
size_t totalBytesProcessed = 0;
|
||||
const std::chrono::milliseconds ReportInterval(500);
|
||||
mvlc::util::Stopwatch sw;
|
||||
|
||||
auto report = [&]
|
||||
{
|
||||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||
{
|
||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||
auto bytesPerSecond = totalBytesProcessed / s;
|
||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
||||
std::cout << fmt::format(
|
||||
"Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
|
||||
totalBytesProcessed, s, MiBPerSecond);
|
||||
}(sw, bufferNumber, totalBytesProcessed);
|
||||
};
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto [msg, res] = nng::receive_message(consumerSocket);
|
||||
|
||||
if (res && res != NNG_ETIMEDOUT)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: error receiving message: {}", nng_strerror(res));
|
||||
return;
|
||||
}
|
||||
|
||||
auto data =
|
||||
mvlc::util::span<const u32>(reinterpret_cast<const u32 *>(nng_msg_body(msg.get())),
|
||||
nng_msg_len(msg.get()) / sizeof(u32));
|
||||
if (data.size() < 2)
|
||||
break;
|
||||
|
||||
bufferNumber = data[0];
|
||||
size_t bufferSize = data[1];
|
||||
|
||||
if (data.size() != bufferSize + 2)
|
||||
{
|
||||
spdlog::error("NngPairStrategy: invalid message size: {}", data.size());
|
||||
return;
|
||||
}
|
||||
|
||||
std::basic_string_view<u32> bufferView(data.data() + 2, bufferSize);
|
||||
|
||||
mvlc::readout_parser::parse_readout_buffer(
|
||||
bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters,
|
||||
bufferNumber, bufferView.data(), bufferView.size());
|
||||
|
||||
consumerRx.bytes += nng_msg_len(msg.get());
|
||||
++consumerRx.messages;
|
||||
totalBytesProcessed += bufferView.size() * sizeof(u32);
|
||||
|
||||
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
|
||||
{
|
||||
report();
|
||||
sw.interval();
|
||||
}
|
||||
}
|
||||
|
||||
report();
|
||||
spdlog::debug("NngPairStrategy: producerTx: {:.2f} MiB, {} messages",
|
||||
producerTx.bytes / (1024.0 * 1024), producerTx.messages);
|
||||
spdlog::debug("NngPairStrategy: consumerRx: {:.2f} MiB, {} messages",
|
||||
consumerRx.bytes / (1024.0 * 1024), consumerRx.messages);
|
||||
}
|
||||
|
||||
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
|
||||
{
|
||||
auto bufferFormat = listfileContext.readerHelper.bufferFormat;
|
||||
std::atomic<bool> quitProducer = false;
|
||||
producerTx = {};
|
||||
consumerRx = {};
|
||||
|
||||
std::thread producerThread([this, &listfileContext, &quitProducer]
|
||||
{ producer(listfileContext, quitProducer); });
|
||||
|
||||
consumer(bufferFormat, parserContext);
|
||||
|
||||
quitProducer = true;
|
||||
|
||||
if (producerThread.joinable())
|
||||
producerThread.join();
|
||||
}
|
||||
};
|
||||
|
||||
void usage(const char *self)
|
||||
{
|
||||
std::cout << fmt::format(
|
||||
"usage: {} [--plugin=<plugin.so>] [--processing-strategy=<name>] <zipfile>\n", self);
|
||||
std::cout
|
||||
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
|
||||
"simple counting plugin.\n"
|
||||
<< " --processing-strategy=<name> Use a specific processing strategy. "
|
||||
"Available: 'direct', 'nng-pair'. Default: 'direct'\n"
|
||||
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
|
||||
"'off'.\n";
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
|
@ -159,17 +387,24 @@ int main(int argc, char *argv[])
|
|||
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
|
||||
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
|
||||
|
||||
argh::parser parser({"-h", "--help", "--plugin"});
|
||||
parser.parse(argv);
|
||||
argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"});
|
||||
parser.parse(argc, argv);
|
||||
|
||||
auto filename = parser[1];
|
||||
|
||||
if (parser["-h"] || parser["--help"] || filename.empty())
|
||||
{
|
||||
std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] <zipfile>\n", argv[0]);
|
||||
usage(argv[0]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
|
||||
if (parser("--log-level"))
|
||||
{
|
||||
spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
|
||||
}
|
||||
|
||||
auto listfileContext = make_listfile_context(filename);
|
||||
|
||||
if (!listfileContext)
|
||||
|
@ -201,6 +436,23 @@ int main(int argc, char *argv[])
|
|||
manaPlugin = std::make_unique<mana::ManaCountingSink>();
|
||||
}
|
||||
|
||||
std::string strategyName = "direct";
|
||||
if (parser("--processing-strategy"))
|
||||
strategyName = parser("--processing-strategy").str();
|
||||
|
||||
std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
|
||||
|
||||
if (strategyName == "nng-pair")
|
||||
strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
||||
else if (strategyName == "direct")
|
||||
strategy = std::make_unique<SingleThreadedStrategy>();
|
||||
else
|
||||
{
|
||||
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
|
||||
usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
|
||||
jModuleDataSources, manaPlugin.get(), nullptr);
|
||||
|
||||
|
@ -226,64 +478,16 @@ int main(int argc, char *argv[])
|
|||
if (!parserContext)
|
||||
return 1;
|
||||
|
||||
// make the analysis instance available to the parser callbacks
|
||||
parserContext->parser.userContext = &mana;
|
||||
|
||||
mana.sinkContext = mana.sink->init();
|
||||
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
|
||||
|
||||
size_t bufferNumber = 0;
|
||||
size_t totalBytesProcessed = 0;
|
||||
size_t bytesProcessed = 0;
|
||||
const std::chrono::milliseconds ReportInterval(500);
|
||||
mvlc::util::Stopwatch sw;
|
||||
|
||||
auto report = [&]
|
||||
{
|
||||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||
{
|
||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||
auto bytesPerSecond = totalBytesProcessed / s;
|
||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
||||
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
|
||||
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
|
||||
}(sw, bufferNumber, totalBytesProcessed);
|
||||
};
|
||||
|
||||
do
|
||||
{
|
||||
bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext, mana);
|
||||
totalBytesProcessed += bytesProcessed;
|
||||
++bufferNumber;
|
||||
|
||||
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
|
||||
{
|
||||
report();
|
||||
sw.interval();
|
||||
}
|
||||
}
|
||||
while (bytesProcessed > 0);
|
||||
|
||||
report();
|
||||
strategy->run(*listfileContext, *parserContext);
|
||||
|
||||
mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
|
||||
mana.sink->shutdown(mana.sinkContext);
|
||||
|
||||
if (auto cp = dynamic_cast<mana::ManaCountingSink *>(manaPlugin.get()))
|
||||
{
|
||||
|
||||
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
|
||||
"systemEventBytes={}",
|
||||
fmt::join(cp->eventCounts, ", "), cp->totalBytes, cp->systemEventCount,
|
||||
cp->systemEventBytes);
|
||||
|
||||
for (size_t ei = 0; ei < cp->eventArrayHits.size(); ++ei)
|
||||
{
|
||||
spdlog::info("event[{}]: {} hits", ei, cp->eventCounts[ei]);
|
||||
for (size_t ai = 0; ai < cp->eventArrayHits[ei].size(); ++ai)
|
||||
{
|
||||
auto name = mana.runDescriptor["events"][ei]["outputs"][ai]["name"];
|
||||
spdlog::info(" {}: {}", name, cp->eventArrayHits[ei][ai]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue