1+ #pragma once
2+
3+ #include " ../../ring.h"
4+ #include " outputStorage.hpp"
5+
6+ #include < atomic>
7+ #include < condition_variable>
8+ #include < mutex>
9+ #include < optional>
10+
11+ #include < boost/container/static_vector.hpp>
12+
13+ namespace ipxp ::output {
14+
15+ class RingOutputStorage : public OutputStorage {
16+ public:
17+ explicit RingOutputStorage (const uint8_t writersCount) noexcept
18+ : OutputStorage(writersCount)
19+ , m_ring(
20+ ipx_ring_init (
21+ static_cast <uint32_t >(ALLOCATION_BUFFER_CAPACITY * 32 ),
22+ writersCount > 1),
23+ &ipx_ring_destroy)
24+ {
25+ static_assert (sizeof (ContainerWrapper) == sizeof (void *));
26+ }
27+
28+ void storeContainer (ContainerWrapper container) noexcept override
29+ {
30+ if (container.empty ()) {
31+ throw std::runtime_error (" Attempt to store empty container" );
32+ }
33+ ipx_ring_push (m_ring.get (), *reinterpret_cast <void **>(&container));
34+ }
35+
36+ std::optional<ReferenceCounterHandler<OutputContainer>>
37+ getContainer (const std::size_t readerGroupIndex) noexcept override
38+ {
39+ if (ipx_ring_cnt (m_ring.get ()) == 0 ) {
40+ return std::nullopt ;
41+ }
42+
43+ auto pop = ipx_ring_pop (m_ring.get ());
44+ if (pop == nullptr ) {
45+ return std::nullopt ;
46+ }
47+ ContainerWrapper& container = *reinterpret_cast <ContainerWrapper*>(&pop);
48+ return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
49+ getReferenceCounter (container));
50+ }
51+
52+ bool finished (const std::size_t readerGroupIndex) noexcept override
53+ {
54+ return !writersPresent ();
55+ }
56+
57+ private:
58+ std::unique_ptr<ipx_ring_t , decltype (&ipx_ring_destroy)> m_ring;
59+ };
60+
61+ } // namespace ipxp::output
0 commit comments