Skip to content

Commit 3c12554

Browse files
ancazamfir, romac, Nenad Milosevic
authored
fix(code): Handle multiple commits for same height (#921)
* Add assert for non empty commit certificate * Let full node store and write votes to WAL * Round state machine check for previous decision. Store the proposal round and value in the round state machine. Remove decided proposal from the consensus state, get it from state machine instead. Add flag to consensus state to indicate if Effect::Decide was sent to the app. Cleanup test app. * Fix mux typo * Cleanup * Change test to ensure the syncing node was a proposer at least once. * Process the commit certificate first * Panic in test app on multiple commits * Correct comment * Increase history for channel app * Make channel app propose different values when restart from initial height * Update comments, rename decided to decided_sent * Check step in commit timeout as after WAL replay we may not be in commit step. * Clear all full proposals when moving to the next height * Fix unused import * Small reorg * Cleanup * Review comments * Assert certificate is correct Co-authored-by: Nenad Milosevic <[email protected]> * Review comments --------- Signed-off-by: Romain Ruetschi <[email protected]> Co-authored-by: Romain Ruetschi <[email protected]> Co-authored-by: Nenad Milosevic <[email protected]>
1 parent f7963ff commit 3c12554

File tree

23 files changed

+243
-230
lines changed

23 files changed

+243
-230
lines changed

code/crates/core-consensus/src/error.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,17 @@ where
2323
#[error("Proposer not found at height {0} and round {1}")]
2424
ProposerNotFound(Ctx::Height, Round),
2525

26-
/// Decided value not found after commit timeout.
27-
#[error("Decided value not found after commit timeout")]
28-
DecidedValueNotFound(Ctx::Height, Round),
26+
/// State machine has no decision in commit step.
27+
#[error("State machine has no decision in commit step")]
28+
DecisionNotFound(Ctx::Height, Round),
29+
30+
/// Driver proposal not found in commit step.
31+
#[error("Driver proposal not found in commit step")]
32+
DriverProposalNotFound(Ctx::Height, Round),
33+
34+
/// Full proposal not found in commit step.
35+
#[error("Full proposal not found in commit step")]
36+
FullProposalNotFound(Ctx::Height, Round),
2937

3038
/// The driver failed to process an input.
3139
#[error("Driver failed to process input, reason: {0}")]

code/crates/core-consensus/src/full_proposal.rs

+7-12
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use std::collections::BTreeMap;
22

33
use derive_where::derive_where;
4-
use tracing::debug;
54

6-
use malachitebft_core_types::{Context, Height, Proposal, Round, SignedProposal, Validity, Value};
5+
use malachitebft_core_types::{Context, Proposal, Round, SignedProposal, Validity, Value};
76

87
use crate::ProposedValue;
98

@@ -64,7 +63,7 @@ impl<Ctx: Context> Default for Entry<Ctx> {
6463
}
6564
}
6665

67-
/// Keeper for collecting proposed values and consensus proposal messages for a given height and round.
66+
/// Keeper for collecting proposed values and consensus proposals for a given height and round.
6867
///
6968
/// When a new_value is received from the value builder the following entry is stored:
7069
/// `Entry::ValueOnly(new_value.value, new_value.validity)`
@@ -77,7 +76,7 @@ impl<Ctx: Context> Default for Entry<Ctx> {
7776
///
7877
/// It is possible that a proposer sends two (builder_value, proposal) pairs for the same `(height, round)`.
7978
/// In this case both are stored, and we consider that the proposer is equivocating.
80-
/// Currently, the actual equivocation is caught deeper in the consensus crate, through consensus actor
79+
/// Currently, the actual equivocation is caught in the driver, through the consensus actor
8180
/// propagating both proposals.
8281
///
8382
/// When a new_proposal is received at most one complete proposal can be created. If a value at
@@ -88,9 +87,8 @@ impl<Ctx: Context> Default for Entry<Ctx> {
8887
/// at higher round with pol_round equal to the value round (L28). Therefore when a value is added
8988
/// multiple complete proposals may form.
9089
///
91-
/// Note: In the future when we support implicit proposal message:
92-
/// - [`FullProposalKeeper::store_proposal()`] will never be called
93-
/// - [`FullProposalKeeper::full_proposal_at_round_and_value()`] should only check the presence of `builder_value`
90+
/// Note: In `parts_only` mode there is no explicit proposal wire message; instead,
91+
/// one is synthesized by the caller (`on_proposed_value` handler) before it invokes the `store_proposal` method.
9492
#[derive_where(Clone, Debug, Default)]
9593
pub struct FullProposalKeeper<Ctx: Context> {
9694
keeper: BTreeMap<(Ctx::Height, Round), Vec<Entry<Ctx>>>,
@@ -352,10 +350,7 @@ impl<Ctx: Context> FullProposalKeeper<Ctx> {
352350
}
353351
}
354352

355-
pub fn remove_full_proposals(&mut self, last_height: Ctx::Height) {
356-
// Keep last two decided heights
357-
debug!(%last_height, "Removing proposals, keep the last two");
358-
self.keeper
359-
.retain(|(height, _), _| height.increment() >= last_height);
353+
pub fn clear(&mut self) {
354+
self.keeper.clear();
360355
}
361356
}

code/crates/core-consensus/src/handle/decide.rs

+71-58
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,76 @@
1-
use crate::prelude::*;
1+
use crate::{handle::signature::verify_certificate, prelude::*};
22

33
#[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
4-
pub async fn decide<Ctx>(
4+
pub async fn try_decide<Ctx>(
55
co: &Co<Ctx>,
66
state: &mut State<Ctx>,
77
metrics: &Metrics,
8-
consensus_round: Round,
9-
proposal: SignedProposal<Ctx>,
108
) -> Result<(), Error<Ctx>>
119
where
1210
Ctx: Context,
1311
{
14-
let height = proposal.height();
15-
let proposal_round = proposal.round();
16-
let value = proposal.value();
12+
if !state.driver.step_is_commit() {
13+
return Ok(());
14+
}
15+
16+
let height = state.driver.height();
17+
let consensus_round = state.driver.round();
18+
19+
let Some((proposal_round, decided_value)) = state.decided_value() else {
20+
return Err(Error::DecisionNotFound(height, consensus_round));
21+
};
22+
23+
let decided_id = decided_value.id();
24+
25+
// Look for an existing certificate
26+
let (certificate, extensions) = state
27+
.driver
28+
.commit_certificate(proposal_round, decided_id.clone())
29+
.cloned()
30+
.map(|certificate| (certificate, VoteExtensions::default()))
31+
.unwrap_or_else(|| {
32+
// Restore the commits. Note that they will be removed from `state`
33+
let mut commits = state.restore_precommits(height, proposal_round, &decided_value);
1734

18-
// We only decide proposals for the current height
19-
assert_eq!(height, state.driver.height());
35+
let extensions = extract_vote_extensions(&mut commits);
2036

21-
// Clean proposals and values
22-
state.remove_full_proposals(height);
37+
let certificate =
38+
CommitCertificate::new(height, proposal_round, decided_id.clone(), commits);
39+
40+
(certificate, extensions)
41+
});
42+
43+
let Some((proposal, _)) = state.driver.proposal_and_validity_for_round(proposal_round) else {
44+
return Err(Error::DriverProposalNotFound(height, proposal_round));
45+
};
46+
47+
let Some(full_proposal) =
48+
state.full_proposal_at_round_and_value(&height, proposal_round, &decided_value)
49+
else {
50+
return Err(Error::FullProposalNotFound(height, proposal_round));
51+
};
52+
53+
if proposal.value().id() != decided_id {
54+
info!(
55+
"Decide: driver proposal value id {} does not match the decided value id {}, this may happen if consensus and value sync run in parallel",
56+
proposal.value().id(),
57+
decided_id
58+
);
59+
}
60+
61+
assert_eq!(full_proposal.builder_value.id(), decided_id);
62+
assert_eq!(full_proposal.proposal.value().id(), decided_id);
63+
assert_eq!(full_proposal.validity, Validity::Valid);
64+
65+
// The certificate must be valid if state is Commit
66+
assert!(verify_certificate(
67+
co,
68+
certificate.clone(),
69+
state.driver.validator_set().clone(),
70+
state.params.threshold_params,
71+
)
72+
.await
73+
.is_ok());
2374

2475
// Update metrics
2576
#[cfg(feature = "metrics")]
@@ -48,32 +99,19 @@ where
4899
}
49100
}
50101

51-
// Look for an existing certificate
52-
let (certificate, extensions) = state
53-
.driver
54-
.commit_certificate(proposal_round, value.id())
55-
.cloned()
56-
.map(|certificate| (certificate, VoteExtensions::default()))
57-
.unwrap_or_else(|| {
58-
// Restore the commits. Note that they will be removed from `state`
59-
let mut commits = state.restore_precommits(height, proposal_round, value);
60-
61-
let extensions = extract_vote_extensions(&mut commits);
62-
63-
// TODO: Should we verify we have 2/3rd commits?
64-
let certificate = CommitCertificate::new(height, proposal_round, value.id(), commits);
65-
66-
(certificate, extensions)
67-
});
68-
69102
perform!(
70103
co,
71-
Effect::Decide(certificate, extensions, Default::default())
104+
Effect::CancelTimeout(Timeout::commit(state.driver.round()), Default::default())
72105
);
73106

74-
// Reinitialize to remove any previous round or equivocating precommits.
75-
// TODO: Revise when evidence module is added.
76-
state.signed_precommits.clear();
107+
if !state.decided_sent {
108+
state.decided_sent = true;
109+
110+
perform!(
111+
co,
112+
Effect::Decide(certificate, extensions, Default::default())
113+
);
114+
}
77115

78116
Ok(())
79117
}
@@ -92,28 +130,3 @@ pub fn extract_vote_extensions<Ctx: Context>(votes: &mut [SignedVote<Ctx>]) -> V
92130

93131
VoteExtensions::new(extensions)
94132
}
95-
96-
/// Decide on the current proposal without waiting for Commit timeout.
97-
pub async fn decide_current_no_timeout<Ctx>(
98-
co: &Co<Ctx>,
99-
state: &mut State<Ctx>,
100-
metrics: &Metrics,
101-
) -> Result<(), Error<Ctx>>
102-
where
103-
Ctx: Context,
104-
{
105-
let height = state.driver.height();
106-
let round = state.driver.round();
107-
108-
perform!(
109-
co,
110-
Effect::CancelTimeout(Timeout::commit(round), Default::default())
111-
);
112-
113-
let proposal = state
114-
.decision
115-
.remove(&(height, round))
116-
.ok_or_else(|| Error::DecidedValueNotFound(height, round))?;
117-
118-
decide(co, state, metrics, round, proposal).await
119-
}

code/crates/core-consensus/src/handle/driver.rs

-3
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,6 @@ where
305305
"Decided",
306306
);
307307

308-
// Store value decided on for retrieval when timeout commit elapses
309-
state.store_decision(state.driver.height(), consensus_round, proposal.clone());
310-
311308
perform!(
312309
co,
313310
Effect::ScheduleTimeout(Timeout::commit(consensus_round), Default::default())

code/crates/core-consensus/src/handle/proposed_value.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::prelude::*;
33
use crate::handle::driver::apply_driver_input;
44
use crate::types::ProposedValue;
55

6-
use super::decide::decide_current_no_timeout;
6+
use super::decide::try_decide;
77
use super::signature::sign_proposal;
88

99
pub async fn on_proposed_value<Ctx>(
@@ -71,8 +71,10 @@ where
7171
.await?;
7272
}
7373

74-
if origin == ValueOrigin::Sync && state.driver.step_is_commit() {
75-
decide_current_no_timeout(co, state, metrics).await?;
74+
if origin == ValueOrigin::Sync {
75+
// The proposed value was provided by Sync, so try to decide immediately, without waiting for the Commit timeout.
76+
// `try_decide` will check that we are in the commit step after applying the proposed value to the state machine.
77+
try_decide(co, state, metrics).await?;
7678
}
7779

7880
Ok(())

code/crates/core-consensus/src/handle/rebroadcast_timeout.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ where
1818

1919
let (maybe_vote, timeout) = match timeout.kind {
2020
TimeoutKind::PrevoteRebroadcast => (
21-
state.last_prevote.as_ref(),
21+
state.last_signed_prevote.as_ref(),
2222
Timeout::prevote_rebroadcast(round),
2323
),
2424
TimeoutKind::PrecommitRebroadcast => (
25-
state.last_precommit.as_ref(),
25+
state.last_signed_precommit.as_ref(),
2626
Timeout::precommit_rebroadcast(round),
2727
),
2828
_ => return Ok(()),

code/crates/core-consensus/src/handle/start_height.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ where
1919
#[cfg(feature = "metrics")]
2020
metrics.step_end(state.driver.step());
2121

22-
state.driver.move_to_height(height, validator_set);
22+
state.reset_and_start_height(height, validator_set);
2323

24-
debug_assert_eq!(state.driver.height(), height);
25-
debug_assert_eq!(state.driver.round(), Round::Nil);
24+
debug_assert_eq!(state.height(), height);
25+
debug_assert_eq!(state.round(), Round::Nil);
2626

2727
start_height(co, state, metrics, height).await
2828
}

code/crates/core-consensus/src/handle/sync.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::handle::decide::decide_current_no_timeout;
1+
use crate::handle::decide::try_decide;
22
use crate::handle::driver::apply_driver_input;
33
use crate::handle::signature::verify_certificate;
44
use crate::handle::validator_set::get_validator_set;
@@ -42,9 +42,9 @@ where
4242
)
4343
.await?;
4444

45-
if state.driver.step_is_commit() {
46-
decide_current_no_timeout(co, state, metrics).await?;
47-
}
45+
// The CommitCertificate is provided by Value Sync, so try to decide immediately, without waiting for the Commit timeout.
46+
// `try_decide` will check that we are in the commit step after applying the certificate to the state machine.
47+
try_decide(co, state, metrics).await?;
4848

4949
Ok(())
5050
}

code/crates/core-consensus/src/handle/timeout.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::handle::decide::decide;
1+
use crate::handle::decide::try_decide;
22
use crate::handle::driver::apply_driver_input;
33
use crate::handle::rebroadcast_timeout::on_rebroadcast_timeout;
44
use crate::handle::step_timeout::on_step_limit_timeout;
@@ -53,14 +53,12 @@ where
5353
TimeoutKind::PrevoteTimeLimit | TimeoutKind::PrecommitTimeLimit => {
5454
on_step_limit_timeout(co, state, metrics, timeout.round).await
5555
}
56-
TimeoutKind::Commit => {
57-
let proposal = state
58-
.decision
59-
.remove(&(height, round))
60-
.ok_or_else(|| Error::DecidedValueNotFound(height, round))?;
61-
62-
decide(co, state, metrics, round, proposal).await
63-
}
56+
// Decide if the timeout is a commit timeout and the step is commit.
57+
// `try_decide` will check that we are in the commit step. This is necessary because the timeout can be triggered
58+
// by WAL replay before the step is `Commit`, e.g. when the node hasn't replayed the full value and
59+
// the step is still `Propose`.
60+
// For the Propose, Prevote and Precommit timeouts, the step is checked in the state machine.
61+
TimeoutKind::Commit => try_decide(co, state, metrics).await,
6462
_ => Ok(()),
6563
}
6664
}

0 commit comments

Comments
 (0)