diff --git a/Cargo.lock b/Cargo.lock index d129f4eea452c..a013929dfdb73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20734,7 +20734,9 @@ name = "sc-rpc" version = "29.0.0" dependencies = [ "assert_matches", + "async-channel 1.9.0", "futures", + "futures-timer", "jsonrpsee", "log", "parity-scale-codec", @@ -20743,9 +20745,11 @@ dependencies = [ "sc-block-builder", "sc-chain-spec", "sc-client-api", + "sc-keystore", "sc-mixnet", "sc-network", "sc-rpc-api", + "sc-statement-store", "sc-tracing", "sc-transaction-pool", "sc-transaction-pool-api", @@ -20764,6 +20768,7 @@ dependencies = [ "sp-statement-store", "sp-version", "substrate-test-runtime-client", + "tempfile", "tokio", ] @@ -21000,13 +21005,16 @@ dependencies = [ name = "sc-statement-store" version = "10.0.0" dependencies = [ + "async-channel 1.9.0", "criterion", + "futures", "log", "parity-db", "parking_lot 0.12.3", "sc-client-api", "sc-keystore", "sc-network-statement", + "sc-utils", "sp-api", "sp-blockchain", "sp-core 28.0.0", @@ -24750,6 +24758,7 @@ dependencies = [ "sp-runtime", "sp-session", "sp-state-machine", + "sp-statement-store", "sp-tracing 16.0.0", "sp-transaction-pool", "sp-trie", diff --git a/cumulus/polkadot-omni-node/lib/src/common/rpc.rs b/cumulus/polkadot-omni-node/lib/src/common/rpc.rs index c97af2c9d7ae6..51841a3516dee 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/rpc.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/rpc.rs @@ -41,6 +41,7 @@ pub(crate) trait BuildRpcExtensions { backend: Arc, pool: Arc, statement_store: Option>, + spawn_handle: Arc, ) -> sc_service::error::Result; } @@ -66,6 +67,7 @@ where sc_transaction_pool::TransactionPoolHandle>, >, statement_store: Option>, + spawn_handle: Arc, ) -> sc_service::error::Result { let build = || -> Result> { let mut module = RpcExtension::new(()); @@ -74,7 +76,7 @@ where module.merge(TransactionPayment::new(client.clone()).into_rpc())?; module.merge(StateMigration::new(client.clone(), backend).into_rpc())?; if let Some(statement_store) = statement_store { - module.merge(StatementStore::new(statement_store).into_rpc())?; + module.merge(StatementStore::new(statement_store, spawn_handle).into_rpc())?; } module.merge(Dev::new(client).into_rpc())?; diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs index eadb68c502537..e03728d4a44e8 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs @@ -433,18 +433,20 @@ pub(crate) trait NodeSpec: BaseNodeSpec { ); } + let spawn_handle = Arc::new(task_manager.spawn_handle()); + let rpc_builder = { let client = client.clone(); let transaction_pool = transaction_pool.clone(); let backend_for_rpc = backend.clone(); let statement_store = statement_store.clone(); - Box::new(move |_| { Self::BuildRpcExtensions::build_rpc_extensions( client.clone(), backend_for_rpc.clone(), transaction_pool.clone(), statement_store.clone(), + spawn_handle.clone(), ) }) }; diff --git a/cumulus/polkadot-omni-node/lib/src/common/statement_store.rs b/cumulus/polkadot-omni-node/lib/src/common/statement_store.rs index 604103e6c1a37..219ee6793f31b 100644 --- a/cumulus/polkadot-omni-node/lib/src/common/statement_store.rs +++ b/cumulus/polkadot-omni-node/lib/src/common/statement_store.rs @@ -70,7 +70,7 @@ pub(crate) fn build_statement_store< client, local_keystore, parachain_config.prometheus_registry(), - &task_manager.spawn_handle(), + Box::new(task_manager.spawn_handle()), ) .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?; let statement_protocol_executor = { diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index bbdce78d0121d..218281d4db949 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -349,7 +349,7 @@ where ); }, } - + let spawn_handle = Arc::new(task_manager.spawn_handle()); let rpc_extensions_builder = { let client = client.clone(); let transaction_pool = transaction_pool.clone(); @@ -361,6 +361,7 @@ where backend_for_rpc.clone(), transaction_pool.clone(), None, + spawn_handle.clone(), )?; Ok(module) }) diff --git a/cumulus/zombienet/zombienet-sdk/Cargo.toml b/cumulus/zombienet/zombienet-sdk/Cargo.toml index 52f1a6e6364fb..781af19b09c29 100644 --- a/cumulus/zombienet/zombienet-sdk/Cargo.toml +++ b/cumulus/zombienet/zombienet-sdk/Cargo.toml @@ -21,7 +21,7 @@ zombienet-orchestrator = { workspace = true } zombienet-configuration = { workspace = true } cumulus-zombienet-sdk-helpers = { workspace = true } cumulus-test-runtime = { workspace = true } -sp-statement-store = { workspace = true, default-features = true } +sp-statement-store = { workspace = true, features = ["serde"], default-features = true } sc-statement-store = { workspace = true, default-features = true } sp-keyring = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs index 988441b15b835..69fc1721e473a 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store.rs @@ -4,8 +4,11 @@ // Test that people-westend enables the statement store in the node and that statements are // propagated to peers. +use std::{time::Duration, u32}; + use anyhow::anyhow; use sp_core::{Bytes, Encode}; +use sp_statement_store::{SubmitResult, TopicFilter}; use zombienet_sdk::{subxt::ext::subxt_rpcs::rpc_params, NetworkConfigBuilder}; #[tokio::test(flavor = "multi_thread")] @@ -64,40 +67,38 @@ async fn statement_store() -> Result<(), anyhow::Error> { // Create the statement "1,2,3" signed by dave. let mut statement = sp_statement_store::Statement::new(); + let topic = [0u8; 32]; // just a dummy topic statement.set_plain_data(vec![1, 2, 3]); + statement.set_topic(0, topic); + statement.set_expiry_from_parts(u32::MAX, 0); let dave = sp_keyring::Sr25519Keyring::Dave; statement.sign_sr25519_private(&dave.pair()); let statement: Bytes = statement.encode().into(); - - // Submit the statement to charlie. - let _: () = charlie_rpc.request("statement_submit", rpc_params![statement.clone()]).await?; - - // Ensure that charlie stored the statement. - let charlie_dump: Vec = charlie_rpc.request("statement_dump", rpc_params![]).await?; - if charlie_dump != vec![statement.clone()] { - return Err(anyhow!("Charlie did not store the statement")); - } - - // Query dave until it receives the statement, stop if 20 seconds passed. - let query_start_time = std::time::SystemTime::now(); + // Subscribe to statements with topic "topic" to dave. let stop_after_secs = 20; - loop { - let dave_dump: Vec = dave_rpc.request("statement_dump", rpc_params![]).await?; - if !dave_dump.is_empty() { - if dave_dump != vec![statement.clone()] { - return Err(anyhow!("Dave statement store is not the expected one")); - } - break; - } + let mut subscription = dave_rpc + .subscribe::( + "statement_subscribeStatement", + rpc_params![TopicFilter::MatchAll(vec![topic.to_vec().into()])], + "statement_unsubscribeStatement", + ) + .await?; - let elapsed = - query_start_time.elapsed().map_err(|_| anyhow!("Failed to get elapsed time"))?; - if elapsed.as_secs() > stop_after_secs { - return Err(anyhow!("Dave did not receive the statement in time")); - } + // Submit the statement to charlie. + let _: SubmitResult = + charlie_rpc.request("statement_submit", rpc_params![statement.clone()]).await?; - tokio::time::sleep(core::time::Duration::from_secs(1)).await; - } + let statement_bytes = + tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next()) + .await + .expect("Should not timeout") + .expect("Should receive") + .expect("Should not error"); + assert_eq!(statement_bytes, statement); + // Now make sure no more statements are received. + assert!(tokio::time::timeout(Duration::from_secs(stop_after_secs), subscription.next()) + .await + .is_err()); Ok(()) } diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs index a284344ec703a..f7d348aa041d6 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs @@ -537,7 +537,7 @@ impl Participant { fn create_session_key_statement(&self) -> Statement { let mut statement = Statement::new(); statement.set_channel(channel_public_key()); - statement.set_priority(self.sent_count); + statement.set_expiry_from_parts(u32::MAX, self.sent_count); statement.set_topic(0, topic_public_key()); statement.set_topic(1, topic_idx(self.idx)); statement.set_plain_data(self.session_key.public().to_vec()); @@ -566,7 +566,7 @@ impl Participant { statement.set_topic(0, topic0); statement.set_topic(1, topic1); statement.set_channel(channel); - statement.set_priority(self.sent_count); + statement.set_expiry_from_parts(u32::MAX, self.sent_count); statement.set_plain_data(request_data); statement.sign_sr25519_private(&self.keyring); diff --git a/substrate/client/rpc-api/src/statement/mod.rs b/substrate/client/rpc-api/src/statement/mod.rs index 0de1b31545b8b..add2594114685 100644 --- a/substrate/client/rpc-api/src/statement/mod.rs +++ b/substrate/client/rpc-api/src/statement/mod.rs @@ -19,26 +19,11 @@ //! Substrate Statement Store RPC API. use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use serde::{Deserialize, Serialize}; use sp_core::Bytes; -use sp_statement_store::SubmitResult; +use sp_statement_store::{SubmitResult, TopicFilter}; pub mod error; -/// Filter for subscribing to statements with different topics. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum TopicFilter { - /// Matches all topics. - Any, - /// Matches only statements including all of the given topics. - /// Bytes are expected to be a 32-byte topic. Up to `4` topics can be provided. - MatchAll(Vec), - /// Matches statements including any of the given topics. - /// Bytes are expected to be a 32-byte topic. Up to `128` topics can be provided. - MatchAny(Vec), -} - /// Substrate statement RPC API #[rpc(client, server)] pub trait StatementApi { diff --git a/substrate/client/rpc/Cargo.toml b/substrate/client/rpc/Cargo.toml index 3de7781c2249b..02838f4b92f0d 100644 --- a/substrate/client/rpc/Cargo.toml +++ b/substrate/client/rpc/Cargo.toml @@ -26,6 +26,7 @@ sc-chain-spec = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } sc-mixnet = { workspace = true, default-features = true } sc-rpc-api = { workspace = true, default-features = true } +sc-statement-store = { workspace = true, default-features = true } sc-tracing = { workspace = true, default-features = true } sc-transaction-pool-api = { workspace = true, default-features = true } sc-utils = { workspace = true, default-features = true } @@ -47,9 +48,14 @@ assert_matches = { workspace = true } pretty_assertions = { workspace = true } sc-network = { workspace = true, default-features = true } sc-transaction-pool = { workspace = true, default-features = true } +sc-statement-store = { workspace = true } +sc-keystore = { workspace = true } sp-consensus = { workspace = true, default-features = true } sp-crypto-hashing = { workspace = true, default-features = true } substrate-test-runtime-client = { workspace = true } +tempfile = { workspace = true } +async-channel = { workspace = true } +futures-timer = { workspace = true } [features] test-helpers = [] diff --git a/substrate/client/rpc/src/statement/mod.rs b/substrate/client/rpc/src/statement/mod.rs index 350a99e645d2e..cbdaa6e74593b 100644 --- a/substrate/client/rpc/src/statement/mod.rs +++ b/substrate/client/rpc/src/statement/mod.rs @@ -18,111 +18,49 @@ //! Substrate statement store API. -use codec::{Decode, Encode}; +use codec::Decode; use jsonrpsee::{ core::{async_trait, RpcResult}, - Extensions, + Extensions, PendingSubscriptionSink, }; /// Re-export the API for backward compatibility. pub use sc_rpc_api::statement::{error::Error, StatementApiServer}; use sp_core::Bytes; -use sp_statement_store::{StatementSource, SubmitResult}; +use sp_statement_store::{StatementSource, SubmitResult, TopicFilter}; use std::sync::Arc; +use crate::{ + utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription}, + SubscriptionTaskExecutor, +}; + +#[cfg(test)] +mod tests; + +/// Trait alias for statement store API required by the RPC. +pub trait StatementStoreApi: + sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi +{ +} +impl StatementStoreApi for T where + T: sp_statement_store::StatementStore + sc_statement_store::StatementStoreSubscriptionApi +{ +} /// Statement store API pub struct StatementStore { - store: Arc, + store: Arc, + executor: SubscriptionTaskExecutor, } impl StatementStore { /// Create new instance of Offchain API. - pub fn new(store: Arc) -> Self { - StatementStore { store } + pub fn new(store: Arc, executor: SubscriptionTaskExecutor) -> Self { + StatementStore { store, executor } } } #[async_trait] impl StatementApiServer for StatementStore { - fn dump(&self, ext: &Extensions) -> RpcResult> { - sc_rpc_api::check_if_safe(ext)?; - - let statements = - self.store.statements().map_err(|e| Error::StatementStore(e.to_string()))?; - Ok(statements.into_iter().map(|(_, s)| s.encode().into()).collect()) - } - - fn broadcasts(&self, match_all_topics: Vec<[u8; 32]>) -> RpcResult> { - Ok(self - .store - .broadcasts(&match_all_topics) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - - fn posted(&self, match_all_topics: Vec<[u8; 32]>, dest: [u8; 32]) -> RpcResult> { - Ok(self - .store - .posted(&match_all_topics, dest) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - - fn posted_clear( - &self, - match_all_topics: Vec<[u8; 32]>, - dest: [u8; 32], - ) -> RpcResult> { - Ok(self - .store - .posted_clear(&match_all_topics, dest) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - - fn broadcasts_stmt(&self, match_all_topics: Vec<[u8; 32]>) -> RpcResult> { - Ok(self - .store - .broadcasts_stmt(&match_all_topics) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - - fn posted_stmt( - &self, - match_all_topics: Vec<[u8; 32]>, - dest: [u8; 32], - ) -> RpcResult> { - Ok(self - .store - .posted_stmt(&match_all_topics, dest) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - - fn posted_clear_stmt( - &self, - match_all_topics: Vec<[u8; 32]>, - dest: [u8; 32], - ) -> RpcResult> { - Ok(self - .store - .posted_clear_stmt(&match_all_topics, dest) - .map_err(|e| Error::StatementStore(e.to_string()))? - .into_iter() - .map(Into::into) - .collect()) - } - fn submit(&self, encoded: Bytes) -> RpcResult { let statement = Decode::decode(&mut &*encoded) .map_err(|e| Error::StatementStore(format!("Error decoding statement: {:?}", e)))?; @@ -134,7 +72,57 @@ impl StatementApiServer for StatementStore { } } - fn remove(&self, hash: [u8; 32]) -> RpcResult<()> { - Ok(self.store.remove(&hash).map_err(|e| Error::StatementStore(e.to_string()))?) + fn subscribe_statement( + &self, + pending: PendingSubscriptionSink, + ext: &Extensions, + topic_filter: TopicFilter, + ) { + let checked_topic_filter = match topic_filter.try_into() { + Ok(filter) => filter, + Err(e) => { + spawn_subscription_task( + &self.executor, + pending.reject(Error::StatementStore(format!( + "Error parsing topic filter: {:?}", + e + ))), + ); + return; + }, + }; + + let (existing_statements, subscription_sender, subscription_stream) = + match self.store.subscribe_statement(checked_topic_filter) { + Ok(res) => res, + Err(err) => { + spawn_subscription_task( + &self.executor, + pending.reject(Error::StatementStore(format!( + "Error collecting existing statements: {:?}", + err + ))), + ); + return; + }, + }; + + spawn_subscription_task(&self.executor, async { + PendingSubscription::from(pending) + .pipe_from_stream(subscription_stream, BoundedVecDeque::new(2048 * 2048)) + .await; + }); + + // Send existing statements before returning, to make sure we did not miss any statements. + for statement in existing_statements { + // Channel size is chosen to be large enough to always fit existing statements. + if let Err(e) = subscription_sender.try_send(statement.into()) { + log::warn!( + target: "statement_store_rpc", + "Failed to send existing statement in subscription: {:?}", e + ); + break; + } + } } } diff --git a/substrate/client/rpc/src/statement/tests.rs b/substrate/client/rpc/src/statement/tests.rs new file mode 100644 index 0000000000000..401f9e788300e --- /dev/null +++ b/substrate/client/rpc/src/statement/tests.rs @@ -0,0 +1,163 @@ +use super::*; +use crate::testing::test_executor; +use codec::Encode; +use futures::FutureExt; +use jsonrpsee::Subscription; +use sc_statement_store::Store; +use sp_core::traits::SpawnNamed; +use sp_statement_store::Statement; +use std::sync::Arc; + +fn generate_statements() -> Vec { + let topic = [0u8; 32]; + let topic1 = [1u8; 32]; + let topic2 = [2u8; 32]; + + let mut statements = Vec::new(); + let mut statement = sp_statement_store::Statement::new(); + statement.set_topic(0, topic); + statement.set_topic(1, topic2); + + statement + .set_proof(sp_statement_store::Proof::Ed25519 { signature: [0u8; 64], signer: [0u8; 32] }); + statement.set_expiry_from_parts(u32::MAX, 1); + + statements.push(statement.clone()); + + let mut statement = sp_statement_store::Statement::new(); + statement.set_topic(0, topic); + statement.set_topic(1, topic1); + statement + .set_proof(sp_statement_store::Proof::Ed25519 { signature: [0u8; 64], signer: [0u8; 32] }); + statement.set_expiry_from_parts(u32::MAX, 1); + + statements.push(statement.clone()); + statements +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 10)] +async fn subscribe_works() { + let executor = test_executor(); + let client = Arc::new(substrate_test_runtime_client::new()); + let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); + let store = Store::new_shared( + temp_dir.path(), + Default::default(), + Arc::clone(&client) as Arc<_>, + Arc::new(sc_keystore::LocalKeystore::in_memory()), + None, + Box::new(executor.as_ref().clone()), + ) + .expect("Failed to create statement store"); + + let api = super::StatementStore::new(Arc::clone(&store) as Arc<_>, executor.clone()); + let api_rpc = api.into_rpc(); + let api_rpc_clone = api_rpc.clone(); + let submitted = generate_statements(); + let first_topic: Bytes = submitted[0].topic(0).expect("Should have topic").to_vec().into(); + + let topic_filter = TopicFilter::MatchAll(vec![first_topic.clone()]); + let submitted_clone = submitted.clone(); + let subscription = api_rpc + .subscribe("statement_subscribeStatement", (topic_filter.clone(),), 100000) + .await + .expect("Failed to subscribe"); + + let subscription_any = api_rpc + .subscribe_unbounded("statement_subscribeStatement", (TopicFilter::Any,)) + .await + .expect("Failed to subscribe"); + + let match_any_topic = TopicFilter::MatchAny(vec![ + submitted[0].topic(1).expect("Should have topic").to_vec().into(), + submitted[1].topic(1).expect("Should have topic").to_vec().into(), + ]); + let subscription_match_any = api_rpc + .subscribe_unbounded("statement_subscribeStatement", (match_any_topic.clone(),)) + .await + .expect("Failed to subscribe"); + + executor.spawn( + "test", + None, + async move { + for statement in submitted_clone { + let encoded_statement: Bytes = statement.encode().into(); + let _: SubmitResult = api_rpc_clone + .call("statement_submit", (encoded_statement,)) + .await + .expect("Failed to submit statement"); + } + } + .boxed(), + ); + + check_submitted(submitted.clone(), subscription).await; + check_submitted(submitted.clone(), subscription_any).await; + check_submitted(submitted.clone(), subscription_match_any).await; + + // Check submitting after initial statements gets all statements through as well. + let subscription = api_rpc + .subscribe("statement_subscribeStatement", (topic_filter,), 100000) + .await + .expect("Failed to subscribe"); + + let subscription_any = api_rpc + .subscribe_unbounded("statement_subscribeStatement", (TopicFilter::Any,)) + .await + .expect("Failed to subscribe"); + + let subscription_match_any = api_rpc + .subscribe_unbounded("statement_subscribeStatement", (match_any_topic,)) + .await + .expect("Failed to subscribe"); + + check_submitted(submitted.clone(), subscription).await; + check_submitted(submitted.clone(), subscription_any).await; + check_submitted(submitted.clone(), subscription_match_any).await; + + let mut match_any_with_random = api_rpc + .subscribe_unbounded( + "statement_subscribeStatement", + (TopicFilter::MatchAny(vec![vec![7u8; 32].into()]),), + ) + .await + .expect("Failed to subscribe"); + + let res = tokio::time::timeout( + std::time::Duration::from_secs(5), + match_any_with_random.next::(), + ) + .await; + assert!(res.is_err(), "expected no message for random topic"); + + let match_all_with_random = TopicFilter::MatchAll(vec![first_topic, vec![7u8; 32].into()]); + let mut match_all_with_random = api_rpc + .subscribe("statement_subscribeStatement", (match_all_with_random,), 100000) + .await + .expect("Failed to subscribe"); + + let res = tokio::time::timeout( + std::time::Duration::from_secs(5), + match_all_with_random.next::(), + ) + .await; + assert!(res.is_err(), "expected no message for random topic"); +} + +async fn check_submitted( + mut expected: Vec, + mut subscription: Subscription, +) { + while !expected.is_empty() { + let result = subscription.next::().await; + let result = result.expect("Bytes").expect("Success").0; + let new_statement = + sp_statement_store::Statement::decode(&mut &result.0[..]).expect("Decode statement"); + let position = expected + .iter() + .position(|x| x == &new_statement) + .expect("Statement should exist"); + expected.remove(position); + } +} diff --git a/substrate/client/statement-store/Cargo.toml b/substrate/client/statement-store/Cargo.toml index d62622401494b..8f8c19ac439a3 100644 --- a/substrate/client/statement-store/Cargo.toml +++ b/substrate/client/statement-store/Cargo.toml @@ -16,7 +16,10 @@ workspace = true targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-channel = { workspace = true } +futures = { workspace = true } log = { workspace = true, default-features = true } +sc-utils = { workspace = true } parity-db = { workspace = true } parking_lot = { workspace = true, default-features = true } prometheus-endpoint = { workspace = true, default-features = true } diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index 6b4945d4e2553..14310eac00153 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -51,26 +51,33 @@ mod metrics; pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS}; +use crate::subscription::{SubscriptionStatementsStream, SubscriptionsHandle}; use metrics::MetricsLink as PrometheusMetrics; use parking_lot::RwLock; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_keystore::LocalKeystore; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; -use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode}; +use sp_core::{ + crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Bytes, Decode, Encode, +}; use sp_runtime::traits::Block as BlockT; use sp_statement_store::{ runtime_api::{ InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement, }, - AccountId, BlockHash, Channel, DecryptionKey, Hash, InvalidReason, Proof, RejectionReason, - Result, Statement, SubmitResult, Topic, + AccountId, BlockHash, Channel, CheckedTopicFilter, DecryptionKey, Hash, InvalidReason, Proof, + RejectionReason, Result, Statement, SubmitResult, Topic, MAX_ANY_TOPICS, }; use std::{ collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; +pub use subscription::StatementStoreSubscriptionApi; + +mod subscription; + const KEY_VERSION: &[u8] = b"version".as_slice(); const CURRENT_VERSION: u32 = 1; @@ -88,6 +95,9 @@ pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB pub const MAX_STATEMENT_SIZE: usize = sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1; +/// Number of subscription filter worker tasks. +const NUM_FILTER_WORKERS: usize = 4; + const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30); mod col { @@ -99,7 +109,7 @@ mod col { } #[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)] -struct Priority(u32); +struct Priority(u64); #[derive(PartialEq, Eq)] struct PriorityKey { @@ -202,6 +212,7 @@ where pub struct Store { db: parity_db::Db, index: RwLock, + subscription_manager: SubscriptionsHandle, validate_fn: Box< dyn Fn( Option, @@ -241,7 +252,7 @@ impl Index { if nt > 0 || key.is_some() { self.topics_and_keys.insert(hash, (all_topics, key)); } - let priority = Priority(statement.priority().unwrap_or(0)); + let priority = Priority(statement.expiry()); self.entries.insert(hash, (account, priority, statement.data_len())); self.recent.insert(hash); self.total_size += statement.data_len(); @@ -270,6 +281,69 @@ impl Index { } fn iterate_with( + &self, + key: Option, + topic: &CheckedTopicFilter, + f: impl FnMut(&Hash) -> Result<()>, + ) -> Result<()> { + match topic { + CheckedTopicFilter::Any => self.iterate_with_any(key, f), + CheckedTopicFilter::MatchAll(topics) => self.iterate_with_match_all(key, topics, f), + CheckedTopicFilter::MatchAny(topics) => self.iterate_with_match_any(key, topics, f), + } + } + + fn iterate_with_match_any( + &self, + key: Option, + match_any_topics: &[Topic], + mut f: impl FnMut(&Hash) -> Result<()>, + ) -> Result<()> { + if match_any_topics.len() > MAX_ANY_TOPICS { + return Ok(()) + } + + let key_set = self.by_dec_key.get(&key); + if key_set.map_or(0, |s| s.len()) == 0 { + // Key does not exist in the index. + return Ok(()) + } + + for t in match_any_topics.iter() { + let set = self.by_topic.get(t); + + for item in set.iter().map(|set| set.iter()).flatten() { + if key_set.map_or(false, |s| s.contains(item)) { + log::trace!( + target: LOG_TARGET, + "Iterating by topic/key: statement {:?}", + HexDisplay::from(item) + ); + f(item)? + } + } + } + Ok(()) + } + + fn iterate_with_any( + &self, + key: Option, + mut f: impl FnMut(&Hash) -> Result<()>, + ) -> Result<()> { + let key_set = self.by_dec_key.get(&key); + if key_set.map_or(0, |s| s.len()) == 0 { + // Key does not exist in the index. + return Ok(()) + } + + for item in key_set.map(|hashes| hashes.iter()).into_iter().flatten() { + f(item)? + } + Ok(()) + } + + fn iterate_with_match_all( &self, key: Option, match_all_topics: &[Topic], @@ -399,7 +473,7 @@ impl Index { let mut evicted = HashSet::new(); let mut would_free_size = 0; - let priority = Priority(statement.priority().unwrap_or(0)); + let priority = Priority(statement.expiry()); let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize); // It may happen that we can't delete enough lower priority messages // to satisfy size constraints. We check for that before deleting anything, @@ -417,8 +491,8 @@ impl Index { channel_record.priority, ); return Err(RejectionReason::ChannelPriorityTooLow { - submitted_priority: priority.0, - min_priority: channel_record.priority.0, + submitted_expiry: priority.0, + min_expiry: channel_record.priority.0, }); } else { // Would replace channel message. Still need to check for size constraints @@ -463,8 +537,8 @@ impl Index { entry.priority, ); return Err(RejectionReason::AccountFull { - submitted_priority: priority.0, - min_priority: entry.priority.0, + submitted_expiry: priority.0, + min_expiry: entry.priority.0, }); } evicted.insert(entry.hash); @@ -502,7 +576,7 @@ impl Store { client: Arc, keystore: Arc, prometheus: Option<&PrometheusRegistry>, - task_spawner: &dyn SpawnNamed, + task_spawner: Box, ) -> Result> where Block: BlockT, @@ -510,7 +584,8 @@ impl Store { Client: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, Client::Api: ValidateStatement, { - let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?); + let store = + Arc::new(Self::new(path, options, client, keystore, prometheus, task_spawner.clone())?); // Perform periodic statement store maintenance let worker_store = store.clone(); @@ -538,6 +613,7 @@ impl Store { client: Arc, keystore: Arc, prometheus: Option<&PrometheusRegistry>, + task_spawner: Box, ) -> Result where Block: BlockT, @@ -588,6 +664,10 @@ impl Store { keystore, time_override: None, metrics: PrometheusMetrics::new(prometheus), + subscription_manager: SubscriptionsHandle::new( + task_spawner.clone(), + NUM_FILTER_WORKERS, + ), }; store.populate()?; Ok(store) @@ -645,15 +725,15 @@ impl Store { Ok(()) } - fn collect_statements( + fn collect_statements_locked( &self, key: Option, - match_all_topics: &[Topic], + topic_filter: &CheckedTopicFilter, + index: &Index, + result: &mut Vec, mut f: impl FnMut(Statement) -> Option, - ) -> Result> { - let mut result = Vec::new(); - let index = self.index.read(); - index.iterate_with(key, match_all_topics, |hash| { + ) -> Result<()> { + index.iterate_with(key, topic_filter, |hash| { match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? { Some(entry) => { if let Ok(statement) = Statement::decode(&mut entry.as_slice()) { @@ -680,6 +760,18 @@ impl Store { } Ok(()) })?; + Ok(()) + } + + fn collect_statements( + &self, + key: Option, + topic_filter: &CheckedTopicFilter, + f: impl FnMut(Statement) -> Option, + ) -> Result> { + let mut result = Vec::new(); + let index = self.index.read(); + self.collect_statements_locked(key, topic_filter, &index, &mut result, f)?; Ok(result) } @@ -736,45 +828,49 @@ impl Store { // Map the statement and the decrypted data to the desired result. mut map_f: impl FnMut(Statement, Vec) -> R, ) -> Result> { - self.collect_statements(Some(dest), match_all_topics, |statement| { - if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) { - let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key); - let public: sp_statement_store::ed25519::Public = public.into(); - match self.keystore.key_pair::(&public) { - Err(e) => { - log::debug!( - target: LOG_TARGET, - "Keystore error: {:?}, for statement {:?}", - e, - HexDisplay::from(&statement.hash()) - ); - None - }, - Ok(None) => { - log::debug!( - target: LOG_TARGET, - "Keystore is missing key for statement {:?}", - HexDisplay::from(&statement.hash()) - ); - None - }, - Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) { - Ok(r) => r.map(|data| map_f(statement, data)), + self.collect_statements( + Some(dest), + &CheckedTopicFilter::MatchAll(match_all_topics.to_vec()), + |statement| { + if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) { + let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key); + let public: sp_statement_store::ed25519::Public = public.into(); + match self.keystore.key_pair::(&public) { Err(e) => { log::debug!( target: LOG_TARGET, - "Decryption error: {:?}, for statement {:?}", + "Keystore error: {:?}, for statement {:?}", e, HexDisplay::from(&statement.hash()) ); None }, - }, + Ok(None) => { + log::debug!( + target: LOG_TARGET, + "Keystore is missing key for statement {:?}", + HexDisplay::from(&statement.hash()) + ); + None + }, + Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) { + Ok(r) => r.map(|data| map_f(statement, data)), + Err(e) => { + log::debug!( + target: LOG_TARGET, + "Decryption error: {:?}, for statement {:?}", + e, + HexDisplay::from(&statement.hash()) + ); + None + }, + }, + } + } else { + None } - } else { - None - } - }) + }, + ) } } @@ -851,14 +947,22 @@ impl StatementStore for Store { /// Return the data of all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts(&self, match_all_topics: &[Topic]) -> Result>> { - self.collect_statements(None, match_all_topics, |statement| statement.into_data()) + self.collect_statements( + None, + &CheckedTopicFilter::MatchAll(match_all_topics.to_vec()), + |statement| statement.into_data(), + ) } /// Return the data of all known statements whose decryption key is identified as `dest` (this /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the /// private key for symmetric ciphers). fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result>> { - self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data()) + self.collect_statements( + Some(dest), + &CheckedTopicFilter::MatchAll(match_all_topics.to_vec()), + |statement| statement.into_data(), + ) } /// Return the decrypted data of all known statements whose decryption key is identified as @@ -870,14 +974,22 @@ impl StatementStore for Store { /// Return all known statements which include all topics and have no `DecryptionKey` /// field. fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result>> { - self.collect_statements(None, match_all_topics, |statement| Some(statement.encode())) + self.collect_statements( + None, + &CheckedTopicFilter::MatchAll(match_all_topics.to_vec()), + |statement| Some(statement.encode()), + ) } /// Return all known statements whose decryption key is identified as `dest` (this /// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the /// private key for symmetric ciphers). fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result>> { - self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode())) + self.collect_statements( + Some(dest), + &CheckedTopicFilter::MatchAll(match_all_topics.to_vec()), + |statement| Some(statement.encode()), + ) } /// Return the statement and the decrypted data of all known statements whose decryption key is @@ -898,6 +1010,15 @@ impl StatementStore for Store { /// Submit a statement to the store. Validates the statement and returns validation result. fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult { let hash = statement.hash(); + // Get unix timestamp + if self.timestamp() >= statement.get_expiration_timestamp_secs().into() { + log::debug!( + target: LOG_TARGET, + "Statement is already expired: {:?}", + HexDisplay::from(&hash), + ); + return SubmitResult::Invalid(InvalidReason::AlreadyExpired); + } let encoded_size = statement.encoded_size(); if encoded_size > MAX_STATEMENT_SIZE { log::debug!( @@ -942,6 +1063,7 @@ impl StatementStore for Store { None }; let validation_result = (self.validate_fn)(at_block, source, statement.clone()); + let validation = match validation_result { Ok(validation) => validation, Err(InvalidStatement::BadProof) => { @@ -991,6 +1113,7 @@ impl StatementStore for Store { ); return SubmitResult::InternalError(Error::Db(e.to_string())) } + self.subscription_manager.notify(statement); } // Release index lock self.metrics.report(|metrics| metrics.submitted_statements.inc()); log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash)); @@ -1049,15 +1172,39 @@ impl StatementStore for Store { } } +impl StatementStoreSubscriptionApi for Store { + fn subscribe_statement( + &self, + topic_filter: CheckedTopicFilter, + ) -> Result<(Vec>, async_channel::Sender, SubscriptionStatementsStream)> { + // Keep the index read lock until after we have subscribed to avoid missing statements. + let mut existing_statements = Vec::new(); + let (subscription_sender, subscription_stream) = { + let index = self.index.read(); + self.collect_statements_locked( + None, + &topic_filter, + &index, + &mut existing_statements, + |statement| Some(statement.encode()), + )?; + self.subscription_manager.subscribe(topic_filter, existing_statements.len()) + }; + Ok((existing_statements, subscription_sender, subscription_stream)) + } +} + #[cfg(test)] mod tests { + use core::num; + use crate::Store; use sc_keystore::Keystore; use sp_core::{Decode, Encode, Pair}; use sp_statement_store::{ runtime_api::{InvalidStatement, ValidStatement, ValidateStatement}, - AccountId, Channel, DecryptionKey, Proof, SignatureVerificationResult, Statement, - StatementSource, StatementStore, SubmitResult, Topic, + AccountId, Channel, CheckedTopicFilter, DecryptionKey, Proof, SignatureVerificationResult, + Statement, StatementSource, StatementStore, SubmitResult, Topic, }; type Extrinsic = sp_runtime::OpaqueExtrinsic; @@ -1152,11 +1299,19 @@ mod tests { let mut path: std::path::PathBuf = temp_dir.path().into(); path.push("db"); let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory()); - let store = Store::new(&path, Default::default(), client, keystore, None).unwrap(); + let store = Store::new( + &path, + Default::default(), + client, + keystore, + None, + Box::new(sp_core::testing::TaskExecutor::new()), + ) + .unwrap(); (store, temp_dir) // return order is important. Store must be dropped before TempDir } - fn signed_statement(data: u8) -> Statement { + pub fn signed_statement(data: u8) -> Statement { signed_statement_with_topics(data, &[], None) } @@ -1167,6 +1322,8 @@ mod tests { ) -> Statement { let mut statement = Statement::new(); statement.set_plain_data(vec![data]); + statement.set_expiry(u64::MAX); + for i in 0..topics.len() { statement.set_topic(i, topics[i]); } @@ -1207,7 +1364,7 @@ mod tests { let mut data = Vec::new(); data.resize(data_len, 0); statement.set_plain_data(data); - statement.set_priority(priority); + statement.set_expiry_from_parts(u32::MAX, priority); if let Some(c) = c { statement.set_channel(channel(c)); } @@ -1246,7 +1403,15 @@ mod tests { let client = std::sync::Arc::new(TestClient); let mut path: std::path::PathBuf = temp.path().into(); path.push("db"); - let store = Store::new(&path, Default::default(), client, keystore, None).unwrap(); + let store = Store::new( + &path, + Default::default(), + client, + keystore, + None, + Box::new(sp_core::testing::TaskExecutor::new()), + ) + .unwrap(); assert_eq!(store.statements().unwrap().len(), 3); assert_eq!(store.broadcasts(&[]).unwrap().len(), 3); assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1)); @@ -1450,7 +1615,15 @@ mod tests { let client = std::sync::Arc::new(TestClient); let mut path: std::path::PathBuf = temp.path().into(); path.push("db"); - let store = Store::new(&path, Default::default(), client, keystore, None).unwrap(); + let store = Store::new( + &path, + Default::default(), + client, + keystore, + None, + Box::new(sp_core::testing::TaskExecutor::new()), + ) + .unwrap(); assert_eq!(store.statements().unwrap().len(), 0); assert_eq!(store.index.read().expired.len(), 0); } diff --git a/substrate/client/statement-store/src/subscription.rs b/substrate/client/statement-store/src/subscription.rs new file mode 100644 index 0000000000000..da3a4c182cc34 --- /dev/null +++ b/substrate/client/statement-store/src/subscription.rs @@ -0,0 +1,620 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Subscription logic for statement store. +//! +//! Manages subscriptions to statement topics and notifies subscribers when new statements arrive. +//! Uses multiple matcher tasks to handle subscriptions concurrently, each responsible for a subset +//! of subscriptions. Each matcher task maintains its own list of subscriptions and matches incoming +//! statements against them. When a new statement is submitted, it is sent to all matcher tasks for +//! processing. If a statement matches a subscription's filter, it is sent to the subscriber via an +//! async channel. +//! +//! This design allows for efficient handling of a large number of subscriptions and statements and +//! can be scaled by adjusting the number of matcher tasks. + +// Buffer size for the matcher task channels, to backpressure the submission senders. +// This value is generous to allow for bursts of statements without dropping any or backpressuring +// too early. +const MATCHERS_TASK_CHANNEL_BUFFER_SIZE: usize = 80_000; + +use futures::{Stream, StreamExt}; + +use crate::LOG_TARGET; +use sc_utils::id_sequence::SeqID; +use sp_core::{traits::SpawnNamed, Bytes, Encode}; +pub use sp_statement_store::StatementStore; +use sp_statement_store::{CheckedTopicFilter, Result, Statement, Topic}; +use std::{ + collections::{HashMap, HashSet}, + sync::atomic::AtomicU64, +}; + +/// Trait for initiating statement store subscriptions from the RPC module. +pub trait StatementStoreSubscriptionApi: Send + Sync { + /// Subscribe to statements matching the topic filter. + /// + /// Returns existing matching statements, a sender channel to send matched statements and a + /// stream for receiving matched statements when they arrive. + fn subscribe_statement( + &self, + topic_filter: CheckedTopicFilter, + ) -> Result<(Vec>, async_channel::Sender, SubscriptionStatementsStream)>; +} + +#[derive(Clone)] +pub enum MatcherMessage { + NewStatement(Statement), + Subscribe(SubscriptionInfo), + Unsubscribe(SeqID), +} + +// Manages subscriptions to statement topics and notifies subscribers when new statements arrive. +pub struct SubscriptionsHandle { + // Sequence generator for subscription IDs, atomic for thread safety. + // Subscription creation is expensive enough that we don't worry about overflow here. + id_sequence: AtomicU64, + // Subscriptions matchers handlers. + matchers: SubscriptionsMatchersHandlers, +} + +impl SubscriptionsHandle { + /// Create a new SubscriptionsHandle with the given task spawner and number of filter workers. + pub(crate) fn new( + task_spawner: Box, + num_matcher_workers: usize, + ) -> SubscriptionsHandle { + let mut subscriptions_matchers_senders = Vec::with_capacity(num_matcher_workers); + + for _ in 0..num_matcher_workers { + let (subscription_matcher_sender, subscription_matcher_receiver) = + async_channel::bounded(MATCHERS_TASK_CHANNEL_BUFFER_SIZE); + subscriptions_matchers_senders.push(subscription_matcher_sender); + task_spawner.spawn_blocking( + "statement-store-subscription-filters", + Some("statement-store"), + Box::pin(async move { + let mut subscriptions = SubscriptionsInfo::new(); + + loop { + match subscription_matcher_receiver.recv().await { + Ok(MatcherMessage::NewStatement(statement)) => { + subscriptions.notify_matching_subscribers(&statement); + subscriptions.notify_any_subscribers(&statement); + }, + Ok(MatcherMessage::Subscribe(info)) => { + subscriptions.subscribe(info); + }, + Ok(MatcherMessage::Unsubscribe(seq_id)) => { + subscriptions.unsubscribe(seq_id); + }, + Err(_) => { + // Expected when the subscription manager is dropped at shutdown. + log::error!( + target: LOG_TARGET, + "Statement subscription matcher channel closed" + ); + break + }, + }; + } + }), + ); + } + SubscriptionsHandle { + id_sequence: AtomicU64::new(0), + matchers: SubscriptionsMatchersHandlers::new(subscriptions_matchers_senders), + } + } + + fn next_id(&self) -> SeqID { + let id = self.id_sequence.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + SeqID::from(id) + } + + pub(crate) fn subscribe( + &self, + topic_filter: CheckedTopicFilter, + num_existing_statements: usize, + ) -> (async_channel::Sender, SubscriptionStatementsStream) { + let next_id = self.next_id(); + let (tx, rx) = async_channel::bounded(std::cmp::max( + MATCHERS_TASK_CHANNEL_BUFFER_SIZE, + num_existing_statements, + )); + let subscription_info = + SubscriptionInfo { topic_filter: topic_filter.clone(), seq_id: next_id, tx }; + + let result = ( + subscription_info.tx.clone(), + SubscriptionStatementsStream { + rx, + sub_id: subscription_info.seq_id, + matchers: self.matchers.clone(), + }, + ); + + self.matchers + .send_by_seq_id(subscription_info.seq_id, MatcherMessage::Subscribe(subscription_info)); + result + } + + pub(crate) fn notify(&self, statement: Statement) { + self.matchers.send_all(MatcherMessage::NewStatement(statement)); + } +} + +// Information about all subscriptions. +// Each matcher task will have its own instance of this struct. +struct SubscriptionsInfo { + // Subscriptions organized by topic, there can be multiple entries per subscription if it + // subscribes to multiple topics with MatchAll or MatchAny filters. + subscriptions_by_topic: HashMap>, + // Subscriptions that listen with Any filter (i.e., no topic filtering). + subscriptions_any: HashMap, + // Mapping from subscription ID to topic filter. + by_sub_id: HashMap, +} + +// Information about a single subscription. +#[derive(Clone)] +pub(crate) struct SubscriptionInfo { + // The filter used for this subscription. + topic_filter: CheckedTopicFilter, + // The unique ID of this subscription. + seq_id: SeqID, + // Channel to send matched statements to the subscriber. + tx: async_channel::Sender, +} + +impl SubscriptionsInfo { + fn new() -> SubscriptionsInfo { + SubscriptionsInfo { + subscriptions_by_topic: HashMap::new(), + subscriptions_any: HashMap::new(), + by_sub_id: HashMap::new(), + } + } + + // Subscribe a new subscription. + fn subscribe(&mut self, subscription_info: SubscriptionInfo) { + self.by_sub_id + .insert(subscription_info.seq_id, subscription_info.topic_filter.clone()); + let topics = match &subscription_info.topic_filter { + CheckedTopicFilter::Any => { + self.subscriptions_any + .insert(subscription_info.seq_id, subscription_info.clone()); + return; + }, + CheckedTopicFilter::MatchAll(topics) => topics, + CheckedTopicFilter::MatchAny(topics) => topics, + }; + for topic in topics { + self.subscriptions_by_topic + .entry(*topic) + .or_insert_with(Default::default) + .insert(subscription_info.seq_id, subscription_info.clone()); + } + } + + // Notify a single subscriber, marking it for unsubscribing if sending fails. + fn notify_subscriber( + &self, + subscription: &SubscriptionInfo, + bytes_to_send: Bytes, + needs_unsubscribing: &mut HashSet, + ) { + if let Err(err) = subscription.tx.try_send(bytes_to_send) { + log::warn!( + target: LOG_TARGET, + "Failed to send statement to subscriber {:?}: {:?} unsubscribing it", subscription.seq_id, err + ); + // Mark subscription for unsubscribing, to give it a chance to recover the buffers are + // generous enough, if subscription cannot keep up we unsubscribe it. + needs_unsubscribing.insert(subscription.seq_id); + } + } + + // Notify all subscribers whose filters match the given statement. + fn notify_matching_subscribers(&mut self, statement: &Statement) { + // Track how many topics are still needed to match for each subscription. + // `subscription_by_topic` may contain multiple entries for the same subscription if it + // subscribes to multiple topics with MatchAll or MatchAny filters. + // We decrement the counter each time we find a matching topic, and only notify + // the subscriber when the counter reaches zero. + let mut matched_senders: HashMap = HashMap::new(); + let bytes_to_send: Bytes = statement.encode().into(); + let mut needs_unsubscribing: HashSet = HashSet::new(); + + for statement_topic in statement.topics() { + if let Some(subscriptions) = self.subscriptions_by_topic.get(statement_topic) { + for subscription in subscriptions.values() { + // Check if the statement matches the subscription filter + if let Some(counter) = matched_senders.get_mut(&subscription.seq_id) { + if *counter > 0 { + *counter -= 1; + if *counter == 0 { + self.notify_subscriber( + subscription, + bytes_to_send.clone(), + &mut needs_unsubscribing, + ); + } + } + } else { + match &subscription.topic_filter { + CheckedTopicFilter::Any => { + matched_senders.insert(subscription.seq_id, 0); + self.notify_subscriber( + subscription, + bytes_to_send.clone(), + &mut needs_unsubscribing, + ); + }, + CheckedTopicFilter::MatchAll(topics) => { + let counter = topics.len() - 1; + + matched_senders.insert(subscription.seq_id, counter); + if counter == 0 { + self.notify_subscriber( + subscription, + bytes_to_send.clone(), + &mut needs_unsubscribing, + ); + } + }, + CheckedTopicFilter::MatchAny(_topics) => { + matched_senders.insert(subscription.seq_id, 0); + self.notify_subscriber( + subscription, + bytes_to_send.clone(), + &mut needs_unsubscribing, + ); + }, + } + } + } + } + } + // Unsubscribe any subscriptions that failed to receive messages, to give them a chance to + // recover and not miss statements. + for sub_id in needs_unsubscribing { + self.unsubscribe(sub_id); + } + } + + // Notify all subscribers that don't filter by topic and want to receive all statements. + fn notify_any_subscribers(&mut self, statement: &Statement) { + let mut needs_unsubscribing: HashSet = HashSet::new(); + + let bytes_to_send: Bytes = statement.encode().into(); + for subscription in self.subscriptions_any.values() { + let _ = self.notify_subscriber( + subscription, + bytes_to_send.clone(), + &mut needs_unsubscribing, + ); + } + + // Unsubscribe any subscriptions that failed to receive messages, to give them a chance to + // recover and not miss statements. + for sub_id in needs_unsubscribing { + self.unsubscribe(sub_id); + } + } + + // Unsubscribe a subscription by its ID. + fn unsubscribe(&mut self, id: SeqID) { + let entry = match self.by_sub_id.remove(&id) { + Some(e) => e, + None => return, + }; + + let topics = match &entry { + CheckedTopicFilter::Any => { + self.subscriptions_any.remove(&id); + return; + }, + CheckedTopicFilter::MatchAll(topics) => topics, + CheckedTopicFilter::MatchAny(topics) => topics, + }; + + for topic in topics { + if let Some(subscriptions) = self.subscriptions_by_topic.get_mut(topic) { + subscriptions.remove(&id); + if subscriptions.is_empty() { + self.subscriptions_by_topic.remove(topic); + } + } + } + } +} + +// Handlers to communicate with subscription matcher tasks. +#[derive(Clone)] +pub struct SubscriptionsMatchersHandlers { + // Channels to send messages to matcher tasks. + matchers: Vec>, +} + +impl SubscriptionsMatchersHandlers { + /// Create new SubscriptionsMatchersHandlers with the given matcher task senders. + fn new(matchers: Vec>) -> SubscriptionsMatchersHandlers { + SubscriptionsMatchersHandlers { matchers } + } + + // Send a message to the matcher task responsible for the given subscription ID. + fn send_by_seq_id(&self, id: SeqID, message: MatcherMessage) { + let index: u64 = id.into(); + // If matchers channels are full we backpressure the sender, in this case it will be the + // processing of new statements. + if let Err(err) = self.matchers[index as usize % self.matchers.len()].send_blocking(message) + { + log::error!( + target: LOG_TARGET, + "Failed to send statement to matcher task: {:?}", err + ); + } + } + + // Send a message to all matcher tasks. + fn send_all(&self, message: MatcherMessage) { + for sender in &self.matchers { + if let Err(err) = sender.send_blocking(message.clone()) { + log::error!( + target: LOG_TARGET, + "Failed to send message to matcher task: {:?}", err + ); + } + } + } +} + +// Stream of statements for a subscription. +pub struct SubscriptionStatementsStream { + // Channel to receive statements. + pub rx: async_channel::Receiver, + // Subscription ID, used for cleanup on drop. + sub_id: SeqID, + // Reference to the matchers for cleanup. + matchers: SubscriptionsMatchersHandlers, +} + +// When the stream is dropped, unsubscribe from the matchers. +impl Drop for SubscriptionStatementsStream { + fn drop(&mut self) { + self.matchers + .send_by_seq_id(self.sub_id, MatcherMessage::Unsubscribe(self.sub_id)); + } +} + +impl Stream for SubscriptionStatementsStream { + type Item = Bytes; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.rx.poll_next_unpin(cx) + } +} +#[cfg(test)] +mod tests { + + use crate::tests::signed_statement; + + use super::*; + use sp_core::Decode; + use sp_statement_store::Topic; + #[test] + fn test_subscribe_unsubscribe() { + let mut subscriptions = SubscriptionsInfo::new(); + + let (tx1, _rx1) = async_channel::bounded::(10); + let topic1 = [8u8; 32]; + let topic2 = [9u8; 32]; + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::MatchAll(vec![topic1, topic2]), + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + assert!(subscriptions.subscriptions_by_topic.contains_key(&topic1)); + assert!(subscriptions.subscriptions_by_topic.contains_key(&topic2)); + assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id)); + assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id)); + + subscriptions.unsubscribe(sub_info1.seq_id); + assert!(!subscriptions.subscriptions_by_topic.contains_key(&topic1)); + assert!(!subscriptions.subscriptions_by_topic.contains_key(&topic2)); + } + + #[test] + fn test_subscribe_any() { + let mut subscriptions = SubscriptionsInfo::new(); + let (tx1, _rx1) = async_channel::bounded::(10); + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::Any, + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + assert!(subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id)); + assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id)); + subscriptions.unsubscribe(sub_info1.seq_id); + assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id)); + } + + #[test] + fn test_subscribe_match_any() { + let mut subscriptions = SubscriptionsInfo::new(); + + let (tx1, _rx1) = async_channel::bounded::(10); + let topic1 = [8u8; 32]; + let topic2 = [9u8; 32]; + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::MatchAny(vec![topic1, topic2]), + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + assert!(subscriptions.subscriptions_by_topic.contains_key(&topic1)); + assert!(subscriptions.subscriptions_by_topic.contains_key(&topic2)); + assert!(subscriptions.by_sub_id.contains_key(&sub_info1.seq_id)); + assert!(!subscriptions.subscriptions_any.contains_key(&sub_info1.seq_id)); + + subscriptions.unsubscribe(sub_info1.seq_id); + assert!(!subscriptions.subscriptions_by_topic.contains_key(&topic1)); + assert!(!subscriptions.subscriptions_by_topic.contains_key(&topic2)); + } + + #[test] + fn test_notify_any_subscribers() { + let mut subscriptions = SubscriptionsInfo::new(); + + let (tx1, rx1) = async_channel::bounded::(10); + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::Any, + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + + let mut statement = signed_statement(1); + subscriptions.notify_any_subscribers(&statement); + + let received = rx1.try_recv().expect("Should receive statement"); + let decoded_statement: Statement = + Statement::decode(&mut &received.0[..]).expect("Should decode statement"); + assert_eq!(decoded_statement, statement); + } + + #[test] + fn test_notify_match_all_subscribers() { + let mut subscriptions = SubscriptionsInfo::new(); + + let (tx1, rx1) = async_channel::bounded::(10); + let topic1 = [8u8; 32]; + let topic2 = [9u8; 32]; + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::MatchAll(vec![topic1, topic2]), + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + + let mut statement = signed_statement(1); + statement.set_topic(0, Topic::from(topic2)); + subscriptions.notify_matching_subscribers(&statement); + + // Should not receive yet, only one topic matched. + assert!(rx1.try_recv().is_err()); + + statement.set_topic(1, Topic::from(topic1)); + subscriptions.notify_matching_subscribers(&statement); + + let received = rx1.try_recv().expect("Should receive statement"); + let decoded_statement: Statement = + Statement::decode(&mut &received.0[..]).expect("Should decode statement"); + assert_eq!(decoded_statement, statement); + } + + #[test] + fn test_notify_match_any_subscribers() { + let mut subscriptions = SubscriptionsInfo::new(); + let (tx1, rx1) = async_channel::bounded::(10); + let topic1 = [8u8; 32]; + let topic2 = [9u8; 32]; + let sub_info1 = SubscriptionInfo { + topic_filter: CheckedTopicFilter::MatchAny(vec![topic1, topic2]), + seq_id: SeqID::from(1), + tx: tx1, + }; + subscriptions.subscribe(sub_info1.clone()); + let mut statement = signed_statement(1); + statement.set_topic(0, Topic::from(topic2)); + subscriptions.notify_matching_subscribers(&statement); + let received = rx1.try_recv().expect("Should receive statement"); + let decoded_statement: Statement = + Statement::decode(&mut &received.0[..]).expect("Should decode statement"); + assert_eq!(decoded_statement, statement); + } + + #[tokio::test] + async fn test_subscription_handle_with_different_workers_number() { + for num_workers in 1..5 { + let subscriptions_handle = SubscriptionsHandle::new( + Box::new(sp_core::testing::TaskExecutor::new()), + num_workers, + ); + + let topic1 = [8u8; 32]; + let topic2 = [9u8; 32]; + + let streams = (0..5) + .into_iter() + .map(|_| { + subscriptions_handle + .subscribe(CheckedTopicFilter::MatchAll(vec![topic1, topic2])) + }) + .collect::>(); + + let mut statement = signed_statement(1); + statement.set_topic(0, Topic::from(topic2)); + subscriptions_handle.notify(statement.clone()); + + statement.set_topic(1, Topic::from(topic1)); + subscriptions_handle.notify(statement.clone()); + + for (tx, mut stream) in streams { + let received = stream.next().await.expect("Should receive statement"); + let decoded_statement: Statement = + Statement::decode(&mut &received.0[..]).expect("Should decode statement"); + assert_eq!(decoded_statement, statement); + } + } + } + + // #[tokio::test] + // async fn test_handle_unsubscribe() { + // let subscriptions_handle = + // SubscriptionsHandle::new(Box::new(sp_core::testing::TaskExecutor::new()), 3); + + // let topic1 = [8u8; 32]; + // let topic2 = [9u8; 32]; + + // let streams = (0..5) + // .into_iter() + // .map(|_| subscriptions_handle.subscribe(TopicFilter::MatchAll(vec![topic1, topic2]))) + // .collect::>(); + + // // Unsubscribe all streams by dropping SubscriptionStatementsStream + // let rx_channels = + // streams.into_iter().map(|(_, stream)| stream.rx.clone()).collect::>(); + + // let mut statement = signed_statement(1); + // statement.set_topic(0, Topic::from(topic2)); + // subscriptions_handle.notify(statement.clone()); + + // statement.set_topic(1, Topic::from(topic1)); + // subscriptions_handle.notify(statement.clone()); + // for rx in rx_channels { + // assert!(rx.recv().await.is_err()); + // } + // } +} diff --git a/substrate/client/utils/src/id_sequence.rs b/substrate/client/utils/src/id_sequence.rs index a1799d4e684f8..6e8efce33f878 100644 --- a/substrate/client/utils/src/id_sequence.rs +++ b/substrate/client/utils/src/id_sequence.rs @@ -51,3 +51,15 @@ impl IDSequence { id } } + +impl Into for SeqID { + fn into(self) -> u64 { + self.0 + } +} + +impl From for SeqID { + fn from(value: u64) -> Self { + SeqID(value) + } +} diff --git a/substrate/primitives/statement-store/src/lib.rs b/substrate/primitives/statement-store/src/lib.rs index 2afaf69e04949..1817ecf9e3838 100644 --- a/substrate/primitives/statement-store/src/lib.rs +++ b/substrate/primitives/statement-store/src/lib.rs @@ -44,10 +44,26 @@ pub type Channel = [u8; 32]; /// Total number of topic fields allowed. pub const MAX_TOPICS: usize = 4; +/// Maximum number of topics allowed in `MatchAny` filter. +pub const MAX_ANY_TOPICS: usize = 128; + +/// Topic filter for statement subscriptions. +#[derive(Clone)] +pub enum CheckedTopicFilter { + /// Matches all topics. + Any, + /// Matches only statements including all of the given topics. + /// Bytes are expected to be a 32-byte topic. Up to `4` topics can be provided. + MatchAll(Vec), + /// Matches statements including any of the given topics. + /// Bytes are expected to be a 32-byte topic. Up to `128` topics can be provided. + MatchAny(Vec), +} #[cfg(feature = "std")] pub use store_api::{ Error, InvalidReason, RejectionReason, Result, StatementSource, StatementStore, SubmitResult, + TopicFilter, }; #[cfg(feature = "std")] @@ -150,8 +166,8 @@ pub enum Field { AuthenticityProof(Proof) = 0, /// An identifier for the key that `Data` field may be decrypted with. DecryptionKey(DecryptionKey) = 1, - /// Priority when competing with other messages from the same sender. - Priority(u32) = 2, + /// Expiry of the statement. + Expiry(u64) = 2, /// Account channel to use. Only one message per `(account, channel)` pair is allowed. Channel(Channel) = 3, /// First statement topic. @@ -180,7 +196,7 @@ pub struct Statement { /// Proof used for authorizing the statement. proof: Option, /// An identifier for the key that `Data` field may be decrypted with. - #[deprecated(note = "Experimental feature, may be removed in future releases")] + #[deprecated(note = "Experimental feature, may be removed/changed in future releases")] decryption_key: Option, /// Used for identifying a distinct communication channel, only a message per channel is /// stored. @@ -232,7 +248,7 @@ impl Decode for Statement { match field { Field::AuthenticityProof(p) => statement.set_proof(p), Field::DecryptionKey(key) => statement.set_decryption_key(key), - Field::Priority(p) => statement.set_priority(p), + Field::Expiry(p) => statement.set_expiry(p), Field::Channel(c) => statement.set_channel(c), Field::Topic1(t) => statement.set_topic(0, t), Field::Topic2(t) => statement.set_topic(1, t), @@ -294,6 +310,11 @@ impl Statement { } } + /// Returns slice of all topics set in the statement. + pub fn topics(&self) -> &[Topic] { + &self.topics[..self.num_topics as usize] + } + /// Sign with a given private key and add the signature proof field. #[cfg(feature = "std")] pub fn sign_sr25519_private(&mut self, key: &sp_core::sr25519::Pair) { @@ -420,6 +441,7 @@ impl Statement { } /// Returns decryption key if any. + #[allow(deprecated)] pub fn decryption_key(&self) -> Option { self.decryption_key } @@ -459,6 +481,14 @@ impl Statement { self.expiry } + /// Get expiration timestamp in seconds. + /// + /// The expiration timestamp in seconds is stored in the most significant 32 bits of the expiry + /// field. + pub fn get_expiration_timestamp_secs(&self) -> u32 { + (self.expiry >> 32) as u32 + } + /// Return encoded fields that can be signed to construct or verify a proof fn signature_material(&self) -> Vec { self.encoded(true) @@ -479,6 +509,15 @@ impl Statement { self.expiry = expiry; } + /// Set statement expiry from its parts. + /// The expiration timestamp in seconds is stored in the most significant 32 bits of the expiry + /// field. + /// The lower 32 bits represents an arbitrary sequence number used to order statements with the + /// same expiration time. + pub fn set_expiry_from_parts(&mut self, expiration_timestamp_secs: u32, sequence_number: u32) { + self.expiry = (expiration_timestamp_secs as u64) << 32 | sequence_number as u64; + } + /// Set statement channel. pub fn set_channel(&mut self, channel: Channel) { self.channel = Some(channel) @@ -493,6 +532,7 @@ impl Statement { } /// Set decryption key. + #[allow(deprecated)] pub fn set_decryption_key(&mut self, key: DecryptionKey) { self.decryption_key = Some(key); } @@ -502,12 +542,13 @@ impl Statement { self.data = Some(data) } + #[allow(deprecated)] fn encoded(&self, for_signing: bool) -> Vec { // Encoding matches that of Vec. Basically this just means accepting that there // will be a prefix of vector length. - let num_fields = if !for_signing && self.proof.is_some() { 1 } else { 0 } + + // Expiry field is always present. + let num_fields = if !for_signing && self.proof.is_some() { 2 } else { 1 } + if self.decryption_key.is_some() { 1 } else { 0 } + - if self.priority.is_some() { 1 } else { 0 } + if self.channel.is_some() { 1 } else { 0 } + if self.data.is_some() { 1 } else { 0 } + self.num_topics as u32; @@ -529,10 +570,10 @@ impl Statement { 1u8.encode_to(&mut output); decryption_key.encode_to(&mut output); } - if let Some(priority) = &self.priority { - 2u8.encode_to(&mut output); - priority.encode_to(&mut output); - } + + 2u8.encode_to(&mut output); + self.expiry().encode_to(&mut output); + if let Some(channel) = &self.channel { 3u8.encode_to(&mut output); channel.encode_to(&mut output); @@ -549,6 +590,7 @@ impl Statement { } /// Encrypt give data with given key and store both in the statements. + #[allow(deprecated)] #[cfg(feature = "std")] pub fn encrypt( &mut self, @@ -604,7 +646,7 @@ mod test { let fields = vec![ Field::AuthenticityProof(proof.clone()), Field::DecryptionKey(decryption_key), - Field::Priority(priority), + Field::Expiry(priority), Field::Channel(channel), Field::Topic1(topic1), Field::Topic2(topic2), @@ -626,7 +668,7 @@ mod test { let priority = 999; let fields = vec![ - Field::Priority(priority), + Field::Expiry(priority), Field::Topic1(topic1), Field::Topic1(topic1), Field::Topic2(topic2), @@ -636,7 +678,7 @@ mod test { assert!(Statement::decode(&mut fields.as_slice()).is_err()); let fields = - vec![Field::Topic1(topic1), Field::Priority(priority), Field::Topic2(topic2)].encode(); + vec![Field::Topic1(topic1), Field::Expiry(priority), Field::Topic2(topic2)].encode(); assert!(Statement::decode(&mut fields.as_slice()).is_err()); } diff --git a/substrate/primitives/statement-store/src/store_api.rs b/substrate/primitives/statement-store/src/store_api.rs index e949a35242091..e98e684eb90c2 100644 --- a/substrate/primitives/statement-store/src/store_api.rs +++ b/substrate/primitives/statement-store/src/store_api.rs @@ -15,8 +15,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use sp_core::Bytes; + pub use crate::runtime_api::StatementSource; -use crate::{Hash, Statement, Topic}; +use crate::{CheckedTopicFilter, Hash, Statement, Topic}; /// Statement store error. #[derive(Debug, Clone, Eq, PartialEq, thiserror::Error)] @@ -33,6 +35,56 @@ pub enum Error { Runtime, } +/// Filter for subscribing to statements with different topics. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))] +pub enum TopicFilter { + /// Matches all topics. + Any, + /// Matches only statements including all of the given topics. + /// Bytes are expected to be a 32-byte topic. Up to `4` topics can be provided. + MatchAll(Vec), + /// Matches statements including any of the given topics. + /// Bytes are expected to be a 32-byte topic. Up to `128` topics can be provided. + MatchAny(Vec), +} + +// Convert TopicFilter to CheckedTopicFilter, validating topic lengths. +impl TryInto for TopicFilter { + type Error = Error; + + fn try_into(self) -> Result { + match self { + TopicFilter::Any => Ok(CheckedTopicFilter::Any), + TopicFilter::MatchAll(topics) => { + let mut parsed_topics = Vec::with_capacity(topics.len()); + for topic in topics { + if topic.0.len() != 32 { + return Err(Error::Decode("Invalid topic format".into())); + } + let mut arr = [0u8; 32]; + arr.copy_from_slice(&topic.0); + parsed_topics.push(arr); + } + Ok(CheckedTopicFilter::MatchAll(parsed_topics)) + }, + TopicFilter::MatchAny(topics) => { + let mut parsed_topics = Vec::with_capacity(topics.len()); + for topic in topics { + if topic.0.len() != 32 { + return Err(Error::Decode("Invalid topic format".into())); + } + let mut arr = [0u8; 32]; + arr.copy_from_slice(&topic.0); + parsed_topics.push(arr); + } + Ok(CheckedTopicFilter::MatchAny(parsed_topics)) + }, + } + } +} + /// Reason why a statement was rejected from the store. #[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -45,19 +97,19 @@ pub enum RejectionReason { /// Still available data size for the account. available_size: usize, }, - /// Attempting to replace a channel message with lower or equal priority. + /// Attempting to replace a channel message with lower or equal expiry. ChannelPriorityTooLow { - /// The priority of the submitted statement. - submitted_priority: u32, - /// The minimum priority of the existing channel message. - min_priority: u32, + /// The expiry of the submitted statement. + submitted_expiry: u64, + /// The minimum expiry of the existing channel message. + min_expiry: u64, }, - /// Account reached its statement limit and submitted priority is too low to evict existing. + /// Account reached its statement limit and submitted expiry is too low to evict existing. AccountFull { - /// The priority of the submitted statement. - submitted_priority: u32, - /// The minimum priority of the existing statement. - min_priority: u32, + /// The expiry of the submitted statement. + submitted_expiry: u64, + /// The minimum expiry of the existing statement. + min_expiry: u64, }, /// The global statement store is full and cannot accept new statements. StoreFull, @@ -79,6 +131,8 @@ pub enum InvalidReason { /// The maximum allowed size. max_size: usize, }, + /// Statement has already expired. + AlreadyExpired, } /// Statement submission outcome diff --git a/substrate/test-utils/runtime/Cargo.toml b/substrate/test-utils/runtime/Cargo.toml index 91bf368f573cd..1902af59de7e9 100644 --- a/substrate/test-utils/runtime/Cargo.toml +++ b/substrate/test-utils/runtime/Cargo.toml @@ -46,6 +46,7 @@ sp-offchain = { workspace = true } sp-runtime = { features = ["serde"], workspace = true } sp-session = { workspace = true } sp-state-machine = { workspace = true } +sp-statement-store = { workspace = true } sp-transaction-pool = { workspace = true } sp-trie = { workspace = true } sp-version = { workspace = true } diff --git a/substrate/test-utils/runtime/src/lib.rs b/substrate/test-utils/runtime/src/lib.rs index a04b05021bb46..a69a478aacb96 100644 --- a/substrate/test-utils/runtime/src/lib.rs +++ b/substrate/test-utils/runtime/src/lib.rs @@ -46,10 +46,12 @@ use frame_system::{ CheckNonce, CheckWeight, }; use scale_info::TypeInfo; -use sp_application_crypto::Ss58Codec; +use sp_application_crypto::{ecdsa, ed25519, sr25519, RuntimeAppPublic, Ss58Codec}; use sp_keyring::Sr25519Keyring; - -use sp_application_crypto::{ecdsa, ed25519, sr25519, RuntimeAppPublic}; +use sp_statement_store::{ + runtime_api::{InvalidStatement, StatementSource, ValidStatement}, + Proof, SignatureVerificationResult, Statement, +}; #[cfg(feature = "bls-experimental")] use sp_application_crypto::{bls381, ecdsa_bls381}; @@ -824,6 +826,26 @@ impl_runtime_apis! { vec![PresetId::from("foobar"), PresetId::from("staging")] } } + + impl sp_statement_store::runtime_api::ValidateStatement for Runtime { + fn validate_statement( + _source: StatementSource, + statement: Statement, + ) -> core::result::Result { + match statement.verify_signature() { + SignatureVerificationResult::Valid(_) => + Ok(ValidStatement { max_count: 100_000, max_size: 1_000_000 }), + SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof), + SignatureVerificationResult::NoSignature => { + if let Some(Proof::OnChain { block_hash: _, .. }) = statement.proof() { + Ok(ValidStatement { max_count: 100_000, max_size: 1_000_000 }) + } else { + Err(InvalidStatement::BadProof) + } + }, + } + } + } } fn test_ed25519_crypto(