Skip to content

Commit 0dc9888

Browse files
authored
feat: event query acceleration (#2866)
* Aggregated bloom filter impl * Aggregated bloom filter test * Running event filter and persistent storage * Event filter cache and matched blocks iterator * aggregated filters cache and iter test * Integrate aggregated filter * Running filter integration -draft * Running fitler dummy initializer * go generate * lazy loading for running filter and migration * pending filter error fix * No copy query for bloom filter and file rename * blockchain. running filter, public to private * Encapsulate db as a field in RunningEventFilter * Running filter reorg handling spanning previous filter * Running filter reorg step by one block * Reduce allocations for matched blocks, and make iterator reuse same buffer * aggreg. filter & running filter test enhance * Fix linter spelling * go:generate * running filter nil check in iterator ctor * introduced const AggregatedBloomFilterCacheSize * AggregatedBloomFİlter comment indentation * reduce allocations by using value types * MatchedBlockIterator: ctor returns value type instead of pointer * AggregatedBloomFilter: .Copy() renamed to .Clone() * MatchedBlockIterator: MaxScanned and scannedcount uint -> uint64 for consistency * Distribute block ranges to multiple workers on migration * Rename Persist to Write * Rename AggregatedBloomFilterRangeLen to NumBlocksPerFilter * refactor long line * GetAggregatedBloomFilter accessor: return value type * consistent naming in migration function * remove locks from 'WithFallback' * remove locks in AggregatedBloomFilterCache due to inner lock of LRUCache
1 parent 4409ccd commit 0dc9888

18 files changed

+2006
-69
lines changed
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package blockchain
2+
3+
import (
4+
"errors"
5+
6+
"github.com/NethermindEth/juno/core"
7+
"github.com/bits-and-blooms/bitset"
8+
"github.com/ethereum/go-ethereum/common/lru"
9+
)
10+
11+
// NOTE(Ege): consider making it configurable
12+
const AggregatedBloomFilterCacheSize = 16
13+
14+
// Provides cache-accelerated lookup of blockchain events
15+
// across block ranges by aggregating bloom filters. It includes LRU-cached filters
16+
// and efficient block iterators for event queries.
17+
18+
// EventFiltersCacheKey uniquely identifies a range of blocks whose aggregated bloom
19+
// filter is cached. Used as the lookup key for bloom filter caches.
20+
type EventFiltersCacheKey struct {
21+
fromBlock uint64
22+
toBlock uint64
23+
}
24+
25+
// AggregatedBloomFilterCache stores and manages LRU-cached aggregated bloom filters
26+
// for block ranges, supporting fallback loading and bulk insertion.
27+
// It is safe for concurrent use.
28+
type AggregatedBloomFilterCache struct {
29+
cache lru.Cache[EventFiltersCacheKey, *core.AggregatedBloomFilter]
30+
fallbackFunc func(EventFiltersCacheKey) (core.AggregatedBloomFilter, error)
31+
}
32+
33+
// NewAggregatedBloomCache creates a new LRU cache for aggregated bloom filters
34+
// with the specified maximum size (number of ranges to cache).
35+
func NewAggregatedBloomCache(size int) AggregatedBloomFilterCache {
36+
return AggregatedBloomFilterCache{
37+
cache: *lru.NewCache[EventFiltersCacheKey, *core.AggregatedBloomFilter](size),
38+
}
39+
}
40+
41+
// WithFallback sets a fallback fetch function to be used if a requested
42+
// AggregatedBloomFilter is not found in the cache. The provided function must
43+
// return a filter matching the queried range, or an error.
44+
func (c *AggregatedBloomFilterCache) WithFallback(fallback func(EventFiltersCacheKey) (core.AggregatedBloomFilter, error)) {
45+
c.fallbackFunc = fallback
46+
}
47+
48+
// Reset clears the entire bloom filter cache, removing all stored filters.
49+
func (c *AggregatedBloomFilterCache) Reset() {
50+
c.cache.Purge()
51+
}
52+
53+
// SetMany inserts multiple aggregated bloom filters into the cache.
54+
// Each filter is keyed by its block range.
55+
func (c *AggregatedBloomFilterCache) SetMany(filters []*core.AggregatedBloomFilter) {
56+
for _, filter := range filters {
57+
c.cache.Add(
58+
EventFiltersCacheKey{
59+
fromBlock: filter.FromBlock(),
60+
toBlock: filter.ToBlock(),
61+
},
62+
filter,
63+
)
64+
}
65+
}
66+
67+
// MatchedBlockIterator iterates over candidate block numbers within a block range
68+
// that may match an event query, using cached (or fetched) aggregated bloom filters
69+
// for efficient windowed scanning and filtering.
70+
type MatchedBlockIterator struct {
71+
currentBits *bitset.BitSet // current candidate blocks bitset to iterate
72+
nextIndex uint64 // next bit index to test and possibly yield
73+
rangeStart uint64 // starting block number of the filter range
74+
currentWindowStart uint64 // absolute block start of currently loaded window
75+
rangeEnd uint64 // end block number of the filter range
76+
done bool // iteration finished flag
77+
78+
maxScanned uint64 // max number of blocks to iterate (0 = unlimited)
79+
scannedCount uint64 // number of blocks yielded so far
80+
81+
cache *AggregatedBloomFilterCache
82+
runningFilter *core.RunningEventFilter
83+
matcher *EventMatcher
84+
}
85+
86+
var (
87+
ErrInvalidBlockRange = errors.New("fromBlock > toBlock")
88+
ErrMaxScannedBlockLimitExceed = errors.New("max scanned blocks exceeded")
89+
ErrAggregatedBloomFilterFallbackNil = errors.New("aggregated bloom filter does not have fallback")
90+
ErrFetchedFilterBoundsMismatch = errors.New("fetched filter bounds mismatch")
91+
ErrNilRunningFilter = errors.New("running filter is nil")
92+
)
93+
94+
// NewMatchedBlockIterator constructs an iterator for block numbers within [fromBlock, toBlock]
95+
// that may match the given EventMatcher. The scan can be limited to maxScanned candidate
96+
// blocks. It uses cached (or fetched via fallback) AggregatedBloomFilter windows for
97+
// efficiency.
98+
// Returns an error if input is invalid or required state is missing.
99+
func (c *AggregatedBloomFilterCache) NewMatchedBlockIterator(
100+
fromBlock, toBlock uint64,
101+
maxScanned uint64,
102+
matcher *EventMatcher,
103+
runningFilter *core.RunningEventFilter,
104+
) (MatchedBlockIterator, error) {
105+
if fromBlock > toBlock {
106+
return MatchedBlockIterator{}, ErrInvalidBlockRange
107+
}
108+
109+
if runningFilter == nil {
110+
return MatchedBlockIterator{}, ErrNilRunningFilter
111+
}
112+
113+
windowStart := fromBlock - (fromBlock % core.NumBlocksPerFilter)
114+
return MatchedBlockIterator{
115+
rangeStart: fromBlock,
116+
rangeEnd: toBlock,
117+
maxScanned: maxScanned,
118+
cache: c,
119+
runningFilter: runningFilter,
120+
matcher: matcher,
121+
currentWindowStart: windowStart,
122+
}, nil
123+
}
124+
125+
// loadNextWindow prepares the iterator to scan the next window of blocks,
126+
// loading or fetching the corresponding AggregatedBloomFilter as necessary.
127+
// Advances currentBits and nextIndex appropriately for iteration.
128+
// Returns an error if the cache or fallback retrieval fails, or if a filter's block range is inconsistent.
129+
func (it *MatchedBlockIterator) loadNextWindow() error {
130+
if it.done {
131+
return nil
132+
}
133+
134+
// Calculate next window start aligned to block range
135+
var windowStart uint64
136+
if it.currentBits == nil {
137+
it.currentBits = bitset.New(uint(core.NumBlocksPerFilter))
138+
windowStart = it.currentWindowStart
139+
it.nextIndex = it.rangeStart % core.NumBlocksPerFilter // offset for first window
140+
} else {
141+
windowStart = it.currentWindowStart + core.NumBlocksPerFilter
142+
it.nextIndex = 0 // offset 0 for subsequent windows
143+
}
144+
145+
if windowStart > it.rangeEnd {
146+
it.done = true
147+
return nil
148+
}
149+
150+
fromAligned := windowStart - (windowStart % core.NumBlocksPerFilter)
151+
toAligned := fromAligned + core.NumBlocksPerFilter - 1
152+
153+
// Falls into range of running filter
154+
if fromAligned == it.runningFilter.FromBlock() {
155+
err := it.matcher.getCandidateBlocksForFilterInto(it.runningFilter.InnerFilter(), it.currentBits)
156+
if err != nil {
157+
return err
158+
}
159+
it.currentWindowStart = fromAligned // set current window start absolute index
160+
return nil
161+
}
162+
163+
key := EventFiltersCacheKey{fromBlock: fromAligned, toBlock: toAligned}
164+
filter, ok := it.cache.cache.Get(key)
165+
166+
if ok {
167+
err := it.matcher.getCandidateBlocksForFilterInto(filter, it.currentBits)
168+
if err != nil {
169+
return err
170+
}
171+
it.currentWindowStart = fromAligned // set current window start absolute index
172+
return nil
173+
}
174+
175+
// Not found in cache and not fall into range of running filter
176+
if it.cache.fallbackFunc == nil {
177+
return ErrAggregatedBloomFilterFallbackNil
178+
}
179+
180+
fetched, err := it.cache.fallbackFunc(key)
181+
if err != nil {
182+
return err
183+
}
184+
filter = &fetched
185+
if filter.FromBlock() != fromAligned || filter.ToBlock() != toAligned {
186+
return ErrFetchedFilterBoundsMismatch
187+
}
188+
189+
it.cache.cache.Add(EventFiltersCacheKey{fromBlock: filter.FromBlock(), toBlock: filter.ToBlock()}, filter)
190+
191+
err = it.matcher.getCandidateBlocksForFilterInto(filter, it.currentBits)
192+
if err != nil {
193+
return err
194+
}
195+
it.currentWindowStart = fromAligned // set current window start absolute index
196+
return nil
197+
}
198+
199+
// Next advances the iterator to the next matching block number within the scanned range.
200+
// Returns the next candidate block number (absolute), a boolean indicating if such exists,
201+
// and any error encountered (including scan limit exhaustion or fallback fetch errors).
202+
// When ok == false and error is nil, the iteration is complete.
203+
func (it *MatchedBlockIterator) Next() (uint64, bool, error) {
204+
if it.done {
205+
return 0, false, nil
206+
}
207+
208+
/// Load the first filter
209+
if it.currentBits == nil {
210+
if err := it.loadNextWindow(); err != nil {
211+
it.done = true
212+
return 0, false, err
213+
}
214+
if it.done {
215+
return 0, false, nil
216+
}
217+
}
218+
219+
// Search till finding next set bit or iterator exhausts
220+
next, found := it.currentBits.NextSet(uint(it.nextIndex))
221+
for !found {
222+
if err := it.loadNextWindow(); err != nil {
223+
it.done = true
224+
return 0, false, err
225+
}
226+
227+
if it.done {
228+
return 0, false, nil
229+
}
230+
next, found = it.currentBits.NextSet(uint(it.nextIndex))
231+
}
232+
233+
// Calculate absolute block number relative to current window
234+
blockNum := it.currentWindowStart + uint64(next)
235+
if blockNum > it.rangeEnd {
236+
it.done = true
237+
return 0, false, nil
238+
}
239+
it.nextIndex = uint64(next) + 1
240+
241+
if it.maxScanned > 0 {
242+
it.scannedCount++
243+
if it.scannedCount > it.maxScanned {
244+
it.done = true
245+
return blockNum, false, ErrMaxScannedBlockLimitExceed
246+
}
247+
}
248+
249+
return blockNum, true, nil
250+
}

0 commit comments

Comments
 (0)