Skip to content

Commit 31bd3d1

Browse files
apollo_consensus: move proposal storage to SM; dedupe in SHC
1 parent 4771243 commit 31bd3d1

File tree

2 files changed

+35
-31
lines changed

2 files changed

+35
-31
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
#[path = "single_height_consensus_test.rs"]
99
mod single_height_consensus_test;
1010

11-
use std::collections::hash_map::Entry;
12-
use std::collections::{HashMap, VecDeque};
11+
use std::collections::{HashSet, VecDeque};
1312
use std::sync::{Arc, Mutex};
1413
use std::time::Duration;
1514

@@ -138,7 +137,8 @@ pub(crate) struct SingleHeightConsensus {
138137
validators: Vec<ValidatorId>,
139138
timeouts: TimeoutsConfig,
140139
state_machine: StateMachine,
141-
proposals: HashMap<Round, Option<ProposalCommitment>>,
140+
// Tracks rounds for which we started validating a proposal to avoid duplicate validations.
141+
pending_validation_rounds: HashSet<Round>,
142142
last_prevote: Option<Vote>,
143143
last_precommit: Option<Vote>,
144144
height_voted_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
@@ -162,7 +162,7 @@ impl SingleHeightConsensus {
162162
validators,
163163
timeouts,
164164
state_machine,
165-
proposals: HashMap::new(),
165+
pending_validation_rounds: HashSet::new(),
166166
last_prevote: None,
167167
last_precommit: None,
168168
height_voted_storage,
@@ -209,20 +209,25 @@ impl SingleHeightConsensus {
209209
warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
210210
return Ok(ShcReturn::Tasks(Vec::new()));
211211
}
212-
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
213-
warn!("Round {} already has a proposal, ignoring", init.round);
212+
// Avoid duplicate validations:
213+
// - If SM already has an entry for this round, a (re)proposal was already recorded.
214+
// - If we already started validating this round, ignore repeats.
215+
if self.state_machine.has_proposal_for_round(init.round)
216+
|| self.pending_validation_rounds.contains(&init.round)
217+
{
218+
warn!("Round {} already handled a proposal, ignoring", init.round);
214219
return Ok(ShcReturn::Tasks(Vec::new()));
215-
};
220+
}
216221
let timeout = self.timeouts.get_proposal_timeout(init.round);
217222
info!(
218223
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
219224
self.state_machine.round()
220225
);
221226
CONSENSUS_PROPOSALS_VALID_INIT.increment(1);
222227

223-
// Since validating the proposal is non-blocking, we want to avoid validating the same round
224-
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
225-
proposal_entry.insert(None);
228+
// Since validating the proposal is non-blocking, avoid validating the same round twice in
229+
// parallel (e.g., due to repeats or spam).
230+
self.pending_validation_rounds.insert(init.round);
226231
let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await;
227232
context.set_height_and_round(height, self.state_machine.round()).await;
228233
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
@@ -292,23 +297,21 @@ impl SingleHeightConsensus {
292297
CONSENSUS_PROPOSALS_INVALID.increment(1);
293298
}
294299

295-
// Retaining the entry for this round prevents us from receiving another proposal on
296-
// this round. While this prevents spam attacks it also prevents re-receiving after
297-
// a network issue.
298-
let old = self.proposals.insert(round, proposal_id);
299-
assert!(
300-
old.is_some_and(|p| p.is_none()),
301-
"Proposal entry for round {round} should exist and be empty: {old:?}"
302-
);
300+
// Validation for this round finished; clear the pending marker.
301+
self.pending_validation_rounds.remove(&round);
303302
let sm_events = self.state_machine.handle_event(event, &leader_fn);
304303
self.handle_state_machine_events(context, sm_events).await
305304
}
306305
StateMachineEvent::GetProposal(proposal_id, round) => {
307306
if proposal_id.is_none() {
308307
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
309308
}
310-
let old = self.proposals.insert(round, proposal_id);
311-
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
309+
// Ensure SM has no proposal recorded yet for this round when proposing.
310+
assert!(
311+
!self.state_machine.has_proposal_for_round(round),
312+
"There should be no entry for round {round} when proposing"
313+
);
314+
312315
assert_eq!(
313316
round,
314317
self.state_machine.round(),
@@ -477,15 +480,13 @@ impl SingleHeightConsensus {
477480

478481
// Make sure there is an existing proposal for the valid round and it matches the proposal
479482
// ID.
480-
let existing = self.proposals.get(&valid_round).and_then(|&inner| inner);
483+
let existing = self.state_machine.proposal_id_for_round(valid_round);
481484
assert!(
482485
existing.is_some_and(|id| id == proposal_id),
483486
"A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
484487
Found: {existing:?}",
485488
);
486489

487-
let old = self.proposals.insert(round, Some(proposal_id));
488-
assert!(old.is_none(), "There should be no proposal for round {round}.");
489490
let init = ProposalInit {
490491
height: self.state_machine.height(),
491492
round,
@@ -558,14 +559,9 @@ impl SingleHeightConsensus {
558559
))
559560
};
560561
let block = self
561-
.proposals
562-
.remove(&round)
563-
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?
564-
.ok_or_else(|| {
565-
invalid_decision(
566-
"Proposal is invalid or validations haven't yet completed".to_string(),
567-
)
568-
})?;
562+
.state_machine
563+
.proposal_id_for_round(round)
564+
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?;
569565
if block != proposal_id {
570566
return Err(invalid_decision(format!(
571567
"StateMachine proposal commitment should match the stored block. Shc.block_id: \

crates/apollo_consensus/src/state_machine.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,14 @@ impl StateMachine {
153153
&self.precommits
154154
}
155155

156+
pub(crate) fn has_proposal_for_round(&self, round: Round) -> bool {
157+
self.proposals.contains_key(&round)
158+
}
159+
160+
pub(crate) fn proposal_id_for_round(&self, round: Round) -> Option<ProposalCommitment> {
161+
self.proposals.get(&round).and_then(|(id, _)| *id)
162+
}
163+
156164
fn make_self_vote(
157165
&self,
158166
vote_type: VoteType,

0 commit comments

Comments
 (0)