Skip to content

Commit 7239e37

Browse files
authored
refactor: Separate matcher logic in EventFilter (#2546)
1 parent f9b4aed commit 7239e37

File tree

2 files changed

+155
-137
lines changed

2 files changed

+155
-137
lines changed

blockchain/event_filter.go

Lines changed: 14 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package blockchain
22

33
import (
4-
"encoding/binary"
54
"errors"
65
"fmt"
76
"io"
@@ -10,7 +9,6 @@ import (
109
"github.com/NethermindEth/juno/core"
1110
"github.com/NethermindEth/juno/core/felt"
1211
"github.com/NethermindEth/juno/db"
13-
"github.com/bits-and-blooms/bloom/v3"
1412
)
1513

1614
var errChunkSizeReached = errors.New("chunk size reached")
@@ -26,13 +24,12 @@ type EventFilterer interface {
2624
}
2725

2826
type EventFilter struct {
29-
txn db.Transaction
30-
fromBlock uint64
31-
toBlock uint64
32-
contractAddress *felt.Felt
33-
keys [][]felt.Felt
34-
maxScanned uint // maximum number of scanned blocks in single call.
35-
pendingBlockFn func() *core.Block
27+
txn db.Transaction
28+
fromBlock uint64
29+
toBlock uint64
30+
matcher EventMatcher
31+
maxScanned uint // maximum number of scanned blocks in single call.
32+
pendingBlockFn func() *core.Block
3633
}
3734

3835
type EventFilterRange uint
@@ -46,13 +43,12 @@ func newEventFilter(txn db.Transaction, contractAddress *felt.Felt, keys [][]fel
4643
pendingBlockFn func() *core.Block,
4744
) *EventFilter {
4845
return &EventFilter{
49-
txn: txn,
50-
contractAddress: contractAddress,
51-
keys: keys,
52-
fromBlock: fromBlock,
53-
toBlock: toBlock,
54-
maxScanned: math.MaxUint,
55-
pendingBlockFn: pendingBlockFn,
46+
txn: txn,
47+
matcher: NewEventMatcher(contractAddress, keys),
48+
fromBlock: fromBlock,
49+
toBlock: toBlock,
50+
maxScanned: math.MaxUint,
51+
pendingBlockFn: pendingBlockFn,
5652
}
5753
}
5854

@@ -128,8 +124,6 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
128124
}
129125
}
130126

131-
filterKeysMaps := makeKeysMaps(e.keys)
132-
133127
curBlock := e.fromBlock
134128
// skip the blocks that we previously processed for this request
135129
if cToken != nil {
@@ -151,7 +145,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
151145
header = pending.Header
152146
}
153147

154-
if possibleMatches := e.testBloom(header.EventsBloom, filterKeysMaps); !possibleMatches {
148+
if possibleMatches := e.matcher.TestBloom(header.EventsBloom); !possibleMatches {
155149
// bloom filter says no events match the filter, skip this block entirely if from is not nil
156150
continue
157151
}
@@ -167,7 +161,7 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
167161
}
168162

169163
var processedEvents uint64
170-
matchedEvents, processedEvents, err = e.appendBlockEvents(matchedEvents, header, receipts, filterKeysMaps, cToken, chunkSize)
164+
matchedEvents, processedEvents, err = e.matcher.AppendBlockEvents(matchedEvents, header, receipts, cToken, chunkSize)
171165
if err != nil {
172166
if errors.Is(err, errChunkSizeReached) {
173167
rToken = &ContinuationToken{fromBlock: curBlock, processedEvents: processedEvents}
@@ -182,120 +176,3 @@ func (e *EventFilter) Events(cToken *ContinuationToken, chunkSize uint64) ([]*Fi
182176
}
183177
return matchedEvents, rToken, nil
184178
}
185-
186-
func (e *EventFilter) testBloom(bloomFilter *bloom.BloomFilter, keysMap []map[felt.Felt]struct{}) bool {
187-
possibleMatches := true
188-
if e.contractAddress != nil {
189-
addrBytes := e.contractAddress.Bytes()
190-
possibleMatches = bloomFilter.Test(addrBytes[:])
191-
// bloom filter says no events from this contract
192-
if !possibleMatches {
193-
return possibleMatches
194-
}
195-
}
196-
197-
for index, kMap := range keysMap {
198-
for key := range kMap {
199-
keyBytes := key.Bytes()
200-
keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index))
201-
202-
// check if block possibly contains the event we are looking for
203-
possibleMatches = bloomFilter.Test(keyAndIndexBytes)
204-
// possible match for this index, no need to continue checking the rest of the keys
205-
if possibleMatches {
206-
break
207-
}
208-
}
209-
210-
// no key on this index matches the filter
211-
if !possibleMatches {
212-
break
213-
}
214-
}
215-
216-
return possibleMatches
217-
}
218-
219-
func (e *EventFilter) appendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header,
220-
receipts []*core.TransactionReceipt, keysMap []map[felt.Felt]struct{}, cToken *ContinuationToken, chunkSize uint64,
221-
) ([]*FilteredEvent, uint64, error) {
222-
processedEvents := uint64(0)
223-
for _, receipt := range receipts {
224-
for i, event := range receipt.Events {
225-
var blockNumber *uint64
226-
// if header.Hash == nil it's a pending block
227-
if header.Hash != nil {
228-
blockNumber = &header.Number
229-
}
230-
231-
// if last request was interrupted mid-block, and we are still processing that block, skip events
232-
// that were already processed
233-
if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents {
234-
processedEvents++
235-
continue
236-
}
237-
238-
if e.contractAddress != nil && !event.From.Equal(e.contractAddress) {
239-
processedEvents++
240-
continue
241-
}
242-
243-
if e.matchesEventKeys(event.Keys, keysMap) {
244-
if uint64(len(matchedEventsSofar)) < chunkSize {
245-
matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{
246-
BlockNumber: blockNumber,
247-
BlockHash: header.Hash,
248-
TransactionHash: receipt.TransactionHash,
249-
EventIndex: i,
250-
Event: event,
251-
})
252-
} else {
253-
// we are at the capacity, return what we have accumulated so far and a continuation token
254-
return matchedEventsSofar, processedEvents, errChunkSizeReached
255-
}
256-
}
257-
// count the events we processed for this block to include in the continuation token
258-
processedEvents++
259-
}
260-
}
261-
return matchedEventsSofar, processedEvents, nil
262-
}
263-
264-
func (e *EventFilter) matchesEventKeys(eventKeys []*felt.Felt, keysMap []map[felt.Felt]struct{}) bool {
265-
// short circuit if event doest have enough keys
266-
for i := len(eventKeys); i < len(keysMap); i++ {
267-
if len(keysMap[i]) > 0 {
268-
return false
269-
}
270-
}
271-
272-
/// e.keys = [["V1", "V2"], [], ["V3"]] means:
273-
/// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")).
274-
//
275-
// Essentially
276-
// for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold
277-
for index, eventKey := range eventKeys {
278-
// empty filter keys means match all
279-
if index >= len(keysMap) || len(keysMap[index]) == 0 {
280-
break
281-
}
282-
if _, found := keysMap[index][*eventKey]; !found {
283-
return false
284-
}
285-
}
286-
287-
return true
288-
}
289-
290-
func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} {
291-
filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys))
292-
for index, keys := range filterKeys {
293-
kMap := make(map[felt.Felt]struct{}, len(keys))
294-
for _, key := range keys {
295-
kMap[key] = struct{}{}
296-
}
297-
filterKeysMaps[index] = kMap
298-
}
299-
300-
return filterKeysMaps
301-
}

blockchain/event_matcher.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package blockchain
2+
3+
import (
4+
"encoding/binary"
5+
6+
"github.com/NethermindEth/juno/core"
7+
"github.com/NethermindEth/juno/core/felt"
8+
"github.com/bits-and-blooms/bloom/v3"
9+
)
10+
11+
type EventMatcher struct {
12+
contractAddress *felt.Felt
13+
keysMap []map[felt.Felt]struct{}
14+
}
15+
16+
func NewEventMatcher(contractAddress *felt.Felt, keys [][]felt.Felt) EventMatcher {
17+
return EventMatcher{
18+
contractAddress: contractAddress,
19+
keysMap: makeKeysMaps(keys),
20+
}
21+
}
22+
23+
func makeKeysMaps(filterKeys [][]felt.Felt) []map[felt.Felt]struct{} {
24+
filterKeysMaps := make([]map[felt.Felt]struct{}, len(filterKeys))
25+
for index, keys := range filterKeys {
26+
kMap := make(map[felt.Felt]struct{}, len(keys))
27+
for _, key := range keys {
28+
kMap[key] = struct{}{}
29+
}
30+
filterKeysMaps[index] = kMap
31+
}
32+
33+
return filterKeysMaps
34+
}
35+
36+
func (e *EventMatcher) matchesEventKeys(eventKeys []*felt.Felt) bool {
37+
// short circuit if event doest have enough keys
38+
for i := len(eventKeys); i < len(e.keysMap); i++ {
39+
if len(e.keysMap[i]) > 0 {
40+
return false
41+
}
42+
}
43+
44+
/// e.keys = [["V1", "V2"], [], ["V3"]] means:
45+
/// ((event.Keys[0] == "V1" OR event.Keys[0] == "V2") AND (event.Keys[2] == "V3")).
46+
//
47+
// Essentially
48+
// for each event.Keys[i], (len(e.keys[i]) == 0 OR event.Keys[i] is in e.keys[i]) should hold
49+
for index, eventKey := range eventKeys {
50+
// empty filter keys means match all
51+
if index >= len(e.keysMap) || len(e.keysMap[index]) == 0 {
52+
break
53+
}
54+
if _, found := e.keysMap[index][*eventKey]; !found {
55+
return false
56+
}
57+
}
58+
59+
return true
60+
}
61+
62+
func (e *EventMatcher) TestBloom(bloomFilter *bloom.BloomFilter) bool {
63+
possibleMatches := true
64+
if e.contractAddress != nil {
65+
addrBytes := e.contractAddress.Bytes()
66+
possibleMatches = bloomFilter.Test(addrBytes[:])
67+
// bloom filter says no events from this contract
68+
if !possibleMatches {
69+
return possibleMatches
70+
}
71+
}
72+
73+
for index, kMap := range e.keysMap {
74+
for key := range kMap {
75+
keyBytes := key.Bytes()
76+
keyAndIndexBytes := binary.AppendVarint(keyBytes[:], int64(index))
77+
78+
// check if block possibly contains the event we are looking for
79+
possibleMatches = bloomFilter.Test(keyAndIndexBytes)
80+
// possible match for this index, no need to continue checking the rest of the keys
81+
if possibleMatches {
82+
break
83+
}
84+
}
85+
86+
// no key on this index matches the filter
87+
if !possibleMatches {
88+
break
89+
}
90+
}
91+
92+
return possibleMatches
93+
}
94+
95+
func (e *EventMatcher) AppendBlockEvents(matchedEventsSofar []*FilteredEvent, header *core.Header, receipts []*core.TransactionReceipt,
96+
cToken *ContinuationToken, chunkSize uint64,
97+
) ([]*FilteredEvent, uint64, error) {
98+
processedEvents := uint64(0)
99+
for _, receipt := range receipts {
100+
for i, event := range receipt.Events {
101+
var blockNumber *uint64
102+
// if header.Hash == nil it's a pending block
103+
if header.Hash != nil {
104+
blockNumber = &header.Number
105+
}
106+
107+
// if last request was interrupted mid-block, and we are still processing that block, skip events
108+
// that were already processed
109+
if cToken != nil && header.Number == cToken.fromBlock && processedEvents < cToken.processedEvents {
110+
processedEvents++
111+
continue
112+
}
113+
114+
if e.contractAddress != nil && !event.From.Equal(e.contractAddress) {
115+
processedEvents++
116+
continue
117+
}
118+
119+
if !e.matchesEventKeys(event.Keys) {
120+
processedEvents++
121+
continue
122+
}
123+
124+
if uint64(len(matchedEventsSofar)) < chunkSize {
125+
matchedEventsSofar = append(matchedEventsSofar, &FilteredEvent{
126+
BlockNumber: blockNumber,
127+
BlockHash: header.Hash,
128+
TransactionHash: receipt.TransactionHash,
129+
EventIndex: i,
130+
Event: event,
131+
})
132+
} else {
133+
// we are at the capacity, return what we have accumulated so far and a continuation token
134+
return matchedEventsSofar, processedEvents, errChunkSizeReached
135+
}
136+
// count the events we processed for this block to include in the continuation token
137+
processedEvents++
138+
}
139+
}
140+
return matchedEventsSofar, processedEvents, nil
141+
}

0 commit comments

Comments
 (0)