Skip to content

Commit 19f2de7

Browse files
zerosnacksmattssegrandizzy
authored
feat(forge script): implement retries for forge script broadcast (#12564)
* add retry for non sequential txs Co-authored-by: Matthias Seitz <[email protected]> Co-authored-by: grandizzy <[email protected]> * nit * remove debug sleep --------- Co-authored-by: Matthias Seitz <[email protected]> Co-authored-by: grandizzy <[email protected]>
1 parent 032e9a4 commit 19f2de7

File tree

1 file changed

+165
-86
lines changed

1 file changed

+165
-86
lines changed

crates/script/src/broadcast.rs

Lines changed: 165 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
use crate::{
2-
ScriptArgs, ScriptConfig, build::LinkedBuildData, progress::ScriptProgress,
3-
sequence::ScriptSequenceKind, verify::BroadcastedState,
4-
};
1+
use std::{cmp::Ordering, sync::Arc, time::Duration};
2+
53
use alloy_chains::{Chain, NamedChain};
64
use alloy_consensus::TxEnvelope;
75
use alloy_eips::{BlockId, eip2718::Encodable2718};
@@ -24,9 +22,13 @@ use foundry_common::{
2422
shell,
2523
};
2624
use foundry_config::Config;
27-
use futures::{StreamExt, future::join_all};
25+
use futures::{FutureExt, StreamExt, future::join_all, stream::FuturesUnordered};
2826
use itertools::Itertools;
29-
use std::{cmp::Ordering, sync::Arc};
27+
28+
use crate::{
29+
ScriptArgs, ScriptConfig, build::LinkedBuildData, progress::ScriptProgress,
30+
sequence::ScriptSequenceKind, verify::BroadcastedState,
31+
};
3032

3133
pub async fn estimate_gas<P: Provider<AnyNetwork>>(
3234
tx: &mut WithOtherFields<TransactionRequest>,
@@ -57,82 +59,124 @@ pub async fn next_nonce(
5759
Ok(provider.get_transaction_count(caller).block_id(block_id).await?)
5860
}
5961

60-
pub async fn send_transaction(
61-
provider: Arc<RetryProvider>,
62-
mut kind: SendTransactionKind<'_>,
63-
sequential_broadcast: bool,
64-
is_fixed_gas_limit: bool,
65-
estimate_via_rpc: bool,
66-
estimate_multiplier: u64,
67-
) -> Result<TxHash> {
68-
if let SendTransactionKind::Raw(tx, _) | SendTransactionKind::Unlocked(tx) = &mut kind {
69-
if sequential_broadcast {
70-
let from = tx.from.expect("no sender");
71-
72-
let tx_nonce = tx.nonce.expect("no nonce");
73-
for attempt in 0..5 {
74-
let nonce = provider.get_transaction_count(from).await?;
75-
match nonce.cmp(&tx_nonce) {
76-
Ordering::Greater => {
77-
bail!(
78-
"EOA nonce changed unexpectedly while sending transactions. Expected {tx_nonce} got {nonce} from provider."
79-
)
80-
}
81-
Ordering::Less => {
82-
if attempt == 4 {
62+
/// Represents how to send a single transaction.
63+
#[derive(Clone)]
64+
pub enum SendTransactionKind<'a> {
65+
Unlocked(WithOtherFields<TransactionRequest>),
66+
Raw(WithOtherFields<TransactionRequest>, &'a EthereumWallet),
67+
Signed(TxEnvelope),
68+
}
69+
70+
impl<'a> SendTransactionKind<'a> {
71+
/// Prepares the transaction for broadcasting by synchronizing nonce and estimating gas.
72+
///
73+
/// This method performs two key operations:
74+
/// 1. Nonce synchronization: Waits for the provider's nonce to catch up to the expected
75+
/// transaction nonce when doing sequential broadcast
76+
/// 2. Gas estimation: Re-estimates gas right before broadcasting for chains that require it
77+
pub async fn prepare(
78+
&mut self,
79+
provider: &RetryProvider,
80+
sequential_broadcast: bool,
81+
is_fixed_gas_limit: bool,
82+
estimate_via_rpc: bool,
83+
estimate_multiplier: u64,
84+
) -> Result<()> {
85+
if let Self::Raw(tx, _) | Self::Unlocked(tx) = self {
86+
if sequential_broadcast {
87+
let from = tx.from.expect("no sender");
88+
89+
let tx_nonce = tx.nonce.expect("no nonce");
90+
for attempt in 0..5 {
91+
let nonce = provider.get_transaction_count(from).await?;
92+
match nonce.cmp(&tx_nonce) {
93+
Ordering::Greater => {
8394
bail!(
84-
"After 5 attempts, provider nonce ({nonce}) is still behind expected nonce ({tx_nonce})."
95+
"EOA nonce changed unexpectedly while sending transactions. Expected {tx_nonce} got {nonce} from provider."
8596
)
8697
}
87-
warn!(
88-
"Expected nonce ({tx_nonce}) is ahead of provider nonce ({nonce}). Retrying in 1 second..."
89-
);
90-
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
91-
}
92-
Ordering::Equal => {
93-
// Nonces are equal, we can proceed
94-
break;
98+
Ordering::Less => {
99+
if attempt == 4 {
100+
bail!(
101+
"After 5 attempts, provider nonce ({nonce}) is still behind expected nonce ({tx_nonce})."
102+
)
103+
}
104+
warn!(
105+
"Expected nonce ({tx_nonce}) is ahead of provider nonce ({nonce}). Retrying in 1 second..."
106+
);
107+
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
108+
}
109+
Ordering::Equal => {
110+
// Nonces are equal, we can proceed.
111+
break;
112+
}
95113
}
96114
}
97115
}
98-
}
99116

100-
// Chains which use `eth_estimateGas` are being sent sequentially and require their
101-
// gas to be re-estimated right before broadcasting.
102-
if !is_fixed_gas_limit && estimate_via_rpc {
103-
estimate_gas(tx, &provider, estimate_multiplier).await?;
117+
// Chains which use `eth_estimateGas` are being sent sequentially and require their
118+
// gas to be re-estimated right before broadcasting.
119+
if !is_fixed_gas_limit && estimate_via_rpc {
120+
estimate_gas(tx, provider, estimate_multiplier).await?;
121+
}
104122
}
105-
}
106123

107-
let pending = match kind {
108-
SendTransactionKind::Unlocked(tx) => {
109-
debug!("sending transaction from unlocked account {:?}", tx);
124+
Ok(())
125+
}
110126

111-
// Submit the transaction
112-
provider.send_transaction(tx).await?
113-
}
114-
SendTransactionKind::Raw(tx, signer) => {
115-
debug!("sending transaction: {:?}", tx);
116-
let signed = tx.build(signer).await?;
127+
/// Sends the transaction to the network.
128+
///
129+
/// Depending on the transaction kind, this will either:
130+
/// - Submit via `eth_sendTransaction` for unlocked accounts
131+
/// - Sign and submit via `eth_sendRawTransaction` for raw transactions
132+
/// - Submit pre-signed transaction via `eth_sendRawTransaction`
133+
pub async fn send(self, provider: Arc<RetryProvider>) -> Result<TxHash> {
134+
let pending = match self {
135+
Self::Unlocked(tx) => {
136+
debug!("sending transaction from unlocked account {:?}", tx);
137+
138+
// Submit the transaction
139+
provider.send_transaction(tx).await?
140+
}
141+
Self::Raw(tx, signer) => {
142+
debug!("sending transaction: {:?}", tx);
143+
let signed = tx.build(signer).await?;
117144

118-
// Submit the raw transaction
119-
provider.send_raw_transaction(signed.encoded_2718().as_ref()).await?
120-
}
121-
SendTransactionKind::Signed(tx) => {
122-
debug!("sending transaction: {:?}", tx);
123-
provider.send_raw_transaction(tx.encoded_2718().as_ref()).await?
124-
}
125-
};
145+
// Submit the raw transaction
146+
provider.send_raw_transaction(signed.encoded_2718().as_ref()).await?
147+
}
148+
Self::Signed(tx) => {
149+
debug!("sending transaction: {:?}", tx);
150+
provider.send_raw_transaction(tx.encoded_2718().as_ref()).await?
151+
}
152+
};
126153

127-
Ok(*pending.tx_hash())
128-
}
154+
Ok(*pending.tx_hash())
155+
}
129156

130-
/// How to send a single transaction
131-
#[derive(Clone)]
132-
pub enum SendTransactionKind<'a> {
133-
Unlocked(WithOtherFields<TransactionRequest>),
134-
Raw(WithOtherFields<TransactionRequest>, &'a EthereumWallet),
135-
Signed(TxEnvelope),
157+
/// Prepares and sends the transaction in one operation.
158+
///
159+
/// This is a convenience method that combines [`prepare`](Self::prepare) and
160+
/// [`send`](Self::send) into a single call.
161+
pub async fn prepare_and_send(
162+
mut self,
163+
provider: Arc<RetryProvider>,
164+
sequential_broadcast: bool,
165+
is_fixed_gas_limit: bool,
166+
estimate_via_rpc: bool,
167+
estimate_multiplier: u64,
168+
) -> Result<TxHash> {
169+
self.prepare(
170+
&provider,
171+
sequential_broadcast,
172+
is_fixed_gas_limit,
173+
estimate_via_rpc,
174+
estimate_multiplier,
175+
)
176+
.await?;
177+
178+
self.send(provider).await
179+
}
136180
}
137181

138182
/// Represents how to send _all_ transactions
@@ -365,30 +409,65 @@ impl BundledState {
365409
let mut index = already_broadcasted;
366410

367411
for (batch_number, batch) in transactions.chunks(batch_size).enumerate() {
368-
let mut pending_transactions = vec![];
369-
370412
seq_progress.inner.write().set_status(&format!(
371413
"Sending transactions [{} - {}]",
372414
batch_number * batch_size,
373415
batch_number * batch_size + std::cmp::min(batch_size, batch.len()) - 1
374416
));
375-
for (kind, is_fixed_gas_limit) in batch {
376-
let fut = send_transaction(
377-
provider.clone(),
378-
kind.clone(),
379-
sequential_broadcast,
380-
*is_fixed_gas_limit,
381-
estimate_via_rpc,
382-
self.args.gas_estimate_multiplier,
383-
);
384-
pending_transactions.push(fut);
385-
}
386417

387-
if !pending_transactions.is_empty() {
388-
let mut buffer = futures::stream::iter(pending_transactions).buffered(7);
418+
if !batch.is_empty() {
419+
let pending_transactions =
420+
batch.iter().map(|(kind, is_fixed_gas_limit)| {
421+
let provider = provider.clone();
422+
async move {
423+
let res = kind
424+
.clone()
425+
.prepare_and_send(
426+
provider,
427+
sequential_broadcast,
428+
*is_fixed_gas_limit,
429+
estimate_via_rpc,
430+
self.args.gas_estimate_multiplier,
431+
)
432+
.await;
433+
(res, kind, 0, None)
434+
}
435+
.boxed()
436+
});
437+
438+
let mut buffer = pending_transactions.collect::<FuturesUnordered<_>>();
439+
440+
'send: while let Some((res, kind, attempt, original_res)) =
441+
buffer.next().await
442+
{
443+
if res.is_err() && attempt <= 3 {
444+
// Try to resubmit the transaction
445+
let provider = provider.clone();
446+
let progress = seq_progress.inner.clone();
447+
buffer.push(Box::pin(async move {
448+
debug!(err=?res, ?attempt, "retrying transaction ");
449+
let attempt = attempt + 1;
450+
progress.write().set_status(&format!(
451+
"retrying transaction {res:?} (attempt {attempt})"
452+
));
453+
tokio::time::sleep(Duration::from_millis(1000 * attempt)).await;
454+
let r = kind.clone().send(provider).await;
455+
(r, kind, attempt, original_res.or(Some(res)))
456+
}));
457+
458+
continue 'send;
459+
}
389460

390-
while let Some(tx_hash) = buffer.next().await {
391-
let tx_hash = tx_hash.wrap_err("Failed to send transaction")?;
461+
// Preserve the original error if any
462+
let tx_hash = res.wrap_err_with(|| {
463+
if let Some(original_res) = original_res {
464+
format!(
465+
"Failed to send transaction after {attempt} attempts {original_res:?}"
466+
)
467+
} else {
468+
"Failed to send transaction".to_string()
469+
}
470+
})?;
392471
sequence.add_pending(index, tx_hash);
393472

394473
// Checkpoint save

0 commit comments

Comments
 (0)