Skip to content

Commit 10cf82e

Browse files
apollo_consensus: move proposal storage to SM; dedupe in SHC
1 parent 0891466 commit 10cf82e

File tree

2 files changed

+35
-31
lines changed

2 files changed

+35
-31
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
#[path = "single_height_consensus_test.rs"]
99
mod single_height_consensus_test;
1010

11-
use std::collections::hash_map::Entry;
12-
use std::collections::{HashMap, VecDeque};
11+
use std::collections::{HashSet, VecDeque};
1312
use std::time::Duration;
1413

1514
const REBROADCAST_LOG_PERIOD_SECS: u64 = 10;
@@ -136,7 +135,8 @@ pub(crate) struct SingleHeightConsensus {
136135
validators: Vec<ValidatorId>,
137136
timeouts: TimeoutsConfig,
138137
state_machine: StateMachine,
139-
proposals: HashMap<Round, Option<ProposalCommitment>>,
138+
// Tracks rounds for which we started validating a proposal to avoid duplicate validations.
139+
pending_validation_rounds: HashSet<Round>,
140140
last_prevote: Option<Vote>,
141141
last_precommit: Option<Vote>,
142142
}
@@ -158,7 +158,7 @@ impl SingleHeightConsensus {
158158
validators,
159159
timeouts,
160160
state_machine,
161-
proposals: HashMap::new(),
161+
pending_validation_rounds: HashSet::new(),
162162
last_prevote: None,
163163
last_precommit: None,
164164
}
@@ -204,20 +204,25 @@ impl SingleHeightConsensus {
204204
warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
205205
return Ok(ShcReturn::Tasks(Vec::new()));
206206
}
207-
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
208-
warn!("Round {} already has a proposal, ignoring", init.round);
207+
// Avoid duplicate validations:
208+
// - If SM already has an entry for this round, a (re)proposal was already recorded.
209+
// - If we already started validating this round, ignore repeats.
210+
if self.state_machine.proposals_ref().get(&init.round).is_some()
211+
|| self.pending_validation_rounds.contains(&init.round)
212+
{
213+
warn!("Round {} already handled a proposal, ignoring", init.round);
209214
return Ok(ShcReturn::Tasks(Vec::new()));
210-
};
215+
}
211216
let timeout = self.timeouts.proposal_timeout;
212217
info!(
213218
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
214219
self.state_machine.round()
215220
);
216221
CONSENSUS_PROPOSALS_VALID_INIT.increment(1);
217222

218-
// Since validating the proposal is non-blocking, we want to avoid validating the same round
219-
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
220-
proposal_entry.insert(None);
223+
// Since validating the proposal is non-blocking, avoid validating the same round twice in
224+
// parallel (e.g., due to repeats or spam).
225+
self.pending_validation_rounds.insert(init.round);
221226
let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await;
222227
context.set_height_and_round(height, self.state_machine.round()).await;
223228
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
@@ -287,14 +292,8 @@ impl SingleHeightConsensus {
287292
CONSENSUS_PROPOSALS_INVALID.increment(1);
288293
}
289294

290-
// Retaining the entry for this round prevents us from receiving another proposal on
291-
// this round. While this prevents spam attacks it also prevents re-receiving after
292-
// a network issue.
293-
let old = self.proposals.insert(round, proposal_id);
294-
assert!(
295-
old.is_some_and(|p| p.is_none()),
296-
"Proposal entry for round {round} should exist and be empty: {old:?}"
297-
);
295+
// Validation for this round finished; clear the pending marker.
296+
self.pending_validation_rounds.remove(&round);
298297
let sm_events = self.state_machine.handle_event(
299298
StateMachineEvent::Proposal(proposal_id, round, valid_round),
300299
&leader_fn,
@@ -305,8 +304,12 @@ impl SingleHeightConsensus {
305304
if proposal_id.is_none() {
306305
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
307306
}
308-
let old = self.proposals.insert(round, proposal_id);
309-
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
307+
// Ensure SM has no proposal recorded yet for this round when proposing.
308+
assert!(
309+
self.state_machine.proposals_ref().get(&round).is_none(),
310+
"There should be no entry for round {round} when proposing"
311+
);
312+
310313
assert_eq!(
311314
round,
312315
self.state_machine.round(),
@@ -463,15 +466,13 @@ impl SingleHeightConsensus {
463466

464467
// Make sure there is an existing proposal for the valid round and it matches the proposal
465468
// ID.
466-
let existing = self.proposals.get(&valid_round).and_then(|&inner| inner);
469+
let existing = self.state_machine.proposals_ref().get(&valid_round).and_then(|(id, _)| *id);
467470
assert!(
468471
existing.is_some_and(|id| id == proposal_id),
469472
"A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
470473
Found: {existing:?}",
471474
);
472475

473-
let old = self.proposals.insert(round, Some(proposal_id));
474-
assert!(old.is_none(), "There should be no proposal for round {round}.");
475476
let init = ProposalInit {
476477
height: self.state_machine.height(),
477478
round,
@@ -535,14 +536,11 @@ impl SingleHeightConsensus {
535536
))
536537
};
537538
let block = self
538-
.proposals
539-
.remove(&round)
540-
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?
541-
.ok_or_else(|| {
542-
invalid_decision(
543-
"Proposal is invalid or validations haven't yet completed".to_string(),
544-
)
545-
})?;
539+
.state_machine
540+
.proposals_ref()
541+
.get(&round)
542+
.and_then(|(id, _)| *id)
543+
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?;
546544
if block != proposal_id {
547545
return Err(invalid_decision(format!(
548546
"StateMachine proposal commitment should match the stored block. Shc.block_id: \

crates/apollo_consensus/src/state_machine.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ impl StateMachine {
153153
&self.precommits
154154
}
155155

156+
pub(crate) fn proposals_ref(
157+
&self,
158+
) -> &HashMap<Round, (Option<ProposalCommitment>, Option<Round>)> {
159+
&self.proposals
160+
}
161+
156162
fn make_self_vote(
157163
&self,
158164
vote_type: VoteType,

0 commit comments

Comments
 (0)