Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State cache memory size WIP #6532

Draft
wants to merge 8 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ libsecp256k1 = "0.7"
log = "0.4"
lru = "0.12"
maplit = "1"
milhouse = "0.3"
# milhouse = "0.3"
milhouse = { git = "https://github.com/sigp/milhouse", branch = "mem-usage" }
num_cpus = "1"
parking_lot = "0.12"
paste = "1"
Expand Down Expand Up @@ -249,3 +250,4 @@ incremental = false

[patch.crates-io]
quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" }
metastruct = { git = "https://github.com/sigp/metastruct", branch = "lifetime-tweaks" }
16 changes: 10 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1558,9 +1558,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_index: usize,
) -> Result<Option<PublicKeyBytes>, Error> {
let pubkey_cache = self.validator_pubkey_cache.read();

Ok(pubkey_cache.get_pubkey_bytes(validator_index).copied())
let head = self.canonical_head.cached_head();
Ok(head
.snapshot
.beacon_state
.get_validator(validator_index)
.ok()
.map(|v| v.pubkey))
}

/// As per `Self::validator_pubkey_bytes` but will resolve multiple indices at once to avoid
Expand All @@ -1572,12 +1576,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
validator_indices: &[usize],
) -> Result<HashMap<usize, PublicKeyBytes>, Error> {
let pubkey_cache = self.validator_pubkey_cache.read();
let head = self.canonical_head.cached_head();

let mut map = HashMap::with_capacity(validator_indices.len());
for &validator_index in validator_indices {
if let Some(pubkey) = pubkey_cache.get_pubkey_bytes(validator_index) {
map.insert(validator_index, *pubkey);
if let Ok(validator) = head.snapshot.beacon_state.get_validator(validator_index) {
map.insert(validator_index, validator.pubkey);
}
}
Ok(map)
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/validator_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ impl<E: EthSpec> ValidatorMonitor<E> {
state: &BeaconState<E>,
spec: &ChainSpec,
) {
// If the validator monitor is disabled, don't waste memory filling up `self.indices`.
if self.validators.is_empty() && !self.auto_register {
return;
}

// Add any new validator indices.
state
.validators()
Expand Down
12 changes: 0 additions & 12 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use types::{BeaconState, FixedBytesExtended, Hash256, PublicKey, PublicKeyBytes}
/// In-memory cache of validator public keys, addressed by validator index.
pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
/// Deserialized public keys; the position in the `Vec` is the validator index.
pubkeys: Vec<PublicKey>,
/// Reverse lookup from a key's byte representation to its validator index.
indices: HashMap<PublicKeyBytes, usize>,
/// Byte representation of each key, kept parallel to `pubkeys`.
pubkey_bytes: Vec<PublicKeyBytes>,
/// Ties the cache to the chain's type parameters without storing a value.
_phantom: PhantomData<T>,
}

Expand All @@ -35,7 +34,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
let mut cache = Self {
pubkeys: vec![],
indices: HashMap::new(),
pubkey_bytes: vec![],
_phantom: PhantomData,
};

Expand All @@ -49,7 +47,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
let mut pubkeys = vec![];
let mut indices = HashMap::new();
let mut pubkey_bytes = vec![];

for validator_index in 0.. {
if let Some(db_pubkey) =
Expand All @@ -58,7 +55,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
let (pk, pk_bytes) = DatabasePubkey::as_pubkey(&db_pubkey)?;
pubkeys.push(pk);
indices.insert(pk_bytes, validator_index);
pubkey_bytes.push(pk_bytes);
} else {
break;
}
Expand All @@ -67,7 +63,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
Ok(ValidatorPubkeyCache {
pubkeys,
indices,
pubkey_bytes,
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -101,7 +96,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{
self.pubkey_bytes.reserve(validator_keys.len());
self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len());

Expand All @@ -127,7 +121,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
));

self.pubkeys.push(pubkey);
self.pubkey_bytes.push(pubkey_bytes);
self.indices.insert(pubkey_bytes, i);
}

Expand All @@ -144,11 +137,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.get_index(pubkey).and_then(|index| self.get(index))
}

/// Fetch the public key (in bytes form) of the validator at index `i`.
///
/// Yields `None` when the cache holds no entry for that index.
pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> {
    self.pubkey_bytes.as_slice().get(i)
}

/// Get the index of a validator with `pubkey`.
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
self.indices.get(pubkey).copied()
Expand Down
55 changes: 48 additions & 7 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,21 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
info!(db.log, "Foreground compaction complete");
}

// Load the split state on startup.
let split = db.get_split_info();
let state =
db.get_hot_state(&split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;
db.state_cache.lock().update_finalized_state(
split.state_root,
split.block_root,
state,
&db.log,
)?;

Ok(db)
}

Expand Down Expand Up @@ -418,7 +433,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
self.state_cache
.lock()
.update_finalized_state(state_root, block_root, state)
.update_finalized_state(state_root, block_root, state, &self.log)
}

pub fn state_cache_len(&self) -> usize {
Expand Down Expand Up @@ -1471,20 +1486,46 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
epoch_boundary_state_root,
}) = self.load_hot_state_summary(state_root)?
{
let mut boundary_state =
let max_replay = 72;
let split = self.get_split_info();
let mut boundary_state = if slot < split.slot + max_replay && slot != split.slot {
info!(
self.log,
"Replaying blocks atop split state";
"state_slot" => slot,
"split_slot" => split.slot,
);
self.load_hot_state(&split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?
.0
} else {
get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
)?
};

// Immediately rebase the state from disk on the finalized state so that we can reuse
// parts of the tree for state root calculation in `replay_blocks`.
self.state_cache
let rebased_successfully = self
.state_cache
.lock()
.rebase_on_finalized(&mut boundary_state, &self.spec)?;
if !rebased_successfully && epoch_boundary_state_root != split.state_root {
warn!(
self.log,
"Memory fragmentation caused by uninitialized finalized state";
"boundary_state_slot" => boundary_state.slot(),
"state_slot" => slot,
"split_slot" => split.slot
);
}

// Optimization to avoid even *thinking* about replaying blocks if we're already
// on an epoch boundary.
let mut state = if slot % E::slots_per_epoch() == 0 {
// Optimization to avoid even *thinking* about replaying blocks if we've already loaded
// the correct state.
let mut state = if slot == boundary_state.slot() {
boundary_state
} else {
// Cache ALL intermediate states that are reached during block replay. We may want
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/store/src/partial_beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{get_key_for_col, DBColumn, Error, KeyValueStore, KeyValueStoreOp};
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use std::sync::Arc;
use types::beacon_state::{Balances, Validators};
use types::historical_summary::HistoricalSummary;
use types::superstruct;
use types::*;
Expand Down Expand Up @@ -47,8 +48,8 @@ where
pub eth1_deposit_index: u64,

// Registry
pub validators: List<Validator, E::ValidatorRegistryLimit>,
pub balances: List<u64, E::ValidatorRegistryLimit>,
pub validators: Validators<E>,
pub balances: Balances<E>,

// Shuffling
/// Randao value from the current slot, for patching into the per-epoch randao vector.
Expand Down
64 changes: 61 additions & 3 deletions beacon_node/store/src/state_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::Error;
use lru::LruCache;
use slog::{debug, Logger};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use types::milhouse::mem::MemoryTracker;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};

/// Fraction of the LRU cache to leave intact during culling.
Expand Down Expand Up @@ -64,11 +66,54 @@ impl<E: EthSpec> StateCache<E> {
self.states.cap().get()
}

/// Emit per-state and aggregate memory usage at debug level.
///
/// The finalized state is tracked first so that every cached state's
/// `differential_size` is measured relative to structure it already shares.
fn log_memory_stats(&self, log: &Logger) {
    let mut tracker = MemoryTracker::default();
    let mut cumulative_diff_bytes = 0;

    // Seed the tracker with the finalized state, if one is set.
    if let Some(finalized) = &self.finalized_state {
        let stats = tracker.track_item(&finalized.state);
        debug!(
            log,
            "Memory stats";
            "slot" => finalized.state.slot(),
            "block_slot" => finalized.state.latest_block_header().slot,
            "total_kb" => stats.total_size / 1024,
            "diff_kb" => stats.differential_size / 1024,
        );
        cumulative_diff_bytes += stats.differential_size;
    }

    // Walk the cached states in ascending slot order.
    let mut cached: Vec<_> = self.states.iter().map(|(_, state)| state).collect();
    cached.sort_by_key(|state| state.slot());

    for state in cached {
        let stats = tracker.track_item(state);
        debug!(
            log,
            "Memory stats";
            "slot" => state.slot(),
            "block_slot" => state.latest_block_header().slot,
            "total_kb" => stats.total_size / 1024,
            "diff_kb" => stats.differential_size / 1024,
        );
        cumulative_diff_bytes += stats.differential_size;
    }

    debug!(
        log,
        "Total memory stats";
        "num_states" => self.states.len(),
        "bytes_kb" => cumulative_diff_bytes / 1024,
    );
}

pub fn update_finalized_state(
&mut self,
state_root: Hash256,
block_root: Hash256,
state: BeaconState<E>,
log: &Logger,
) -> Result<(), Error> {
if state.slot() % E::slots_per_epoch() != 0 {
return Err(Error::FinalizedStateUnaligned);
Expand All @@ -84,6 +129,10 @@ impl<E: EthSpec> StateCache<E> {
return Err(Error::FinalizedStateDecreasingSlot);
}

// Log memory stats prior to pruning.
debug!(log, "Pre-pruning memory stats");
self.log_memory_stats(log);

// Add to block map.
self.block_map.insert(block_root, state.slot(), state_root);

Expand All @@ -97,6 +146,11 @@ impl<E: EthSpec> StateCache<E> {

// Update finalized state.
self.finalized_state = Some(FinalizedState { state_root, state });

// Log memory stats after pruning.
debug!(log, "Post-pruning memory stats");
self.log_memory_stats(log);

Ok(())
}

Expand All @@ -105,16 +159,20 @@ impl<E: EthSpec> StateCache<E> {
/// This function should only be called on states that are likely not to already share tree
/// nodes with the finalized state, e.g. states loaded from disk.
///
/// If the finalized state is not initialized this function is a no-op.
/// If the finalized state is not initialized this function is a no-op. Return `true` if the
/// state was rebased and `false`.
#[must_use]
pub fn rebase_on_finalized(
&self,
state: &mut BeaconState<E>,
spec: &ChainSpec,
) -> Result<(), Error> {
) -> Result<bool, Error> {
if let Some(finalized_state) = &self.finalized_state {
state.rebase_on(&finalized_state.state, spec)?;
Ok(true)
} else {
Ok(false)
}
Ok(())
}

/// Return a status indicating whether the state already existed in the cache.
Expand Down
Loading
Loading