Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions substrate/client/network/statement/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ workspace = true
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

[features]
test-helpers = []

[dependencies]
array-bytes = { workspace = true, default-features = true }
async-channel = { workspace = true }
Expand All @@ -33,3 +36,17 @@ tokio = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
criterion = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sc-statement-store = { workspace = true, default-features = true }
sp-api = { workspace = true, default-features = true }
sp-blockchain = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-keyring = { workspace = true, default-features = true }
substrate-test-runtime-client = { workspace = true }
tempfile = { workspace = true }

[[bench]]
name = "statement_network"
harness = false
required-features = ["test-helpers"]
330 changes: 330 additions & 0 deletions substrate/client/network/statement/benches/statement_network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream, Stream, StreamExt};
use sc_network::{
service::traits::{NotificationEvent, NotificationService},
utils::LruHashSet,
NetworkPeers, ObservedRole,
};
use sc_network_statement::{
config::{MAX_KNOWN_STATEMENTS, MAX_PENDING_STATEMENTS},
Peer, StatementHandler,
};
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
use sc_statement_store::Store;
use sp_core::Pair;
use sp_statement_store::{Statement, StatementSource, StatementStore};
use std::{collections::HashMap, num::NonZeroUsize, pin::Pin, sync::Arc};
use substrate_test_runtime_client::{sc_executor::WasmExecutor, DefaultTestClientBuilderExt};

const STATEMENT_DATA_SIZE: usize = 256;

#[derive(Clone)]
struct TestNetwork;

impl TestNetwork {
fn new() -> Self {
Self
}
}

#[async_trait::async_trait]
impl NetworkPeers for TestNetwork {
fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {}
fn set_authorized_only(&self, _: bool) {}
fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {}
fn report_peer(&self, _peer_id: PeerId, _cost_benefit: sc_network::ReputationChange) {}
fn peer_reputation(&self, _: &PeerId) -> i32 {
unimplemented!()
}
fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {}
fn accept_unreserved_peers(&self) {}
fn deny_unreserved_peers(&self) {}
fn add_reserved_peer(&self, _: sc_network::config::MultiaddrWithPeerId) -> Result<(), String> {
unimplemented!()
}
fn remove_reserved_peer(&self, _: PeerId) {}
fn set_reserved_peers(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
unimplemented!()
}
fn add_peers_to_reserved_set(
&self,
_: sc_network::ProtocolName,
_: std::collections::HashSet<sc_network::Multiaddr>,
) -> Result<(), String> {
unimplemented!()
}
fn remove_peers_from_reserved_set(
&self,
_: sc_network::ProtocolName,
_: Vec<PeerId>,
) -> Result<(), String> {
unimplemented!()
}
fn sync_num_connected(&self) -> usize {
unimplemented!()
}
fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
unimplemented!()
}
async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
unimplemented!()
}
}

struct TestSync {}

impl TestSync {
fn new() -> Self {
Self {}
}
}

impl SyncEventStream for TestSync {
fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
unimplemented!()
}
}

impl sp_consensus::SyncOracle for TestSync {
fn is_major_syncing(&self) -> bool {
unimplemented!()
}
fn is_offline(&self) -> bool {
unimplemented!()
}
}

impl sc_network::NetworkEventStream for TestNetwork {
fn event_stream(
&self,
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
unimplemented!()
}
}

#[derive(Debug, Clone)]
struct TestNotificationService;

#[async_trait::async_trait]
impl NotificationService for TestNotificationService {
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!()
}
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
unimplemented!()
}
fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {}
async fn send_async_notification(
&mut self,
_peer: &PeerId,
_notification: Vec<u8>,
) -> Result<(), sc_network::error::Error> {
unimplemented!()
}
async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!()
}
fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
unimplemented!()
}
async fn next_event(&mut self) -> Option<NotificationEvent> {
unimplemented!()
}
fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
unimplemented!()
}
fn protocol(&self) -> &sc_network::types::ProtocolName {
unimplemented!()
}
fn message_sink(
&self,
_peer: &PeerId,
) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
unimplemented!()
}
}

fn create_signed_statement(id: usize, keypair: &sp_core::ed25519::Pair) -> Statement {
let mut statement = Statement::new();
let mut data = vec![0u8; STATEMENT_DATA_SIZE];
data[0..8].copy_from_slice(&id.to_le_bytes());
statement.set_plain_data(data);

statement.sign_ed25519_private(keypair);
statement
}

fn build_handler(
executor: Arc<
dyn Fn(Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>) + Send + Sync,
>,
num_threads: usize,
) -> (StatementHandler<TestNetwork, TestSync>, PeerId, tempfile::TempDir) {
let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
let mut path: std::path::PathBuf = temp_dir.path().into();
path.push("db");

let wasm_executor = WasmExecutor::builder().with_max_runtime_instances(8).build();
let (client, _) = substrate_test_runtime_client::TestClientBuilder::new()
.build_with_native_executor::<substrate_test_runtime_client::runtime::RuntimeApi, _>(
Some(wasm_executor),
);
let client = Arc::new(client);
let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
let statement_store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
let statement_store = Arc::new(statement_store);

let (queue_sender, queue_receiver) = async_channel::bounded::<(
Statement,
futures::channel::oneshot::Sender<sp_statement_store::SubmitResult>,
)>(MAX_PENDING_STATEMENTS);

let network = TestNetwork::new();
let peer_id = PeerId::random();
let mut peers = HashMap::new();
peers.insert(
peer_id,
Peer::new_for_testing(
LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_STATEMENTS).unwrap()),
ObservedRole::Full,
),
);

for _ in 0..num_threads {
let store = statement_store.clone();
let receiver = queue_receiver.clone();
executor(Box::pin(async move {
loop {
let task = receiver.recv().await;
match task {
Ok((statement, completion)) => {
let result = store.submit(statement, StatementSource::Network);
let _ = completion.send(result);
},
Err(_) => return,
}
}
}));
}

let handler = StatementHandler::new_for_testing(
"/statement/1".into(),
Box::new(TestNotificationService),
(Box::pin(stream::pending()) as Pin<Box<dyn Stream<Item = ()> + Send>>).fuse(),
network.clone(),
TestSync::new(),
(Box::pin(stream::pending()) as Pin<Box<dyn Stream<Item = SyncEvent> + Send>>).fuse(),
peers,
statement_store,
queue_sender,
);
(handler, peer_id, temp_dir)
}

fn non_blocking_executor(
handle: &tokio::runtime::Handle,
) -> Arc<dyn Fn(Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>) + Send + Sync> {
let executor: Arc<
dyn Fn(Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>) + Send + Sync,
> = Arc::new({
let h = handle.clone();
move |fut| {
h.spawn(fut);
}
});
executor
}

fn blocking_executor(
handle: &tokio::runtime::Handle,
) -> Arc<dyn Fn(Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>) + Send + Sync> {
let executor: Arc<
dyn Fn(Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>) + Send + Sync,
> = Arc::new({
let h = handle.clone();
move |fut| {
h.spawn_blocking({
let h = h.clone();
move || h.block_on(fut)
});
}
});
executor
}

fn bench_on_statements(c: &mut Criterion) {
let statement_counts = [100, 500, 1000, 2000];
let thread_counts = [1, 2, 4, 8];
let executor_types = [("blocking", true), ("non_blocking", false)];

let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap();
let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
let handle = runtime.handle();

for &num_statements in &statement_counts {
for &num_threads in &thread_counts {
for &(executor_name, is_blocking) in &executor_types {
let statements: Vec<Statement> =
(0..num_statements).map(|i| create_signed_statement(i, &keypair)).collect();
let executor = if is_blocking {
blocking_executor(&handle)
} else {
non_blocking_executor(&handle)
};

let benchmark_name = format!(
"on_statements/statements_{}/threads_{}/{}",
num_statements, num_threads, executor_name
);

c.bench_function(&benchmark_name, |b| {
b.iter_batched(
|| build_handler(executor.clone(), num_threads),
|(mut handler, peer_id, _temp_dir)| {
handler.on_statements(peer_id, statements.clone());

runtime.block_on(async {
while handler.pending_statements_mut().next().await.is_some() {}
});

let pending = handler.pending_statements_mut();
assert!(
pending.is_empty(),
"Pending statements not empty: {}",
pending.len()
);
},
criterion::BatchSize::LargeInput,
)
});
}
}
}
}

criterion_group!(benches, bench_on_statements);
criterion_main!(benches);
Loading
Loading