Skip to content

fix: send aggregated response bump #1440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3e5f215
chore: tracing to send aggregated response
MarcosNicolau Nov 19, 2024
c30b3f6
refactor: big int mul and logs
uri-99 Nov 19, 2024
f1dee66
chore: logs on RespondToTask fail
uri-99 Nov 19, 2024
545c003
fix: nil dereference on aggregator retries
Oppen Nov 19, 2024
6198072
Merge branch 'staging' into fix/tx_overwrite
Oppen Nov 19, 2024
dd2e35d
Merge branch 'fix/tx_overwrite' into fix-send-aggregated-response-bump
uri-99 Nov 19, 2024
028466d
fix: defer recover in checkAggAndBatcherHaveEnoughBalance
uri-99 Nov 19, 2024
65c5ef6
chore: print err
uri-99 Nov 19, 2024
187f95e
chore: print err
uri-99 Nov 19, 2024
fe5a70d
fix: not dereference nil
uri-99 Nov 19, 2024
6840ae0
fix: quick fix for aggregator bump fee
MarcosNicolau Nov 19, 2024
2607c64
fix: add wait for receipt timeout
MarcosNicolau Nov 20, 2024
fbdb7ee
feat: save sent txs and check their receipts before sending a new one
MarcosNicolau Nov 20, 2024
92ff1a1
fix: re-add receipt for traces
MarcosNicolau Nov 20, 2024
02e9c82
fix: merkle root logging
MarcosNicolau Nov 20, 2024
56ca050
refactor: wait timeout of 500ms
MarcosNicolau Nov 20, 2024
e432d4e
chore: better comments on yaml
MarcosNicolau Nov 20, 2024
622277f
fix: receipt check in sending agg response
MarcosNicolau Nov 20, 2024
3880afb
feat: better defer recovers
uri-99 Nov 20, 2024
e34482f
Merge staging
MauroToscano Nov 20, 2024
f5d6516
Add missing whitespace to aggregator yaml
MauroToscano Nov 20, 2024
9999efc
Update comment in aggregator yaml
MauroToscano Nov 20, 2024
b55adb7
Typo
MauroToscano Nov 20, 2024
61f4a12
refactor: function docs and address reviews
MarcosNicolau Nov 20, 2024
c75920d
chore: docs
MarcosNicolau Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ func (agg *Aggregator) Start(ctx context.Context) error {
const MaxSentTxRetries = 5

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
defer func() {
err := recover() //stops panics
if err != nil {
agg.logger.Error("handleBlsAggServiceResponse recovered from panic", "err", err)
}
}()

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching task data")
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
Expand Down Expand Up @@ -277,10 +284,15 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
}

agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]), "merkleRoot", "0x"+hex.EncodeToString(batchData.BatchMerkleRoot[:]))
receipt, err := agg.sendAggregatedResponse(batchIdentifierHash, batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, receipt.TxHash.String())
// In some cases, we may fail to retrieve the receipt for the transaction.
txHash := "Unknown"
if receipt != nil {
txHash = receipt.TxHash.String()
}
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
Expand Down Expand Up @@ -428,7 +440,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
defer func() {
err := recover() //stops panics
if err != nil {
agg.logger.Error("Recovered from panic", "err", err)
agg.logger.Error("ClearTasksFromMaps Recovered from panic", "err", err)
}
}()

Expand Down
7 changes: 4 additions & 3 deletions config-files/config-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ aggregator:
garbage_collector_tasks_age: 20 #The age of tasks that will be removed by the GC, in blocks. Suggested value for prod: '216000' (30 days)
garbage_collector_tasks_interval: 10 #The interval of queried blocks to get an old batch. Suggested value for prod: '900' (3 hours)
bls_service_task_timeout: 168h # The timeout of bls aggregation service tasks. Suggested value for prod '168h' (7 days)
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
gas_base_bump_percentage: 25 # Percentage to overestimate gas price when sending a task
gas_bump_incremental_percentage: 20 # An extra percentage to overestimate in each bump of respond to task. This is additive between tries
# Gas used formula = est_gas_by_node * (gas_base_bump_percentage + gas_bum_incremental_percentage * i) / 100, where i is the iteration number.
time_to_wait_before_bump: 72s # The time to wait for the receipt when responding to task. Suggested value 72 seconds (6 blocks)

## Operator Configurations
# operator:
Expand Down
85 changes: 68 additions & 17 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainio

import (
"context"
"encoding/hex"
"fmt"
"math/big"
"time"
Expand Down Expand Up @@ -75,9 +76,19 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
}, nil
}

// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
// This process happens indefinitely until the transaction is included.
// SendAggregatedResponse continuously sends a RespondToTask transaction until it is included in the blockchain.
// This function:
// 1. Simulates the transaction to calculate the nonce and initial gas price without broadcasting it.
// 2. Repeatedly attempts to send the transaction, bumping the gas price after `timeToWaitBeforeBump` has passed.
// 3. Monitors for the receipt of previously sent transactions or checks the state to confirm if the response
// has already been processed (e.g., by another transaction).
// 4. Validates that the aggregator and batcher have sufficient balance to cover transaction costs before sending.
//
// Returns:
// - A transaction receipt if the transaction is successfully included in the blockchain.
// - If no receipt is found, but the batch state indicates the response has already been processed, it exits
// without an error (returning `nil, nil`).
// - An error if the process encounters a fatal issue (e.g., permanent failure in verifying balances or state).
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
Expand All @@ -93,39 +104,73 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txOpts.NoSend = false
i := 0

var sentTxs []*types.Transaction

batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])

respondToTaskV2Func := func() (*types.Receipt, error) {
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
if err != nil {
return nil, err
}

bumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(gasPrice, gasBumpPercentage, gasBumpIncrementalPercentage, i)
// new bumped gas price must be higher than the last one (this should hardly ever happen though)
if bumpedGasPrice.Cmp(txOpts.GasPrice) > 0 {
txOpts.GasPrice = bumpedGasPrice
previousTxGasPrice := txOpts.GasPrice
// in order to avoid replacement transaction underpriced
// the bumped gas price has to be at least 10% higher than the previous one.
minimumGasPriceBump := utils.CalculateGasPriceBumpBasedOnRetry(previousTxGasPrice, 10, 0, 0)
suggestedBumpedGasPrice := utils.CalculateGasPriceBumpBasedOnRetry(
gasPrice,
gasBumpPercentage,
gasBumpIncrementalPercentage,
i,
)
// check the new gas price is sufficiently bumped.
// if the suggested bump does not meet the minimum threshold, use a fallback calculation to slightly increment the previous gas price.
if suggestedBumpedGasPrice.Cmp(minimumGasPriceBump) > 0 {
txOpts.GasPrice = suggestedBumpedGasPrice
} else {
// bump the last tx gas price a little by `gasBumpIncrementalPercentage` to replace it.
txOpts.GasPrice = utils.CalculateGasPriceBumpBasedOnRetry(txOpts.GasPrice, gasBumpIncrementalPercentage, 0, 0)
txOpts.GasPrice = minimumGasPriceBump
}

if i > 0 {
w.logger.Infof("Trying to get old sent transaction receipt before sending a new transaction", "merkle root", batchMerkleRootHashString)
for _, tx := range sentTxs {
receipt, _ := w.Client.TransactionReceipt(context.Background(), tx.Hash())
if receipt == nil {
receipt, _ = w.ClientFallback.TransactionReceipt(context.Background(), tx.Hash())
if receipt != nil {
w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
return receipt, nil
}
}
}
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
if batchState.Responded {
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
return nil, nil
}
w.logger.Infof("Batch state has not been responded yet, will send a new tx", "merkle root", batchMerkleRootHashString)

onGasPriceBumped(txOpts.GasPrice)
}

// We compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
// Both are required to have some balance, more details inside the function
err = w.checkAggAndBatcherHaveEnoughBalance(simTx, txOpts, batchIdentifierHash, senderAddress)
if err != nil {
w.logger.Errorf("Permanent error when checking aggregator and batcher balances, err %v", err, "merkle root", batchMerkleRootHashString)
return nil, retry.PermanentError{Inner: err}
}

w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice)

w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
if err != nil {
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
return nil, err
}
sentTxs = append(sentTxs, realTx)

w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
if receipt != nil {
w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
Expand All @@ -136,14 +181,18 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
// we increment the i here to add an incremental percentage to increase the odds of being included in the next blocks
i++

w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...")
w.logger.Infof("RespondToTask receipt waiting timeout has passed, will try again...", "merkle_root", batchMerkleRootHashString)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("transaction failed")
}

return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
// This just retries the bump of a fee in case of a timeout
// The wait is done before on WaitForTransactionReceiptRetryable, and all the functions are retriable,
// so this retry doesn't need to wait more time
maxInterval := time.Millisecond * 500
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
}

// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
Expand Down Expand Up @@ -171,7 +220,9 @@ func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction,
func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {
w.logger.Info("Checking if aggregator and batcher have enough balance for the transaction")
aggregatorAddress := txOpts.From
txCost := new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), txOpts.GasPrice)
txGasAsBigInt := new(big.Int).SetUint64(tx.Gas())
txGasPrice := txOpts.GasPrice
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
w.logger.Info("Transaction cost", "cost", txCost)

batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
Expand All @@ -183,8 +234,8 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
respondToTaskFeeLimit := batchState.RespondToTaskFeeLimit
w.logger.Info("Checking balance against Batch RespondToTaskFeeLimit", "RespondToTaskFeeLimit", respondToTaskFeeLimit)
// Note: we compare both Aggregator funds and Batcher balance in Aligned against respondToTaskFeeLimit
// Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned
// Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance
// Batcher will pay up to respondToTaskFeeLimit, for this he needs that amount of funds in Aligned
// Aggregator will pay any extra cost, for this he needs at least respondToTaskFeeLimit in his balance
return w.compareBalances(respondToTaskFeeLimit, aggregatorAddress, senderAddress)
}

Expand Down
1 change: 1 addition & 0 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl
// If error try with fallback
tx, err = w.AvsContractBindings.ServiceManagerFallback.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
}

return tx, err
}
return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
Expand Down
4 changes: 2 additions & 2 deletions core/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat
if panic_err, ok := r.(error); ok {
err = panic_err
} else {
err = fmt.Errorf("panicked: %v", panic_err)
err = fmt.Errorf("RetryWithData panicked: %v", panic_err)
}
}
}()
Expand Down Expand Up @@ -151,7 +151,7 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64,
if panic_err, ok := r.(error); ok {
err = panic_err
} else {
err = fmt.Errorf("panicked: %v", panic_err)
err = fmt.Errorf("Retry panicked: %v", panic_err)
}
}
}()
Expand Down
Loading