Skip to content

Commit c5b44f4

Browse files
authored
Merge pull request #458 from subspace/remove-rpc-workarounds
Remove farmer workarounds for node's RPC
2 parents 09115cb + 78c4e92 commit c5b44f4

File tree

10 files changed

+43
-166
lines changed

10 files changed

+43
-166
lines changed

crates/sc-consensus-subspace-rpc/src/lib.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use std::marker::PhantomData;
4747
use std::sync::Arc;
4848
use std::time::Duration;
4949
use subspace_archiving::archiver::ArchivedSegment;
50-
use subspace_core_primitives::{BlockNumber, Solution};
50+
use subspace_core_primitives::Solution;
5151
use subspace_rpc_primitives::{
5252
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
5353
};
@@ -61,10 +61,6 @@ pub trait SubspaceRpcApi {
6161
#[method(name = "subspace_getFarmerMetadata")]
6262
fn get_farmer_metadata(&self) -> RpcResult<FarmerMetadata>;
6363

64-
/// Get best block number
65-
#[method(name = "subspace_getBestBlockNumber")]
66-
fn get_best_block_number(&self) -> RpcResult<BlockNumber>;
67-
6864
#[method(name = "subspace_submitSolutionResponse")]
6965
fn submit_solution_response(&self, solution_response: SolutionResponse) -> RpcResult<()>;
7066

@@ -209,15 +205,6 @@ where
209205
})
210206
}
211207

212-
fn get_best_block_number(&self) -> RpcResult<BlockNumber> {
213-
let best_number = TryInto::<BlockNumber>::try_into(self.client.info().best_number)
214-
.unwrap_or_else(|_| {
215-
panic!("Block number can't be converted into BlockNumber");
216-
});
217-
218-
Ok(best_number)
219-
}
220-
221208
fn submit_solution_response(&self, solution_response: SolutionResponse) -> RpcResult<()> {
222209
let solution_response_senders = self.solution_response_senders.clone();
223210

crates/subspace-farmer/src/archiving.rs

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
use crate::object_mappings::ObjectMappings;
22
use crate::rpc_client::RpcClient;
3-
use futures::{SinkExt, StreamExt};
4-
use std::time::Duration;
53
use subspace_archiving::archiver::ArchivedSegment;
64
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
75
use subspace_core_primitives::{FlatPieces, Sha256Hash};
86
use subspace_rpc_primitives::FarmerMetadata;
97
use thiserror::Error;
108
use tokio::sync::oneshot;
119
use tokio::task::JoinHandle;
12-
use tracing::{debug, error, info, warn};
13-
14-
const BEST_BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
10+
use tracing::{debug, error, info};
1511

1612
#[derive(Debug, Error)]
1713
pub enum ArchivingError {
@@ -52,7 +48,6 @@ impl Archiving {
5248
farmer_metadata: FarmerMetadata,
5349
object_mappings: ObjectMappings,
5450
client: Client,
55-
best_block_number_check_interval: Duration,
5651
mut on_pieces_to_plot: OPTP,
5752
) -> Result<Archiving, ArchivingError>
5853
where
@@ -126,38 +121,6 @@ impl Archiving {
126121
.await
127122
.map_err(ArchivingError::RpcError)?;
128123

129-
let (mut best_block_number_sender, mut best_block_number_receiver) =
130-
futures::channel::mpsc::channel(1);
131-
132-
tokio::spawn({
133-
let client = client.clone();
134-
135-
async move {
136-
loop {
137-
tokio::time::sleep(best_block_number_check_interval).await;
138-
139-
// In case connection dies, we need to disconnect from the node
140-
let best_block_number_result = tokio::time::timeout(
141-
BEST_BLOCK_REQUEST_TIMEOUT,
142-
client.best_block_number(),
143-
)
144-
.await;
145-
146-
let is_error = !matches!(best_block_number_result, Ok(Ok(_)));
147-
// Result doesn't matter here
148-
let _ = best_block_number_sender
149-
.send(best_block_number_result)
150-
.await;
151-
152-
if is_error {
153-
break;
154-
}
155-
}
156-
}
157-
});
158-
159-
let mut last_best_block_number_error = false;
160-
161124
let archiving_handle = tokio::spawn(async move {
162125
// Listen for new blocks produced on the network
163126
loop {
@@ -189,36 +152,6 @@ impl Archiving {
189152
}
190153
}
191154
}
192-
maybe_result = best_block_number_receiver.next() => {
193-
match maybe_result {
194-
Some(Ok(Ok(best_block_number))) => {
195-
debug!(best_block_number);
196-
last_best_block_number_error = false;
197-
}
198-
Some(Ok(Err(error))) => {
199-
if last_best_block_number_error {
200-
error!(%error, "Request to get new best block failed second time");
201-
break;
202-
} else {
203-
warn!(%error, "Request to get new best block failed");
204-
last_best_block_number_error = true;
205-
}
206-
}
207-
Some(Err(_error)) => {
208-
if last_best_block_number_error {
209-
error!("Request to get new best block timed out second time");
210-
break;
211-
} else {
212-
warn!("Request to get new best block timed out");
213-
last_best_block_number_error = true;
214-
}
215-
}
216-
None => {
217-
debug!("Best block number channel closed!");
218-
break;
219-
}
220-
}
221-
}
222155
}
223156
}
224157
});

crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
1-
use std::sync::Arc;
2-
31
use async_trait::async_trait;
2+
use std::sync::Arc;
43
use subspace_archiving::archiver::ArchivedSegment;
5-
use subspace_core_primitives::BlockNumber;
4+
use subspace_farmer::{RpcClient, RpcClientError as MockError};
65
use subspace_rpc_primitives::{
76
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
87
};
9-
use tokio::sync::mpsc::Receiver;
108
use tokio::sync::{mpsc, Mutex};
119
use tokio::task::JoinHandle;
1210

13-
use subspace_farmer::{RpcClient, RpcClientError as MockError};
14-
1511
/// Client mock for benching purpose
1612
#[derive(Clone, Debug)]
1713
pub struct BenchRpcClient {
@@ -70,11 +66,6 @@ impl RpcClient for BenchRpcClient {
7066
Ok(self.inner.metadata.clone())
7167
}
7268

73-
async fn best_block_number(&self) -> Result<BlockNumber, MockError> {
74-
// Doesn't matter for tests (at least yet)
75-
Ok(BlockNumber::MAX)
76-
}
77-
7869
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
7970
unreachable!("Unreachable, as we don't start farming for benchmarking")
8071
}
@@ -86,7 +77,7 @@ impl RpcClient for BenchRpcClient {
8677
unreachable!("Unreachable, as we don't start farming for benchmarking")
8778
}
8879

89-
async fn subscribe_block_signing(&self) -> Result<Receiver<BlockSigningInfo>, MockError> {
80+
async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
9081
unreachable!("Unreachable, as we don't start farming for benchmarking")
9182
}
9283

@@ -97,7 +88,9 @@ impl RpcClient for BenchRpcClient {
9788
unreachable!("Unreachable, as we don't start farming for benchmarking")
9889
}
9990

100-
async fn subscribe_archived_segments(&self) -> Result<Receiver<ArchivedSegment>, MockError> {
91+
async fn subscribe_archived_segments(
92+
&self,
93+
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
10194
let (sender, receiver) = mpsc::channel(10);
10295
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
10396
tokio::spawn(async move {

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use jsonrpsee::ws_server::WsServerBuilder;
33
use rand::prelude::*;
44
use std::path::PathBuf;
55
use std::sync::Arc;
6-
use std::time::Duration;
76
use std::{io, mem};
87
use subspace_archiving::archiver::ArchivedSegment;
98
use subspace_core_primitives::objects::{PieceObject, PieceObjectMapping};
@@ -93,7 +92,6 @@ pub(crate) async fn farm(
9392
plot_size,
9493
max_plot_size,
9594
}: FarmingArgs,
96-
best_block_number_check_interval: Duration,
9795
) -> Result<(), anyhow::Error> {
9896
raise_fd_limit();
9997

@@ -136,7 +134,6 @@ pub(crate) async fn farm(
136134
client,
137135
object_mappings: object_mappings.clone(),
138136
reward_address,
139-
best_block_number_check_interval,
140137
},
141138
plot_size,
142139
max_plot_size,
@@ -228,7 +225,6 @@ pub(crate) async fn bench(
228225
custom_path: Option<PathBuf>,
229226
plot_size: u64,
230227
max_plot_size: Option<u64>,
231-
best_block_number_check_interval: Duration,
232228
write_to_disk: WriteToDisk,
233229
write_pieces_size: u64,
234230
) -> anyhow::Result<()> {
@@ -281,7 +277,6 @@ pub(crate) async fn bench(
281277
client: client.clone(),
282278
object_mappings: object_mappings.clone(),
283279
reward_address: PublicKey::default(),
284-
best_block_number_check_interval,
285280
},
286281
plot_size,
287282
max_plot_size,

crates/subspace-farmer/src/bin/subspace-farmer/main.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use clap::{ArgEnum, Parser, ValueHint};
77
use sp_core::crypto::PublicError;
88
use std::net::SocketAddr;
99
use std::path::PathBuf;
10-
use std::time::Duration;
1110
use subspace_core_primitives::PublicKey;
1211
use subspace_networking::libp2p::Multiaddr;
1312
use tracing::info;
@@ -18,8 +17,6 @@ use tracing_subscriber::{
1817
EnvFilter,
1918
};
2019

21-
const BEST_BLOCK_NUMBER_CHECK_INTERVAL: Duration = Duration::from_secs(5);
22-
2320
/// Arguments for farmer
2421
#[derive(Debug, Parser)]
2522
struct FarmingArgs {
@@ -150,7 +147,7 @@ async fn main() -> Result<()> {
150147
info!("Done");
151148
}
152149
Command::Farm(args) => {
153-
commands::farm(args, BEST_BLOCK_NUMBER_CHECK_INTERVAL).await?;
150+
commands::farm(args).await?;
154151
}
155152
Command::Bench {
156153
custom_path,
@@ -163,7 +160,6 @@ async fn main() -> Result<()> {
163160
custom_path,
164161
plot_size,
165162
max_plot_size,
166-
BEST_BLOCK_NUMBER_CHECK_INTERVAL,
167163
write_to_disk,
168164
write_pieces_size,
169165
)

crates/subspace-farmer/src/mock_rpc_client.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ use crate::rpc_client::{Error as MockError, RpcClient};
22
use async_trait::async_trait;
33
use std::sync::Arc;
44
use subspace_archiving::archiver::ArchivedSegment;
5-
use subspace_core_primitives::BlockNumber;
65
use subspace_rpc_primitives::{
76
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
87
};
9-
use tokio::sync::mpsc::Receiver;
108
use tokio::sync::{mpsc, Mutex};
119

1210
/// `MockRpc` wrapper.
@@ -135,11 +133,6 @@ impl RpcClient for MockRpcClient {
135133
Ok(self.inner.metadata_receiver.lock().await.try_recv()?)
136134
}
137135

138-
async fn best_block_number(&self) -> Result<BlockNumber, MockError> {
139-
// Doesn't matter for tests (at least yet)
140-
Ok(0)
141-
}
142-
143136
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
144137
let (sender, receiver) = mpsc::channel(10);
145138
let slot_receiver = self.inner.slot_info_receiver.clone();
@@ -164,7 +157,7 @@ impl RpcClient for MockRpcClient {
164157
Ok(())
165158
}
166159

167-
async fn subscribe_block_signing(&self) -> Result<Receiver<BlockSigningInfo>, MockError> {
160+
async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
168161
let (sender, receiver) = mpsc::channel(10);
169162
let block_signing_receiver = self.inner.block_signing_info_receiver.clone();
170163
tokio::spawn(async move {
@@ -188,7 +181,9 @@ impl RpcClient for MockRpcClient {
188181
Ok(())
189182
}
190183

191-
async fn subscribe_archived_segments(&self) -> Result<Receiver<ArchivedSegment>, MockError> {
184+
async fn subscribe_archived_segments(
185+
&self,
186+
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
192187
let (sender, receiver) = mpsc::channel(10);
193188
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
194189
tokio::spawn(async move {

crates/subspace-farmer/src/multi_farming.rs

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use crate::{
44
use anyhow::anyhow;
55
use futures::stream::{FuturesUnordered, StreamExt};
66
use rayon::prelude::*;
7-
use std::{path::PathBuf, sync::Arc, time::Duration};
7+
use std::path::PathBuf;
8+
use std::sync::Arc;
89
use subspace_core_primitives::{PublicKey, PIECE_SIZE};
910
use subspace_solving::SubspaceCodec;
1011
use tracing::info;
@@ -43,7 +44,6 @@ pub struct Options<C: RpcClient> {
4344
pub client: C,
4445
pub object_mappings: ObjectMappings,
4546
pub reward_address: PublicKey,
46-
pub best_block_number_check_interval: Duration,
4747
}
4848

4949
impl MultiFarming {
@@ -54,7 +54,6 @@ impl MultiFarming {
5454
client,
5555
object_mappings,
5656
reward_address,
57-
best_block_number_check_interval,
5857
}: Options<C>,
5958
total_plot_size: u64,
6059
max_plot_size: u64,
@@ -125,34 +124,28 @@ impl MultiFarming {
125124
.map_err(|error| anyhow!(error))?;
126125

127126
// Start archiving task
128-
let archiving = Archiving::start(
129-
farmer_metadata,
130-
object_mappings,
131-
client.clone(),
132-
best_block_number_check_interval,
133-
{
134-
let mut on_pieces_to_plots = plots
135-
.iter()
136-
.zip(subspace_codecs)
137-
.zip(&commitments)
138-
.map(|((plot, subspace_codec), commitments)| {
139-
plotting::plot_pieces(subspace_codec, plot, commitments.clone())
127+
let archiving = Archiving::start(farmer_metadata, object_mappings, client.clone(), {
128+
let mut on_pieces_to_plots = plots
129+
.iter()
130+
.zip(subspace_codecs)
131+
.zip(&commitments)
132+
.map(|((plot, subspace_codec), commitments)| {
133+
plotting::plot_pieces(subspace_codec, plot, commitments.clone())
134+
})
135+
.collect::<Vec<_>>();
136+
137+
move |pieces_to_plot| {
138+
on_pieces_to_plots
139+
.par_iter_mut()
140+
.map(|on_pieces_to_plot| {
141+
// TODO: It might be desirable to not clone it and instead pick just
142+
// unnecessary pieces and copy pieces once since different plots will
143+
// care about different pieces
144+
on_pieces_to_plot(pieces_to_plot.clone())
140145
})
141-
.collect::<Vec<_>>();
142-
143-
move |pieces_to_plot| {
144-
on_pieces_to_plots
145-
.par_iter_mut()
146-
.map(|on_pieces_to_plot| {
147-
// TODO: It might be desirable to not clone it and instead pick just
148-
// unnecessary pieces and copy pieces once since different plots will
149-
// care about different pieces
150-
on_pieces_to_plot(pieces_to_plot.clone())
151-
})
152-
.reduce(|| true, |result, should_continue| result && should_continue)
153-
}
154-
},
155-
)
146+
.reduce(|| true, |result, should_continue| result && should_continue)
147+
}
148+
})
156149
.await?;
157150

158151
Ok(Self {

0 commit comments

Comments
 (0)