// Header-only helper library for the mana plugin interface
// (namespace mesytec::mnode::mana).
//
// NOTE(review): in this copy of the file the line breaks are collapsed and the
// text between angle brackets is missing (include targets, template parameter
// lists, cast target types). The notes below describe only what the remaining
// text shows.
//
// Offset pointers (mana_offset_ptr_t)
//   - detail::set() stores a pointer as the difference between the target
//     address and the address of the mana_offset_ptr_t itself. The unit of the
//     difference depends on the stripped cast type; presumably bytes - TODO
//     confirm.
//   - An offset of 1 is the null marker: detail::set() writes 1 for a null
//     pointer, is_null() tests for 1, and the assert in detail::set() rejects
//     a target located at "&ptr + 1".
//   - get(ptr) returns nullptr for a null offset pointer, otherwise the
//     address of ptr plus the stored offset.
//   - The typed get specializations go through get_(), which returns nullptr
//     when ptr.data_type differs from the expected mana_data_type_t. Both s8
//     and char map to mana_sint8.
//   - The set() overloads store the data type tag and then the offset. The
//     std::nullptr_t overload tags the pointer as mana_uint32.
//
// Offset arrays (mana_offset_array_t)
//   - push_offset_array() obtains storage from arena.push_t(size), tags the
//     pointer as mana_custom and sets size_bytes to size * sizeof(T).
//   - push_typed_offset_array() does the same but selects the data type tag
//     through the typed set() overloads.
//   - element_count() is size_bytes / sizeof(T). get_span() builds a span from
//     the untyped get() and element_count(), so it does not check data_type.
//
// make_array_descriptor()
//   - First overload: JSON object with "name", "data_type", "size", "bits".
//     When bits != 0 it also adds "lower_limit" = 0 and
//     "upper_limit" = 1lu << bits.
//   - Second overload: builds a filter from bit_filter. size becomes
//     1u << extractBits of the 'A' cache entry if present (default 1), bits
//     becomes extractBits of the 'D' entry if present (default 0). The data
//     type is always mana_float.
//
// ManaSinkPerfProxy
//   - IManaSink that forwards every call to the wrapped sink. It stores the
//     raw pointer passed to the constructor; nothing in this class deletes it.
//   - begin_run() clears all statistics, then times the wrapped begin_run().
//   - end_run() times the wrapped end_run() and records t_endRun.
//   - process_event() grows the per-event vectors to at least eventIndex + 1
//     before indexing, then adds one hit, totalBytes and the measured
//     duration to the slot for eventIndex.
//   - process_system_event() is forwarded without any measurement.
//   - NOTE(review): begin_run() reads the clock twice after the wrapped call
//     (once for t_beginRun, once for dt_beginRun), whereas end_run() reuses
//     t_endRun.
//   - to_string(Perf) sums hits and bytes and derives rates from
//     t_endRun - t_beginRun. elapsed_s is not checked for zero before it is
//     used as a divisor.
//
// The "#if 0" section holds the disabled ManaBufferingSink and
// ManaDebufferingSink classes.
//
// mana_c_error / ManaCSink
//   - mana_c_error is a std::exception carrying a mana_status_t; what()
//     returns status.message.
//   - ManaCSink adapts a mana_sink_plugin_t to IManaSink. The constructor
//     calls plugin_.init() and throws std::runtime_error when the returned
//     context is null. The destructor calls plugin_.shutdown(context_).
//   - begin_run() / end_run() throw mana_c_error when the returned status code
//     is not mana_status_ok.
//   - process_event() / process_system_event() forward to the plugin; any
//     value the plugin returns is not inspected here.
//
// ObjectPath
//   - Splits path using sep as a std::regex (default "\\."). The last
//     component becomes objectName and is removed from components.
//
// PluginWrapper / load_mana_plugin()
//   - Loads the shared library with rtld_global. Looks up
//     "mana_get_sink_plugin" first and wraps it in a ManaCSink, passing
//     pluginArgs as argc/argv. Otherwise looks up "mana_get_plugin" and asks
//     the returned plugin for a sink via makeSink().
//   - The std::string overload splits cmdline with
//     boost::program_options::split_unix() and delegates to the first.
//   - NOTE(review): the message "plugin {}: mana_get_plugin() returned
//     nullptr" is passed to std::runtime_error as is, so the "{}" placeholder
//     is never replaced by the filename.
//   - NOTE(review): both load_mana_plugin() overloads and the
//     "template <> ... get(...)" definitions lack the inline keyword used by
//     the other functions here. This looks like it could cause
//     multiple-definition link errors if the header is included from more
//     than one translation unit - TODO confirm.
//   - NOTE(review): the "else if" fallback relies on the first symbol lookup
//     yielding a false value when the symbol is missing. Verify against the
//     Boost.DLL documentation whether get() throws in that case instead.
//   - NOTE(review): when neither lookup yields a true value the function
//     returns a PluginWrapper whose destSink is never assigned - confirm
//     callers check for that.
#ifndef AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 #define AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 #include "mana_api.hpp" #include "mana_arena.h" #include #include #include #include #include #include #include #include #include namespace mesytec::mnode::mana { namespace detail { inline void set(mana_offset_ptr_t &ptr, void *p) { assert(p != reinterpret_cast(&ptr) + 1); if (p) { ptr.offset = reinterpret_cast(p) - reinterpret_cast(&ptr); // spdlog::info("detail::set: &ptr={}, p={}, offset={}", fmt::ptr(&ptr), fmt::ptr(p), // ptr.offset); } else { ptr.offset = 1; } } } // namespace detail inline bool is_null(const mana_offset_ptr_t &ptr) { return ptr.offset == 1; } inline void *get(mana_offset_ptr_t &ptr) { return is_null(ptr) ? nullptr : reinterpret_cast(&ptr) + ptr.offset; } template T *get(mana_offset_ptr_t &ptr); // to catch unsupported types template T *get_(mana_offset_ptr_t &ptr, mana_data_type_t expected) { if (ptr.data_type != expected) return nullptr; return reinterpret_cast(get(ptr)); } template <> u32 *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_uint32); } template <> u64 *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_uint64); } template <> s8 *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_sint8); } template <> char *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_sint8); } template <> float *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_float); } template <> double *get(mana_offset_ptr_t &ptr) { return get_(ptr, mana_double); } inline void set(mana_offset_ptr_t &ptr, mana_data_type_t data_type, void *p) { ptr.data_type = data_type; detail::set(ptr, p); } inline void set(mana_offset_ptr_t &ptr, std::nullptr_t) { set(ptr, mana_uint32, nullptr); } inline void set(mana_offset_ptr_t &ptr, u32 *p) { set(ptr, mana_uint32, p); } inline void set(mana_offset_ptr_t &ptr, u64 *p) { set(ptr, mana_uint64, p); } inline void set(mana_offset_ptr_t &ptr, s8 *p) { set(ptr, mana_sint8, p); } inline void set(mana_offset_ptr_t &ptr, char *p) { set(ptr, 
mana_sint8, p); } inline void set(mana_offset_ptr_t &ptr, float *p) { set(ptr, mana_float, p); } inline void set(mana_offset_ptr_t &ptr, double *p) { set(ptr, mana_double, p); } template T *push_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) { T *ptr = arena.push_t(size); mana::set(dest.ptr, mana_custom, ptr); dest.size_bytes = size * sizeof(T); return ptr; } template T *push_typed_offset_array(mana::Arena &arena, mana_offset_array_t &dest, size_t size) { T *ptr = arena.push_t(size); mana::set(dest.ptr, ptr); dest.size_bytes = size * sizeof(T); return ptr; } template size_t element_count(mana_offset_array_t &array) { return array.size_bytes / sizeof(T); } template mvlc::util::span get_span(mana_offset_array_t &array) { auto ptr = reinterpret_cast(get(array.ptr)); auto size = element_count(array); return {ptr, size}; } inline nlohmann::json make_array_descriptor(const std::string &name, mana_data_type_t data_type, size_t size, size_t bits = 0) { nlohmann::json j; j["name"] = name; j["data_type"] = data_type; j["size"] = size; j["bits"] = bits; if (bits != 0) { j["lower_limit"] = 0u; j["upper_limit"] = 1lu << bits; } return j; } inline nlohmann::json make_array_descriptor(const std::string &name, const std::string &bit_filter) { auto f = mvlc::util::make_filter_with_caches(bit_filter); size_t size = 1; if (auto c = mvlc::util::get_cache_entry(f, 'A')) size = 1u << c->extractBits; size_t bits = 0; if (auto c = mvlc::util::get_cache_entry(f, 'D')) bits = c->extractBits; return make_array_descriptor(name, mana_float, size, bits); } class ManaSinkPerfProxy: public IManaSink { public: struct Perf { using clock = std::chrono::high_resolution_clock; using time_point = std::chrono::time_point; using duration = std::chrono::microseconds; template static duration duration_cast(const T &dt) { return std::chrono::duration_cast(dt); } std::vector eventHits; std::vector eventBytes; duration dt_beginRun; duration dt_endRun; std::vector dt_processEvent; 
time_point t_beginRun; time_point t_endRun; }; explicit ManaSinkPerfProxy(IManaSink *sink) : sink_(sink) { } const Perf &perf() const { return perf_; } void begin_run(const char *descriptor_json) override { perf_.eventHits.clear(); perf_.eventBytes.clear(); perf_.dt_beginRun = perf_.dt_endRun = {}; perf_.dt_processEvent.clear(); perf_.t_endRun = {}; auto t = Perf::clock::now(); sink_->begin_run(descriptor_json); perf_.t_beginRun = Perf::clock::now(); perf_.dt_beginRun = Perf::duration_cast(Perf::clock::now() - t); } void end_run(const char *descriptor_json) override { auto t = Perf::clock::now(); sink_->end_run(descriptor_json); perf_.t_endRun = Perf::clock::now(); perf_.dt_endRun = Perf::duration_cast(perf_.t_endRun - t); } void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) override { size_t size = eventIndex + 1; perf_.eventHits.resize(std::max(perf_.eventHits.size(), size)); perf_.eventBytes.resize(std::max(perf_.eventBytes.size(), size)); perf_.dt_processEvent.resize(std::max(perf_.dt_processEvent.size(), size)); auto t = Perf::clock::now(); sink_->process_event(eventIndex, arrays, arrayCount, totalBytes); auto dt = Perf::duration_cast(Perf::clock::now() - t); perf_.eventHits[eventIndex]++; perf_.eventBytes[eventIndex] += totalBytes; perf_.dt_processEvent[eventIndex] += dt; } void process_system_event(const uint32_t *data, size_t size) override { sink_->process_system_event(data, size); } private: IManaSink *sink_; Perf perf_; }; inline std::string to_string(const ManaSinkPerfProxy::Perf &perf) { auto totalBytes = std::accumulate(std::begin(perf.eventBytes), std::end(perf.eventBytes), static_cast(0)); auto totalHits = std::accumulate(std::begin(perf.eventHits), std::end(perf.eventHits), static_cast(0)); double elapsed_ms = std::chrono::duration_cast(perf.t_endRun - perf.t_beginRun) .count(); double elapsed_s = elapsed_ms / 1000.0; double MiB = totalBytes / (1024.0 * 1024); double MiB_s = MiB / elapsed_s; 
double hit_s = totalHits / elapsed_s; return fmt::format("events={}, bytes={:.2f} MiB, elapsed={:.2f} s, " "event_rate={:.2f} event/s, data_rate={:.2f} MiB/s", totalHits, MiB, elapsed_s, hit_s, MiB_s); } #if 0 class ManaBufferingSink: public IManaSink { public: using flush_predicate = std::function; explicit ManaBufferingSink(IManaBufferSink *dest, flush_predicate doFlush) : dest_(dest) , doFlush_(doFlush) { } void begin_run(const char *descriptor_json) override { dest_->begin_run(descriptor_json); } void end_run(const char *descriptor_json) override { dest_->end_run(descriptor_json); } void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) override { } void process_system_event(const uint32_t *data, size_t size) override {} private: IManaBufferSink *dest_; flush_predicate doFlush_; }; class ManaDebufferingSink: public IManaBufferSink { public: explicit ManaDebufferingSink(IManaSink *dest) : dest_(dest) { } void begin_run(const char *descriptor_json) override { dest_->begin_run(descriptor_json); } void end_run(const char *descriptor_json) override { dest_->end_run(descriptor_json); } void process_event_buffer(const uint8_t *buffer, size_t size) override {} void process_system_event(const uint32_t *data, size_t size) override {} private: IManaSink *dest_; }; #endif struct mana_c_error: public std::exception { mana_status_t status; mana_c_error(const mana_status_t &status_) : status(status_) { } const char *what() const noexcept override { return status.message; } }; // wraps a mana_api.h mana_sink_plugin_t instance struct ManaCSink: public IManaSink { mana_sink_plugin_t plugin_; void *context_ = nullptr; explicit ManaCSink(mana_sink_plugin_t plugin, int plugin_argc, const char **plugin_argv) : plugin_(plugin) , context_(plugin_.init(plugin_argc, plugin_argv)) { if (!context_) throw std::runtime_error("ManaCSink: plugin init failed"); } ~ManaCSink() override { plugin_.shutdown(context_); } void begin_run(const char 
*descriptor_json) override { if (auto status = plugin_.begin_run(context_, descriptor_json); status.code != mana_status_ok) { throw mana_c_error(status); } } void end_run(const char *descriptor_json) override { if (auto status = plugin_.end_run(context_, descriptor_json); status.code != mana_status_ok) { throw mana_c_error(status); } } void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount, size_t totalBytes) override { plugin_.process_event(context_, eventIndex, arrays, arrayCount, totalBytes); } void process_system_event(const uint32_t *data, size_t size) override { plugin_.process_system_event(context_, data, size); } }; struct ObjectPath { std::string path; std::string objectName; std::vector components; ObjectPath(const std::string &path, const std::string &sep = "\\.") : path(path) { split_string(path, std::regex(sep), std::back_inserter(components)); if (!components.empty()) { objectName = components.back(); components.pop_back(); } } }; struct PluginWrapper { boost::dll::shared_library pluginHandle; std::unique_ptr manaCppPlugin; std::unique_ptr destSink; }; PluginWrapper load_mana_plugin(const std::string &filename, const std::vector &pluginArgs) { PluginWrapper result; result.pluginHandle = boost::dll::shared_library(filename, boost::dll::load_mode::rtld_global); if (auto entryPoint = result.pluginHandle.get("mana_get_sink_plugin")) { std::vector argv; for (const auto &arg: pluginArgs) argv.push_back(arg.data()); result.destSink = std::make_unique(entryPoint(), argv.size(), argv.data()); } else if (auto entryPoint = result.pluginHandle.get("mana_get_plugin")) { result.manaCppPlugin = std::unique_ptr(entryPoint()); if (!result.manaCppPlugin) throw std::runtime_error("plugin {}: mana_get_plugin() returned nullptr"); result.destSink = result.manaCppPlugin->makeSink(); } return result; } PluginWrapper load_mana_plugin(const std::string &filename, const std::string &cmdline) { return load_mana_plugin(filename, 
boost::program_options::split_unix(cmdline)); } } // namespace mesytec::mnode::mana #endif /* AAB5E4D2_A05B_4F2F_B76A_406A5A569D55 */