Skip to content

Commit b25fc51

Browse files
liuchengxuclaude
andcommitted
Thread trie_node_writer through ChainSync to StateSync
Add trie_node_writer parameter to ChainSync constructor and store as field. Pass to StateSync when initiating state sync, enabling incremental trie node writing during state sync to avoid 30GiB+ memory usage. - Add trie_node_writer field to ChainSync struct - Add parameter to ChainSync::new() constructor - Pass trie_node_writer to StateSync in chain sync state initiation - Update PolkadotSyncingStrategy to thread parameter through - Update all test cases with None parameter 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent da9797e commit b25fc51

File tree

3 files changed

+99
-72
lines changed

3 files changed

+99
-72
lines changed

substrate/client/network/sync/src/strategy/chain_sync.rs

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,8 @@ pub struct ChainSync<B: BlockT, Client> {
332332
downloaded_blocks: usize,
333333
/// State sync in progress, if any.
334334
state_sync: Option<StateSync<B, Client>>,
335+
/// Optional writer for incremental trie node import during state sync.
336+
trie_node_writer: Option<Arc<dyn sc_client_api::backend::TrieNodeWriter>>,
335337
/// Enable importing existing blocks. This is used after the state download to
336338
/// catch up to the latest state while re-importing blocks.
337339
import_existing: bool,
@@ -418,12 +420,12 @@ where
418420
peer
419421
} else {
420422
error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
421-
return Some((hash, number))
423+
return Some((hash, number));
422424
};
423425

424426
if let PeerSyncState::AncestorSearch { .. } = peer.state {
425427
trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
426-
return None
428+
return None;
427429
}
428430

429431
let peer_info = is_best.then(|| {
@@ -439,8 +441,8 @@ where
439441
if is_best {
440442
if known && self.best_queued_number >= number {
441443
self.update_peer_common_number(&peer_id, number);
442-
} else if announce.header.parent_hash() == &self.best_queued_hash ||
443-
known_parent && self.best_queued_number >= number
444+
} else if announce.header.parent_hash() == &self.best_queued_hash
445+
|| known_parent && self.best_queued_number >= number
444446
{
445447
self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
446448
}
@@ -453,7 +455,7 @@ where
453455
if let Some(target) = self.fork_targets.get_mut(&hash) {
454456
target.peers.insert(peer_id);
455457
}
456-
return peer_info
458+
return peer_info;
457459
}
458460

459461
if ancient_parent {
@@ -464,7 +466,7 @@ where
464466
hash,
465467
announce.header,
466468
);
467-
return peer_info
469+
return peer_info;
468470
}
469471

470472
if self.status().state == SyncState::Idle {
@@ -525,14 +527,14 @@ where
525527

526528
if self.is_known(hash) {
527529
debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
528-
return
530+
return;
529531
}
530532

531533
trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
532534
for peer_id in &peers {
533535
if let Some(peer) = self.peers.get_mut(peer_id) {
534536
if let PeerSyncState::AncestorSearch { .. } = peer.state {
535-
continue
537+
continue;
536538
}
537539

538540
if number > peer.best_number {
@@ -668,7 +670,7 @@ where
668670
}
669671
for (result, hash) in results {
670672
if has_error {
671-
break
673+
break;
672674
}
673675

674676
has_error |= result.is_err();
@@ -725,7 +727,7 @@ where
725727

726728
self.complete_gap_if_target(number);
727729
},
728-
Err(BlockImportError::IncompleteHeader(peer_id)) =>
730+
Err(BlockImportError::IncompleteHeader(peer_id)) => {
729731
if let Some(peer) = peer_id {
730732
warn!(
731733
target: LOG_TARGET,
@@ -734,7 +736,8 @@ where
734736
self.actions
735737
.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
736738
self.restart();
737-
},
739+
}
740+
},
738741
Err(BlockImportError::VerificationFailed(peer_id, e)) => {
739742
let extra_message = peer_id
740743
.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
@@ -751,14 +754,15 @@ where
751754

752755
self.restart();
753756
},
754-
Err(BlockImportError::BadBlock(peer_id)) =>
757+
Err(BlockImportError::BadBlock(peer_id)) => {
755758
if let Some(peer) = peer_id {
756759
warn!(
757760
target: LOG_TARGET,
758761
"💔 Block {hash:?} received from peer {peer} has been blacklisted",
759762
);
760763
self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
761-
},
764+
}
765+
},
762766
Err(BlockImportError::MissingState) => {
763767
// This may happen if the chain we were requesting upon has been discarded
764768
// in the meantime because other chain has been finalized.
@@ -944,6 +948,7 @@ where
944948
block_downloader: Arc<dyn BlockDownloader<B>>,
945949
metrics_registry: Option<&Registry>,
946950
initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
951+
trie_node_writer: Option<Arc<dyn sc_client_api::backend::TrieNodeWriter>>,
947952
) -> Result<Self, ClientError> {
948953
let mut sync = Self {
949954
client,
@@ -963,6 +968,7 @@ where
963968
state_request_protocol_name,
964969
downloaded_blocks: 0,
965970
state_sync: None,
971+
trie_node_writer,
966972
import_existing: false,
967973
block_downloader,
968974
gap_sync: None,
@@ -1094,9 +1100,9 @@ where
10941100

10951101
Ok(req)
10961102
},
1097-
Ok(BlockStatus::Queued) |
1098-
Ok(BlockStatus::InChainWithState) |
1099-
Ok(BlockStatus::InChainPruned) => {
1103+
Ok(BlockStatus::Queued)
1104+
| Ok(BlockStatus::InChainWithState)
1105+
| Ok(BlockStatus::InChainPruned) => {
11001106
debug!(
11011107
target: LOG_TARGET,
11021108
"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
@@ -1276,8 +1282,8 @@ where
12761282
},
12771283
};
12781284
if matching_hash.is_some() {
1279-
if *start < self.best_queued_number &&
1280-
self.best_queued_number <= peer.best_number
1285+
if *start < self.best_queued_number
1286+
&& self.best_queued_number <= peer.best_number
12811287
{
12821288
// We've made progress on this chain since the search was started.
12831289
// Opportunistically set common number to updated number
@@ -1333,8 +1339,8 @@ where
13331339
matching_hash,
13341340
peer.common_number,
13351341
);
1336-
if peer.common_number < peer.best_number &&
1337-
peer.best_number < self.best_queued_number
1342+
if peer.common_number < peer.best_number
1343+
&& peer.best_number < self.best_queued_number
13381344
{
13391345
trace!(
13401346
target: LOG_TARGET,
@@ -1362,9 +1368,9 @@ where
13621368
return Ok(());
13631369
}
13641370
},
1365-
PeerSyncState::Available |
1366-
PeerSyncState::DownloadingJustification(..) |
1367-
PeerSyncState::DownloadingState => Vec::new(),
1371+
PeerSyncState::Available
1372+
| PeerSyncState::DownloadingJustification(..)
1373+
| PeerSyncState::DownloadingState => Vec::new(),
13681374
}
13691375
} else {
13701376
// When request.is_none() this is a block announcement. Just accept blocks.
@@ -1523,14 +1529,17 @@ where
15231529

15241530
fn required_block_attributes(&self) -> BlockAttributes {
15251531
match self.mode {
1526-
ChainSyncMode::Full =>
1527-
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1528-
ChainSyncMode::LightState { storage_chain_mode: false, .. } =>
1529-
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY,
1530-
ChainSyncMode::LightState { storage_chain_mode: true, .. } =>
1531-
BlockAttributes::HEADER |
1532-
BlockAttributes::JUSTIFICATION |
1533-
BlockAttributes::INDEXED_BODY,
1532+
ChainSyncMode::Full => {
1533+
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1534+
},
1535+
ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
1536+
BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1537+
},
1538+
ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
1539+
BlockAttributes::HEADER
1540+
| BlockAttributes::JUSTIFICATION
1541+
| BlockAttributes::INDEXED_BODY
1542+
},
15341543
}
15351544
}
15361545

@@ -1651,11 +1660,11 @@ where
16511660
PeerSyncState::Available => {
16521661
self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
16531662
},
1654-
PeerSyncState::AncestorSearch { .. } |
1655-
PeerSyncState::DownloadingNew(_) |
1656-
PeerSyncState::DownloadingStale(_) |
1657-
PeerSyncState::DownloadingGap(_) |
1658-
PeerSyncState::DownloadingState => {
1663+
PeerSyncState::AncestorSearch { .. }
1664+
| PeerSyncState::DownloadingNew(_)
1665+
| PeerSyncState::DownloadingStale(_)
1666+
| PeerSyncState::DownloadingGap(_)
1667+
| PeerSyncState::DownloadingState => {
16591668
// Cancel a request first, as `add_peer` may generate a new request.
16601669
self.actions
16611670
.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
@@ -1697,8 +1706,8 @@ where
16971706
self.best_queued_hash = info.best_hash;
16981707
self.best_queued_number = info.best_number;
16991708

1700-
if self.mode == ChainSyncMode::Full &&
1701-
self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1709+
if self.mode == ChainSyncMode::Full
1710+
&& self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
17021711
{
17031712
self.import_existing = true;
17041713
// Latest state is missing, start with the last finalized state or genesis instead.
@@ -1833,9 +1842,9 @@ where
18331842
.peers
18341843
.iter_mut()
18351844
.filter_map(move |(&id, peer)| {
1836-
if !peer.state.is_available() ||
1837-
!allowed_requests.contains(&id) ||
1838-
!disconnected_peers.is_peer_available(&id)
1845+
if !peer.state.is_available()
1846+
|| !allowed_requests.contains(&id)
1847+
|| !disconnected_peers.is_peer_available(&id)
18391848
{
18401849
return None;
18411850
}
@@ -1845,11 +1854,11 @@ where
18451854
// common number is smaller than the last finalized block number, we should do an
18461855
// ancestor search to find a better common block. If the queue is full we wait till
18471856
// all blocks are imported though.
1848-
if best_queued.saturating_sub(peer.common_number) >
1849-
MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
1850-
best_queued < peer.best_number &&
1851-
peer.common_number < last_finalized &&
1852-
queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
1857+
if best_queued.saturating_sub(peer.common_number)
1858+
> MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
1859+
&& best_queued < peer.best_number
1860+
&& peer.common_number < last_finalized
1861+
&& queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
18531862
{
18541863
trace!(
18551864
target: LOG_TARGET,
@@ -1945,8 +1954,8 @@ where
19451954
if self.allowed_requests.is_empty() {
19461955
return None;
19471956
}
1948-
if self.state_sync.is_some() &&
1949-
self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1957+
if self.state_sync.is_some()
1958+
&& self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
19501959
{
19511960
// Only one pending state request is allowed.
19521961
return None;
@@ -1957,9 +1966,9 @@ where
19571966
}
19581967

19591968
for (id, peer) in self.peers.iter_mut() {
1960-
if peer.state.is_available() &&
1961-
peer.common_number >= sync.target_number() &&
1962-
self.disconnected_peers.is_peer_available(&id)
1969+
if peer.state.is_available()
1970+
&& peer.common_number >= sync.target_number()
1971+
&& self.disconnected_peers.is_peer_available(&id)
19631972
{
19641973
peer.state = PeerSyncState::DownloadingState;
19651974
let request = sync.next_request();
@@ -2054,7 +2063,7 @@ where
20542063
None,
20552064
None,
20562065
skip_proofs,
2057-
None,
2066+
self.trie_node_writer.clone(),
20582067
));
20592068
self.allowed_requests.set_all();
20602069
} else {
@@ -2290,8 +2299,8 @@ fn fork_sync_request<B: BlockT>(
22902299
}
22912300
// Download the fork only if it is behind or not too far ahead our tip of the chain
22922301
// Otherwise it should be downloaded in full sync mode.
2293-
if r.number <= best_num ||
2294-
(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2302+
if r.number <= best_num
2303+
|| (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
22952304
{
22962305
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
22972306
let count = if parent_status == BlockStatus::Unknown {
@@ -2382,8 +2391,8 @@ pub fn validate_blocks<Block: BlockT>(
23822391
return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
23832392
}
23842393

2385-
if request.fields.contains(BlockAttributes::HEADER) &&
2386-
blocks.iter().any(|b| b.header.is_none())
2394+
if request.fields.contains(BlockAttributes::HEADER)
2395+
&& blocks.iter().any(|b| b.header.is_none())
23872396
{
23882397
trace!(
23892398
target: LOG_TARGET,

0 commit comments

Comments
 (0)