Skip to content

Commit bf7f09a

Browse files
committed
feat: handling graceful block closing with separate message
1 parent c783ab8 commit bf7f09a

File tree

3 files changed

+84
-61
lines changed

3 files changed

+84
-61
lines changed

madara/crates/client/block_production/src/executor/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ pub enum ExecutorMessage {
4949
exec_ctx: BlockExecutionContext,
5050
},
5151
BatchExecuted(BatchExecutionResult),
52+
/// Normal block closing (block time reached, block full, or explicit CloseBlock).
5253
EndBlock(Box<BlockExecutionSummary>),
54+
/// Final block closing during graceful shutdown. Only sent when executor detects shutdown.
55+
EndFinalBlock(Box<BlockExecutionSummary>),
5356
}
5457

5558
#[derive(Default, Debug)]

madara/crates/client/block_production/src/executor/thread.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,18 +297,18 @@ impl ExecutorThread {
297297
// This uses the executor's current state - no re-execution needed
298298
match execution_state.executor.finalize() {
299299
Ok(block_exec_summary) => {
300-
// Send EndBlock message so main loop can close the block
300+
// Send EndFinalBlock message so main loop can close the block during shutdown
301301
if self
302302
.replies_sender
303-
.blocking_send(super::ExecutorMessage::EndBlock(Box::new(
303+
.blocking_send(super::ExecutorMessage::EndFinalBlock(Box::new(
304304
block_exec_summary,
305305
)))
306306
.is_err()
307307
{
308308
// Receiver closed - main loop already shut down
309309
// Block will remain preconfirmed and be handled on restart
310310
tracing::warn!(
311-
"Could not send EndBlock during shutdown, block will remain preconfirmed"
311+
"Could not send EndFinalBlock during shutdown, block will remain preconfirmed"
312312
);
313313
}
314314
}

madara/crates/client/block_production/src/lib.rs

Lines changed: 78 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@
5858
//! receives this message it will proceed to finalize (seal) the pending block and store it to db
5959
//! as a full block.
6060
//!
61+
//! - [`EndFinalBlock`]: this message is sent by the [`ExecutorThread`] only during graceful shutdown
62+
//! when it detects the batch channel closure. It signals the final block closing and the executor
63+
//! thread exits immediately after sending it.
64+
//!
6165
//! ## Pending Phase
6266
//!
6367
//! One important detail to note is that the [`PendingBlockState`] kept in the
@@ -76,8 +80,8 @@
7680
//! When a cancellation signal is received:
7781
//! 1. The batcher detects cancellation and exits gracefully, closing the `send_batch` channel
7882
//! 2. The executor detects the channel closure and finalizes any open block
79-
//! 3. The executor sends an `EndBlock` message and then completes
80-
//! 4. The main loop processes the `EndBlock`, closes the block, and exits when both tasks complete
83+
//! 3. The executor sends an `EndFinalBlock` message (shutdown-specific) and then completes
84+
//! 4. The main loop processes the `EndFinalBlock`, closes the block, and exits when both tasks complete
8185
//!
8286
//! ### Batcher Panic/Error
8387
//!
@@ -101,6 +105,7 @@
101105
//! [`StartNewBlock`]: ExecutorMessage::StartNewBlock
102106
//! [`BatchExecuted`]: ExecutorMessage::BatchExecuted
103107
//! [`EndBlock`]: ExecutorMessage::EndBlock
108+
//! [`EndFinalBlock`]: ExecutorMessage::EndFinalBlock
104109
//! [`ExecutorThreadHandle::send_batch`]: executor::ExecutorThreadHandle::send_batch
105110
//! [`ExecutorThread::incoming_batches`]: executor::thread::ExecutorThread::incoming_batches
106111
//! [`ExecutorThread`]: executor::thread::ExecutorThread
@@ -698,52 +703,61 @@ impl BlockProductionTask {
698703
}
699704
ExecutorMessage::EndBlock(block_exec_summary) => {
700705
tracing::debug!("Received ExecutorMessage::EndBlock");
701-
let current_state = self.current_state.take().context("No current state")?;
702-
let TaskState::Executing(state) = current_state else {
703-
anyhow::bail!("Invalid executor state transition: expected current state to be Executing")
704-
};
706+
self.close_block(block_exec_summary).await?;
707+
}
708+
ExecutorMessage::EndFinalBlock(block_exec_summary) => {
709+
tracing::debug!("Received ExecutorMessage::EndFinalBlock (shutdown)");
710+
self.close_block(block_exec_summary).await?;
711+
}
712+
}
705713

706-
tracing::debug!("Close and save block block_n={}", state.block_number);
707-
let start_time = Instant::now();
708-
709-
let n_txs = self
710-
.backend
711-
.block_view_on_preconfirmed()
712-
.context("No current pre-confirmed block")?
713-
.num_executed_transactions();
714-
715-
// Convert state_diff and close block using helper function
716-
let state_diff: mp_state_update::StateDiff = block_exec_summary.state_diff.into();
717-
Self::close_preconfirmed_block_with_state_diff(
718-
self.backend.clone(),
719-
state.block_number,
720-
state.consumed_core_contract_nonces,
721-
&block_exec_summary.bouncer_weights,
722-
state_diff,
723-
)
724-
.await
725-
.context("Closing block")?;
726-
727-
let time_to_close = start_time.elapsed();
728-
tracing::info!(
729-
"⛏️ Closed block #{} with {n_txs} transactions - {time_to_close:?}",
730-
state.block_number
731-
);
714+
Ok(())
715+
}
716+
717+
/// Close and save a block using the execution summary.
718+
/// Used for both normal block closing (EndBlock) and shutdown (EndFinalBlock).
719+
async fn close_block(&mut self, block_exec_summary: Box<BlockExecutionSummary>) -> anyhow::Result<()> {
720+
let current_state = self.current_state.take().context("No current state")?;
721+
let TaskState::Executing(state) = current_state else {
722+
anyhow::bail!("Invalid executor state transition: expected current state to be Executing")
723+
};
732724

733-
// Record metrics
734-
let attributes = [
735-
KeyValue::new("transactions_added", n_txs.to_string()),
736-
KeyValue::new("closing_time", time_to_close.as_secs_f32().to_string()),
737-
];
725+
tracing::debug!("Close and save block block_n={}", state.block_number);
726+
let start_time = Instant::now();
738727

739-
self.metrics.block_counter.add(1, &[]);
740-
self.metrics.block_gauge.record(state.block_number, &attributes);
741-
self.metrics.transaction_counter.add(n_txs as u64, &[]);
728+
let n_txs = self
729+
.backend
730+
.block_view_on_preconfirmed()
731+
.context("No current pre-confirmed block")?
732+
.num_executed_transactions();
742733

743-
self.current_state = Some(TaskState::NotExecuting { latest_block_n: Some(state.block_number) });
744-
self.send_state_notification(BlockProductionStateNotification::ClosedBlock);
745-
}
746-
}
734+
// Convert state_diff and close block using helper function
735+
let state_diff: mp_state_update::StateDiff = block_exec_summary.state_diff.into();
736+
Self::close_preconfirmed_block_with_state_diff(
737+
self.backend.clone(),
738+
state.block_number,
739+
state.consumed_core_contract_nonces,
740+
&block_exec_summary.bouncer_weights,
741+
state_diff,
742+
)
743+
.await
744+
.context("Closing block")?;
745+
746+
let time_to_close = start_time.elapsed();
747+
tracing::info!("⛏️ Closed block #{} with {n_txs} transactions - {time_to_close:?}", state.block_number);
748+
749+
// Record metrics
750+
let attributes = [
751+
KeyValue::new("transactions_added", n_txs.to_string()),
752+
KeyValue::new("closing_time", time_to_close.as_secs_f32().to_string()),
753+
];
754+
755+
self.metrics.block_counter.add(1, &[]);
756+
self.metrics.block_gauge.record(state.block_number, &attributes);
757+
self.metrics.transaction_counter.add(n_txs as u64, &[]);
758+
759+
self.current_state = Some(TaskState::NotExecuting { latest_block_n: Some(state.block_number) });
760+
self.send_state_notification(BlockProductionStateNotification::ClosedBlock);
747761

748762
Ok(())
749763
}
@@ -789,7 +803,7 @@ impl BlockProductionTask {
789803
// Track shutdown state: both batcher and executor must complete before shutdown finishes.
790804
// Both tasks only complete during shutdown scenarios (cancellation, error, or panic).
791805
let mut batcher_completed = false;
792-
let mut executor_completed = false;
806+
let mut end_final_block_received = false; // Track if EndFinalBlock has been processed (executor completed with block)
793807
let mut batcher_error: Option<anyhow::Error> = None; // Store batcher error to return after graceful shutdown
794808

795809
// Main loop: handles normal operation and graceful shutdown
@@ -813,36 +827,42 @@ impl BlockProductionTask {
813827
}
814828
}
815829

816-
// Path 2: Executor replies (EndBlock message during normal operation or shutdown)
830+
// Path 2: Executor replies (EndBlock for normal operation, EndFinalBlock for shutdown)
817831
Some(reply) = executor.replies.recv() => {
832+
let is_end_final_block = matches!(reply, ExecutorMessage::EndFinalBlock(_));
818833
self.process_reply(reply).await.context("Processing reply from executor thread")?;
834+
// Mark executor as completed only after processing EndFinalBlock
835+
if is_end_final_block {
836+
end_final_block_received = true;
837+
tracing::debug!("EndFinalBlock processed, executor completed");
838+
}
819839
}
820840

821841
// Path 3: Executor thread stopped (normal completion or panic)
842+
// This fires when executor exits. If EndFinalBlock was already processed, executor is done.
843+
// If not, executor exited without a block (no EndFinalBlock sent) - continue loop to check exit conditions.
822844
res = executor.stop.recv() => {
823-
executor_completed = true;
824845
res.context("In executor thread")?;
825-
tracing::debug!("Executor thread stopped");
826846
}
827847
}
828848

829-
// Exit conditions:
830-
// 1. Both completed → shutdown done (return error if any, otherwise success)
831-
// 2. Batcher errored without preconfirmed block → exit immediately with error
832-
if batcher_completed && executor_completed {
833-
tracing::debug!("Shutdown complete: batcher completed, executor completed");
849+
// Exit conditions (checked after each select iteration):
850+
// Shutdown is complete when batcher completed AND:
851+
// - EndFinalBlock was processed (block case), OR
852+
// - No preconfirmed block exists (executor stopped without block)
853+
if batcher_completed && (end_final_block_received || !self.backend.has_preconfirmed_block()) {
854+
tracing::debug!(
855+
"Shutdown complete: batcher completed, end_final_block_received={}, has_preconfirmed_block={}",
856+
end_final_block_received,
857+
self.backend.has_preconfirmed_block()
858+
);
834859
return batcher_error
835860
.map(|e| {
836861
tracing::warn!("Shutdown completed but batcher had error: {e:?}");
837862
Err(e)
838863
})
839864
.unwrap_or(Ok(()));
840865
}
841-
842-
if batcher_error.is_some() && !self.backend.has_preconfirmed_block() {
843-
tracing::debug!("Batcher errored without preconfirmed block, exiting");
844-
return Err(batcher_error.take().unwrap());
845-
}
846866
}
847867
}
848868
}

0 commit comments

Comments
 (0)