@@ -6,7 +6,7 @@ use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
66use crate :: single_disk_farm:: {
77 BackgroundTaskError , Handlers , PlotMetadataHeader , RESERVED_PLOT_METADATA ,
88} ;
9- use async_lock:: { Mutex as AsyncMutex , RwLock as AsyncRwLock , Semaphore } ;
9+ use async_lock:: { Mutex as AsyncMutex , RwLock as AsyncRwLock , Semaphore , SemaphoreGuard } ;
1010use futures:: channel:: { mpsc, oneshot} ;
1111use futures:: stream:: FuturesOrdered ;
1212use futures:: { select, FutureExt , SinkExt , StreamExt } ;
@@ -123,8 +123,30 @@ where
123123 max_plotting_sectors_per_farm,
124124 } = plotting_options;
125125
126+ let sector_plotting_options = & sector_plotting_options;
126127 let plotting_semaphore = Semaphore :: new ( max_plotting_sectors_per_farm. get ( ) ) ;
127128 let mut sectors_being_plotted = FuturesOrdered :: new ( ) ;
129+ // Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting
130+ // semaphore in practice due to permit stored in `SectorPlottingResult`
131+ let ( sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc:: unbounded ( ) ;
132+ let process_plotting_result_fut = async move {
133+ while let Some ( sector_plotting_result) = sector_plotting_result_receiver. next ( ) . await {
134+ process_plotting_result (
135+ sector_plotting_result,
136+ sectors_metadata,
137+ sectors_being_modified,
138+ & mut metadata_header,
139+ Arc :: clone ( & sector_plotting_options. metadata_file ) ,
140+ )
141+ . await ?;
142+ }
143+
144+ unreachable ! (
145+ "Stream will not end before the rest of the plotting process is shutting down"
146+ ) ;
147+ } ;
148+ let process_plotting_result_fut = process_plotting_result_fut. fuse ( ) ;
149+ let mut process_plotting_result_fut = pin ! ( process_plotting_result_fut) ;
128150
129151 // Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that
130152 // already started plotting to finish plotting and then update metadata header
@@ -138,7 +160,7 @@ where
138160 let sector_index = sector_to_plot. sector_index;
139161 let sector_plotting_init_fut = plot_single_sector(
140162 sector_to_plot,
141- & sector_plotting_options,
163+ sector_plotting_options,
142164 sectors_metadata,
143165 sectors_being_modified,
144166 & plotting_semaphore,
@@ -168,25 +190,23 @@ where
168190 break ;
169191 }
170192 maybe_sector_plotting_result = sectors_being_plotted. select_next_some( ) => {
171- process_plotting_result(
172- maybe_sector_plotting_result?,
173- sectors_metadata,
174- sectors_being_modified,
175- & mut metadata_header,
176- Arc :: clone( & sector_plotting_options. metadata_file)
177- ) . await ?;
193+ sector_plotting_result_sender
194+ . unbounded_send( maybe_sector_plotting_result?)
195+ . expect( "Sending means receiver is not dropped yet; qed" ) ;
196+ }
197+ result = process_plotting_result_fut => {
198+ return result;
178199 }
179200 }
180201 }
181202 }
182203 maybe_sector_plotting_result = sectors_being_plotted. select_next_some( ) => {
183- process_plotting_result(
184- maybe_sector_plotting_result?,
185- sectors_metadata,
186- sectors_being_modified,
187- & mut metadata_header,
188- Arc :: clone( & sector_plotting_options. metadata_file)
189- ) . await ?;
204+ sector_plotting_result_sender
205+ . unbounded_send( maybe_sector_plotting_result?)
206+ . expect( "Sending means receiver is not dropped yet; qed" ) ;
207+ }
208+ result = process_plotting_result_fut => {
209+ return result;
190210 }
191211 }
192212 }
@@ -195,7 +215,7 @@ where
195215}
196216
197217async fn process_plotting_result (
198- sector_plotting_result : SectorPlottingResult ,
218+ sector_plotting_result : SectorPlottingResult < ' _ > ,
199219 sectors_metadata : & AsyncRwLock < Vec < SectorMetadataChecksummed > > ,
200220 sectors_being_modified : & AsyncRwLock < HashSet < SectorIndex > > ,
201221 metadata_header : & mut PlotMetadataHeader ,
@@ -205,6 +225,7 @@ async fn process_plotting_result(
205225 sector_metadata,
206226 replotting,
207227 last_queued,
228+ plotting_permit,
208229 } = sector_plotting_result;
209230
210231 let sector_index = sector_metadata. sector_index ;
@@ -241,6 +262,8 @@ async fn process_plotting_result(
241262 }
242263 }
243264
265+ drop ( plotting_permit) ;
266+
244267 Ok ( ( ) )
245268}
246269
@@ -250,10 +273,11 @@ enum PlotSingleSectorResult<F> {
250273 FatalError ( PlottingError ) ,
251274}
252275
253- struct SectorPlottingResult {
276+ struct SectorPlottingResult < ' a > {
254277 sector_metadata : SectorMetadataChecksummed ,
255278 replotting : bool ,
256279 last_queued : bool ,
280+ plotting_permit : SemaphoreGuard < ' a > ,
257281}
258282
259283async fn plot_single_sector < ' a , NC > (
@@ -262,7 +286,9 @@ async fn plot_single_sector<'a, NC>(
262286 sectors_metadata : & ' a AsyncRwLock < Vec < SectorMetadataChecksummed > > ,
263287 sectors_being_modified : & ' a AsyncRwLock < HashSet < SectorIndex > > ,
264288 plotting_semaphore : & ' a Semaphore ,
265- ) -> PlotSingleSectorResult < impl Future < Output = Result < SectorPlottingResult , PlottingError > > + ' a >
289+ ) -> PlotSingleSectorResult <
290+ impl Future < Output = Result < SectorPlottingResult < ' a > , PlottingError > > + ' a ,
291+ >
266292where
267293 NC : NodeClient ,
268294{
@@ -478,12 +504,11 @@ where
478504 . sector_update
479505 . call_simple ( & ( sector_index, sector_state) ) ;
480506
481- drop ( plotting_permit) ;
482-
483507 Ok ( SectorPlottingResult {
484508 sector_metadata,
485509 replotting,
486510 last_queued,
511+ plotting_permit,
487512 } )
488513 } )
489514}
0 commit comments