4 changes: 4 additions & 0 deletions mempool/mempool.go
@@ -290,6 +290,10 @@ func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkme
defer m.mtx.Unlock()
ctx := sdk.UnwrapSDKContext(goCtx)

// Wait for the legacypool to Reset at >= blockHeight (this may have
// already happened), to ensure all txs in the pending pool are valid.
m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight())

evmIterator, cosmosIterator := m.getIterators(goCtx, i)

combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain)
90 changes: 68 additions & 22 deletions mempool/txpool/legacypool/legacypool.go
@@ -18,6 +18,7 @@
package legacypool

import (
"context"
"errors"
"maps"
"math/big"
@@ -255,13 +256,15 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgSubscriptionCh chan struct{} // notifies the reorg loop that a subscriber wants to wait on nextDone
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
latestReorgHeight atomic.Int64 // Latest height that the reorg loop has completed

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

@@ -282,22 +285,24 @@ func New(config Config, chain BlockChain) *LegacyPool {

// Create the transaction pool with its initial settings
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgSubscriptionCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
}
pool.priced = newPricedList(pool.all)
pool.latestReorgHeight.Store(0)

return pool
}
@@ -1262,7 +1267,8 @@ func (pool *LegacyPool) scheduleReorgLoop() {
queuedEvents[addr] = NewSortedMap()
}
queuedEvents[addr].Put(tx)

case <-pool.reorgSubscriptionCh:
Added the ability for arbitrary subscribers to wait on the completion of nextDone (i.e. the next run of the reorg loop). A subscriber pushes a request onto this channel and must then immediately listen on pool.reorgDoneCh; this loop pushes nextDone onto that channel. The subscriber can then wait on the closure of nextDone, which effectively broadcasts to every holder of that channel (every subscriber to the next run of the reorg loop) that it has completed. A minimal sketch of this pattern follows this hunk.

pool.reorgDoneCh <- nextDone
case <-curDone:
curDone = nil

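For illustration, here is a minimal, self-contained sketch of the broadcast-by-channel-closure pattern described in the comment above. The channel names and the simulated delay are stand-ins (not the pool's actual fields), and the loop is simplified to a single iteration; it only demonstrates how closing nextDone wakes every subscriber at once.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	subscribeCh := make(chan struct{}) // stands in for pool.reorgSubscriptionCh
	doneCh := make(chan chan struct{}) // stands in for pool.reorgDoneCh

	// "Reorg loop": hands the same nextDone channel to every subscriber,
	// then closes it when the current iteration's work finishes. Closing a
	// channel unblocks all readers, so it acts as a broadcast.
	go func() {
		nextDone := make(chan struct{})
		go func() {
			time.Sleep(100 * time.Millisecond) // simulated reorg work
			close(nextDone)                    // broadcast completion
		}()
		for {
			select {
			case <-subscribeCh:
				doneCh <- nextDone
			case <-nextDone:
				return
			}
		}
	}()

	// Subscriber side: push a request, immediately receive nextDone from
	// doneCh, then block until it is closed.
	subscribeCh <- struct{}{}
	nextDone := <-doneCh
	<-nextDone
	fmt.Println("next reorg iteration completed")
}
```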
@@ -1342,6 +1348,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,

dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg))
pool.changesSinceReorg = 0 // Reset change counter
if reset != nil && reset.newHead != nil {
pool.latestReorgHeight.Store(reset.newHead.Number.Int64())
}
pool.mu.Unlock()

// Notify subsystems for newly added transactions
@@ -1963,6 +1972,43 @@ func (pool *LegacyPool) Clear() {
pool.pendingNonces = newNoncer(pool.currentState)
}

// WaitForReorgHeight blocks until the reorg loop has reset at a head with
// height >= height. If the context is cancelled or the pool is shutting down,
// this will also return.
func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) {
for pool.latestReorgHeight.Load() < height {
// reorg loop has not run at the target height, subscribe to the
// outcome of the next reorg loop iteration to know when to check again
sub, err := pool.SubscribeToNextReorg()
If the reorg loop has not yet run for the target height, we wait for the outcome of its next iteration. We deliberately do not tell the reorg loop to run here: that would simply run it again without advancing it to a new height (we would need to pass it the latest headers for that to happen). Moreover, if we kicked off a new run here without latestReorgHeight advancing, we would continuously trigger new reorg runs until the txpool sees a new block and the reorg runs on that block, doing a lot of wasted work.

if err != nil {
return
}

// need to check again in case reorg has finished in between initial
// check and subscribing to next reorg
if pool.latestReorgHeight.Load() >= height {
return
}

select {
case <-sub:
case <-ctx.Done():
return
There is likely a better way to handle this, but mempool.Select does not return an error, so if the context is cancelled during this call we potentially allow invalid txs to be selected, which will simply be invalidated; if that happens we likely time out the proposal. It is not entirely clear what a cancelled context means here (is the app shutting down?); depending on what is actually happening, panicking may be better. This is likely a follow-up and probably not important right now.

}
}
}

// SubscribeToNextReorg returns a channel that will close when the next reorg
// loop completes. An error is returned if the loop is shutting down.
func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) {
select {
case pool.reorgSubscriptionCh <- struct{}{}:
return <-pool.reorgDoneCh, nil
case <-pool.reorgShutdownCh:
return nil, errors.New("shutdown")
}
}

// HasPendingAuth returns a flag indicating whether there are pending
// authorizations from the specific address cached in the pool.
func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool {
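As a usage note, here is a hedged sketch of caller-side code that bounds the wait with a context deadline so tx selection cannot block indefinitely. The helper name, the 500ms timeout, and the import path are assumptions for illustration and are not part of this PR; only WaitForReorgHeight's signature comes from the diff above.

```go
package mempool

import (
	"context"
	"time"

	"github.com/cosmos/evm/mempool/txpool/legacypool" // assumed import path
)

// selectWithBoundedWait is a hypothetical helper (not part of this PR)
// showing how a caller could cap how long tx selection blocks on the reorg
// loop. WaitForReorgHeight already returns on its own once the target height
// is reached, when the pool shuts down, or when the context is done, so in
// the worst case the deadline simply falls through to selection.
func selectWithBoundedWait(goCtx context.Context, pool *legacypool.LegacyPool, height int64) {
	waitCtx, cancel := context.WithTimeout(goCtx, 500*time.Millisecond) // timeout is illustrative
	defer cancel()
	pool.WaitForReorgHeight(waitCtx, height)
	// ...continue with iterator construction and transaction selection...
}
```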
149 changes: 149 additions & 0 deletions mempool/txpool/legacypool/legacypool_test.go
@@ -17,6 +17,7 @@
package legacypool

import (
"context"
"crypto/ecdsa"
crand "crypto/rand"
"errors"
@@ -2668,6 +2669,154 @@ func TestRemoveTxTruncatePoolRace(t *testing.T) {
wg.Wait()
}

// TestWaitForReorgHeight tests that WaitForReorgHeight properly blocks until
// the reorg loop has completed for the specified height.
func TestWaitForReorgHeight(t *testing.T) {
t.Run("waits for reorg to complete", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

if pool.latestReorgHeight.Load() != 0 {
t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
}

// Create headers for the reset
oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(5), BaseFee: big.NewInt(10)}

var reorgCompleted atomic.Bool
var waitCompleted atomic.Bool
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
pool.WaitForReorgHeight(ctx, 5)
waitCompleted.Store(true)
}()

// Give the waiter a chance to subscribe
time.Sleep(50 * time.Millisecond)

wg.Add(1)
go func() {
pool.Reset(oldHead, newHead)
reorgCompleted.Store(true)
wg.Done()
}()

// Wait for waiters
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()
select {
case <-waitChan:
case <-time.After(time.Second):
t.Fatal("timeout waiting for waiters")
}

if pool.latestReorgHeight.Load() != newHead.Number.Int64() {
t.Errorf("expected height 5 after reorg, got %d", pool.latestReorgHeight.Load())
}
if !reorgCompleted.Load() || !waitCompleted.Load() {
t.Errorf("expected both the reorg and WaitForReorgHeight to complete")
}
})

t.Run("multiple height wait", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

if pool.latestReorgHeight.Load() != 0 {
t.Fatalf("expected initial height 0, got %d", pool.latestReorgHeight.Load())
}

var reorgCompleted atomic.Bool
var waitCompleted atomic.Bool
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
pool.WaitForReorgHeight(ctx, 10)
waitCompleted.Store(true)
}()

// Give the waiter a chance to subscribe
time.Sleep(50 * time.Millisecond)

wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
oldHead := &types.Header{Number: big.NewInt(int64(i)), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(int64(i + 1)), BaseFee: big.NewInt(10)}
pool.Reset(oldHead, newHead)
}
reorgCompleted.Store(true)
fmt.Println("all resets done")
wg.Done()
}()

// Wait for waiters
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()

select {
case <-waitChan:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for waiters")
}

if pool.latestReorgHeight.Load() != 10 {
t.Errorf("expected height 10 after reorg, got %d", pool.latestReorgHeight.Load())
}
if !reorgCompleted.Load() || !waitCompleted.Load() {
t.Errorf("expected both the reorgs and WaitForReorgHeight to complete")
}
})

t.Run("concurrent waiters", func(t *testing.T) {
pool, _ := setupPool()
defer pool.Close()

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
pool.WaitForReorgHeight(context.Background(), 7)
}(i)
}

// Give all waiters time to subscribe
time.Sleep(100 * time.Millisecond)

// Trigger a single reorg
oldHead := &types.Header{Number: big.NewInt(0), BaseFee: big.NewInt(10)}
newHead := &types.Header{Number: big.NewInt(7), BaseFee: big.NewInt(10)}
pool.Reset(oldHead, newHead)

// Wait for all waiters to complete
waitChan := make(chan struct{})
go func() {
wg.Wait()
close(waitChan)
}()
select {
case <-waitChan:
case <-time.After(2 * time.Second):
t.Errorf("not all waiters completed in 2 seconds")
}
})
}

// TestPromoteExecutablesRecheckTx tests that promoteExecutables properly removes
// a transaction from all pools if it fails the RecheckTxFn.
func TestPromoteExecutablesRecheckTx(t *testing.T) {