#pragma once

#include <array>
#include <atomic>
#include <cstdint>
#include <optional>

// TF_CACHELINE_SIZE is normally provided elsewhere in Taskflow; the
// 64-byte fallback below is an assumption for standalone builds.
#ifndef TF_CACHELINE_SIZE
#define TF_CACHELINE_SIZE 64
#endif

namespace tf {

/**
 * A 'lockless' bounded multi-producer, multi-consumer queue
 *
 * Has the caveat that the queue can *appear* empty even if there are
 * returned items within it, as a single thread can block progression
 * of the queue.
 */
template <typename T, size_t LogSize>
class MPMC {

  constexpr static uint64_t BufferSize = 1ull << LogSize;
  constexpr static uint64_t BufferMask = (BufferSize - 1);

  static_assert((BufferSize >= 2) && ((BufferSize & (BufferSize - 1)) == 0));

public:

  /**
   * Constructs a bounded multi-producer, multi-consumer queue
   *
   * Note: Due to the algorithm used, the capacity is fixed at compile
   * time to 2^LogSize, which must be a power of two greater than or
   * equal to two.
   */
  explicit MPMC() {
    for (size_t i = 0; i < _buffer.size(); i++) {
      _buffer[i].sequence.store(i, std::memory_order_relaxed);
    }
    _enqueue_pos.store(0, std::memory_order_relaxed);
    _dequeue_pos.store(0, std::memory_order_relaxed);
  }

  /**
   * Enqueues an item into the queue
   *
   * @param data Item to place into the queue
   * @return false if the queue was full (and enqueuing failed),
   *         true otherwise
   */
  bool try_enqueue(T data) {
    Cell* cell;
    auto pos = _enqueue_pos.load(std::memory_order_relaxed);
    for (;;) {
      cell = &_buffer[pos & BufferMask];
      auto seq = cell->sequence.load(std::memory_order_acquire);
      if (seq == pos) {
        // Claim this slot by advancing the enqueue position.
        if (_enqueue_pos.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed)) {
          break;
        }
      } else if (seq < pos) {
        // The slot has not been consumed yet: the queue is full.
        return false;
      } else {
        pos = _enqueue_pos.load(std::memory_order_relaxed);
      }
    }
    cell->data = data;
    cell->sequence.store(pos + 1, std::memory_order_release);
    return true;
  }

  /**
   * Enqueues an item into the queue, spinning until a slot is free
   *
   * @param data Item to place into the queue
   */
  void enqueue(T data) {
    Cell* cell;
    auto pos = _enqueue_pos.load(std::memory_order_relaxed);
    for (;;) {
      cell = &_buffer[pos & BufferMask];
      auto seq = cell->sequence.load(std::memory_order_acquire);
      if (seq == pos) {
        if (_enqueue_pos.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed)) {
          break;
        }
      } else {
        pos = _enqueue_pos.load(std::memory_order_relaxed);
      }
    }
    cell->data = data;
    cell->sequence.store(pos + 1, std::memory_order_release);
  }

  /**
   * Dequeues an item from the queue
   *
   * @return std::nullopt if the queue was empty (and dequeuing failed),
   *         otherwise the dequeued item
   */
  std::optional<T> try_dequeue() {
    Cell* cell;
    auto pos = _dequeue_pos.load(std::memory_order_relaxed);
    for (;;) {
      cell = &_buffer[pos & BufferMask];
      auto seq = cell->sequence.load(std::memory_order_acquire);
      if (seq == pos + 1) {
        // Claim this slot by advancing the dequeue position.
        if (_dequeue_pos.compare_exchange_weak(pos, pos + 1,
                                               std::memory_order_relaxed)) {
          break;
        }
      } else if (seq < (pos + 1)) {
        // The slot has not been published yet: the queue is empty.
        return std::nullopt;
      } else {
        pos = _dequeue_pos.load(std::memory_order_relaxed);
      }
    }
    T data = cell->data;
    // Release the slot for the producer one lap ahead.
    cell->sequence.store(pos + BufferMask + 1, std::memory_order_release);
    return data;
  }

  bool empty() const {
    auto beg = _dequeue_pos.load(std::memory_order_relaxed);
    auto end = _enqueue_pos.load(std::memory_order_relaxed);
    return beg >= end;
  }

  size_t capacity() const {
    return BufferSize;
  }

private:

  struct Cell {
    T data;
    std::atomic<uint64_t> sequence;
  };

  alignas(2*TF_CACHELINE_SIZE) std::array<Cell, BufferSize> _buffer;
  alignas(2*TF_CACHELINE_SIZE) std::atomic<uint64_t> _enqueue_pos;
  alignas(2*TF_CACHELINE_SIZE) std::atomic<uint64_t> _dequeue_pos;
};

}  // end of namespace tf
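/*
  Illustrative usage sketch (an assumption, not part of the header): a small
  program in which several producer threads push integers with try_enqueue()
  and several consumer threads drain them with try_dequeue(). The file name
  "mpmc.hpp" and the thread/item counts are hypothetical.

  #include "mpmc.hpp"

  #include <atomic>
  #include <cstdio>
  #include <thread>
  #include <vector>

  int main() {
    tf::MPMC<int, 10> queue;       // capacity = 2^10 = 1024 slots
    constexpr int N = 100000;      // items per producer
    constexpr int P = 4, C = 4;    // producer / consumer thread counts

    std::atomic<long long> sum{0};
    std::atomic<int> consumed{0};
    std::vector<std::thread> threads;

    // Producers: spin on try_enqueue until the item is accepted.
    for (int p = 0; p < P; ++p) {
      threads.emplace_back([&] {
        for (int i = 1; i <= N; ++i) {
          while (!queue.try_enqueue(i)) {}  // queue full, retry
        }
      });
    }

    // Consumers: pop until all P*N items have been observed.
    for (int c = 0; c < C; ++c) {
      threads.emplace_back([&] {
        while (consumed.load(std::memory_order_relaxed) < P * N) {
          if (auto item = queue.try_dequeue()) {
            sum.fetch_add(*item, std::memory_order_relaxed);
            consumed.fetch_add(1, std::memory_order_relaxed);
          }
        }
      });
    }

    for (auto& t : threads) t.join();
    std::printf("sum = %lld\n", sum.load());  // expect P * N*(N+1)/2
  }
*/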