Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
731faea
Initial trie_committer module by duplicating `Ephemeral`
liuchengxu Sep 29, 2024
6f96333
Introduce `TrieCommitter`
liuchengxu Sep 29, 2024
bff20f3
Introduce `commit_trie_changes()` in `Backend` trait
liuchengxu Sep 29, 2024
6642ed2
Add basic test `test_commit_trie_changes()`
liuchengxu Sep 29, 2024
230be4a
Replace `reset_storage` with `commit_trie_changes()` in Client
liuchengxu Sep 29, 2024
3372d31
Fix IncompleteDatabase error
liuchengxu Oct 7, 2024
960c879
FMT
liuchengxu Oct 7, 2024
a8fd56c
Clean up
liuchengxu Oct 7, 2024
fc3a512
Fix test compilation
liuchengxu Oct 8, 2024
ecf9975
Fix warp sync
liuchengxu Oct 9, 2024
fb8baa1
Make test pass
liuchengxu Oct 9, 2024
6aad27d
Merge branch 'master' of https://github.com/liuchengxu/polkadot-sdk i…
liuchengxu Nov 20, 2024
3442b87
Use `emplace()` in `insert()`
liuchengxu Nov 21, 2024
db24e2a
Handle child storage
liuchengxu Nov 22, 2024
39e4f54
Nit
liuchengxu Nov 22, 2024
b5aad6c
Rename `TrieCommiter` to `StateImporter`
liuchengxu Nov 26, 2024
2733bd5
Split out tests.rs and refactor `test_state_importer()` slightly
liuchengxu Nov 26, 2024
bbcd715
Merge branch 'master' into trie-committer
liuchengxu Dec 20, 2024
1e73fed
Merge upstream/master into trie-committer
liuchengxu Dec 27, 2025
e8c8dce
Add direct trie node import for state sync
liuchengxu Dec 27, 2025
42f46a8
Add incremental trie node writing during state sync
liuchengxu Dec 27, 2025
18ab208
Thread trie_node_writer through BuildNetworkParams
liuchengxu Dec 27, 2025
dfa7449
Remove legacy state_importer.rs
liuchengxu Dec 27, 2025
e20e792
Remove set_commit_state from BlockImportOperation trait
liuchengxu Dec 27, 2025
c1bd5e8
Refactor trie node handling to use enum for type safety
liuchengxu Dec 27, 2025
3b8d057
Add license header to tests.rs and prdoc for PR 5956
liuchengxu Dec 27, 2025
7badeb4
Remove unnecessary TrieNodeWriter re-export
liuchengxu Dec 27, 2025
ee48668
Add trie_node_writer field to all BuildNetworkParams usages
liuchengxu Dec 27, 2025
dca5792
Fix missing trie_node_writer arg in state.rs tests
liuchengxu Dec 27, 2025
be21b8d
Add StateSource enum and thread trie_node_writer through ChainSync
liuchengxu Dec 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ where
warp_sync_config,
block_relay: None,
metrics,
trie_node_writer: None,
})
}

Expand Down Expand Up @@ -432,7 +433,7 @@ where
finalized_header.number(),
finalized_header.hash()
);
return Ok(finalized_header)
return Ok(finalized_header);
}
}

Expand All @@ -452,7 +453,7 @@ async fn parachain_informant<Block: BlockT, Client>(
Ok(import_notifications) => import_notifications,
Err(e) => {
log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
return
return;
},
};
let mut last_backed_block_time: Option<Instant> = None;
Expand All @@ -461,7 +462,7 @@ async fn parachain_informant<Block: BlockT, Client>(
Ok(candidate_events) => candidate_events,
Err(e) => {
log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
continue
continue;
},
};
let mut backed_candidates = Vec::new();
Expand All @@ -479,7 +480,7 @@ async fn parachain_informant<Block: BlockT, Client>(
log::warn!(
"Failed to decode parachain header from backed block: {e:?}"
);
continue
continue;
},
};
let backed_block_time = Instant::now();
Expand All @@ -502,7 +503,7 @@ async fn parachain_informant<Block: BlockT, Client>(
log::warn!(
"Failed to decode parachain header from included block: {e:?}"
);
continue
continue;
},
};
let unincluded_segment_size =
Expand All @@ -523,7 +524,7 @@ async fn parachain_informant<Block: BlockT, Client>(
log::warn!(
"Failed to decode parachain header from timed out block: {e:?}"
);
continue
continue;
},
};
timed_out_candidates.push(timed_out_block);
Expand Down
1 change: 1 addition & 0 deletions cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ where
warp_sync_config: None,
block_relay: None,
metrics: NotificationMetrics::new(None),
trie_node_writer: None,
})?;

if config.offchain_worker.enabled {
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/service/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ where
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
trie_node_writer: None,
})?;

if config.offchain_worker.enabled {
Expand Down
25 changes: 25 additions & 0 deletions prdoc/pr_5956.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
title: Add direct trie node import for state sync

doc:
- audience: Node Dev
description: |-
Adds incremental trie node writing during state sync to avoid accumulating all
trie nodes in memory before importing them. This significantly reduces memory
usage during warp sync by writing trie nodes directly to the database as each
chunk is received, rather than buffering them all until the sync completes.

Key changes:
- Add `TrieNodeWriter` trait for direct trie node database writes
- Implement incremental trie node writing in `StateSync`
- Thread `trie_node_writer` through `BuildNetworkParams`
- Refactor trie node handling to use `TrieNodeImportMode` enum for type safety

crates:
- name: sc-client-api
bump: major
- name: sc-client-db
bump: major
- name: sc-network-sync
bump: major
- name: sc-service
bump: major
1 change: 1 addition & 0 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
trie_node_writer: None,
})?;

if let Some(mixnet_config) = mixnet_config {
Expand Down
26 changes: 26 additions & 0 deletions substrate/client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ pub use sp_state_machine::{Backend as StateBackend, BackendTransaction, KeyValue
/// Extracts the state backend type for the given backend.
pub type StateBackendFor<B, Block> = <B as Backend<Block>>::State;

/// Trait for writing trie nodes directly to storage during state sync.
///
/// This allows trie nodes to be written incrementally as each chunk is received,
/// rather than accumulating them all in memory before writing. Implementing this
/// trait enables O(chunk_size) memory usage instead of O(total_state_size).
///
/// Bounds: `Send + Sync` so a single shared handle can be used from the
/// networking/sync tasks that receive state chunks.
pub trait TrieNodeWriter: Send + Sync {
/// Write a batch of trie nodes to storage.
///
/// Each entry is a (prefixed_key, value) pair that should be written
/// directly to the STATE column.
///
/// # Errors
///
/// Returns a human-readable message on failure. NOTE(review): the error type
/// is a bare `String`; a structured error type would be easier for callers to
/// match on if this API grows.
fn write_trie_nodes(&self, nodes: Vec<(Vec<u8>, Vec<u8>)>) -> Result<(), String>;
}

/// Describes which block import notification stream should be notified.
#[derive(Debug, Clone, Copy)]
pub enum ImportNotificationAction {
Expand Down Expand Up @@ -633,6 +646,19 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
trie_cache_context: TrieCacheContext,
) -> sp_blockchain::Result<Self::State>;

/// Import state directly from trie nodes.
///
/// This is used during state sync to write trie nodes directly to the STATE column,
/// avoiding the need to recompute them via delta_trie_root.
///
/// `trie_nodes` are raw (prefixed_key, value) pairs; `expected_state_root` is the
/// root hash the imported state should correspond to. NOTE(review): whether
/// implementations must verify the root during this call or only later via
/// `verify_state_root_exists` is not visible in this file — confirm against the
/// database backend implementation.
fn import_state_from_trie_nodes(
&self,
trie_nodes: Vec<(Vec<u8>, Vec<u8>)>,
expected_state_root: Block::Hash,
) -> sp_blockchain::Result<()>;

/// Verify that the state root node exists in the database.
///
/// Used after incremental state sync to confirm state is complete.
fn verify_state_root_exists(&self, root: Block::Hash) -> sp_blockchain::Result<()>;

/// Attempts to revert the chain by `n` blocks. If `revert_finalized` is set it will attempt to
/// revert past any finalized block, this is unsafe and can potentially leave the node in an
/// inconsistent state. All blocks higher than the best block are also reverted and not counting
Expand Down
25 changes: 19 additions & 6 deletions substrate/client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<Block: BlockT> Blockchain<Block> {
pub fn equals_to(&self, other: &Self) -> bool {
// Check ptr equality first to avoid double read locks.
if ptr::eq(self, other) {
return true
return true;
}
self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
}
Expand All @@ -201,7 +201,7 @@ impl<Block: BlockT> Blockchain<Block> {
pub fn canon_equals_to(&self, other: &Self) -> bool {
// Check ptr equality first to avoid double read locks.
if ptr::eq(self, other) {
return true
return true;
}
let this = self.storage.read();
let other = other.storage.read();
Expand Down Expand Up @@ -307,7 +307,7 @@ impl<Block: BlockT> Blockchain<Block> {
if !stored_justifications.append(justification) {
return Err(sp_blockchain::Error::BadJustification(
"Duplicate consensus engine ID".into(),
))
));
}
} else {
*block_justifications = Some(Justifications::from(justification));
Expand Down Expand Up @@ -740,7 +740,7 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> {
_trie_cache_context: TrieCacheContext,
) -> sp_blockchain::Result<Self::State> {
if hash == Default::default() {
return Ok(Self::State::default())
return Ok(Self::State::default());
}

self.states
Expand All @@ -750,6 +750,19 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> {
.ok_or_else(|| sp_blockchain::Error::UnknownBlock(format!("{}", hash)))
}

fn import_state_from_trie_nodes(
&self,
_trie_nodes: Vec<(Vec<u8>, Vec<u8>)>,
_expected_state_root: Block::Hash,
) -> sp_blockchain::Result<()> {
// The in-memory backend has no persistent STATE column to write raw trie
// nodes into; the direct-import state-sync path is never used with it.
unimplemented!("Not needed for in-mem backend")
}

fn verify_state_root_exists(&self, _root: Block::Hash) -> sp_blockchain::Result<()> {
// In-memory backend doesn't support incremental state sync, so there is
// never a partially written state whose root would need verifying.
unimplemented!("Not needed for in-mem backend")
}

fn revert(
&self,
_n: NumberFor<Block>,
Expand Down Expand Up @@ -787,15 +800,15 @@ impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> {}
/// Check that genesis storage is valid.
pub fn check_genesis_storage(storage: &Storage) -> sp_blockchain::Result<()> {
if storage.top.iter().any(|(k, _)| well_known_keys::is_child_storage_key(k)) {
return Err(sp_blockchain::Error::InvalidState)
return Err(sp_blockchain::Error::InvalidState);
}

if storage
.children_default
.keys()
.any(|child_key| !well_known_keys::is_child_storage_key(child_key))
{
return Err(sp_blockchain::Error::InvalidState)
return Err(sp_blockchain::Error::InvalidState);
}

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions substrate/client/api/src/proof_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,16 @@ pub trait ProofProvider<Block: BlockT> {
proof: CompactProof,
start_keys: &[Vec<u8>],
) -> sp_blockchain::Result<(KeyValueStates, usize)>;

/// Verify read storage proof and return both key-value pairs and trie nodes.
///
/// This is an extended version of `verify_range_proof` that also returns the
/// decoded trie nodes from the compact proof as raw (prefixed_key, value) pairs.
/// The trie nodes can be written directly to the database during state sync,
/// avoiding the need to recompute them.
///
/// Returns `(key_value_states, read_count, trie_nodes)`. The first two elements
/// mirror `verify_range_proof`; the third holds the raw nodes. NOTE(review): the
/// exact meaning of the `usize` is inherited from `verify_range_proof` — confirm
/// against its implementation.
fn verify_range_proof_with_trie_nodes(
&self,
root: Block::Hash,
proof: CompactProof,
start_keys: &[Vec<u8>],
) -> sp_blockchain::Result<(KeyValueStates, usize, Vec<(Vec<u8>, Vec<u8>)>)>;
}
53 changes: 48 additions & 5 deletions substrate/client/consensus/common/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,61 @@ pub enum StorageChanges<Block: BlockT> {
Import(ImportedState<Block>),
}

/// Imported state data. A vector of key-value pairs that should form a trie.
#[derive(PartialEq, Eq, Clone)]
/// State of trie nodes for import.
#[derive(Clone, Debug)]
pub enum TrieNodeStates {
/// Trie nodes pending import - need to be written to database.
///
/// NOTE(review): presumably the same (prefixed_key, value) encoding as
/// `TrieNodeWriter::write_trie_nodes` — confirm at the call sites.
Pending(Vec<(Vec<u8>, Vec<u8>)>),
/// Trie nodes were already imported to database during state sync.
///
/// Only a count is carried because the nodes are already persisted and do
/// not need to be kept in memory.
AlreadyImported {
/// Number of trie nodes written during state sync.
nodes_count: u64,
},
}

/// Source of state data for import.
///
/// `Debug` is implemented manually below, summarizing the key-value variant by
/// count rather than dumping its contents.
#[derive(Clone)]
pub enum StateSource {
/// Import state from key-value pairs (legacy path).
/// Trie nodes will be computed via delta_trie_root.
KeyValues(sp_state_machine::KeyValueStates),
/// Import state from trie nodes directly, avoiding recomputation from
/// key-value pairs. See `TrieNodeStates` for whether the nodes are still
/// pending or already persisted.
TrieNodes(TrieNodeStates),
}

impl std::fmt::Debug for StateSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::KeyValues(state) =>
f.debug_struct("KeyValues").field("count", &state.0.len()).finish(),
Self::TrieNodes(state) => f.debug_tuple("TrieNodes").field(state).finish(),
}
}
}

/// Imported state data.
///
/// Equality (`PartialEq`/`Eq`, implemented below) compares only `block`;
/// the `source` payload is ignored.
#[derive(Clone)]
pub struct ImportedState<B: BlockT> {
/// Target block hash.
pub block: B::Hash,
/// Source of state data.
pub source: StateSource,
}

impl<B: BlockT> PartialEq for ImportedState<B> {
// Equality deliberately compares only the target block hash; `source` is
// ignored. NOTE(review): confirm no caller relies on comparing the payloads.
fn eq(&self, other: &Self) -> bool {
self.block == other.block
}
}

impl<B: BlockT> Eq for ImportedState<B> {}

impl<B: BlockT> std::fmt::Debug for ImportedState<B> {
    // Fix: the rendered source contained both the pre-change single-line
    // `debug_struct("ImportedState").field("block", ...).finish()` statement and
    // its multi-line replacement back to back, which is not valid Rust. Keep
    // only the updated form, which also reports the `source` field.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        fmt.debug_struct("ImportedState")
            .field("block", &self.block)
            .field("source", &self.source)
            .finish()
    }
}

Expand Down
4 changes: 2 additions & 2 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
} else {
debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
}
return Err(BlockImportError::IncompleteHeader(peer))
return Err(BlockImportError::IncompleteHeader(peer));
};

trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len());
Expand Down Expand Up @@ -348,7 +348,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
BlockImportStatus::ImportedUnknown { .. } => (),
r => {
// Any other successful result means that the block is already imported.
return Ok(SingleBlockVerificationOutcome::Imported(r))
return Ok(SingleBlockVerificationOutcome::Imported(r));
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl<B: BlockT> BasicQueueHandle<B> {
impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return
return;
}

trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
Expand Down Expand Up @@ -195,7 +195,7 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
loop {
if let Err(_) = self.result_port.next_action(link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
return;
}
}
}
Expand Down Expand Up @@ -237,7 +237,7 @@ async fn block_import_process<B: BlockT>(
target: LOG_TARGET,
"Stopping block import because the import channel was closed!",
);
return
return;
},
};

Expand Down Expand Up @@ -304,7 +304,7 @@ impl<B: BlockT> BlockImportWorker<B> {
target: LOG_TARGET,
"Stopping block import because result channel was closed!",
);
return
return;
}

// Make sure to first process all justifications
Expand All @@ -317,13 +317,13 @@ impl<B: BlockT> BlockImportWorker<B> {
target: LOG_TARGET,
"Stopping block import because justification channel was closed!",
);
return
return;
},
}
}

if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
return
return;
}

// All futures that we polled are now pending.
Expand Down Expand Up @@ -423,7 +423,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
Some(b) => b,
None => {
// No block left to import, success!
return ImportManyBlocksResult { block_count: count, imported, results }
return ImportManyBlocksResult { block_count: count, imported, results };
},
};

Expand Down
Loading
Loading