Compare commits

..

No commits in common. "a7b5d505335cd9c6dd02ca07c80bc4cc1b321f32" and "bfce5e76ab4909c1c2f0a51dad4a7d141993266b" have entirely different histories.

15 changed files with 120 additions and 449 deletions

View file

@ -1,19 +0,0 @@
#ifndef E7FD4517_4FF5_4BDA_9AE4_95308A37554D
#define E7FD4517_4FF5_4BDA_9AE4_95308A37554D
#include <regex>
#include <string>
namespace mesytec::mnode
{
template <typename OutputIt>
void split_string(const std::string &str, const std::regex &re, OutputIt out)
{
std::sregex_token_iterator it(str.begin(), str.end(), re, -1);
std::copy(it, std::sregex_token_iterator(), out);
}
} // namespace mesytec::mnode
#endif /* E7FD4517_4FF5_4BDA_9AE4_95308A37554D */

View file

@ -24,16 +24,13 @@ if (MNODE_BUILD_TESTS)
target_link_libraries(test_mana PRIVATE mana) target_link_libraries(test_mana PRIVATE mana)
endif() endif()
add_library(rxi-logc INTERFACE) add_library(rxi-logc INTERFACE)
target_sources(rxi-logc INTERFACE internal/rxi/log.c) target_sources(rxi-logc INTERFACE internal/rxi/log.c)
target_compile_options(rxi-logc INTERFACE -DLOG_USE_COLOR)
add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c) add_library(mana-plugin-c-test SHARED mana_plugin_c_test.c)
target_link_libraries(mana-plugin-c-test PRIVATE rxi-logc) target_link_libraries(mana-plugin-c-test PRIVATE rxi-logc)
add_library(mana-plugin-cpp-test SHARED mana_plugin_cpp_test.cc)
target_link_libraries(mana-plugin-cpp-test PRIVATE rxi-logc)
find_package(ROOT COMPONENTS Hist) find_package(ROOT COMPONENTS Hist)
if (ROOT_FOUND) if (ROOT_FOUND)
message("-- Using ROOT installation from ${ROOT_USE_FILE}") message("-- Using ROOT installation from ${ROOT_USE_FILE}")

View file

@ -47,7 +47,7 @@ struct ModuleDataStage
std::vector<std::vector<nlohmann::json>> moduleInfo; std::vector<std::vector<nlohmann::json>> moduleInfo;
RunInfo runInfo; RunInfo runInfo;
nlohmann::json runDescriptor; nlohmann::json runDescriptor;
IManaSink *sink = nullptr; ManaPlugin *sink = nullptr;
void *sinkContext = nullptr; void *sinkContext = nullptr;
}; };
@ -196,7 +196,7 @@ inline nlohmann::json make_run_descriptor(const RunInfo &runInfo)
inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena, inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::Arena &&arena,
const mvlc::CrateConfig &crateConfig, const mvlc::CrateConfig &crateConfig,
nlohmann::json moduleDb, IManaSink *sink, nlohmann::json moduleDb, ManaPlugin *sink,
void *sinkContext) void *sinkContext)
{ {
ModuleDataStage result; ModuleDataStage result;
@ -210,7 +210,7 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::
allocate_outputs(result.arena, result.runInfo); allocate_outputs(result.arena, result.runInfo);
result.runDescriptor = make_run_descriptor(result.runInfo); result.runDescriptor = make_run_descriptor(result.runInfo);
spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump()); spdlog::info("ModuleDataStage: runDescriptor={}", result.runDescriptor.dump(2));
spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena)); spdlog::info("ModuleDataStage: arena stats={}", arena_stats(result.arena));
return result; return result;
@ -218,12 +218,12 @@ inline ModuleDataStage make_module_data_stage(const std::string &runName, mana::
inline void module_data_stage_begin_run(ModuleDataStage &ctx) inline void module_data_stage_begin_run(ModuleDataStage &ctx)
{ {
ctx.sink->begin_run(ctx.runDescriptor.dump().c_str()); ctx.sink->begin_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
} }
inline void module_data_stage_end_run(ModuleDataStage &ctx) inline void module_data_stage_end_run(ModuleDataStage &ctx)
{ {
ctx.sink->end_run(ctx.runDescriptor.dump().c_str()); ctx.sink->end_run(ctx.sinkContext, ctx.runDescriptor.dump().c_str());
} }
inline void inline void
@ -262,23 +262,22 @@ module_data_stage_process_module_data(ModuleDataStage &ctx, int eventIndex,
extract_module_data(inputData, extractors.at(oi), spans.at(oi)); extract_module_data(inputData, extractors.at(oi), spans.at(oi));
} }
ctx.sink->process_event(eventIndex, eventInfo.outputArrays.front(), ctx.sink->process_event(ctx.sinkContext, eventIndex, eventInfo.outputArrays.front(),
eventInfo.outputArrays.size(), eventInfo.sizeBytes); eventInfo.outputArrays.size(), eventInfo.sizeBytes);
} }
inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size) inline void module_data_stage_process_system_event(ModuleDataStage &ctx, const u32 *data, u32 size)
{ {
ctx.sink->process_system_event(data, size); ctx.sink->process_system_event(ctx.sinkContext, data, size);
} }
struct ManaCountingSink: public IManaSink struct ManaCountingSink: public ManaPlugin
{ {
std::vector<size_t> eventCounts; std::vector<size_t> eventCounts;
std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits; std::vector<std::vector<std::vector<size_t>>> eventArrayIndexHits;
size_t totalBytes = 0; size_t totalBytes = 0;
size_t systemEventCount = 0; size_t systemEventCount = 0;
size_t systemEventBytes = 0; size_t systemEventBytes = 0;
void reset() void reset()
{ {
eventCounts.clear(); eventCounts.clear();
@ -286,10 +285,13 @@ struct ManaCountingSink: public IManaSink
totalBytes = 0; totalBytes = 0;
} }
void init(int, const char **) override {} MANA_DEFINE_PLUGIN_INIT(init) override { return nullptr; }
void shutdown() override {}
void begin_run(const char *descriptor_json) override MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override { (void)context; }
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
{ {
(void)context;
auto jRun = nlohmann::json::parse(descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json);
reset(); reset();
eventCounts.resize(jRun["events"].size()); eventCounts.resize(jRun["events"].size());
@ -297,8 +299,9 @@ struct ManaCountingSink: public IManaSink
eventArrayIndexHits.resize(eventCounts.size()); eventArrayIndexHits.resize(eventCounts.size());
} }
void end_run(const char *descriptor_json) override MANA_DEFINE_PLUGIN_END_RUN(end_run) override
{ {
(void)context;
auto jRun = nlohmann::json::parse(descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json);
spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, " spdlog::info("ManaCountingSink: eventCounts=[{}], totalBytes={}, systemEvents={}, "
@ -320,9 +323,9 @@ struct ManaCountingSink: public IManaSink
} }
} }
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
size_t totalBytes) override
{ {
(void)context;
eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1))); eventCounts.resize(std::max(eventCounts.size(), static_cast<size_t>(eventIndex + 1)));
eventArrayIndexHits.resize(eventCounts.size()); eventArrayIndexHits.resize(eventCounts.size());
eventArrayIndexHits[eventIndex].resize( eventArrayIndexHits[eventIndex].resize(
@ -347,8 +350,9 @@ struct ManaCountingSink: public IManaSink
this->totalBytes += totalBytes; this->totalBytes += totalBytes;
} }
void process_system_event(const uint32_t *data, size_t size) override MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
{ {
(void)context;
(void)data; (void)data;
++systemEventCount; ++systemEventCount;
systemEventBytes += size * sizeof(u32); systemEventBytes += size * sizeof(u32);

View file

@ -1,8 +1,8 @@
#ifndef A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E #ifndef A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
#define A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E #define A51A04C1_ABD6_4DE9_B16A_49A9DA46C67E
#include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <stddef.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" extern "C"
@ -33,7 +33,7 @@ extern "C"
size_t size_bytes; size_t size_bytes;
} mana_offset_array_t; } mana_offset_array_t;
#define MANA_DEFINE_PLUGIN_INIT(name) void *name(int plugin_argc, const char **plugin_argv) #define MANA_DEFINE_PLUGIN_INIT(name) void *name()
#define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context) #define MANA_DEFINE_PLUGIN_SHUTDOWN(name) void name(void *context)
@ -63,11 +63,11 @@ extern "C"
mana_end_run_t *end_run; mana_end_run_t *end_run;
mana_process_event_t *process_event; mana_process_event_t *process_event;
mana_process_system_event_t *process_system_event; mana_process_system_event_t *process_system_event;
} mana_sink_plugin_t; } mana_plugin_t;
// use this to define the entry point into the plugin // plugins need to define this function with the name 'mana_get_plugin'
// from c++: extern "C" MANA_C_SINK_PLUGIN() { ... return plugin; } #define MANA_DEFINE_GET_PLUGIN(name) mana_plugin_t name()
#define MANA_C_SINK_PLUGIN mana_sink_plugin_t mana_get_sink_plugin typedef MANA_DEFINE_GET_PLUGIN(mana_get_plugin_t);
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -1,62 +0,0 @@
#ifndef A042D3CF_5927_42A2_93FF_E49E940B32BD
#define A042D3CF_5927_42A2_93FF_E49E940B32BD
#include <memory>
#include <regex>
#include <string>
#include <vector>
#include "mana_c_api.h"
namespace mesytec::mnode::mana
{
class IManaSink
{
public:
virtual ~IManaSink() = default;
virtual void init(int plugin_argc, const char **plugin_argv) = 0;
virtual void shutdown() = 0;
virtual void begin_run(const char *descriptor_json) = 0;
virtual void end_run(const char *descriptor_json) = 0;
virtual void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) = 0;
virtual void process_system_event(const uint32_t *data, size_t size) = 0;
template <typename StringHolder> void init(StringHolder args)
{
std::vector<const char *> cargs(args.size());
std::transform(args.begin(), args.end(), cargs.begin(),
[](const std::string &s) { return s.c_str(); });
init(cargs.size(), cargs.data());
}
protected:
IManaSink() = default;
private:
IManaSink(const IManaSink &) = delete;
IManaSink &operator=(const IManaSink &) = delete;
};
class IManaPlugin
{
public:
virtual ~IManaPlugin() = default;
virtual std::unique_ptr<IManaSink> makeSink() = 0;
protected:
IManaPlugin() = default;
private:
IManaPlugin(const IManaPlugin &) = delete;
IManaPlugin &operator=(const IManaPlugin &) = delete;
};
} // namespace mesytec::mnode::mana
#define MANA_CPP_PLUGIN extern "C" mana::IManaPlugin *mana_get_plugin
#endif /* A042D3CF_5927_42A2_93FF_E49E940B32BD */

View file

@ -1,15 +1,13 @@
#ifndef AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 #ifndef AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
#define AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 #define AAB5E4D2_A05B_4F2F_B76A_406A5A569D55
#include "mana_api.hpp"
#include "mana_arena.h"
#include <cassert> #include <cassert>
#include <chrono>
#include <mesytec-mnode/mnode_cpp_types.h> #include <mesytec-mnode/mnode_cpp_types.h>
#include <mesytec-mnode/mnode_string.hpp>
#include <mesytec-mvlc/cpp_compat.h> #include <mesytec-mvlc/cpp_compat.h>
#include <mesytec-mvlc/util/data_filter.h> #include <mesytec-mvlc/util/data_filter.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include "mana_api.h"
#include "mana_arena.h"
namespace mesytec::mnode::mana namespace mesytec::mnode::mana
{ {
@ -128,129 +126,56 @@ inline nlohmann::json make_array_descriptor(const std::string &name, const std::
return make_array_descriptor(name, mana_float, size, bits); return make_array_descriptor(name, mana_float, size, bits);
} }
class ManaSinkPerfProxy: public IManaSink struct ManaPlugin
{ {
public: virtual ~ManaPlugin() = default;
struct Perf virtual MANA_DEFINE_PLUGIN_INIT(init) = 0;
{ virtual MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) = 0;
using clock = std::chrono::high_resolution_clock; virtual MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) = 0;
using time_point = std::chrono::time_point<clock>; virtual MANA_DEFINE_PLUGIN_END_RUN(end_run) = 0;
using duration = std::chrono::microseconds; virtual MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) = 0;
template <typename T> static duration duration_cast(const T &dt) virtual MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) = 0;
{
return std::chrono::duration_cast<duration>(dt);
}
std::vector<size_t> eventHits;
std::vector<size_t> eventBytes;
duration dt_init;
duration dt_shutdown;
duration dt_beginRun;
duration dt_endRun;
std::vector<duration> dt_processEvent;
time_point t_beginRun;
time_point t_endRun;
}; };
explicit ManaSinkPerfProxy(IManaSink *sink) struct ManaCPlugin: public ManaPlugin
: sink_(sink)
{ {
} mana_plugin_t plugin_;
const Perf &perf() const { return perf_; }
void init(int plugin_argc, const char **plugin_argv) override
{
auto t = Perf::clock::now();
sink_->init(plugin_argc, plugin_argv);
perf_.dt_init = Perf::duration_cast(Perf::clock::now() - t);
}
void shutdown() override
{
auto t = Perf::clock::now();
sink_->shutdown();
perf_.dt_shutdown = Perf::duration_cast(Perf::clock::now() - t);
}
void begin_run(const char *descriptor_json) override
{
perf_.eventHits.clear();
perf_.eventBytes.clear();
perf_.dt_beginRun = perf_.dt_endRun = {};
perf_.dt_processEvent.clear();
perf_.t_endRun = {};
perf_.t_beginRun = Perf::clock::now();
sink_->begin_run(descriptor_json);
perf_.dt_beginRun = Perf::duration_cast(Perf::clock::now() - perf_.t_beginRun);
}
void end_run(const char *descriptor_json) override
{
auto t = Perf::clock::now();
sink_->end_run(descriptor_json);
perf_.t_endRun = Perf::clock::now();
perf_.dt_endRun = Perf::duration_cast(perf_.t_endRun - t);
}
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override
{
size_t size = eventIndex + 1;
perf_.eventHits.resize(std::max(perf_.eventHits.size(), size));
perf_.eventBytes.resize(std::max(perf_.eventBytes.size(), size));
perf_.dt_processEvent.resize(std::max(perf_.dt_processEvent.size(), size));
auto t = Perf::clock::now();
sink_->process_event(eventIndex, arrays, arrayCount, totalBytes);
auto dt = Perf::duration_cast(Perf::clock::now() - t);
perf_.eventHits[eventIndex]++;
perf_.eventBytes[eventIndex] += totalBytes;
perf_.dt_processEvent[eventIndex] += dt;
}
void process_system_event(const uint32_t *data, size_t size) override
{
sink_->process_system_event(data, size);
}
private:
IManaSink *sink_;
Perf perf_;
};
// wraps a mana_api.h mana_sink_plugin_t instance
struct ManaCSink: public IManaSink
{
mana_sink_plugin_t plugin_;
void *context_ = nullptr; void *context_ = nullptr;
explicit ManaCSink(mana_sink_plugin_t plugin) explicit ManaCPlugin(mana_plugin_t plugin)
: plugin_(plugin) : plugin_(plugin)
{ {
} }
void init(int plugin_argc, const char **plugin_argv) override MANA_DEFINE_PLUGIN_INIT(init) override { return context_ = plugin_.init(); }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) override
{ {
context_ = plugin_.init(plugin_argc, plugin_argv); (void)context;
plugin_.shutdown(context_);
} }
void shutdown() override { plugin_.shutdown(context_); }
void begin_run(const char *descriptor_json) override MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) override
{ {
(void)context;
plugin_.begin_run(context_, descriptor_json); plugin_.begin_run(context_, descriptor_json);
} }
void end_run(const char *descriptor_json) override
MANA_DEFINE_PLUGIN_END_RUN(end_run) override
{ {
(void)context;
plugin_.end_run(context_, descriptor_json); plugin_.end_run(context_, descriptor_json);
} }
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) override
{ {
(void)context;
plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes); plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes);
} }
void process_system_event(const uint32_t *data, size_t size) override
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) override
{ {
(void)context;
plugin_.process_system_event(context_, data, size); plugin_.process_system_event(context_, data, size);
} }
}; };

View file

@ -1 +0,0 @@
#include "mana_nng.hpp"

View file

@ -1,51 +0,0 @@
#ifndef CF5E5AFF_F218_4A25_95DF_8097D7C5685B
#define CF5E5AFF_F218_4A25_95DF_8097D7C5685B
#include "mana_analysis.h"
#include <mesytec-mnode/mnode_nng.h>
namespace mesytec::mnode::mana
{
class NngServer: public IManaSink
{
public:
explicit NngServer(nng_socket socket)
: socket_(socket)
{
}
void init(int plugin_argc, const char **plugin_argv) override
{
(void)plugin_argc;
(void)plugin_argv;
}
void shutdown() override { nng_close(socket_); }
void begin_run(const char *descriptor_json) override { (void)descriptor_json; }
void end_run(const char *descriptor_json) override { (void)descriptor_json; }
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override
{
(void)eventIndex;
(void)arrays;
(void)arrayCount;
(void)totalBytes;
}
void process_system_event(const uint32_t *data, size_t size) override
{
(void)data;
(void)size;
}
private:
nng_socket socket_;
};
} // namespace mesytec::mnode::mana
#endif /* CF5E5AFF_F218_4A25_95DF_8097D7C5685B */

View file

@ -1,9 +0,0 @@
#ifndef DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF
#define DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF
extern "C"
{
#include "log.h"
}
#endif /* DE6BF3F6_F6A6_4CB0_B986_4815C9B2C2CF */

View file

@ -1,7 +1,7 @@
#include "internal/mana_c_api.h" #include "internal/mana_api.h"
#include "internal/rxi/log.h" #include "internal/rxi/log.h"
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <stdlib.h>
struct Context struct Context
{ {
@ -9,30 +9,28 @@ struct Context
MANA_DEFINE_PLUGIN_INIT(init) MANA_DEFINE_PLUGIN_INIT(init)
{ {
log_set_level(LOG_INFO); log_set_level(LOG_DEBUG);
struct Context *ctx = calloc(1, sizeof(*ctx)); struct Context *ctx = calloc(1, sizeof(*ctx));
log_info("init: ctx=%p", ctx); log_debug("init: ctx=%p", ctx);
return ctx; return ctx;
} }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
{ {
log_info("shutdown: ctx=%p", context); (void)context;
free(context); log_debug("shutdown");
} }
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{ {
(void)descriptor_json; log_debug("begin_run: ctx=%p, descriptor_json=%s", context, descriptor_json);
log_info("begin_run: ctx=%p", context);
/* pretty useless as the json needs to be parsed back into some c structure describing the input /* pretty useless as the json needs to be parsed back into some c structure describing the input
*/ */
} }
MANA_DEFINE_PLUGIN_END_RUN(end_run) MANA_DEFINE_PLUGIN_END_RUN(end_run)
{ {
(void)descriptor_json; log_debug("end: ctx=%p, descriptor_json=%s", context, descriptor_json);
log_info("end: ctx=%p", context);
} }
MANA_DEFINE_PLUGIN_EVENT_DATA(process_event) MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
@ -46,9 +44,9 @@ MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
log_debug("system_event: ctx=%p, size=%zu", context, size); log_debug("system_event: ctx=%p, size=%zu", context, size);
} }
MANA_C_SINK_PLUGIN() MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{ {
mana_sink_plugin_t plugin; mana_plugin_t plugin;
plugin.init = init; plugin.init = init;
plugin.shutdown = shutdown; plugin.shutdown = shutdown;
plugin.begin_run = begin_run; plugin.begin_run = begin_run;

View file

@ -1,51 +0,0 @@
#include "internal/mana_api.hpp"
#include "internal/rxi/log.hpp"
using namespace mesytec::mnode;
using namespace mesytec::mnode::mana;
class Sink: public IManaSink
{
public:
void init(int plugin_argc, const char **plugin_argv) override
{
(void)plugin_argc;
(void)plugin_argv;
log_set_level(LOG_INFO);
log_info("init: this=%p", this);
}
void shutdown() override { log_info("shutdown: this=%p", this); }
void begin_run(const char *descriptor_json) override
{
(void)descriptor_json;
log_info("begin_run: this=%p", this);
}
void end_run(const char *descriptor_json) override
{
(void)descriptor_json;
log_info("end_run: this=%p", this);
}
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override
{
log_trace("event: this=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", this, eventIndex,
arrayCount, totalBytes);
}
void process_system_event(const uint32_t *data, size_t size) override
{
log_debug("system_event: this=%p, size=%zu", this, size);
}
};
class Plugin: public IManaPlugin
{
public:
std::unique_ptr<IManaSink> makeSink() override { return std::make_unique<Sink>(); }
};
MANA_CPP_PLUGIN() { return new Plugin; }

View file

@ -21,8 +21,7 @@ struct Context
py::module usercode; py::module usercode;
py::object userobject; py::object userobject;
py::object py_begin_run; py::object py_begin_run;
py::object py_process_event; py::object py_event_data;
py::object py_process_system_event;
py::object py_end_run; py::object py_end_run;
std::vector<std::vector<py::object>> eventBuffers; std::vector<std::vector<py::object>> eventBuffers;
}; };
@ -30,7 +29,7 @@ struct Context
MANA_DEFINE_PLUGIN_INIT(init) MANA_DEFINE_PLUGIN_INIT(init)
{ {
log_set_level(LOG_INFO); log_set_level(LOG_DEBUG);
static Context g_ctx; static Context g_ctx;
auto ctx = &g_ctx; auto ctx = &g_ctx;
log_debug("init: ctx=%p", ctx); log_debug("init: ctx=%p", ctx);
@ -40,18 +39,21 @@ MANA_DEFINE_PLUGIN_INIT(init)
return ctx; return ctx;
} }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) { log_info("shutdown: ctx=%p", context); } MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
{
(void)context;
log_debug("shutdown");
}
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{ {
log_info("begin_run: ctx=%p", context); log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
auto jRun = nlohmann::json::parse(descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json);
auto ctx = reinterpret_cast<Context *>(context); auto ctx = reinterpret_cast<Context *>(context);
ctx->usercode.reload(); ctx->usercode.reload();
// TODO: check if the retrieved attributes are callable // TODO: check if the retrieved attributes are callable
ctx->py_begin_run = ctx->usercode.attr("begin_run"); ctx->py_begin_run = ctx->usercode.attr("begin_run");
ctx->py_process_event = ctx->usercode.attr("process_event"); ctx->py_event_data = ctx->usercode.attr("event_data");
ctx->py_process_system_event = ctx->usercode.attr("process_system_event");
ctx->py_end_run = ctx->usercode.attr("end_run"); ctx->py_end_run = ctx->usercode.attr("end_run");
ctx->py_begin_run(descriptor_json); ctx->py_begin_run(descriptor_json);
ctx->eventBuffers.clear(); ctx->eventBuffers.clear();
@ -66,7 +68,7 @@ MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
MANA_DEFINE_PLUGIN_END_RUN(end_run) MANA_DEFINE_PLUGIN_END_RUN(end_run)
{ {
log_info("end: ctx=%p", context); log_debug("end: context=%p, descriptor_json=%s", context, descriptor_json);
auto ctx = reinterpret_cast<Context *>(context); auto ctx = reinterpret_cast<Context *>(context);
ctx->py_end_run(descriptor_json); ctx->py_end_run(descriptor_json);
} }
@ -75,7 +77,6 @@ MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
{ {
log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex, log_trace("event: ctx=%p, eventIndex=%d, arrayCount=%zu, totalBytes=%zu", context, eventIndex,
arrayCount, totalBytes); arrayCount, totalBytes);
auto ctx = reinterpret_cast<Context *>(context); auto ctx = reinterpret_cast<Context *>(context);
auto &buffers = ctx->eventBuffers.at(eventIndex); auto &buffers = ctx->eventBuffers.at(eventIndex);
assert(buffers.size() == arrayCount); assert(buffers.size() == arrayCount);
@ -85,20 +86,18 @@ MANA_DEFINE_PLUGIN_EVENT_DATA(process_event)
ctx->eventBuffers[eventIndex][ai] = ctx->eventBuffers[eventIndex][ai] =
py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float)); py::memoryview::from_memory(arraySpan.data(), arraySpan.size() * sizeof(float));
} }
ctx->py_process_event(eventIndex, ctx->eventBuffers); ctx->py_event_data(eventIndex, ctx->eventBuffers);
} }
MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event) MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
{ {
log_debug("system_event: ctx=%p, size=%zu", context, size); log_trace("system_event: ctx=%p, size=%zu", context, size);
auto ctx = reinterpret_cast<Context *>(context); auto ctx = reinterpret_cast<Context *>(context);
auto view = py::memoryview::from_memory(data, size * sizeof(uint32_t));
ctx->py_process_system_event(view);
} }
extern "C" MANA_C_SINK_PLUGIN() extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{ {
mana_sink_plugin_t plugin; mana_plugin_t plugin;
plugin.init = init; plugin.init = init;
plugin.shutdown = shutdown; plugin.shutdown = shutdown;
plugin.begin_run = begin_run; plugin.begin_run = begin_run;

View file

@ -1,12 +1,15 @@
#include <TFile.h> #include <TFile.h>
#include <TH1.h> #include <TH1.h>
#include <filesystem>
#include <regex> #include <regex>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <vector> #include <vector>
#include "internal/mana_lib.hpp" #include "internal/mana_lib.hpp"
#include "internal/rxi/log.hpp"
extern "C"
{
#include "internal/rxi/log.h"
}
using namespace mesytec::mnode; using namespace mesytec::mnode;
@ -17,30 +20,18 @@ struct Context
std::vector<std::vector<TH1 *>> rawHistograms; std::vector<std::vector<TH1 *>> rawHistograms;
}; };
static Context *g_ctx = nullptr;
MANA_DEFINE_PLUGIN_INIT(init) MANA_DEFINE_PLUGIN_INIT(init)
{ {
if (g_ctx)
{
log_warn("init() called multiple times. This plugin is a singleton!");
return g_ctx;
}
log_set_level(LOG_INFO); log_set_level(LOG_INFO);
log_debug("init"); static Context g_ctx;
return g_ctx = new Context; log_debug("init: ctx=%p", &g_ctx);
return &g_ctx;
} }
MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown) MANA_DEFINE_PLUGIN_SHUTDOWN(shutdown)
{ {
(void)context;
log_debug("shutdown"); log_debug("shutdown");
if (context != g_ctx)
{
log_warn("shutdown() called with invalid context");
return;
}
delete g_ctx;
g_ctx = nullptr;
} }
struct ObjectPath struct ObjectPath
@ -154,24 +145,20 @@ inline std::string histo_info(const std::vector<std::vector<TH1 *>> &histos)
} }
} }
return fmt::format("histoCount={}, histoMem={:.2f} MiB", histoCount, return fmt::format("histoCount={}, histoMem={} MiB", histoCount, histoMem / (1024.0 * 1024));
histoMem / (1024.0 * 1024));
} }
MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run) MANA_DEFINE_PLUGIN_BEGIN_RUN(begin_run)
{ {
log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json); log_debug("begin_run: context=%p, descriptor_json=%s", context, descriptor_json);
auto jRun = nlohmann::json::parse(descriptor_json); auto jRun = nlohmann::json::parse(descriptor_json);
std::filesystem::path rp(jRun["name"].get<std::string>());
auto filename = fmt::format("{}_histograms.root", rp.filename().replace_extension().string());
auto ctx = reinterpret_cast<Context *>(context); auto ctx = reinterpret_cast<Context *>(context);
ctx->hitCounts.clear(); ctx->hitCounts.clear();
ctx->outputFile = std::make_unique<TFile>(filename.c_str(), "RECREATE"); ctx->outputFile = std::make_unique<TFile>("output.root", "RECREATE");
ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun); ctx->hitCounts = make_hitcount_histos(ctx->outputFile.get(), jRun);
ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun); ctx->rawHistograms = make_raw_histos(ctx->outputFile.get(), jRun);
log_info("hitCount histograms: %s", histo_info(ctx->hitCounts).c_str()); log_info("hitCount histograms: %s", histo_info(ctx->hitCounts).c_str());
log_info("raw histograms: %s", histo_info(ctx->rawHistograms).c_str()); log_info("raw histograms: %s", histo_info(ctx->rawHistograms).c_str());
log_info("writing histograms into: %s", filename.c_str());
} }
MANA_DEFINE_PLUGIN_END_RUN(end_run) MANA_DEFINE_PLUGIN_END_RUN(end_run)
@ -215,9 +202,9 @@ MANA_DEFINE_PLUGIN_SYSTEM_EVENT(process_system_event)
log_trace("system_event: ctx=%p, size=%zu", context, size); log_trace("system_event: ctx=%p, size=%zu", context, size);
} }
extern "C" MANA_C_SINK_PLUGIN() extern "C" MANA_DEFINE_GET_PLUGIN(mana_get_plugin)
{ {
mana_sink_plugin_t plugin; mana_plugin_t plugin;
plugin.init = init; plugin.init = init;
plugin.shutdown = shutdown; plugin.shutdown = shutdown;
plugin.begin_run = begin_run; plugin.begin_run = begin_run;

View file

@ -1,16 +1,13 @@
import sys import json
import py_mana
def begin_run(runDescription: str): def begin_run(runDescription: str):
print(f"python: begin_run - python version: {sys.version}") print("begin_run")
def end_run(runDescription: str): def end_run(runDescription: str):
print(f"python: end_run") print("end_run")
def process_event(eventIndex: int, dataArrays): def event_data(eventIndex: int, dataArrays):
return return
print(f"event[{eventIndex}]: {dataArrays}") print(f"event[{eventIndex}]: {dataArrays}")
def process_system_event(data):
return
print(f"python: system_event: size={len(data)}")

View file

@ -293,11 +293,11 @@ struct NngPairStrategy: public ProcessingStrategy
[](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed) [](const mvlc::util::Stopwatch sw, size_t bufferNumber, size_t totalBytesProcessed)
{ {
auto s = sw.get_elapsed().count() / (1000.0 * 1000.0); auto s = sw.get_elapsed().count() / (1000.0 * 1000.0);
auto MiB = totalBytesProcessed / (1024.0 * 1024); auto bytesPerSecond = totalBytesProcessed / s;
auto MiB_s = MiB / s; auto MiBPerSecond = bytesPerSecond / (1u << 20);
fmt::print("Processed {} mvlc data buffers, {:.2f} MiB. " std::cout << fmt::format(
"elapsed={:.2f} s, rate={:.2f} MiB/s\n", "Processed {} buffers, {} bytes. t={} s, rate={:.2f} MiB/s\n", bufferNumber,
bufferNumber, MiB, s, MiB_s); totalBytesProcessed, s, MiBPerSecond);
}(sw, bufferNumber, totalBytesProcessed); }(sw, bufferNumber, totalBytesProcessed);
}; };
@ -377,7 +377,7 @@ void usage(const char *self)
<< " --plugin=<plugin.so> Load a plugin to process the data. Default is a " << " --plugin=<plugin.so> Load a plugin to process the data. Default is a "
"simple counting plugin.\n" "simple counting plugin.\n"
<< " --processing-strategy=<name> Use a specific processing strategy. " << " --processing-strategy=<name> Use a specific processing strategy. "
"Available: 'single-threaded', 'nng-pair'. Default: 'nng-pair'\n" "Available: 'direct', 'nng-pair'. Default: 'direct'\n"
<< " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', " << " --log-level=<level> One of 'trace', 'debug', 'info', 'warn', 'error', "
"'off'.\n"; "'off'.\n";
} }
@ -413,9 +413,8 @@ int main(int argc, char *argv[])
return 1; return 1;
} }
std::unique_ptr<mana::ManaPlugin> manaPlugin;
boost::dll::shared_library pluginHandle; boost::dll::shared_library pluginHandle;
std::unique_ptr<mana::IManaPlugin> manaCppPlugin;
std::unique_ptr<mana::IManaSink> destSink;
if (parser("--plugin")) if (parser("--plugin"))
{ {
@ -423,27 +422,8 @@ int main(int argc, char *argv[])
try try
{ {
pluginHandle = boost::dll::shared_library(pluginFile); pluginHandle = boost::dll::shared_library(pluginFile);
manaPlugin = std::make_unique<mana::ManaCPlugin>(
try pluginHandle.get<mana_plugin_t()>("mana_get_plugin")());
{
destSink = std::make_unique<mana::ManaCSink>(
pluginHandle.get<mana_sink_plugin_t()>("mana_get_sink_plugin")());
}
catch (const std::exception &e)
{
spdlog::debug("plugin {} is not a MANA_C_SINK_PLUGIN", pluginFile);
try
{
manaCppPlugin = std::unique_ptr<mana::IManaPlugin>(
pluginHandle.get<mana::IManaPlugin *()>("mana_get_plugin")());
destSink = manaCppPlugin->makeSink();
}
catch (const std::exception &e)
{
std::cerr << fmt::format("Error loading plugin: {}\n", e.what());
return 1;
}
}
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -453,22 +433,19 @@ int main(int argc, char *argv[])
} }
else else
{ {
destSink = std::make_unique<mana::ManaCountingSink>(); manaPlugin = std::make_unique<mana::ManaCountingSink>();
} }
auto manaSink = std::make_unique<mana::ManaSinkPerfProxy>(destSink.get()); std::string strategyName = "direct";
std::string strategyName = "nng-pair";
if (parser("--processing-strategy")) if (parser("--processing-strategy"))
strategyName = parser("--processing-strategy").str(); strategyName = parser("--processing-strategy").str();
std::unique_ptr<ProcessingStrategy> processingStrategy; std::unique_ptr<ProcessingStrategy> strategy; //= std::make_unique<SingleThreadedStrategy>();
if (strategyName == "nng-pair") if (strategyName == "nng-pair")
processingStrategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage"); strategy = std::make_unique<NngPairStrategy>("inproc://mana_module_data_stage");
else if (strategyName == "single-threaded") else if (strategyName == "direct")
processingStrategy = std::make_unique<SingleThreadedStrategy>(); strategy = std::make_unique<SingleThreadedStrategy>();
else else
{ {
std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName); std::cerr << fmt::format("Error: unknown processing strategy '{}'\n", strategyName);
@ -477,7 +454,7 @@ int main(int argc, char *argv[])
} }
auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig, auto mana = mana::make_module_data_stage(filename, mana::Arena(), listfileContext->crateConfig,
jModuleDataSources, manaSink.get(), nullptr); jModuleDataSources, manaPlugin.get(), nullptr);
auto event_data = [](void *ctx_, int crateIndex, int eventIndex, auto event_data = [](void *ctx_, int crateIndex, int eventIndex,
const mvlc::readout_parser::ModuleData *moduleDataList, const mvlc::readout_parser::ModuleData *moduleDataList,
@ -504,33 +481,13 @@ int main(int argc, char *argv[])
// make the analysis instance available to the parser callbacks // make the analysis instance available to the parser callbacks
parserContext->parser.userContext = &mana; parserContext->parser.userContext = &mana;
mana.sink->init(0, nullptr); mana.sinkContext = mana.sink->init();
mana.sink->begin_run(mana.runDescriptor.dump().c_str()); mana.sink->begin_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
processingStrategy->run(*listfileContext, *parserContext); strategy->run(*listfileContext, *parserContext);
mana.sink->end_run(mana.runDescriptor.dump().c_str()); mana.sink->end_run(mana.sinkContext, mana.runDescriptor.dump().c_str());
mana.sink->shutdown(); mana.sink->shutdown(mana.sinkContext);
auto perf = manaSink->perf();
{
auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes),
static_cast<size_t>(0));
auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits),
static_cast<size_t>(0));
double elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(perf.t_endRun - perf.t_beginRun)
.count();
double elapsed_s = elapsed_ms / 1000.0;
double MiB = totalBytes / (1024.0 * 1024);
double MiB_s = MiB / elapsed_s;
double hit_s = totalHits / elapsed_s;
fmt::print("Data Sink Performance: events={}, bytes={:.2f} MiB, elapsed={:.2f} s, "
"event_rate={:.2f} event/s, data_rate={:.2f} MiB/s\n",
totalHits, MiB, elapsed_s, hit_s, MiB_s);
}
return 0; return 0;
} }