diff --git a/crates/apollo_consensus/src/single_height_consensus.rs b/crates/apollo_consensus/src/single_height_consensus.rs index 33cbc0a2716..89ce4bca828 100644 --- a/crates/apollo_consensus/src/single_height_consensus.rs +++ b/crates/apollo_consensus/src/single_height_consensus.rs @@ -8,8 +8,7 @@ #[path = "single_height_consensus_test.rs"] mod single_height_consensus_test; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashSet, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -138,7 +137,8 @@ pub(crate) struct SingleHeightConsensus { validators: Vec, timeouts: TimeoutsConfig, state_machine: StateMachine, - proposals: HashMap>, + // Tracks rounds for which we started validating a proposal to avoid duplicate validations. + pending_validation_rounds: HashSet, last_prevote: Option, last_precommit: Option, height_voted_storage: Arc>, @@ -162,7 +162,7 @@ impl SingleHeightConsensus { validators, timeouts, state_machine, - proposals: HashMap::new(), + pending_validation_rounds: HashSet::new(), last_prevote: None, last_precommit: None, height_voted_storage, @@ -209,10 +209,15 @@ impl SingleHeightConsensus { warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer); return Ok(ShcReturn::Tasks(Vec::new())); } - let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else { - warn!("Round {} already has a proposal, ignoring", init.round); + // Avoid duplicate validations: + // - If SM already has an entry for this round, a (re)proposal was already recorded. + // - If we already started validating this round, ignore repeats. + if self.state_machine.has_proposal_for_round(init.round) + || self.pending_validation_rounds.contains(&init.round) + { + warn!("Round {} already handled a proposal, ignoring", init.round); return Ok(ShcReturn::Tasks(Vec::new())); - }; + } let timeout = self.timeouts.get_proposal_timeout(init.round); info!( "Accepting {init:?}. node_round: {}, timeout: {timeout:?}", @@ -220,9 +225,9 @@ impl SingleHeightConsensus { ); CONSENSUS_PROPOSALS_VALID_INIT.increment(1); - // Since validating the proposal is non-blocking, we want to avoid validating the same round - // twice in parallel. This could be caused by a network repeat or a malicious spam attack. - proposal_entry.insert(None); + // Since validating the proposal is non-blocking, avoid validating the same round twice in + // parallel (e.g., due to repeats or spam). + self.pending_validation_rounds.insert(init.round); let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await; context.set_height_and_round(height, self.state_machine.round()).await; Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)])) @@ -292,14 +297,9 @@ impl SingleHeightConsensus { CONSENSUS_PROPOSALS_INVALID.increment(1); } - // Retaining the entry for this round prevents us from receiving another proposal on - // this round. While this prevents spam attacks it also prevents re-receiving after - // a network issue. - let old = self.proposals.insert(round, proposal_id); - assert!( - old.is_some_and(|p| p.is_none()), - "Proposal entry for round {round} should exist and be empty: {old:?}" - ); + // Cleanup: validation for round {round} finished, so remove it from the pending + // set. This doesn't affect logic. + self.pending_validation_rounds.remove(&round); let sm_events = self.state_machine.handle_event(event, &leader_fn); self.handle_state_machine_events(context, sm_events).await } @@ -307,8 +307,12 @@ impl SingleHeightConsensus { if proposal_id.is_none() { CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1); } - let old = self.proposals.insert(round, proposal_id); - assert!(old.is_none(), "There should be no entry for round {round} when proposing"); + // Ensure SM has no proposal recorded yet for this round when proposing. + assert!( + !self.state_machine.has_proposal_for_round(round), + "There should be no entry for round {round} when proposing" + ); + assert_eq!( round, self.state_machine.round(), @@ -477,15 +481,13 @@ impl SingleHeightConsensus { // Make sure there is an existing proposal for the valid round and it matches the proposal // ID. - let existing = self.proposals.get(&valid_round).and_then(|&inner| inner); + let existing = self.state_machine.proposal_id_for_round(valid_round); assert!( existing.is_some_and(|id| id == proposal_id), "A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \ Found: {existing:?}", ); - let old = self.proposals.insert(round, Some(proposal_id)); - assert!(old.is_none(), "There should be no proposal for round {round}."); let init = ProposalInit { height: self.state_machine.height(), round, @@ -558,14 +560,9 @@ impl SingleHeightConsensus { )) }; let block = self - .proposals - .remove(&round) - .ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))? - .ok_or_else(|| { - invalid_decision( - "Proposal is invalid or validations haven't yet completed".to_string(), - ) - })?; + .state_machine + .proposal_id_for_round(round) + .ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?; if block != proposal_id { return Err(invalid_decision(format!( "StateMachine proposal commitment should match the stored block. Shc.block_id: \ diff --git a/crates/apollo_consensus/src/state_machine.rs b/crates/apollo_consensus/src/state_machine.rs index 0ad1ef60508..246c9faad1a 100644 --- a/crates/apollo_consensus/src/state_machine.rs +++ b/crates/apollo_consensus/src/state_machine.rs @@ -162,6 +162,14 @@ impl StateMachine { &self.precommits } + pub(crate) fn has_proposal_for_round(&self, round: Round) -> bool { + self.proposals.contains_key(&round) + } + + pub(crate) fn proposal_id_for_round(&self, round: Round) -> Option { + self.proposals.get(&round).and_then(|(id, _)| *id) + } + fn make_self_vote( &self, vote_type: VoteType,