Skip to content

Commit 815902c

Browse files
TDemecoffarall
andauthored
fix: 🚑 nonce management in the blockchain service (#549)
* feat: ✨ initial transaction pool and nonce manager logic * feat: ♻️ refactor watches to sync with new transaction pool * fix: 🐛 issue with status subscriptions * fix: 🩹 also process tx status updates on finality * feat: 🔊 improve log clarity by specifying which tx is being dropped * test: ✅ add Retracted and Usurped test cases for new logic * fix: 🎨 run cargo fmt * feat: ✨ add event extraction from successful transactions * style: 🎨 run cargo fmt * test: ✅ add Invalid transaction and nonce gap fulfilling test * feat: 🔊 Add error logs in case of failing to send status update of transactions * feat: ✅ add a way to select log level for actors in integration tests * feat: 🔊 Add logging of which call is related to which hash * fix: 🩹 Remove leftover `only: true` flag * perf: ⚡ Decrease retry timout in test * fix: 🩹 amend review (pool -> manager) * feat: ✨ add tip tracking in transaction manager * docs: 💡 add permalink to Polkadot SDK's `TransactionStatus` * test: ✅ rename and fix transaction manager integration test * fix: ✅ single volunteer test * test: ✅ improve BSP volunteer for multiple files test * feat: ✨ Allow arbitrary strings for log level in tests * feat: 🔊 Improve logs of transactions adding nonce * fix: 🐛 add initial check when watching for tx success * fix: 🐛 check for the `ExtrinsicSuccess` event to ensure the new ext didn't fail * test: ✅ minor test fixes * fix: 🩹 Fix typos and names of tests * fix: 🐛 Update transaction status in `run()` cycle of blockchain service * fix: 🩹 Remove processing of transaction status in block import and block finalised * fix: 🐛 skip to correct block when testing MSP charge * fix: 🩹 Remove `only: true` flag from test * fix: 🩹 Move `tx_status_receiver` out of `actor` struct * fix: 🐛 Avoid cross-talk between updates from replaced transactions * test: ✅ fix invalid transaction test by dropping retry tx --------- Co-authored-by: Facundo Farall <[email protected]>
1 parent 99eea85 commit 815902c

34 files changed

+2544
-801
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/blockchain-service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ sc-client-api = { workspace = true }
3636
sc-service = { workspace = true }
3737
sc-network = { workspace = true }
3838
sc-network-types = { workspace = true }
39+
sc-transaction-pool-api = { workspace = true }
3940
sc-utils = { workspace = true }
4041
sc-tracing = { workspace = true }
4142
sp-api = { workspace = true }

client/blockchain-service/src/capacity_manager.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use shc_common::types::{BlockNumber, ProviderId, StorageDataUnit};
1616
use shc_forest_manager::traits::ForestStorageHandler;
1717

1818
use crate::{
19-
transaction::SubmittedTransaction, types::ManagedProvider, types::SendExtrinsicOptions,
19+
types::{ManagedProvider, SendExtrinsicOptions, SubmittedExtrinsicInfo},
2020
BlockchainService,
2121
};
2222

@@ -38,7 +38,7 @@ pub struct CapacityRequestQueue<Runtime: StorageEnableRuntime> {
3838
/// This is reset when the `pending_requests` is moved to `requests_waiting_for_inclusion` when they have been batched in a single transaction.
3939
total_required: StorageDataUnit<Runtime>,
4040
/// The last submitted transaction which `requests_waiting_for_inclusion` is waiting for.
41-
last_submitted_transaction: Option<SubmittedTransaction<Runtime>>,
41+
last_submitted_transaction: Option<SubmittedExtrinsicInfo<Runtime>>,
4242
}
4343

4444
impl<Runtime: StorageEnableRuntime> CapacityRequestQueue<Runtime> {
@@ -53,7 +53,7 @@ impl<Runtime: StorageEnableRuntime> CapacityRequestQueue<Runtime> {
5353
}
5454

5555
/// Get the last submitted transaction.
56-
pub fn last_submitted_transaction(&self) -> Option<&SubmittedTransaction<Runtime>> {
56+
pub fn last_submitted_transaction(&self) -> Option<&SubmittedExtrinsicInfo<Runtime>> {
5757
self.last_submitted_transaction.as_ref()
5858
}
5959

@@ -131,10 +131,10 @@ impl<Runtime: StorageEnableRuntime> CapacityRequestQueue<Runtime> {
131131
!self.requests_waiting_for_inclusion.is_empty()
132132
}
133133

134-
/// Add all pending requests to the list of requests waiting for inclusion of the [`SubmittedTransaction`].
134+
/// Add all pending requests to the list of requests waiting for inclusion of the [`SubmittedExtrinsicInfo`].
135135
pub fn add_pending_requests_to_waiting_for_inclusion(
136136
&mut self,
137-
submitted_transaction: SubmittedTransaction<Runtime>,
137+
submitted_transaction: SubmittedExtrinsicInfo<Runtime>,
138138
) {
139139
self.requests_waiting_for_inclusion
140140
.extend(self.pending_requests.drain(..));
@@ -335,20 +335,20 @@ where
335335

336336
// Send extrinsic to increase capacity
337337
match self
338-
.send_extrinsic(call, &SendExtrinsicOptions::new(extrinsic_retry_timeout))
338+
.send_extrinsic(
339+
call,
340+
&SendExtrinsicOptions::new(
341+
extrinsic_retry_timeout,
342+
Some("storageProviders".to_string()),
343+
Some("changeCapacity".to_string()),
344+
),
345+
)
339346
.await
340347
{
341348
Ok(output) => {
342349
// Add all pending requests to the list of requests waiting for inclusion.
343350
if let Some(capacity_manager) = self.capacity_manager.as_mut() {
344-
capacity_manager.add_pending_requests_to_waiting_for_inclusion(
345-
SubmittedTransaction::new(
346-
output.receiver,
347-
output.hash,
348-
output.nonce,
349-
extrinsic_retry_timeout,
350-
),
351-
);
351+
capacity_manager.add_pending_requests_to_waiting_for_inclusion(output);
352352
} else {
353353
error!(target: LOG_TARGET, "Capacity manager not initialized");
354354
}

client/blockchain-service/src/commands.rs

Lines changed: 44 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use anyhow::Result;
22
use async_trait::async_trait;
33
use log::{debug, warn};
44
use sc_network::Multiaddr;
5-
use serde_json::Number;
65
use shc_common::{
76
traits::{KeyTypeOperations, StorageEnableRuntime},
87
types::StorageEnableEvents,
@@ -34,11 +33,12 @@ use shc_forest_manager::traits::ForestStorageHandler;
3433
use crate::{
3534
capacity_manager::CapacityRequestData,
3635
handler::BlockchainService,
37-
transaction::SubmittedTransaction,
36+
transaction_manager::wait_for_transaction_status,
3837
types::{
3938
ConfirmStoringRequest, Extrinsic, ExtrinsicResult, FileDeletionRequest, MinimalBlockInfo,
40-
RespondStorageRequest, RetryStrategy, SendExtrinsicOptions,
41-
StopStoringForInsolventUserRequest, SubmitProofRequest, WatchTransactionError,
39+
RespondStorageRequest, RetryStrategy, SendExtrinsicOptions, StatusToWait,
40+
StopStoringForInsolventUserRequest, SubmitProofRequest, SubmittedExtrinsicInfo,
41+
WatchTransactionError,
4242
},
4343
};
4444

@@ -51,7 +51,7 @@ const LOG_TARGET: &str = "blockchain-service-interface";
5151
default_inner_channel_type = tokio::sync::oneshot::Receiver,
5252
)]
5353
pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
54-
#[command(success_type = SubmittedTransaction<Runtime>)]
54+
#[command(success_type = SubmittedExtrinsicInfo<Runtime>)]
5555
SendExtrinsic {
5656
call: Runtime::Call,
5757
options: SendExtrinsicOptions,
@@ -61,36 +61,25 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
6161
block_hash: Runtime::Hash,
6262
extrinsic_hash: Runtime::Hash,
6363
},
64-
UnwatchExtrinsic {
65-
subscription_id: Number,
66-
},
6764
#[command(success_type = MinimalBlockInfo<Runtime>)]
6865
GetBestBlockInfo,
6966
#[command(mode = "AsyncResponse")]
70-
WaitForBlock {
71-
block_number: BlockNumber<Runtime>,
72-
},
67+
WaitForBlock { block_number: BlockNumber<Runtime> },
7368
#[command(mode = "AsyncResponse")]
7469
WaitForNumBlocks {
7570
number_of_blocks: BlockNumber<Runtime>,
7671
},
7772
#[command(mode = "AsyncResponse", error_type = ApiError)]
78-
WaitForTick {
79-
tick_number: TickNumber<Runtime>,
80-
},
73+
WaitForTick { tick_number: TickNumber<Runtime> },
8174
#[command(success_type = bool, error_type = IsStorageRequestOpenToVolunteersError)]
82-
IsStorageRequestOpenToVolunteers {
83-
file_key: Runtime::Hash,
84-
},
75+
IsStorageRequestOpenToVolunteers { file_key: Runtime::Hash },
8576
#[command(success_type = BlockNumber<Runtime>, error_type = QueryFileEarliestVolunteerTickError)]
8677
QueryFileEarliestVolunteerTick {
8778
bsp_id: ProviderId<Runtime>,
8879
file_key: Runtime::Hash,
8980
},
9081
#[command(success_type = BlockNumber<Runtime>, error_type = QueryEarliestChangeCapacityBlockError)]
91-
QueryEarliestChangeCapacityBlock {
92-
bsp_id: ProviderId<Runtime>,
93-
},
82+
QueryEarliestChangeCapacityBlock { bsp_id: ProviderId<Runtime> },
9483
#[command(success_type = <<Runtime as StorageEnableRuntime>::Signature as KeyTypeOperations>::Public)]
9584
GetNodePublicKey,
9685
#[command(success_type = Vec<ChunkId>, error_type = QueryBspConfirmChunksToProveForFileError)]
@@ -104,9 +93,7 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
10493
file_key: Runtime::Hash,
10594
},
10695
#[command(success_type = Vec<Multiaddr>, error_type = QueryProviderMultiaddressesError)]
107-
QueryProviderMultiaddresses {
108-
provider_id: ProviderId<Runtime>,
109-
},
96+
QueryProviderMultiaddresses { provider_id: ProviderId<Runtime> },
11097
QueueSubmitProofRequest {
11198
request: SubmitProofRequest<Runtime>,
11299
},
@@ -145,21 +132,13 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
145132
#[command(success_type = BlockNumber<Runtime>, error_type = ApiError)]
146133
QueryLastCheckpointChallengeTick,
147134
#[command(success_type = Vec<CustomChallenge<Runtime>>, error_type = GetCheckpointChallengesError)]
148-
QueryLastCheckpointChallenges {
149-
tick: BlockNumber<Runtime>,
150-
},
135+
QueryLastCheckpointChallenges { tick: BlockNumber<Runtime> },
151136
#[command(success_type = Runtime::Hash, error_type = GetBspInfoError)]
152-
QueryProviderForestRoot {
153-
provider_id: ProviderId<Runtime>,
154-
},
137+
QueryProviderForestRoot { provider_id: ProviderId<Runtime> },
155138
#[command(success_type = StorageDataUnit<Runtime>, error_type = QueryStorageProviderCapacityError)]
156-
QueryStorageProviderCapacity {
157-
provider_id: ProviderId<Runtime>,
158-
},
139+
QueryStorageProviderCapacity { provider_id: ProviderId<Runtime> },
159140
#[command(success_type = StorageDataUnit<Runtime>, error_type = QueryAvailableStorageCapacityError)]
160-
QueryAvailableStorageCapacity {
161-
provider_id: ProviderId<Runtime>,
162-
},
141+
QueryAvailableStorageCapacity { provider_id: ProviderId<Runtime> },
163142
#[command(success_type = Option<StorageProviderId<Runtime>>)]
164143
QueryStorageProviderId {
165144
maybe_node_pub_key:
@@ -171,15 +150,11 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
171150
min_debt: Balance<Runtime>,
172151
},
173152
#[command(success_type = Option<Balance<Runtime>>)]
174-
QueryWorstCaseScenarioSlashableAmount {
175-
provider_id: ProviderId<Runtime>,
176-
},
153+
QueryWorstCaseScenarioSlashableAmount { provider_id: ProviderId<Runtime> },
177154
#[command(success_type = Balance<Runtime>)]
178155
QuerySlashAmountPerMaxFileSize,
179156
#[command(success_type = Option<MainStorageProviderId<Runtime>>, error_type = QueryMspIdOfBucketIdError)]
180-
QueryMspIdOfBucketId {
181-
bucket_id: BucketId<Runtime>,
182-
},
157+
QueryMspIdOfBucketId { bucket_id: BucketId<Runtime> },
183158
ReleaseForestRootWriteLock {
184159
forest_root_write_tx: tokio::sync::oneshot::Sender<()>,
185160
},
@@ -206,9 +181,7 @@ pub enum BlockchainServiceCommand<Runtime: StorageEnableRuntime> {
206181
bsp_id: BackupStorageProviderId<Runtime>,
207182
},
208183
#[command(success_type = Vec<BucketId<Runtime>>)]
209-
QueryBucketsForMsp {
210-
msp_id: ProviderId<Runtime>,
211-
},
184+
QueryBucketsForMsp { msp_id: ProviderId<Runtime> },
212185
}
213186

214187
/// Interface for interacting with the BlockchainService actor.
@@ -280,25 +253,34 @@ where
280253
for retry_count in 0..=retry_strategy.max_retries {
281254
debug!(target: LOG_TARGET, "Submitting transaction {:?} with tip {}", call, tip);
282255

283-
let extrinsic_options = SendExtrinsicOptions::new(options.timeout())
284-
.with_tip(tip as u128)
285-
.with_nonce(nonce);
256+
let extrinsic_options =
257+
SendExtrinsicOptions::new(options.timeout(), options.module(), options.method())
258+
.with_tip(tip as u128)
259+
.with_nonce(nonce);
286260

287-
let mut transaction = self.send_extrinsic(call.clone(), extrinsic_options).await?;
261+
let submitted_ext_info = self.send_extrinsic(call.clone(), extrinsic_options).await?;
288262

289-
let result: Result<Option<StorageHubEventsVec<Runtime>>, _> = if with_events {
290-
transaction
291-
.watch_for_success_with_events(&self)
292-
.await
293-
.map(Some)
294-
} else {
295-
transaction.watch_for_success(&self).await.map(|_| None)
296-
};
263+
// Wait for transaction to be included in a block
264+
let result = wait_for_transaction_status(
265+
submitted_ext_info.nonce,
266+
submitted_ext_info.status_subscription.clone(),
267+
StatusToWait::InBlock,
268+
options.timeout(),
269+
)
270+
.await;
297271

298272
match result {
299-
Ok(maybe_events) => {
300-
debug!(target: LOG_TARGET, "Transaction with hash {:?} succeeded", transaction.hash());
301-
return Ok(maybe_events);
273+
Ok(block_hash) => {
274+
debug!(target: LOG_TARGET, "Transaction with hash {:?} succeeded", submitted_ext_info.hash);
275+
276+
if with_events {
277+
let extrinsic = self
278+
.get_extrinsic_from_block(block_hash, submitted_ext_info.hash)
279+
.await?;
280+
return Ok(Some(extrinsic.events));
281+
} else {
282+
return Ok(None);
283+
}
302284
}
303285
Err(err) => {
304286
warn!(target: LOG_TARGET, "Transaction failed: {:?}", err);
@@ -309,18 +291,17 @@ where
309291
}
310292
}
311293

312-
warn!(target: LOG_TARGET, "Failed to submit transaction with hash {:?}, attempt #{}", transaction.hash(), retry_count + 1);
294+
warn!(target: LOG_TARGET, "Failed to submit transaction with hash {:?}, attempt #{}", submitted_ext_info.hash, retry_count + 1);
313295

314-
// TODO: Add pending transaction pool implementation to be able to resubmit transactions with nonces lower than the current one to avoid this transaction from being stuck.
315296
if let WatchTransactionError::Timeout = err {
316297
// Increase the tip to incentivise the collators to include the transaction in a block with priority
317298
tip = retry_strategy.compute_tip(retry_count + 1);
318299
// Reuse the same nonce since the transaction was not included in a block.
319-
nonce = Some(transaction.nonce());
300+
nonce = Some(submitted_ext_info.nonce);
320301

321302
// Log warning if this is not the last retry.
322303
if retry_count < retry_strategy.max_retries {
323-
warn!(target: LOG_TARGET, "Retrying with increased tip {} and nonce {}", tip, transaction.nonce());
304+
warn!(target: LOG_TARGET, "Retrying with increased tip {} and nonce {}", tip, submitted_ext_info.nonce);
324305
}
325306
}
326307
}

0 commit comments

Comments
 (0)