Compare commits
19 commits
bfce5e76ab
...
a7b5d50533
Author | SHA1 | Date | |
---|---|---|---|
|
a7b5d50533 | ||
|
48d0a3d5c0 | ||
|
eedd95bd2f | ||
|
47b8977807 | ||
|
47926ce5e1 | ||
|
a9bfcf151c | ||
|
aa9d21d81c | ||
|
c82671490a | ||
|
bbd52cbde0 | ||
|
1698987311 | ||
|
00337c6af8 | ||
|
f5fc1d49e7 | ||
|
81023bbde2 | ||
|
ee10c7ce70 | ||
|
714e0c4f3a | ||
|
d8eb73671d | ||
|
8c1e122e06 | ||
|
59b11c94a7 | ||
|
9c76aa93bf |
15 changed files with 449 additions and 120 deletions
19
include/mesytec-mnode/mnode_string.hpp
Normal file
19
include/mesytec-mnode/mnode_string.hpp
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
#ifndef E7FD4517_4FF5_4BDA_9AE4_95308A37554D
|
||||||
|
#define E7FD4517_4FF5_4BDA_9AE4_95308A37554D
|
||||||
|
|
||||||
|
#include <regex>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace mesytec::mnode
|
||||||
|
{
|
||||||
|
|
||||||
|
template <typename OutputIt>
|
||||||
|
void split_string(const std::string &str, const std::regex &re, OutputIt out)
|
||||||
|
{
|
||||||
|
std::sregex_token_iterator it(str.begin(), str.end(), re, -1);
|
||||||
|
std::copy(it, std::sregex_token_iterator(), out);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace mesytec::mnode
|
||||||
|
|
||||||
|
#endif /* E7FD4517_4FF5_4BDA_9AE4_95308A37554D */
|
|
@ -24,13 +24,16 @@ if (MNODE_BUILD_TESTS)
|
||||||
target_link_libraries(test_mana PRIVATE mana)
|
target_link_libraries(test_mana PRIVATE mana)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
|
||||||
add_library(rxi-logc INTERFACE)
|
add_library(rxi-logc INTERFACE)
|
||||||
target_sources(rxi-logc INTERFACE internal/rxi/log.c)
|
target_sources(rxi-logc INTERFACE internal/rxi/log.c)
|
||||||
|
target_compile_options(rxi-logc INTERFACE -DLOG_USE_COLOR)
|
||||||
|
|
||||||
add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c)
|
add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c)
|
||||||
target_link_libraries(mana-plugin-c-test PRIVATE rxi-logc)
|
target_link_libraries(mana-plugin-c-test PRIVATE rxi-logc)
|
||||||
|
|
||||||
|
add_library(mana-plugin-cpp-test SHARED mana_plugin_cpp_test.cc)
|
||||||
|
target_link_libraries(mana-plugin-cpp-test PRIVATE rxi-logc)
|
||||||
|
|
||||||
find_package(ROOT COMPONENTS Hist)
|
find_package(ROOT COMPONENTS Hist)
|
||||||
if (ROOT_FOUND)
|
if (ROOT_FOUND)
|
||||||
message("-- Using ROOT installation from ${ROOT_USE_FILE}")
|
message("-- Using ROOT installation from ${ROOT_USE_FILE}")
|
||||||
|
|
|
@ -47,7 +47,7 @@ struct ModuleDataStage
|
||||||
std::vector<std::vector<nlohmann::json>> moduleInfo;
|
std::vector<std::vector<nlohmann::json>> moduleInfo;
|
||||||
RunInfo runInfo;
|
RunInfo runInfo;
|
||||||
nlohmann::json runDescriptor;
|
nlohmann::json runDescriptor;
|
||||||
ManaPlugin *sink = nullptr;
|
IManaSink *sink = nullptr;
|
||||||
void *sinkContext = nullptr;
|
void *sinkContext = nullptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -196,7 +196,7 @@ inline nlohmann::json make_run_descriptor(const RunInfo &runInfo)
|
||||||
|
|
||||||
inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena,
|
inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena,
|
||||||
const mvlc::CrateConfig &crateConfig,
|
const mvlc::CrateConfig &crateConfig,
|
||||||
nlohmann::json moduleDb, ManaPlugin *sink,
|
nlohmann::json moduleDb, IManaSink *sink,
|
||||||
void *sinkContext)
|
void *sinkContext)
|
||||||
{
|
{
|
||||||
ModuleDataStage result;
|
ModuleDataStage result;
|
||||||
|
@ -210,7 +210,7 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::
|
||||||
allocate_outputs(result.arena, result.runInfo);
|
allocate_outputs(result.arena, result.runInfo);
|
||||||
result.runDescriptor = make_run_descriptor(result.runInfo);
|
result.runDescriptor = make_run_descriptor(result.runInfo);
|
||||||
|
|
||||||
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2));
|
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump());
|
||||||
spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena));
|
spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena));
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@ -218,12 +218,12 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::
|
||||||
|
|
||||||
inline void module_data_stage_begin_run(ModuleDataStage &ctx)
|
inline void module_data_stage_begin_run(ModuleDataStage &ctx)
|
||||||
{
|
{
|
||||||
ctx.sink->begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
|
ctx.sink->begin_run(ctx.runDescriptor.dump().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void module_data_stage_end_run(ModuleDataStage &ctx)
|
inline void module_data_stage_end_run(ModuleDataStage &ctx)
|
||||||
{
|
{
|
||||||
ctx.sink->end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
|
ctx.sink->end_run(ctx.runDescriptor.dump().c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void
|
inline void
|
||||||
|
@ -262,22 +262,23 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex,
|
||||||
extract_module_data(inputData, extractors.at(oi), spans.at(oi));
|
extract_module_data(inputData, extractors.at(oi), spans.at(oi));
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.sink->process_event(ctx.sinkContext, eventIndex, eventInfo.outputArrays.front(),
|
ctx.sink->process_event(eventIndex, eventInfo.outputArrays.front(),
|
||||||
eventInfo.outputArrays.size(), eventInfo.sizeBytes);
|
eventInfo.outputArrays.size(), eventInfo.sizeBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size)
|
inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size)
|
||||||
{
|
{
|
||||||
ctx.sink->process_system_event(ctx.sinkContext, data, size);
|
ctx.sink->process_system_event(data, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ManaCountingSink: public ManaPlugin
|
struct ManaCountingSink: public IManaSink
|
||||||
{
|
{
|
||||||
std::vector<size_t> eventCounts;
|
std::vector<size_t> eventCounts;
|
||||||
std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits;
|
std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits;
|
||||||
size_t totalBytes = 0;
|
size_t totalBytes = 0;
|
||||||
size_t systemEventCount = 0;
|
size_t systemEventCount = 0;
|
||||||
size_t systemEventBytes = 0;
|
size_t systemEventBytes = 0;
|
||||||
|
|
||||||
void reset()
|
void reset()
|
||||||
{
|
{
|
||||||
eventCounts.clear();
|
eventCounts.clear();
|
||||||
|
@ -285,13 +286,10 @@ struct ManaCountingSink: public ManaPlugin
|
||||||
totalBytes = 0;
|
totalBytes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_INIT(init) override { return nullptr; }
|
void init(int, const char **) override {}
|
||||||
|
void shutdown() override {}
|
||||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override { (void)context; }
|
void begin_run(const char *descriptor_json) override
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
|
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||||
reset();
|
reset();
|
||||||
eventCounts.resize(jRun["events"].size());
|
eventCounts.resize(jRun["events"].size());
|
||||||
|
@ -299,9 +297,8 @@ struct ManaCountingSink: public ManaPlugin
|
||||||
eventArrayIndexHits.resize(eventCounts.size());
|
eventArrayIndexHits.resize(eventCounts.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
|
void end_run(const char *descriptor_json) override
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||||
|
|
||||||
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
|
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
|
||||||
|
@ -323,9 +320,9 @@ struct ManaCountingSink: public ManaPlugin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
|
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
|
size_t totalBytes) override
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
|
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
|
||||||
eventArrayIndexHits.resize(eventCounts.size());
|
eventArrayIndexHits.resize(eventCounts.size());
|
||||||
eventArrayIndexHits[eventIndex].resize(
|
eventArrayIndexHits[eventIndex].resize(
|
||||||
|
@ -350,9 +347,8 @@ struct ManaCountingSink: public ManaPlugin
|
||||||
this->totalBytes += totalBytes;
|
this->totalBytes += totalBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
|
void process_system_event(const uint32_t *data, size_t size) override
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
(void)data;
|
(void)data;
|
||||||
++systemEventCount;
|
++systemEventCount;
|
||||||
systemEventBytes += size * sizeof(u32);
|
systemEventBytes += size * sizeof(u32);
|
||||||
|
|
62
src/internal/mana_api.hpp
Normal file
62
src/internal/mana_api.hpp
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
#ifndef A042D3CF_5927_42A2_93FF_E49E940B32BD
|
||||||
|
#define A042D3CF_5927_42A2_93FF_E49E940B32BD
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <regex>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "mana_c_api.h"
|
||||||
|
|
||||||
|
namespace mesytec::mnode::mana
|
||||||
|
{
|
||||||
|
|
||||||
|
class IManaSink
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~IManaSink() = default;
|
||||||
|
|
||||||
|
virtual void init(int plugin_argc, const char **plugin_argv) = 0;
|
||||||
|
virtual void shutdown() = 0;
|
||||||
|
virtual void begin_run(const char *descriptor_json) = 0;
|
||||||
|
virtual void end_run(const char *descriptor_json) = 0;
|
||||||
|
virtual void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
|
size_t totalBytes) = 0;
|
||||||
|
|
||||||
|
virtual void process_system_event(const uint32_t *data, size_t size) = 0;
|
||||||
|
|
||||||
|
template <typename StringHolder> void init(StringHolder args)
|
||||||
|
{
|
||||||
|
std::vector<const char *> cargs(args.size());
|
||||||
|
std::transform(args.begin(), args.end(), cargs.begin(),
|
||||||
|
[](const std::string &s) { return s.c_str(); });
|
||||||
|
init(cargs.size(), cargs.data());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
IManaSink() = default;
|
||||||
|
|
||||||
|
private:
|
||||||
|
IManaSink(const IManaSink &) = delete;
|
||||||
|
IManaSink &operator=(const IManaSink &) = delete;
|
||||||
|
};
|
||||||
|
|
||||||
|
class IManaPlugin
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~IManaPlugin() = default;
|
||||||
|
virtual std::unique_ptr<IManaSink> makeSink() = 0;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
IManaPlugin() = default;
|
||||||
|
|
||||||
|
private:
|
||||||
|
IManaPlugin(const IManaPlugin &) = delete;
|
||||||
|
IManaPlugin &operator=(const IManaPlugin &) = delete;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace mesytec::mnode::mana
|
||||||
|
|
||||||
|
#define MANA_CPP_PLUGIN extern "C" mana::IManaPlugin *mana_get_plugin
|
||||||
|
|
||||||
|
#endif /* A042D3CF_5927_42A2_93FF_E49E940B32BD */
|
|
@ -1,8 +1,8 @@
|
||||||
#ifndef A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
|
#ifndef A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
|
||||||
#define A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
|
#define A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C"
|
extern "C"
|
||||||
|
@ -33,7 +33,7 @@ extern "C"
|
||||||
size_t size_bytes;
|
size_t size_bytes;
|
||||||
} mana_offset_array_t;
|
} mana_offset_array_t;
|
||||||
|
|
||||||
#define MANA_DEFINE_PLUGIN_INIT(name) void *name()
|
#define MANA_DEFINE_PLUGIN_INIT(name) void *name(int plugin_argc, const char **plugin_argv)
|
||||||
|
|
||||||
#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context)
|
#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context)
|
||||||
|
|
||||||
|
@ -63,11 +63,11 @@ extern "C"
|
||||||
mana_end_run_t *end_run;
|
mana_end_run_t *end_run;
|
||||||
mana_process_event_t *process_event;
|
mana_process_event_t *process_event;
|
||||||
mana_process_system_event_t *process_system_event;
|
mana_process_system_event_t *process_system_event;
|
||||||
} mana_plugin_t;
|
} mana_sink_plugin_t;
|
||||||
|
|
||||||
// plugins need to define this function with the name 'mana_get_plugin'
|
// use this to define the entry point into the plugin
|
||||||
#define MANA_DEFINE_GET_PLUGIN(name) mana_plugin_t name()
|
// from c++: extern "C" MANA_C_SINK_PLUGIN() { ... return plugin; }
|
||||||
typedef MANA_DEFINE_GET_PLUGIN(mana_get_plugin_t);
|
#define MANA_C_SINK_PLUGIN mana_sink_plugin_t mana_get_sink_plugin
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
|
@ -1,13 +1,15 @@
|
||||||
#ifndef AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
|
#ifndef AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
|
||||||
#define AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
|
#define AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
|
||||||
|
|
||||||
|
#include "mana_api.hpp"
|
||||||
|
#include "mana_arena.h"
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
#include <chrono>
|
||||||
#include <mesytec-mnode/mnode_cpp_types.h>
|
#include <mesytec-mnode/mnode_cpp_types.h>
|
||||||
|
#include <mesytec-mnode/mnode_string.hpp>
|
||||||
#include <mesytec-mvlc/cpp_compat.h>
|
#include <mesytec-mvlc/cpp_compat.h>
|
||||||
#include <mesytec-mvlc/util/data_filter.h>
|
#include <mesytec-mvlc/util/data_filter.h>
|
||||||
#include <nlohmann/json.hpp>
|
#include <nlohmann/json.hpp>
|
||||||
#include "mana_api.h"
|
|
||||||
#include "mana_arena.h"
|
|
||||||
|
|
||||||
namespace mesytec::mnode::mana
|
namespace mesytec::mnode::mana
|
||||||
{
|
{
|
||||||
|
@ -126,56 +128,129 @@ inline nlohmann::json make_array_descriptor(const std::string &name, const std::
|
||||||
return make_array_descriptor(name, mana_float, size, bits);
|
return make_array_descriptor(name, mana_float, size, bits);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ManaPlugin
|
class ManaSinkPerfProxy: public IManaSink
|
||||||
{
|
{
|
||||||
virtual ~ManaPlugin() = default;
|
public:
|
||||||
virtual MANA_DEFINE_PLUGIN_INIT(init) = 0;
|
struct Perf
|
||||||
virtual MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) = 0;
|
{
|
||||||
virtual MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) = 0;
|
using clock = std::chrono::high_resolution_clock;
|
||||||
virtual MANA_DEFINE_PLUGIN_END_RUN(end_run) = 0;
|
using time_point = std::chrono::time_point<clock>;
|
||||||
virtual MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) = 0;
|
using duration = std::chrono::microseconds;
|
||||||
virtual MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) = 0;
|
template <typename T> static duration duration_cast(const T &dt)
|
||||||
|
{
|
||||||
|
return std::chrono::duration_cast<duration>(dt);
|
||||||
|
}
|
||||||
|
std::vector<size_t> eventHits;
|
||||||
|
std::vector<size_t> eventBytes;
|
||||||
|
duration dt_init;
|
||||||
|
duration dt_shutdown;
|
||||||
|
duration dt_beginRun;
|
||||||
|
duration dt_endRun;
|
||||||
|
std::vector<duration> dt_processEvent;
|
||||||
|
time_point t_beginRun;
|
||||||
|
time_point t_endRun;
|
||||||
|
};
|
||||||
|
|
||||||
|
explicit ManaSinkPerfProxy(IManaSink *sink)
|
||||||
|
: sink_(sink)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
const Perf &perf() const { return perf_; }
|
||||||
|
|
||||||
|
void init(int plugin_argc, const char **plugin_argv) override
|
||||||
|
{
|
||||||
|
auto t = Perf::clock::now();
|
||||||
|
sink_->init(plugin_argc, plugin_argv);
|
||||||
|
perf_.dt_init = Perf::duration_cast(Perf::clock::now() - t);
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdown() override
|
||||||
|
{
|
||||||
|
auto t = Perf::clock::now();
|
||||||
|
sink_->shutdown();
|
||||||
|
perf_.dt_shutdown = Perf::duration_cast(Perf::clock::now() - t);
|
||||||
|
}
|
||||||
|
|
||||||
|
void begin_run(const char *descriptor_json) override
|
||||||
|
{
|
||||||
|
perf_.eventHits.clear();
|
||||||
|
perf_.eventBytes.clear();
|
||||||
|
perf_.dt_beginRun = perf_.dt_endRun = {};
|
||||||
|
perf_.dt_processEvent.clear();
|
||||||
|
perf_.t_endRun = {};
|
||||||
|
|
||||||
|
perf_.t_beginRun = Perf::clock::now();
|
||||||
|
sink_->begin_run(descriptor_json);
|
||||||
|
perf_.dt_beginRun = Perf::duration_cast(Perf::clock::now() - perf_.t_beginRun);
|
||||||
|
}
|
||||||
|
|
||||||
|
void end_run(const char *descriptor_json) override
|
||||||
|
{
|
||||||
|
auto t = Perf::clock::now();
|
||||||
|
sink_->end_run(descriptor_json);
|
||||||
|
perf_.t_endRun = Perf::clock::now();
|
||||||
|
perf_.dt_endRun = Perf::duration_cast(perf_.t_endRun - t);
|
||||||
|
}
|
||||||
|
|
||||||
|
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
|
size_t totalBytes) override
|
||||||
|
{
|
||||||
|
size_t size = eventIndex + 1;
|
||||||
|
perf_.eventHits.resize(std::max(perf_.eventHits.size(), size));
|
||||||
|
perf_.eventBytes.resize(std::max(perf_.eventBytes.size(), size));
|
||||||
|
perf_.dt_processEvent.resize(std::max(perf_.dt_processEvent.size(), size));
|
||||||
|
|
||||||
|
auto t = Perf::clock::now();
|
||||||
|
sink_->process_event(eventIndex, arrays, arrayCount, totalBytes);
|
||||||
|
auto dt = Perf::duration_cast(Perf::clock::now() - t);
|
||||||
|
|
||||||
|
perf_.eventHits[eventIndex]++;
|
||||||
|
perf_.eventBytes[eventIndex] += totalBytes;
|
||||||
|
perf_.dt_processEvent[eventIndex] += dt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void process_system_event(const uint32_t *data, size_t size) override
|
||||||
|
{
|
||||||
|
sink_->process_system_event(data, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
IManaSink *sink_;
|
||||||
|
Perf perf_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ManaCPlugin: public ManaPlugin
|
// wraps a mana_api.h mana_sink_plugin_t instance
|
||||||
|
struct ManaCSink: public IManaSink
|
||||||
{
|
{
|
||||||
mana_plugin_t plugin_;
|
mana_sink_plugin_t plugin_;
|
||||||
void *context_ = nullptr;
|
void *context_ = nullptr;
|
||||||
|
|
||||||
explicit ManaCPlugin(mana_plugin_t plugin)
|
explicit ManaCSink(mana_sink_plugin_t plugin)
|
||||||
: plugin_(plugin)
|
: plugin_(plugin)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_INIT(init) override { return context_ = plugin_.init(); }
|
void init(int plugin_argc, const char **plugin_argv) override
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override
|
|
||||||
{
|
{
|
||||||
(void)context;
|
context_ = plugin_.init(plugin_argc, plugin_argv);
|
||||||
plugin_.shutdown(context_);
|
|
||||||
}
|
}
|
||||||
|
void shutdown() override { plugin_.shutdown(context_); }
|
||||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
|
void begin_run(const char *descriptor_json) override
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
plugin_.begin_run(context_, descriptor_json);
|
plugin_.begin_run(context_, descriptor_json);
|
||||||
}
|
}
|
||||||
|
void end_run(const char *descriptor_json) override
|
||||||
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
|
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
plugin_.end_run(context_, descriptor_json);
|
plugin_.end_run(context_, descriptor_json);
|
||||||
}
|
}
|
||||||
|
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
|
size_t totalBytes) override
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes);
|
plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes);
|
||||||
}
|
}
|
||||||
|
void process_system_event(const uint32_t *data, size_t size) override
|
||||||
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
|
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
plugin_.process_system_event(context_, data, size);
|
plugin_.process_system_event(context_, data, size);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
1
src/internal/mana_nng.cc
Normal file
1
src/internal/mana_nng.cc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
#include "mana_nng.hpp"
|
51
src/internal/mana_nng.hpp
Normal file
51
src/internal/mana_nng.hpp
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
#ifndef CF5E5AFF_F218_4A25_95DF_8097D7C5685B
|
||||||
|
#define CF5E5AFF_F218_4A25_95DF_8097D7C5685B
|
||||||
|
|
||||||
|
#include "mana_analysis.h"
|
||||||
|
#include <mesytec-mnode/mnode_nng.h>
|
||||||
|
|
||||||
|
namespace mesytec::mnode::mana
|
||||||
|
{
|
||||||
|
|
||||||
|
class NngServer: public IManaSink
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit NngServer(nng_socket socket)
|
||||||
|
: socket_(socket)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void init(int plugin_argc, const char **plugin_argv) override
|
||||||
|
{
|
||||||
|
(void)plugin_argc;
|
||||||
|
(void)plugin_argv;
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdown() override { nng_close(socket_); }
|
||||||
|
|
||||||
|
void begin_run(const char *descriptor_json) override { (void)descriptor_json; }
|
||||||
|
|
||||||
|
void end_run(const char *descriptor_json) override { (void)descriptor_json; }
|
||||||
|
|
||||||
|
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
|
size_t totalBytes) override
|
||||||
|
{
|
||||||
|
(void)eventIndex;
|
||||||
|
(void)arrays;
|
||||||
|
(void)arrayCount;
|
||||||
|
(void)totalBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void process_system_event(const uint32_t *data, size_t size) override
|
||||||
|
{
|
||||||
|
(void)data;
|
||||||
|
(void)size;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
nng_socket socket_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace mesytec::mnode::mana
|
||||||
|
|
||||||
|
#endif /* CF5E5AFF_F218_4A25_95DF_8097D7C5685B */
|
9
src/internal/rxi/log.hpp
Normal file
9
src/internal/rxi/log.hpp
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
#ifndef DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF
|
||||||
|
#define DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF
|
||||||
|
|
||||||
|
extern "C"
|
||||||
|
{
|
||||||
|
#include "log.h"
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF */
|
|
@ -1,7 +1,7 @@
|
||||||
#include "internal/mana_api.h"
|
#include "internal/mana_c_api.h"
|
||||||
#include "internal/rxi/log.h"
|
#include "internal/rxi/log.h"
|
||||||
#include <string.h>
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
struct Context
|
struct Context
|
||||||
{
|
{
|
||||||
|
@ -9,28 +9,30 @@ struct Context
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_INIT(init)
|
MANA_DEFINE_PLUGIN_INIT(init)
|
||||||
{
|
{
|
||||||
log_set_level(LOG_DEBUG);
|
log_set_level(LOG_INFO);
|
||||||
struct Context *ctx = calloc(1, sizeof(*ctx));
|
struct Context *ctx = calloc(1, sizeof(*ctx));
|
||||||
log_debug("init: ctx=%p", ctx);
|
log_info("init: ctx=%p", ctx);
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
||||||
{
|
{
|
||||||
(void)context;
|
log_info("shutdown: ctx=%p", context);
|
||||||
log_debug("shutdown");
|
free(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||||
{
|
{
|
||||||
log_debug("begin_run: ctx=%p, descriptor_json=%s", context, descriptor_json);
|
(void)descriptor_json;
|
||||||
|
log_info("begin_run: ctx=%p", context);
|
||||||
/* pretty useless as the json needs to be parsed back into some c structure describing the input
|
/* pretty useless as the json needs to be parsed back into some c structure describing the input
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
||||||
{
|
{
|
||||||
log_debug("end: ctx=%p, descriptor_json=%s", context, descriptor_json);
|
(void)descriptor_json;
|
||||||
|
log_info("end: ctx=%p", context);
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
||||||
|
@ -44,9 +46,9 @@ MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
||||||
log_debug("system_event: ctx=%p, size=%zu", context, size);
|
log_debug("system_event: ctx=%p, size=%zu", context, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
|
MANA_C_SINK_PLUGIN()
|
||||||
{
|
{
|
||||||
mana_plugin_t plugin;
|
mana_sink_plugin_t plugin;
|
||||||
plugin.init = init;
|
plugin.init = init;
|
||||||
plugin.shutdown = shutdown;
|
plugin.shutdown = shutdown;
|
||||||
plugin.begin_run = begin_run;
|
plugin.begin_run = begin_run;
|
||||||
|
|
51
src/mana_plugin_cpp_test.cc
Normal file
51
src/mana_plugin_cpp_test.cc
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
#include "internal/mana_api.hpp"
|
||||||
|
#include "internal/rxi/log.hpp"
|
||||||
|
|
||||||
|
using namespace mesytec::mnode;
|
||||||
|
using namespace mesytec::mnode::mana;
|
||||||
|
|
||||||
|
class Sink: public IManaSink
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void init(int plugin_argc, const char **plugin_argv) override
|
||||||
|
{
|
||||||
|
(void)plugin_argc;
|
||||||
|
(void)plugin_argv;
|
||||||
|
log_set_level(LOG_INFO);
|
||||||
|
log_info("init: this=%p", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdown() override { log_info("shutdown: this=%p", this); }
|
||||||
|
|
||||||
|
void begin_run(const char *descriptor_json) override
|
||||||
|
{
|
||||||
|
(void)descriptor_json;
|
||||||
|
log_info("begin_run: this=%p", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void end_run(const char *descriptor_json) override
|
||||||
|
{
|
||||||
|
(void)descriptor_json;
|
||||||
|
log_info("end_run: this=%p", this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
|
||||||
|
size_t totalBytes) override
|
||||||
|
{
|
||||||
|
log_trace("event: this=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", this, eventIndex,
|
||||||
|
arrayCount, totalBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void process_system_event(const uint32_t *data, size_t size) override
|
||||||
|
{
|
||||||
|
log_debug("system_event: this=%p, size=%zu", this, size);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class Plugin: public IManaPlugin
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
std::unique_ptr<IManaSink> makeSink() override { return std::make_unique<Sink>(); }
|
||||||
|
};
|
||||||
|
|
||||||
|
MANA_CPP_PLUGIN() { return new Plugin; }
|
|
@ -21,7 +21,8 @@ struct Context
|
||||||
py::module usercode;
|
py::module usercode;
|
||||||
py::object userobject;
|
py::object userobject;
|
||||||
py::object py_begin_run;
|
py::object py_begin_run;
|
||||||
py::object py_event_data;
|
py::object py_process_event;
|
||||||
|
py::object py_process_system_event;
|
||||||
py::object py_end_run;
|
py::object py_end_run;
|
||||||
std::vector<std::vector<py::object>> eventBuffers;
|
std::vector<std::vector<py::object>> eventBuffers;
|
||||||
};
|
};
|
||||||
|
@ -29,7 +30,7 @@ struct Context
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_INIT(init)
|
MANA_DEFINE_PLUGIN_INIT(init)
|
||||||
{
|
{
|
||||||
log_set_level(LOG_DEBUG);
|
log_set_level(LOG_INFO);
|
||||||
static Context g_ctx;
|
static Context g_ctx;
|
||||||
auto ctx = &g_ctx;
|
auto ctx = &g_ctx;
|
||||||
log_debug("init: ctx=%p", ctx);
|
log_debug("init: ctx=%p", ctx);
|
||||||
|
@ -39,21 +40,18 @@ MANA_DEFINE_PLUGIN_INIT(init)
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) { log_info("shutdown: ctx=%p", context); }
|
||||||
{
|
|
||||||
(void)context;
|
|
||||||
log_debug("shutdown");
|
|
||||||
}
|
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||||
{
|
{
|
||||||
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
|
log_info("begin_run: ctx=%p", context);
|
||||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||||
auto ctx = reinterpret_cast<Context *>(context);
|
auto ctx = reinterpret_cast<Context *>(context);
|
||||||
ctx->usercode.reload();
|
ctx->usercode.reload();
|
||||||
// TODO: check if the retrieved attributes are callable
|
// TODO: check if the retrieved attributes are callable
|
||||||
ctx->py_begin_run = ctx->usercode.attr("begin_run");
|
ctx->py_begin_run = ctx->usercode.attr("begin_run");
|
||||||
ctx->py_event_data = ctx->usercode.attr("event_data");
|
ctx->py_process_event = ctx->usercode.attr("process_event");
|
||||||
|
ctx->py_process_system_event = ctx->usercode.attr("process_system_event");
|
||||||
ctx->py_end_run = ctx->usercode.attr("end_run");
|
ctx->py_end_run = ctx->usercode.attr("end_run");
|
||||||
ctx->py_begin_run(descriptor_json);
|
ctx->py_begin_run(descriptor_json);
|
||||||
ctx->eventBuffers.clear();
|
ctx->eventBuffers.clear();
|
||||||
|
@ -68,7 +66,7 @@ MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
||||||
{
|
{
|
||||||
log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
|
log_info("end: ctx=%p", context);
|
||||||
auto ctx = reinterpret_cast<Context *>(context);
|
auto ctx = reinterpret_cast<Context *>(context);
|
||||||
ctx->py_end_run(descriptor_json);
|
ctx->py_end_run(descriptor_json);
|
||||||
}
|
}
|
||||||
|
@ -77,6 +75,7 @@ MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
||||||
{
|
{
|
||||||
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
|
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
|
||||||
arrayCount, totalBytes);
|
arrayCount, totalBytes);
|
||||||
|
|
||||||
auto ctx = reinterpret_cast<Context *>(context);
|
auto ctx = reinterpret_cast<Context *>(context);
|
||||||
auto &buffers = ctx->eventBuffers.at(eventIndex);
|
auto &buffers = ctx->eventBuffers.at(eventIndex);
|
||||||
assert(buffers.size() == arrayCount);
|
assert(buffers.size() == arrayCount);
|
||||||
|
@ -86,18 +85,20 @@ MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
|
||||||
ctx->eventBuffers[eventIndex][ai] =
|
ctx->eventBuffers[eventIndex][ai] =
|
||||||
py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float));
|
py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float));
|
||||||
}
|
}
|
||||||
ctx->py_event_data(eventIndex, ctx->eventBuffers);
|
ctx->py_process_event(eventIndex, ctx->eventBuffers);
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
||||||
{
|
{
|
||||||
log_trace("system_event: ctx=%p, size=%zu", context, size);
|
log_debug("system_event: ctx=%p, size=%zu", context, size);
|
||||||
auto ctx = reinterpret_cast<Context *>(context);
|
auto ctx = reinterpret_cast<Context *>(context);
|
||||||
|
auto view = py::memoryview::from_memory(data, size * sizeof(uint32_t));
|
||||||
|
ctx->py_process_system_event(view);
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
|
extern "C" MANA_C_SINK_PLUGIN()
|
||||||
{
|
{
|
||||||
mana_plugin_t plugin;
|
mana_sink_plugin_t plugin;
|
||||||
plugin.init = init;
|
plugin.init = init;
|
||||||
plugin.shutdown = shutdown;
|
plugin.shutdown = shutdown;
|
||||||
plugin.begin_run = begin_run;
|
plugin.begin_run = begin_run;
|
||||||
|
|
|
@ -1,15 +1,12 @@
|
||||||
#include <TFile.h>
|
#include <TFile.h>
|
||||||
#include <TH1.h>
|
#include <TH1.h>
|
||||||
|
#include <filesystem>
|
||||||
#include <regex>
|
#include <regex>
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "internal/mana_lib.hpp"
|
#include "internal/mana_lib.hpp"
|
||||||
|
#include "internal/rxi/log.hpp"
|
||||||
extern "C"
|
|
||||||
{
|
|
||||||
#include "internal/rxi/log.h"
|
|
||||||
}
|
|
||||||
|
|
||||||
using namespace mesytec::mnode;
|
using namespace mesytec::mnode;
|
||||||
|
|
||||||
|
@ -20,18 +17,30 @@ struct Context
|
||||||
std::vector<std::vector<TH1 *>> rawHistograms;
|
std::vector<std::vector<TH1 *>> rawHistograms;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static Context *g_ctx = nullptr;
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_INIT(init)
|
MANA_DEFINE_PLUGIN_INIT(init)
|
||||||
{
|
{
|
||||||
|
if (g_ctx)
|
||||||
|
{
|
||||||
|
log_warn("init() called multiple times. This plugin is a singleton!");
|
||||||
|
return g_ctx;
|
||||||
|
}
|
||||||
log_set_level(LOG_INFO);
|
log_set_level(LOG_INFO);
|
||||||
static Context g_ctx;
|
log_debug("init");
|
||||||
log_debug("init: ctx=%p", &g_ctx);
|
return g_ctx = new Context;
|
||||||
return &g_ctx;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
|
||||||
{
|
{
|
||||||
(void)context;
|
|
||||||
log_debug("shutdown");
|
log_debug("shutdown");
|
||||||
|
if (context != g_ctx)
|
||||||
|
{
|
||||||
|
log_warn("shutdown() called with invalid context");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
delete g_ctx;
|
||||||
|
g_ctx = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ObjectPath
|
struct ObjectPath
|
||||||
|
@ -145,20 +154,24 @@ inline std::string histo_info(const std::vector<std::vector<TH1 *>> &histos)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt::format("histoCount={}, histoMem={} MiB", histoCount, histoMem / (1024.0 * 1024));
|
return fmt::format("histoCount={}, histoMem={:.2f} MiB", histoCount,
|
||||||
|
histoMem / (1024.0 * 1024));
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
|
||||||
{
|
{
|
||||||
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
|
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
|
||||||
auto jRun = nlohmann::json::parse(descriptor_json);
|
auto jRun = nlohmann::json::parse(descriptor_json);
|
||||||
|
std::filesystem::path rp(jRun["name"].get<std::string>());
|
||||||
|
auto filename = fmt::format("{}_histograms.root", rp.filename().replace_extension().string());
|
||||||
auto ctx = reinterpret_cast<Context *>(context);
|
auto ctx = reinterpret_cast<Context *>(context);
|
||||||
ctx->hitCounts.clear();
|
ctx->hitCounts.clear();
|
||||||
ctx->outputFile = std::make_unique<TFile>("output.root", "RECREATE");
|
ctx->outputFile = std::make_unique<TFile>(filename.c_str(), "RECREATE");
|
||||||
ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun);
|
ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun);
|
||||||
ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun);
|
ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun);
|
||||||
log_info("hitCount histograms: %s", histo_info(ctx->hitCounts).c_str());
|
log_info("hitCount histograms: %s", histo_info(ctx->hitCounts).c_str());
|
||||||
log_info("raw histograms: %s", histo_info(ctx->rawHistograms).c_str());
|
log_info("raw histograms: %s", histo_info(ctx->rawHistograms).c_str());
|
||||||
|
log_info("writing histograms into: %s", filename.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
MANA_DEFINE_PLUGIN_END_RUN(end_run)
|
||||||
|
@ -202,9 +215,9 @@ MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
|
||||||
log_trace("system_event: ctx=%p, size=%zu", context, size);
|
log_trace("system_event: ctx=%p, size=%zu", context, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
|
extern "C" MANA_C_SINK_PLUGIN()
|
||||||
{
|
{
|
||||||
mana_plugin_t plugin;
|
mana_sink_plugin_t plugin;
|
||||||
plugin.init = init;
|
plugin.init = init;
|
||||||
plugin.shutdown = shutdown;
|
plugin.shutdown = shutdown;
|
||||||
plugin.begin_run = begin_run;
|
plugin.begin_run = begin_run;
|
||||||
|
|
|
@ -1,13 +1,16 @@
|
||||||
|
|
||||||
import json
|
import sys
|
||||||
import py_mana
|
|
||||||
|
|
||||||
def begin_run(runDescription: str):
|
def begin_run(runDescription: str):
|
||||||
print("begin_run")
|
print(f"python: begin_run - python version: {sys.version}")
|
||||||
|
|
||||||
def end_run(runDescription: str):
|
def end_run(runDescription: str):
|
||||||
print("end_run")
|
print(f"python: end_run")
|
||||||
|
|
||||||
def event_data(eventIndex: int, dataArrays):
|
def process_event(eventIndex: int, dataArrays):
|
||||||
return
|
return
|
||||||
print(f"event[{eventIndex}]: {dataArrays}")
|
print(f"event[{eventIndex}]: {dataArrays}")
|
||||||
|
|
||||||
|
def process_system_event(data):
|
||||||
|
return
|
||||||
|
print(f"python: system_event: size={len(data)}")
|
||||||
|
|
|
@ -293,11 +293,11 @@ struct NngPairStrategy: public ProcessingStrategy
|
||||||
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
|
||||||
{
|
{
|
||||||
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
|
||||||
auto bytesPerSecond = totalBytesProcessed / s;
|
auto MiB = totalBytesProcessed / (1024.0 * 1024);
|
||||||
auto MiBPerSecond = bytesPerSecond / (1u << 20);
|
auto MiB_s = MiB / s;
|
||||||
std::cout << fmt::format(
|
fmt::print("Processed {} mvlc data buffers, {:.2f} MiB. "
|
||||||
"Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
|
"elapsed={:.2f} s, rate={:.2f} MiB/s\n",
|
||||||
totalBytesProcessed, s, MiBPerSecond);
|
bufferNumber, MiB, s, MiB_s);
|
||||||
}(sw, bufferNumber, totalBytesProcessed);
|
}(sw, bufferNumber, totalBytesProcessed);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -377,7 +377,7 @@ void usage(const char *self)
|
||||||
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
|
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
|
||||||
"simple counting plugin.\n"
|
"simple counting plugin.\n"
|
||||||
<< " --processing-strategy=<name> Use a specific processing strategy. "
|
<< " --processing-strategy=<name> Use a specific processing strategy. "
|
||||||
"Available: 'direct', 'nng-pair'. Default: 'direct'\n"
|
"Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n"
|
||||||
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
|
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
|
||||||
"'off'.\n";
|
"'off'.\n";
|
||||||
}
|
}
|
||||||
|
@ -413,8 +413,9 @@ int main(int argc, char *argv[])
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<mana::ManaPlugin> manaPlugin;
|
|
||||||
boost::dll::shared_library pluginHandle;
|
boost::dll::shared_library pluginHandle;
|
||||||
|
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
|
||||||
|
std::unique_ptr<mana::IManaSink> destSink;
|
||||||
|
|
||||||
if (parser("--plugin"))
|
if (parser("--plugin"))
|
||||||
{
|
{
|
||||||
|
@ -422,8 +423,27 @@ int main(int argc, char *argv[])
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
pluginHandle = boost::dll::shared_library(pluginFile);
|
pluginHandle = boost::dll::shared_library(pluginFile);
|
||||||
manaPlugin = std::make_unique<mana::ManaCPlugin>(
|
|
||||||
pluginHandle.get<mana_plugin_t()>("mana_get_plugin")());
|
try
|
||||||
|
{
|
||||||
|
destSink = std::make_unique<mana::ManaCSink>(
|
||||||
|
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")());
|
||||||
|
}
|
||||||
|
catch (const std::exception &e)
|
||||||
|
{
|
||||||
|
spdlog::debug("plugin {} is not a MANA_C_SINK_PLUGIN", pluginFile);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(
|
||||||
|
pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin")());
|
||||||
|
destSink = manaCppPlugin->makeSink();
|
||||||
|
}
|
||||||
|
catch (const std::exception &e)
|
||||||
|
{
|
||||||
|
std::cerr << fmt::format("Error loading plugin: {}\n", e.what());
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (const std::exception &e)
|
catch (const std::exception &e)
|
||||||
{
|
{
|
||||||
|
@ -433,19 +453,22 @@ int main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
manaPlugin = std::make_unique<mana::ManaCountingSink>();
|
destSink = std::make_unique<mana::ManaCountingSink>();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string strategyName = "direct";
|
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get());
|
||||||
|
|
||||||
|
std::string strategyName = "nng-pair";
|
||||||
|
|
||||||
if (parser("--processing-strategy"))
|
if (parser("--processing-strategy"))
|
||||||
strategyName = parser("--processing-strategy").str();
|
strategyName = parser("--processing-strategy").str();
|
||||||
|
|
||||||
std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
|
std::unique_ptr<ProcessingStrategy> processingStrategy;
|
||||||
|
|
||||||
if (strategyName == "nng-pair")
|
if (strategyName == "nng-pair")
|
||||||
strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
processingStrategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
|
||||||
else if (strategyName == "direct")
|
else if (strategyName == "single-threaded")
|
||||||
strategy = std::make_unique<SingleThreadedStrategy>();
|
processingStrategy = std::make_unique<SingleThreadedStrategy>();
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
|
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
|
||||||
|
@ -454,7 +477,7 @@ int main(int argc, char *argv[])
|
||||||
}
|
}
|
||||||
|
|
||||||
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
|
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
|
||||||
jModuleDataSources, manaPlugin.get(), nullptr);
|
jModuleDataSources, manaSink.get(), nullptr);
|
||||||
|
|
||||||
auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
|
auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
|
||||||
const mvlc::readout_parser::ModuleData *moduleDataList,
|
const mvlc::readout_parser::ModuleData *moduleDataList,
|
||||||
|
@ -481,13 +504,33 @@ int main(int argc, char *argv[])
|
||||||
// make the analysis instance available to the parser callbacks
|
// make the analysis instance available to the parser callbacks
|
||||||
parserContext->parser.userContext = &mana;
|
parserContext->parser.userContext = &mana;
|
||||||
|
|
||||||
mana.sinkContext = mana.sink->init();
|
mana.sink->init(0, nullptr);
|
||||||
mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
|
mana.sink->begin_run(mana.runDescriptor.dump().c_str());
|
||||||
|
|
||||||
strategy->run(*listfileContext, *parserContext);
|
processingStrategy->run(*listfileContext, *parserContext);
|
||||||
|
|
||||||
mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
|
mana.sink->end_run(mana.runDescriptor.dump().c_str());
|
||||||
mana.sink->shutdown(mana.sinkContext);
|
mana.sink->shutdown();
|
||||||
|
|
||||||
|
auto perf = manaSink->perf();
|
||||||
|
{
|
||||||
|
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
|
||||||
|
static_cast<size_t>(0));
|
||||||
|
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
|
||||||
|
static_cast<size_t>(0));
|
||||||
|
double elapsed_ms =
|
||||||
|
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
|
||||||
|
.count();
|
||||||
|
double elapsed_s = elapsed_ms / 1000.0;
|
||||||
|
|
||||||
|
double MiB = totalBytes / (1024.0 * 1024);
|
||||||
|
double MiB_s = MiB / elapsed_s;
|
||||||
|
double hit_s = totalHits / elapsed_s;
|
||||||
|
|
||||||
|
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
|
||||||
|
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
|
||||||
|
totalHits, MiB, elapsed_s, hit_s, MiB_s);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue