Skip to content

Commit f2ec4ed

Browse files
committed
Fix subscription leakage on the farmer
1 parent b027b5f commit f2ec4ed

File tree

7 files changed

+114
-84
lines changed

7 files changed

+114
-84
lines changed

crates/subspace-farmer/src/archiving.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::object_mappings::ObjectMappings;
22
use crate::rpc_client::RpcClient;
3+
use futures::StreamExt;
34
use subspace_archiving::archiver::ArchivedSegment;
45
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
56
use subspace_core_primitives::{FlatPieces, Sha256Hash};
@@ -129,7 +130,7 @@ impl Archiving {
129130
info!("Plotting stopped!");
130131
break;
131132
}
132-
result = archived_segments.recv() => {
133+
result = archived_segments.next() => {
133134
match result {
134135
Some(archived_segment) => {
135136
let segment_index = archived_segment.root_block.segment_index();

crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use async_trait::async_trait;
2+
use futures::channel::mpsc;
3+
use futures::{SinkExt, Stream, StreamExt};
4+
use std::pin::Pin;
25
use std::sync::Arc;
36
use subspace_archiving::archiver::ArchivedSegment;
47
use subspace_farmer::{RpcClient, RpcClientError as MockError};
58
use subspace_rpc_primitives::{
69
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
710
};
8-
use tokio::sync::{mpsc, Mutex};
11+
use tokio::sync::Mutex;
912
use tokio::task::JoinHandle;
1013

1114
/// Client mock for benching purpose
@@ -28,17 +31,18 @@ impl BenchRpcClient {
2831
metadata: FarmerMetadata,
2932
mut archived_segments_receiver: mpsc::Receiver<ArchivedSegment>,
3033
) -> Self {
31-
let (inner_archived_segments_sender, inner_archived_segments_receiver) = mpsc::channel(10);
34+
let (mut inner_archived_segments_sender, inner_archived_segments_receiver) =
35+
mpsc::channel(10);
3236
let (acknowledge_archived_segment_sender, mut acknowledge_archived_segment_receiver) =
3337
mpsc::channel(1);
3438

3539
let segment_producer_handle = tokio::spawn({
3640
async move {
37-
while let Some(segment) = archived_segments_receiver.recv().await {
41+
while let Some(segment) = archived_segments_receiver.next().await {
3842
if inner_archived_segments_sender.send(segment).await.is_err() {
3943
break;
4044
}
41-
if acknowledge_archived_segment_receiver.recv().await.is_none() {
45+
if acknowledge_archived_segment_receiver.next().await.is_none() {
4246
break;
4347
}
4448
}
@@ -66,7 +70,9 @@ impl RpcClient for BenchRpcClient {
6670
Ok(self.inner.metadata.clone())
6771
}
6872

69-
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
73+
async fn subscribe_slot_info(
74+
&self,
75+
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, MockError> {
7076
unreachable!("Unreachable, as we don't start farming for benchmarking")
7177
}
7278

@@ -77,7 +83,9 @@ impl RpcClient for BenchRpcClient {
7783
unreachable!("Unreachable, as we don't start farming for benchmarking")
7884
}
7985

80-
async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
86+
async fn subscribe_block_signing(
87+
&self,
88+
) -> Result<Pin<Box<dyn Stream<Item = BlockSigningInfo> + Send + 'static>>, MockError> {
8189
unreachable!("Unreachable, as we don't start farming for benchmarking")
8290
}
8391

@@ -90,24 +98,25 @@ impl RpcClient for BenchRpcClient {
9098

9199
async fn subscribe_archived_segments(
92100
&self,
93-
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
94-
let (sender, receiver) = mpsc::channel(10);
101+
) -> Result<Pin<Box<dyn Stream<Item = ArchivedSegment> + Send + 'static>>, MockError> {
102+
let (mut sender, receiver) = mpsc::channel(10);
95103
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
96104
tokio::spawn(async move {
97-
while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await
105+
while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await
98106
{
99107
if sender.send(archived_segment).await.is_err() {
100108
break;
101109
}
102110
}
103111
});
104112

105-
Ok(receiver)
113+
Ok(Box::pin(receiver))
106114
}
107115

108116
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> {
109117
self.inner
110118
.acknowledge_archived_segment_sender
119+
.clone()
111120
.send(segment_index)
112121
.await?;
113122
Ok(())

crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use std::path::{Path, PathBuf};
2-
use std::{fmt, io};
3-
1+
use crate::bench_rpc_client::BenchRpcClient;
2+
use crate::{utils, WriteToDisk};
43
use anyhow::anyhow;
4+
use futures::channel::mpsc;
5+
use futures::SinkExt;
56
use rand::prelude::*;
6-
use tempfile::TempDir;
7-
use tracing::info;
8-
7+
use std::path::{Path, PathBuf};
8+
use std::{fmt, io};
99
use subspace_archiving::archiver::ArchivedSegment;
1010
use subspace_core_primitives::objects::{PieceObject, PieceObjectMapping};
1111
use subspace_core_primitives::{
@@ -15,10 +15,9 @@ use subspace_core_primitives::{
1515
use subspace_farmer::multi_farming::{MultiFarming, Options as MultiFarmingOptions};
1616
use subspace_farmer::{ObjectMappings, PieceOffset, Plot, PlotFile, RpcClient};
1717
use subspace_rpc_primitives::FarmerMetadata;
18+
use tempfile::TempDir;
1819
use tokio::time::Instant;
19-
20-
use crate::bench_rpc_client::BenchRpcClient;
21-
use crate::{utils, WriteToDisk};
20+
use tracing::info;
2221

2322
pub struct BenchPlotMock {
2423
piece_count: u64,
@@ -90,7 +89,7 @@ pub(crate) async fn bench(
9089
) -> anyhow::Result<()> {
9190
utils::raise_fd_limit();
9291

93-
let (archived_segments_sender, archived_segments_receiver) = tokio::sync::mpsc::channel(10);
92+
let (mut archived_segments_sender, archived_segments_receiver) = mpsc::channel(10);
9493
let client = BenchRpcClient::new(BENCH_FARMER_METADATA, archived_segments_receiver);
9594

9695
let base_directory = crate::utils::get_path(custom_path);

crates/subspace-farmer/src/farming.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::commitments::Commitments;
77
use crate::identity::Identity;
88
use crate::plot::Plot;
99
use crate::rpc_client::RpcClient;
10-
use futures::{future, future::Either};
10+
use futures::{future, future::Either, StreamExt};
1111
use std::sync::mpsc;
1212
use std::time::Instant;
1313
use subspace_core_primitives::{LocalChallenge, PublicKey, Salt, Solution};
@@ -123,7 +123,7 @@ async fn subscribe_to_slot_info<T: RpcClient>(
123123

124124
let mut salts = Salts::default();
125125

126-
while let Some(slot_info) = slot_info_notifications.recv().await {
126+
while let Some(slot_info) = slot_info_notifications.next().await {
127127
debug!(?slot_info, "New slot");
128128

129129
update_commitments(plot, commitments, &mut salts, &slot_info);
@@ -184,7 +184,7 @@ async fn subscribe_to_slot_info<T: RpcClient>(
184184
if let Some(BlockSigningInfo {
185185
header_hash,
186186
public_key,
187-
}) = block_signing_info_notifications.recv().await
187+
}) = block_signing_info_notifications.next().await
188188
{
189189
// Multiple plots might have solved, only sign with correct one
190190
if identity.public_key().to_bytes() != public_key {

crates/subspace-farmer/src/mock_rpc_client.rs

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
use crate::rpc_client::{Error as MockError, RpcClient};
22
use async_trait::async_trait;
3+
use futures::channel::mpsc;
4+
use futures::{SinkExt, Stream, StreamExt};
5+
use std::pin::Pin;
36
use std::sync::Arc;
47
use subspace_archiving::archiver::ArchivedSegment;
58
use subspace_rpc_primitives::{
69
BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse,
710
};
8-
use tokio::sync::{mpsc, Mutex};
11+
use tokio::sync::Mutex;
912

1013
/// `MockRpc` wrapper.
1114
#[derive(Clone, Debug)]
@@ -71,23 +74,28 @@ impl MockRpcClient {
7174
}
7275

7376
pub(crate) async fn send_metadata(&self, metadata: FarmerMetadata) {
74-
self.inner.metadata_sender.send(metadata).await.unwrap();
77+
self.inner
78+
.metadata_sender
79+
.clone()
80+
.send(metadata)
81+
.await
82+
.unwrap();
7583
}
7684

7785
pub(crate) async fn send_slot_info(&self, slot_info: SlotInfo) {
7886
self.inner
7987
.slot_into_sender
8088
.lock()
8189
.await
82-
.as_ref()
90+
.as_mut()
8391
.unwrap()
8492
.send(slot_info)
8593
.await
8694
.unwrap();
8795
}
8896

8997
pub(crate) async fn receive_solution(&self) -> Option<SolutionResponse> {
90-
self.inner.solution_receiver.lock().await.recv().await
98+
self.inner.solution_receiver.lock().await.next().await
9199
}
92100

93101
pub(crate) async fn drop_slot_sender(&self) {
@@ -99,7 +107,7 @@ impl MockRpcClient {
99107
.archived_segments_sender
100108
.lock()
101109
.await
102-
.as_ref()
110+
.as_mut()
103111
.unwrap()
104112
.send(archived_segment)
105113
.await
@@ -112,7 +120,7 @@ impl MockRpcClient {
112120
acknowledge_archived_segment_receiver
113121
.lock()
114122
.await
115-
.recv()
123+
.next()
116124
.await;
117125
});
118126
}
@@ -130,19 +138,29 @@ impl MockRpcClient {
130138
#[async_trait]
131139
impl RpcClient for MockRpcClient {
132140
async fn farmer_metadata(&self) -> Result<FarmerMetadata, MockError> {
133-
Ok(self.inner.metadata_receiver.lock().await.try_recv()?)
141+
Ok(self
142+
.inner
143+
.metadata_receiver
144+
.lock()
145+
.await
146+
.try_next()?
147+
.unwrap())
134148
}
135149

136-
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, MockError> {
137-
let (sender, receiver) = mpsc::channel(10);
150+
async fn subscribe_slot_info(
151+
&self,
152+
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, MockError> {
153+
let (mut sender, receiver) = mpsc::channel(10);
138154
let slot_receiver = self.inner.slot_info_receiver.clone();
139155
tokio::spawn(async move {
140-
while let Some(slot_info) = slot_receiver.lock().await.recv().await {
141-
sender.send(slot_info).await.unwrap();
156+
while let Some(slot_info) = slot_receiver.lock().await.next().await {
157+
if sender.send(slot_info).await.is_err() {
158+
break;
159+
}
142160
}
143161
});
144162

145-
Ok(receiver)
163+
Ok(Box::pin(receiver))
146164
}
147165

148166
async fn submit_solution_response(
@@ -151,22 +169,27 @@ impl RpcClient for MockRpcClient {
151169
) -> Result<(), MockError> {
152170
self.inner
153171
.solution_sender
172+
.clone()
154173
.send(solution_response)
155174
.await
156175
.unwrap();
157176
Ok(())
158177
}
159178

160-
async fn subscribe_block_signing(&self) -> Result<mpsc::Receiver<BlockSigningInfo>, MockError> {
161-
let (sender, receiver) = mpsc::channel(10);
179+
async fn subscribe_block_signing(
180+
&self,
181+
) -> Result<Pin<Box<dyn Stream<Item = BlockSigningInfo> + Send + 'static>>, MockError> {
182+
let (mut sender, receiver) = mpsc::channel(10);
162183
let block_signing_receiver = self.inner.block_signing_info_receiver.clone();
163184
tokio::spawn(async move {
164-
while let Some(block_signing_info) = block_signing_receiver.lock().await.recv().await {
165-
sender.send(block_signing_info).await.unwrap();
185+
while let Some(block_signing_info) = block_signing_receiver.lock().await.next().await {
186+
if sender.send(block_signing_info).await.is_err() {
187+
break;
188+
}
166189
}
167190
});
168191

169-
Ok(receiver)
192+
Ok(Box::pin(receiver))
170193
}
171194

172195
async fn submit_block_signature(
@@ -175,6 +198,7 @@ impl RpcClient for MockRpcClient {
175198
) -> Result<(), MockError> {
176199
self.inner
177200
.block_signature_sender
201+
.clone()
178202
.send(block_signature)
179203
.await
180204
.unwrap();
@@ -183,22 +207,25 @@ impl RpcClient for MockRpcClient {
183207

184208
async fn subscribe_archived_segments(
185209
&self,
186-
) -> Result<mpsc::Receiver<ArchivedSegment>, MockError> {
187-
let (sender, receiver) = mpsc::channel(10);
210+
) -> Result<Pin<Box<dyn Stream<Item = ArchivedSegment> + Send + 'static>>, MockError> {
211+
let (mut sender, receiver) = mpsc::channel(10);
188212
let archived_segments_receiver = self.inner.archived_segments_receiver.clone();
189213
tokio::spawn(async move {
190-
while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await
214+
while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await
191215
{
192-
sender.send(archived_segment).await.unwrap();
216+
if sender.send(archived_segment).await.is_err() {
217+
break;
218+
}
193219
}
194220
});
195221

196-
Ok(receiver)
222+
Ok(Box::pin(receiver))
197223
}
198224

199225
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> {
200226
self.inner
201227
.acknowledge_archived_segment_sender
228+
.clone()
202229
.send(segment_index)
203230
.await
204231
.unwrap();

0 commit comments

Comments
 (0)