Skip to content

Commit e813f5c

Browse files
committed
ai: add threshold also to mempool
it already exists in the compliance engine as well, but including in mempool provides stronger localized guarantee of behaviour
1 parent beec64c commit e813f5c

File tree

15 files changed

+232
-39
lines changed

15 files changed

+232
-39
lines changed

cmd/consensus/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -805,7 +805,7 @@ func main() {
805805
}).
806806
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
807807
// initialize the pending blocks cache
808-
proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View)
808+
proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold())
809809

810810
logger := createLogger(node.Logger, node.RootChainID)
811811
complianceCore, err := compliance.NewCore(

consensus/integration/nodes_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func createNode(
518518
require.NoError(t, err)
519519

520520
// initialize the pending blocks cache
521-
cache := buffer.NewPendingBlocks(rootHeader.View)
521+
cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold)
522522

523523
rootQC, err := rootSnapshot.QuorumCertificate()
524524
require.NoError(t, err)

engine/collection/compliance/core.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
196196
_, found := c.pending.ByID(block.ParentID)
197197
if found {
198198
// add the block to the cache
199-
c.pending.Add(proposal)
199+
if err := c.pending.Add(proposal); err != nil {
200+
if mempool.IsBeyondActiveRangeError(err) {
201+
// In general we expect the block buffer to use SkipNewProposalsThreshold,
202+
// however since it is instantiated outside this component, we allow the thresholds to differ
203+
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
204+
return nil
205+
}
206+
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
207+
}
200208
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
201209

202210
return nil
@@ -210,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
210218
return fmt.Errorf("could not check parent exists: %w", err)
211219
}
212220
if !exists {
213-
c.pending.Add(proposal)
221+
if err := c.pending.Add(proposal); err != nil {
222+
if mempool.IsBeyondActiveRangeError(err) {
223+
// In general we expect the block buffer to use SkipNewProposalsThreshold,
224+
// however since it is instantiated outside this component, we allow the thresholds to differ
225+
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
226+
return nil
227+
}
228+
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
229+
}
214230
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
215231

216232
c.sync.RequestBlock(block.ParentID, block.Height-1)

engine/collection/compliance/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
542542
}
543543

544544
// replace the engine buffer with the real one
545-
cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View)
545+
cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000)
546546

547547
// process all of the descendants
548548
for _, proposal := range proposals {

engine/collection/epochmgr/factories/compliance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (f *ComplianceEngineFactory) Create(
7070
if err != nil {
7171
return nil, fmt.Errorf("could not get finalized header: %w", err)
7272
}
73-
cache := buffer.NewPendingClusterBlocks(final.View)
73+
cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold())
7474
core, err := compliance.NewCore(
7575
f.log,
7676
f.engMetrics,

engine/consensus/compliance/core.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
203203
_, found := c.pending.ByID(header.ParentID)
204204
if found {
205205
// add the block to the cache
206-
c.pending.Add(proposal)
206+
if err := c.pending.Add(proposal); err != nil {
207+
if mempool.IsBeyondActiveRangeError(err) {
208+
// In general we expect the block buffer to use SkipNewProposalsThreshold,
209+
// however since it is instantiated outside this component, we allow the thresholds to differ
210+
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
211+
return nil
212+
}
213+
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
214+
}
207215
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())
208216

209217
return nil
@@ -217,7 +225,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
217225
return fmt.Errorf("could not check parent exists: %w", err)
218226
}
219227
if !exists {
220-
c.pending.Add(proposal)
228+
if err := c.pending.Add(proposal); err != nil {
229+
if mempool.IsBeyondActiveRangeError(err) {
230+
// In general we expect the block buffer to use SkipNewProposalsThreshold,
231+
// however since it is instantiated outside this component, we allow the thresholds to differ
232+
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
233+
return nil
234+
}
235+
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
236+
}
221237
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())
222238

223239
c.sync.RequestBlock(header.ParentID, header.Height-1)

engine/consensus/compliance/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
584584
}
585585

586586
// replace the engine buffer with the real one
587-
cs.core.pending = real.NewPendingBlocks(cs.head.View)
587+
cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000)
588588

589589
// check that we request the ancestor block each time
590590
cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals))

module/buffer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ type BufferedProposal interface {
2020
type GenericPendingBlockBuffer[T BufferedProposal] interface {
2121
// Add adds the input block to the block buffer.
2222
// If the block already exists, or is below the finalized view, this is a no-op.
23-
Add(block flow.Slashable[T])
23+
// Errors returns:
24+
// - mempool.BlockViewTooFarAheadError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0)
25+
Add(block flow.Slashable[T]) error
2426

2527
// ByID returns the block with the given ID, if it exists.
2628
// Otherwise returns (nil, false)

module/buffer/pending_blocks.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/onflow/flow-go/model/flow"
88
"github.com/onflow/flow-go/module"
99
"github.com/onflow/flow-go/module/forest"
10+
"github.com/onflow/flow-go/module/mempool"
1011
)
1112

1213
// proposalVertex implements [forest.Vertex] for generic block proposals.
@@ -51,8 +52,9 @@ func (v proposalVertex[T]) Parent() (flow.Identifier, uint64) {
5152
//
5253
// Safe for concurrent use.
5354
type GenericPendingBlocks[T module.BufferedProposal] struct {
54-
lock *sync.Mutex
55-
forest *forest.LevelledForest
55+
lock *sync.Mutex
56+
forest *forest.LevelledForest
57+
activeViewRangeSize uint64
5658
}
5759

5860
type PendingBlocks = GenericPendingBlocks[*flow.Proposal]
@@ -61,26 +63,45 @@ type PendingClusterBlocks = GenericPendingBlocks[*cluster.Proposal]
6163
var _ module.PendingBlockBuffer = (*PendingBlocks)(nil)
6264
var _ module.PendingClusterBlockBuffer = (*PendingClusterBlocks)(nil)
6365

64-
func NewPendingBlocks(finalizedView uint64) *PendingBlocks {
66+
func NewPendingBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingBlocks {
6567
return &PendingBlocks{
66-
lock: new(sync.Mutex),
67-
forest: forest.NewLevelledForest(finalizedView),
68+
lock: new(sync.Mutex),
69+
// LevelledForest's lowestLevel is inclusive, so add 1 here
70+
forest: forest.NewLevelledForest(finalizedView + 1),
71+
activeViewRangeSize: activeViewRangeSize,
6872
}
6973
}
7074

71-
func NewPendingClusterBlocks(finalizedView uint64) *PendingClusterBlocks {
75+
func NewPendingClusterBlocks(finalizedView uint64, activeViewRangeSize uint64) *PendingClusterBlocks {
7276
return &PendingClusterBlocks{
73-
lock: new(sync.Mutex),
74-
forest: forest.NewLevelledForest(finalizedView),
77+
lock: new(sync.Mutex),
78+
forest: forest.NewLevelledForest(finalizedView),
79+
activeViewRangeSize: activeViewRangeSize,
7580
}
7681
}
7782

7883
// Add adds the input block to the block buffer.
7984
// If the block already exists, or is below the finalized view, this is a no-op.
80-
func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) {
85+
// Errors returns:
86+
// - mempool.BeyondActiveRangeError if block.View > finalizedView + activeViewRangeSize (when activeViewRangeSize > 0)
87+
func (b *GenericPendingBlocks[T]) Add(block flow.Slashable[T]) error {
8188
b.lock.Lock()
8289
defer b.lock.Unlock()
90+
91+
blockView := block.Message.ProposalHeader().Header.View
92+
finalizedView := b.highestPrunedView()
93+
94+
// Check if block view exceeds the active view range size
95+
// If activeViewRangeSize is 0, there's no limitation
96+
if b.activeViewRangeSize > 0 && blockView > finalizedView+b.activeViewRangeSize {
97+
return mempool.NewBeyondActiveRangeError(
98+
"block view %d exceeds active view range size: finalized view %d + range size %d = %d",
99+
blockView, finalizedView, b.activeViewRangeSize, finalizedView+b.activeViewRangeSize,
100+
)
101+
}
102+
83103
b.forest.AddVertex(newProposalVertex(block))
104+
return nil
84105
}
85106

86107
// ByID returns the block with the given ID, if it exists.
@@ -131,3 +152,10 @@ func (b *GenericPendingBlocks[T]) Size() uint {
131152
defer b.lock.Unlock()
132153
return uint(b.forest.GetSize())
133154
}
155+
156+
// highestPrunedView returns the highest pruned view (finalized view).
157+
// CAUTION: Caller must acquire the lock.
158+
func (b *GenericPendingBlocks[T]) highestPrunedView() uint64 {
159+
// LevelledForest.LowestLevel is the lowest UNPRUNED view, so subtract 1 here
160+
return b.forest.LowestLevel - 1
161+
}

module/buffer/pending_blocks_test.go

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ func TestPendingBlocksSuite(t *testing.T) {
2222
}
2323

2424
func (suite *PendingBlocksSuite) SetupTest() {
25-
// Initialize with finalized view 0
26-
suite.buffer = NewPendingBlocks(0)
25+
// Initialize with finalized view 0 and no view range limitation (0 = no limit)
26+
// Individual tests that need the limitation will create their own buffers
27+
suite.buffer = NewPendingBlocks(0, 0)
2728
}
2829

2930
// block creates a new block proposal wrapped as Slashable.
@@ -41,7 +42,7 @@ func (suite *PendingBlocksSuite) blockWithParent(parent *flow.Header) flow.Slash
4142
// TestAdd tests adding blocks to the buffer.
4243
func (suite *PendingBlocksSuite) TestAdd() {
4344
block := suite.block()
44-
suite.buffer.Add(block)
45+
suite.Require().NoError(suite.buffer.Add(block))
4546

4647
// Verify block can be retrieved by ID
4748
retrieved, ok := suite.buffer.ByID(block.Message.Block.ID())
@@ -59,8 +60,8 @@ func (suite *PendingBlocksSuite) TestAdd() {
5960
// TestAddDuplicate verifies that adding the same block twice is a no-op.
6061
func (suite *PendingBlocksSuite) TestAddDuplicate() {
6162
block := suite.block()
62-
suite.buffer.Add(block)
63-
suite.buffer.Add(block) // Add again
63+
suite.Require().NoError(suite.buffer.Add(block))
64+
suite.Require().NoError(suite.buffer.Add(block)) // Add again
6465

6566
// Should still only have one block
6667
suite.Assert().Equal(uint(1), suite.buffer.Size())
@@ -74,24 +75,88 @@ func (suite *PendingBlocksSuite) TestAddDuplicate() {
7475
// TestAddBelowFinalizedView verifies that adding blocks below finalized view is a no-op.
7576
func (suite *PendingBlocksSuite) TestAddBelowFinalizedView() {
7677
finalizedView := uint64(1000)
77-
buffer := NewPendingBlocks(finalizedView)
78+
buffer := NewPendingBlocks(finalizedView, 100_000)
7879

7980
// Create a block with view below finalized
8081
block := suite.block()
8182
block.Message.Block.ParentView = finalizedView - 10
8283
block.Message.Block.View = finalizedView - 5
8384

84-
buffer.Add(block)
85+
suite.Require().NoError(buffer.Add(block))
8586

8687
_, ok := buffer.ByID(block.Message.Block.ID())
8788
suite.Assert().False(ok)
8889
suite.Assert().Equal(uint(0), buffer.Size())
8990
}
9091

92+
// TestAddExceedsActiveViewRangeSize verifies that adding blocks that exceed the active view range size returns an error.
93+
func (suite *PendingBlocksSuite) TestAddExceedsActiveViewRangeSize() {
94+
finalizedView := uint64(1000)
95+
activeViewRangeSize := uint64(100)
96+
buffer := NewPendingBlocks(finalizedView, activeViewRangeSize)
97+
98+
// Create a parent header and then a block that exceeds the active view range size
99+
parentHeader := unittest.BlockHeaderFixture()
100+
parentHeader.View = finalizedView + 50
101+
block := suite.blockWithParent(parentHeader)
102+
block.Message.Block.View = finalizedView + activeViewRangeSize + 1
103+
104+
err := buffer.Add(block)
105+
suite.Assert().Error(err)
106+
suite.Assert().True(mempool.IsBeyondActiveRangeError(err))
107+
108+
// Verify block was not added
109+
_, ok := buffer.ByID(block.Message.Block.ID())
110+
suite.Assert().False(ok)
111+
suite.Assert().Equal(uint(0), buffer.Size())
112+
}
113+
114+
// TestAddWithinActiveViewRangeSize verifies that adding blocks within the active view range size succeeds.
115+
func (suite *PendingBlocksSuite) TestAddWithinActiveViewRangeSize() {
116+
finalizedView := uint64(1000)
117+
activeViewRangeSize := uint64(100)
118+
buffer := NewPendingBlocks(finalizedView, activeViewRangeSize)
119+
120+
// Create a parent header and then a block that is exactly at the limit
121+
parentHeader := unittest.BlockHeaderFixture()
122+
parentHeader.View = finalizedView + 50
123+
block := suite.blockWithParent(parentHeader)
124+
block.Message.Block.View = finalizedView + activeViewRangeSize
125+
126+
err := buffer.Add(block)
127+
suite.Assert().NoError(err)
128+
129+
// Verify block was added
130+
_, ok := buffer.ByID(block.Message.Block.ID())
131+
suite.Assert().True(ok)
132+
suite.Assert().Equal(uint(1), buffer.Size())
133+
}
134+
135+
// TestAddWithZeroActiveViewRangeSize verifies that when activeViewRangeSize is 0, there's no limitation.
136+
func (suite *PendingBlocksSuite) TestAddWithZeroActiveViewRangeSize() {
137+
finalizedView := uint64(1000)
138+
activeViewRangeSize := uint64(0) // No limitation
139+
buffer := NewPendingBlocks(finalizedView, activeViewRangeSize)
140+
141+
// Create a parent header and then a block that is very far ahead
142+
parentHeader := unittest.BlockHeaderFixture()
143+
parentHeader.View = finalizedView + 500_000
144+
block := suite.blockWithParent(parentHeader)
145+
block.Message.Block.View = finalizedView + 1_000_000
146+
147+
err := buffer.Add(block)
148+
suite.Assert().NoError(err)
149+
150+
// Verify block was added
151+
_, ok := buffer.ByID(block.Message.Block.ID())
152+
suite.Assert().True(ok)
153+
suite.Assert().Equal(uint(1), buffer.Size())
154+
}
155+
91156
// TestByID tests retrieving blocks by ID.
92157
func (suite *PendingBlocksSuite) TestByID() {
93158
block := suite.block()
94-
suite.buffer.Add(block)
159+
suite.Require().NoError(suite.buffer.Add(block))
95160

96161
// Test retrieving existing block
97162
retrieved, ok := suite.buffer.ByID(block.Message.Block.ID())
@@ -173,7 +238,7 @@ func (suite *PendingBlocksSuite) TestPruneByView() {
173238
// 10% of the time, add a new unrelated block
174239
if i%10 == 0 {
175240
block := suite.block()
176-
suite.buffer.Add(block)
241+
suite.Require().NoError(suite.buffer.Add(block))
177242
blocks = append(blocks, block)
178243
continue
179244
}
@@ -182,7 +247,7 @@ func (suite *PendingBlocksSuite) TestPruneByView() {
182247
if i%2 == 1 && len(blocks) > 0 {
183248
parent := blocks[rand.Intn(len(blocks))]
184249
block := suite.blockWithParent(parent.Message.Block.ToHeader())
185-
suite.buffer.Add(block)
250+
suite.Require().NoError(suite.buffer.Add(block))
186251
blocks = append(blocks, block)
187252
}
188253
}
@@ -210,13 +275,13 @@ func (suite *PendingBlocksSuite) TestPruneByView() {
210275
// TestPruneByViewBelowFinalizedView verifies that pruning below finalized view returns an error.
211276
func (suite *PendingBlocksSuite) TestPruneByViewBelowFinalizedView() {
212277
finalizedView := uint64(100)
213-
buffer := NewPendingBlocks(finalizedView)
278+
buffer := NewPendingBlocks(finalizedView, 100_000)
214279

215280
// Add some blocks above finalized view
216281
parent := unittest.BlockHeaderFixture()
217282
parent.View = finalizedView + 10
218283
block := suite.blockWithParent(parent)
219-
buffer.Add(block)
284+
suite.Require().NoError(buffer.Add(block))
220285

221286
// Prune at finalized view should succeed
222287
err := buffer.PruneByView(finalizedView)
@@ -305,7 +370,7 @@ func (suite *PendingBlocksSuite) TestConcurrentAccess() {
305370
defer wg.Done()
306371
for j := 0; j < blocksPerGoroutine; j++ {
307372
block := suite.block()
308-
suite.buffer.Add(block)
373+
suite.Require().NoError(suite.buffer.Add(block))
309374
}
310375
}()
311376
}

0 commit comments

Comments
 (0)