diff --git a/cosmos/runtime/ante/ante.go b/cosmos/runtime/ante/ante.go index 05c4de043..252df979a 100644 --- a/cosmos/runtime/ante/ante.go +++ b/cosmos/runtime/ante/ante.go @@ -66,12 +66,15 @@ func (ah *Provider) AnteHandler() func( return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { // If the transaction contains a single EVM transaction, use the EVM ante handler if len(tx.GetMsgs()) == 1 { - if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash()) return ah.evmAnteHandler(ctx, tx, simulate) } else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok { if ctx.ExecMode() != sdk.ExecModeCheck { + ctx.Logger().Info("running evm ante handler for payload tx") return ctx, nil } + ctx.Logger().Error("running evm ante handler for payload tx in check tx") return ctx, errors.New("payload envelope is not supported in CheckTx") } } diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 880ccaf84..b28c83976 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -177,8 +177,11 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error { clientCtx.TxConfig, evmtypes.WrapPayload)) // Initialize the txpool with a new transaction serializer. - p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( - clientCtx.TxConfig, evmtypes.WrapTx)) + p.WrappedTxPool.Init( + p.logger, clientCtx, + libtx.NewSerializer[*ethtypes.Transaction](clientCtx.TxConfig, evmtypes.WrapTx), + clientCtx.Client, + ) // Register services with Polaris. p.RegisterLifecycles([]node.Lifecycle{ diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 18ea5905c..306116dce 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -44,19 +44,25 @@ func (m *Mempool) AnteHandle( telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() + ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate) + // TODO: Record the time it takes to build a payload. // We only want to eject transactions from comet on recheck. if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { + ctx.Logger().Info("AnteHandle in Check/Recheck tx") if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() + ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash(), "mode", ctx.ExecMode()) if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime().Unix(), ethTx, + ctx, ethTx, ); shouldEject { + ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash()) m.crc.DropRemoteTx(ethTx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } + ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash()) } } return next(ctx, tx, simulate) @@ -64,28 +70,34 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime int64, tx *ethtypes.Transaction, + ctx sdk.Context, tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { + ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil") return false } // First check things that are stateless. - if m.validateStateless(tx, currentTime) { + if m.validateStateless(ctx, tx) { + ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash()) return true } // Then check for things that are stateful. - return m.validateStateful(tx) + return m.validateStateful(ctx, tx) } // validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool { +func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool { txHash := tx.Hash() + currentTime := ctx.BlockTime().Unix() + ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime) + // 1. If the transaction has been in the mempool for longer than the configured timeout. // 2. If the transaction's gas params are less than or equal to the configured limit. expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime + ctx.Logger().Info("validateStateless", "currentTime", currentTime, "timeFirstSeen", m.crc.TimeFirstSeen(txHash), "expired", expired) priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 if expired { @@ -95,20 +107,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } + ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit) + return expired || priceLeLimit } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical // Eth chain. -func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool { +func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool { // // 1. If the transaction has been included in a block. // signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID) // if _, err := ethtypes.Sender(signer, tx); err != nil { // return true // } - // tx.Nonce() < included := m.chain.GetTransactionLookup(tx.Hash()) != nil telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + ctx.Logger().Info("validateStateful", "included", included) return included } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c40809db9..9589f1eb5 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { return ok } -// Record the time the tx was inserted from Comet successfully. +// Record the first time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { crc.timeInsertedMu.Lock() + // TODO: only insert a new timestamp if not already seen. crc.timeInserted[txHash] = time.Now().Unix() crc.timeInsertedMu.Unlock() } diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 069003b68..a8b1cb299 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -21,10 +21,13 @@ package txpool import ( + "context" "errors" "sync/atomic" "time" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + "cosmossdk.io/log" "github.com/cosmos/cosmos-sdk/telemetry" @@ -40,7 +43,7 @@ import ( // size of tx pool. const ( txChanSize = 4096 - maxRetries = 5 + maxRetries = 0 retryDelay = 50 * time.Millisecond statPeriod = 60 * time.Second ) @@ -67,6 +70,16 @@ type TxBroadcaster interface { BroadcastTxSync(txBytes []byte) (res *sdk.TxResponse, err error) } +type TxSearcher interface { + TxSearch( + ctx context.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*coretypes.ResultTxSearch, error) +} + // Subscription represents a subscription to the txpool. type Subscription interface { event.Subscription @@ -85,6 +98,7 @@ type handler struct { logger log.Logger clientCtx TxBroadcaster serializer TxSerializer + searcher TxSearcher crc CometRemoteCache // Ethereum @@ -100,13 +114,14 @@ type handler struct { // newHandler creates a new handler. func newHandler( - clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, + clientCtx TxBroadcaster, txSearcher TxSearcher, txPool TxSubProvider, serializer TxSerializer, crc CometRemoteCache, logger log.Logger, ) *handler { h := &handler{ logger: logger, clientCtx: clientCtx, serializer: serializer, + searcher: txSearcher, crc: crc, txPool: txPool, txsCh: make(chan core.NewTxsEvent, txChanSize), @@ -122,7 +137,7 @@ func (h *handler) Start() error { return errors.New("handler already started") } go h.mainLoop() - go h.failedLoop() // Start the retry policy + // go h.failedLoop() // Start the retry policy go h.statLoop() return nil } @@ -172,6 +187,7 @@ func (h *handler) failedLoop() { h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries) continue } + h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries) telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry) h.broadcastTransaction(failed.tx, failed.retries-1) } @@ -225,11 +241,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) { numBroadcasted := 0 for _, signedEthTx := range txs { if !h.crc.IsRemoteTx(signedEthTx.Hash()) { + h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex()) h.broadcastTransaction(signedEthTx, maxRetries) numBroadcasted++ } } - h.logger.Debug( + h.logger.Info( "broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted, ) } @@ -242,6 +259,8 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { return } + h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes) + // Send the transaction to the CometBFT mempool, which will gossip it to peers via // CometBFT's p2p layer. rsp, err := h.clientCtx.BroadcastTxSync(txBytes) @@ -254,6 +273,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { // If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually // the desired behaviour. if rsp == nil || rsp.Code == 0 || rsp.Code == 1 { + h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code) return } @@ -261,8 +281,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { case sdkerrors.ErrMempoolIsFull.ABCICode(): h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyMempoolFull) - case - sdkerrors.ErrTxInMempoolCache.ABCICode(): + case sdkerrors.ErrTxInMempoolCache.ABCICode(): return default: h.logger.Error("failed to broadcast transaction", @@ -270,5 +289,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure) } + h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries) + h.failedTxs <- &failedTx{tx: tx, retries: retries} } diff --git a/cosmos/runtime/txpool/handler_test.go b/cosmos/runtime/txpool/handler_test.go index a99202a26..460ff5299 100644 --- a/cosmos/runtime/txpool/handler_test.go +++ b/cosmos/runtime/txpool/handler_test.go @@ -60,7 +60,7 @@ var _ = Describe("", func() { subprovider = mocks.NewTxSubProvider(t) subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription) serializer = mocks.NewTxSerializer(t) - h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) + h = newHandler(broadcaster, nil, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) err := h.Start() Expect(err).NotTo(HaveOccurred()) for !h.Running() { diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index ecf582eb1..63f08321a 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -25,6 +25,7 @@ import ( "errors" "math/big" "sync" + "time" "cosmossdk.io/log" @@ -92,8 +93,9 @@ func (m *Mempool) Init( logger log.Logger, txBroadcaster TxBroadcaster, txSerializer TxSerializer, + txSearcher TxSearcher, ) { - m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger) + m.handler = newHandler(txBroadcaster, txSearcher, m.TxPool, txSerializer, m.crc, logger) } // Start starts the Mempool TxHandler. @@ -111,12 +113,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { sCtx := sdk.UnwrapSDKContext(ctx) msgs := sdkTx.GetMsgs() if len(msgs) != 1 { + sCtx.Logger().Error("mempool insert: only one message is supported") return errors.New("only one message is supported") } wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]) if !ok { // We have to return nil for non-ethereum transactions as to not fail check-tx. + sCtx.Logger().Info("mempool insert: not an ethereum transaction") return nil } @@ -130,14 +134,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { if len(errs) > 0 { // Handle case where a node broadcasts to itself, we don't want it to fail CheckTx. if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) && + // TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx) (sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) { telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs) + sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode()) return nil } return errs[0] } // Add the eth tx to the remote cache. + sCtx.Logger().Info( + "mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(), + "is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()), + ) m.crc.MarkRemoteSeen(ethTx.Hash()) return nil