Skip to content

Commit

Permalink
polkadot: pin inclusion blocks until slashed
Browse files Browse the repository at this point in the history
  • Loading branch information
ordian committed Aug 28, 2023
1 parent 84183ac commit 6a713ed
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 65 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/node/core/chain-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-subsystem = { path = "../../subsystem" }
sc-client-api = { path = "../../../../substrate/client/api" }
sc-consensus-babe = { path = "../../../../substrate/client/consensus/babe" }
schnellru = "0.2.1"

[dev-dependencies]
futures = { version = "0.3.21", features = ["thread-pool"] }
Expand Down
55 changes: 49 additions & 6 deletions polkadot/node/core/chain-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@
use std::sync::Arc;

use futures::prelude::*;
use sc_client_api::AuxStore;
use sc_client_api::{AuxStore, BlockPinning};
use schnellru::{ByLength, LruMap};
use sp_blockchain::HeaderBackend;

use polkadot_node_subsystem::{
messages::ChainApiMessage, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
SubsystemError, SubsystemResult,
};
use polkadot_primitives::Block;
use polkadot_primitives::{Block, Hash};

mod metrics;
use self::metrics::Metrics;
Expand All @@ -50,24 +51,32 @@ use self::metrics::Metrics;
mod tests;

const LOG_TARGET: &str = "parachain::chain-api";
// Should be lower than the upper limit in Substrate,
// but high enough to allow the slashing to succeed.
const MAX_PINNED_BLOCKS: u32 = 64;

/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
client: Arc<Client>,
// Maps the block hash to the number of times it was pinned.
// The mapping is used to limit the number of pinned blocks
// and enforce unpinning of blocks that were never unpinned explicitly.
pinned_blocks: LruMap<Hash, usize>,
metrics: Metrics,
}

impl<Client> ChainApiSubsystem<Client> {
/// Create a new Chain API subsystem with the given client.
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
ChainApiSubsystem { client, metrics }
let pinned_blocks = LruMap::new(ByLength::new(MAX_PINNED_BLOCKS));
ChainApiSubsystem { client, metrics, pinned_blocks }
}
}

#[overseer::subsystem(ChainApi, error = SubsystemError, prefix = self::overseer)]
impl<Client, Context> ChainApiSubsystem<Client>
where
Client: HeaderBackend<Block> + AuxStore + 'static,
Client: HeaderBackend<Block> + BlockPinning<Block> + AuxStore + 'static,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run::<Client, Context>(ctx, self)
Expand All @@ -80,10 +89,10 @@ where
#[overseer::contextbounds(ChainApi, prefix = self::overseer)]
async fn run<Client, Context>(
mut ctx: Context,
subsystem: ChainApiSubsystem<Client>,
mut subsystem: ChainApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: HeaderBackend<Block> + AuxStore,
Client: HeaderBackend<Block> + BlockPinning<Block> + AuxStore,
{
loop {
match ctx.recv().await? {
Expand Down Expand Up @@ -156,6 +165,40 @@ where
subsystem.metrics.on_request(result.is_ok());
let _ = response_channel.send(result);
},
ChainApiMessage::PinBlock(hash) => {
let _timer = subsystem.metrics.time_pin_block();

// check if the map is full
if subsystem.pinned_blocks.len() == MAX_PINNED_BLOCKS as usize {
// unpin the least recently pinned block
let (hash, count) = subsystem
.pinned_blocks
.pop_oldest()
.expect("len is checked above; qed");
for _ in 0..count {
subsystem.client.unpin_block(hash);
}
}
if let Some(count) = subsystem.pinned_blocks.get_or_insert(hash, || 0) {
*count += 1;
}
// don't propagate the result
// the caller can not do anything about it
let result = subsystem.client.pin_block(hash);
subsystem.metrics.on_request(result.is_ok());
},
ChainApiMessage::UnpinBlock(hash) => {
let _timer = subsystem.metrics.time_unpin_block();
if let Some(count) = subsystem.pinned_blocks.get(&hash) {
*count = count.saturating_sub(1);
if *count == 0 {
let _ = subsystem.pinned_blocks.remove(&hash);
}
}
subsystem.client.unpin_block(hash);
// always succeeds
subsystem.metrics.on_request(true);
},
},
}
}
Expand Down
26 changes: 26 additions & 0 deletions polkadot/node/core/chain-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub(crate) struct MetricsInner {
pub(crate) finalized_block_hash: prometheus::Histogram,
pub(crate) finalized_block_number: prometheus::Histogram,
pub(crate) ancestors: prometheus::Histogram,
pub(crate) pin_block: prometheus::Histogram,
pub(crate) unpin_block: prometheus::Histogram,
}

/// Chain API metrics.
Expand Down Expand Up @@ -75,6 +77,16 @@ impl Metrics {
pub fn time_ancestors(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.ancestors.start_timer())
}

/// Provide a timer for `pin_block` which observes on drop.
pub fn time_pin_block(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.pin_block.start_timer())
}

/// Provide a timer for `unpin_block` which observes on drop.
pub fn time_unpin_block(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.unpin_block.start_timer())
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -132,6 +144,20 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
pin_block: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_chain_api_pin_block",
"Time spent within `chain_api::pin_block`",
))?,
registry,
)?,
unpin_block: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"polkadot_parachain_chain_api_unpin_block",
"Time spent within `chain_api::unpin_block`",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
Expand Down
8 changes: 8 additions & 0 deletions polkadot/node/core/chain-api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ impl AuxStore for TestClient {
}
}

impl BlockPinning<Block> for TestClient {
fn pin_block(&self, _hash: Hash) -> sp_blockchain::Result<()> {
Ok(())
}

fn unpin_block(&self, _hash: Hash) {}
}

#[test]
fn request_block_number() {
test_harness(|client, mut sender| {
Expand Down
20 changes: 17 additions & 3 deletions polkadot/node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use polkadot_node_primitives::{
};
use polkadot_node_subsystem::{
messages::{
ApprovalVotingMessage, BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage,
DisputeDistributionMessage, ImportStatementsResult,
ApprovalVotingMessage, BlockDescription, ChainApiMessage, ChainSelectionMessage,
DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError,
};
Expand Down Expand Up @@ -402,7 +402,7 @@ impl Initialized {
let mut key_ownership_proofs = Vec::new();
let mut dispute_proofs = Vec::new();

for (_height, inclusion_parent) in inclusions {
for (_height, inclusion_parent) in inclusions.iter().cloned() {
for (validator_index, validator_id) in pending.keys.iter() {
let res =
key_ownership_proof(ctx.sender(), inclusion_parent, validator_id.clone())
Expand Down Expand Up @@ -466,6 +466,16 @@ impl Initialized {
"Could not generate key ownership proofs for {} keys",
expected_keys - resolved_keys,
);
} else {
gum::debug!(
target: LOG_TARGET,
?session_index,
?candidate_hash,
"All good. Unpinning the inclusion blocks",
);
for (_number, hash) in inclusions {
ctx.send_message(ChainApiMessage::UnpinBlock(hash)).await;
}
}
debug_assert_eq!(resolved_keys, dispute_proofs.len());

Expand Down Expand Up @@ -1267,6 +1277,7 @@ impl Initialized {

// Notify ChainSelection if a dispute has concluded against a candidate. ChainSelection
// will need to mark the candidate's relay parent as reverted.
// Also pin the inclusion blocks until we slash the validators.
if import_result.has_fresh_byzantine_threshold_against() {
let blocks_including = self.scraper.get_blocks_including_candidate(&candidate_hash);
for (parent_block_number, parent_block_hash) in &blocks_including {
Expand All @@ -1279,6 +1290,9 @@ impl Initialized {
);
}
if blocks_including.len() > 0 {
for (_number, inclusion_block_hash) in &blocks_including {
ctx.send_message(ChainApiMessage::PinBlock(*inclusion_block_hash)).await;
}
ctx.send_message(ChainSelectionMessage::RevertBlocks(blocks_including)).await;
} else {
gum::debug!(
Expand Down
18 changes: 11 additions & 7 deletions polkadot/node/malus/src/variants/back_garbage_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
use polkadot_cli::{
prepared_overseer_builder,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, BlockPinning, Error, HeaderBackend,
Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
ProvideRuntimeApi,
},
Cli,
Expand Down Expand Up @@ -59,17 +59,21 @@ pub(crate) struct BackGarbageCandidates {
}

impl OverseerGen for BackGarbageCandidates {
fn generate<Spawner, RuntimeClient>(
fn generate<Spawner, Client>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
args: OverseerGenArgs<'_, Spawner, Client>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<Client>>>, OverseerHandle),
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: 'static
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ BlockPinning<Block>
+ AuxStore,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
Expand Down
18 changes: 11 additions & 7 deletions polkadot/node/malus/src/variants/dispute_valid_candidates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
use polkadot_cli::{
prepared_overseer_builder,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, BlockPinning, Error, HeaderBackend,
Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
ProvideRuntimeApi,
},
Cli,
Expand Down Expand Up @@ -76,17 +76,21 @@ pub(crate) struct DisputeValidCandidates {
}

impl OverseerGen for DisputeValidCandidates {
fn generate<Spawner, RuntimeClient>(
fn generate<Spawner, Client>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
args: OverseerGenArgs<'_, Spawner, Client>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<Client>>>, OverseerHandle),
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: 'static
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ BlockPinning<Block>
+ AuxStore,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
Expand Down
18 changes: 11 additions & 7 deletions polkadot/node/malus/src/variants/suggest_garbage_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
use polkadot_cli::{
prepared_overseer_builder,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, BlockPinning, Error, HeaderBackend,
Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost,
ProvideRuntimeApi,
},
Cli,
Expand Down Expand Up @@ -262,17 +262,21 @@ pub(crate) struct SuggestGarbageCandidates {
}

impl OverseerGen for SuggestGarbageCandidates {
fn generate<Spawner, RuntimeClient>(
fn generate<Spawner, Client>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
args: OverseerGenArgs<'_, Spawner, Client>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<Client>>>, OverseerHandle),
Error,
>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: 'static
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ BlockPinning<Block>
+ AuxStore,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
gum::info!(
Expand Down
14 changes: 9 additions & 5 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub use consensus_common::{Proposal, SelectChain};
use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
use mmr_gadget::MmrGadget;
pub use polkadot_primitives::{Block, BlockId, BlockNumber, CollatorPair, Hash, Id as ParaId};
pub use sc_client_api::{Backend, CallExecutor};
pub use sc_client_api::{Backend, BlockPinning, CallExecutor};
pub use sc_consensus::{BlockImport, LongestChain};
pub use sc_executor::NativeExecutionDispatch;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
Expand Down Expand Up @@ -753,9 +753,13 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
Some(backoff)
};

// Warn the user that BEEFY is still experimental for Polkadot.
if enable_beefy && config.chain_spec.is_polkadot() {
gum::warn!("BEEFY is still experimental, usage on Polkadot network is discouraged.");
// If not on a known test network, warn the user that BEEFY is still experimental.
if enable_beefy &&
!config.chain_spec.is_rococo() &&
!config.chain_spec.is_wococo() &&
!config.chain_spec.is_versi()
{
gum::warn!("BEEFY is still experimental, usage on a production network is discouraged.");
}

let disable_grandpa = config.disable_grandpa;
Expand Down Expand Up @@ -1046,7 +1050,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
overseer_connector,
OverseerGenArgs {
keystore,
runtime_client: overseer_client.clone(),
client: overseer_client.clone(),
parachains_db,
network_service: network.clone(),
sync_service: sync_service.clone(),
Expand Down
Loading

0 comments on commit 6a713ed

Please sign in to comment.