Skip to content

Commit 91c5a2d

Browse files
committed
OEV-677 Add Meta Error Handler
1 parent a822f17 commit 91c5a2d

File tree

8 files changed

+93
-32
lines changed

8 files changed

+93
-32
lines changed

pkg/txm/clientwrappers/dualbroadcast/meta_client.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ import (
2727
)
2828

2929
const (
30-
timeout = time.Second * 5
31-
metaABI = `[
30+
timeout = time.Second * 5
31+
NoBidsError = "no bids"
32+
NoSolverOps = "no solver operations received"
33+
NoSolverOpsAfterSimulation = "no valid solver operations after simulation"
34+
metaABI = `[
3235
{
3336
"type": "function",
3437
"name": "metacall",
@@ -142,7 +145,7 @@ func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, c
142145
}
143146

144147
return &MetaClient{
145-
lggr: logger.Sugared(logger.Named(lggr, "Txm.Txm.MetaClient")),
148+
lggr: logger.Sugared(logger.Named(lggr, "Txm.MetaClient")),
146149
c: c,
147150
ks: ks,
148151
customURL: customURL,
@@ -179,7 +182,7 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction,
179182
return nil
180183
}
181184
a.lggr.Infof("No bids for transactionID(%d): ", tx.ID)
182-
return nil
185+
return errors.New(NoBidsError)
183186
}
184187
a.lggr.Infow("Broadcasting attempt to public mempool", "tx", tx)
185188
return a.c.SendTransaction(ctx, attempt.SignedTransaction)
@@ -355,7 +358,7 @@ func (a *MetaClient) SendRequest(parentCtx context.Context, tx *types.Transactio
355358
}
356359

357360
if response.Error.ErrorMessage != "" {
358-
if strings.Contains(response.Error.ErrorMessage, "no solver operations received") {
361+
if strings.Contains(response.Error.ErrorMessage, NoSolverOps) || strings.Contains(response.Error.ErrorMessage, NoSolverOpsAfterSimulation) {
359362
a.metrics.RecordBidsReceived(ctx, 0)
360363
return nil, nil
361364
}
@@ -521,7 +524,7 @@ func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, a
521524
if err != nil {
522525
return fmt.Errorf("failed to sign attempt for txID: %v, err: %w", tx.ID, err)
523526
}
524-
a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit,
527+
a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "hash", signedTx.Hash(), "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit,
525528
"TipCap", tip, "FeeCap", meta.MaxFeePerGas)
526529
return a.c.SendTransaction(ctx, signedTx)
527530
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package dualbroadcast
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/ethereum/go-ethereum/common"
9+
10+
"github.com/smartcontractkit/chainlink-evm/pkg/txm"
11+
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
12+
)
13+
14+
type errorHandler struct{}
15+
16+
func NewErrorHandler() *errorHandler {
17+
return &errorHandler{}
18+
}
19+
20+
func (e *errorHandler) HandleError(ctx context.Context, tx *types.Transaction, txErr error, txStore txm.TxStore, setNonce func(common.Address, uint64), isFromBroadcastMethod bool) error {
21+
// If this isn't the first broadcast, don't mark the tx as fatal as other txs might be included on-chain.
22+
if strings.Contains(txErr.Error(), NoBidsError) && tx.AttemptCount == 1 {
23+
if err := txStore.MarkTxFatal(ctx, tx, tx.FromAddress); err != nil {
24+
return err
25+
}
26+
setNonce(tx.FromAddress, *tx.Nonce)
27+
return fmt.Errorf("transaction with txID: %d marked as fatal", tx.ID)
28+
}
29+
30+
return txErr
31+
}

pkg/txm/clientwrappers/dualbroadcast/selector.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ import (
1212
"github.com/smartcontractkit/chainlink-evm/pkg/txm"
1313
)
1414

15-
func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int) (txm.Client, error) {
15+
func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int) (txm.Client, txm.ErrorHandler, error) {
1616
urlString := url.String()
1717
switch {
1818
case strings.Contains(urlString, "flashbots"):
19-
return NewFlashbotsClient(client, keyStore, url), nil
19+
return NewFlashbotsClient(client, keyStore, url), nil, nil
2020
default:
21-
return NewMetaClient(lggr, client, keyStore, url, chainID)
21+
mc, err := NewMetaClient(lggr, client, keyStore, url, chainID)
22+
if err != nil {
23+
return nil, nil, err
24+
}
25+
return mc, NewErrorHandler(), nil
2226
}
2327
}

pkg/txm/storage/inmemory_store.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package storage
22

33
import (
4-
"errors"
54
"fmt"
65
"math/big"
76
"sort"
@@ -122,6 +121,7 @@ func (m *InMemoryStore) CreateEmptyUnconfirmedTransaction(nonce uint64, gasLimit
122121
SpecifiedGasLimit: gasLimit,
123122
CreatedAt: time.Now(),
124123
State: txmgr.TxUnconfirmed,
124+
IsPurgeable: true,
125125
}
126126

127127
if _, exists := m.UnconfirmedTransactions[nonce]; exists {
@@ -366,8 +366,15 @@ func (m *InMemoryStore) DeleteAttemptForUnconfirmedTx(transactionNonce uint64, a
366366
return fmt.Errorf("attempt with hash: %v for txID: %v was not found", attempt.Hash, attempt.TxID)
367367
}
368368

369-
func (m *InMemoryStore) MarkTxFatal(*types.Transaction) error {
370-
return errors.New("not implemented")
369+
func (m *InMemoryStore) MarkTxFatal(txToMark *types.Transaction) error {
370+
m.Lock()
371+
defer m.Unlock()
372+
373+
// TODO: for now do the simple thing and drop the transaction instead of adding it to the fatal queue.
374+
delete(m.UnconfirmedTransactions, *txToMark.Nonce)
375+
delete(m.Transactions, txToMark.ID)
376+
txToMark.State = txmgr.TxFatalError // update the state in case the caller needs to log
377+
return nil
371378
}
372379

373380
// Orchestrator

pkg/txm/storage/inmemory_store_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,20 @@ func TestFindTxWithIdempotencyKey(t *testing.T) {
481481
assert.Nil(t, itx)
482482
}
483483

484+
func TestMarkTxFatal(t *testing.T) {
485+
t.Parallel()
486+
fromAddress := testutils.NewAddress()
487+
m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID)
488+
489+
var nonce uint64 = 1
490+
tx, err := insertUnconfirmedTransaction(m, nonce)
491+
require.NoError(t, err)
492+
require.NoError(t, m.MarkTxFatal(tx))
493+
assert.Equal(t, txmgr.TxFatalError, tx.State)
494+
assert.Empty(t, m.UnconfirmedTransactions)
495+
assert.Empty(t, m.Transactions)
496+
}
497+
484498
func TestPruneConfirmedTransactions(t *testing.T) {
485499
t.Parallel()
486500
fromAddress := testutils.NewAddress()

pkg/txm/txm.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type AttemptBuilder interface {
5454
}
5555

5656
type ErrorHandler interface {
57-
HandleError(*types.Transaction, error, AttemptBuilder, Client, TxStore, func(common.Address, uint64), bool) (err error)
57+
HandleError(context.Context, *types.Transaction, error, TxStore, func(common.Address, uint64), bool) (err error)
5858
}
5959

6060
type StuckTxDetector interface {
@@ -93,7 +93,7 @@ type Txm struct {
9393
wg sync.WaitGroup
9494
}
9595

96-
func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder AttemptBuilder, txStore TxStore, stuckTxDetector StuckTxDetector, config Config, keystore keys.AddressLister) *Txm {
96+
func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder AttemptBuilder, txStore TxStore, stuckTxDetector StuckTxDetector, config Config, keystore keys.AddressLister, errorHandler ErrorHandler) *Txm {
9797
return &Txm{
9898
lggr: logger.Sugared(logger.Named(lggr, "Txm")),
9999
keystore: keystore,
@@ -103,6 +103,7 @@ func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder
103103
txStore: txStore,
104104
stuckTxDetector: stuckTxDetector,
105105
config: config,
106+
errorHandler: errorHandler,
106107
nonceMap: make(map[common.Address]uint64),
107108
triggerCh: make(map[common.Address]chan struct{}),
108109
}
@@ -328,7 +329,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
328329
tx.AttemptCount++
329330
t.lggr.Infow("Broadcasted attempt", "tx", tx, "attempt", attempt, "duration", time.Since(start), "txErr: ", txErr)
330331
if txErr != nil && t.errorHandler != nil {
331-
if err = t.errorHandler.HandleError(tx, txErr, t.attemptBuilder, t.client, t.txStore, t.setNonce, false); err != nil {
332+
if err = t.errorHandler.HandleError(ctx, tx, txErr, t.txStore, t.setNonce, false); err != nil {
332333
return
333334
}
334335
} else if txErr != nil {

pkg/txm/txm_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestLifecycle(t *testing.T) {
3939
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
4040
require.NoError(t, txStore.Add(address1))
4141
keystore := keystest.Addresses{address1}
42-
txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore)
42+
txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore, nil)
4343
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), errors.New("error")).Once()
4444
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(100), nil).Once()
4545
servicetest.Run(t, txm)
@@ -53,7 +53,7 @@ func TestLifecycle(t *testing.T) {
5353
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
5454
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
5555
require.NoError(t, txStore.Add(addresses...))
56-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
56+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
5757
var nonce uint64
5858
// Start
5959
client.On("PendingNonceAt", mock.Anything, address1).Return(nonce, nil).Once()
@@ -74,7 +74,7 @@ func TestTrigger(t *testing.T) {
7474

7575
t.Run("Trigger fails if Txm is unstarted", func(t *testing.T) {
7676
lggr, observedLogs := logger.TestObserved(t, zap.ErrorLevel)
77-
txm := NewTxm(lggr, nil, nil, nil, nil, nil, Config{}, keystest.Addresses{})
77+
txm := NewTxm(lggr, nil, nil, nil, nil, nil, Config{}, keystest.Addresses{}, nil)
7878
txm.Trigger(address)
7979
tests.AssertLogEventually(t, observedLogs, "Txm unstarted")
8080
})
@@ -87,7 +87,7 @@ func TestTrigger(t *testing.T) {
8787
ab := newMockAttemptBuilder(t)
8888
config := Config{BlockTime: 1 * time.Minute, RetryBlockThreshold: 10}
8989
keystore := keystest.Addresses{address}
90-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
90+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
9191
var nonce uint64
9292
// Start
9393
client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Maybe()
@@ -109,7 +109,7 @@ func TestBroadcastTransaction(t *testing.T) {
109109
t.Run("fails if FetchUnconfirmedTransactionAtNonceWithCount for unconfirmed transactions fails", func(t *testing.T) {
110110
mTxStore := newMockTxStore(t)
111111
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, errors.New("call failed")).Once()
112-
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
112+
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
113113
bo, err := txm.broadcastTransaction(ctx, address)
114114
require.Error(t, err)
115115
assert.False(t, bo)
@@ -120,7 +120,7 @@ func TestBroadcastTransaction(t *testing.T) {
120120
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
121121
mTxStore := newMockTxStore(t)
122122
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightTransactions+1, nil).Once()
123-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
123+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
124124
bo, err := txm.broadcastTransaction(ctx, address)
125125
assert.True(t, bo)
126126
require.NoError(t, err)
@@ -130,7 +130,7 @@ func TestBroadcastTransaction(t *testing.T) {
130130
t.Run("checks pending nonce if unconfirmed transactions are equal or more than maxInFlightSubset", func(t *testing.T) {
131131
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
132132
mTxStore := newMockTxStore(t)
133-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
133+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
134134
txm.setNonce(address, 1)
135135
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightSubset, nil).Twice()
136136

@@ -150,7 +150,7 @@ func TestBroadcastTransaction(t *testing.T) {
150150
t.Run("fails if UpdateUnstartedTransactionWithNonce fails", func(t *testing.T) {
151151
mTxStore := newMockTxStore(t)
152152
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, nil).Once()
153-
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
153+
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
154154
mTxStore.On("UpdateUnstartedTransactionWithNonce", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("call failed")).Once()
155155
bo, err := txm.broadcastTransaction(ctx, address)
156156
assert.False(t, bo)
@@ -162,7 +162,7 @@ func TestBroadcastTransaction(t *testing.T) {
162162
lggr := logger.Test(t)
163163
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
164164
require.NoError(t, txStore.Add(address))
165-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
165+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
166166
bo, err := txm.broadcastTransaction(ctx, address)
167167
require.NoError(t, err)
168168
assert.False(t, bo)
@@ -173,7 +173,7 @@ func TestBroadcastTransaction(t *testing.T) {
173173
lggr := logger.Test(t)
174174
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
175175
require.NoError(t, txStore.Add(address))
176-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
176+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
177177
txm.setNonce(address, 8)
178178
metrics, err := NewTxmMetrics(testutils.FixtureChainID)
179179
require.NoError(t, err)
@@ -222,7 +222,7 @@ func TestBackfillTransactions(t *testing.T) {
222222

223223
t.Run("fails if latest nonce fetching fails", func(t *testing.T) {
224224
ab := newMockAttemptBuilder(t)
225-
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
225+
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
226226
client.On("NonceAt", mock.Anything, address, mock.Anything).Return(uint64(0), errors.New("latest nonce fail")).Once()
227227
err := txm.backfillTransactions(t.Context(), address)
228228
require.Error(t, err)
@@ -231,7 +231,7 @@ func TestBackfillTransactions(t *testing.T) {
231231

232232
t.Run("fails if MarkConfirmedAndReorgedTransactions fails", func(t *testing.T) {
233233
ab := newMockAttemptBuilder(t)
234-
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
234+
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
235235
client.On("NonceAt", mock.Anything, address, mock.Anything).Return(uint64(0), nil).Once()
236236
txStore.On("MarkConfirmedAndReorgedTransactions", mock.Anything, mock.Anything, address).
237237
Return([]*types.Transaction{}, []uint64{}, errors.New("marking transactions confirmed failed")).Once()
@@ -246,7 +246,7 @@ func TestBackfillTransactions(t *testing.T) {
246246
require.NoError(t, txStore.Add(address))
247247
ab := newMockAttemptBuilder(t)
248248
c := Config{EIP1559: false, BlockTime: 10 * time.Minute, RetryBlockThreshold: 10, EmptyTxLimitDefault: 22000}
249-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore)
249+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore, nil)
250250
emptyMetrics, err := NewTxmMetrics(testutils.FixtureChainID)
251251
require.NoError(t, err)
252252
txm.metrics = emptyMetrics
@@ -287,7 +287,7 @@ func TestBackfillTransactions(t *testing.T) {
287287
require.NoError(t, txStore.Add(address))
288288
ab := newMockAttemptBuilder(t)
289289
c := Config{EIP1559: false, BlockTime: 1 * time.Second, RetryBlockThreshold: 1, EmptyTxLimitDefault: 22000}
290-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore)
290+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore, nil)
291291
emptyMetrics, err := NewTxmMetrics(testutils.FixtureChainID)
292292
require.NoError(t, err)
293293
txm.metrics = emptyMetrics
@@ -326,7 +326,7 @@ func TestBackfillTransactions(t *testing.T) {
326326
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
327327
require.NoError(t, txStore.Add(address))
328328
ab := newMockAttemptBuilder(t)
329-
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
329+
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
330330
var nonce uint64 = 8
331331
txm.setNonce(address, nonce)
332332
metrics, err := NewTxmMetrics(testutils.FixtureChainID)

pkg/txmgr/builder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,18 @@ func NewTxmV2(
147147
RetryBlockThreshold: uint16(fCfg.BumpThreshold()),
148148
EmptyTxLimitDefault: emptyTxLimitDefault,
149149
}
150+
var eh txm.ErrorHandler
150151
var c txm.Client
151152
if txmV2Config.DualBroadcast() != nil && *txmV2Config.DualBroadcast() && txmV2Config.CustomURL() != nil {
152153
var err error
153-
c, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID)
154+
c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID)
154155
if err != nil {
155156
return nil, fmt.Errorf("failed to create dual broadcast client: %w", err)
156157
}
157158
} else {
158159
c = clientwrappers.NewChainClient(client)
159160
}
160-
t := txm.NewTxm(lggr, chainID, c, attemptBuilder, inMemoryStoreManager, stuckTxDetector, config, keyStore)
161+
t := txm.NewTxm(lggr, chainID, c, attemptBuilder, inMemoryStoreManager, stuckTxDetector, config, keyStore, eh)
161162
return txm.NewTxmOrchestrator(lggr, chainID, t, inMemoryStoreManager, fwdMgr, keyStore, attemptBuilder), nil
162163
}
163164

0 commit comments

Comments
 (0)