Skip to content

Commit 6c81dfb

Browse files
committed
Add StateSource enum and thread trie_node_writer through ChainSync
Introduce the `StateSource` enum to signal the state import mode explicitly:
- `StateSource::TrieNodes(TrieNodeStates::AlreadyImported { .. })`: trie nodes were written incrementally during sync, so the import step is skipped.
- `StateSource::TrieNodes(TrieNodeStates::Pending(..))`: trie nodes were accumulated in memory and still have to be written to the DB.
- `StateSource::KeyValues(..)`: legacy key-value import path.

This eliminates 30 GiB+ of memory usage during state sync by:
1. Using the `StateSyncMode` enum with `Incremental` and `Accumulated` variants.
2. Skipping key-value accumulation in `Incremental` mode.
3. Signaling to the client that trie nodes are already imported.

Thread `trie_node_writer` through `ChainSync`:
- Add a `trie_node_writer` field to the `ChainSync` struct.
- Add the parameter to the `ChainSync::new()` constructor.
- Pass `trie_node_writer` to `StateSync` when chain sync initiates state sync.
- Update `PolkadotSyncingStrategy` to thread the parameter through.
- Update all test cases to pass `None` for the new parameter.
1 parent dca5792 commit 6c81dfb

File tree

14 files changed

+247
-161
lines changed

14 files changed

+247
-161
lines changed

substrate/client/api/src/backend.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,10 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
655655
expected_state_root: Block::Hash,
656656
) -> sp_blockchain::Result<()>;
657657

658+
/// Verify that the state root node exists in the database.
659+
/// Used after incremental state sync to confirm state is complete.
660+
fn verify_state_root_exists(&self, root: Block::Hash) -> sp_blockchain::Result<()>;
661+
658662
/// Attempts to revert the chain by `n` blocks. If `revert_finalized` is set it will attempt to
659663
/// revert past any finalized block, this is unsafe and can potentially leave the node in an
660664
/// inconsistent state. All blocks higher than the best block are also reverted and not counting

substrate/client/api/src/in_mem.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,11 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> {
758758
unimplemented!("Not needed for in-mem backend")
759759
}
760760

761+
/// Not supported by the in-memory backend.
///
/// # Panics
///
/// Always panics via `unimplemented!`: the in-memory backend does not
/// support incremental state sync, so there is nothing to verify.
fn verify_state_root_exists(&self, _root: Block::Hash) -> sp_blockchain::Result<()> {
    unimplemented!("Not needed for in-mem backend")
}
765+
761766
fn revert(
762767
&self,
763768
_n: NumberFor<Block>,

substrate/client/consensus/common/src/block_import.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,24 +126,50 @@ pub enum StorageChanges<Block: BlockT> {
126126
Import(ImportedState<Block>),
127127
}
128128

129-
/// Imported state data. A vector of key-value pairs that should form a trie.
129+
/// State of trie nodes for import.
///
/// Distinguishes trie nodes that still have to be written to the database
/// from trie nodes that were already persisted while state sync was running.
#[derive(Clone)]
pub enum TrieNodeStates {
    /// Trie nodes pending import - need to be written to database.
    ///
    /// Each entry is a raw pair of byte vectors (presumably
    /// `(prefixed_key, value)` - confirm against the producer).
    Pending(Vec<(Vec<u8>, Vec<u8>)>),
    /// Trie nodes were already imported to database during state sync.
    AlreadyImported {
        /// Number of trie nodes written during state sync.
        nodes_count: u64,
    },
}

impl std::fmt::Debug for TrieNodeStates {
    /// Formats a summary of the state instead of its contents.
    ///
    /// `Pending` can hold a very large number of nodes, so only the node
    /// count is printed rather than every key/value byte. `AlreadyImported`
    /// renders exactly as a derived `Debug` would.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Pending(nodes) =>
                f.debug_struct("Pending").field("nodes_count", &nodes.len()).finish(),
            Self::AlreadyImported { nodes_count } =>
                f.debug_struct("AlreadyImported").field("nodes_count", nodes_count).finish(),
        }
    }
}
140+
141+
/// Source of state data for import.
142+
#[derive(Clone)]
143+
pub enum StateSource {
144+
/// Import state from key-value pairs (legacy path).
145+
/// Trie nodes will be computed via delta_trie_root.
146+
KeyValues(sp_state_machine::KeyValueStates),
147+
/// Import state from trie nodes directly.
148+
TrieNodes(TrieNodeStates),
149+
}
150+
151+
impl std::fmt::Debug for StateSource {
152+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153+
match self {
154+
Self::KeyValues(state) =>
155+
f.debug_struct("KeyValues").field("count", &state.0.len()).finish(),
156+
Self::TrieNodes(state) => f.debug_tuple("TrieNodes").field(state).finish(),
157+
}
158+
}
159+
}
160+
161+
/// Imported state data.
130162
#[derive(Clone)]
131163
pub struct ImportedState<B: BlockT> {
132164
/// Target block hash.
133165
pub block: B::Hash,
134-
/// State keys and values.
135-
pub state: sp_state_machine::KeyValueStates,
136-
/// Optional trie nodes from compact proofs.
137-
/// When present, these are written directly to the STATE column,
138-
/// avoiding the need to recompute trie nodes via delta_trie_root.
139-
/// Contains raw (prefixed_key, value) pairs extracted from MemoryDBs.
140-
pub trie_nodes: Option<Vec<(Vec<u8>, Vec<u8>)>>,
166+
/// Source of state data.
167+
pub source: StateSource,
141168
}
142169

143170
impl<B: BlockT> PartialEq for ImportedState<B> {
144171
fn eq(&self, other: &Self) -> bool {
145-
// Compare block and state only; trie_nodes are derived from state
146-
self.block == other.block && self.state == other.state
172+
self.block == other.block
147173
}
148174
}
149175

@@ -153,7 +179,7 @@ impl<B: BlockT> std::fmt::Debug for ImportedState<B> {
153179
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
154180
fmt.debug_struct("ImportedState")
155181
.field("block", &self.block)
156-
.field("has_trie_nodes", &self.trie_nodes.is_some())
182+
.field("source", &self.source)
157183
.finish()
158184
}
159185
}

substrate/client/consensus/common/src/import_queue.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
320320
} else {
321321
debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
322322
}
323-
return Err(BlockImportError::IncompleteHeader(peer))
323+
return Err(BlockImportError::IncompleteHeader(peer));
324324
};
325325

326326
trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len());
@@ -348,7 +348,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
348348
BlockImportStatus::ImportedUnknown { .. } => (),
349349
r => {
350350
// Any other successful result means that the block is already imported.
351-
return Ok(SingleBlockVerificationOutcome::Imported(r))
351+
return Ok(SingleBlockVerificationOutcome::Imported(r));
352352
},
353353
}
354354

substrate/client/consensus/common/src/import_queue/basic_queue.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl<B: BlockT> BasicQueueHandle<B> {
127127
impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
128128
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
129129
if blocks.is_empty() {
130-
return
130+
return;
131131
}
132132

133133
trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
@@ -195,7 +195,7 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
195195
loop {
196196
if let Err(_) = self.result_port.next_action(link).await {
197197
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
198-
return
198+
return;
199199
}
200200
}
201201
}
@@ -237,7 +237,7 @@ async fn block_import_process<B: BlockT>(
237237
target: LOG_TARGET,
238238
"Stopping block import because the import channel was closed!",
239239
);
240-
return
240+
return;
241241
},
242242
};
243243

@@ -304,7 +304,7 @@ impl<B: BlockT> BlockImportWorker<B> {
304304
target: LOG_TARGET,
305305
"Stopping block import because result channel was closed!",
306306
);
307-
return
307+
return;
308308
}
309309

310310
// Make sure to first process all justifications
@@ -317,13 +317,13 @@ impl<B: BlockT> BlockImportWorker<B> {
317317
target: LOG_TARGET,
318318
"Stopping block import because justification channel was closed!",
319319
);
320-
return
320+
return;
321321
},
322322
}
323323
}
324324

325325
if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
326-
return
326+
return;
327327
}
328328

329329
// All futures that we polled are now pending.
@@ -423,7 +423,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
423423
Some(b) => b,
424424
None => {
425425
// No block left to import, success!
426-
return ImportManyBlocksResult { block_count: count, imported, results }
426+
return ImportManyBlocksResult { block_count: count, imported, results };
427427
},
428428
};
429429

substrate/client/consensus/common/src/import_queue/buffered_link.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
160160
pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
161161
if let Some(msg) = self.rx.next().await {
162162
self.send_actions(msg, link);
163-
return Ok(())
163+
return Ok(());
164164
}
165165
Err(())
166166
}

substrate/client/consensus/common/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub mod metrics;
2525
pub use block_import::{
2626
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
2727
ImportedAux, ImportedState, JustificationImport, JustificationSyncLink, StateAction,
28-
StorageChanges,
28+
StateSource, StorageChanges, TrieNodeStates,
2929
};
3030
pub use import_queue::{
3131
import_single_block, BasicQueue, BlockImportError, BlockImportStatus, BoxBlockImport,

substrate/client/consensus/common/src/longest_chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ where
9999
"Requested a finality target using max number {} below the base number {}",
100100
max_number, base_number
101101
);
102-
return Err(Application(msg.into()))
102+
return Err(Application(msg.into()));
103103
}
104104

105105
while current_head.number() > &max_number {
@@ -116,7 +116,7 @@ where
116116
"Requested a finality target using a base {:?} not in the best chain {:?}",
117117
base_hash, best_hash,
118118
);
119-
return Err(Application(msg.into()))
119+
return Err(Application(msg.into()));
120120
}
121121
let current_hash = *current_head.parent_hash();
122122
current_head = blockchain

substrate/client/db/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2376,6 +2376,19 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
23762376
Ok(())
23772377
}
23782378

2379+
fn verify_state_root_exists(&self, root: Block::Hash) -> ClientResult<()> {
2380+
if self.storage.db.get(columns::STATE, root.as_ref()).is_none() {
2381+
return Err(sp_blockchain::Error::Backend(format!(
2382+
"State root {root:?} not found in database after incremental state sync"
2383+
)));
2384+
}
2385+
log::info!(
2386+
target: "state",
2387+
"Verified state root {root:?} exists in database"
2388+
);
2389+
Ok(())
2390+
}
2391+
23792392
fn revert(
23802393
&self,
23812394
n: NumberFor<Block>,

0 commit comments

Comments
 (0)