Compare commits

...

10 commits

Author SHA1 Message Date
Florian Lüke
ebd823ae4e mana root plugin: implement raw histograms, fixes 2024-12-26 18:44:24 +01:00
Florian Lüke
ead9c0ee2e fixes and refactorings 2024-12-26 18:42:25 +01:00
Florian Lüke
9e5b79cb34 mana: implement nng pair based threaded strategy
It's a tiny bit faster than the single threaded strategy:
- Counting plugin, is690-9Li_3H_run094:
  - direct, single threaded:  217.72411048962562 MiB/s
  - nng-pair, multi threaded: 228.50186892946155 MiB/s
- root-histogram plugin (hitcount histos only), is690-9Li_3H_run094:
  - direct, single threaded:  210.73162379794522 MiB/s
  - nng-pair, multi threaded: 219.66463320034057 MiB/s
2024-12-26 15:37:15 +01:00
Florian Lüke
e38ada8854 .clang-format: set SortIncludes: CaseSensitive 2024-12-26 15:28:54 +01:00
Florian Lüke
76d85e5c39 mnode_nng_proto: fix warning 2024-12-26 15:28:17 +01:00
Florian Lüke
0eaa5aa942 mnode_nng: reformat 2024-12-26 15:28:00 +01:00
Florian Lüke
9bdd79e910 mana: strategy 2024-12-26 03:17:33 +01:00
Florian Lüke
76b22c1434 mana: don't need to pass analysisContext to process_one_buffer() 2024-12-26 03:09:08 +01:00
Florian Lüke
13c73cfdfc mana: add a python plugin 2024-12-26 03:08:26 +01:00
Florian Lüke
6f7102549e mana: add a root-histogram plugin and improve ManaCountingSink
Currently only histograms the hit counts per input array element, not
the actual data.

The counting plugin now also tracks and prints 'eventArrayIndexHits'.
2024-12-26 01:46:58 +01:00
16 changed files with 716 additions and 128 deletions

View file

@ -170,7 +170,7 @@ RequiresClausePosition: OwnLine
RequiresExpressionIndentation: OuterScope
SeparateDefinitionBlocks: Leave
ShortNamespaceLines: 1
SortIncludes: Never
SortIncludes: CaseSensitive
SortJavaStaticImport: Before
SortUsingDeclarations: LexicographicNumeric
SpaceAfterCStyleCast: false

2
.vscode/launch.json vendored
View file

@ -11,7 +11,7 @@
"program": "${workspaceFolder}/build/mana_auto_replay",
"args": [
"~/Documents/is690-9Li_3H_run094_241020_124254_part001.zip",
"--plugin=src/libmana-plugin-c-test.so"
"--plugin=src/libmana-plugin-root-histogram.so"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}/build",

View file

@ -20,7 +20,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
option(MNODE_BUILD_TESTS "Build mnode test binaries" ${MESYTEC_MNODE_MAIN_PROJECT})
if (MNODE_BUILD_TESTS)
find_package(GTest CONFIG)
find_package(GTest CONFIG QUIET)
if (NOT GTest_FOUND)
include(FetchContent)
FetchContent_Declare(googletest URL https://github.com/google/googletest/archive/refs/tags/v1.15.2.zip)
@ -34,6 +34,14 @@ if (MNODE_BUILD_TESTS)
enable_testing()
endif()
include(FetchContent)
FetchContent_Declare(pybind11 GIT_REPOSITORY https://github.com/pybind/pybind11 GIT_TAG v2.13.6)
FetchContent_GetProperties(pybind11)
if(NOT pybind11_POPULATED)
FetchContent_Populate(pybind11)
add_subdirectory(${pybind11_SOURCE_DIR} ${pybind11_BINARY_DIR})
endif()
include(CMakeRC)
cmrc_add_resource_library(mnode-resources
ALIAS mnode::resources NAMESPACE mnode::resources

View file

@ -1,6 +1,7 @@
#ifndef C536E080_25D7_476D_B8E3_25912B9CFC3B
#define C536E080_25D7_476D_B8E3_25912B9CFC3B
#include <cassert>
#include <cstdint>
#include <limits>
@ -21,6 +22,6 @@ inline double make_quiet_nan()
return result;
}
}
} // namespace mesytec::mnode
#endif /* C536E080_25D7_476D_B8E3_25912B9CFC3B */

View file

@ -7,8 +7,8 @@
#include <nng/protocol/pipeline0/push.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <cassert>
#include <cstdint>
@ -93,7 +93,7 @@ inline int set_socket_timeouts(nng_socket socket, nng_duration timeout = Default
return 0;
}
using socket_factory = std::function<int (nng_socket *)>;
using socket_factory = std::function<int(nng_socket *)>;
inline nng_socket make_socket(socket_factory factory, nng_duration timeout = DefaultTimeout)
{
@ -159,20 +159,25 @@ inline std::string get_local_address(nng_socket s)
void log_pipe_info(nng_pipe p, const char *info_text);
// 'nng_res' is the result of the last nng function call.
using retry_predicate = std::function<bool (int nng_res)>;
using retry_predicate = std::function<bool(int nng_res)>;
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp, const char *debugInfo = "");
int send_message_retry(nng_socket socket, nng_msg *msg, retry_predicate rp,
const char *debugInfo = "");
struct RetryNTimes
{
explicit RetryNTimes(size_t maxTries = 3) : maxTries(maxTries) {}
explicit RetryNTimes(size_t maxTries = 3)
: maxTries(maxTries)
{
}
bool operator()(int) { return attempt++ < maxTries; }
size_t maxTries;
size_t attempt = 0u;
};
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3, const char *debugInfo = "")
inline int send_message_retry(nng_socket socket, nng_msg *msg, size_t maxTries = 3,
const char *debugInfo = "")
{
return send_message_retry(socket, msg, RetryNTimes(maxTries), debugInfo);
}
@ -185,16 +190,17 @@ inline int send_empty_message(nng_socket socket, size_t maxTries = 3)
}
// Read type T from the front of msg and trim the message by sizeof(T).
template<typename T>
std::optional<T> msg_trim_read(nng_msg *msg)
template <typename T> std::optional<T> msg_trim_read(nng_msg *msg)
{
const auto oldlen = nng_msg_len(msg); (void) oldlen;
const auto oldlen = nng_msg_len(msg);
(void)oldlen;
if (nng_msg_len(msg) < sizeof(T))
return std::nullopt;
T result = *reinterpret_cast<T *>(nng_msg_body(msg));
nng_msg_trim(msg, sizeof(T));
const auto newlen = nng_msg_len(msg); (void) newlen;
const auto newlen = nng_msg_len(msg);
(void)newlen;
assert(newlen + sizeof(T) == oldlen);
return result;
}
@ -205,8 +211,8 @@ const char *nng_stat_type_to_string(int nng_stat_type);
// Important: as of nng-1.8.0 sockets stats are only implemented for pair1 type
// sockets!
template<typename Visitor>
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
template <typename Visitor>
void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth = 0)
{
visitor(stat, depth);
@ -216,7 +222,7 @@ void visit_nng_stats(nng_stat *stat, Visitor visitor, unsigned depth=0)
{
for (auto child = nng_stat_child(stat); child; child = nng_stat_next(child))
{
visit_nng_stats(child, visitor, depth+1);
visit_nng_stats(child, visitor, depth + 1);
}
}
}
@ -225,10 +231,7 @@ std::string format_stat(nng_stat *stat);
using unique_msg = std::unique_ptr<nng_msg, decltype(&nng_msg_free)>;
inline unique_msg make_unique_msg(nng_msg *msg = nullptr)
{
return unique_msg(msg, &nng_msg_free);
}
inline unique_msg make_unique_msg(nng_msg *msg = nullptr) { return unique_msg(msg, &nng_msg_free); }
inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0)
{
@ -236,10 +239,10 @@ inline std::pair<unique_msg, int> receive_message(nng_socket sock, int flags = 0
if (auto res = nng_recvmsg(sock, &msg, flags))
{
return { make_unique_msg(), res };
return {make_unique_msg(), res};
}
return { make_unique_msg(msg), 0 };
return {make_unique_msg(msg), 0};
}
inline int send_message(nng_socket sock, unique_msg &msg, int flags = 0)
@ -329,11 +332,8 @@ inline unique_msg clone_message(const nng_msg *msg)
return make_unique_msg(newMsg);
}
inline unique_msg clone_message(const unique_msg &msg)
{
return clone_message(msg.get());
}
inline unique_msg clone_message(const unique_msg &msg) { return clone_message(msg.get()); }
}
} // namespace mesytec::mnode::nng
#endif /* B18E3651_CA9A_43BC_AA25_810EA16533CD */

View file

@ -17,7 +17,7 @@ if (MNODE_BUILD_TESTS)
function (add_mnode_gtest name)
add_executable(test_${name} ${name}.test.cc)
target_link_libraries(test_${name} PRIVATE mesytec-mnode GTest::gtest_main spdlog)
add_test(NAME name COMMAND $<TARGET_FILE:test_${name}>)
add_test(NAME test_${name} COMMAND $<TARGET_FILE:test_${name}>)
endfunction()
add_mnode_gtest(mana)
@ -36,5 +36,11 @@ if (ROOT_FOUND)
message("-- Using ROOT installation from ${ROOT_USE_FILE}")
include(${ROOT_USE_FILE})
add_library(mana-plugin-root-histogram SHARED mana_plugin_root_histogram.cc)
target_link_libraries(mana-plugin-root-histogram PRIVATE mana ${ROOT_LIBRARIES})
target_link_libraries(mana-plugin-root-histogram PRIVATE mana mesytec-mnode rxi-logc ${ROOT_LIBRARIES})
endif()
if (pybind11_FOUND)
add_library(mana-plugin-python SHARED mana_plugin_python.cc)
target_link_libraries(mana-plugin-python PRIVATE mana mesytec-mnode rxi-logc pybind11::embed)
file(CREATE_LINK ${CMAKE_CURRENT_SOURCE_DIR}/mana_python_test_plugin.py ${CMAKE_BINARY_DIR}/mana_python_test_plugin.py COPY_ON_ERROR SYMBOLIC)
endif()

View file

@ -274,14 +274,14 @@ inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u
struct ManaCountingSink: public ManaPlugin
{
std::vector<size_t> eventCounts;
std::vector<std::vector<size_t>> eventArrayHits;
std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits;
size_t totalBytes = 0;
size_t systemEventCount = 0;
size_t systemEventBytes = 0;
void reset()
{
eventCounts.clear();
eventArrayHits.clear();
eventArrayIndexHits.clear();
totalBytes = 0;
}
@ -295,31 +295,58 @@ struct ManaCountingSink: public ManaPlugin
auto jRun = nlohmann::json::parse(descriptor_json);
reset();
eventCounts.resize(jRun["events"].size());
eventArrayHits.resize(eventCounts.size());
// TODO: resize nested vectors
eventArrayIndexHits.resize(eventCounts.size());
}
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
{
(void)context;
(void)descriptor_json;
auto jRun = nlohmann::json::parse(descriptor_json);
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
"systemEventBytes={}",
fmt::join(eventCounts, ", "), totalBytes, systemEventCount, systemEventBytes);
for (size_t ei = 0; ei < eventArrayIndexHits.size(); ++ei)
{
spdlog::info("event[{}]: {} hits", ei, eventCounts[ei]);
for (size_t ai = 0; ai < eventArrayIndexHits[ei].size(); ++ai)
{
auto name = jRun["events"][ei]["outputs"][ai]["name"];
auto arrayHits = eventArrayIndexHits[ei][ai];
auto sumHits = std::accumulate(std::begin(arrayHits), std::end(arrayHits),
static_cast<size_t>(0u));
spdlog::info(" {}[{}]: [{}], sum={}", name, arrayHits.size(),
fmt::join(arrayHits, ", "), sumHits);
}
}
}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
{
(void)context;
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
eventArrayHits.resize(eventCounts.size());
eventArrayHits[eventIndex].resize(std::max(eventArrayHits[eventIndex].size(), arrayCount));
eventArrayIndexHits.resize(eventCounts.size());
eventArrayIndexHits[eventIndex].resize(
std::max(eventArrayIndexHits[eventIndex].size(), arrayCount));
++eventCounts[eventIndex];
for (size_t ai = 0; ai < arrayCount; ++ai)
{
auto input = get_span<float>(arrays[ai]);
bool hit = std::any_of(std::begin(input), std::end(input),
[](float v) { return !std::isnan(v); });
if (hit)
++eventArrayHits[eventIndex][ai];
eventArrayIndexHits[eventIndex][ai].resize(input.size());
for (size_t i = 0; i < input.size(); ++i)
{
if (!std::isnan(input[i]))
{
++eventArrayIndexHits[eventIndex][ai][i];
}
}
}
this->totalBytes += totalBytes;
}

View file

@ -4,9 +4,11 @@
#include <algorithm>
#include <cstring>
#include <list>
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_math.h>
#include <mesytec-mvlc/util/fmt.h>
#include <numeric>
#include <vector>
namespace mesytec::mnode::mana
{
@ -15,7 +17,7 @@ class Arena
{
public:
static constexpr size_t initial_segment_size = 1u << 20;
static constexpr size_t default_pad = 8;
static constexpr size_t default_align_to = 8;
static constexpr size_t default_max_segments = 1;
Arena() = default;
@ -25,11 +27,11 @@ class Arena
Arena &operator=(Arena &&other) = default;
// pushes at least the required amount of memory onto the arena.
// 'required' is rounded up to the nearest multiple of 'default_pad' to make
// 'required' is rounded up to the nearest multiple of 'default_align_to' to make
// subsequent allocations aligned.
u8 *push_size(size_t required)
{
size_t padded = round_up(required, default_pad);
size_t padded = round_up(required, default_align_to);
size_t pad_waste = padded - required;
required = padded;

View file

@ -7,6 +7,7 @@
#include <mesytec-mvlc/util/data_filter.h>
#include <nlohmann/json.hpp>
#include "mana_api.h"
#include "mana_arena.h"
namespace mesytec::mnode::mana
{

View file

@ -1,8 +1,8 @@
#include "internal/mana_arena.h"
#include "internal/mana_lib.hpp"
#include <gtest/gtest.h>
#include <mesytec-mnode/mnode_cpp_types.h>
#include "internal/mana_arena.h"
#include <spdlog/spdlog.h>
#include "internal/mana_lib.hpp"
using namespace mesytec::mnode;
@ -18,35 +18,35 @@ TEST(mnode_mana_arena, push_reset_push)
for (size_t i = 0; i < 10; ++i)
{
auto mem = arena.push_size(mana::Arena::default_pad + 2);
auto mem = arena.push_size(mana::Arena::default_align_to + 2);
ASSERT_NE(mem, nullptr);
}
ASSERT_EQ(arena.allocations(), 10);
ASSERT_GE(arena.pad_waste(), 0);
ASSERT_GE(arena.segment_count(), 1);
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_pad + 2));
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_align_to + 2));
arena.reset();
ASSERT_EQ(arena.allocations(), 0);
ASSERT_EQ(arena.pad_waste(), 0);
ASSERT_GE(arena.segment_count(), 1);
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
ASSERT_EQ(arena.used(), 0);
for (size_t i = 0; i < 10; ++i)
{
auto mem = arena.push_size(mana::Arena::default_pad + 2);
auto mem = arena.push_size(mana::Arena::default_align_to + 2);
ASSERT_NE(mem, nullptr);
}
ASSERT_EQ(arena.allocations(), 10);
ASSERT_GE(arena.pad_waste(), 0);
ASSERT_GE(arena.segment_count(), 1);
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_pad + 2));
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_pad + 2));
ASSERT_GE(arena.capacity(), 10 * (mana::Arena::default_align_to + 2));
ASSERT_EQ(arena.used(), 10 * (mana::Arena::default_align_to + 2));
}
TEST(mnode_mana_offset_ptr, basic)
@ -93,8 +93,8 @@ TEST(mnode_mana_offset_ptr, strings)
auto s0 = arena.push_cstr("hello");
auto s1 = arena.push_cstr("world");
spdlog::info("&t->ptr0={}, &t->ptr1={}", fmt::ptr(&t->ptr0), fmt::ptr(&t->ptr1));
spdlog::info("s0={} @ {}, s1={} @ {}", s0, fmt::ptr(s0), s1, fmt::ptr(s1));
// spdlog::info("&t->ptr0={}, &t->ptr1={}", fmt::ptr(&t->ptr0), fmt::ptr(&t->ptr1));
// spdlog::info("s0={} @ {}, s1={} @ {}", s0, fmt::ptr(s0), s1, fmt::ptr(s1));
set(t->ptr0, s0);
set(t->ptr1, s1);

View file

@ -9,6 +9,7 @@ struct Context
MANA_DEFINE_PLUGIN_INIT(init)
{
log_set_level(LOG_DEBUG);
struct Context *ctx = calloc(1, sizeof(*ctx));
log_debug("init: ctx=%p", ctx);
return ctx;

108
src/mana_plugin_python.cc Normal file
View file

@ -0,0 +1,108 @@
#include "internal/mana_lib.hpp"
#include <mesytec-mvlc/cpp_compat.h>
extern "C"
{
#include "internal/rxi/log.h"
}
#include <pybind11/embed.h>
#include <pybind11/stl.h>
using namespace mesytec;
using namespace mesytec::mnode;
namespace py = pybind11;
PYBIND11_EMBEDDED_MODULE(py_mana, m) {}
namespace
{
struct Context
{
py::scoped_interpreter interp;
py::module usercode;
py::object userobject;
py::object py_begin_run;
py::object py_event_data;
py::object py_end_run;
std::vector<std::vector<py::object>> eventBuffers;
};
} // namespace
MANA_DEFINE_PLUGIN_INIT(init)
{
log_set_level(LOG_DEBUG);
static Context g_ctx;
auto ctx = &g_ctx;
log_debug("init: ctx=%p", ctx);
// TODO: pass args to init()
// TODO: catch exceptions
ctx->usercode = py::module::import("mana_python_test_plugin");
return ctx;
}
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
{
(void)context;
log_debug("shutdown");
}
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
auto jRun = nlohmann::json::parse(descriptor_json);
auto ctx = reinterpret_cast<Context *>(context);
ctx->usercode.reload();
// TODO: check if the retrieved attributes are callable
ctx->py_begin_run = ctx->usercode.attr("begin_run");
ctx->py_event_data = ctx->usercode.attr("event_data");
ctx->py_end_run = ctx->usercode.attr("end_run");
ctx->py_begin_run(descriptor_json);
ctx->eventBuffers.clear();
ctx->eventBuffers.resize(jRun["events"].size());
for (size_t ei = 0; ei < jRun["events"].size(); ++ei)
{
auto &jEvent = jRun["events"][ei];
ctx->eventBuffers[ei].resize(jEvent["outputs"].size());
}
}
MANA_DEFINE_PLUGIN_END_RUN(end_run)
{
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
auto ctx = reinterpret_cast<Context *>(context);
ctx->py_end_run(descriptor_json);
}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
{
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
arrayCount, totalBytes);
auto ctx = reinterpret_cast<Context *>(context);
auto &buffers = ctx->eventBuffers.at(eventIndex);
assert(buffers.size() == arrayCount);
for (size_t ai = 0; ai < arrayCount; ++ai)
{
auto arraySpan = mana::get_span<float>(arrays[ai]);
ctx->eventBuffers[eventIndex][ai] =
py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float));
}
ctx->py_event_data(eventIndex, ctx->eventBuffers);
}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
{
log_trace("system_event: ctx=%p, size=%zu", context, size);
auto ctx = reinterpret_cast<Context *>(context);
}
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{
mana_plugin_t plugin;
plugin.init = init;
plugin.shutdown = shutdown;
plugin.begin_run = begin_run;
plugin.end_run = end_run;
plugin.process_event = process_event;
plugin.process_system_event = process_system_event;
return plugin;
}

View file

@ -0,0 +1,216 @@
#include <TFile.h>
#include <TH1.h>
#include <regex>
#include <spdlog/spdlog.h>
#include <vector>
#include "internal/mana_lib.hpp"
extern "C"
{
#include "internal/rxi/log.h"
}
using namespace mesytec::mnode;
struct Context
{
std::unique_ptr<TFile> outputFile;
std::vector<std::vector<TH1 *>> hitCounts;
std::vector<std::vector<TH1 *>> rawHistograms;
};
MANA_DEFINE_PLUGIN_INIT(init)
{
log_set_level(LOG_DEBUG);
static Context g_ctx;
log_debug("init: ctx=%p", &g_ctx);
return &g_ctx;
}
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
{
(void)context;
log_debug("shutdown");
}
struct ObjectPath
{
std::string path;
std::string objectName;
std::vector<std::string> components;
ObjectPath(const std::string &path, const std::string &sep = "\\.")
: path(path)
{
std::regex re(sep);
std::sregex_token_iterator it(path.begin(), path.end(), re, -1);
std::copy(it, std::sregex_token_iterator(), std::back_inserter(components));
if (!components.empty())
{
objectName = components.back();
components.pop_back();
}
spdlog::debug("ObjectPath: objectName={}, components={}", objectName,
fmt::join(components, ", "));
}
};
template <typename It> TDirectory *root_mkpath_cd(It begin, It end)
{
TDirectory *result = nullptr;
for (auto it = begin; it != end; ++it)
{
result = gDirectory->mkdir(it->c_str(), "", true);
result->cd();
}
return result;
}
inline std::vector<std::vector<TH1 *>> make_hitcount_histos(TDirectory *baseDir,
const nlohmann::json &jRun)
{
std::vector<std::vector<TH1 *>> result;
for (const auto &jEvent: jRun["events"])
{
std::vector<TH1 *> eventHistos;
for (const auto &jOutput: jEvent["outputs"])
{
auto size = jOutput["size"].get<size_t>();
ObjectPath path(jOutput["name"].get<std::string>());
path.components.emplace_back("hit_counts");
baseDir->cd();
root_mkpath_cd(path.components.begin(), path.components.end());
auto histoName = fmt::format("{}_hits", path.path);
auto histoTitle = fmt::format("Hit Counts {}", path.path);
auto th1 = new TH1F(histoName.c_str(), histoTitle.c_str(), size, 0.0, size);
th1->GetXaxis()->SetTitle("Index");
th1->GetYaxis()->SetTitle("Hits");
eventHistos.emplace_back(std::move(th1));
}
result.emplace_back(std::move(eventHistos));
}
return result;
}
inline std::vector<std::vector<TH1 *>> make_raw_histos(TDirectory *baseDir,
const nlohmann::json &jRun)
{
std::vector<std::vector<TH1 *>> result;
for (const auto &jEvent: jRun["events"])
{
std::vector<TH1 *> eventHistos;
for (const auto &jOutput: jEvent["outputs"])
{
auto histoCount = jOutput["size"].get<size_t>();
auto bits = jOutput["bits"].get<unsigned>();
auto bins = 1u << std::min(bits, 16u);
auto maxValue = std::pow(2.0, bits);
ObjectPath path(jOutput["name"].get<std::string>());
spdlog::info("path={}, bins={}, maxValue={}", path.path, bins, maxValue);
baseDir->cd();
root_mkpath_cd(path.components.begin(), path.components.end());
for (size_t i = 0; i < histoCount; ++i)
{
auto histoName = fmt::format("{}_{:04d}", path.objectName, i);
auto histoTitle = fmt::format("{}[{}]", path.path, i);
auto th1 = new TH1F(histoName.c_str(), histoTitle.c_str(), bins, 0.0, maxValue);
eventHistos.emplace_back(std::move(th1));
}
}
result.emplace_back(std::move(eventHistos));
}
return result;
}
inline std::string histo_info(const std::vector<std::vector<TH1 *>> &histos)
{
size_t histoCount = 0;
size_t histoMem = 0;
for (size_t ei = 0; ei < histos.size(); ++ei)
{
for (size_t mi = 0; mi < histos[ei].size(); ++mi)
{
auto histo = histos[ei][mi];
histoCount++;
histoMem += histo->GetNbinsX() * sizeof(float);
}
}
return fmt::format("histoCount={}, histoMem={} MiB", histoCount, histoMem / (1024.0 * 1024));
}
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
auto jRun = nlohmann::json::parse(descriptor_json);
auto ctx = reinterpret_cast<Context *>(context);
ctx->hitCounts.clear();
ctx->outputFile = std::make_unique<TFile>("output.root", "RECREATE");
ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun);
ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun);
spdlog::info("hitCount histograms: {}", histo_info(ctx->hitCounts));
spdlog::info("raw histograms: {}", histo_info(ctx->rawHistograms));
}
MANA_DEFINE_PLUGIN_END_RUN(end_run)
{
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
auto ctx = reinterpret_cast<Context *>(context);
ctx->outputFile->Write();
*ctx = {};
}
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
{
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
arrayCount, totalBytes);
auto ctx = reinterpret_cast<Context *>(context);
auto &hitCountHistograms = ctx->hitCounts.at(eventIndex);
auto &rawHistograms = ctx->rawHistograms.at(eventIndex);
size_t rawHistoIndex = 0;
for (size_t ai = 0; ai < arrayCount; ++ai)
{
auto &hitCountHisto = hitCountHistograms.at(ai);
auto input = mana::get_span<float>(arrays[ai]);
for (size_t i = 0; i < input.size(); ++i)
{
if (!std::isnan(input[i]))
{
hitCountHisto->Fill(i);
rawHistograms.at(rawHistoIndex + i)->Fill(input[i]);
}
}
rawHistoIndex += input.size();
}
}
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
{
log_trace("system_event: ctx=%p, size=%zu", context, size);
}
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{
mana_plugin_t plugin;
plugin.init = init;
plugin.shutdown = shutdown;
plugin.begin_run = begin_run;
plugin.end_run = end_run;
plugin.process_event = process_event;
plugin.process_system_event = process_system_event;
return plugin;
}

View file

@ -0,0 +1,13 @@
import json
import py_mana
def begin_run(runDescription: str):
print("begin_run")
def end_run(runDescription: str):
print("end_run")
def event_data(eventIndex: int, dataArrays):
return
print(f"event[{eventIndex}]: {dataArrays}")

View file

@ -1,6 +1,6 @@
#include <mesytec-mnode/mnode_nng_proto.h>
#include <mesytec-mnode/mnode_nng.h>
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_nng.h>
#include <mesytec-mnode/mnode_nng_proto.h>
namespace mesytec::mnode::nng
{
@ -9,10 +9,11 @@ size_t serialize_proto_to_nng(const google::protobuf::MessageLite &message, nng_
{
auto messageSize = message.ByteSizeLong();
if (auto res = nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize))
if (nng_msg_realloc(msg, nng_msg_len(msg) + sizeof(u32) + messageSize))
return 0;
*reinterpret_cast<u32 *>(nng_msg_body(msg)) = messageSize;
if (!message.SerializeToArray(reinterpret_cast<u8 *>(nng_msg_body(msg)) + sizeof(u32),
messageSize))
{

View file

@ -31,18 +31,22 @@
// - create root trees or the new rntuples(?)
// -> want plugins. similar to the mvme listfile_reader but on analysis data
#include <atomic>
#include <boost/dll.hpp>
#include <cmrc/cmrc.hpp> // mnode::resources
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <list>
#include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_math.h>
#include <mesytec-mnode/mnode_nng.h>
#include <mesytec-mvlc/mesytec-mvlc.h>
#include <mutex>
#include <nlohmann/json.hpp>
#include <list>
#include <sstream>
#include "internal/argh.h"
#include "internal/mana_analysis.h"
#include "internal/mana_arena.h"
#include "internal/mana_lib.hpp"
#include "internal/mana_analysis.h"
CMRC_DECLARE(mnode::resources);
@ -135,9 +139,17 @@ std::optional<ParserContext> make_parser_context(const mvlc::CrateConfig &crateC
}
}
size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext, mana::ModuleDataStage &analysisContext)
struct ProcessingStrategy
{
virtual ~ProcessingStrategy() = default;
virtual void run(ListfileContext &listfileContext, ParserContext &parserContext) = 0;
};
struct SingleThreadedStrategy: public ProcessingStrategy
{
inline static size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
ParserContext &parserContext)
{
listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
@ -145,13 +157,229 @@ size_t process_one_buffer(size_t bufferNumber, ListfileContext &listfileContext,
return 0;
auto bufferView = buffer->viewU32();
parserContext.parser.userContext = &analysisContext;
mvlc::readout_parser::parse_readout_buffer(
listfileContext.readerHelper.bufferFormat, parserContext.parser, parserContext.callbacks,
parserContext.counters, bufferNumber, bufferView.data(), bufferView.size());
mvlc::readout_parser::parse_readout_buffer(listfileContext.readerHelper.bufferFormat,
parserContext.parser, parserContext.callbacks,
parserContext.counters, bufferNumber,
bufferView.data(), bufferView.size());
return buffer->used();
}
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
{
size_t bufferNumber = 1;
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed = process_one_buffer(bufferNumber, listfileContext, parserContext);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
while (bytesProcessed > 0);
report();
}
};
struct NngPairStrategy: public ProcessingStrategy
{
struct SocketCounters
{
size_t bytes;
size_t messages;
};
nng_socket producerSocket;
nng_socket consumerSocket;
SocketCounters producerTx;
SocketCounters consumerRx;
// std::mutex producerTxMutex;
explicit NngPairStrategy(const std::string &url_)
{
producerSocket = nng::make_pair_socket();
consumerSocket = nng::make_pair_socket();
if (int res = nng_listen(producerSocket, url_.c_str(), nullptr, 0))
throw std::runtime_error(
fmt::format("error listening on '{}': {}", url_, nng_strerror(res)));
if (int res = nng_dial(consumerSocket, url_.c_str(), nullptr, 0))
throw std::runtime_error(
fmt::format("error connecting to '{}': {}", url_, nng_strerror(res)));
};
~NngPairStrategy()
{
nng_close(consumerSocket);
nng_close(producerSocket);
}
// message format is: u32 bufferNumber, u32 bufferSize, u32 bufferData[bufferSize]
void producer(ListfileContext &listfileContext, std::atomic<bool> &quit)
{
size_t bufferNumber = 1;
while (!quit)
{
listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);
if (!buffer->used())
break;
auto bufferView = buffer->viewU32();
auto msg = nng::allocate_message(2 * sizeof(u32) + bufferView.size() * sizeof(u32));
size_t msgLen = nng_msg_len(msg.get());
auto data = reinterpret_cast<u32 *>(nng_msg_body(msg.get()));
*(data + 0) = bufferNumber++;
*(data + 1) = static_cast<u32>(bufferView.size());
std::copy(std::begin(bufferView), std::end(bufferView), data + 2);
int res = 0;
do
{
if ((res = nng::send_message(producerSocket, msg)))
{
if (res != NNG_ETIMEDOUT)
{
spdlog::error("NngPairStrategy: error sending message: {}",
nng_strerror(res));
return;
}
}
// std::lock_guard<std::mutex> lock(producerTxMutex);
producerTx.bytes += msgLen;
++producerTx.messages;
}
while (res == NNG_ETIMEDOUT);
}
nng::send_empty_message(producerSocket);
}
void consumer(mvlc::ConnectionType bufferFormat, ParserContext &parserContext)
{
size_t bufferNumber = 1;
size_t totalBytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format(
"Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
while (true)
{
auto [msg, res] = nng::receive_message(consumerSocket);
if (res && res != NNG_ETIMEDOUT)
{
spdlog::error("NngPairStrategy: error receiving message: {}", nng_strerror(res));
return;
}
auto data =
mvlc::util::span<const u32>(reinterpret_cast<const u32 *>(nng_msg_body(msg.get())),
nng_msg_len(msg.get()) / sizeof(u32));
if (data.size() < 2)
break;
bufferNumber = data[0];
size_t bufferSize = data[1];
if (data.size() != bufferSize + 2)
{
spdlog::error("NngPairStrategy: invalid message size: {}", data.size());
return;
}
std::basic_string_view<u32> bufferView(data.data() + 2, bufferSize);
mvlc::readout_parser::parse_readout_buffer(
bufferFormat, parserContext.parser, parserContext.callbacks, parserContext.counters,
bufferNumber, bufferView.data(), bufferView.size());
consumerRx.bytes += nng_msg_len(msg.get());
++consumerRx.messages;
totalBytesProcessed += bufferView.size() * sizeof(u32);
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
report();
spdlog::debug("NngPairStrategy: producerTx: {:.2f} MiB, {} messages",
producerTx.bytes / (1024.0 * 1024), producerTx.messages);
spdlog::debug("NngPairStrategy: consumerRx: {:.2f} MiB, {} messages",
consumerRx.bytes / (1024.0 * 1024), consumerRx.messages);
}
void run(ListfileContext &listfileContext, ParserContext &parserContext) override
{
auto bufferFormat = listfileContext.readerHelper.bufferFormat;
std::atomic<bool> quitProducer = false;
producerTx = {};
consumerRx = {};
std::thread producerThread([this, &listfileContext, &quitProducer]
{ producer(listfileContext, quitProducer); });
consumer(bufferFormat, parserContext);
quitProducer = true;
if (producerThread.joinable())
producerThread.join();
}
};
void usage(const char *self)
{
std::cout << fmt::format(
"usage: {} [--plugin=<plugin.so>] [--processing-strategy=<name>] <zipfile>\n", self);
std::cout
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n"
<< " --processing-strategy=<name> Use a specific processing strategy. "
"Available: 'direct', 'nng-pair'. Default: 'direct'\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n";
}
int main(int argc, char *argv[])
@ -159,17 +387,24 @@ int main(int argc, char *argv[])
auto f = cmrc::mnode::resources::get_filesystem().open("data/vme_module_data_sources.json");
const auto jModuleDataSources = nlohmann::json::parse(f.begin(), f.end());
argh::parser parser({"-h", "--help", "--plugin"});
parser.parse(argv);
argh::parser parser({"-h", "--help", "--plugin", "--processing-strategy", "--log-level"});
parser.parse(argc, argv);
auto filename = parser[1];
if (parser["-h"] || parser["--help"] || filename.empty())
{
std::cout << fmt::format("usage: {} [--plugin=<plugin.so>] <zipfile>\n", argv[0]);
usage(argv[0]);
return 0;
}
spdlog::set_level(spdlog::level::info);
if (parser("--log-level"))
{
spdlog::set_level(spdlog::level::from_str(parser("--log-level").str()));
}
auto listfileContext = make_listfile_context(filename);
if (!listfileContext)
@ -201,6 +436,23 @@ int main(int argc, char *argv[])
manaPlugin = std::make_unique<mana::ManaCountingSink>();
}
std::string strategyName = "direct";
if (parser("--processing-strategy"))
strategyName = parser("--processing-strategy").str();
std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
if (strategyName == "nng-pair")
strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
else if (strategyName == "direct")
strategy = std::make_unique<SingleThreadedStrategy>();
else
{
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
usage(argv[0]);
return 1;
}
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
jModuleDataSources, manaPlugin.get(), nullptr);
@ -226,64 +478,16 @@ int main(int argc, char *argv[])
if (!parserContext)
return 1;
// make the analysis instance available to the parser callbacks
parserContext->parser.userContext = &mana;
mana.sinkContext = mana.sink->init();
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
size_t bufferNumber = 0;
size_t totalBytesProcessed = 0;
size_t bytesProcessed = 0;
const std::chrono::milliseconds ReportInterval(500);
mvlc::util::Stopwatch sw;
auto report = [&]
{
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto bytesPerSecond = totalBytesProcessed / s;
auto MiBPerSecond = bytesPerSecond / (1u << 20);
std::cout << fmt::format("Processed {} buffers, {} bytes. t={} s, rate={} MiB/s\n",
bufferNumber, totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed);
};
do
{
bytesProcessed = process_one_buffer(bufferNumber, *listfileContext, *parserContext, mana);
totalBytesProcessed += bytesProcessed;
++bufferNumber;
if (auto elapsed = sw.get_interval(); elapsed >= ReportInterval)
{
report();
sw.interval();
}
}
while (bytesProcessed > 0);
report();
strategy->run(*listfileContext, *parserContext);
mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
mana.sink->shutdown(mana.sinkContext);
if (auto cp = dynamic_cast<mana::ManaCountingSink *>(manaPlugin.get()))
{
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
"systemEventBytes={}",
fmt::join(cp->eventCounts, ", "), cp->totalBytes, cp->systemEventCount,
cp->systemEventBytes);
for (size_t ei = 0; ei < cp->eventArrayHits.size(); ++ei)
{
spdlog::info("event[{}]: {} hits", ei, cp->eventCounts[ei]);
for (size_t ai = 0; ai < cp->eventArrayHits[ei].size(); ++ai)
{
auto name = mana.runDescriptor["events"][ei]["outputs"][ai]["name"];
spdlog::info(" {}: {}", name, cp->eventArrayHits[ei][ai]);
}
}
}
return 0;
}