Skip to content

Commit 98e87df

Browse files
authored
Merge pull request #967 from subspace/fix-block-producing-node
Publish pieces on DSN in the background
2 parents aa7db08 + 7ea7649 commit 98e87df

File tree

5 files changed

+113
-98
lines changed

5 files changed

+113
-98
lines changed

crates/sc-consensus-subspace/src/archiver.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,6 @@ pub fn start_subspace_archiver<Block, Backend, Client>(
380380
client: Arc<Client>,
381381
telemetry: Option<TelemetryHandle>,
382382
spawner: &impl sp_core::traits::SpawnEssentialNamed,
383-
is_authoring_blocks: bool,
384383
) where
385384
Block: BlockT,
386385
Backend: BackendT<Block>,
@@ -423,16 +422,12 @@ pub fn start_subspace_archiver<Block, Backend, Client>(
423422

424423
async move {
425424
// Farmers may have not received all previous segments, send them now.
426-
if is_authoring_blocks {
427-
for archived_segment in older_archived_segments {
428-
send_archived_segment_notification(
429-
&archived_segment_notification_sender,
430-
archived_segment,
431-
)
432-
.await;
433-
}
434-
} else {
435-
drop(older_archived_segments);
425+
for archived_segment in older_archived_segments {
426+
send_archived_segment_notification(
427+
&archived_segment_notification_sender,
428+
archived_segment,
429+
)
430+
.await;
436431
}
437432

438433
while let Some(ImportedBlockNotification {
@@ -509,13 +504,11 @@ pub fn start_subspace_archiver<Block, Backend, Client>(
509504
{
510505
let root_block = archived_segment.root_block;
511506

512-
if is_authoring_blocks {
513-
send_archived_segment_notification(
514-
&archived_segment_notification_sender,
515-
archived_segment,
516-
)
517-
.await;
518-
}
507+
send_archived_segment_notification(
508+
&archived_segment_notification_sender,
509+
archived_segment,
510+
)
511+
.await;
519512

520513
let _ = root_block_sender.send(root_block).await;
521514
}

crates/sc-consensus-subspace/src/tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,6 @@ fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static
500500
client.clone(),
501501
None,
502502
&task_manager.spawn_essential_handle(),
503-
false,
504503
);
505504

506505
let (archived_pieces_sender, archived_pieces_receiver) = oneshot::channel();

crates/subspace-node/src/bin/subspace-node.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ fn main() -> Result<(), Error> {
205205
client.clone(),
206206
None,
207207
&task_manager.spawn_essential_handle(),
208-
config.role.is_authority(),
209208
);
210209

211210
Ok((

crates/subspace-service/src/dsn.rs

Lines changed: 62 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
mod piece_record_store;
22

33
use crate::dsn::piece_record_store::{AuxRecordStorage, SegmentIndexGetter};
4-
use futures::StreamExt;
5-
use sc_consensus_subspace::{ArchivedSegmentNotification, SubspaceLink};
4+
use futures::{Stream, StreamExt};
5+
use sc_client_api::AuxStore;
6+
use sc_consensus_subspace::ArchivedSegmentNotification;
67
use sc_piece_cache::AuxPieceCache;
7-
use sp_core::traits::SpawnEssentialNamed;
8+
use sp_core::traits::SpawnNamed;
89
use sp_runtime::traits::Block as BlockT;
910
use std::sync::Arc;
1011
use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash, PIECES_IN_SEGMENT};
1112
use subspace_networking::libp2p::{identity, Multiaddr};
1213
use subspace_networking::{
1314
BootstrappedNetworkingParameters, CreationError, CustomRecordStore, MemoryProviderStorage,
14-
Node, PieceByHashRequestHandler, PieceByHashResponse, PieceKey, ToMultihash,
15+
Node, NodeRunner, PieceByHashRequestHandler, PieceByHashResponse, PieceKey, ToMultihash,
1516
};
1617
use tracing::{debug, info, trace, Instrument};
1718

@@ -33,31 +34,28 @@ pub struct DsnConfig {
3334
pub allow_non_global_addresses_in_dht: bool,
3435
}
3536

36-
/// Start an archiver that will listen for archived segments and send it to DSN network using
37-
/// pub-sub protocol.
38-
pub async fn start_dsn_node<Block, Spawner, AS: sc_client_api::AuxStore + Sync + Send + 'static>(
39-
subspace_link: &SubspaceLink<Block>,
37+
pub(crate) async fn create_dsn_instance<Block, AS>(
4038
dsn_config: DsnConfig,
41-
spawner: Spawner,
4239
piece_cache: AuxPieceCache<AS>,
4340
piece_getter: PieceGetter,
4441
segment_index_getter: SegmentIndexGetter,
45-
) -> Result<Node, CreationError>
42+
) -> Result<
43+
(
44+
Node,
45+
NodeRunner<CustomRecordStore<AuxRecordStorage<AS>, MemoryProviderStorage>>,
46+
),
47+
CreationError,
48+
>
4649
where
4750
Block: BlockT,
48-
Spawner: SpawnEssentialNamed,
51+
AS: AuxStore + Sync + Send + 'static,
4952
{
50-
let span = tracing::info_span!(sc_tracing::logging::PREFIX_LOG_SPAN, name = "DSN");
51-
let _enter = span.enter();
52-
5353
// TODO: Combine `AuxPieceCache` with `AuxRecordStorage` and remove `PieceCache` abstraction
5454
let record_storage = AuxRecordStorage::new(piece_cache, segment_index_getter);
5555

5656
trace!("Subspace networking starting.");
5757

58-
let networking_config = subspace_networking::Config::<
59-
CustomRecordStore<AuxRecordStorage<AS>, MemoryProviderStorage>,
60-
> {
58+
let networking_config = subspace_networking::Config {
6159
keypair: dsn_config.keypair,
6260
listen_on: dsn_config.listen_on,
6361
allow_non_global_addresses_in_dht: dsn_config.allow_non_global_addresses_in_dht,
@@ -80,58 +78,51 @@ where
8078
..subspace_networking::Config::with_generated_keypair()
8179
};
8280

83-
let (node, mut node_runner) = subspace_networking::create(networking_config).await?;
84-
85-
info!("Subspace networking initialized: Node ID is {}", node.id());
81+
subspace_networking::create(networking_config).await
82+
}
8683

87-
spawner.spawn_essential(
88-
"node-runner",
89-
Some("subspace-networking"),
90-
Box::pin(
91-
async move {
92-
node_runner.run().await;
84+
/// Start an archiver that will listen for archived segments and send it to DSN network using
85+
/// pub-sub protocol.
86+
pub(crate) async fn start_dsn_archiver<Spawner>(
87+
mut archived_segment_notification_stream: impl Stream<Item = ArchivedSegmentNotification> + Unpin,
88+
node: Node,
89+
spawner: Spawner,
90+
) where
91+
Spawner: SpawnNamed,
92+
{
93+
trace!("Subspace DSN archiver started.");
94+
95+
let mut last_published_segment_index: Option<u64> = None;
96+
while let Some(ArchivedSegmentNotification {
97+
archived_segment, ..
98+
}) = archived_segment_notification_stream.next().await
99+
{
100+
let segment_index = archived_segment.root_block.segment_index();
101+
let first_piece_index = segment_index * u64::from(PIECES_IN_SEGMENT);
102+
103+
info!(%segment_index, "Processing a segment.");
104+
105+
// skip repeating publication
106+
if let Some(last_published_segment_index) = last_published_segment_index {
107+
if last_published_segment_index == segment_index {
108+
info!(?segment_index, "Archived segment skipped.");
109+
continue;
93110
}
94-
.in_current_span(),
95-
),
96-
);
97-
98-
let mut archived_segment_notification_stream = subspace_link
99-
.archived_segment_notification_stream()
100-
.subscribe();
101-
102-
spawner.spawn_essential(
103-
"archiver",
104-
Some("subspace-networking"),
105-
Box::pin({
106-
let node = node.clone();
107-
108-
async move {
109-
trace!("Subspace DSN archiver started.");
110-
111-
let mut last_published_segment_index: Option<u64> = None;
112-
while let Some(ArchivedSegmentNotification {
113-
archived_segment, ..
114-
}) = archived_segment_notification_stream.next().await
115-
{
116-
let segment_index = archived_segment.root_block.segment_index();
117-
let first_piece_index = segment_index * u64::from(PIECES_IN_SEGMENT);
118-
119-
info!(%segment_index, "Processing a segment.");
120-
121-
// skip repeating publication
122-
if let Some(last_published_segment_index) = last_published_segment_index {
123-
if last_published_segment_index == segment_index {
124-
info!(?segment_index, "Archived segment skipped.");
125-
continue;
126-
}
127-
}
128-
let keys_iter = (first_piece_index..)
129-
.take(archived_segment.pieces.count())
130-
.map(|idx| (idx, PieceIndexHash::from_index(idx)))
131-
.map(|(idx, hash)| (idx, hash.to_multihash()));
132-
111+
}
112+
let keys_iter = (first_piece_index..)
113+
.take(archived_segment.pieces.count())
114+
.map(|idx| (idx, PieceIndexHash::from_index(idx)))
115+
.map(|(idx, hash)| (idx, hash.to_multihash()));
116+
117+
spawner.spawn(
118+
"segment-publishing",
119+
Some("subspace-networking"),
120+
Box::pin({
121+
let node = node.clone();
122+
123+
async move {
133124
for ((_idx, key), piece) in keys_iter.zip(archived_segment.pieces.as_pieces()) {
134-
//TODO: restore annoucing after https://github.com/libp2p/rust-libp2p/issues/3048
125+
//TODO: restore announcing after https://github.com/libp2p/rust-libp2p/issues/3048
135126
// trace!(?key, ?idx, "Announcing key...");
136127
//
137128
// let announcing_result = node.start_announcing(key).await;
@@ -145,13 +136,12 @@ where
145136
//TODO: ensure republication of failed announcements
146137
}
147138

148-
last_published_segment_index = Some(segment_index);
149139
info!(%segment_index, "Segment processed.");
150140
}
151-
}
152-
.in_current_span()
153-
}),
154-
);
141+
.in_current_span()
142+
}),
143+
);
155144

156-
Ok(node)
145+
last_published_segment_index = Some(segment_index);
146+
}
157147
}

crates/subspace-service/src/lib.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ mod dsn;
2121
mod pool;
2222
pub mod rpc;
2323

24+
use crate::dsn::create_dsn_instance;
2425
pub use crate::pool::FullPool;
2526
use derive_more::{Deref, DerefMut, Into};
2627
use domain_runtime_primitives::Hash as DomainHash;
27-
use dsn::start_dsn_node;
28+
use dsn::start_dsn_archiver;
2829
pub use dsn::DsnConfig;
2930
use frame_system_rpc_runtime_api::AccountNonceApi;
3031
use futures::channel::oneshot;
@@ -54,6 +55,7 @@ use sp_blockchain::HeaderMetadata;
5455
use sp_consensus::Error as ConsensusError;
5556
use sp_consensus_slots::Slot;
5657
use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi};
58+
use sp_core::traits::SpawnEssentialNamed;
5759
use sp_domains::ExecutorApi;
5860
use sp_objects::ObjectsApi;
5961
use sp_offchain::OffchainWorkerApi;
@@ -67,7 +69,7 @@ use subspace_fraud_proof::VerifyFraudProof;
6769
use subspace_networking::libp2p::multiaddr::Protocol;
6870
use subspace_runtime_primitives::opaque::Block;
6971
use subspace_runtime_primitives::{AccountId, Balance, Hash, Index as Nonce};
70-
use tracing::error;
72+
use tracing::{error, info, Instrument};
7173

7274
/// Error type for Subspace service.
7375
#[derive(thiserror::Error, Debug)]
@@ -409,10 +411,11 @@ where
409411
});
410412

411413
let dsn_bootstrap_nodes = {
412-
let node = start_dsn_node(
413-
&subspace_link,
414+
let span = tracing::info_span!(sc_tracing::logging::PREFIX_LOG_SPAN, name = "DSN");
415+
let _enter = span.enter();
416+
417+
let (node, mut node_runner) = create_dsn_instance::<Block, _>(
414418
config.dsn_config.clone(),
415-
task_manager.spawn_essential_handle(),
416419
piece_cache.clone(),
417420
Arc::new({
418421
let piece_cache = piece_cache.clone();
@@ -436,6 +439,38 @@ where
436439
)
437440
.await?;
438441

442+
info!("Subspace networking initialized: Node ID is {}", node.id());
443+
444+
task_manager.spawn_essential_handle().spawn_essential(
445+
"node-runner",
446+
Some("subspace-networking"),
447+
Box::pin(
448+
async move {
449+
node_runner.run().await;
450+
}
451+
.in_current_span(),
452+
),
453+
);
454+
455+
let dsn_archiving_fut = start_dsn_archiver(
456+
subspace_link
457+
.archived_segment_notification_stream()
458+
.subscribe(),
459+
node.clone(),
460+
task_manager.spawn_handle(),
461+
);
462+
463+
task_manager.spawn_essential_handle().spawn_essential(
464+
"archiver",
465+
Some("subspace-networking"),
466+
Box::pin(
467+
async move {
468+
dsn_archiving_fut.await;
469+
}
470+
.in_current_span(),
471+
),
472+
);
473+
439474
// Fall back to node itself as bootstrap node for DSN so farmer always has someone to
440475
// connect to
441476
if config.dsn_config.bootstrap_nodes.is_empty() {
@@ -485,7 +520,6 @@ where
485520
client.clone(),
486521
telemetry.as_ref().map(|telemetry| telemetry.handle()),
487522
&task_manager.spawn_essential_handle(),
488-
config.role.is_authority(),
489523
);
490524

491525
let (network, system_rpc_tx, tx_handler_controller, network_starter) =

0 commit comments

Comments
 (0)