Skip to content

Commit fab8400

Browse files
ancazamfirromac
andauthored
feat(code): Add the capability to re-run consensus for a given height (#952)
* Add assert for non empty commit certificate * Let full node store and write votes to WAL * Round state machine check for previous decision. Store the proposal round and value in the round state machine. Remove decided proposal from the consensus state, get it from state machine instead. Add flag to consensus state to indicate if Effect::Decide was sent to the app. Cleanup test app. * Fix mux typo * Cleanup * Change test to ensure the syncing node was a proposer at least once. * Process the commit certificate first * Panic in test app on multiple commits * Correct comment * Increase history for channel app * Make channel app propose different values when restart from initial height * Update comments, rename decided to decided_sent * Check step in commit timeout as after WAL replay we may not be in commit step. * Clear all full proposals when moving to the next height * Fix unused import * Small reorg * Cleanup * Add and handle ResetHeight * More consistent method names * Add `is_restart` to `StartedHeight` event * Use middleware in test app to reset height * Remove height reset from example app * Add test for resetting height * Only reset the height once in test * Reset sync tip height when restarting a height * Restore original sync inputs, add `restart` to `StartedHeight` * Cleanup * Add a release note * Add warnings to the adr, tutorial and comments * Fix spelling --------- Signed-off-by: Romain Ruetschi <[email protected]> Co-authored-by: Romain Ruetschi <[email protected]>
1 parent 936a1c3 commit fab8400

File tree

25 files changed

+456
-207
lines changed

25 files changed

+456
-207
lines changed

RELEASE_NOTES.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Release Notes
22

3+
## Unreleased
4+
5+
- Add the capability to re-run consensus for a given height ([#893](https://github.com/informalsystems/malachite/issues/893))
6+
37
## 0.1.0
48

59
*April 9th, 2025*

code/crates/app-channel/src/msgs.rs

+6
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ pub enum ConsensusMsg<Ctx: Context> {
213213

214214
/// Previousuly received value proposed by a validator
215215
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),
216+
217+
/// Instructs consensus to restart at a given height with the given validator set.
218+
RestartHeight(Ctx::Height, Ctx::ValidatorSet),
216219
}
217220

218221
impl<Ctx: Context> From<ConsensusMsg<Ctx>> for ConsensusActorMsg<Ctx> {
@@ -224,6 +227,9 @@ impl<Ctx: Context> From<ConsensusMsg<Ctx>> for ConsensusActorMsg<Ctx> {
224227
ConsensusMsg::ReceivedProposedValue(value, origin) => {
225228
ConsensusActorMsg::ReceivedProposedValue(value, origin)
226229
}
230+
ConsensusMsg::RestartHeight(height, validator_set) => {
231+
ConsensusActorMsg::RestartHeight(height, validator_set)
232+
}
227233
}
228234
}
229235
}

code/crates/core-consensus/src/handle/step_timeout.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@ where
1212
{
1313
warn!(
1414
height = %state.driver.height(), %round,
15-
"Consensus is halted in {:?} step", state.driver.step());
15+
"Consensus is halted in {:?} step", state.driver.step()
16+
);
1617

1718
if state.params.vote_sync_mode == VoteSyncMode::RequestResponse {
18-
warn!(
19-
height = %state.driver.height(), %round,
20-
"Requesting vote set");
19+
warn!(height = %state.driver.height(), %round, "Requesting vote set");
2120

2221
perform!(
2322
co,

code/crates/engine/src/consensus.rs

+43-8
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ pub enum Msg<Ctx: Context> {
9999

100100
/// Received and assembled the full value proposed by a validator
101101
ReceivedProposedValue(ProposedValue<Ctx>, ValueOrigin),
102+
103+
/// Instructs consensus to restart at a given height with the given validator set.
104+
///
105+
/// On this input consensus resets the Write-Ahead Log.
106+
/// # Warning
107+
/// This operation should be used with extreme caution as it can lead to safety violations:
108+
/// 1. The application must clean all state associated with the height for which commit has failed
109+
/// 2. Since consensus resets its write-ahead log, the node may equivocate on proposals and votes
110+
/// for the restarted height, potentially violating protocol safety
111+
RestartHeight(Ctx::Height, Ctx::ValidatorSet),
102112
}
103113

104114
impl<Ctx: Context> From<NetworkEvent<Ctx>> for Msg<Ctx> {
@@ -298,12 +308,20 @@ where
298308
state: &mut State<Ctx>,
299309
msg: Msg<Ctx>,
300310
) -> Result<(), ActorProcessingErr> {
311+
let is_restart = matches!(msg, Msg::RestartHeight(_, _));
312+
301313
match msg {
302-
Msg::StartHeight(height, validator_set) => {
303-
self.tx_event.send(|| Event::StartedHeight(height));
314+
Msg::StartHeight(height, validator_set) | Msg::RestartHeight(height, validator_set) => {
315+
self.tx_event
316+
.send(|| Event::StartedHeight(height, is_restart));
304317

305-
// Fetch entries from the WAL
306-
let wal_entries = self.wal_fetch(height).await?;
318+
// Fetch entries from the WAL or reset the WAL if this is a restart
319+
let wal_entries = if is_restart {
320+
self.wal_reset(height).await?;
321+
vec![]
322+
} else {
323+
self.wal_fetch(height).await?
324+
};
307325

308326
if !wal_entries.is_empty() {
309327
// Set the phase to `Recovering` while we replay the WAL
@@ -324,8 +342,7 @@ where
324342
}
325343

326344
if !wal_entries.is_empty() {
327-
// Replay the entries from the WAL
328-
self.replay_wal(&myself, state, height, wal_entries).await;
345+
self.wal_replay(&myself, state, height, wal_entries).await;
329346
}
330347

331348
// Set the phase to `Running` now that we have replayed the WAL
@@ -336,7 +353,7 @@ where
336353

337354
// Notify the sync actor that we have started a new height
338355
if let Some(sync) = &self.sync {
339-
if let Err(e) = sync.cast(SyncMsg::StartedHeight(height)) {
356+
if let Err(e) = sync.cast(SyncMsg::StartedHeight(height, is_restart)) {
340357
error!(%height, "Error when notifying sync of started height: {e}")
341358
}
342359
}
@@ -615,6 +632,24 @@ where
615632
Ok(())
616633
}
617634

635+
async fn wal_reset(&self, height: Ctx::Height) -> Result<(), ActorProcessingErr> {
636+
let result = ractor::call!(self.wal, WalMsg::Reset, height);
637+
638+
match result {
639+
Ok(Ok(())) => {
640+
// Success
641+
}
642+
Ok(Err(e)) => {
643+
error!("Resetting the WAL failed: {e}");
644+
}
645+
Err(e) => {
646+
error!("Failed to send Reset command to WAL actor: {e}");
647+
}
648+
}
649+
650+
Ok(())
651+
}
652+
618653
async fn wal_fetch(
619654
&self,
620655
height: Ctx::Height,
@@ -643,7 +678,7 @@ where
643678
}
644679
}
645680

646-
async fn replay_wal(
681+
async fn wal_replay(
647682
&self,
648683
myself: &ActorRef<Msg<Ctx>>,
649684
state: &mut State<Ctx>,

code/crates/engine/src/network.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,14 @@ pub enum State<Ctx: Context> {
126126

127127
#[derive_where(Clone, Debug, PartialEq, Eq)]
128128
pub struct Status<Ctx: Context> {
129-
pub height: Ctx::Height,
129+
pub tip_height: Ctx::Height,
130130
pub history_min_height: Ctx::Height,
131131
}
132132

133133
impl<Ctx: Context> Status<Ctx> {
134-
pub fn new(height: Ctx::Height, history_min_height: Ctx::Height) -> Self {
134+
pub fn new(tip_height: Ctx::Height, history_min_height: Ctx::Height) -> Self {
135135
Self {
136-
height,
136+
tip_height,
137137
history_min_height,
138138
}
139139
}
@@ -272,7 +272,7 @@ where
272272
Msg::BroadcastStatus(status) => {
273273
let status = sync::Status {
274274
peer_id: ctrl_handle.peer_id(),
275-
height: status.height,
275+
tip_height: status.tip_height,
276276
history_min_height: status.history_min_height,
277277
};
278278

@@ -380,11 +380,11 @@ where
380380
return Ok(());
381381
}
382382

383-
trace!(%from, height = %status.height, "Received status");
383+
trace!(%from, tip_height = %status.tip_height, "Received status");
384384

385385
output_port.send(NetworkEvent::Status(
386386
status.peer_id,
387-
Status::new(status.height, status.history_min_height),
387+
Status::new(status.tip_height, status.history_min_height),
388388
));
389389
}
390390

code/crates/engine/src/sync.rs

+12-14
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tracing::{debug, error, info, warn, Instrument};
1313

1414
use malachitebft_codec as codec;
1515
use malachitebft_core_consensus::PeerId;
16-
use malachitebft_core_types::{CertificateError, CommitCertificate, Context, Height, Round};
16+
use malachitebft_core_types::{CertificateError, CommitCertificate, Context, Round};
1717
use malachitebft_sync::{self as sync, InboundRequestId, OutboundRequestId, Response};
1818
use malachitebft_sync::{RawDecidedValue, Request};
1919

@@ -82,8 +82,9 @@ pub enum Msg<Ctx: Context> {
8282
/// Consensus has decided on a value at the given height
8383
Decided(Ctx::Height),
8484

85-
/// Consensus has started a new height
86-
StartedHeight(Ctx::Height),
85+
/// Consensus has (re)started a new height.
86+
/// The boolean indicates whether this is a restart or not.
87+
StartedHeight(Ctx::Height, bool),
8788

8889
/// Host has a response for the blocks request
8990
GotDecidedBlock(InboundRequestId, Ctx::Height, Option<RawDecidedValue<Ctx>>),
@@ -351,7 +352,7 @@ where
351352
Msg::NetworkEvent(NetworkEvent::Status(peer_id, status)) => {
352353
let status = sync::Status {
353354
peer_id,
354-
height: status.height,
355+
tip_height: status.tip_height,
355356
history_min_height: status.history_min_height,
356357
};
357358

@@ -408,18 +409,15 @@ where
408409
// Ignore other gossip events
409410
}
410411

411-
Msg::Decided(height) => {
412-
self.process_input(&myself, state, sync::Input::UpdateHeight(height))
413-
.await?;
412+
// (Re)Started a new height
413+
Msg::StartedHeight(height, restart) => {
414+
self.process_input(&myself, state, sync::Input::StartedHeight(height, restart))
415+
.await?
414416
}
415417

416-
Msg::StartedHeight(height) => {
417-
if let Some(height) = height.decrement() {
418-
self.process_input(&myself, state, sync::Input::UpdateHeight(height))
419-
.await?;
420-
}
421-
422-
self.process_input(&myself, state, sync::Input::StartHeight(height))
418+
// Decided on a value
419+
Msg::Decided(height) => {
420+
self.process_input(&myself, state, sync::Input::Decided(height))
423421
.await?;
424422
}
425423

code/crates/engine/src/util/events.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl<Ctx: Context> Default for TxEvent<Ctx> {
4242

4343
#[derive_where(Clone, Debug)]
4444
pub enum Event<Ctx: Context> {
45-
StartedHeight(Ctx::Height),
45+
StartedHeight(Ctx::Height, bool),
4646
StartedRound(Ctx::Height, Round),
4747
Published(SignedConsensusMsg<Ctx>),
4848
ProposedValue(LocallyProposedValue<Ctx>),
@@ -60,7 +60,9 @@ pub enum Event<Ctx: Context> {
6060
impl<Ctx: Context> fmt::Display for Event<Ctx> {
6161
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6262
match self {
63-
Event::StartedHeight(height) => write!(f, "StartedHeight(height: {height})"),
63+
Event::StartedHeight(height, restart) => {
64+
write!(f, "StartedHeight(height: {height}, restart: {restart})")
65+
}
6466
Event::StartedRound(height, round) => {
6567
write!(f, "StartedRound(height: {height}, round: {round})")
6668
}

code/crates/engine/src/wal.rs

+27
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub type WalReply<T> = RpcReplyPort<eyre::Result<T>>;
5353

5454
pub enum Msg<Ctx: Context> {
5555
StartedHeight(Ctx::Height, WalReply<Option<Vec<WalEntry<Ctx>>>>),
56+
Reset(Ctx::Height, WalReply<()>),
5657
Append(Ctx::Height, WalEntry<Ctx>, WalReply<()>),
5758
Flush(WalReply<()>),
5859
Dump,
@@ -92,6 +93,10 @@ where
9293
self.started_height(state, height, reply_to).await?;
9394
}
9495

96+
Msg::Reset(height, reply_to) => {
97+
self.reset(state, height, reply_to).await?;
98+
}
99+
95100
Msg::Append(height, entry, reply_to) => {
96101
if height != state.height {
97102
debug!("Ignoring append at height {} != {}", height, state.height);
@@ -113,6 +118,28 @@ where
113118
Ok(())
114119
}
115120

121+
async fn reset(
122+
&self,
123+
state: &mut State<Ctx>,
124+
height: Ctx::Height,
125+
reply_to: WalReply<()>,
126+
) -> Result<(), ActorProcessingErr> {
127+
let (tx, rx) = oneshot::channel();
128+
129+
state
130+
.wal_sender
131+
.send(self::thread::WalMsg::Reset(height, tx))
132+
.await?;
133+
134+
let result = rx.await?;
135+
136+
reply_to
137+
.send(result)
138+
.map_err(|e| eyre!("Failed to send reply: {e}"))?;
139+
140+
Ok(())
141+
}
142+
116143
async fn started_height(
117144
&self,
118145
state: &mut State<Ctx>,

code/crates/engine/src/wal/thread.rs

+13
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub type ReplyTo<T> = oneshot::Sender<Result<T>>;
1616

1717
pub enum WalMsg<Ctx: Context> {
1818
StartedHeight(Ctx::Height, ReplyTo<Vec<WalEntry<Ctx>>>),
19+
Reset(Ctx::Height, ReplyTo<()>),
1920
Append(WalEntry<Ctx>, ReplyTo<()>),
2021
Flush(ReplyTo<()>),
2122
Shutdown,
@@ -91,6 +92,18 @@ where
9192
}
9293
}
9394

95+
WalMsg::Reset(height, reply) => {
96+
let sequence = height.as_u64();
97+
98+
let result = log.restart(sequence).map_err(Into::into);
99+
100+
debug!(%height, "Reset WAL");
101+
102+
if reply.send(result).is_err() {
103+
error!("Failed to send WAL reset reply");
104+
}
105+
}
106+
94107
WalMsg::Append(entry, reply) => {
95108
let tpe = wal_entry_type(&entry);
96109

code/crates/starknet/host/src/codec.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ pub fn decode_sync_status(
173173

174174
Ok(sync::Status {
175175
peer_id: decode_peer_id(peer_id)?,
176-
height: Height::new(status.block_number, status.fork_id),
176+
tip_height: Height::new(status.block_number, status.fork_id),
177177
history_min_height: Height::new(status.earliest_block_number, status.earliest_fork_id),
178178
})
179179
}
@@ -183,8 +183,8 @@ pub fn encode_sync_status(
183183
) -> Result<proto::sync::Status, ProtoError> {
184184
Ok(proto::sync::Status {
185185
peer_id: Some(encode_peer_id(&status.peer_id)?),
186-
block_number: status.height.block_number,
187-
fork_id: status.height.fork_id,
186+
block_number: status.tip_height.block_number,
187+
fork_id: status.tip_height.fork_id,
188188
earliest_block_number: status.history_min_height.block_number,
189189
earliest_fork_id: status.history_min_height.fork_id,
190190
})

0 commit comments

Comments
 (0)