Skip to content

Commit 6ad9e1a

Browse files
authored
Merge pull request #985 from subspace/dsn-put-piece-batcher2
farmer: Add piece publishing batcher.
2 parents 81cfe10 + 50a7b3c commit 6ad9e1a

File tree

5 files changed

+146
-49
lines changed

5 files changed

+146
-49
lines changed

crates/subspace-farmer-components/src/plotting.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,6 @@ pub enum PlottingError {
6767
/// I/O error occurred
6868
#[error("I/O error: {0}")]
6969
Io(#[from] io::Error),
70-
/// Incorrect batch size for piece receiver.
71-
#[error("Incorrect batch size for piece receiver.")]
72-
IncorrectPieceReceivingBatchSize,
7370
}
7471

7572
/// Plot a single sector, where `sector` and `sector_metadata` must be positioned correctly (seek to
@@ -189,12 +186,6 @@ async fn plot_pieces_in_batches_non_blocking<PR: PieceReceiver>(
189186
cancelled: &AtomicBool,
190187
piece_receiver_batch_size: usize,
191188
) -> Result<(), PlottingError> {
192-
const MAX_PIECE_RECEIVER_BATCH_SIZE: usize = 60;
193-
194-
if piece_receiver_batch_size == 0 || piece_receiver_batch_size > MAX_PIECE_RECEIVER_BATCH_SIZE {
195-
return Err(PlottingError::IncorrectPieceReceivingBatchSize);
196-
}
197-
198189
let semaphore = Arc::new(Semaphore::new(piece_receiver_batch_size));
199190

200191
let mut pieces_receiving_futures = piece_indexes

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub(crate) async fn farm_multi_disk(
6565
disable_farming,
6666
mut dsn,
6767
piece_receiver_batch_size,
68+
piece_publisher_batch_size,
6869
} = farming_args;
6970

7071
let readers_and_pieces = Arc::new(Mutex::new(None));
@@ -119,6 +120,7 @@ pub(crate) async fn farm_multi_disk(
119120
reward_address,
120121
dsn_node: node.clone(),
121122
piece_receiver_batch_size: farming_args.piece_receiver_batch_size,
123+
piece_publisher_batch_size: farming_args.piece_publisher_batch_size,
122124
})?;
123125

124126
single_disk_plots.push(single_disk_plot);

crates/subspace-farmer/src/bin/subspace-farmer/main.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ use tracing_subscriber::fmt::format::FmtSpan;
2323
use tracing_subscriber::prelude::*;
2424
use tracing_subscriber::{fmt, EnvFilter};
2525

26+
// Defines a maximum constraint for the piece publisher batch.
27+
const MAX_PIECE_PUBLISHER_BATCH_SIZE: usize = 30;
28+
29+
// Defines a maximum constraint for the piece receiver batch.
30+
const MAX_PIECE_RECEIVER_BATCH_SIZE: usize = 30;
31+
2632
#[cfg(all(
2733
target_arch = "x86_64",
2834
target_vendor = "unknown",
@@ -54,8 +60,11 @@ struct FarmingArgs {
5460
#[clap(flatten)]
5561
dsn: DsnArgs,
5662
/// Defines size for the pieces batch of the piece receiving process.
57-
#[arg(long, default_value_t = 20)]
63+
#[arg(long, default_value_t = 12)]
5864
piece_receiver_batch_size: usize,
65+
/// Defines size for the pieces batch of the piece publishing process.
66+
#[arg(long, default_value_t = 12)]
67+
piece_publisher_batch_size: usize,
5968
}
6069

6170
/// Arguments for DSN
@@ -287,6 +296,26 @@ async fn main() -> Result<()> {
287296
command.farm
288297
};
289298

299+
if farming_args.piece_publisher_batch_size == 0
300+
|| farming_args.piece_publisher_batch_size > MAX_PIECE_PUBLISHER_BATCH_SIZE
301+
{
302+
return Err(anyhow::anyhow!(
303+
"Incorrect piece publisher batch size: {}. Should be 1-{}",
304+
farming_args.piece_publisher_batch_size,
305+
MAX_PIECE_PUBLISHER_BATCH_SIZE
306+
));
307+
}
308+
309+
if farming_args.piece_receiver_batch_size == 0
310+
|| farming_args.piece_receiver_batch_size > MAX_PIECE_RECEIVER_BATCH_SIZE
311+
{
312+
return Err(anyhow::anyhow!(
313+
"Incorrect piece receiver batch size: {}. Should be 1-{}",
314+
farming_args.piece_receiver_batch_size,
315+
MAX_PIECE_RECEIVER_BATCH_SIZE
316+
));
317+
}
318+
290319
commands::farm_multi_disk(base_path, disk_farms, farming_args).await?;
291320
}
292321
Subcommand::Info => {

crates/subspace-farmer/src/single_disk_plot.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ pub struct SingleDiskPlotOptions<RC> {
273273
pub dsn_node: Node,
274274
/// Defines size for the pieces batch of the piece receiving process.
275275
pub piece_receiver_batch_size: usize,
276+
/// Defines size for the pieces batch of the piece publishing process.
277+
pub piece_publisher_batch_size: usize,
276278
}
277279

278280
/// Errors happening when trying to create/open single disk plot
@@ -467,6 +469,7 @@ impl SingleDiskPlot {
467469
reward_address,
468470
dsn_node,
469471
piece_receiver_batch_size,
472+
piece_publisher_batch_size,
470473
} = options;
471474

472475
// TODO: Account for plot overhead
@@ -735,7 +738,7 @@ impl SingleDiskPlot {
735738

736739
async move {
737740
if let Err(error) = piece_publisher
738-
.publish_pieces(plotted_sector.piece_indexes)
741+
.publish_pieces(plotted_sector.piece_indexes, piece_publisher_batch_size)
739742
.await
740743
{
741744
warn!(%sector_index, %error, "Failed to publish pieces to DSN");
Lines changed: 110 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1+
use backoff::future::retry;
2+
use backoff::ExponentialBackoff;
3+
use futures::stream::FuturesUnordered;
14
use futures::StreamExt;
25
use parity_scale_codec::Encode;
36
use std::collections::BTreeSet;
47
use std::error::Error;
8+
use std::future::Future;
9+
use std::pin::Pin;
510
use std::sync::atomic::{AtomicBool, Ordering};
611
use std::sync::Arc;
712
use std::time::Duration;
813
use subspace_core_primitives::{PieceIndex, PieceIndexHash};
914
use subspace_networking::utils::multihash::MultihashCode;
1015
use subspace_networking::{Node, ToMultihash};
11-
use tokio::time::sleep;
12-
use tracing::{debug, error, trace};
16+
use tokio::sync::Semaphore;
17+
use tokio::time::error::Elapsed;
18+
use tokio::time::timeout;
19+
use tracing::{debug, error, info, trace};
1320

14-
/// Defines a duration between piece publishing calls.
15-
const PUBLISH_PIECE_BY_SECTOR_WAITING_DURATION_IN_SECS: u64 = 1;
21+
/// Max time allocated for putting piece to DSN before the attempt is considered to fail
22+
const PUT_PIECE_TIMEOUT: Duration = Duration::from_secs(5);
23+
/// Defines initial duration between put_piece calls.
24+
const PUT_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
25+
/// Defines max duration between put_piece calls.
26+
const PUT_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
1627

1728
// Piece-by-sector DSN publishing helper.
1829
#[derive(Clone)]
@@ -42,43 +53,104 @@ impl PieceSectorPublisher {
4253
// Publishes pieces-by-sector to DSN in bulk. Supports cancellation.
4354
pub(crate) async fn publish_pieces(
4455
&self,
45-
pieces_indexes: Vec<PieceIndex>,
56+
piece_indexes: Vec<PieceIndex>,
57+
piece_publisher_batch_size: usize,
4658
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
47-
for piece_index in pieces_indexes {
48-
'attempts: loop {
49-
self.check_cancellation()?;
50-
51-
let key = PieceIndexHash::from_index(piece_index)
52-
.to_multihash_by_code(MultihashCode::Sector);
53-
54-
// TODO: rework to piece announcing (pull-model) after fixing
55-
// https://github.com/libp2p/rust-libp2p/issues/3048
56-
let set = BTreeSet::from_iter(vec![self.dsn_node.id().to_bytes()]);
57-
58-
let result = self.dsn_node.put_value(key, set.encode()).await;
59-
60-
match result {
61-
Ok(mut stream) => {
62-
if stream.next().await.is_some() {
63-
trace!(%piece_index, ?key, "Piece publishing for a sector succeeded");
64-
break 'attempts;
65-
} else {
66-
trace!(%piece_index, ?key, "Piece publishing for a sector failed");
67-
}
68-
}
69-
Err(error) => {
70-
error!(?error, %piece_index, ?key, "Piece publishing for a sector returned an error");
71-
72-
// pause before retrying
73-
sleep(Duration::from_secs(
74-
PUBLISH_PIECE_BY_SECTOR_WAITING_DURATION_IN_SECS,
75-
))
76-
.await;
77-
}
78-
}
79-
}
59+
let semaphore = Arc::new(Semaphore::new(piece_publisher_batch_size));
60+
61+
let mut pieces_receiving_futures = piece_indexes
62+
.iter()
63+
.map(|piece_index| {
64+
Box::pin(async {
65+
let _permit = semaphore
66+
.acquire()
67+
.await
68+
.expect("Should be valid on non-closed semaphore");
69+
70+
self.publish_single_piece_with_backoff(*piece_index).await
71+
})
72+
})
73+
.collect::<FuturesUnordered<_>>();
74+
75+
while pieces_receiving_futures.next().await.is_some() {
76+
self.check_cancellation()?;
8077
}
8178

79+
info!("Piece publishing was successful.");
80+
8281
Ok(())
8382
}
83+
84+
async fn publish_single_piece_with_backoff(
85+
&self,
86+
piece_index: PieceIndex,
87+
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
88+
let backoff = ExponentialBackoff {
89+
initial_interval: PUT_PIECE_INITIAL_INTERVAL,
90+
max_interval: PUT_PIECE_MAX_INTERVAL,
91+
// Try until the piece is successfully published
92+
max_elapsed_time: None,
93+
..ExponentialBackoff::default()
94+
};
95+
96+
retry(backoff, || async {
97+
self.check_cancellation()
98+
.map_err(backoff::Error::Permanent)?;
99+
100+
let publish_timeout_result: Result<Result<(), _>, Elapsed> = timeout(
101+
PUT_PIECE_TIMEOUT,
102+
Box::pin(self.publish_single_piece(piece_index))
103+
as Pin<Box<dyn Future<Output = _> + Send>>,
104+
)
105+
.await;
106+
107+
if let Ok(publish_result) = publish_timeout_result {
108+
if publish_result.is_ok() {
109+
return Ok(());
110+
}
111+
}
112+
113+
error!(%piece_index, "Couldn't publish a piece. Retrying...");
114+
115+
Err(backoff::Error::transient(
116+
"Couldn't publish piece to DSN".into(),
117+
))
118+
})
119+
.await
120+
}
121+
122+
async fn publish_single_piece(
123+
&self,
124+
piece_index: PieceIndex,
125+
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
126+
self.check_cancellation()?;
127+
128+
let key =
129+
PieceIndexHash::from_index(piece_index).to_multihash_by_code(MultihashCode::Sector);
130+
131+
// TODO: rework to piece announcing (pull-model) after fixing
132+
// https://github.com/libp2p/rust-libp2p/issues/3048
133+
let set = BTreeSet::from_iter(vec![self.dsn_node.id().to_bytes()]);
134+
135+
let result = self.dsn_node.put_value(key, set.encode()).await;
136+
137+
match result {
138+
Err(error) => {
139+
debug!(?error, %piece_index, ?key, "Piece publishing for a sector returned an error");
140+
141+
Err("Piece publishing failed".into())
142+
}
143+
Ok(mut stream) => {
144+
if stream.next().await.is_some() {
145+
trace!(%piece_index, ?key, "Piece publishing for a sector succeeded");
146+
147+
Ok(())
148+
} else {
149+
debug!(%piece_index, ?key, "Piece publishing for a sector failed");
150+
151+
Err("Piece publishing was unsuccessful".into())
152+
}
153+
}
154+
}
155+
}
84156
}

0 commit comments

Comments
 (0)