Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) trait BuildRpcExtensions<Client, Backend, Pool, StatementStore> {
backend: Arc<Backend>,
pool: Arc<Pool>,
statement_store: Option<Arc<StatementStore>>,
spawn_handle: Arc<dyn sp_core::traits::SpawnNamed>,
) -> sc_service::error::Result<RpcExtension>;
}

Expand All @@ -66,6 +67,7 @@ where
sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>,
>,
statement_store: Option<Arc<sc_statement_store::Store>>,
spawn_handle: Arc<dyn sp_core::traits::SpawnNamed>,
) -> sc_service::error::Result<RpcExtension> {
let build = || -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>> {
let mut module = RpcExtension::new(());
Expand All @@ -74,7 +76,7 @@ where
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
module.merge(StateMigration::new(client.clone(), backend).into_rpc())?;
if let Some(statement_store) = statement_store {
module.merge(StatementStore::new(statement_store).into_rpc())?;
module.merge(StatementStore::new(statement_store, spawn_handle).into_rpc())?;
}
module.merge(Dev::new(client).into_rpc())?;

Expand Down
4 changes: 3 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,18 +433,20 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
);
}

let spawn_handle = Arc::new(task_manager.spawn_handle());

let rpc_builder = {
let client = client.clone();
let transaction_pool = transaction_pool.clone();
let backend_for_rpc = backend.clone();
let statement_store = statement_store.clone();

Box::new(move |_| {
Self::BuildRpcExtensions::build_rpc_extensions(
client.clone(),
backend_for_rpc.clone(),
transaction_pool.clone(),
statement_store.clone(),
spawn_handle.clone(),
)
})
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) fn build_statement_store<
client,
local_keystore,
parachain_config.prometheus_registry(),
&task_manager.spawn_handle(),
Box::new(task_manager.spawn_handle()),
)
.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
let statement_protocol_executor = {
Expand Down
3 changes: 2 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
);
},
}

let spawn_handle = Arc::new(task_manager.spawn_handle());
let rpc_extensions_builder = {
let client = client.clone();
let transaction_pool = transaction_pool.clone();
Expand All @@ -361,6 +361,7 @@ where
backend_for_rpc.clone(),
transaction_pool.clone(),
None,
spawn_handle.clone(),
)?;
Ok(module)
})
Expand Down
2 changes: 1 addition & 1 deletion cumulus/zombienet/zombienet-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ zombienet-orchestrator = { workspace = true }
zombienet-configuration = { workspace = true }
cumulus-zombienet-sdk-helpers = { workspace = true }
cumulus-test-runtime = { workspace = true }
sp-statement-store = { workspace = true, default-features = true }
sp-statement-store = { workspace = true, features = ["serde"], default-features = true }
sc-statement-store = { workspace = true, default-features = true }
sp-keyring = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
Expand Down
55 changes: 28 additions & 27 deletions cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
// Test that people-westend enables the statement store in the node and that statements are
// propagated to peers.

use std::{time::Duration, u32};

use anyhow::anyhow;
use sp_core::{Bytes, Encode};
use sp_statement_store::{SubmitResult, TopicFilter};
use zombienet_sdk::{subxt::ext::subxt_rpcs::rpc_params, NetworkConfigBuilder};

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -64,40 +67,38 @@ async fn statement_store() -> Result<(), anyhow::Error> {

// Create the statement "1,2,3" signed by dave.
let mut statement = sp_statement_store::Statement::new();
let topic = [0u8; 32]; // just a dummy topic
statement.set_plain_data(vec![1, 2, 3]);
statement.set_topic(0, topic);
statement.set_expiry_from_parts(u32::MAX, 0);
let dave = sp_keyring::Sr25519Keyring::Dave;
statement.sign_sr25519_private(&dave.pair());
let statement: Bytes = statement.encode().into();

// Submit the statement to charlie.
let _: () = charlie_rpc.request("statement_submit", rpc_params![statement.clone()]).await?;

// Ensure that charlie stored the statement.
let charlie_dump: Vec<Bytes> = charlie_rpc.request("statement_dump", rpc_params![]).await?;
if charlie_dump != vec![statement.clone()] {
return Err(anyhow!("Charlie did not store the statement"));
}

// Query dave until it receives the statement, stop if 20 seconds passed.
let query_start_time = std::time::SystemTime::now();
// Subscribe to statements with topic "topic" to dave.
let stop_after_secs = 20;
loop {
let dave_dump: Vec<Bytes> = dave_rpc.request("statement_dump", rpc_params![]).await?;
if !dave_dump.is_empty() {
if dave_dump != vec![statement.clone()] {
return Err(anyhow!("Dave statement store is not the expected one"));
}
break;
}
let mut subscription = dave_rpc
.subscribe::<Bytes>(
"statement_subscribeStatement",
rpc_params![TopicFilter::MatchAll(vec![topic.to_vec().into()])],
"statement_unsubscribeStatement",
)
.await?;

let elapsed =
query_start_time.elapsed().map_err(|_| anyhow!("Failed to get elapsed time"))?;
if elapsed.as_secs() > stop_after_secs {
return Err(anyhow!("Dave did not receive the statement in time"));
}
// Submit the statement to charlie.
let _: SubmitResult =
charlie_rpc.request("statement_submit", rpc_params![statement.clone()]).await?;

tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
let statement_bytes =
tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next())
.await
.expect("Should not timeout")
.expect("Should receive")
.expect("Should not error");

assert_eq!(statement_bytes, statement);
// Now make sure no more statements are received.
assert!(tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next())
.await
.is_err());
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ impl Participant {
fn create_session_key_statement(&self) -> Statement {
let mut statement = Statement::new();
statement.set_channel(channel_public_key());
statement.set_priority(self.sent_count);
statement.set_expiry_from_parts(u32::MAX, self.sent_count);
statement.set_topic(0, topic_public_key());
statement.set_topic(1, topic_idx(self.idx));
statement.set_plain_data(self.session_key.public().to_vec());
Expand Down Expand Up @@ -566,7 +566,7 @@ impl Participant {
statement.set_topic(0, topic0);
statement.set_topic(1, topic1);
statement.set_channel(channel);
statement.set_priority(self.sent_count);
statement.set_expiry_from_parts(u32::MAX, self.sent_count);
statement.set_plain_data(request_data);
statement.sign_sr25519_private(&self.keyring);

Expand Down
17 changes: 1 addition & 16 deletions substrate/client/rpc-api/src/statement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,11 @@
//! Substrate Statement Store RPC API.

use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
use sp_core::Bytes;
use sp_statement_store::SubmitResult;
use sp_statement_store::{SubmitResult, TopicFilter};

pub mod error;

/// Filter for subscribing to statements with different topics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TopicFilter {
/// Matches all topics.
Any,
/// Matches only statements including all of the given topics.
/// Bytes are expected to be a 32-byte topic. Up to `4` topics can be provided.
MatchAll(Vec<Bytes>),
/// Matches statements including any of the given topics.
/// Bytes are expected to be a 32-byte topic. Up to `128` topics can be provided.
MatchAny(Vec<Bytes>),
}

/// Substrate statement RPC API
#[rpc(client, server)]
pub trait StatementApi {
Expand Down
6 changes: 6 additions & 0 deletions substrate/client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sc-chain-spec = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
sc-mixnet = { workspace = true, default-features = true }
sc-rpc-api = { workspace = true, default-features = true }
sc-statement-store = { workspace = true, default-features = true }
sc-tracing = { workspace = true, default-features = true }
sc-transaction-pool-api = { workspace = true, default-features = true }
sc-utils = { workspace = true, default-features = true }
Expand All @@ -47,9 +48,14 @@ assert_matches = { workspace = true }
pretty_assertions = { workspace = true }
sc-network = { workspace = true, default-features = true }
sc-transaction-pool = { workspace = true, default-features = true }
sc-statement-store = { workspace = true }
sc-keystore = { workspace = true }
sp-consensus = { workspace = true, default-features = true }
sp-crypto-hashing = { workspace = true, default-features = true }
substrate-test-runtime-client = { workspace = true }
tempfile = { workspace = true }
async-channel = { workspace = true }
futures-timer = { workspace = true }

[features]
test-helpers = []
Loading
Loading