Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ orchestrator/build/
orchestrator/node_modules/
lib/
s3/

*.rs
*.toml
Cargo.toml
Cargo.lock
Comment on lines +17 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so prettier is for .js, .md files only?

4 changes: 4 additions & 0 deletions madara/crates/client/block_production/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum ExecutorCommandError {
pub enum ExecutorCommand {
/// Force close the current block.
CloseBlock(oneshot::Sender<Result<(), ExecutorCommandError>>),
/// Reset executor state after a reorg. This reinitializes the state adapter from the current database state.
ResetState(oneshot::Sender<Result<(), ExecutorCommandError>>),
}

#[derive(Debug)]
Expand All @@ -50,6 +52,8 @@ pub enum ExecutorMessage {
},
BatchExecuted(BatchExecutionResult),
EndBlock(Box<BlockExecutionSummary>),
/// Notifies the task that executor state was reset after a reorg
StateReset { latest_block_n: Option<u64> },
}

#[derive(Default, Debug)]
Expand Down
39 changes: 39 additions & 0 deletions madara/crates/client/block_production/src/executor/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,45 @@ impl ExecutorThread {
let _ = callback.send(Ok(()));
Default::default()
}
super::ExecutorCommand::ResetState(callback) => {
tracing::info!("🔄 Resetting executor state after reorg");
// Reinitialize the state adapter from the current database state
let new_state_adaptor = match mc_exec::LayeredStateAdapter::new(self.backend.clone()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can just do state = initial_state(), send the message StateReset and return from callback. all resetting of values isn't needed imo.

Ok(adaptor) => adaptor,
Err(e) => {
tracing::error!("Failed to reinitialize state adapter after reorg: {:#}", e);
let _ = callback.send(Err(super::ExecutorCommandError::ChannelClosed));
return Err(e.into());
}
};

let latest_block_n = new_state_adaptor.previous_block_n();
tracing::info!("✅ State adapter reinitialized to block_n={}", new_state_adaptor.block_n());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latest_block_n and block_n() are different blocks?


// Reset the executor state to NewBlock with the fresh state adapter
state = ExecutorThreadState::NewBlock(ExecutorStateNewBlock {
state_adaptor: new_state_adaptor,
consumed_l1_to_l2_nonces: HashSet::new(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen to the nonces that were in the hasmap earlier? will those nonces get saved in the db?

});

// Clear any pending transactions by clearing both fields
to_exec.txs.clear();
to_exec.additional_info.clear();

// Reset block state flags
block_empty = true;
force_close = false;
l2_gas_consumed_block = 0;

// Notify the main task that state was reset
if self.replies_sender.blocking_send(super::ExecutorMessage::StateReset { latest_block_n }).is_err() {
tracing::error!("Failed to send StateReset message to block production task");
}

let _ = callback.send(Ok(()));
// Continue the loop to wait for new transactions - don't return a batch
continue;
}
},
// Channel closed. Exit gracefully.
WaitTxBatchOutcome::Exit => return Ok(()),
Expand Down
9 changes: 9 additions & 0 deletions madara/crates/client/block_production/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ impl BlockProductionHandle {
recv.await.map_err(|_| ExecutorCommandError::ChannelClosed)?
}

/// Reset the executor state after a reorg. This reinitializes the state adapter from the current database state.
pub async fn reset_state(&self) -> Result<(), ExecutorCommandError> {
let (sender, recv) = oneshot::channel();
self.executor_commands
.send(ExecutorCommand::ResetState(sender))
.map_err(|_| ExecutorCommandError::ChannelClosed)?;
recv.await.map_err(|_| ExecutorCommandError::ChannelClosed)?
}

/// Send a transaction through the bypass channel to bypass mempool and validation.
pub async fn send_tx_raw(&self, tx: ValidatedTransaction) -> Result<(), ExecutorCommandError> {
self.bypass_input.send(tx).await.map_err(|_| ExecutorCommandError::ChannelClosed)
Expand Down
6 changes: 6 additions & 0 deletions madara/crates/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub use handle::BlockProductionHandle;
pub enum BlockProductionStateNotification {
ClosedBlock,
BatchExecuted,
StateReset,
}

#[derive(Debug)]
Expand Down Expand Up @@ -625,6 +626,11 @@ impl BlockProductionTask {
/// Handles the state machine and its transitions.
async fn process_reply(&mut self, reply: ExecutorMessage) -> anyhow::Result<()> {
match reply {
ExecutorMessage::StateReset { latest_block_n } => {
tracing::info!("🔄 Resetting block production task state after reorg to latest_block_n={:?}", latest_block_n);
self.current_state = Some(TaskState::NotExecuting { latest_block_n });
self.send_state_notification(BlockProductionStateNotification::StateReset);
}
ExecutorMessage::StartNewBlock { exec_ctx } => {
tracing::debug!("Received ExecutorMessage::StartNewBlock block_n={}", exec_ctx.block_number);
let current_state = self.current_state.take().context("No current state")?;
Expand Down
30 changes: 15 additions & 15 deletions madara/crates/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,6 @@ impl<D: MadaraStorage> MadaraBackend<D> {
Ok(())
}

pub fn get_custom_header(&self) -> Option<CustomHeader> {
self.get_custom_header_with_clear(false)
}

pub fn get_custom_header_with_clear(&self, clear: bool) -> Option<CustomHeader> {
let mut guard = self.custom_header.lock().expect("Poisoned lock");
let result = guard.clone();

if clear {
*guard = None;
}

result
}

pub fn set_custom_header(&self, custom_header: CustomHeader) {
let mut guard = self.custom_header.lock().expect("Poisoned lock");
*guard = Some(custom_header);
Expand Down Expand Up @@ -473,6 +458,21 @@ impl<D: MadaraStorageRead> MadaraBackend<D> {
pub fn chain_config(&self) -> &Arc<ChainConfig> {
&self.chain_config
}

pub fn get_custom_header(&self) -> Option<CustomHeader> {
self.get_custom_header_with_clear(false)
}

pub fn get_custom_header_with_clear(&self, clear: bool) -> Option<CustomHeader> {
let mut guard = self.custom_header.lock().expect("Poisoned lock");
let result = guard.clone();

if clear {
*guard = None;
}

result
}
}

/// Structure holding exclusive access to write the blocks and the tip of the chain.
Expand Down
26 changes: 15 additions & 11 deletions madara/crates/client/exec/src/layered_state_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,22 @@ impl<D: MadaraStorageRead> LayeredStateAdapter<D> {
let view = backend.view_on_latest_confirmed();
let block_number = view.latest_block_n().map(|n| n + 1).unwrap_or(/* genesis */ 0);

let l1_gas_quote = backend
.get_last_l1_gas_quote()
.context("No L1 gas quote available. Ensure that the L1 gas quote is set before calculating gas prices.")?;

let gas_prices = if let Some(block) = view.block_view_on_latest_confirmed() {
let block_info = block.get_block_info()?;
let previous_strk_l2_gas_price = block_info.header.gas_prices.strk_l2_gas_price;
let previous_l2_gas_used = block_info.total_l2_gas_used;

backend.calculate_gas_prices(&l1_gas_quote, previous_strk_l2_gas_price, previous_l2_gas_used)?
let gas_prices = if let Some(custom_header) = backend.get_custom_header().filter(|h| h.block_n == block_number) {
custom_header.gas_prices
} else {
backend.calculate_gas_prices(&l1_gas_quote, 0, 0)?
let l1_gas_quote = backend
.get_last_l1_gas_quote()
.context("No L1 gas quote available. Ensure that the L1 gas quote is set before calculating gas prices.")?;

if let Some(block) = view.block_view_on_latest_confirmed() {
let block_info = block.get_block_info()?;
let previous_strk_l2_gas_price = block_info.header.gas_prices.strk_l2_gas_price;
let previous_l2_gas_used = block_info.total_l2_gas_used;

backend.calculate_gas_prices(&l1_gas_quote, previous_strk_l2_gas_price, previous_l2_gas_used)?
} else {
backend.calculate_gas_prices(&l1_gas_quote, 0, 0)?
}
};

Ok(Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mp_rpc::v0_9_0::{
ClassAndTxnHash, ContractAndTxnHash,
};
use mp_transactions::{L1HandlerTransactionResult, L1HandlerTransactionWithFee};

use mp_utils::service::MadaraServiceId;
#[async_trait]
impl MadaraWriteRpcApiV0_1_0Server for Starknet {
/// Submit a new class v0 declaration transaction, bypassing mempool and all validation.
Expand Down Expand Up @@ -107,6 +107,21 @@ impl MadaraWriteRpcApiV0_1_0Server for Starknet {
.map_err(StarknetRpcApiError::from)?;
let backend_chain_tip = mc_db::ChainTip::from_storage(fresh_chain_tip);
self.backend.chain_tip.send_replace(backend_chain_tip);

// Reset block production executor state if block production is enabled
let bp_status = self.ctx.service_status(MadaraServiceId::BlockProduction);
if matches!(bp_status, mp_utils::service::MadaraServiceStatus::On) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to check if service is on?

if let Some(block_prod_handle) = &self.block_prod_handle {
tracing::debug!("revertTo: resetting block production state");
block_prod_handle
.reset_state()
.await
.context("Resetting block production executor state after reorg")
.map_err(StarknetRpcApiError::from)?;
tracing::debug!("revertTo: block production state reset complete");
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is business logic, should not live in the rpc crate

Ok(())
}

Expand Down
Loading