Skip to content

Commit 3a076b1

Browse files
authored
Merge pull request #992 from subspace/dsn-node-publish-piece-batcher
Node. Add piece publishing batcher.
2 parents d376a81 + c1342f4 commit 3a076b1

File tree

22 files changed

+290
-139
lines changed

22 files changed

+290
-139
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sp-lightclient/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ futures = "0.3.25"
3535
rand = { version = "0.8.5", features = ["min_const_gen"] }
3636
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving"}
3737
subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" }
38+
tokio = { version = "1.21.2", features = ["sync"] }
3839

3940
[features]
4041
default = ["std"]

crates/sp-lightclient/src/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl Farmer {
107107
};
108108
let sector_codec = SectorCodec::new(PLOT_SECTOR_SIZE as usize).unwrap();
109109
let piece_receiver_batch_size = 20usize;
110+
110111
block_on(plot_sector(
111112
&public_key,
112113
sector_index,
@@ -117,7 +118,7 @@ impl Farmer {
117118
&sector_codec,
118119
Cursor::new(sector.as_mut_slice()),
119120
Cursor::new(sector_metadata.as_mut_slice()),
120-
piece_receiver_batch_size,
121+
&tokio::sync::Semaphore::new(piece_receiver_batch_size),
121122
))
122123
.unwrap();
123124

crates/subspace-farmer-components/benches/auditing.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use subspace_farmer_components::farming::audit_sector;
1919
use subspace_farmer_components::file_ext::FileExt;
2020
use subspace_farmer_components::plotting::plot_sector;
2121
use subspace_farmer_components::FarmerProtocolInfo;
22+
use tokio::sync::Semaphore;
2223
use utils::BenchPieceReceiver;
2324

2425
mod utils;
@@ -64,6 +65,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
6465
let global_challenge = Blake2b256Hash::default();
6566
let solution_range = SolutionRange::MAX;
6667
let piece_receiver_batch_size = 20usize;
68+
let piece_receiver_semaphore = Semaphore::new(piece_receiver_batch_size);
6769

6870
let plotted_sector = {
6971
let mut plotted_sector = vec![0u8; PLOT_SECTOR_SIZE as usize];
@@ -78,7 +80,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
7880
&sector_codec,
7981
plotted_sector.as_mut_slice(),
8082
io::sink(),
81-
piece_receiver_batch_size,
83+
&piece_receiver_semaphore,
8284
))
8385
.unwrap();
8486

crates/subspace-farmer-components/benches/plotting.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use subspace_core_primitives::{
1616
};
1717
use subspace_farmer_components::plotting::plot_sector;
1818
use subspace_farmer_components::FarmerProtocolInfo;
19+
use tokio::sync::Semaphore;
1920
use utils::BenchPieceReceiver;
2021

2122
mod utils;
@@ -54,6 +55,7 @@ fn criterion_benchmark(c: &mut Criterion) {
5455
};
5556
let piece_receiver = BenchPieceReceiver::new(piece);
5657
let piece_receiver_batch_size = 20usize;
58+
let piece_receiver_semaphore = Semaphore::new(piece_receiver_batch_size);
5759

5860
let mut group = c.benchmark_group("sector-plotting");
5961
group.throughput(Throughput::Bytes(PLOT_SECTOR_SIZE));
@@ -69,12 +71,12 @@ fn criterion_benchmark(c: &mut Criterion) {
6971
black_box(&sector_codec),
7072
black_box(io::sink()),
7173
black_box(io::sink()),
72-
black_box(piece_receiver_batch_size),
74+
black_box(&piece_receiver_semaphore),
7375
))
7476
.unwrap();
7577
})
7678
});
77-
let piece_receiver_batch_size = 20usize;
79+
7880
let thread_count = current_num_threads() as u64;
7981
group.throughput(Throughput::Bytes(PLOT_SECTOR_SIZE * thread_count));
8082
group.bench_function("no-writes-multi-thread", |b| {
@@ -93,7 +95,7 @@ fn criterion_benchmark(c: &mut Criterion) {
9395
black_box(&sector_codec),
9496
black_box(io::sink()),
9597
black_box(io::sink()),
96-
black_box(piece_receiver_batch_size),
98+
black_box(&piece_receiver_semaphore),
9799
))
98100
.unwrap();
99101
});

crates/subspace-farmer-components/benches/proving.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use subspace_farmer_components::farming::audit_sector;
2020
use subspace_farmer_components::file_ext::FileExt;
2121
use subspace_farmer_components::plotting::plot_sector;
2222
use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata};
23+
use tokio::sync::Semaphore;
2324
use utils::BenchPieceReceiver;
2425

2526
mod utils;
@@ -66,7 +67,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
6667
let global_challenge = Blake2b256Hash::default();
6768
let solution_range = SolutionRange::MAX;
6869
let reward_address = PublicKey::default();
69-
let piece_receiver_batch_size = 20usize;
70+
let piece_receiver_semaphore = Semaphore::new(usize::MAX);
7071

7172
let (plotted_sector, sector_metadata) = {
7273
let mut plotted_sector = vec![0u8; PLOT_SECTOR_SIZE as usize];
@@ -82,7 +83,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
8283
&sector_codec,
8384
plotted_sector.as_mut_slice(),
8485
sector_metadata.as_mut_slice(),
85-
piece_receiver_batch_size,
86+
&piece_receiver_semaphore,
8687
))
8788
.unwrap();
8889

crates/subspace-farmer-components/src/plotting.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use parity_scale_codec::Encode;
66
use std::error::Error;
77
use std::io;
88
use std::sync::atomic::{AtomicBool, Ordering};
9-
use std::sync::Arc;
109
use subspace_core_primitives::crypto::kzg;
1110
use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
1211
use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError};
@@ -85,7 +84,7 @@ pub async fn plot_sector<PR, S, SM>(
8584
sector_codec: &SectorCodec,
8685
mut sector_output: S,
8786
mut sector_metadata_output: SM,
88-
piece_receiver_batch_size: usize,
87+
semaphore: &Semaphore,
8988
) -> Result<PlottedSector, PlottingError>
9089
where
9190
PR: PieceReceiver,
@@ -121,7 +120,7 @@ where
121120
piece_receiver,
122121
&piece_indexes,
123122
cancelled,
124-
piece_receiver_batch_size,
123+
semaphore,
125124
)
126125
.await?;
127126

@@ -184,10 +183,8 @@ async fn plot_pieces_in_batches_non_blocking<PR: PieceReceiver>(
184183
piece_receiver: &PR,
185184
piece_indexes: &[PieceIndex],
186185
cancelled: &AtomicBool,
187-
piece_receiver_batch_size: usize,
186+
semaphore: &Semaphore,
188187
) -> Result<(), PlottingError> {
189-
let semaphore = Arc::new(Semaphore::new(piece_receiver_batch_size));
190-
191188
let mut pieces_receiving_futures = piece_indexes
192189
.iter()
193190
.map(|piece_index| {

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ pub(crate) async fn farm_multi_disk(
7070

7171
let readers_and_pieces = Arc::new(Mutex::new(None));
7272

73+
info!("Connecting to node RPC at {}", node_rpc_url);
74+
let rpc_client = NodeRpcClient::new(&node_rpc_url).await?;
75+
let piece_publisher_semaphore = Arc::new(tokio::sync::Semaphore::new(
76+
farming_args.piece_receiver_batch_size,
77+
));
78+
let piece_receiver_semaphore = Arc::new(tokio::sync::Semaphore::new(
79+
farming_args.piece_publisher_batch_size,
80+
));
81+
7382
let (node, mut node_runner) = {
7483
// TODO: Temporary networking identity derivation from the first disk farm identity.
7584
let directory = disk_farms
@@ -83,9 +92,6 @@ pub(crate) async fn farm_multi_disk(
8392

8493
if dsn.bootstrap_nodes.is_empty() {
8594
dsn.bootstrap_nodes = {
86-
info!("Connecting to node RPC at {}", node_rpc_url);
87-
let rpc_client = NodeRpcClient::new(&node_rpc_url).await?;
88-
8995
rpc_client
9096
.farmer_app_info()
9197
.await
@@ -119,8 +125,8 @@ pub(crate) async fn farm_multi_disk(
119125
rpc_client,
120126
reward_address,
121127
dsn_node: node.clone(),
122-
piece_receiver_batch_size: farming_args.piece_receiver_batch_size,
123-
piece_publisher_batch_size: farming_args.piece_publisher_batch_size,
128+
piece_receiver_semaphore: Arc::clone(&piece_receiver_semaphore),
129+
piece_publisher_semaphore: Arc::clone(&piece_publisher_semaphore),
124130
})?;
125131

126132
single_disk_plots.push(single_disk_plot);

crates/subspace-farmer/src/single_disk_plot.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,10 +271,10 @@ pub struct SingleDiskPlotOptions<RC> {
271271
pub reward_address: PublicKey,
272272
/// Optional DSN Node.
273273
pub dsn_node: Node,
274-
/// Defines size for the pieces batch of the piece receiving process.
275-
pub piece_receiver_batch_size: usize,
276-
/// Defines size for the pieces batch of the piece publishing process.
277-
pub piece_publisher_batch_size: usize,
274+
/// Semaphore to limit concurrency of piece receiving process.
275+
pub piece_receiver_semaphore: Arc<tokio::sync::Semaphore>,
276+
/// Semaphore to limit concurrency of piece publishing process.
277+
pub piece_publisher_semaphore: Arc<tokio::sync::Semaphore>,
278278
}
279279

280280
/// Errors happening when trying to create/open single disk plot
@@ -468,8 +468,8 @@ impl SingleDiskPlot {
468468
rpc_client,
469469
reward_address,
470470
dsn_node,
471-
piece_receiver_batch_size,
472-
piece_publisher_batch_size,
471+
piece_publisher_semaphore,
472+
piece_receiver_semaphore,
473473
} = options;
474474

475475
// TODO: Account for plot overhead
@@ -648,7 +648,7 @@ impl SingleDiskPlot {
648648
let rpc_client = rpc_client.clone();
649649
let error_sender = Arc::clone(&error_sender);
650650
let piece_publisher =
651-
PieceSectorPublisher::new(dsn_node.clone(), shutting_down.clone());
651+
PieceSectorPublisher::new(dsn_node.clone(), shutting_down.clone(), piece_publisher_semaphore);
652652

653653
move || {
654654
let _tokio_handle_guard = handle.enter();
@@ -709,7 +709,7 @@ impl SingleDiskPlot {
709709
&sector_codec,
710710
sector,
711711
sector_metadata,
712-
piece_receiver_batch_size,
712+
&piece_receiver_semaphore,
713713
)) {
714714
Ok(plotted_sector) => {
715715
debug!(%sector_index, "Sector plotted");
@@ -738,7 +738,7 @@ impl SingleDiskPlot {
738738

739739
async move {
740740
if let Err(error) = piece_publisher
741-
.publish_pieces(plotted_sector.piece_indexes, piece_publisher_batch_size)
741+
.publish_pieces(plotted_sector.piece_indexes)
742742
.await
743743
{
744744
warn!(%sector_index, %error, "Failed to publish pieces to DSN");

crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use futures::StreamExt;
55
use parity_scale_codec::Encode;
66
use std::collections::BTreeSet;
77
use std::error::Error;
8-
use std::future::Future;
9-
use std::pin::Pin;
108
use std::sync::atomic::{AtomicBool, Ordering};
119
use std::sync::Arc;
1210
use std::time::Duration;
@@ -30,13 +28,19 @@ const PUT_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
3028
pub(crate) struct PieceSectorPublisher {
3129
dsn_node: Node,
3230
cancelled: Arc<AtomicBool>,
31+
semaphore: Arc<Semaphore>,
3332
}
3433

3534
impl PieceSectorPublisher {
36-
pub(crate) fn new(dsn_node: Node, cancelled: Arc<AtomicBool>) -> Self {
35+
pub(crate) fn new(
36+
dsn_node: Node,
37+
cancelled: Arc<AtomicBool>,
38+
semaphore: Arc<Semaphore>,
39+
) -> Self {
3740
Self {
3841
dsn_node,
3942
cancelled,
43+
semaphore,
4044
}
4145
}
4246

@@ -54,15 +58,13 @@ impl PieceSectorPublisher {
5458
pub(crate) async fn publish_pieces(
5559
&self,
5660
piece_indexes: Vec<PieceIndex>,
57-
piece_publisher_batch_size: usize,
5861
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
59-
let semaphore = Arc::new(Semaphore::new(piece_publisher_batch_size));
60-
6162
let mut pieces_receiving_futures = piece_indexes
6263
.iter()
6364
.map(|piece_index| {
6465
Box::pin(async {
65-
let _permit = semaphore
66+
let _permit = self
67+
.semaphore
6668
.acquire()
6769
.await
6870
.expect("Should be valid on non-closed semaphore");
@@ -97,12 +99,8 @@ impl PieceSectorPublisher {
9799
self.check_cancellation()
98100
.map_err(backoff::Error::Permanent)?;
99101

100-
let publish_timeout_result: Result<Result<(), _>, Elapsed> = timeout(
101-
PUT_PIECE_TIMEOUT,
102-
Box::pin(self.publish_single_piece(piece_index))
103-
as Pin<Box<dyn Future<Output = _> + Send>>,
104-
)
105-
.await;
102+
let publish_timeout_result: Result<Result<(), _>, Elapsed> =
103+
timeout(PUT_PIECE_TIMEOUT, self.publish_single_piece(piece_index)).await;
106104

107105
if let Ok(publish_result) = publish_timeout_result {
108106
if publish_result.is_ok() {

0 commit comments

Comments
 (0)