Skip to content

Commit 92e5452

Browse files
apollo_consensus: move proposal storage to SM; dedupe in SHC
1 parent 70bd845 commit 92e5452

File tree

2 files changed

+35
-31
lines changed

2 files changed

+35
-31
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
#[path = "single_height_consensus_test.rs"]
99
mod single_height_consensus_test;
1010

11-
use std::collections::hash_map::Entry;
12-
use std::collections::{HashMap, VecDeque};
11+
use std::collections::{HashSet, VecDeque};
1312
use std::time::Duration;
1413

1514
const REBROADCAST_LOG_PERIOD_SECS: u64 = 10;
@@ -136,7 +135,8 @@ pub(crate) struct SingleHeightConsensus {
136135
validators: Vec<ValidatorId>,
137136
timeouts: TimeoutsConfig,
138137
state_machine: StateMachine,
139-
proposals: HashMap<Round, Option<ProposalCommitment>>,
138+
// Tracks rounds for which we started validating a proposal to avoid duplicate validations.
139+
pending_validation_rounds: HashSet<Round>,
140140
last_prevote: Option<Vote>,
141141
last_precommit: Option<Vote>,
142142
}
@@ -158,7 +158,7 @@ impl SingleHeightConsensus {
158158
validators,
159159
timeouts,
160160
state_machine,
161-
proposals: HashMap::new(),
161+
pending_validation_rounds: HashSet::new(),
162162
last_prevote: None,
163163
last_precommit: None,
164164
}
@@ -204,20 +204,25 @@ impl SingleHeightConsensus {
204204
warn!("Invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
205205
return Ok(ShcReturn::Tasks(Vec::new()));
206206
}
207-
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
208-
warn!("Round {} already has a proposal, ignoring", init.round);
207+
// Avoid duplicate validations:
208+
// - If SM already has an entry for this round, a (re)proposal was already recorded.
209+
// - If we already started validating this round, ignore repeats.
210+
if self.state_machine.has_proposal_for_round(init.round)
211+
|| self.pending_validation_rounds.contains(&init.round)
212+
{
213+
warn!("Round {} already handled a proposal, ignoring", init.round);
209214
return Ok(ShcReturn::Tasks(Vec::new()));
210-
};
215+
}
211216
let timeout = self.timeouts.get_proposal_timeout(init.round);
212217
info!(
213218
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
214219
self.state_machine.round()
215220
);
216221
CONSENSUS_PROPOSALS_VALID_INIT.increment(1);
217222

218-
// Since validating the proposal is non-blocking, we want to avoid validating the same round
219-
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
220-
proposal_entry.insert(None);
223+
// Since validating the proposal is non-blocking, avoid validating the same round twice in
224+
// parallel (e.g., due to repeats or spam).
225+
self.pending_validation_rounds.insert(init.round);
221226
let block_receiver = context.validate_proposal(init, timeout, p2p_messages_receiver).await;
222227
context.set_height_and_round(height, self.state_machine.round()).await;
223228
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
@@ -287,23 +292,21 @@ impl SingleHeightConsensus {
287292
CONSENSUS_PROPOSALS_INVALID.increment(1);
288293
}
289294

290-
// Retaining the entry for this round prevents us from receiving another proposal on
291-
// this round. While this prevents spam attacks it also prevents re-receiving after
292-
// a network issue.
293-
let old = self.proposals.insert(round, proposal_id);
294-
assert!(
295-
old.is_some_and(|p| p.is_none()),
296-
"Proposal entry for round {round} should exist and be empty: {old:?}"
297-
);
295+
// Validation for this round finished; clear the pending marker.
296+
self.pending_validation_rounds.remove(&round);
298297
let sm_events = self.state_machine.handle_event(event, &leader_fn);
299298
self.handle_state_machine_events(context, sm_events).await
300299
}
301300
StateMachineEvent::GetProposal(proposal_id, round) => {
302301
if proposal_id.is_none() {
303302
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
304303
}
305-
let old = self.proposals.insert(round, proposal_id);
306-
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
304+
// Ensure SM has no proposal recorded yet for this round when proposing.
305+
assert!(
306+
!self.state_machine.has_proposal_for_round(round),
307+
"There should be no entry for round {round} when proposing"
308+
);
309+
307310
assert_eq!(
308311
round,
309312
self.state_machine.round(),
@@ -472,15 +475,13 @@ impl SingleHeightConsensus {
472475

473476
// Make sure there is an existing proposal for the valid round and it matches the proposal
474477
// ID.
475-
let existing = self.proposals.get(&valid_round).and_then(|&inner| inner);
478+
let existing = self.state_machine.proposal_id_for_round(valid_round);
476479
assert!(
477480
existing.is_some_and(|id| id == proposal_id),
478481
"A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
479482
Found: {existing:?}",
480483
);
481484

482-
let old = self.proposals.insert(round, Some(proposal_id));
483-
assert!(old.is_none(), "There should be no proposal for round {round}.");
484485
let init = ProposalInit {
485486
height: self.state_machine.height(),
486487
round,
@@ -544,14 +545,9 @@ impl SingleHeightConsensus {
544545
))
545546
};
546547
let block = self
547-
.proposals
548-
.remove(&round)
549-
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?
550-
.ok_or_else(|| {
551-
invalid_decision(
552-
"Proposal is invalid or validations haven't yet completed".to_string(),
553-
)
554-
})?;
548+
.state_machine
549+
.proposal_id_for_round(round)
550+
.ok_or_else(|| invalid_decision("No proposal entry for this round".to_string()))?;
555551
if block != proposal_id {
556552
return Err(invalid_decision(format!(
557553
"StateMachine proposal commitment should match the stored block. Shc.block_id: \

crates/apollo_consensus/src/state_machine.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,14 @@ impl StateMachine {
153153
&self.precommits
154154
}
155155

156+
pub(crate) fn has_proposal_for_round(&self, round: Round) -> bool {
157+
self.proposals.contains_key(&round)
158+
}
159+
160+
pub(crate) fn proposal_id_for_round(&self, round: Round) -> Option<ProposalCommitment> {
161+
self.proposals.get(&round).and_then(|(id, _)| *id)
162+
}
163+
156164
fn make_self_vote(
157165
&self,
158166
vote_type: VoteType,

0 commit comments

Comments
 (0)