add IManaBufferSink concept

This commit is contained in:
Florian Lüke 2025-01-01 18:21:52 +01:00
parent b85096f649
commit dd939b4808
4 changed files with 72 additions and 0 deletions

View file

@ -31,6 +31,26 @@ class IManaSink
IManaSink &operator=(const IManaSink &) = delete; IManaSink &operator=(const IManaSink &) = delete;
}; };
class IManaBufferSink
{
public:
virtual ~IManaBufferSink() = default;
virtual void begin_run(const char *descriptor_json) = 0;
virtual void end_run(const char *descriptor_json) = 0;
// buffer is the same format as used in mana_nng.hpp. It starts with a MessageHeader::Eventdata
// structure.
virtual void process_event_buffer(const uint8_t *buffer, size_t size) = 0;
virtual void process_system_event(const uint32_t *data, size_t size) = 0;
protected:
IManaBufferSink() = default;
private:
IManaBufferSink(const IManaBufferSink &) = delete;
IManaBufferSink &operator=(const IManaBufferSink &) = delete;
};
class IManaPlugin class IManaPlugin
{ {
public: public:

View file

@ -232,6 +232,54 @@ inline std::string to_string(const ManaSinkPerfProxy::Perf &perf)
totalHits, MiB, elapsed_s, hit_s, MiB_s); totalHits, MiB, elapsed_s, hit_s, MiB_s);
} }
#if 0
class ManaBufferingSink: public IManaSink
{
public:
using flush_predicate = std::function<bool()>;
explicit ManaBufferingSink(IManaBufferSink *dest, flush_predicate doFlush)
: dest_(dest)
, doFlush_(doFlush)
{
}
void begin_run(const char *descriptor_json) override { dest_->begin_run(descriptor_json); }
void end_run(const char *descriptor_json) override { dest_->end_run(descriptor_json); }
void process_event(uint16_t eventIndex, mana_offset_array_t *arrays, size_t arrayCount,
size_t totalBytes) override
{
}
void process_system_event(const uint32_t *data, size_t size) override {}
private:
IManaBufferSink *dest_;
flush_predicate doFlush_;
};
class ManaDebufferingSink: public IManaBufferSink
{
public:
explicit ManaDebufferingSink(IManaSink *dest)
: dest_(dest)
{
}
void begin_run(const char *descriptor_json) override { dest_->begin_run(descriptor_json); }
void end_run(const char *descriptor_json) override { dest_->end_run(descriptor_json); }
void process_event_buffer(const uint8_t *buffer, size_t size) override {}
void process_system_event(const uint32_t *data, size_t size) override {}
private:
IManaSink *dest_;
};
#endif
struct mana_c_error: public std::exception struct mana_c_error: public std::exception
{ {
mana_status_t status; mana_status_t status;

View file

@ -29,8 +29,10 @@ struct EventDataHeader
u32 sizeBytes; u32 sizeBytes;
}; };
// clang-format: off
// EventData message layout: MessageHeader, repeated(EventDataHeader + // EventData message layout: MessageHeader, repeated(EventDataHeader +
// mana_offset_array_t[arrayCount]) // mana_offset_array_t[arrayCount])
// clang-format: on
class NngServerSink: public IManaSink class NngServerSink: public IManaSink
{ {

View file

@ -246,6 +246,8 @@ struct NngPairStrategy: public ReplayStrategy
while (!quit) while (!quit)
{ {
// TODO: eliminate this buffer. read directly into an allocated nng_msg. do usb fixup in
// there.
listfileContext.readerHelper.destBuf().clear(); listfileContext.readerHelper.destBuf().clear();
auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper); auto buffer = mvlc::listfile::read_next_buffer(listfileContext.readerHelper);