From 73754bb5e93a95b1e203b82048387bf63f80f367 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 30 Jan 2024 15:11:57 -0500 Subject: [PATCH 1/7] logs --- cosmos/runtime/txpool/ante.go | 27 ++++++++++++++++++++------- cosmos/runtime/txpool/comet.go | 3 ++- cosmos/runtime/txpool/handler.go | 10 +++++++--- cosmos/runtime/txpool/mempool.go | 9 +++++++++ 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 18ea5905c..856da887c 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -44,19 +44,25 @@ func (m *Mempool) AnteHandle( telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() + ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate) + // TODO: Record the time it takes to build a payload. // We only want to eject transactions from comet on recheck. if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { + ctx.Logger().Info("AnteHandle in Check/Recheck tx") if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() + ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash()) if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime().Unix(), ethTx, + ctx, ethTx, ); shouldEject { + ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash()) m.crc.DropRemoteTx(ethTx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } + ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash()) } } return next(ctx, tx, simulate) @@ -64,25 +70,30 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime int64, tx *ethtypes.Transaction, + ctx sdk.Context, tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { + ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil") return false } // First check things that are stateless. - if m.validateStateless(tx, currentTime) { + if m.validateStateless(ctx, tx) { + ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash()) return true } // Then check for things that are stateful. - return m.validateStateful(tx) + return m.validateStateful(ctx, tx) } // validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool { +func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool { txHash := tx.Hash() + currentTime := ctx.BlockTime().Unix() + ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime) + // 1. If the transaction has been in the mempool for longer than the configured timeout. // 2. If the transaction's gas params are less than or equal to the configured limit. expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime @@ -95,20 +106,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } + ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit) + return expired || priceLeLimit } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical // Eth chain. -func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool { +func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool { // // 1. If the transaction has been included in a block. // signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID) // if _, err := ethtypes.Sender(signer, tx); err != nil { // return true // } - // tx.Nonce() < included := m.chain.GetTransactionLookup(tx.Hash()) != nil telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + ctx.Logger().Info("validateStateful", "included", included) return included } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c40809db9..9589f1eb5 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { return ok } -// Record the time the tx was inserted from Comet successfully. +// Record the first time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { crc.timeInsertedMu.Lock() + // TODO: only insert a new timestamp if not already seen. crc.timeInserted[txHash] = time.Now().Unix() crc.timeInsertedMu.Unlock() } diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 069003b68..2842dc0d8 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -172,6 +172,7 @@ func (h *handler) failedLoop() { h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries) continue } + h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries) telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry) h.broadcastTransaction(failed.tx, failed.retries-1) } @@ -225,11 +226,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) { numBroadcasted := 0 for _, signedEthTx := range txs { if !h.crc.IsRemoteTx(signedEthTx.Hash()) { + h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex()) h.broadcastTransaction(signedEthTx, maxRetries) numBroadcasted++ } } - h.logger.Debug( + h.logger.Info( "broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted, ) } @@ -254,6 +256,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { // If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually // the desired behaviour. if rsp == nil || rsp.Code == 0 || rsp.Code == 1 { + h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code) return } @@ -261,8 +264,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { case sdkerrors.ErrMempoolIsFull.ABCICode(): h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyMempoolFull) - case - sdkerrors.ErrTxInMempoolCache.ABCICode(): + case sdkerrors.ErrTxInMempoolCache.ABCICode(): return default: h.logger.Error("failed to broadcast transaction", @@ -270,5 +272,7 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure) } + h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries) + h.failedTxs <- &failedTx{tx: tx, retries: retries} } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index ecf582eb1..3aca3868d 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -25,6 +25,7 @@ import ( "errors" "math/big" "sync" + "time" "cosmossdk.io/log" @@ -111,12 +112,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { sCtx := sdk.UnwrapSDKContext(ctx) msgs := sdkTx.GetMsgs() if len(msgs) != 1 { + sCtx.Logger().Error("mempool insert: only one message is supported") return errors.New("only one message is supported") } wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]) if !ok { // We have to return nil for non-ethereum transactions as to not fail check-tx. + sCtx.Logger().Info("mempool insert: not an ethereum transaction") return nil } @@ -130,14 +133,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { if len(errs) > 0 { // Handle case where a node broadcasts to itself, we don't want it to fail CheckTx. if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) && + // TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx) (sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) { telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs) + sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode()) return nil } return errs[0] } // Add the eth tx to the remote cache. + sCtx.Logger().Info( + "mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(), + "is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()), + ) m.crc.MarkRemoteSeen(ethTx.Hash()) return nil From 7965850b5749923868ad9061f2fd5e75c05a4fb8 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 30 Jan 2024 16:44:15 -0500 Subject: [PATCH 2/7] more logs --- cosmos/runtime/ante/ante.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cosmos/runtime/ante/ante.go b/cosmos/runtime/ante/ante.go index 05c4de043..252df979a 100644 --- a/cosmos/runtime/ante/ante.go +++ b/cosmos/runtime/ante/ante.go @@ -66,12 +66,15 @@ func (ah *Provider) AnteHandler() func( return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { // If the transaction contains a single EVM transaction, use the EVM ante handler if len(tx.GetMsgs()) == 1 { - if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash()) return ah.evmAnteHandler(ctx, tx, simulate) } else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok { if ctx.ExecMode() != sdk.ExecModeCheck { + ctx.Logger().Info("running evm ante handler for payload tx") return ctx, nil } + ctx.Logger().Error("running evm ante handler for payload tx in check tx") return ctx, errors.New("payload envelope is not supported in CheckTx") } } From 6a8aa928144ad34d99906eac8e72e278b5b77cc1 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 30 Jan 2024 18:07:37 -0500 Subject: [PATCH 3/7] broadcast sdk tx logging --- cosmos/runtime/txpool/handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 2842dc0d8..03327544e 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -244,6 +244,8 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) { return } + h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes) + // Send the transaction to the CometBFT mempool, which will gossip it to peers via // CometBFT's p2p layer. rsp, err := h.clientCtx.BroadcastTxSync(txBytes) From 3c50b5c6a3e02123c0c22ec0bd7173e6fe1b2a85 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 30 Jan 2024 19:37:54 -0500 Subject: [PATCH 4/7] rid of retry broadcast in handler --- cosmos/runtime/runtime.go | 7 +++++-- cosmos/runtime/txpool/handler.go | 21 ++++++++++++++++++--- cosmos/runtime/txpool/mempool.go | 3 ++- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 880ccaf84..b28c83976 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -177,8 +177,11 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error { clientCtx.TxConfig, evmtypes.WrapPayload)) // Initialize the txpool with a new transaction serializer. - p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( - clientCtx.TxConfig, evmtypes.WrapTx)) + p.WrappedTxPool.Init( + p.logger, clientCtx, + libtx.NewSerializer[*ethtypes.Transaction](clientCtx.TxConfig, evmtypes.WrapTx), + clientCtx.Client, + ) // Register services with Polaris. p.RegisterLifecycles([]node.Lifecycle{ diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 03327544e..a8b1cb299 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -21,10 +21,13 @@ package txpool import ( + "context" "errors" "sync/atomic" "time" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + "cosmossdk.io/log" "github.com/cosmos/cosmos-sdk/telemetry" @@ -40,7 +43,7 @@ import ( // size of tx pool. const ( txChanSize = 4096 - maxRetries = 5 + maxRetries = 0 retryDelay = 50 * time.Millisecond statPeriod = 60 * time.Second ) @@ -67,6 +70,16 @@ type TxBroadcaster interface { BroadcastTxSync(txBytes []byte) (res *sdk.TxResponse, err error) } +type TxSearcher interface { + TxSearch( + ctx context.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*coretypes.ResultTxSearch, error) +} + // Subscription represents a subscription to the txpool. type Subscription interface { event.Subscription @@ -85,6 +98,7 @@ type handler struct { logger log.Logger clientCtx TxBroadcaster serializer TxSerializer + searcher TxSearcher crc CometRemoteCache // Ethereum @@ -100,13 +114,14 @@ type handler struct { // newHandler creates a new handler. func newHandler( - clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, + clientCtx TxBroadcaster, txSearcher TxSearcher, txPool TxSubProvider, serializer TxSerializer, crc CometRemoteCache, logger log.Logger, ) *handler { h := &handler{ logger: logger, clientCtx: clientCtx, serializer: serializer, + searcher: txSearcher, crc: crc, txPool: txPool, txsCh: make(chan core.NewTxsEvent, txChanSize), @@ -122,7 +137,7 @@ func (h *handler) Start() error { return errors.New("handler already started") } go h.mainLoop() - go h.failedLoop() // Start the retry policy + // go h.failedLoop() // Start the retry policy go h.statLoop() return nil } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 3aca3868d..63f08321a 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -93,8 +93,9 @@ func (m *Mempool) Init( logger log.Logger, txBroadcaster TxBroadcaster, txSerializer TxSerializer, + txSearcher TxSearcher, ) { - m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger) + m.handler = newHandler(txBroadcaster, txSearcher, m.TxPool, txSerializer, m.crc, logger) } // Start starts the Mempool TxHandler. From f963e56eba7a54f93ed4dd9b8e012078747994e4 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Tue, 30 Jan 2024 19:40:06 -0500 Subject: [PATCH 5/7] fix test --- cosmos/runtime/txpool/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/runtime/txpool/handler_test.go b/cosmos/runtime/txpool/handler_test.go index a99202a26..460ff5299 100644 --- a/cosmos/runtime/txpool/handler_test.go +++ b/cosmos/runtime/txpool/handler_test.go @@ -60,7 +60,7 @@ var _ = Describe("", func() { subprovider = mocks.NewTxSubProvider(t) subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription) serializer = mocks.NewTxSerializer(t) - h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) + h = newHandler(broadcaster, nil, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) err := h.Start() Expect(err).NotTo(HaveOccurred()) for !h.Running() { From 06f3e3e493e0eb87028d016dba0842177f52d614 Mon Sep 17 00:00:00 2001 From: cyka Date: Tue, 30 Jan 2024 21:02:26 -0500 Subject: [PATCH 6/7] adding tx mode log --- cosmos/runtime/txpool/ante.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 856da887c..070c7a6a5 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -53,7 +53,7 @@ func (m *Mempool) AnteHandle( ctx.Logger().Info("AnteHandle in Check/Recheck tx") if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() - ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash()) + ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash(), "mode", ctx.ExecMode()) if shouldEject := m.shouldEjectFromCometMempool( ctx, ethTx, ); shouldEject { From 3877af2312d18504c5afcbce432978d3a3337241 Mon Sep 17 00:00:00 2001 From: cyka Date: Tue, 30 Jan 2024 23:10:40 -0500 Subject: [PATCH 7/7] adding time first seen check on tx hash --- cosmos/runtime/txpool/ante.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 070c7a6a5..306116dce 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -97,6 +97,7 @@ func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) b // 1. If the transaction has been in the mempool for longer than the configured timeout. // 2. If the transaction's gas params are less than or equal to the configured limit. expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime + ctx.Logger().Info("validateStateless", "currentTime", currentTime, "timeFirstSeen", m.crc.TimeFirstSeen(txHash), "expired", expired) priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 if expired {