Skip to content

Commit 9c69b3a

Browse files
authored
Merge pull request #715 from subspace/move-get-pieces-by-range
Move get pieces by range
2 parents a1b9a39 + 07887b0 commit 9c69b3a

File tree

13 files changed

+202
-406
lines changed

13 files changed

+202
-406
lines changed

crates/subspace-farmer/src/dsn.rs

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
1-
use futures::{Stream, StreamExt};
1+
use futures::{SinkExt, Stream, StreamExt};
22
use num_traits::{WrappingAdd, WrappingSub};
33
use std::ops::Range;
44
use subspace_core_primitives::{
55
FlatPieces, PieceIndex, PieceIndexHash, PublicKey, PIECE_SIZE, U256,
66
};
7-
use subspace_networking::PiecesToPlot;
7+
use subspace_networking::libp2p::core::multihash::{Code, MultihashDigest};
8+
use subspace_networking::{PiecesByRangeRequest, PiecesByRangeResponse, PiecesToPlot};
9+
use tracing::{debug, trace, warn};
810

11+
#[cfg(test)]
12+
mod pieces_by_range_tests;
913
#[cfg(test)]
1014
mod tests;
1115

16+
const PIECES_CHANNEL_BUFFER_SIZE: usize = 20;
17+
1218
pub type PieceIndexHashNumber = U256;
1319

20+
#[derive(Debug, thiserror::Error)]
21+
pub enum GetPiecesByRangeError {
22+
/// Cannot find closest pieces by range.
23+
#[error("Cannot find closest pieces by range")]
24+
NoClosestPiecesFound,
25+
/// Cannot get closest peers from DHT.
26+
#[error("Cannot get closest peers from DHT")]
27+
CannotGetClosestPeers(#[source] subspace_networking::GetClosestPeersError),
28+
/// Node runner was dropped, impossible to get pieces by range.
29+
#[error("Node runner was dropped, impossible to get pieces by range")]
30+
NodeRunnerDropped(#[source] subspace_networking::SendRequestError),
31+
}
32+
1433
/// Options for syncing
1534
pub struct SyncOptions {
1635
/// Max plot size from node (in bytes)
@@ -55,13 +74,91 @@ impl DSNSync for NoSync {
5574
#[async_trait::async_trait]
5675
impl DSNSync for subspace_networking::Node {
5776
type Stream = futures::channel::mpsc::Receiver<PiecesToPlot>;
58-
type Error = subspace_networking::GetPiecesByRangeError;
77+
type Error = GetPiecesByRangeError;
5978

6079
async fn get_pieces(
6180
&mut self,
6281
Range { start, end }: Range<PieceIndexHash>,
6382
) -> Result<Self::Stream, Self::Error> {
64-
self.get_pieces_by_range(start, end).await
83+
// calculate the middle of the range (big endian)
84+
let middle = {
85+
let start = PieceIndexHashNumber::from(start);
86+
let end = PieceIndexHashNumber::from(end);
87+
// min + (max - min) / 2
88+
(start / 2 + end / 2).to_be_bytes()
89+
};
90+
91+
// obtain closest peers to the middle of the range
92+
let key = Code::Identity.digest(&middle);
93+
let peers = self
94+
.get_closest_peers(key)
95+
.await
96+
.map_err(GetPiecesByRangeError::CannotGetClosestPeers)?;
97+
98+
// select first peer for the piece-by-range protocol
99+
let peer_id = *peers
100+
.first()
101+
.ok_or(GetPiecesByRangeError::NoClosestPiecesFound)?;
102+
103+
trace!(%peer_id, ?start, ?end, "Peer found");
104+
105+
// prepare stream channel
106+
let (mut tx, rx) =
107+
futures::channel::mpsc::channel::<PiecesToPlot>(PIECES_CHANNEL_BUFFER_SIZE);
108+
109+
// populate resulting stream in a separate async task
110+
let node = self.clone();
111+
tokio::spawn(async move {
112+
// indicates the next starting point for a request
113+
let mut starting_index_hash = start;
114+
loop {
115+
trace!(
116+
"Sending 'Piece-by-range' request to {} with {:?}",
117+
peer_id,
118+
starting_index_hash
119+
);
120+
// request data by range
121+
let response = node
122+
.send_generic_request(
123+
peer_id,
124+
PiecesByRangeRequest {
125+
start: starting_index_hash,
126+
end,
127+
},
128+
)
129+
.await
130+
.map_err(GetPiecesByRangeError::NodeRunnerDropped);
131+
132+
// send the result to the stream and exit on any error
133+
match response {
134+
Ok(PiecesByRangeResponse {
135+
pieces,
136+
next_piece_index_hash,
137+
}) => {
138+
// send last response data stream to the result stream
139+
if !pieces.piece_indexes.is_empty() && tx.send(pieces).await.is_err() {
140+
warn!("Piece-by-range request channel was closed.");
141+
break;
142+
}
143+
144+
// prepare the next starting point for data
145+
if let Some(next_piece_index_hash) = next_piece_index_hash {
146+
debug_assert_ne!(starting_index_hash, next_piece_index_hash);
147+
starting_index_hash = next_piece_index_hash;
148+
} else {
149+
// exit loop if the last response showed no remaining data
150+
break;
151+
}
152+
}
153+
Err(err) => {
154+
debug!(%err, "Piece-by-range request failed");
155+
break;
156+
}
157+
}
158+
}
159+
});
160+
161+
Ok(rx)
65162
}
66163
}
67164

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
use crate::{
2-
Config, PiecesByRangeRequest, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot,
3-
};
1+
use super::DSNSync;
42
use futures::channel::{mpsc, oneshot};
53
use futures::{SinkExt, StreamExt};
6-
use libp2p::multiaddr::Protocol;
74
use parking_lot::Mutex;
85
use std::sync::atomic::{AtomicUsize, Ordering};
96
use std::sync::Arc;
107
use subspace_core_primitives::{crypto, FlatPieces, Piece, PieceIndexHash};
8+
use subspace_networking::libp2p::multiaddr::Protocol;
9+
use subspace_networking::{
10+
Config, PiecesByRangeRequest, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot,
11+
};
1112

1213
#[tokio::test]
1314
async fn pieces_by_range_protocol_smoke() {
1415
let request = PiecesByRangeRequest {
15-
from: PieceIndexHash::from([1u8; 32]),
16-
to: PieceIndexHash::from([1u8; 32]),
16+
start: PieceIndexHash::from([1u8; 32]),
17+
end: PieceIndexHash::from([1u8; 32]),
1718
};
1819

1920
let piece_bytes: Vec<u8> = Piece::default().into();
@@ -41,7 +42,7 @@ async fn pieces_by_range_protocol_smoke() {
4142
})],
4243
..Config::with_generated_keypair()
4344
};
44-
let (node_1, mut node_runner_1) = crate::create(config_1).await.unwrap();
45+
let (node_1, mut node_runner_1) = subspace_networking::create(config_1).await.unwrap();
4546

4647
let (node_1_address_sender, node_1_address_receiver) = oneshot::channel();
4748
let on_new_listener_handler = node_1.on_new_listener(Arc::new({
@@ -72,7 +73,7 @@ async fn pieces_by_range_protocol_smoke() {
7273
..Config::with_generated_keypair()
7374
};
7475

75-
let (node_2, mut node_runner_2) = crate::create(config_2).await.unwrap();
76+
let (node_2, mut node_runner_2) = subspace_networking::create(config_2).await.unwrap();
7677
tokio::spawn(async move {
7778
node_runner_2.run().await;
7879
});
@@ -130,7 +131,7 @@ async fn get_pieces_by_range_smoke() {
130131
})
131132
} else {
132133
// New request starts from from the previous response.
133-
assert_eq!(req.from, piece_index_continue);
134+
assert_eq!(req.start, piece_index_continue);
134135

135136
Some(PiecesByRangeResponse {
136137
pieces: response_data[request_index].clone(),
@@ -140,7 +141,7 @@ async fn get_pieces_by_range_smoke() {
140141
})],
141142
..Config::with_generated_keypair()
142143
};
143-
let (node_1, mut node_runner_1) = crate::create(config_1).await.unwrap();
144+
let (node_1, mut node_runner_1) = subspace_networking::create(config_1).await.unwrap();
144145

145146
let (node_1_address_sender, node_1_address_receiver) = oneshot::channel();
146147
let on_new_listener_handler = node_1.on_new_listener(Arc::new({
@@ -171,13 +172,13 @@ async fn get_pieces_by_range_smoke() {
171172
..Config::with_generated_keypair()
172173
};
173174

174-
let (node_2, mut node_runner_2) = crate::create(config_2).await.unwrap();
175+
let (mut node_2, mut node_runner_2) = subspace_networking::create(config_2).await.unwrap();
175176
tokio::spawn(async move {
176177
node_runner_2.run().await;
177178
});
178179

179180
let mut stream = node_2
180-
.get_pieces_by_range(piece_index_from, piece_index_end)
181+
.get_pieces(piece_index_from..piece_index_end)
181182
.await
182183
.unwrap();
183184

crates/subspace-farmer/src/dsn/tests.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{sync, DSNSync, NoSync, PieceIndexHashNumber, SyncOptions};
1+
use super::{sync, DSNSync, NoSync, PieceIndexHashNumber, PiecesToPlot, SyncOptions};
22
use crate::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
33
use crate::legacy_multi_plots_farm::{LegacyMultiPlotsFarm, Options as MultiFarmingOptions};
44
use crate::single_plot_farm::PlotFactoryOptions;
@@ -18,7 +18,6 @@ use subspace_core_primitives::{
1818
PieceIndex, PieceIndexHash, RootBlock, Sha256Hash, PIECE_SIZE, U256,
1919
};
2020
use subspace_networking::libp2p::multiaddr::Protocol;
21-
use subspace_networking::PiecesToPlot;
2221
use tempfile::TempDir;
2322

2423
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]

crates/subspace-farmer/src/single_plot_farm.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ pub mod dsn_archiving;
33
mod tests;
44

55
use crate::commitments::{CommitmentError, Commitments};
6-
use crate::dsn;
7-
use crate::dsn::{PieceIndexHashNumber, SyncOptions};
6+
use crate::dsn::{self, PieceIndexHashNumber, SyncOptions};
87
use crate::farming::Farming;
98
use crate::identity::Identity;
109
use crate::object_mappings::ObjectMappings;
@@ -451,13 +450,14 @@ impl SinglePlotFarm {
451450
let codec = codec.clone();
452451

453452
// TODO: also ask for how many pieces to read
454-
move |&PiecesByRangeRequest { from, to }| {
455-
let mut pieces_and_indexes =
456-
plot.get_sequential_pieces(from, SYNC_PIECES_AT_ONCE).ok()?;
453+
move |&PiecesByRangeRequest { start, end }| {
454+
let mut pieces_and_indexes = plot
455+
.get_sequential_pieces(start, SYNC_PIECES_AT_ONCE)
456+
.ok()?;
457457

458458
let next_piece_index_hash = if let Some(idx) = pieces_and_indexes
459459
.iter()
460-
.position(|(piece_index, _)| PieceIndexHash::from_index(*piece_index) > to)
460+
.position(|(piece_index, _)| PieceIndexHash::from_index(*piece_index) > end)
461461
{
462462
pieces_and_indexes.truncate(idx);
463463
None
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
use futures::channel::oneshot;
2-
use futures::StreamExt;
32
use libp2p::multiaddr::Protocol;
4-
use libp2p::{identity, PeerId};
3+
use libp2p::PeerId;
54
use parking_lot::Mutex;
65
use std::sync::Arc;
76
use std::time::Duration;
8-
use subspace_core_primitives::{FlatPieces, Piece, PieceIndexHash};
9-
use subspace_networking::{
10-
Config, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot,
11-
};
7+
use subspace_networking::Config;
128

139
#[tokio::main]
1410
async fn main() {
@@ -20,36 +16,11 @@ async fn main() {
2016
const TOTAL_NODE_COUNT: usize = 100;
2117
const EXPECTED_NODE_INDEX: usize = 75;
2218

23-
let expected_response = {
24-
let piece_bytes: Vec<u8> = Piece::default().into();
25-
let flat_pieces = FlatPieces::try_from(piece_bytes).unwrap();
26-
let pieces = PiecesToPlot {
27-
piece_indexes: vec![1],
28-
pieces: flat_pieces,
29-
};
30-
31-
PiecesByRangeResponse {
32-
pieces,
33-
next_piece_index_hash: None,
34-
}
35-
};
36-
3719
for i in 0..TOTAL_NODE_COUNT {
38-
let local_response = expected_response.clone();
3920
let config = Config {
4021
bootstrap_nodes: bootstrap_nodes.clone(),
4122
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
4223
allow_non_globals_in_dht: true,
43-
request_response_protocols: vec![PiecesByRangeRequestHandler::create(move |_| {
44-
if i != EXPECTED_NODE_INDEX {
45-
return None;
46-
}
47-
48-
println!("Sending response from Node Index {}... ", i);
49-
50-
std::thread::sleep(Duration::from_secs(1));
51-
Some(local_response.clone())
52-
})],
5324
..Config::with_generated_keypair()
5425
};
5526
let (node, mut node_runner) = subspace_networking::create(config).await.unwrap();
@@ -91,7 +62,6 @@ async fn main() {
9162
bootstrap_nodes,
9263
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
9364
allow_non_globals_in_dht: true,
94-
request_response_protocols: vec![PiecesByRangeRequestHandler::create(|_request| None)],
9565
..Config::with_generated_keypair()
9666
};
9767

@@ -106,37 +76,11 @@ async fn main() {
10676

10777
tokio::time::sleep(Duration::from_secs(1)).await;
10878

109-
let encoding = expected_node_id.as_ref().digest();
110-
let public_key = identity::PublicKey::from_protobuf_encoding(encoding)
111-
.expect("Invalid public key from PeerId.");
112-
let peer_id_public_key = if let identity::PublicKey::Sr25519(pk) = public_key {
113-
pk.encode()
114-
} else {
115-
panic!("Expected PublicKey::Sr25519")
116-
};
79+
let peer_id = node
80+
.get_closest_peers(expected_node_id.into())
81+
.await
82+
.unwrap()[0];
83+
assert_eq!(peer_id, expected_node_id);
11784

118-
// create a range from expected peer's public key
119-
let from = {
120-
let mut buf = peer_id_public_key;
121-
buf[16] = 0;
122-
PieceIndexHash::from(buf)
123-
};
124-
let to = {
125-
let mut buf = peer_id_public_key;
126-
buf[16] = 50;
127-
PieceIndexHash::from(buf)
128-
};
129-
130-
let stream_future = node.get_pieces_by_range(from, to);
131-
let mut stream = stream_future.await.unwrap();
132-
if let Some(value) = stream.next().await {
133-
if value != expected_response.pieces {
134-
panic!("UNEXPECTED RESPONSE")
135-
}
136-
137-
println!("Received expected response.");
138-
}
139-
140-
tokio::time::sleep(Duration::from_secs(1)).await;
14185
println!("Exiting..");
14286
}

0 commit comments

Comments
 (0)