-
Notifications
You must be signed in to change notification settings - Fork 128
feat(mempool): Block EVM mempool Select on legacypool reorg #867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/krakatoa
Are you sure you want to change the base?
Changes from 5 commits
c9beb5c
df1fb75
c701d97
926b590
4e47fd4
d38f0dc
56d8315
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| package legacypool | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "maps" | ||
| "math/big" | ||
|
|
@@ -255,13 +256,15 @@ type LegacyPool struct { | |
| all *lookup // All transactions to allow lookups | ||
| priced *pricedList // All transactions sorted by price | ||
|
|
||
| reqResetCh chan *txpoolResetRequest | ||
| reqPromoteCh chan *accountSet | ||
| queueTxEventCh chan *types.Transaction | ||
| reorgDoneCh chan chan struct{} | ||
| reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop | ||
| wg sync.WaitGroup // tracks loop, scheduleReorgLoop | ||
| initDoneCh chan struct{} // is closed once the pool is initialized (for tests) | ||
| reqResetCh chan *txpoolResetRequest | ||
| reqPromoteCh chan *accountSet | ||
| queueTxEventCh chan *types.Transaction | ||
| reorgDoneCh chan chan struct{} | ||
| reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop | ||
| reorgSubscriptionCh chan struct{} // notifies the reorg loop that a subscriber wants to wait on nextDone | ||
| wg sync.WaitGroup // tracks loop, scheduleReorgLoop | ||
| initDoneCh chan struct{} // is closed once the pool is initialized (for tests) | ||
| latestReorgHeight atomic.Int64 // Latest height that the reorg loop has completed | ||
|
|
||
| changesSinceReorg int // A counter for how many drops we've performed in-between reorg. | ||
|
|
||
|
|
@@ -282,22 +285,24 @@ func New(config Config, chain BlockChain) *LegacyPool { | |
|
|
||
| // Create the transaction pool with its initial settings | ||
| pool := &LegacyPool{ | ||
| config: config, | ||
| chain: chain, | ||
| chainconfig: chain.Config(), | ||
| signer: types.LatestSigner(chain.Config()), | ||
| pending: make(map[common.Address]*list), | ||
| queue: make(map[common.Address]*list), | ||
| beats: make(map[common.Address]time.Time), | ||
| all: newLookup(), | ||
| reqResetCh: make(chan *txpoolResetRequest), | ||
| reqPromoteCh: make(chan *accountSet), | ||
| queueTxEventCh: make(chan *types.Transaction), | ||
| reorgDoneCh: make(chan chan struct{}), | ||
| reorgShutdownCh: make(chan struct{}), | ||
| initDoneCh: make(chan struct{}), | ||
| config: config, | ||
| chain: chain, | ||
| chainconfig: chain.Config(), | ||
| signer: types.LatestSigner(chain.Config()), | ||
| pending: make(map[common.Address]*list), | ||
| queue: make(map[common.Address]*list), | ||
| beats: make(map[common.Address]time.Time), | ||
| all: newLookup(), | ||
| reqResetCh: make(chan *txpoolResetRequest), | ||
| reqPromoteCh: make(chan *accountSet), | ||
| queueTxEventCh: make(chan *types.Transaction), | ||
| reorgDoneCh: make(chan chan struct{}), | ||
| reorgShutdownCh: make(chan struct{}), | ||
| reorgSubscriptionCh: make(chan struct{}), | ||
| initDoneCh: make(chan struct{}), | ||
| } | ||
| pool.priced = newPricedList(pool.all) | ||
| pool.latestReorgHeight.Store(0) | ||
|
|
||
| return pool | ||
| } | ||
|
|
@@ -1262,7 +1267,9 @@ func (pool *LegacyPool) scheduleReorgLoop() { | |
| queuedEvents[addr] = NewSortedMap() | ||
| } | ||
| queuedEvents[addr].Put(tx) | ||
|
|
||
| case <-pool.reorgSubscriptionCh: | ||
| launchNextRun = true | ||
| pool.reorgDoneCh <- nextDone | ||
| case <-curDone: | ||
| curDone = nil | ||
|
|
||
|
|
@@ -1342,6 +1349,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, | |
|
|
||
| dropBetweenReorgHistogram.Update(int64(pool.changesSinceReorg)) | ||
| pool.changesSinceReorg = 0 // Reset change counter | ||
| if reset != nil && reset.newHead != nil { | ||
| pool.latestReorgHeight.Store(reset.newHead.Number.Int64()) | ||
| } | ||
| pool.mu.Unlock() | ||
|
|
||
| // Notify subsystems for newly added transactions | ||
|
|
@@ -1963,6 +1973,41 @@ func (pool *LegacyPool) Clear() { | |
| pool.pendingNonces = newNoncer(pool.currentState) | ||
| } | ||
|
|
||
| // WaitForReorgHeight blocks until the reorg loop has reset at a head with | ||
| // height >= height. If the context is cancelled or the pool is shutting down, | ||
| // this will also return. | ||
| func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) { | ||
| for pool.latestReorgHeight.Load() < height { | ||
| sub, err := pool.SubscribeToNextReorg() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if we have not run the reorg loop for the target height yet, we wait for the outcome of the next iteration of the loop. We are explicitly not telling the reorg loop to run here, since that would simply run it again, but not increment it to a new height (since we would need to pass the latest headers to it in order for that to happen). Also, if we kick off a new run here and don't increment the |
||
| if err != nil { | ||
| return | ||
| } | ||
|
|
||
| // need to check again in case reorg has finished in between initial | ||
| // check and subscribing to next reorg | ||
| if pool.latestReorgHeight.Load() >= height { | ||
| return | ||
| } | ||
|
|
||
| select { | ||
| case <-sub: | ||
| case <-ctx.Done(): | ||
| return | ||
|
Comment on lines
+1995
to
+1996
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. likely a better way to handle this, but mempool.Select doesn't return an error, so if the context cancels during this call, then we potentially allow invalid txs to be selected, which will just be invalidated. If this happens we likely time out during propose. Not 100% sure what it means for the context to be cancelled here (is the app shutting down?); based on what's actually happening, panicking may be better. Likely this is a follow-on to this change, probably not important right now. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| // SubscribeToNextReorg returns a channel that will close when the next reorg | ||
| // loop completes. An error is returned if the loop is shutting down. | ||
| func (pool *LegacyPool) SubscribeToNextReorg() (chan struct{}, error) { | ||
| select { | ||
| case pool.reorgSubscriptionCh <- struct{}{}: | ||
| return <-pool.reorgDoneCh, nil | ||
| case <-pool.reorgShutdownCh: | ||
| return nil, errors.New("shutdown") | ||
| } | ||
| } | ||
|
|
||
| // HasPendingAuth returns a flag indicating whether there are pending | ||
| // authorizations from the specific address cached in the pool. | ||
| func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the ability for arbitrary subscriptions to the completion of
nextDone (when the "next" reorg loop will complete). This works by a user pushing a request to subscribe onto this channel; they must then immediately listen on pool.reorgDoneCh. This loop will push nextDone onto that channel. The user can then wait on the closure of nextDone, which essentially broadcasts to all holders of this channel (subscribers to the next run of the reorg loop) that it has completed.