Skip to content

Commit

Permalink
Pure state sync refactoring (part-2) (#6521)
Browse files Browse the repository at this point in the history
This PR is the second part of the pure state sync refactoring,
encapsulating `StateSyncMetadata` as a separate entity. Now it's pretty
straightforward what changes are needed for the persistent state sync as
observed in the struct `StateSync`:

- `state`: redirected directly to the DB layer instead of being
accumulated in memory.
- `metadata`: handle the state sync metadata on disk whenever the state
is forwarded to the DB, resume an ongoing state sync on a restart, etc.

---------

Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent ce20d0a commit 07a5933
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 56 deletions.
10 changes: 10 additions & 0 deletions prdoc/pr_6521.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: Pure state sync refactoring (part-2)

doc:
- audience: Node Dev
description: |
This is the last part of the pure refactoring of state sync, focusing on encapsulating `StateSyncMetadata` as a separate entity.

crates:
- name: sc-network-sync
bump: none
136 changes: 80 additions & 56 deletions substrate/client/network/sync/src/strategy/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,62 @@ pub enum ImportResult<B: BlockT> {
BadResponse,
}

/// State sync state machine. Accumulates partial state data until it
/// is ready to be imported.
pub struct StateSync<B: BlockT, Client> {
target_block: B::Hash,
struct StateSyncMetadata<B: BlockT> {
last_key: SmallVec<[Vec<u8>; 2]>,
target_header: B::Header,
target_root: B::Hash,
target_body: Option<Vec<B::Extrinsic>>,
target_justifications: Option<Justifications>,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
complete: bool,
client: Arc<Client>,
imported_bytes: u64,
skip_proof: bool,
}

impl<B: BlockT> StateSyncMetadata<B> {
    /// Returns the hash of the target block, computed from the target header.
    fn target_hash(&self) -> B::Hash {
        self.target_header.hash()
    }

    /// Returns the number of the target block, read from the target header.
    fn target_number(&self) -> NumberFor<B> {
        *self.target_header.number()
    }

    /// Returns the state root recorded in the target header.
    fn target_root(&self) -> B::Hash {
        *self.target_header.state_root()
    }

    /// Builds the next state request for the target block.
    ///
    /// The request starts at the current key cursor (`last_key`) and asks for
    /// no proof when `skip_proof` is set.
    fn next_request(&self) -> StateRequest {
        let block = self.target_hash().encode();
        let start = self.last_key.clone().into_vec();
        StateRequest { block, start, no_proof: self.skip_proof }
    }

    /// Returns the estimated progress of the state sync.
    ///
    /// The percentage is derived from the first byte of the first cursor key,
    /// scaled from the `0..256` byte range to `0..100`. It is `0` while no
    /// cursor key is available.
    fn progress(&self) -> StateSyncProgress {
        let leading_byte = self
            .last_key
            .first()
            .and_then(|key| key.first())
            .copied()
            .unwrap_or(0u8);

        // Once every key has been received, only the import is left to do.
        let phase = if self.complete {
            StateSyncPhase::ImportingState
        } else {
            StateSyncPhase::DownloadingState
        };

        StateSyncProgress {
            percentage: u32::from(leading_byte) * 100 / 256,
            size: self.imported_bytes,
            phase,
        }
    }
}

/// State sync state machine.
///
/// Accumulates partial state data until it is ready to be imported.
pub struct StateSync<B: BlockT, Client> {
/// Bookkeeping for the ongoing sync: the target header, body and
/// justifications, the key cursor, the completion flag, the imported byte
/// counter and whether proofs are skipped.
metadata: StateSyncMetadata<B>,
/// State data downloaded so far, held in memory until it is moved out into
/// the `ImportedState` once the sync completes. The first tuple element
/// collects `(key, value)` pairs.
// NOTE(review): the map key and the second tuple element look like a trie
// state root and child storage roots respectively — confirm against
// `process_state_key_values`.
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
/// Client handle, used to verify the range proofs of state responses.
client: Arc<Client>,
}

impl<B, Client> StateSync<B, Client>
where
B: BlockT,
Expand All @@ -120,16 +160,16 @@ where
) -> Self {
Self {
client,
target_block: target_header.hash(),
target_root: *target_header.state_root(),
target_header,
target_body,
target_justifications,
last_key: SmallVec::default(),
metadata: StateSyncMetadata {
last_key: SmallVec::default(),
target_header,
target_body,
target_justifications,
complete: false,
imported_bytes: 0,
skip_proof,
},
state: HashMap::default(),
complete: false,
imported_bytes: 0,
skip_proof,
}
}

Expand All @@ -155,7 +195,7 @@ where
if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
child_storage_roots.push((value, key));
} else {
self.imported_bytes += key.len() as u64;
self.metadata.imported_bytes += key.len() as u64;
entry.0.push((key, value));
}
}
Expand All @@ -177,11 +217,11 @@ where
// the parent cursor stays valid.
// Empty parent trie content only happens when all the response content
// is part of a single child trie.
if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
if self.metadata.last_key.len() == 2 && response.entries[0].entries.is_empty() {
// Do not remove the parent trie position.
self.last_key.pop();
self.metadata.last_key.pop();
} else {
self.last_key.clear();
self.metadata.last_key.clear();
}
for state in response.entries {
debug!(
Expand All @@ -193,7 +233,7 @@ where

if !state.complete {
if let Some(e) = state.entries.last() {
self.last_key.push(e.key.clone());
self.metadata.last_key.push(e.key.clone());
}
complete = false;
}
Expand All @@ -219,11 +259,11 @@ where
debug!(target: LOG_TARGET, "Bad state response");
return ImportResult::BadResponse
}
if !self.skip_proof && response.proof.is_empty() {
if !self.metadata.skip_proof && response.proof.is_empty() {
debug!(target: LOG_TARGET, "Missing proof");
return ImportResult::BadResponse
}
let complete = if !self.skip_proof {
let complete = if !self.metadata.skip_proof {
debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
let proof_size = response.proof.len() as u64;
let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
Expand All @@ -234,9 +274,9 @@ where
},
};
let (values, completed) = match self.client.verify_range_proof(
self.target_root,
self.metadata.target_root(),
proof,
self.last_key.as_slice(),
self.metadata.last_key.as_slice(),
) {
Err(e) => {
debug!(
Expand All @@ -251,27 +291,25 @@ where
debug!(target: LOG_TARGET, "Imported with {} keys", values.len());

let complete = completed == 0;
if !complete && !values.update_last_key(completed, &mut self.last_key) {
if !complete && !values.update_last_key(completed, &mut self.metadata.last_key) {
debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
};

self.process_state_verified(values);
self.imported_bytes += proof_size;
self.metadata.imported_bytes += proof_size;
complete
} else {
self.process_state_unverified(response)
};
if complete {
self.complete = true;
self.metadata.complete = true;
let target_hash = self.metadata.target_hash();
ImportResult::Import(
self.target_block,
self.target_header.clone(),
ImportedState {
block: self.target_block,
state: std::mem::take(&mut self.state).into(),
},
self.target_body.clone(),
self.target_justifications.clone(),
target_hash,
self.metadata.target_header.clone(),
ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() },
self.metadata.target_body.clone(),
self.metadata.target_justifications.clone(),
)
} else {
ImportResult::Continue
Expand All @@ -280,40 +318,26 @@ where

/// Produce next state request.
fn next_request(&self) -> StateRequest {
StateRequest {
block: self.target_block.encode(),
start: self.last_key.clone().into_vec(),
no_proof: self.skip_proof,
}
self.metadata.next_request()
}

/// Check if the state is complete.
fn is_complete(&self) -> bool {
self.complete
self.metadata.complete
}

/// Returns target block number.
fn target_number(&self) -> NumberFor<B> {
*self.target_header.number()
self.metadata.target_number()
}

/// Returns target block hash.
fn target_hash(&self) -> B::Hash {
self.target_block
self.metadata.target_hash()
}

/// Returns state sync estimated progress.
fn progress(&self) -> StateSyncProgress {
let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
let percent_done = cursor as u32 * 100 / 256;
StateSyncProgress {
percentage: percent_done,
size: self.imported_bytes,
phase: if self.complete {
StateSyncPhase::ImportingState
} else {
StateSyncPhase::DownloadingState
},
}
self.metadata.progress()
}
}

0 comments on commit 07a5933

Please sign in to comment.