Skip to content
Open
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e6b4e8c
make Proposal type generic
jordanschalm Nov 26, 2025
880557b
make Buffer definition generic to both proposal types
jordanschalm Nov 27, 2025
09eb86c
use leveled forest for block buffer
jordanschalm Nov 27, 2025
46000ac
add docs, todos
jordanschalm Nov 27, 2025
d54e974
ai: remove bool return from pending blocks
jordanschalm Nov 27, 2025
9530e1e
ai: remove DropByParent method
jordanschalm Nov 27, 2025
7235f7a
update mocks
jordanschalm Nov 27, 2025
e3acbf0
add lower view boundary
jordanschalm Nov 27, 2025
56be6f0
update comments
jordanschalm Nov 27, 2025
760434b
add mutex
jordanschalm Nov 27, 2025
99b181f
check retained level error
jordanschalm Nov 27, 2025
3ab8dd5
add missing return
jordanschalm Nov 27, 2025
a4529e8
update buffer
jordanschalm Nov 28, 2025
b281bc0
update mocks
jordanschalm Nov 28, 2025
5dbfe91
fix tests for signature change
jordanschalm Nov 28, 2025
39f010a
bug: forest can get child count for nonextnt node
jordanschalm Nov 28, 2025
49a196b
ai: add tests for pending blocks suite
jordanschalm Nov 28, 2025
b3b1460
adjust tests
jordanschalm Nov 28, 2025
beec64c
remove old buffer backend
jordanschalm Nov 28, 2025
e813f5c
ai: add threshold also to mempool
jordanschalm Nov 28, 2025
e1bed68
ai: address lint errors
jordanschalm Nov 28, 2025
718a820
Apply suggestion from @jordanschalm
jordanschalm Nov 28, 2025
846a5f8
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Nov 28, 2025
292a175
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Dec 8, 2025
acccf49
Use HashablePayload as base generic type parameter
tim-barry Dec 9, 2025
7f89681
Merge pull request #8248 from onflow/tim/8196-refactor-generic-types-…
jordanschalm Dec 11, 2025
e21fa82
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Dec 12, 2025
37c9569
Changed how compliance engine treats blocks in the cache. Updated godoc
durkmurder Dec 18, 2025
8fb4c54
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Dec 18, 2025
5108c97
documentation extensions
AlexHentschel Dec 22, 2025
d350dbe
Merge branch 'master' into jord/8170-ln-perm-block-buffer
durkmurder Dec 22, 2025
df61acd
Fixed initialization for PendingClusterBlocks
durkmurder Jan 5, 2026
5279647
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Jan 5, 2026
b001e9b
Updated godoc
durkmurder Jan 5, 2026
3326e63
Updated pending blocks buffer to store one proposal per view
durkmurder Jan 5, 2026
cb267a5
Moved validation of proposal before adding it to the cache
durkmurder Jan 5, 2026
cffc38d
Fixed tests. Updated godoc
durkmurder Jan 6, 2026
ad038a3
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Jan 6, 2026
ac0cdb2
Added test to ensure that we indeed store single proposal per view
durkmurder Jan 7, 2026
bee053b
Updated godoc
durkmurder Jan 7, 2026
9e7ec7e
Updated godoc
durkmurder Jan 7, 2026
923770b
Changed how already pending proposals are handled by the compliance e…
durkmurder Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func main() {
}).
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the pending blocks cache
proposals := buffer.NewPendingBlocks()
proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold())

logger := createLogger(node.Logger, node.RootChainID)
complianceCore, err := compliance.NewCore(
Expand Down
6 changes: 3 additions & 3 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,12 @@ func createNode(
)
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks()

rootHeader, err := rootSnapshot.Head()
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold)

rootQC, err := rootSnapshot.QuorumCertificate()
require.NoError(t, err)

Expand Down
37 changes: 29 additions & 8 deletions engine/collection/compliance/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func NewCore(
if err != nil {
return nil, fmt.Errorf("could not initialized finalized boundary cache: %w", err)
}
c.ProcessFinalizedBlock(final)
err = c.ProcessFinalizedBlock(final)
if err != nil {
return nil, fmt.Errorf("could not process finalized block: %w", err)
}

// log the mempool size off the bat
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
Expand Down Expand Up @@ -193,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
_, found := c.pending.ByID(block.ParentID)
if found {
// add the block to the cache
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

return nil
Expand All @@ -207,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general we expect the block buffer to use SkipNewProposalsThreshold,
// however since it is instantiated outside this component, we allow the thresholds to differ
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

c.sync.RequestBlock(block.ParentID, block.Height-1)
Expand Down Expand Up @@ -288,9 +307,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*cluster.Propo
}
}

// drop all the children that should have been processed now
c.pending.DropForParent(blockID)
Copy link
Member Author

@jordanschalm jordanschalm Dec 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this line is causing the integration test failure. We assume that c.pending only includes blocks that have not been processed and added to our local state. However, with the change in this PR, we retain blocks until their view is finalized.

Specific problematic case:

There are blocks like A<-B<-C.

  • We receive B first and cache it in buffer
  • We receive A and process it. Now A and B are in our local state
    • Critically, B is also still in the block buffer
  • We receive C
    • We first check to see if its parent B is in the block buffer
      _, found := c.pending.ByID(header.ParentID)
    • B is in the block buffer. We interpret this as meaning that B does not yet connect to the finalized state, store C in the buffer, and continue.
    • Now we will never process C

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


return nil
}

Expand Down Expand Up @@ -363,14 +379,19 @@ func (c *Core) processBlockProposal(proposal *cluster.Proposal) error {

// ProcessFinalizedBlock performs pruning of stale data based on finalization event
// removes pending blocks below the finalized view
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) {
// No errors are expected during normal operation.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error {
// remove all pending blocks at or below the finalized view
c.pending.PruneByView(finalized.View)
err := c.pending.PruneByView(finalized.View)
if err != nil {
return err
}
c.finalizedHeight.Set(finalized.Height)
c.finalizedView.Set(finalized.View)

// always record the metric
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
return nil
}

// checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`.
Expand Down
10 changes: 3 additions & 7 deletions engine/collection/compliance/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (cs *CommonSuite) SetupTest() {

// set up pending module mock
cs.pending = &module.PendingClusterBlockBuffer{}
cs.pending.On("Add", mock.Anything, mock.Anything).Return(true)
cs.pending.On("Add", mock.Anything, mock.Anything)
cs.pending.On("ByID", mock.Anything).Return(
func(blockID flow.Identifier) flow.Slashable[*cluster.Proposal] {
return cs.pendingDB[blockID]
Expand All @@ -143,9 +143,8 @@ func (cs *CommonSuite) SetupTest() {
return ok
},
)
cs.pending.On("DropForParent", mock.Anything).Return()
cs.pending.On("Size").Return(uint(0))
cs.pending.On("PruneByView", mock.Anything).Return()
cs.pending.On("PruneByView", mock.Anything).Return(nil)

closed := func() <-chan struct{} {
channel := make(chan struct{})
Expand Down Expand Up @@ -518,9 +517,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() {

// check that we submitted each child to hotstuff
cs.hotstuff.AssertExpectations(cs.T())

// make sure we drop the cache after trying to process
cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID())
}

func (cs *CoreSuite) TestProposalBufferingOrder() {
Expand All @@ -546,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
}

// replace the engine buffer with the real one
cs.core.pending = realbuffer.NewPendingClusterBlocks()
cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000)

// process all of the descendants
for _, proposal := range proposals {
Expand Down
4 changes: 3 additions & 1 deletion engine/collection/compliance/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error {
if err != nil { // no expected errors
return fmt.Errorf("could not get finalized header: %w", err)
}
e.core.ProcessFinalizedBlock(finalHeader)
if err := e.core.ProcessFinalizedBlock(finalHeader); err != nil {
return fmt.Errorf("could not process finalized block: %w", err)
}
return nil
}
6 changes: 5 additions & 1 deletion engine/collection/epochmgr/factories/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ func (f *ComplianceEngineFactory) Create(
validator hotstuff.Validator,
) (*compliance.Engine, error) {

cache := buffer.NewPendingClusterBlocks()
final, err := clusterState.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get finalized header: %w", err)
}
cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold())
core, err := compliance.NewCore(
f.log,
f.engMetrics,
Expand Down
Loading
Loading