diff --git a/madara/crates/client/block_production/src/executor/mod.rs b/madara/crates/client/block_production/src/executor/mod.rs index 3164b80b6..0dabbbdf8 100644 --- a/madara/crates/client/block_production/src/executor/mod.rs +++ b/madara/crates/client/block_production/src/executor/mod.rs @@ -49,7 +49,10 @@ pub enum ExecutorMessage { exec_ctx: BlockExecutionContext, }, BatchExecuted(BatchExecutionResult), + /// Normal block closing (block time reached, block full, or explicit CloseBlock). EndBlock(Box), + /// Final block closing during graceful shutdown. Only sent when executor detects shutdown. + EndFinalBlock(Box), } #[derive(Default, Debug)] diff --git a/madara/crates/client/block_production/src/executor/thread.rs b/madara/crates/client/block_production/src/executor/thread.rs index ef834721c..ff3d562dd 100644 --- a/madara/crates/client/block_production/src/executor/thread.rs +++ b/madara/crates/client/block_production/src/executor/thread.rs @@ -283,7 +283,53 @@ impl ExecutorThread { } }, // Channel closed. Exit gracefully. - WaitTxBatchOutcome::Exit => return Ok(()), + // Before exiting, check if we have an executing block that needs to be closed. + // This ensures graceful shutdown closes the block using the executor's existing state. + WaitTxBatchOutcome::Exit => { + match state { + ExecutorThreadState::Executing(mut execution_state) => { + tracing::debug!( + "Shutting down executor, closing block block_n={}", + execution_state.exec_ctx.block_number + ); + + // Finalize the block to get execution summary + // This uses the executor's current state - no re-execution needed + match execution_state.executor.finalize() { + Ok(block_exec_summary) => { + // Send EndFinalBlock message so main loop can close the block during shutdown + if self + .replies_sender + .blocking_send(super::ExecutorMessage::EndFinalBlock(Box::new( + block_exec_summary, + ))) + .is_err() + { + // Receiver closed - main loop already shut down + // Block will remain preconfirmed and be handled on restart + tracing::warn!( + "Could not send EndFinalBlock during shutdown, block will remain preconfirmed" + ); + } + } + Err(e) => { + // Finalization failed - log error but continue shutdown + // Block will remain preconfirmed and be handled on restart + tracing::warn!( + "Failed to finalize block during shutdown: {:?}. Block will remain preconfirmed", + e + ); + } + } + } + ExecutorThreadState::NewBlock(_) => { + // No block to close, just exit + tracing::debug!("Shutting down executor, no block to close"); + } + } + + return Ok(()); + } }; for (tx, additional_info) in taken { diff --git a/madara/crates/client/block_production/src/lib.rs b/madara/crates/client/block_production/src/lib.rs index 686b58c18..3e8ebb7ae 100644 --- a/madara/crates/client/block_production/src/lib.rs +++ b/madara/crates/client/block_production/src/lib.rs @@ -58,6 +58,10 @@ //! receives this message it will proceed to finalize (seal) the pending block and store it to db //! as a full block. //! +//! - [`EndFinalBlock`]: this message is sent by the [`ExecutorThread`] only during graceful shutdown +//! when it detects the batch channel closure. It signals the final block closing and the executor +//! thread exits immediately after sending it. +//! //! ## Pending Phase //! //! One important detail to note is that the [`PendingBlockState`] kept in the @@ -65,10 +69,43 @@ //! interval defined by the `pending tick` as set in the chain config. //! (TODO(mohit 13/10/2025): update this when 0.14.0 merges) //! +//! ## Graceful Shutdown and Error Handling +//! +//! The [`BlockProductionTask::run`] method implements graceful shutdown and error handling for +//! batcher and executor tasks. The main loop tracks completion of both tasks, which only complete +//! during shutdown scenarios (cancellation, error, or panic). +//! +//! ### Graceful Shutdown +//! +//! When a cancellation signal is received: +//! 1. The batcher detects cancellation and exits gracefully, closing the `send_batch` channel +//! 2. The executor detects the channel closure and finalizes any open block +//! 3. The executor sends an `EndFinalBlock` message (shutdown-specific) and then completes +//! 4. The main loop processes the `EndFinalBlock`, closes the block, and exits when both tasks complete +//! +//! ### Batcher Panic/Error +//! +//! If the batcher encounters an error or panics: +//! - **With preconfirmed block**: The error is saved and graceful shutdown is attempted. The batcher +//! closes the channel, executor closes the block, and shutdown completes with the saved error. +//! - **Without preconfirmed block**: The error is returned immediately (no need to wait for executor). +//! +//! ### Executor Panic +//! +//! If the executor thread panics: +//! - The panic is caught and propagated via the `stop` channel +//! - The main loop resumes the panic, causing the block to remain preconfirmed +//! - The preconfirmed block will be handled on restart +//! +//! The loop exits when: +//! - Both batcher and executor have completed → returns `Ok(())` or the saved batcher error +//! - Batcher errored without a preconfirmed block → returns the error immediately +//! //! [mempool]: mc_mempool //! [`StartNewBlock`]: ExecutorMessage::StartNewBlock //! [`BatchExecuted`]: ExecutorMessage::BatchExecuted //! [`EndBlock`]: ExecutorMessage::EndBlock +//! [`EndFinalBlock`]: ExecutorMessage::EndFinalBlock //! [`ExecutorThreadHandle::send_batch`]: executor::ExecutorThreadHandle::send_batch //! [`ExecutorThread::incoming_batches`]: executor::thread::ExecutorThread::incoming_batches //! [`ExecutorThread`]: executor::thread::ExecutorThread @@ -666,52 +703,61 @@ impl BlockProductionTask { } ExecutorMessage::EndBlock(block_exec_summary) => { tracing::debug!("Received ExecutorMessage::EndBlock"); - let current_state = self.current_state.take().context("No current state")?; - let TaskState::Executing(state) = current_state else { - anyhow::bail!("Invalid executor state transition: expected current state to be Executing") - }; + self.close_block(block_exec_summary).await?; + } + ExecutorMessage::EndFinalBlock(block_exec_summary) => { + tracing::debug!("Received ExecutorMessage::EndFinalBlock (shutdown)"); + self.close_block(block_exec_summary).await?; + } + } - tracing::debug!("Close and save block block_n={}", state.block_number); - let start_time = Instant::now(); - - let n_txs = self - .backend - .block_view_on_preconfirmed() - .context("No current pre-confirmed block")? - .num_executed_transactions(); - - // Convert state_diff and close block using helper function - let state_diff: mp_state_update::StateDiff = block_exec_summary.state_diff.into(); - Self::close_preconfirmed_block_with_state_diff( - self.backend.clone(), - state.block_number, - state.consumed_core_contract_nonces, - &block_exec_summary.bouncer_weights, - state_diff, - ) - .await - .context("Closing block")?; - - let time_to_close = start_time.elapsed(); - tracing::info!( - "⛏️ Closed block #{} with {n_txs} transactions - {time_to_close:?}", - state.block_number - ); + Ok(()) + } - // Record metrics - let attributes = [ - KeyValue::new("transactions_added", n_txs.to_string()), - KeyValue::new("closing_time", time_to_close.as_secs_f32().to_string()), - ]; + /// Close and save a block using the execution summary. + /// Used for both normal block closing (EndBlock) and shutdown (EndFinalBlock). + async fn close_block(&mut self, block_exec_summary: Box) -> anyhow::Result<()> { + let current_state = self.current_state.take().context("No current state")?; + let TaskState::Executing(state) = current_state else { + anyhow::bail!("Invalid executor state transition: expected current state to be Executing") + }; - self.metrics.block_counter.add(1, &[]); - self.metrics.block_gauge.record(state.block_number, &attributes); - self.metrics.transaction_counter.add(n_txs as u64, &[]); + tracing::debug!("Close and save block block_n={}", state.block_number); + let start_time = Instant::now(); - self.current_state = Some(TaskState::NotExecuting { latest_block_n: Some(state.block_number) }); - self.send_state_notification(BlockProductionStateNotification::ClosedBlock); - } - } + let n_txs = self + .backend + .block_view_on_preconfirmed() + .context("No current pre-confirmed block")? + .num_executed_transactions(); + + // Convert state_diff and close block using helper function + let state_diff: mp_state_update::StateDiff = block_exec_summary.state_diff.into(); + Self::close_preconfirmed_block_with_state_diff( + self.backend.clone(), + state.block_number, + state.consumed_core_contract_nonces, + &block_exec_summary.bouncer_weights, + state_diff, + ) + .await + .context("Closing block")?; + + let time_to_close = start_time.elapsed(); + tracing::info!("⛏️ Closed block #{} with {n_txs} transactions - {time_to_close:?}", state.block_number); + + // Record metrics + let attributes = [ + KeyValue::new("transactions_added", n_txs.to_string()), + KeyValue::new("closing_time", time_to_close.as_secs_f32().to_string()), + ]; + + self.metrics.block_counter.add(1, &[]); + self.metrics.block_gauge.record(state.block_number, &attributes); + self.metrics.transaction_counter.add(n_txs as u64, &[]); + + self.current_state = Some(TaskState::NotExecuting { latest_block_n: Some(state.block_number) }); + self.send_state_notification(BlockProductionStateNotification::ClosedBlock); Ok(()) } @@ -741,6 +787,7 @@ impl BlockProductionTask { // Batcher task is handled in a separate tokio task. let batch_sender = executor.send_batch.take().context("Channel sender already taken")?; let bypass_tx_input = self.bypass_tx_input.take().context("Bypass tx channel already taken")?; + // Clone ctx to check for cancellation in the main loop let mut batcher_task = AbortOnDrop::spawn( Batcher::new( self.backend.clone(), @@ -753,33 +800,68 @@ impl BlockProductionTask { .run(), ); - // Graceful shutdown: when the service is asked to stop, the `batcher_task` will stop, - // which will close the `send_batch` channel (by dropping it). The executor thread then will see that the channel - // is closed next time it tries to receive from it. The executor thread shuts down, dropping the `executor.stop` channel, - // therefore closing it as well. - // We will then see the anyhow::Ok(()) result in the stop channel, as per the implementation of [`StopErrorReceiver::recv`]. - // Note that for this to work, we need to make sure the `send_batch` channel is never aliased - - // otherwise it will never not be closed automatically. - // - // TODO(mohit 18/11/2025): Handle closing preconfirmed block on graceful shutdown. - // When shutting down gracefully, if there's an open preconfirmed block, we should close it using the executor's - // current state (by sending CloseBlock command and processing EndBlock message) rather than re-executing. - // This avoids unnecessary re-execution since we already have the executor running with the current state. + // Track shutdown state: both batcher and executor must complete before shutdown finishes. + // Both tasks only complete during shutdown scenarios (cancellation, error, or panic). + let mut batcher_completed = false; + let mut end_final_block_received = false; // Track if EndFinalBlock has been processed (executor completed with block) + let mut batcher_error: Option = None; // Store batcher error to return after graceful shutdown + // Main loop: handles normal operation and graceful shutdown loop { tokio::select! { + // Path 1: Batcher task completed (cancellation, error, or channel closure) + res = &mut batcher_task, if !batcher_completed => { + batcher_completed = true; + match res { + Ok(()) => tracing::debug!("Batcher task completed normally"), + Err(e) => { + let error = e.context("In batcher task"); + tracing::warn!("Batcher task errored: {error:?}"); + if self.backend.has_preconfirmed_block() { + batcher_error = Some(error); + tracing::warn!("Batcher errored with preconfirmed block, attempting graceful shutdown"); + } else { + batcher_error = Some(error); + } + } + } + } - // Bubble up errors from the batcher task. (tokio JoinHandle) - res = &mut batcher_task => return res.context("In batcher task"), - - // Process results from the execution + // Path 2: Executor replies (EndBlock for normal operation, EndFinalBlock for shutdown) Some(reply) = executor.replies.recv() => { + let is_end_final_block = matches!(reply, ExecutorMessage::EndFinalBlock(_)); self.process_reply(reply).await.context("Processing reply from executor thread")?; + // Mark executor as completed only after processing EndFinalBlock + if is_end_final_block { + end_final_block_received = true; + tracing::debug!("EndFinalBlock processed, executor completed"); + } + } + + // Path 3: Executor thread stopped (normal completion or panic) + // This fires when executor exits. If EndFinalBlock was already processed, executor is done. + // If not, executor exited without a block (no EndFinalBlock sent) - continue loop to check exit conditions. + res = executor.stop.recv() => { + res.context("In executor thread")?; } + } - // Bubble up errors from the executor thread, or graceful shutdown. - // We do this after processing all the replies to ensure we don't lose some of the state by accident. - res = executor.stop.recv() => return res.context("In executor thread"), + // Exit conditions (checked after each select iteration): + // Shutdown is complete when batcher completed AND: + // - EndFinalBlock was processed (block case), OR + // - No preconfirmed block exists (executor stopped without block) + if batcher_completed && (end_final_block_received || !self.backend.has_preconfirmed_block()) { + tracing::debug!( + "Shutdown complete: batcher completed, end_final_block_received={}, has_preconfirmed_block={}", + end_final_block_received, + self.backend.has_preconfirmed_block() + ); + return batcher_error + .map(|e| { + tracing::warn!("Shutdown completed but batcher had error: {e:?}"); + Err(e) + }) + .unwrap_or(Ok(())); } } } @@ -1631,4 +1713,115 @@ pub(crate) mod tests { } ); } + + // This test verifies that graceful shutdown properly closes any open preconfirmed block + // without requiring re-execution. When shutdown is triggered, the block production service + // should close the preconfirmed block using the executor's existing state. + #[rstest::rstest] + #[timeout(Duration::from_secs(30))] + #[tokio::test] + async fn test_graceful_shutdown_closes_preconfirmed_block( + #[future] + #[with(Duration::from_secs(100), false)] + devnet_setup: DevnetSetup, + ) { + let mut devnet_setup = devnet_setup.await; + + // Step 1: Set up block production with transactions + assert!(devnet_setup.mempool.is_empty().await); + + // Add a transaction to the mempool + sign_and_add_invoke_tx( + &devnet_setup.contracts.0[0], + &devnet_setup.contracts.0[1], + &devnet_setup.backend, + &devnet_setup.tx_validator, + Felt::ZERO, + ) + .await; + + assert!(!devnet_setup.mempool.is_empty().await); + + // Step 2: Start block production and execute a batch to create a preconfirmed block + let mut block_production_task = devnet_setup.block_prod_task(); + let mut notifications = block_production_task.subscribe_state_notifications(); + let ctx = ServiceContext::new_for_testing(); + let ctx_clone = ctx.clone(); + + let task = AbortOnDrop::spawn(async move { block_production_task.run(ctx).await }); + + // Wait for batch to be executed (transactions added to preconfirmed block) + assert_eq!(notifications.recv().await.unwrap(), BlockProductionStateNotification::BatchExecuted); + + // Verify preconfirmed block exists with transactions + assert!(devnet_setup.backend.has_preconfirmed_block()); + let preconfirmed_view = devnet_setup.backend.block_view_on_preconfirmed().unwrap(); + assert_eq!(preconfirmed_view.num_executed_transactions(), 1); + + // Step 3: Trigger graceful shutdown by cancelling ServiceContext + ctx_clone.cancel_global(); + + // Step 4: Wait for shutdown to complete + // All database writes and chain tip updates complete synchronously within the awaited rayon task, + // so by the time task.await completes, the state is already updated. No delay needed. + task.await.unwrap(); + + // Step 5: Verify the preconfirmed block is closed and saved to database + assert!(!devnet_setup.backend.has_preconfirmed_block(), "Preconfirmed block should be closed"); + + // Verify block was properly closed (check latest confirmed block number) + let latest_block_n = devnet_setup.backend.latest_confirmed_block_n(); + assert!(latest_block_n.is_some(), "Block should be closed and saved"); + let block_number = latest_block_n.unwrap(); + + // Verify transactions are preserved correctly + let closed_block = devnet_setup.backend.block_view_on_confirmed(block_number).unwrap(); + let executed_transactions = closed_block.get_executed_transactions(..).unwrap(); + assert_eq!(executed_transactions.len(), 1, "Transaction should be preserved in closed block"); + + // Verify mempool is empty (transaction was consumed) + assert!(devnet_setup.mempool.is_empty().await); + } + + // This test verifies that graceful shutdown completes successfully when there is no + // preconfirmed block to close. The shutdown should complete without errors. + #[rstest::rstest] + #[timeout(Duration::from_secs(30))] + #[tokio::test] + async fn test_graceful_shutdown_with_no_preconfirmed_block( + #[future] + #[with(Duration::from_secs(100), false)] + devnet_setup: DevnetSetup, + ) { + let mut devnet_setup = devnet_setup.await; + + // Step 1: Start block production without adding any transactions + // This ensures no preconfirmed block is created + assert!(devnet_setup.mempool.is_empty().await); + assert!(!devnet_setup.backend.has_preconfirmed_block()); + + let block_production_task = devnet_setup.block_prod_task(); + let ctx = ServiceContext::new_for_testing(); + let ctx_clone = ctx.clone(); + + let task = AbortOnDrop::spawn(async move { block_production_task.run(ctx).await }); + + // Step 2: Give a small delay to ensure block production task is running + tokio::time::sleep(Duration::from_millis(100)).await; + + // Step 3: Verify no preconfirmed block exists + assert!(!devnet_setup.backend.has_preconfirmed_block()); + + // Step 4: Trigger graceful shutdown immediately + ctx_clone.cancel_global(); + + // Step 5: Wait for shutdown to complete - should complete without errors + // Since there's no preconfirmed block, shutdown should complete immediately + // without waiting for EndBlock + task.await.unwrap(); + + // Step 6: Verify shutdown completed successfully + // No preconfirmed block should exist (still) + assert!(!devnet_setup.backend.has_preconfirmed_block()); + } }