Skip to content

Commit d218e0d

Browse files
committed
Improve piece getting mechanism on farmer
1 parent 7ea7649 commit d218e0d

File tree

4 files changed

+76
-18
lines changed

4 files changed

+76
-18
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/subspace-farmer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ include = [
1414
[dependencies]
1515
anyhow = "1.0.66"
1616
async-trait = "0.1.58"
17+
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
1718
base58 = "0.2.0"
1819
blake2 = "0.10.5"
1920
bytesize = "1.1.0"

crates/subspace-farmer/src/single_disk_plot.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,8 +663,8 @@ impl SingleDiskPlot {
663663
// Some sectors may already be plotted, skip them
664664
metadata_header.lock().sector_count as usize,
665665
)
666-
.map(|(sector_index, (sector, metadata))| {
667-
(sector_index as u64 + first_sector_index, sector, metadata)
666+
.map(|(sector_offset, (sector, metadata))| {
667+
(sector_offset as u64 + first_sector_index, sector, metadata)
668668
});
669669

670670
// TODO: Concurrency
@@ -677,6 +677,8 @@ impl SingleDiskPlot {
677677
return;
678678
}
679679

680+
debug!(%sector_index, "Plotting sector");
681+
680682
let farmer_app_info = handle
681683
.block_on(rpc_client.farmer_app_info())
682684
.map_err(|error| PlottingError::FailedToGetFarmerInfo { error })?;

crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
11
use async_trait::async_trait;
2+
use backoff::future::retry;
3+
use backoff::ExponentialBackoff;
4+
use futures::stream::FuturesUnordered;
5+
use futures::StreamExt;
26
use parity_scale_codec::Decode;
37
use std::collections::BTreeSet;
48
use std::error::Error;
9+
use std::future::Future;
10+
use std::pin::Pin;
511
use std::sync::atomic::{AtomicBool, Ordering};
612
use std::time::Duration;
713
use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash};
814
use subspace_farmer_components::plotting::PieceReceiver;
915
use subspace_networking::libp2p::PeerId;
1016
use subspace_networking::utils::multihash::MultihashCode;
1117
use subspace_networking::{Node, PieceByHashRequest, PieceKey, ToMultihash};
12-
use tokio::time::sleep;
18+
use tokio::time::{sleep, timeout};
1319
use tracing::{debug, error, info, trace, warn};
1420

15-
/// Defines a duration between get_piece calls.
16-
const GET_PIECE_WAITING_DURATION_IN_SECS: u64 = 1;
21+
/// Defines initial duration between get_piece calls.
22+
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
23+
/// Defines max duration between get_piece calls.
24+
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
25+
/// Delay for getting piece from cache before resorting to archival storage
26+
const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(1);
27+
/// Max time allocated for getting piece from DSN before attempt is considered to fail
28+
const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5);
1729

1830
// Temporary struct serving pieces from different providers using configuration arguments.
1931
pub(crate) struct MultiChannelPieceReceiver<'a> {
@@ -181,21 +193,49 @@ impl<'a> PieceReceiver for MultiChannelPieceReceiver<'a> {
181193
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
182194
trace!(%piece_index, "Piece request.");
183195

184-
// until we get a valid piece
185-
loop {
186-
self.check_cancellation()?;
187-
188-
if let Some(piece) = self.get_piece_from_cache(piece_index).await {
189-
return Ok(Some(piece));
190-
}
191-
192-
if let Some(piece) = self.get_piece_from_archival_storage(piece_index).await {
193-
return Ok(Some(piece));
196+
let backoff = ExponentialBackoff {
197+
initial_interval: GET_PIECE_INITIAL_INTERVAL,
198+
max_interval: GET_PIECE_MAX_INTERVAL,
199+
// Try until we get a valid piece
200+
max_elapsed_time: None,
201+
..ExponentialBackoff::default()
202+
};
203+
204+
retry(backoff, || async {
205+
self.check_cancellation()
206+
.map_err(backoff::Error::Permanent)?;
207+
208+
// Try to pull pieces in two ways, whichever is faster
209+
let mut piece_attempts = [
210+
timeout(
211+
GET_PIECE_TIMEOUT,
212+
Box::pin(self.get_piece_from_cache(piece_index))
213+
as Pin<Box<dyn Future<Output = _> + Send>>,
214+
),
215+
timeout(
216+
GET_PIECE_TIMEOUT,
217+
Box::pin(async {
218+
// Prefer cache if it can return quickly, otherwise fall back to archival storage
219+
sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await;
220+
self.get_piece_from_archival_storage(piece_index).await
221+
}) as Pin<Box<dyn Future<Output = _> + Send>>,
222+
),
223+
]
224+
.into_iter()
225+
.collect::<FuturesUnordered<_>>();
226+
227+
while let Some(maybe_piece) = piece_attempts.next().await {
228+
if let Ok(Some(piece)) = maybe_piece {
229+
return Ok(Some(piece));
230+
}
194231
}
195232

196-
warn!(%piece_index, "Couldn't get a piece from DSN. Starting a new attempt...");
233+
warn!(%piece_index, "Couldn't get a piece from DSN. Retrying...");
197234

198-
sleep(Duration::from_secs(GET_PIECE_WAITING_DURATION_IN_SECS)).await;
199-
}
235+
Err(backoff::Error::transient(
236+
"Couldn't get piece from DSN".into(),
237+
))
238+
})
239+
.await
200240
}
201241
}

0 commit comments

Comments
 (0)