Skip to content

Commit d298e6a

Browse files
authored
txpool: Reorganize Filter Checks (#572)
1 parent 02dfe86 commit d298e6a

File tree

4 files changed

+44
-36
lines changed

4 files changed

+44
-36
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,10 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
348348
return tx.Type() == types.BlobTxType
349349
}
350350

351+
func (p *BlobPool) SetIngressFilters(filters []txpool.IngressFilter) {
352+
// No-op, ingress filters are not supported in the blob pool
353+
}
354+
351355
// Init sets the gas price needed to keep a transaction in the pool and the chain
352356
// head to allow balance / nonce checks. The transaction journal will be loaded
353357
// from disk and filtered based on the provided starting settings.

core/txpool/legacypool/legacypool.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package legacypool
1919

2020
import (
21+
"context"
2122
"errors"
2223
"math"
2324
"math/big"
@@ -261,6 +262,10 @@ type LegacyPool struct {
261262
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
262263

263264
rollupCostFn txpool.RollupCostFunc // Additional rollup cost function, optional field, may be nil.
265+
266+
ingressFilters []txpool.IngressFilter // Filters to apply to incoming transactions
267+
filterCtx context.Context // Filters may use this context with external resources
268+
filterCancel context.CancelFunc // Cancel function for the filter context
264269
}
265270

266271
type txpoolResetRequest struct {
@@ -397,6 +402,10 @@ func (pool *LegacyPool) loop() {
397402

398403
// Close terminates the transaction pool.
399404
func (pool *LegacyPool) Close() error {
405+
// Cancel the filter context if it exists
406+
if pool.filterCancel != nil {
407+
pool.filterCancel()
408+
}
400409
// Terminate the pool reorger and return
401410
close(pool.reorgShutdownCh)
402411
pool.wg.Wait()
@@ -477,6 +486,13 @@ func (pool *LegacyPool) stats() (int, int) {
477486
return pending, queued
478487
}
479488

489+
func (pool *LegacyPool) SetIngressFilters(filters []txpool.IngressFilter) {
490+
pool.mu.Lock()
491+
defer pool.mu.Unlock()
492+
pool.filterCtx, pool.filterCancel = context.WithCancel(context.Background())
493+
pool.ingressFilters = filters
494+
}
495+
480496
// Content retrieves the data content of the transaction pool, returning all the
481497
// pending as well as queued transactions, grouped by account and sorted by nonce.
482498
func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
@@ -987,6 +1003,20 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
9871003
invalidTxMeter.Mark(1)
9881004
continue
9891005
}
1006+
// Exclude transactions which fail the ingress filters
1007+
filtered := false
1008+
for _, filter := range pool.ingressFilters {
1009+
if !filter.FilterTx(pool.filterCtx, tx) {
1010+
errs[i] = core.ErrTxFilteredOut
1011+
log.Trace("Discarding filtered transaction", "hash", tx.Hash())
1012+
invalidTxMeter.Mark(1)
1013+
filtered = true
1014+
break
1015+
}
1016+
}
1017+
if filtered {
1018+
continue
1019+
}
9901020
// Accumulate all unknown transactions for deeper processing
9911021
news = append(news, tx)
9921022
}

core/txpool/subpool.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ type SubPool interface {
111111
// one another.
112112
Init(gasTip uint64, head *types.Header, reserve AddressReserver) error
113113

114+
SetIngressFilters([]IngressFilter)
115+
114116
// Close terminates any background processing threads and releases any held
115117
// resources.
116118
Close() error

core/txpool/txpool.go

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package txpool
1818

1919
import (
20-
"context"
2120
"errors"
2221
"fmt"
2322
"math/big"
@@ -77,32 +76,22 @@ type TxPool struct {
7776
term chan struct{} // Termination channel to detect a closed pool
7877

7978
sync chan chan error // Testing / simulator channel to block until internal reset is done
80-
81-
ingressFilters []IngressFilter // List of filters to apply to incoming transactions
82-
83-
filterCtx context.Context // Filters may use external resources
84-
filterCancel context.CancelFunc // Filter calls are cancelled on shutdown
8579
}
8680

8781
// New creates a new transaction pool to gather, sort and filter inbound
8882
// transactions from the network.
89-
func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) {
83+
func New(gasTip uint64, chain BlockChain, subpools []SubPool, ingressFilters []IngressFilter) (*TxPool, error) {
9084
// Retrieve the current head so that all subpools and this main coordinator
9185
// pool will have the same starting state, even if the chain moves forward
9286
// during initialization.
9387
head := chain.CurrentBlock()
9488

95-
filterCtx, filterCancel := context.WithCancel(context.Background())
96-
9789
pool := &TxPool{
98-
subpools: subpools,
99-
reservations: make(map[common.Address]SubPool),
100-
quit: make(chan chan error),
101-
term: make(chan struct{}),
102-
sync: make(chan chan error),
103-
ingressFilters: poolFilters,
104-
filterCtx: filterCtx,
105-
filterCancel: filterCancel,
90+
subpools: subpools,
91+
reservations: make(map[common.Address]SubPool),
92+
quit: make(chan chan error),
93+
term: make(chan struct{}),
94+
sync: make(chan chan error),
10695
}
10796
for i, subpool := range subpools {
10897
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
@@ -111,6 +100,8 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []Ingr
111100
}
112101
return nil, err
113102
}
103+
// Set the ingress filters for the subpool
104+
subpool.SetIngressFilters(ingressFilters)
114105
}
115106
go pool.loop(head, chain)
116107
return pool, nil
@@ -166,8 +157,6 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
166157
func (p *TxPool) Close() error {
167158
var errs []error
168159

169-
p.filterCancel() // Cancel filter work, these in-flight txs will be not be allowed through before shutdown
170-
171160
// Terminate the reset loop and wait for it to finish
172161
errc := make(chan error)
173162
p.quit <- errc
@@ -350,23 +339,11 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
350339
// so we can piece back the returned errors into the original order.
351340
txsets := make([][]*types.Transaction, len(p.subpools))
352341
splits := make([]int, len(txs))
353-
filtered_out := make([]bool, len(txs))
354342

355343
for i, tx := range txs {
356344
// Mark this transaction belonging to no-subpool
357345
splits[i] = -1
358346

359-
// Filter the transaction through the ingress filters
360-
for _, f := range p.ingressFilters {
361-
if !f.FilterTx(p.filterCtx, tx) {
362-
filtered_out[i] = true
363-
}
364-
}
365-
// if the transaction is filtered out, don't add it to any subpool
366-
if filtered_out[i] {
367-
continue
368-
}
369-
370347
// Try to find a subpool that accepts the transaction
371348
for j, subpool := range p.subpools {
372349
if subpool.Filter(tx) {
@@ -384,11 +361,6 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
384361
}
385362
errs := make([]error, len(txs))
386363
for i, split := range splits {
387-
// If the transaction was filtered out, mark it as such
388-
if filtered_out[i] {
389-
errs[i] = core.ErrTxFilteredOut
390-
continue
391-
}
392364
// If the transaction was rejected by all subpools, mark it unsupported
393365
if split == -1 {
394366
errs[i] = fmt.Errorf("%w: received type %d", core.ErrTxTypeNotSupported, txs[i].Type())

0 commit comments

Comments
 (0)