1+ mod manual_rpc;
2+
13use clap:: Parser ;
24use domain_test_service:: { DomainNodeBuilder , EcdsaKeyring } ;
5+ use manual_rpc:: {
6+ ConsensusControl , consensus_control_channel, manual_block_production_rpc,
7+ spawn_consensus_worker,
8+ } ;
39use sc_cli:: LoggerBuilder ;
410use sc_service:: { BasePath , Role } ;
511use sp_keyring:: Sr25519Keyring ;
@@ -9,6 +15,7 @@ use std::time::Duration;
915use subspace_test_service:: { MockConsensusNode , MockConsensusNodeRpcConfig } ;
1016use tempfile:: TempDir ;
1117use tokio:: runtime:: Builder as TokioBuilder ;
18+ use tracing:: error;
1219
1320#[ derive( Debug , Parser ) ]
1421#[ command(
@@ -78,6 +85,7 @@ fn start_consensus_node(
7885 finalize_depth : Option < u32 > ,
7986 rpc_host : IpAddr ,
8087 rpc_port : u16 ,
88+ consensus_control : ConsensusControl ,
8189) -> MockConsensusNode {
8290 let private_evm = false ;
8391 let consensus_key = Sr25519Keyring :: Alice ;
@@ -90,7 +98,13 @@ fn start_consensus_node(
9098 rpc_addr : Some ( rpc_addr) ,
9199 rpc_port : Some ( rpc_port) ,
92100 } ;
93- let mut node = MockConsensusNode :: run_with_rpc_options ( tokio_handle, consensus_key, rpc_config) ;
101+
102+ let mut node = MockConsensusNode :: run_with_rpc_builder (
103+ tokio_handle,
104+ consensus_key,
105+ rpc_config,
106+ Box :: new ( move || Ok ( manual_block_production_rpc ( consensus_control. clone ( ) ) ) ) ,
107+ ) ;
94108 node. start_network ( ) ;
95109 node
96110}
@@ -106,6 +120,8 @@ fn main() {
106120 let _enter = runtime. enter ( ) ;
107121 let tokio_handle = runtime. handle ( ) . clone ( ) ;
108122
123+ let ( consensus_control, command_rx) = consensus_control_channel ( ) ;
124+
109125 // Start consensus
110126 let consensus_base = base_path. join ( "consensus" ) ;
111127 let mut consensus = start_consensus_node (
@@ -114,6 +130,7 @@ fn main() {
114130 cli. finalize_depth ,
115131 cli. rpc_host ,
116132 cli. rpc_port ,
133+ consensus_control. clone ( ) ,
117134 ) ;
118135
119136 // Optionally start domain (EVM)
@@ -130,28 +147,35 @@ fn main() {
130147 } else {
131148 None
132149 } ;
150+
133151 consensus. start_cross_domain_gossip_message_worker ( ) ;
134152
135- if block_interval_ms > 0 {
136- // Keep domain node alive - move it into the async block but don't move consensus
153+ let worker_handle = spawn_consensus_worker ( consensus, command_rx) ;
154+
155+ let consensus_for_loop = consensus_control. clone ( ) ;
156+ runtime. block_on ( async move {
137157 let _domain_guard = domain;
138- runtime . block_on ( async {
158+ if block_interval_ms > 0 {
139159 loop {
140160 tokio:: select! {
141161 _ = tokio:: signal:: ctrl_c( ) => break ,
142162 _ = tokio:: time:: sleep( Duration :: from_millis( block_interval_ms) ) => {
143- let slot = consensus. produce_slot( ) ;
144- let _ = consensus. notify_new_slot_and_wait_for_bundle( slot) . await ;
145- let _ = consensus. produce_block_with_slot( slot) . await ;
146-
163+ if let Err ( err) = consensus_for_loop. produce_block( true ) . await {
164+ error!( %err, "Failed to auto-produce block" ) ;
165+ }
147166 }
148167 }
149168 }
150- } ) ;
151- return ;
169+ } else {
170+ let _ = tokio:: signal:: ctrl_c ( ) . await ;
171+ }
172+ } ) ;
173+
174+ if let Err ( err) = runtime. block_on ( consensus_control. shutdown ( ) ) {
175+ error ! ( %err, "Failed to shut down consensus control" ) ;
152176 }
153177
154- runtime . block_on ( async {
155- let _ = tokio :: signal :: ctrl_c ( ) . await ;
156- } ) ;
178+ worker_handle
179+ . join ( )
180+ . expect ( "Failed to join consensus control thread" ) ;
157181}
0 commit comments