Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/txm/clientwrappers/dualbroadcast/error_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package dualbroadcast

import (
"context"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-evm/pkg/txm"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
)

type errorHandler struct{}

func NewErrorHandler() *errorHandler {
return &errorHandler{}
}

func (e *errorHandler) HandleError(
ctx context.Context,
tx *types.Transaction,
txErr error,
txStore txm.TxStore,
setNonce func(common.Address, uint64),
isFromBroadcastMethod bool,
) error {
// If there are past broadcasts, don't mark the tx as fatal as they might be included on-chain.
if strings.Contains(txErr.Error(), NoBidsError) && tx.AttemptCount == 1 {
if err := txStore.MarkTxFatal(ctx, tx, tx.FromAddress); err != nil {
return err
}
setNonce(tx.FromAddress, *tx.Nonce)
return fmt.Errorf("transaction with txID: %d marked as fatal", tx.ID)
}

return txErr
}
11 changes: 6 additions & 5 deletions pkg/txm/clientwrappers/dualbroadcast/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
)

const (
timeout = time.Second * 5
metaABI = `[
timeout = time.Second * 5
NoBidsError = "no bids"
metaABI = `[
{
"type": "function",
"name": "metacall",
Expand Down Expand Up @@ -142,7 +143,7 @@ func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, c
}

return &MetaClient{
lggr: logger.Sugared(logger.Named(lggr, "Txm.Txm.MetaClient")),
lggr: logger.Sugared(logger.Named(lggr, "Txm.MetaClient")),
c: c,
ks: ks,
customURL: customURL,
Expand Down Expand Up @@ -179,7 +180,7 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction,
return nil
}
a.lggr.Infof("No bids for transactionID(%d): ", tx.ID)
return nil
return errors.New(NoBidsError)
}
a.lggr.Infow("Broadcasting attempt to public mempool", "tx", tx)
return a.c.SendTransaction(ctx, attempt.SignedTransaction)
Expand Down Expand Up @@ -521,7 +522,7 @@ func (a *MetaClient) SendOperation(ctx context.Context, tx *types.Transaction, a
if err != nil {
return fmt.Errorf("failed to sign attempt for txID: %v, err: %w", tx.ID, err)
}
a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit,
a.lggr.Infow("Intercepted attempt for tx", "txID", tx.ID, "hash", signedTx.Hash(), "toAddress", meta.ToAddress, "gasLimit", meta.GasLimit,
"TipCap", tip, "FeeCap", meta.MaxFeePerGas)
return a.c.SendTransaction(ctx, signedTx)
}
10 changes: 7 additions & 3 deletions pkg/txm/clientwrappers/dualbroadcast/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
"github.com/smartcontractkit/chainlink-evm/pkg/txm"
)

func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int) (txm.Client, error) {
func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int) (txm.Client, txm.ErrorHandler, error) {
urlString := url.String()
switch {
case strings.Contains(urlString, "flashbots"):
return NewFlashbotsClient(client, keyStore, url), nil
return NewFlashbotsClient(client, keyStore, url), nil, nil
default:
return NewMetaClient(lggr, client, keyStore, url, chainID)
mc, err := NewMetaClient(lggr, client, keyStore, url, chainID)
if err != nil {
return nil, nil, err
}
return mc, NewErrorHandler(), nil
}
}
13 changes: 10 additions & 3 deletions pkg/txm/storage/inmemory_store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package storage

import (
"errors"
"fmt"
"math/big"
"sort"
Expand Down Expand Up @@ -122,6 +121,7 @@ func (m *InMemoryStore) CreateEmptyUnconfirmedTransaction(nonce uint64, gasLimit
SpecifiedGasLimit: gasLimit,
CreatedAt: time.Now(),
State: txmgr.TxUnconfirmed,
IsPurgeable: true,
}

if _, exists := m.UnconfirmedTransactions[nonce]; exists {
Expand Down Expand Up @@ -366,8 +366,15 @@ func (m *InMemoryStore) DeleteAttemptForUnconfirmedTx(transactionNonce uint64, a
return fmt.Errorf("attempt with hash: %v for txID: %v was not found", attempt.Hash, attempt.TxID)
}

func (m *InMemoryStore) MarkTxFatal(*types.Transaction) error {
return errors.New("not implemented")
func (m *InMemoryStore) MarkTxFatal(txToMark *types.Transaction) error {
m.Lock()
defer m.Unlock()

// TODO: for now do the simple thing and drop the transaction.
delete(m.UnconfirmedTransactions, *txToMark.Nonce)
delete(m.Transactions, txToMark.ID)
txToMark.State = txmgr.TxFatalError // update the state in case the caller needs to log
return nil
}

// Orchestrator
Expand Down
17 changes: 14 additions & 3 deletions pkg/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type AttemptBuilder interface {
}

type ErrorHandler interface {
HandleError(*types.Transaction, error, AttemptBuilder, Client, TxStore, func(common.Address, uint64), bool) (err error)
HandleError(context.Context, *types.Transaction, error, TxStore, func(common.Address, uint64), bool) (err error)
}

type StuckTxDetector interface {
Expand Down Expand Up @@ -93,7 +93,17 @@ type Txm struct {
wg sync.WaitGroup
}

func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder AttemptBuilder, txStore TxStore, stuckTxDetector StuckTxDetector, config Config, keystore keys.AddressLister) *Txm {
func NewTxm(
lggr logger.Logger,
chainID *big.Int,
client Client,
attemptBuilder AttemptBuilder,
txStore TxStore,
stuckTxDetector StuckTxDetector,
config Config,
keystore keys.AddressLister,
errorHandler ErrorHandler,
) *Txm {
return &Txm{
lggr: logger.Sugared(logger.Named(lggr, "Txm")),
keystore: keystore,
Expand All @@ -103,6 +113,7 @@ func NewTxm(lggr logger.Logger, chainID *big.Int, client Client, attemptBuilder
txStore: txStore,
stuckTxDetector: stuckTxDetector,
config: config,
errorHandler: errorHandler,
nonceMap: make(map[common.Address]uint64),
triggerCh: make(map[common.Address]chan struct{}),
}
Expand Down Expand Up @@ -348,7 +359,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
tx.AttemptCount++
t.lggr.Infow("Broadcasted attempt", "tx", tx, "attempt", attempt, "duration", time.Since(start), "txErr: ", txErr)
if txErr != nil && t.errorHandler != nil {
if err = t.errorHandler.HandleError(tx, txErr, t.attemptBuilder, t.client, t.txStore, t.setNonce, false); err != nil {
if err = t.errorHandler.HandleError(ctx, tx, txErr, t.txStore, t.setNonce, false); err != nil {
return
}
} else if txErr != nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/txm/txm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestLifecycle(t *testing.T) {
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(address1))
keystore := keystest.Addresses{address1}
txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, nil, txStore, nil, config, keystore, nil)
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(0), errors.New("error")).Once()
client.On("PendingNonceAt", mock.Anything, address1).Return(uint64(100), nil).Once()
servicetest.Run(t, txm)
Expand All @@ -53,7 +53,7 @@ func TestLifecycle(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(addresses...))
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
var nonce uint64
// Start
client.On("PendingNonceAt", mock.Anything, address1).Return(nonce, nil).Once()
Expand All @@ -74,7 +74,7 @@ func TestTrigger(t *testing.T) {

t.Run("Trigger fails if Txm is unstarted", func(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.ErrorLevel)
txm := NewTxm(lggr, nil, nil, nil, nil, nil, Config{}, keystest.Addresses{})
txm := NewTxm(lggr, nil, nil, nil, nil, nil, Config{}, keystest.Addresses{}, nil)
txm.Trigger(address)
tests.AssertLogEventually(t, observedLogs, "Txm unstarted")
})
Expand All @@ -87,7 +87,7 @@ func TestTrigger(t *testing.T) {
ab := newMockAttemptBuilder(t)
config := Config{BlockTime: 1 * time.Minute, RetryBlockThreshold: 10}
keystore := keystest.Addresses{address}
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
var nonce uint64
// Start
client.On("PendingNonceAt", mock.Anything, address).Return(nonce, nil).Maybe()
Expand All @@ -109,7 +109,7 @@ func TestBroadcastTransaction(t *testing.T) {
t.Run("fails if FetchUnconfirmedTransactionAtNonceWithCount for unconfirmed transactions fails", func(t *testing.T) {
mTxStore := newMockTxStore(t)
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, errors.New("call failed")).Once()
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
bo, err := txm.broadcastTransaction(ctx, address)
require.Error(t, err)
assert.False(t, bo)
Expand All @@ -120,7 +120,7 @@ func TestBroadcastTransaction(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mTxStore := newMockTxStore(t)
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightTransactions+1, nil).Once()
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
bo, err := txm.broadcastTransaction(ctx, address)
assert.True(t, bo)
require.NoError(t, err)
Expand All @@ -130,7 +130,7 @@ func TestBroadcastTransaction(t *testing.T) {
t.Run("checks pending nonce if unconfirmed transactions are equal or more than maxInFlightSubset", func(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mTxStore := newMockTxStore(t)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
txm.setNonce(address, 1)
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, maxInFlightSubset, nil).Twice()

Expand All @@ -150,7 +150,7 @@ func TestBroadcastTransaction(t *testing.T) {
t.Run("fails if UpdateUnstartedTransactionWithNonce fails", func(t *testing.T) {
mTxStore := newMockTxStore(t)
mTxStore.On("FetchUnconfirmedTransactionAtNonceWithCount", mock.Anything, mock.Anything, mock.Anything).Return(nil, 0, nil).Once()
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore)
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, mTxStore, nil, config, keystore, nil)
mTxStore.On("UpdateUnstartedTransactionWithNonce", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("call failed")).Once()
bo, err := txm.broadcastTransaction(ctx, address)
assert.False(t, bo)
Expand All @@ -162,7 +162,7 @@ func TestBroadcastTransaction(t *testing.T) {
lggr := logger.Test(t)
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(address))
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
bo, err := txm.broadcastTransaction(ctx, address)
require.NoError(t, err)
assert.False(t, bo)
Expand All @@ -173,7 +173,7 @@ func TestBroadcastTransaction(t *testing.T) {
lggr := logger.Test(t)
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(address))
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
txm.setNonce(address, 8)
metrics, err := NewTxmMetrics(testutils.FixtureChainID)
require.NoError(t, err)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestBackfillTransactions(t *testing.T) {
keystore := keystest.Addresses{}

t.Run("fails if latest nonce fetching fails", func(t *testing.T) {
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
client.On("NonceAt", mock.Anything, address, mock.Anything).Return(uint64(0), errors.New("latest nonce fail")).Once()
bo, err := txm.backfillTransactions(t.Context(), address)
require.Error(t, err)
Expand All @@ -231,7 +231,7 @@ func TestBackfillTransactions(t *testing.T) {
})

t.Run("fails if MarkConfirmedAndReorgedTransactions fails", func(t *testing.T) {
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
client.On("NonceAt", mock.Anything, address, mock.Anything).Return(uint64(0), nil).Once()
txStore.On("MarkConfirmedAndReorgedTransactions", mock.Anything, mock.Anything, address).
Return([]*types.Transaction{}, []uint64{}, errors.New("marking transactions confirmed failed")).Once()
Expand All @@ -247,7 +247,7 @@ func TestBackfillTransactions(t *testing.T) {
require.NoError(t, txStore.Add(address))
ab := newMockAttemptBuilder(t)
c := Config{EIP1559: false, BlockTime: 10 * time.Minute, RetryBlockThreshold: 10, EmptyTxLimitDefault: 22000}
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore, nil)
emptyMetrics, err := NewTxmMetrics(testutils.FixtureChainID)
require.NoError(t, err)
txm.metrics = emptyMetrics
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestBackfillTransactions(t *testing.T) {
require.NoError(t, txStore.Add(address))
ab := newMockAttemptBuilder(t)
c := Config{EIP1559: false, BlockTime: 1 * time.Second, RetryBlockThreshold: 1, EmptyTxLimitDefault: 22000}
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, c, keystore, nil)
emptyMetrics, err := NewTxmMetrics(testutils.FixtureChainID)
require.NoError(t, err)
txm.metrics = emptyMetrics
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestBackfillTransactions(t *testing.T) {
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
txStore := storage.NewInMemoryStoreManager(lggr, testutils.FixtureChainID)
require.NoError(t, txStore.Add(address))
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore)
txm := NewTxm(lggr, testutils.FixtureChainID, client, ab, txStore, nil, config, keystore, nil)
var nonce uint64 = 8
txm.setNonce(address, nonce)
metrics, err := NewTxmMetrics(testutils.FixtureChainID)
Expand Down
5 changes: 3 additions & 2 deletions pkg/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,18 @@ func NewTxmV2(
RetryBlockThreshold: uint16(fCfg.BumpThreshold()),
EmptyTxLimitDefault: fCfg.LimitDefault(),
}
var eh txm.ErrorHandler
var c txm.Client
if txmV2Config.DualBroadcast() != nil && *txmV2Config.DualBroadcast() && txmV2Config.CustomURL() != nil {
var err error
c, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID)
c, eh, err = dualbroadcast.SelectClient(lggr, client, keyStore, txmV2Config.CustomURL(), chainID)
if err != nil {
return nil, fmt.Errorf("failed to create dual broadcast client: %w", err)
}
} else {
c = clientwrappers.NewChainClient(client)
}
t := txm.NewTxm(lggr, chainID, c, attemptBuilder, inMemoryStoreManager, stuckTxDetector, config, keyStore)
t := txm.NewTxm(lggr, chainID, c, attemptBuilder, inMemoryStoreManager, stuckTxDetector, config, keyStore, eh)
return txm.NewTxmOrchestrator(lggr, chainID, t, inMemoryStoreManager, fwdMgr, keyStore, attemptBuilder), nil
}

Expand Down
Loading