@@ -5,9 +5,12 @@ use crate::commitments::Commitments;
55use crate :: object_mappings:: ObjectMappings ;
66use crate :: plot:: Plot ;
77use crate :: rpc:: RpcClient ;
8- use log:: { debug, error, info} ;
8+ use futures:: channel:: mpsc;
9+ use futures:: { SinkExt , StreamExt } ;
10+ use log:: { debug, error, info, warn} ;
911use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
1012use std:: sync:: Arc ;
13+ use std:: time:: Duration ;
1114use subspace_archiving:: archiver:: { ArchivedSegment , Archiver } ;
1215use subspace_core_primitives:: objects:: { GlobalObject , PieceObject , PieceObjectMapping } ;
1316use subspace_core_primitives:: Sha256Hash ;
@@ -17,6 +20,8 @@ use thiserror::Error;
1720use tokio:: sync:: oneshot:: Receiver ;
1821use tokio:: { sync:: oneshot, task:: JoinHandle } ;
1922
23+ const BEST_BLOCK_REQUEST_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
24+
2025#[ derive( Debug , Error ) ]
2126pub enum PlottingError {
2227 #[ error( "Plot is empty on restart, can't continue" ) ]
@@ -41,29 +46,48 @@ pub struct Plotting {
4146 handle : Option < JoinHandle < Result < ( ) , PlottingError > > > ,
4247}
4348
49+ pub struct FarmerData {
50+ plot : Plot ,
51+ commitments : Commitments ,
52+ object_mappings : ObjectMappings ,
53+ metadata : FarmerMetadata ,
54+ }
55+
56+ impl FarmerData {
57+ pub fn new (
58+ plot : Plot ,
59+ commitments : Commitments ,
60+ object_mappings : ObjectMappings ,
61+ metadata : FarmerMetadata ,
62+ ) -> Self {
63+ Self {
64+ plot,
65+ commitments,
66+ object_mappings,
67+ metadata,
68+ }
69+ }
70+ }
71+
4472/// Assumes `plot`, `commitment`, `object_mappings`, `client` and `identity` are already initialized
4573impl Plotting {
4674 /// Returns an instance of plotting, and also starts a concurrent background plotting task
4775 pub fn start < T : RpcClient + Clone + Send + Sync + ' static > (
48- plot : Plot ,
49- commitments : Commitments ,
50- object_mappings : ObjectMappings ,
76+ farmer_data : FarmerData ,
5177 client : T ,
52- farmer_metadata : FarmerMetadata ,
5378 subspace_codec : SubspaceCodec ,
79+ best_block_number_check_interval : Duration ,
5480 ) -> Self {
5581 // Oneshot channels, that will be used for interrupt/stop the process
5682 let ( stop_sender, stop_receiver) = oneshot:: channel ( ) ;
5783
5884 // Get a handle for the background task, so that we can wait on it later if we want to
5985 let plotting_handle = tokio:: spawn ( async move {
6086 background_plotting (
87+ farmer_data,
6188 client,
62- plot,
63- commitments,
64- object_mappings,
65- farmer_metadata,
6689 subspace_codec,
90+ best_block_number_check_interval,
6791 stop_receiver,
6892 )
6993 . await
@@ -95,26 +119,24 @@ impl Drop for Plotting {
95119// don't want eventually
96120/// Maintains plot in up to date state plotting new pieces as they are produced on the network.
97121async fn background_plotting < T : RpcClient + Clone + Send + ' static > (
122+ farmer_data : FarmerData ,
98123 client : T ,
99- plot : Plot ,
100- commitments : Commitments ,
101- object_mappings : ObjectMappings ,
102- farmer_metadata : FarmerMetadata ,
103124 mut subspace_codec : SubspaceCodec ,
125+ best_block_number_check_interval : Duration ,
104126 mut stop_receiver : Receiver < ( ) > ,
105127) -> Result < ( ) , PlottingError > {
106- let weak_plot = plot. downgrade ( ) ;
128+ let weak_plot = farmer_data . plot . downgrade ( ) ;
107129 let FarmerMetadata {
108130 confirmation_depth_k,
109131 record_size,
110132 recorded_history_segment_size,
111- } = farmer_metadata ;
133+ } = farmer_data . metadata ;
112134
113135 // TODO: This assumes fixed size segments, which might not be the case
114136 let merkle_num_leaves = u64:: from ( recorded_history_segment_size / record_size * 2 ) ;
115137
116138 let maybe_last_root_block = tokio:: task:: spawn_blocking ( {
117- let plot = plot. clone ( ) ;
139+ let plot = farmer_data . plot . clone ( ) ;
118140
119141 move || plot. get_last_root_block ( ) . map_err ( PlottingError :: LastBlock )
120142 } )
@@ -123,7 +145,7 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
123145
124146 let mut archiver = if let Some ( last_root_block) = maybe_last_root_block {
125147 // Continuing from existing initial state
126- if plot. is_empty ( ) {
148+ if farmer_data . plot . is_empty ( ) {
127149 return Err ( PlottingError :: ContinueError ) ;
128150 }
129151
@@ -153,12 +175,12 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
153175 }
154176 } else {
155177 // Starting from genesis
156- if !plot. is_empty ( ) {
178+ if !farmer_data . plot . is_empty ( ) {
157179 // Restart before first block was archived, erase the plot
158180 // TODO: Erase plot
159181 }
160182
161- drop ( plot) ;
183+ drop ( farmer_data . plot ) ;
162184
163185 Archiver :: new ( record_size as usize , recorded_history_segment_size as usize )
164186 . map_err ( PlottingError :: Archiver ) ?
@@ -248,12 +270,13 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
248270 {
249271 error ! ( "Failed to write encoded pieces: {}" , error) ;
250272 }
251- if let Err ( error) =
252- commitments. create_for_pieces ( & pieces, piece_index_offset)
273+ if let Err ( error) = farmer_data
274+ . commitments
275+ . create_for_pieces ( & pieces, piece_index_offset)
253276 {
254277 error ! ( "Failed to create commitments for pieces: {}" , error) ;
255278 }
256- if let Err ( error) = object_mappings. store ( & object_mapping) {
279+ if let Err ( error) = farmer_data . object_mappings . store ( & object_mapping) {
257280 error ! ( "Failed to store object mappings for pieces: {}" , error) ;
258281 }
259282 let segment_index = root_block. segment_index ( ) ;
@@ -295,6 +318,30 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
295318 . expect ( "Failed to send genesis block archiving message" ) ;
296319 }
297320
321+ let ( mut best_block_number_sender, mut best_block_number_receiver) = mpsc:: channel ( 1 ) ;
322+
323+ tokio:: spawn ( async move {
324+ loop {
325+ tokio:: time:: sleep ( best_block_number_check_interval) . await ;
326+
327+ // In case connection dies, we need to disconnect from the node
328+ let best_block_number_result =
329+ tokio:: time:: timeout ( BEST_BLOCK_REQUEST_TIMEOUT , client. best_block_number ( ) ) . await ;
330+
331+ let is_error = !matches ! ( best_block_number_result, Ok ( Ok ( _) ) ) ;
332+ // Result doesn't matter here
333+ let _ = best_block_number_sender
334+ . send ( best_block_number_result)
335+ . await ;
336+
337+ if is_error {
338+ break ;
339+ }
340+ }
341+ } ) ;
342+
343+ let mut last_best_block_number_error = false ;
344+
298345 // Listen for new blocks produced on the network
299346 loop {
300347 tokio:: select! {
@@ -308,10 +355,11 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
308355 let block_number = u32 :: from_str_radix( & head. number[ 2 ..] , 16 ) . unwrap( ) ;
309356 debug!( "Last block number: {:#?}" , block_number) ;
310357
311- if let Some ( block) = block_number. checked_sub( confirmation_depth_k) {
312- // We send block that should be archived over channel that doesn't have a buffer, atomic
313- // integer is used to make sure archiving process always read up to date value
314- block_to_archive. store( block, Ordering :: Relaxed ) ;
358+ if let Some ( block_number) = block_number. checked_sub( confirmation_depth_k) {
359+ // We send block that should be archived over channel that doesn't have
360+ // a buffer, atomic integer is used to make sure archiving process
361+ // always read up to date value
362+ block_to_archive. store( block_number, Ordering :: Relaxed ) ;
315363 let _ = new_block_to_archive_sender. try_send( Arc :: clone( & block_to_archive) ) ;
316364 }
317365 } ,
@@ -321,6 +369,44 @@ async fn background_plotting<T: RpcClient + Clone + Send + 'static>(
321369 }
322370 }
323371 }
372+ maybe_result = best_block_number_receiver. next( ) => {
373+ match maybe_result {
374+ Some ( Ok ( Ok ( best_block_number) ) ) => {
375+ debug!( "Best block number: {:#?}" , best_block_number) ;
376+ last_best_block_number_error = false ;
377+
378+ if let Some ( block_number) = best_block_number. checked_sub( confirmation_depth_k) {
379+ // We send block that should be archived over channel that doesn't have
380+ // a buffer, atomic integer is used to make sure archiving process
381+ // always read up to date value
382+ block_to_archive. fetch_max( block_number, Ordering :: Relaxed ) ;
383+ let _ = new_block_to_archive_sender. try_send( Arc :: clone( & block_to_archive) ) ;
384+ }
385+ }
386+ Some ( Ok ( Err ( error) ) ) => {
387+ if last_best_block_number_error {
388+ error!( "Request to get new best block failed second time: {error}" ) ;
389+ break ;
390+ } else {
391+ warn!( "Request to get new best block failed: {error}" ) ;
392+ last_best_block_number_error = true ;
393+ }
394+ }
395+ Some ( Err ( _error) ) => {
396+ if last_best_block_number_error {
397+ error!( "Request to get new best block timed out second time" ) ;
398+ break ;
399+ } else {
400+ warn!( "Request to get new best block timed out" ) ;
401+ last_best_block_number_error = true ;
402+ }
403+ }
404+ None => {
405+ debug!( "Best block number channel closed!" ) ;
406+ break ;
407+ }
408+ }
409+ }
324410 }
325411 }
326412
0 commit comments