Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a0f4c34
feat: Publish instruction enum def
metricaez Oct 8, 2025
257d9c5
feat: BroadcastHandler
metricaez Oct 8, 2025
2e2ff2e
fix: a bunch of boilerplate BroadcastHandlers
metricaez Oct 8, 2025
83bea50
feat: boilerplate handler for executor mock
metricaez Oct 8, 2025
22efdff
feat: pallet-broadcaster
metricaez Oct 8, 2025
76898b5
feat: some tests
metricaez Oct 9, 2025
9e3ddde
feat: add published data to ParachainInherentData
metricaez Oct 9, 2025
abdea03
feat: expose published data via ParachainHost
metricaez Oct 9, 2025
47e551a
feat: broadcast adapter and rococo
metricaez Oct 9, 2025
97dfeda
feat: add missing tests
metricaez Oct 9, 2025
9278c7c
feat: delte deprecated event trait
metricaez Oct 9, 2025
e72dbce
fix: api v 16
metricaez Oct 31, 2025
de54575
feat: broadcast adapter and rococo
metricaez Oct 9, 2025
e846236
fix: unnecesary config trait
metricaez Oct 31, 2025
36b7343
feat: Subscribe instruction enum def
metricaez Oct 15, 2025
fbfd1ee
feat: subscribe executor integration start impl
metricaez Oct 16, 2025
5579328
feat: toggle subscribe logic and runtime api
metricaez Oct 28, 2025
3d25fb0
fix: missing boilerplate weights
metricaez Oct 28, 2025
2c46c78
fix: missing rococo broadcaster config params
metricaez Oct 28, 2025
65a9fca
feat: inherent processing and get data by subscription on interface
metricaez Oct 30, 2025
8f73a76
feat: tests and benchmark
metricaez Oct 30, 2025
4b878e4
feat: remove unusude api and deprecated trait
metricaez Oct 31, 2025
7d68391
feat: add missing test and clean unused trait
metricaez Oct 31, 2025
2ea8775
choir: delete unnecesary api
metricaez Oct 31, 2025
008dd41
feat: data roots population
metricaez Nov 6, 2025
9b5a1e5
feat: root changes detect and benchmarks
metricaez Nov 10, 2025
36542f2
feat: fix some tests
metricaez Nov 11, 2025
3fcea1e
feat: missing test and mocks trait
metricaez Nov 11, 2025
68edfac
choir: cleanup rococo and auxiliary scripts
metricaez Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ impl RelayChainInterface for TestRelayClient {
unimplemented!("Not needed for test")
}

async fn retrieve_subscribed_published_data(
&self,
_: ParaId,
_: RelayHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
unimplemented!("Not needed for test")
}

async fn persisted_validation_data(
&self,
_: RelayHash,
Expand Down
8 changes: 8 additions & 0 deletions cumulus/client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ impl RelayChainInterface for Relaychain {
unimplemented!("Not needed for test")
}

async fn retrieve_subscribed_published_data(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
unimplemented!("Not needed for test")
}

async fn persisted_validation_data(
&self,
hash: PHash,
Expand Down
8 changes: 8 additions & 0 deletions cumulus/client/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ impl RelayChainInterface for DummyRelayChainInterface {
Ok(BTreeMap::new())
}

async fn retrieve_subscribed_published_data(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
Ok(BTreeMap::new())
}

async fn persisted_validation_data(
&self,
_: PHash,
Expand Down
17 changes: 17 additions & 0 deletions cumulus/client/parachain-inherent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ async fn collect_relay_storage_proof(
relevant_keys.push(relay_well_known_keys::NEXT_AUTHORITIES.to_vec());
}

// Include broadcaster published data roots
relevant_keys.push(relay_well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS.to_vec());

// Add additional relay state keys
let unique_keys: Vec<Vec<u8>> = additional_relay_state_keys
.into_iter()
Expand Down Expand Up @@ -223,13 +226,27 @@ impl ParachainInherentDataProvider {
})
.ok()?;

let published_data = relay_chain_interface
.retrieve_subscribed_published_data(para_id, relay_parent)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occurred during requesting subscribed published data.",
);
})
.unwrap_or_default();

Some(ParachainInherentData {
downward_messages,
horizontal_messages,
validation_data: validation_data.clone(),
relay_chain_state,
relay_parent_descendants,
collator_peer_id,
published_data,
})
}
}
1 change: 1 addition & 0 deletions cumulus/client/parachain-inherent/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ impl<R: Send + Sync + GenerateRandomness<u64>> InherentDataProvider
relay_chain_state: proof,
relay_parent_descendants: Default::default(),
collator_peer_id: None,
published_data: Default::default(),
};

parachain_inherent_data.provide_inherent_data(inherent_data).await
Expand Down
8 changes: 8 additions & 0 deletions cumulus/client/pov-recovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ impl RelayChainInterface for Relaychain {
unimplemented!("Not needed for test")
}

async fn retrieve_subscribed_published_data(
&self,
_: ParaId,
_: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
unimplemented!("Not needed for test")
}

async fn persisted_validation_data(
&self,
_: PHash,
Expand Down
8 changes: 8 additions & 0 deletions cumulus/client/relay-chain-inprocess-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ impl RelayChainInterface for RelayChainInProcessInterface {
.inbound_hrmp_channels_contents(relay_parent, para_id)?)
}

async fn retrieve_subscribed_published_data(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
Ok(self.full_client.runtime_api().get_subscribed_data(relay_parent, para_id)?)
}

async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
let hash = match block_id {
BlockId::Hash(hash) => hash,
Expand Down
16 changes: 16 additions & 0 deletions cumulus/client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ pub trait RelayChainInterface: Send + Sync {
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;

/// Returns published data from all subscribed publishers for the parachain we are collating
/// for.
async fn retrieve_subscribed_published_data(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>>;

/// Yields the persisted validation data for the given `ParaId` along with an assumption that
/// should be used if the para currently occupies a core.
///
Expand Down Expand Up @@ -273,6 +281,14 @@ where
(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
}

async fn retrieve_subscribed_published_data(
&self,
para_id: ParaId,
relay_parent: PHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
(**self).retrieve_subscribed_published_data(para_id, relay_parent).await
}

async fn persisted_validation_data(
&self,
block_id: PHash,
Expand Down
8 changes: 8 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl RelayChainInterface for RelayChainRpcInterface {
.await
}

async fn retrieve_subscribed_published_data(
&self,
para_id: ParaId,
relay_parent: RelayHash,
) -> RelayChainResult<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>> {
self.rpc_client.parachain_host_get_subscribed_data(para_id, relay_parent).await
}

async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
let hash = match block_id {
BlockId::Hash(hash) => hash,
Expand Down
14 changes: 14 additions & 0 deletions cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,20 @@ impl RelayChainRpcClient {
.await
}

/// Get published data from all subscribed publishers for a parachain.
pub async fn parachain_host_get_subscribed_data(
&self,
para_id: ParaId,
at: RelayHash,
) -> Result<BTreeMap<ParaId, Vec<(Vec<u8>, Vec<u8>)>>, RelayChainError> {
self.call_remote_runtime_function(
"ParachainHost_get_subscribed_data",
at,
Some(para_id),
)
.await
}

/// Get all the pending inbound messages in the downward message queue for a para.
pub async fn parachain_host_dmq_contents(
&self,
Expand Down
53 changes: 53 additions & 0 deletions cumulus/pallets/parachain-system/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,59 @@ mod benchmarks {
assert_eq!(LastDmqMqcHead::<T>::get().head(), head);
}

/// Benchmark processing published data from the broadcaster pallet.
///
/// - `p`: Number of publishers with changed data
/// - `k`: Number of key-value pairs per publisher
/// - `v`: Size of each value in bytes
#[benchmark]
fn process_published_data(
p: Linear<1, 100>,
k: Linear<1, 16>,
v: Linear<1, 1024>,
) {
use alloc::collections::BTreeMap;

// Populate storage with existing data to maximize clear_prefix cost
for i in 0..p {
let para_id = ParaId::from(1000 + i);
for j in 0..k {
PublishedData::<T>::insert(
para_id,
vec![j as u8; 32],
vec![0u8; v as usize],
);
}
}

// Store initial roots
let initial_roots: BTreeMap<ParaId, Vec<u8>> = (0..p)
.map(|i| (ParaId::from(1000 + i), vec![0xBB; 32]))
.collect();
PreviousPublishedDataRoots::<T>::put(initial_roots);

// Prepare new data with changed roots
let mut published_data = BTreeMap::new();
let mut current_roots = Vec::new();

for i in 0..p {
let para_id = ParaId::from(1000 + i);
let entries: Vec<(Vec<u8>, Vec<u8>)> = (0..k)
.map(|j| (vec![j as u8; 32], vec![1u8; v as usize]))
.collect();
published_data.insert(para_id, entries);
current_roots.push((para_id, vec![0xAA; 32]));
}

#[block]
{
Pallet::<T>::process_published_data(&published_data, &current_roots);
}

// Verify storage updated
assert_eq!(PreviousPublishedDataRoots::<T>::get().len(), p as usize);
}

/// Re-implements an easy version of the `MessageQueueChain` for testing purposes.
fn mqp_head(msgs: &Vec<InboundDownwardMessage>) -> RelayHash {
let mut head = Default::default();
Expand Down
Loading