Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 28 additions & 31 deletions crates/apollo_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
#[path = "single_height_consensus_test.rs"]
mod single_height_consensus_test;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand Down Expand Up @@ -138,7 +137,8 @@ pub(crate) struct SingleHeightConsensus {
validators: Vec<ValidatorId>,
timeouts: TimeoutsConfig,
state_machine: StateMachine,
proposals: HashMap<Round, Option<ProposalCommitment>>,
// Tracks rounds for which we started validating a proposal to avoid duplicate validations.
pending_validation_rounds: HashSet<Round>,
last_prevote: Option<Vote>,
last_precommit: Option<Vote>,
height_voted_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
Expand All @@ -162,7 +162,7 @@ impl SingleHeightConsensus {
validators,
timeouts,
state_machine,
proposals: HashMap::new(),
pending_validation_rounds: HashSet::new(),
last_prevote: None,
last_precommit: None,
height_voted_storage,
Expand Down Expand Up @@ -209,20 +209,25 @@ impl SingleHeightConsensus {
warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
return Ok(ShcReturn::Tasks(Vec::new()));
}
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
warn!("Round {} already has a proposal, ignoring", init.round);
// Avoid duplicate validations:
// - If SM already has an entry for this round, a (re)proposal was already recorded.
// - If we already started validating this round, ignore repeats.
if self.state_machine.has_proposal_for_round(init.round)
|| self.pending_validation_rounds.contains(&init.round)
{
warn!("Round {} already handled a proposal, ignoring", init.round);
return Ok(ShcReturn::Tasks(Vec::new()));
};
}
let timeout = self.timeouts.get_proposal_timeout(init.round);
info!(
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
self.state_machine.round()
);
CONSENSUS_PROPOSALS_VALID_INIT.increment(1);

// Since validating the proposal is non-blocking, we want to avoid validating the same round
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
// Since validating the proposal is non-blocking, avoid validating the same round twice in
// parallel (e.g., due to repeats or spam).
self.pending_validation_rounds.insert(init.round);
let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await;
context.set_height_and_round(height, self.state_machine.round()).await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
Expand Down Expand Up @@ -292,23 +297,22 @@ impl SingleHeightConsensus {
CONSENSUS_PROPOSALS_INVALID.increment(1);
}

// Retaining the entry for this round prevents us from receiving another proposal on
// this round. While this prevents spam attacks it also prevents re-receiving after
// a network issue.
let old = self.proposals.insert(round, proposal_id);
assert!(
old.is_some_and(|p| p.is_none()),
"Proposal entry for round {round} should exist and be empty: {old:?}"
);
// Cleanup: validation for round {round} finished, so remove it from the pending
// set. This doesn't affect logic.
self.pending_validation_rounds.remove(&round);
let sm_events = self.state_machine.handle_event(event, &leader_fn);
self.handle_state_machine_events(context, sm_events).await
}
StateMachineEvent::GetProposal(proposal_id, round) => {
if proposal_id.is_none() {
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
}
let old = self.proposals.insert(round, proposal_id);
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
// Ensure SM has no proposal recorded yet for this round when proposing.
assert!(
!self.state_machine.has_proposal_for_round(round),
"There should be no entry for round {round} when proposing"
);

assert_eq!(
round,
self.state_machine.round(),
Expand Down Expand Up @@ -477,15 +481,13 @@ impl SingleHeightConsensus {

// Make sure there is an existing proposal for the valid round and it matches the proposal
// ID.
let existing = self.proposals.get(&valid_round).and_then(|&inner| inner);
let existing = self.state_machine.proposal_id_for_round(valid_round);
assert!(
existing.is_some_and(|id| id == proposal_id),
"A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
Found: {existing:?}",
);

let old = self.proposals.insert(round, Some(proposal_id));
assert!(old.is_none(), "There should be no proposal for round {round}.");
let init = ProposalInit {
height: self.state_machine.height(),
round,
Expand Down Expand Up @@ -558,14 +560,9 @@ impl SingleHeightConsensus {
))
};
let block = self
.proposals
.remove(&round)
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?
.ok_or_else(|| {
invalid_decision(
"Proposal is invalid or validations haven't yet completed".to_string(),
)
})?;
.state_machine
.proposal_id_for_round(round)
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?;
if block != proposal_id {
return Err(invalid_decision(format!(
"StateMachine proposal commitment should match the stored block. Shc.block_id: \
Expand Down
8 changes: 8 additions & 0 deletions crates/apollo_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ impl StateMachine {
&self.precommits
}

pub(crate) fn has_proposal_for_round(&self, round: Round) -> bool {
self.proposals.contains_key(&round)
}

pub(crate) fn proposal_id_for_round(&self, round: Round) -> Option<ProposalCommitment> {
self.proposals.get(&round).and_then(|(id, _)| *id)
}

fn make_self_vote(
&self,
vote_type: VoteType,
Expand Down
Loading