Skip to content

Commit 72ab23b

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent f654ded commit 72ab23b

18 files changed

+177
-128
lines changed

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer.hpp renamed to include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer.hppxxx

File renamed without changes.

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer3.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
9898
std::vector<CacheAlligned<WriterData>> m_writersData;
9999

100100
std::array<CacheAlligned<Queue>, 32> m_queues;
101-
std::atomic_uint64_t m_nextQueue {0};
101+
// std::atomic<uint64_t> m_nextQueue {0};
102102
};
103103

104104
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferB.hpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
2424
constexpr static std::size_t WINDOW_SIZE = 16;
2525

2626
public:
27-
__attribute__((noinline)) std::size_t d_test(auto& container)
27+
/*__attribute__((noinline)) std::size_t d_test(auto& container)
2828
{
2929
return std::ranges::count_if(container, [](const auto& bucket) {
3030
return bucket.load(std::memory_order_acquire) != std::numeric_limits<uint16_t>::max();
3131
});
32-
}
32+
}*/
3333

3434
explicit AllocationBufferB(const std::size_t capacity, const uint8_t writersCount) noexcept
3535
: m_objectPool(capacity + writersCount * BUCKET_SIZE)
@@ -49,6 +49,8 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
4949
const std::size_t bucketIndex = elementIndex / BUCKET_SIZE;
5050
m_buckets[bucketIndex].storage[elementIndex % BUCKET_SIZE] = &element;
5151
}
52+
m_fullBuckets.resize(m_buckets.size());
53+
m_emptyBuckets.resize(m_buckets.size());
5254
for (std::size_t i = 0; i < m_buckets.size(); i++) {
5355
m_fullBuckets[i].store(i);
5456
m_emptyBuckets[i].store(Bucket::PLACEHOLDER);
@@ -67,7 +69,7 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
6769
}
6870
}
6971

70-
void unregisterWriter(const uint8_t writerIndex) noexcept override {}
72+
// void unregisterWriter(const uint8_t writerIndex) noexcept override {}
7173

7274
ElementType* allocate(const uint8_t writerIndex) noexcept override
7375
{
@@ -135,15 +137,15 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
135137

136138
void pushBucket(auto& buckets, const std::size_t bucketIndex, std::size_t& pushRank) noexcept
137139
{
138-
std::size_t offset = 0;
140+
// std::size_t offset = 0;
139141
while (true) {
140142
uint16_t expected = buckets[pushRank].load(std::memory_order_acquire);
141-
if (++offset % 100'000'000 == 0) {
143+
/*if (++offset % 100'000'000 == 0) {
142144
std::cout << "d_test(push)=" << d_test(buckets) << "\n";
143-
}
145+
}*/
144146
if (expected != Bucket::PLACEHOLDER) {
145147
const std::size_t newPushRank
146-
= ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE + offset)
148+
= ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
147149
% buckets.size();
148150
if (newPushRank < pushRank) {
149151
// offset++;
@@ -164,12 +166,12 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
164166

165167
uint16_t popBucket(auto& buckets, std::size_t& popRank) noexcept
166168
{
167-
std::size_t offset = 0;
169+
// std::size_t offset = 0;
168170
popRank = (popRank - 1 + buckets.size()) % buckets.size();
169171
while (true) {
170-
if (++offset % 100'000'000 == 0) {
172+
/*if (++offset % 100'000'000 == 0) {
171173
std::cout << "d_test(pop)=" << d_test(buckets) << "\n";
172-
}
174+
}*/
173175
uint16_t expected = buckets[popRank].load(std::memory_order_acquire);
174176
if (expected == Bucket::PLACEHOLDER) {
175177
popRank
@@ -190,8 +192,10 @@ class AllocationBufferB : public AllocationBufferBase<ElementType> {
190192

191193
std::vector<ElementType> m_objectPool;
192194
std::vector<Bucket> m_buckets;
193-
std::array<std::atomic<uint16_t>, 65536> m_fullBuckets;
194-
std::array<std::atomic<uint16_t>, 65536> m_emptyBuckets;
195+
// std::array<std::atomic<uint16_t>, 65536> m_fullBuckets;
196+
// std::array<std::atomic<uint16_t>, 65536> m_emptyBuckets;
197+
std::deque<std::atomic<uint16_t>> m_fullBuckets;
198+
std::deque<std::atomic<uint16_t>> m_emptyBuckets;
195199
std::vector<CacheAlligned<WriterData>> m_writersData;
196200
};
197201

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferBase.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ class AllocationBufferBase {
99
std::is_default_constructible_v<ElementType>,
1010
"ElementType must be default constructible");
1111

12-
virtual ElementType* allocate(const uint8_t writerId) noexcept = 0;
12+
virtual ElementType* allocate(const uint8_t writerIndex) noexcept = 0;
1313

14-
virtual void deallocate(ElementType* element, const uint8_t writerId) noexcept = 0;
14+
virtual void deallocate(ElementType* element, const uint8_t writerIndex) noexcept = 0;
1515

16-
virtual void unregisterWriter([[maybe_unused]] const uint8_t writerId) noexcept {}
16+
virtual void unregisterWriter([[maybe_unused]] const uint8_t writerIndex) noexcept {}
1717

18-
virtual void registerWriter([[maybe_unused]] const uint8_t writerId) noexcept {}
18+
virtual void registerWriter([[maybe_unused]] const uint8_t writerIndex) noexcept {}
1919

2020
virtual ~AllocationBufferBase() = default;
2121

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferR.hpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ template<typename ElementType>
1717
class AllocationBufferR : public AllocationBufferBase<ElementType> {
1818
public:
1919
explicit AllocationBufferR(const std::size_t capacity, const uint8_t writersCount) noexcept
20-
: m_objectPool(capacity + 4 * writersCount)
20+
: m_objectPool(capacity) // Too small for 8 reader groups
2121
, m_writersCount(writersCount)
2222
{
2323
std::ranges::transform(
@@ -28,13 +28,20 @@ class AllocationBufferR : public AllocationBufferBase<ElementType> {
2828
m_controlBlock.emplace(m_pointers.size(), m_writersCount);
2929
}
3030

31-
void registerWriter() noexcept override { m_controlBlock->registerWriter(); }
32-
void unregisterWriter() noexcept override { m_controlBlock->unregisterWriter(); }
31+
void registerWriter([[maybe_unused]] const uint8_t writerIndex) noexcept override
32+
{
33+
m_controlBlock->registerWriter();
34+
}
35+
36+
void unregisterWriter([[maybe_unused]] const uint8_t writerIndex) noexcept override
37+
{
38+
m_controlBlock->unregisterWriter();
39+
}
3340

34-
ElementType* allocate([[maybe_unused]] const uint8_t writerId) noexcept override
41+
ElementType* allocate([[maybe_unused]] const uint8_t writerIndex) noexcept override
3542
{
36-
const std::optional<uint16_t> readPos = std::invoke([&]() {
37-
std::optional<uint16_t> res = std::nullopt;
43+
const std::optional<uint32_t> readPos = std::invoke([&]() {
44+
std::optional<uint32_t> res = std::nullopt;
3845
while (!res.has_value()) {
3946
res = m_controlBlock->getReadPos();
4047
}
@@ -49,9 +56,13 @@ class AllocationBufferR : public AllocationBufferBase<ElementType> {
4956
return res;
5057
}
5158

52-
void deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerId) noexcept override
59+
void
60+
deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerIndex) noexcept override
5361
{
54-
const std::optional<uint16_t> writePos = m_controlBlock->getWritePos();
62+
std::optional<uint32_t> writePos = m_controlBlock->getWritePos();
63+
while (!writePos.has_value()) {
64+
writePos = m_controlBlock->getWritePos();
65+
}
5566
if (!writePos.has_value()) {
5667
throw std::runtime_error("Should not happen");
5768
}

include/ipfixprobe/outputPlugin/outputStorage/b2OutputStorage.hpp

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <random>
1515

1616
#include <boost/container/static_vector.hpp>
17+
#include <immintrin.h>
1718

1819
namespace ipxp::output {
1920

@@ -29,6 +30,8 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
2930
{
3031
}
3132

33+
~B2OutputStorage() override { std::cout << "In loop: " << d_reads << std::endl; }
34+
3235
bool write(
3336
const Reference<OutputContainer<ElementType>>& container,
3437
const uint8_t writerIndex) noexcept override
@@ -80,18 +83,25 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
8083
this->m_buckets[writerData.writePosition].bucketIndex = writerData.bucketAllocation.reset(
8184
this->m_buckets[writerData.writePosition].bucketIndex);
8285

83-
const uint64_t highestReaderGeneration
84-
= this->m_highestReaderGeneration.load(std::memory_order_acquire);
85-
if (writerData.generation.load(std::memory_order_acquire)
86-
< highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE) {
87-
writerData.generation.store(
88-
highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE,
89-
std::memory_order_release);
90-
}
91-
this->m_buckets[writerData.writePosition].generation.store(
92-
writerData.generation.load(std::memory_order_acquire),
93-
std::memory_order_release);
86+
/*if (this->m_highestReaderGeneration.load(std::memory_order_acquire) >= x) {
87+
throw std::runtime_error("Shnejne?");
88+
}*/
9489

90+
const uint8_t correspondingReaderIndex
91+
= writerData.writePosition % this->m_expectedReadersCount;
92+
uint64_t generationToStore = 0;
93+
do {
94+
generationToStore = this->m_readersData[correspondingReaderIndex]->generation.load(
95+
std::memory_order_acquire)
96+
+ BOutputStorage<ElementType>::WINDOW_SIZE;
97+
this->m_buckets[writerData.writePosition].generation.store(
98+
generationToStore,
99+
std::memory_order_release);
100+
// TODO REMOVE DEBUG COUNTER
101+
d_reads.fetch_add(1, std::memory_order_acq_rel);
102+
} while (this->m_readersData[correspondingReaderIndex]->generation.load(
103+
std::memory_order_acquire)
104+
>= generationToStore);
95105
this->m_buckets[writerData.writePosition].lock.unlock();
96106

97107
if (containersLeft == 0) {
@@ -135,13 +145,15 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
135145
readerData.skipLoop = false;
136146
updateLowestReaderGeneration();
137147
}
148+
this->m_buckets[readerData.readPosition].lock.lock();
138149
cachedGeneration = this->m_buckets[readerData.readPosition].generation.load(
139150
std::memory_order_acquire);
140151
// std::atomic_thread_fence(std::memory_order_acquire);
141152
cachedBucketIndex = this->m_buckets[readerData.readPosition].bucketIndex;
142153
if (cachedGeneration > readerData.generation.load(std::memory_order_acquire)) {
143154
readerData.seenValidBucket = true;
144155
}
156+
this->m_buckets[readerData.readPosition].lock.unlock();
145157
} while (cachedGeneration != readerData.generation.load(std::memory_order_acquire)
146158
|| !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex(
147159
cachedBucketIndex));
@@ -154,7 +166,8 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
154166

155167
/*bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
156168
{
157-
return !writersPresent() && m_highestWriterGeneration + 200000 < m_lowestReaderGeneration;
169+
return !writersPresent() && m_highestWriterGeneration + 200000 <
170+
m_lowestReaderGeneration;
158171
}*/
159172

160173
protected:
@@ -170,15 +183,16 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
170183
| std::ranges::to<boost::container::static_vector<
171184
uint64_t,
172185
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
173-
const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
174-
casMax(this->m_highestReaderGeneration, highestReaderGeneration);
175-
// m_highestReaderGeneration = highestReaderGeneration;
186+
// const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
187+
// casMax(this->m_highestReaderGeneration, highestReaderGeneration);
188+
// m_highestReaderGeneration = highestReaderGeneration;
176189
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);
177190
// casMin(m_lowestReaderGeneration, lowestReaderGeneration);
178191
this->m_lowestReaderGeneration.store(lowestReaderGeneration, std::memory_order_release);
179192
}
180193

181194
std::atomic<uint64_t> m_highestWriterGeneration {0};
195+
std::atomic<uint64_t> d_reads {0};
182196
};
183197

184198
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/bOutputStorage.hpp

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace ipxp::output {
1818
template<typename ElementType>
1919
class BOutputStorage : public OutputStorage<ElementType> {
2020
protected:
21+
constexpr static std::size_t WINDOW_SIZE = 15;
2122
constexpr static std::size_t BUCKET_SIZE = 128;
2223
constexpr static std::size_t BUCKET_COUNT
2324
= OutputStorage<ElementType>::STORAGE_CAPACITY / BUCKET_SIZE;
@@ -76,6 +77,17 @@ class BOutputStorage : public OutputStorage<ElementType> {
7677
OutputStorage<ElementType>::registerWriter(writerIndex);
7778
}
7879

80+
void unregisterWriter(const uint8_t writerIndex) noexcept override
81+
{
82+
WriterData& writerData = this->m_writersData[writerIndex].get();
83+
/*const uint64_t highestReaderGeneration
84+
= this->m_highestReaderGeneration.load(std::memory_order_acquire);*/
85+
writerData.generation.store(
86+
this->getHighestReaderGeneration() + WINDOW_SIZE,
87+
std::memory_order_release);
88+
OutputStorage<ElementType>::unregisterWriter(writerIndex);
89+
}
90+
7991
void registerReader(const uint8_t readerIndex) noexcept override
8092
{
8193
// std::unique_lock<std::mutex> lock(m_registrationMutex);
@@ -158,11 +170,14 @@ class BOutputStorage : public OutputStorage<ElementType> {
158170
= writerData.bucketAllocation.reset(m_buckets[writerData.writePosition].bucketIndex);
159171
// std::atomic_thread_fence(std::memory_order_release);
160172

161-
writerData.generation.store(
173+
/*writerData.generation.store(
162174
m_highestReaderGeneration.load(std::memory_order_acquire) + WINDOW_SIZE,
163-
std::memory_order_release);
175+
std::memory_order_release);*/
176+
const uint8_t correspondingReaderIndex
177+
= writerData.writePosition % this->m_expectedReadersCount;
164178
m_buckets[writerData.writePosition].generation.store(
165-
writerData.generation.load(std::memory_order_acquire),
179+
m_readersData[correspondingReaderIndex]->generation.load(std::memory_order_acquire)
180+
+ WINDOW_SIZE,
166181
std::memory_order_release);
167182

168183
m_buckets[writerData.writePosition].lock.unlock();
@@ -237,13 +252,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
237252
}
238253

239254
protected:
240-
uint16_t remap(const uint16_t index) noexcept
241-
{
242-
return index;
243-
// TODO test another remap strategy
244-
// return __builtin_bitreverse32(static_cast<uint32_t>(index)) & (BUCKET_SIZE - 1);
245-
}
246-
247255
struct WriterData {
248256
explicit WriterData(FastRandomGenerator<uint8_t>& randomGenerator) noexcept
249257
: randomHandler(randomGenerator.getHandler())
@@ -281,8 +289,8 @@ class BOutputStorage : public OutputStorage<ElementType> {
281289
| std::ranges::to<boost::container::static_vector<
282290
uint64_t,
283291
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
284-
const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
285-
m_highestReaderGeneration.store(highestReaderGeneration, std::memory_order_release);
292+
// const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
293+
// m_highestReaderGeneration.store(highestReaderGeneration, std::memory_order_release);
286294
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);
287295
m_lowestReaderGeneration.store(lowestReaderGeneration, std::memory_order_release);
288296
}
@@ -301,6 +309,19 @@ class BOutputStorage : public OutputStorage<ElementType> {
301309
return *std::ranges::max_element(writerGenerations);
302310
}
303311

312+
uint64_t getHighestReaderGeneration() const noexcept
313+
{
314+
boost::container::static_vector<uint64_t, OutputStorage<ElementType>::MAX_READERS_COUNT>
315+
readerGenerations
316+
= m_readersData
317+
| std::views::transform([](const CacheAlligned<ReaderData>& readerDataAlligned) {
318+
return readerDataAlligned->generation.load(std::memory_order_acquire);
319+
})
320+
| std::ranges::to<boost::container::static_vector<
321+
uint64_t,
322+
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
323+
return *std::ranges::max_element(readerGenerations);
324+
}
304325
struct ReaderData {
305326
BucketAllocation bucketAllocation {};
306327
uint16_t readPosition;
@@ -361,7 +382,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
361382
// std::atomic<uint64_t> m_highestWriterGeneration {1};
362383

363384
// std::vector<uint16_t> m_readIndex;
364-
constexpr static std::size_t WINDOW_SIZE = 4;
365385
boost::container::
366386
static_vector<CacheAlligned<ReaderData>, OutputStorage<ElementType>::MAX_READERS_COUNT>
367387
m_readersData;
@@ -373,7 +393,7 @@ class BOutputStorage : public OutputStorage<ElementType> {
373393
m_writersData.capacity()};
374394

375395
std::atomic<uint64_t> m_lowestReaderGeneration {1};
376-
std::atomic<uint64_t> m_highestReaderGeneration {1};
396+
// std::atomic<uint64_t> m_highestReaderGeneration {1};
377397
boost::container::
378398
static_vector<std::atomic_uint64_t, OutputStorage<ElementType>::MAX_WRITERS_COUNT>
379399
m_alreadyReadGroupPositions;

0 commit comments

Comments
 (0)