42 commits
e6b4e8c
make Proposal type generic
jordanschalm Nov 26, 2025
880557b
make Buffer definition generic to both proposal types
jordanschalm Nov 27, 2025
09eb86c
use leveled forest for block buffer
jordanschalm Nov 27, 2025
46000ac
add docs, todos
jordanschalm Nov 27, 2025
d54e974
ai: remove bool return from pending blocks
jordanschalm Nov 27, 2025
9530e1e
ai: remove DropByParent method
jordanschalm Nov 27, 2025
7235f7a
update mocks
jordanschalm Nov 27, 2025
e3acbf0
add lower view boundary
jordanschalm Nov 27, 2025
56be6f0
update comments
jordanschalm Nov 27, 2025
760434b
add mutex
jordanschalm Nov 27, 2025
99b181f
check retained level error
jordanschalm Nov 27, 2025
3ab8dd5
add missing return
jordanschalm Nov 27, 2025
a4529e8
update buffer
jordanschalm Nov 28, 2025
b281bc0
update mocks
jordanschalm Nov 28, 2025
5dbfe91
fix tests for signature change
jordanschalm Nov 28, 2025
39f010a
bug: forest can get child count for nonextnt node
jordanschalm Nov 28, 2025
49a196b
ai: add tests for pending blocks suite
jordanschalm Nov 28, 2025
b3b1460
adjust tests
jordanschalm Nov 28, 2025
beec64c
remove old buffer backend
jordanschalm Nov 28, 2025
e813f5c
ai: add threshold also to mempool
jordanschalm Nov 28, 2025
e1bed68
ai: address lint errors
jordanschalm Nov 28, 2025
718a820
Apply suggestion from @jordanschalm
jordanschalm Nov 28, 2025
846a5f8
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Nov 28, 2025
292a175
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Dec 8, 2025
acccf49
Use HashablePayload as base generic type parameter
tim-barry Dec 9, 2025
7f89681
Merge pull request #8248 from onflow/tim/8196-refactor-generic-types-…
jordanschalm Dec 11, 2025
e21fa82
Merge branch 'master' into jord/8170-ln-perm-block-buffer
jordanschalm Dec 12, 2025
37c9569
Changed how compliance engine treats blocks in the cache. Updated godoc
durkmurder Dec 18, 2025
8fb4c54
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Dec 18, 2025
5108c97
documentation extensions
AlexHentschel Dec 22, 2025
d350dbe
Merge branch 'master' into jord/8170-ln-perm-block-buffer
durkmurder Dec 22, 2025
df61acd
Fixed initialization for PendingClusterBlocks
durkmurder Jan 5, 2026
5279647
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Jan 5, 2026
b001e9b
Updated godoc
durkmurder Jan 5, 2026
3326e63
Updated pending blocks buffer to store one proposal per view
durkmurder Jan 5, 2026
cb267a5
Moved validation of proposal before adding it to the cache
durkmurder Jan 5, 2026
cffc38d
Fixed tests. Updated godoc
durkmurder Jan 6, 2026
ad038a3
Merge branch 'master' of https://github.com/onflow/flow-go into jord/…
durkmurder Jan 6, 2026
ac0cdb2
Added test to ensure that we indeed store single proposal per view
durkmurder Jan 7, 2026
bee053b
Updated godoc
durkmurder Jan 7, 2026
9e7ec7e
Updated godoc
durkmurder Jan 7, 2026
923770b
Changed how already pending proposals are handled by the compliance e…
durkmurder Jan 7, 2026
2 changes: 1 addition & 1 deletion cmd/consensus/main.go
@@ -805,7 +805,7 @@ func main() {
}).
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the pending blocks cache
proposals := buffer.NewPendingBlocks()
proposals := buffer.NewPendingBlocks(node.LastFinalizedHeader.View, node.ComplianceConfig.GetSkipNewProposalsThreshold())

logger := createLogger(node.Logger, node.RootChainID)
complianceCore, err := compliance.NewCore(
6 changes: 3 additions & 3 deletions consensus/integration/nodes_test.go
@@ -514,12 +514,12 @@ func createNode(
)
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks()

rootHeader, err := rootSnapshot.Head()
require.NoError(t, err)

// initialize the pending blocks cache
cache := buffer.NewPendingBlocks(rootHeader.View, modulecompliance.DefaultConfig().SkipNewProposalsThreshold)

rootQC, err := rootSnapshot.QuorumCertificate()
require.NoError(t, err)

37 changes: 29 additions & 8 deletions engine/collection/compliance/core.go
@@ -93,7 +93,10 @@ func NewCore(
if err != nil {
return nil, fmt.Errorf("could not initialize finalized boundary cache: %w", err)
}
c.ProcessFinalizedBlock(final)
err = c.ProcessFinalizedBlock(final)
if err != nil {
return nil, fmt.Errorf("could not process finalized block: %w", err)
}

// log the mempool size off the bat
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
@@ -193,7 +196,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
_, found := c.pending.ByID(block.ParentID)
if found {
// add the block to the cache
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general, we expect the block buffer to use SkipNewProposalsThreshold;
// however, since it is instantiated outside this component, we allow the thresholds to differ.
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

return nil
@@ -207,7 +218,15 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*cluster.Proposal]) error
return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
_ = c.pending.Add(proposal)
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general, we expect the block buffer to use SkipNewProposalsThreshold;
// however, since it is instantiated outside this component, we allow the thresholds to differ.
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

c.sync.RequestBlock(block.ParentID, block.Height-1)
@@ -288,9 +307,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*cluster.Propo
}
}

// drop all the children that should have been processed now
c.pending.DropForParent(blockID)
@jordanschalm (Member, Author) commented on Dec 13, 2025:

Removing this line is causing the integration test failure. We assume that c.pending only includes blocks that have not been processed and added to our local state. However, with the change in this PR, we retain blocks until their view is finalized.

Specific problematic case:

There are blocks like A<-B<-C.

  • We receive B first and cache it in the buffer
  • We receive A and process it. Now A and B are in our local state
    • Critically, B is also still in the block buffer
  • We receive C
    • We first check to see if its parent B is in the block buffer
      _, found := c.pending.ByID(header.ParentID)
    • B is in the block buffer. We interpret this as meaning that B does not yet connect to the finalized state, store C in the buffer, and continue.
    • Now we will never process C
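
To make the failure mode concrete, here is a condensed sketch of the reordered checks that resolve this (mirroring the updated consensus-side OnBlockProposal later in this diff; threshold handling elided):

// Check persistent storage FIRST: if the parent was already processed,
// we must process C now, even though B is still retained in the buffer.
exists, err := c.headers.Exists(header.ParentID)
if err != nil {
    return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
    // The parent is genuinely missing from our local state: cache C, and
    // request the parent only if it is not already sitting in the buffer.
    if err := c.pending.Add(proposal); err != nil { /* threshold handling elided */ }
    if _, found := c.pending.ByID(header.ParentID); !found {
        c.sync.RequestBlock(header.ParentID, header.Height-1)
    }
    return nil
}
// The parent is in our local state => process C and its descendants now.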

return nil
}

@@ -363,14 +379,19 @@ func (c *Core) processBlockProposal(proposal *cluster.Proposal) error {

// ProcessFinalizedBlock performs pruning of stale data based on a finalization event:
// it removes pending blocks at or below the finalized view.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) {
// No errors are expected during normal operation.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error {
// remove all pending blocks at or below the finalized view
c.pending.PruneByView(finalized.View)
err := c.pending.PruneByView(finalized.View)
if err != nil {
return err
}
c.finalizedHeight.Set(finalized.Height)
c.finalizedView.Set(finalized.View)

// always record the metric
c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
return nil
}

// checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`.
10 changes: 3 additions & 7 deletions engine/collection/compliance/core_test.go
@@ -124,7 +124,7 @@ func (cs *CommonSuite) SetupTest() {

// set up pending module mock
cs.pending = &module.PendingClusterBlockBuffer{}
cs.pending.On("Add", mock.Anything, mock.Anything).Return(true)
cs.pending.On("Add", mock.Anything, mock.Anything)
cs.pending.On("ByID", mock.Anything).Return(
func(blockID flow.Identifier) flow.Slashable[*cluster.Proposal] {
return cs.pendingDB[blockID]
@@ -143,9 +143,8 @@ func (cs *CommonSuite) SetupTest() {
return ok
},
)
cs.pending.On("DropForParent", mock.Anything).Return()
cs.pending.On("Size").Return(uint(0))
cs.pending.On("PruneByView", mock.Anything).Return()
cs.pending.On("PruneByView", mock.Anything).Return(nil)

closed := func() <-chan struct{} {
channel := make(chan struct{})
@@ -518,9 +517,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() {

// check that we submitted each child to hotstuff
cs.hotstuff.AssertExpectations(cs.T())

// make sure we drop the cache after trying to process
cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID())
}

func (cs *CoreSuite) TestProposalBufferingOrder() {
Expand All @@ -546,7 +542,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
}

// replace the engine buffer with the real one
cs.core.pending = realbuffer.NewPendingClusterBlocks()
cs.core.pending = realbuffer.NewPendingClusterBlocks(cs.head.Block.View, 100_000)

// process all of the descendants
for _, proposal := range proposals {
4 changes: 3 additions & 1 deletion engine/collection/compliance/engine.go
@@ -166,6 +166,8 @@ func (e *Engine) processOnFinalizedBlock(block *model.Block) error {
if err != nil { // no expected errors
return fmt.Errorf("could not get finalized header: %w", err)
}
e.core.ProcessFinalizedBlock(finalHeader)
if err := e.core.ProcessFinalizedBlock(finalHeader); err != nil {
return fmt.Errorf("could not process finalized block: %w", err)
}
return nil
}
6 changes: 5 additions & 1 deletion engine/collection/epochmgr/factories/compliance.go
@@ -66,7 +66,11 @@ func (f *ComplianceEngineFactory) Create(
validator hotstuff.Validator,
) (*compliance.Engine, error) {

cache := buffer.NewPendingClusterBlocks()
final, err := clusterState.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get finalized header: %w", err)
}
cache := buffer.NewPendingClusterBlocks(final.View, f.config.GetSkipNewProposalsThreshold())
core, err := compliance.NewCore(
f.log,
f.engMetrics,
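For intuition on what the finalized view and GetSkipNewProposalsThreshold() arguments configure, below is a minimal, self-contained sketch of a view-bounded buffer. The type and field names are hypothetical; the real buffer in this PR is backed by a leveled forest and signals out-of-range proposals via an error matched by mempool.IsBeyondActiveRangeError.

package main

import (
	"errors"
	"fmt"
)

// errBeyondActiveRange stands in for the sentinel matched by
// mempool.IsBeyondActiveRangeError in the real code.
var errBeyondActiveRange = errors.New("beyond active range")

// viewBuffer retains at most one proposal per view, within a bounded
// window of views above the last finalized view.
type viewBuffer struct {
	lowestView uint64            // last finalized view; advanced by pruneByView
	threshold  uint64            // mirrors SkipNewProposalsThreshold
	byView     map[uint64]string // simplified payload: one proposal ID per view
}

// add rejects proposals too far ahead of the finalized view and keeps
// only the first proposal seen for any given view.
func (b *viewBuffer) add(view uint64, id string) error {
	if view > b.lowestView+b.threshold {
		return fmt.Errorf("view %d exceeds active range upper bound %d: %w",
			view, b.lowestView+b.threshold, errBeyondActiveRange)
	}
	if _, ok := b.byView[view]; !ok {
		b.byView[view] = id
	}
	return nil
}

// pruneByView drops all proposals at or below the finalized view.
func (b *viewBuffer) pruneByView(finalizedView uint64) {
	for v := range b.byView {
		if v <= finalizedView {
			delete(b.byView, v)
		}
	}
	b.lowestView = finalizedView
}

func main() {
	buf := &viewBuffer{lowestView: 100, threshold: 1000, byView: map[uint64]string{}}
	fmt.Println(buf.add(500, "A"))  // <nil>: within the active range
	fmt.Println(buf.add(5000, "B")) // wraps errBeyondActiveRange: caller drops and logs
}

As in the compliance engines above, a caller would treat errors.Is(err, errBeyondActiveRange) as benign (log and drop) and escalate anything else.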
120 changes: 83 additions & 37 deletions engine/consensus/compliance/core.go
@@ -100,7 +100,10 @@ func NewCore(
if err != nil {
return nil, fmt.Errorf("could not initialize finalized boundary cache: %w", err)
}
c.ProcessFinalizedBlock(final)
err = c.ProcessFinalizedBlock(final)
if err != nil {
return nil, fmt.Errorf("could not process finalized block: %w", err)
}

c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

@@ -111,6 +114,49 @@
// No errors are expected during normal operation. All returned exceptions
// are potential symptoms of internal state corruption and should be fatal.
func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
// In general, there are a variety of attacks that a byzantine proposer might attempt. Conceptually,
// there are two classes:
// (I) Protocol-level attacks, by either sending individually invalid blocks, or by equivocating,
// i.e. sending a pair of conflicting blocks (individually valid and/or invalid) for the same view.
// (II) Resource exhaustion attacks by sending a large number of individually invalid or valid blocks.
// Category (I) will eventually be detected. Attacks of category (II) typically contain elements of (I).
// This is because the protocol is purposefully designed such that there are few degrees of freedom available
// to a byzantine proposer attempting to mount a resource exhaustion attack (type II), unless the proposer
// violates protocol rules, i.e. provides evidence of their wrongdoing (type I).
// However, we have to make sure that the node survives an attack of category (II) and stays responsive.
// If the node crashes, it will lose the evidence of the attack, and the byzantine proposer
// will have succeeded in their goal of mounting a denial-of-service attack without being held accountable.
//
// The general requirements for a BFT system are:
// 1. withstand attacks (don't crash and remain responsive)
// 2. detect attacks and collect evidence for slashing challenges
// 3. suppress the attack by slashing and ejecting the offending node(s)
//
// The primary responsibility of the compliance engine is to protect the business logic from attacks of
// category (II) and to collect evidence for attacks of category (I) for blocks that are _individually_
// invalid. The compliance engine may detect some portion of equivocation attacks (type I), in order
// to better protect itself from resource exhaustion attacks (type II). However, the primary responsibility
// for detecting equivocation lies with the hotstuff layer. The reason is that, in case of equivocation with
// multiple valid blocks, the compliance engine can't know which block might get certified and potentially
// finalized. So it can't reject _valid_ equivocating blocks outright, as that might lead to liveness issues.
//
// The compliance engine must be resilient to the following classes of resource exhaustion attacks:
// 1. A byzantine proposer might attempt to create blocks at many different future views. Mitigations:
// • Only proposals whose proposer is the valid leader for the respective view should pass the compliance
// engine. Blocks that are not proposed by the valid leader are rejected outright, and we create a slashing
// challenge against the proposer. This filtering should be done by the compliance engine. Such blocks
// should never reach the higher-level business logic.
// • A byzantine proposer might attempt to create blocks for a large number of different future views,
// for which it is the valid leader. This is mitigated by dropping blocks that are too far ahead of the
// locally finalized view. The threshold is configured via the `SkipNewProposalsThreshold` parameter.
// This does not lead to a slashing challenge, as we can't reliably detect whether the proposer is
// violating protocol rules by making up an invalid QC / TC without investing significant CPU
// resources to validate the QC. Valid blocks will eventually be retrieved via sync again, once the local finalized
// view catches up, even if they were dropped at first.
// 2. A byzantine proposer might spam us with many different _valid_ blocks for the same view, for which
// it is the leader. This is particularly dangerous for memory usage; it is mitigated by the pending
// buffer retaining at most one proposal per view.

block := proposal.Message.Block
header := block.ToHeader()
blockID := block.ID()
@@ -165,17 +211,18 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
}

// first, we reject all blocks that we don't need to process:
// 1) blocks already in the cache; they will already be processed later
// 2) blocks already on disk; they were processed and await finalization
// 1. blocks already in the cache that are disconnected: they will be processed later.
// 2. blocks already in the cache that were already processed: they will eventually be pruned by view.
// 3. blocks already on disk: they were processed and await finalization

// ignore proposals that are already cached
// 1,2. Ignore proposals that are already cached
_, cached := c.pending.ByID(blockID)
if cached {
log.Debug().Msg("skipping already cached proposal")
return nil
}

// ignore proposals that were already processed
// 3. Ignore proposals that were already processed
_, err := c.headers.ByBlockID(blockID)
if err == nil {
log.Debug().Msg("skipping already processed proposal")
@@ -185,44 +232,41 @@ func (c *Core) OnBlockProposal(proposal flow.Slashable[*flow.Proposal]) error {
return fmt.Errorf("could not check proposal: %w", err)
}

// there are two possibilities if the proposal is neither already pending
// processing in the cache, nor has already been processed:
// 1) the proposal is unverifiable because the parent is unknown
// => we cache the proposal
// 2) the proposal is connected to finalized state through an unbroken chain
// => we verify the proposal and forward it to hotstuff if valid

// if the parent is a pending block (disconnected from the incorporated state), we cache this block as well.
// we don't have to request its parent block or its ancestor again, because as a
// pending block, its parent block must have been requested.
// if there was problem requesting its parent or ancestors, the sync engine's forward
// syncing with range requests for finalized blocks will request for the blocks.
_, found := c.pending.ByID(header.ParentID)
if found {
// add the block to the cache
_ = c.pending.Add(proposal)
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

return nil
}
// At this point, we are dealing with a block proposal that is present neither in the cache nor on disk.
// There are three possibilities:
// 1. The proposal is connected to the finalized state => we perform the further processing and pass it to the hotstuff layer.
// 2. The proposal is not connected to the finalized state:
// 2.1 The parent has already been cached, meaning we have a partial chain => cache the proposal and wait for eventual resolution of the missing piece.
// 2.2 The parent has not been cached yet => cache the proposal and additionally request the missing parent from the committee.

// if the proposal is connected to a block that is neither in the cache, nor
// in persistent storage, its direct parent is missing; cache the proposal
// and request the parent
// 1. Check whether the parent is connected to the finalized state
exists, err := c.headers.Exists(header.ParentID)
if err != nil {
return fmt.Errorf("could not check parent exists: %w", err)
}
if !exists {
_ = c.pending.Add(proposal)
// 2. Cache the proposal in either case, 2.1 or 2.2
if err := c.pending.Add(proposal); err != nil {
if mempool.IsBeyondActiveRangeError(err) {
// In general, we expect the block buffer to use SkipNewProposalsThreshold;
// however, since it is instantiated outside this component, we allow the thresholds to differ.
log.Debug().Err(err).Msg("dropping block beyond block buffer active range")
return nil
}
return fmt.Errorf("could not add proposal to pending buffer: %w", err)
}
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())

c.sync.RequestBlock(header.ParentID, header.Height-1)
log.Debug().Msg("requesting missing parent for proposal")
// 2.2 The parent has not been cached yet: request it from the committee
if _, found := c.pending.ByID(header.ParentID); !found {
c.sync.RequestBlock(header.ParentID, header.Height-1)
log.Debug().Msg("requesting missing parent for proposal")
}

return nil
}

// At this point, we should be able to connect the proposal to the finalized
// 1. At this point, we should be able to connect the proposal to the finalized
// state and should process it to see whether to forward to hotstuff or not.
// processBlockAndDescendants is a recursive function. Here we trace the
// execution of the entire recursion, which might include processing the
@@ -300,9 +344,6 @@ func (c *Core) processBlockAndDescendants(proposal flow.Slashable[*flow.Proposal
}
}

// drop all the children that should have been processed now
c.pending.DropForParent(blockID)

return nil
}

@@ -392,14 +433,19 @@ func (c *Core) processBlockProposal(proposal *flow.Proposal) error {

// ProcessFinalizedBlock performs pruning of stale data based on a finalization event:
// it removes pending blocks at or below the finalized view.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) {
// No errors are expected during normal operation.
func (c *Core) ProcessFinalizedBlock(finalized *flow.Header) error {
// remove all pending blocks at or below the finalized view
c.pending.PruneByView(finalized.View)
err := c.pending.PruneByView(finalized.View)
if err != nil {
return err
}
c.finalizedHeight.Set(finalized.Height)
c.finalizedView.Set(finalized.View)

// always record the metric
c.mempoolMetrics.MempoolEntries(metrics.ResourceProposal, c.pending.Size())
return nil
}

// checkForAndLogOutdatedInputError checks whether error is an `engine.OutdatedInputError`.
10 changes: 3 additions & 7 deletions engine/consensus/compliance/core_test.go
@@ -200,7 +200,7 @@ func (cs *CommonSuite) SetupTest() {

// set up pending module mock
cs.pending = &module.PendingBlockBuffer{}
cs.pending.On("Add", mock.Anything, mock.Anything).Return(true)
cs.pending.On("Add", mock.Anything, mock.Anything)
⚠️ Potential issue | 🟡 Minor

Missing mock return value for Add method.

The Add method now returns an error according to the new PendingBlockBuffer interface, but the mock expectation does not specify a return value. This could cause unexpected test behavior or panics if the code checks the error.

🔎 Proposed fix
-	cs.pending.On("Add", mock.Anything, mock.Anything)
+	cs.pending.On("Add", mock.Anything).Return(nil)

Note: The method signature also appears to have changed to accept a single argument (proposal) rather than two arguments. Verify against the interface definition.

🤖 Prompt for AI Agents
In @engine/consensus/compliance/core_test.go around line 203, The test mock for
PendingBlockBuffer is out of date: Update the cs.pending mock expectation to
match the new PendingBlockBuffer.Add signature (single argument `proposal`) and
specify the returned error value; for example replace the current
cs.pending.On("Add", mock.Anything, mock.Anything) with a single-arg expectation
and chain a .Return(...) that returns an error or nil as appropriate (e.g.,
.Return(nil)) so the test handles the Add error return correctly and matches the
interface.
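
Taken together with the PruneByView change below, the corrected mock setup would plausibly read as follows (a testify/mock sketch against the regenerated module.PendingBlockBuffer mock; the single-argument Add signature is an assumption to verify against the interface):

cs.pending.On("Add", mock.Anything).Return(nil)         // Add now returns an error
cs.pending.On("PruneByView", mock.Anything).Return(nil) // PruneByView now returns an error
cs.pending.On("Size").Return(uint(0))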

cs.pending.On("ByID", mock.Anything).Return(
func(blockID flow.Identifier) flow.Slashable[*flow.Proposal] {
return cs.pendingDB[blockID]
@@ -219,9 +219,8 @@ func (cs *CommonSuite) SetupTest() {
return ok
},
)
cs.pending.On("DropForParent", mock.Anything).Return()
cs.pending.On("Size").Return(uint(0))
cs.pending.On("PruneByView", mock.Anything).Return()
cs.pending.On("PruneByView", mock.Anything).Return(nil)

// set up hotstuff module mock
cs.hotstuff = module.NewHotStuff(cs.T())
@@ -565,9 +564,6 @@ func (cs *CoreSuite) TestProcessBlockAndDescendants() {
Message: proposal0,
})
require.NoError(cs.T(), err, "should pass handling children")

// make sure we drop the cache after trying to process
cs.pending.AssertCalled(cs.T(), "DropForParent", parent.ID())
}

func (cs *CoreSuite) TestProposalBufferingOrder() {
@@ -588,7 +584,7 @@ func (cs *CoreSuite) TestProposalBufferingOrder() {
}

// replace the engine buffer with the real one
cs.core.pending = real.NewPendingBlocks()
cs.core.pending = real.NewPendingBlocks(cs.head.View, 100_000)

// check that we request the ancestor block each time
cs.sync.On("RequestBlock", missingBlock.ID(), missingBlock.Height).Times(len(proposals))